OSDN Git Service

eaf3a6f832f2b48c12bc4639b299dacb22f059e1
[opentween/open-tween.git] / OpenTween / Api / TwitterStreamObservable.cs
1 // OpenTween - Client of Twitter
2 // Copyright (c) 2018 kim_upsilon (@kim_upsilon) <https://upsilo.net/~upsilon/>
3 // All rights reserved.
4 //
5 // This file is part of OpenTween.
6 //
7 // This program is free software; you can redistribute it and/or modify it
8 // under the terms of the GNU General Public License as published by the Free
9 // Software Foundation; either version 3 of the License, or (at your option)
10 // any later version.
11 //
12 // This program is distributed in the hope that it will be useful, but
13 // WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 // or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 // for more details.
16 //
17 // You should have received a copy of the GNU General Public License along
18 // with this program. If not, see <http://www.gnu.org/licenses/>, or write to
19 // the Free Software Foundation, Inc., 51 Franklin Street - Fifth Floor,
20 // Boston, MA 02110-1301, USA.
21
22 using OpenTween.Api.DataModel;
23 using System;
24 using System.IO;
25 using System.Linq;
26 using System.Runtime.Serialization;
27 using System.Runtime.Serialization.Json;
28 using System.Text;
29 using System.Threading;
30 using System.Threading.Tasks;
31 using System.Xml;
32 using System.Xml.Linq;
33
34 namespace OpenTween.Api
35 {
36     public class TwitterStreamObservable : IObservable<ITwitterStreamMessage>
37     {
38         private readonly Func<Task<Stream>> streamOpener;
39
40         public TwitterStreamObservable(Func<Task<Stream>> streamOpener)
41             => this.streamOpener = streamOpener;
42
43         public IDisposable Subscribe(IObserver<ITwitterStreamMessage> observer)
44         {
45             var cts = new CancellationTokenSource();
46
47             this.StreamLoop(observer, cts.Token);
48
49             return new Unsubscriber(cts);
50         }
51
52         private async void StreamLoop(IObserver<ITwitterStreamMessage> observer, CancellationToken cancellationToken)
53         {
54             try
55             {
56                 using var stream = await this.streamOpener().ConfigureAwait(false);
57                 using var reader = new StreamReader(stream);
58
59                 while (!reader.EndOfStream)
60                 {
61                     cancellationToken.ThrowIfCancellationRequested();
62
63                     var line = await reader.ReadLineAsync()
64                         .ConfigureAwait(false);
65
66                     var message = ParseLine(line);
67
68                     observer.OnNext(message);
69                 }
70                 observer.OnCompleted();
71             }
72             catch (Exception ex)
73             {
74                 observer.OnError(ex);
75             }
76         }
77
78         public static ITwitterStreamMessage ParseLine(string line)
79         {
80             if (string.IsNullOrEmpty(line))
81                 return new StreamMessageKeepAlive();
82
83             if (line.First() != '{' || line.Last() != '}')
84             {
85                 MyCommon.TraceOut("Invalid JSON (ParseLine):" + Environment.NewLine + line);
86                 return new StreamMessageUnknown(line);
87             }
88
89             try
90             {
91                 var bytes = Encoding.UTF8.GetBytes(line);
92                 using var jsonReader = JsonReaderWriterFactory.CreateJsonReader(bytes, XmlDictionaryReaderQuotas.Max);
93                 var xElm = XElement.Load(jsonReader);
94
95                 if (xElm.Element("text") != null)
96                     return StreamMessageStatus.ParseJson(line);
97
98                 if (xElm.Element("delete") != null)
99                     return StreamMessageDelete.ParseJson(line);
100
101                 if (xElm.Element("event") != null)
102                     return StreamMessageEvent.ParseJson(line);
103
104                 if (xElm.Element("direct_message") != null)
105                     return StreamMessageDirectMessage.ParseJson(line);
106
107                 if (xElm.Element("scrub_geo") != null)
108                     return StreamMessageScrubGeo.ParseJson(line);
109
110                 return new StreamMessageUnknown(line);
111             }
112             catch (XmlException)
113             {
114                 MyCommon.TraceOut("XmlException (ParseLine): " + line);
115                 return new StreamMessageUnknown(line);
116             }
117             catch (SerializationException)
118             {
119                 MyCommon.TraceOut("SerializationException (ParseLine): " + line);
120                 return new StreamMessageUnknown(line);
121             }
122         }
123
124         private sealed class Unsubscriber : IDisposable
125         {
126             private readonly CancellationTokenSource cts;
127
128             public Unsubscriber(CancellationTokenSource cts)
129                 => this.cts = cts;
130
131             public void Dispose()
132                 => this.cts.Cancel();
133         }
134     }
135 }