// OpenTween - Client of Twitter
// Copyright (c) 2018 kim_upsilon (@kim_upsilon) <https://upsilo.net/~upsilon/>
// All rights reserved.
//
// This file is part of OpenTween.
//
// This program is free software; you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation; either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
// for more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>, or write to
// the Free Software Foundation, Inc., 51 Franklin Street - Fifth Floor,
// Boston, MA 02110-1301, USA.
using System;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;
using OpenTween.Api.DataModel;
namespace OpenTween.Api
{
    /// <summary>
    /// Exposes a line-delimited stream as an observable sequence of
    /// <see cref="ITwitterStreamMessage"/> instances.
    /// </summary>
    /// <remarks>
    /// Each subscription opens its own stream through the delegate passed to the
    /// constructor, reads it line by line, converts every line with
    /// <see cref="ParseLine"/> and forwards the result to the observer.
    /// </remarks>
    public class TwitterStreamObservable : IObservable<ITwitterStreamMessage>
    {
        private readonly Func<Task<Stream>> streamOpener;

        /// <summary>
        /// Initializes a new instance that obtains its stream from <paramref name="streamOpener"/>.
        /// </summary>
        /// <param name="streamOpener">Delegate invoked once per subscription to open the stream to read.</param>
        /// <exception cref="ArgumentNullException"><paramref name="streamOpener"/> is <c>null</c>.</exception>
        public TwitterStreamObservable(Func<Task<Stream>> streamOpener)
            => this.streamOpener = streamOpener ?? throw new ArgumentNullException(nameof(streamOpener));

        /// <summary>
        /// Starts reading the stream and delivering parsed messages to <paramref name="observer"/>.
        /// </summary>
        /// <param name="observer">Receiver of the parsed messages, the completion signal and any error.</param>
        /// <returns>A handle whose <see cref="IDisposable.Dispose"/> requests the read loop to stop.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public IDisposable Subscribe(IObserver<ITwitterStreamMessage> observer)
        {
            // Fail fast here; otherwise the null would only surface later inside the background loop.
            if (observer == null)
                throw new ArgumentNullException(nameof(observer));

            var cts = new CancellationTokenSource();

            // Fire and forget: the loop handles its own failures and reports them through the observer.
            _ = this.StreamLoopAsync(observer, cts.Token);

            return new Unsubscriber(cts);
        }

        /// <summary>
        /// Opens the stream, pushes one message per line to <paramref name="observer"/> and
        /// signals completion when the stream ends.
        /// </summary>
        /// <param name="observer">Receiver of the notifications.</param>
        /// <param name="cancellationToken">Token that is signalled when the subscription is disposed.</param>
        /// <returns>A task that completes when the loop has stopped.</returns>
        private async Task StreamLoopAsync(IObserver<ITwitterStreamMessage> observer, CancellationToken cancellationToken)
        {
            try
            {
                using var stream = await this.streamOpener().ConfigureAwait(false);
                using var reader = new StreamReader(stream);

                while (true)
                {
                    cancellationToken.ThrowIfCancellationRequested();

                    // ReadLineAsync returns null at end of stream. This avoids StreamReader.EndOfStream,
                    // which performs a synchronous (blocking) read when its buffer is empty.
                    var line = await reader.ReadLineAsync()
                        .ConfigureAwait(false);

                    if (line == null)
                        break;

                    observer.OnNext(ParseLine(line));
                }

                observer.OnCompleted();
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The subscription was disposed: stop quietly without notifying the observer.
            }
            catch (Exception ex)
            {
                // Nothing awaits this task, so the observer is the only place a failure can be reported.
                observer.OnError(ex);
            }
        }

        /// <summary>
        /// Converts a single line read from the stream into a stream message.
        /// </summary>
        /// <param name="line">One line of the stream; may be <c>null</c> or empty.</param>
        /// <returns>
        /// <see cref="StreamMessageKeepAlive"/> for a <c>null</c> or empty line;
        /// the result of the matching <c>ParseJson</c> method when the JSON object contains a top-level
        /// <c>text</c>, <c>delete</c>, <c>event</c>, <c>direct_message</c> or <c>scrub_geo</c> member
        /// (checked in that order); otherwise <see cref="StreamMessageUnknown"/> wrapping the raw line.
        /// </returns>
        public static ITwitterStreamMessage ParseLine(string line)
        {
            if (string.IsNullOrEmpty(line))
                return new StreamMessageKeepAlive();

            // Cheap sanity check: only lines that look like a JSON object are handed to the parser.
            if (line[0] != '{' || line[line.Length - 1] != '}')
            {
                MyCommon.TraceOut("Invalid JSON (ParseLine):" + Environment.NewLine + line);
                return new StreamMessageUnknown(line);
            }

            try
            {
                // Load the JSON as an XML tree so the message type can be picked from its top-level members.
                var bytes = Encoding.UTF8.GetBytes(line);
                using var jsonReader = JsonReaderWriterFactory.CreateJsonReader(bytes, XmlDictionaryReaderQuotas.Max);
                var xElm = XElement.Load(jsonReader);

                if (xElm.Element("text") != null)
                    return StreamMessageStatus.ParseJson(line);

                if (xElm.Element("delete") != null)
                    return StreamMessageDelete.ParseJson(line);

                if (xElm.Element("event") != null)
                    return StreamMessageEvent.ParseJson(line);

                if (xElm.Element("direct_message") != null)
                    return StreamMessageDirectMessage.ParseJson(line);

                if (xElm.Element("scrub_geo") != null)
                    return StreamMessageScrubGeo.ParseJson(line);

                return new StreamMessageUnknown(line);
            }
            catch (XmlException)
            {
                MyCommon.TraceOut("XmlException (ParseLine): " + line);
                return new StreamMessageUnknown(line);
            }
            catch (SerializationException)
            {
                MyCommon.TraceOut("SerializationException (ParseLine): " + line);
                return new StreamMessageUnknown(line);
            }
        }

        /// <summary>
        /// Subscription handle returned by <see cref="Subscribe"/>; disposing it cancels the read loop.
        /// </summary>
        private sealed class Unsubscriber : IDisposable
        {
            private readonly CancellationTokenSource cts;

            /// <summary>Wraps the cancellation source that controls one read loop.</summary>
            /// <param name="cts">Cancellation source whose token was handed to the read loop.</param>
            public Unsubscriber(CancellationTokenSource cts)
                => this.cts = cts;

            /// <summary>Requests cancellation of the associated read loop.</summary>
            public void Dispose()
                => this.cts.Cancel();
        }
    }
}