--- /dev/null
+// OpenTween - Client of Twitter
+// Copyright (c) 2018 kim_upsilon (@kim_upsilon) <https://upsilo.net/~upsilon/>
+// All rights reserved.
+//
+// This file is part of OpenTween.
+//
+// This program is free software; you can redistribute it and/or modify it
+// under the terms of the GNU General Public License as published by the Free
+// Software Foundation; either version 3 of the License, or (at your option)
+// any later version.
+//
+// This program is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+// for more details.
+//
+// You should have received a copy of the GNU General Public License along
+// with this program. If not, see <http://www.gnu.org/licenses/>, or write to
+// the Free Software Foundation, Inc., 51 Franklin Street - Fifth Floor,
+// Boston, MA 02110-1301, USA.
+
+using System.Runtime.Serialization;
+
+namespace OpenTween.Api.DataModel
+{
+ public interface ITwitterStreamMessage
+ {
+ }
+
+ public class StreamMessageStatus : ITwitterStreamMessage
+ {
+ public TwitterStatusCompat Status { get; }
+
+ public StreamMessageStatus(TwitterStatusCompat status)
+ => this.Status = status;
+
+ public static StreamMessageStatus ParseJson(string json)
+ => new StreamMessageStatus(TwitterStatusCompat.ParseJson(json));
+ }
+
+ public class StreamMessageEvent : ITwitterStreamMessage
+ {
+ public TwitterStreamEvent Event { get; }
+ public string Json { get; }
+
+ public StreamMessageEvent(TwitterStreamEvent eventData, string json)
+ {
+ this.Event = eventData;
+ this.Json = json;
+ }
+
+ public TwitterStreamEvent<T> ParseTargetObjectAs<T>()
+ => TwitterStreamEvent<T>.ParseJson(this.Json);
+
+ public static StreamMessageEvent ParseJson(string json)
+ => new StreamMessageEvent(TwitterStreamEvent.ParseJson(json), json);
+ }
+
+ [DataContract]
+ public class StreamMessageDirectMessage : ITwitterStreamMessage
+ {
+ [DataMember(Name = "direct_message")]
+ public TwitterDirectMessage DirectMessage { get; set; }
+
+ public static StreamMessageDirectMessage ParseJson(string json)
+ => MyCommon.CreateDataFromJson<StreamMessageDirectMessage>(json);
+ }
+
+ [DataContract]
+ public class StreamMessageDelete : ITwitterStreamMessage
+ {
+ [DataContract]
+ public class DeletedId
+ {
+ [DataMember(Name = "id")]
+ public long Id { get; set; }
+ }
+
+ [DataMember(Name = "direct_message", IsRequired = false)]
+ public DeletedId DirectMessage { get; set; } // Nullable
+
+ [DataMember(Name = "status", IsRequired = false)]
+ public DeletedId Status { get; set; } // Nullable
+
+ public static StreamMessageDelete ParseJson(string json)
+ => MyCommon.CreateDataFromJson<StreamMessageDelete>(json);
+ }
+
+ [DataContract]
+ public class StreamMessageScrubGeo : ITwitterStreamMessage
+ {
+ [DataMember(Name = "user_id")]
+ public long UserId { get; set; }
+
+ [DataMember(Name = "up_to_status_id")]
+ public long UpToStatusId { get; set; }
+
+ public static StreamMessageScrubGeo ParseJson(string json)
+ => MyCommon.CreateDataFromJson<StreamMessageScrubGeo>(json);
+ }
+
+ public class StreamMessageKeepAlive : ITwitterStreamMessage
+ {
+ }
+
+ public class StreamMessageUnknown : ITwitterStreamMessage
+ {
+ public string Json { get; }
+
+ public StreamMessageUnknown(string json)
+ => this.Json = json;
+ }
+}
// the Free Software Foundation, Inc., 51 Franklin Street - Fifth Floor,
// Boston, MA 02110-1301, USA.
+using OpenTween.Api.DataModel;
using System;
using System.IO;
+using System.Linq;
+using System.Runtime.Serialization;
+using System.Runtime.Serialization.Json;
+using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using System.Xml;
+using System.Xml.Linq;
namespace OpenTween.Api
{
- public class TwitterStreamObservable : IObservable<string>
+ public class TwitterStreamObservable : IObservable<ITwitterStreamMessage>
{
private readonly Func<Task<Stream>> streamOpener;
public TwitterStreamObservable(Func<Task<Stream>> streamOpener)
=> this.streamOpener = streamOpener;
- public IDisposable Subscribe(IObserver<string> observer)
+ public IDisposable Subscribe(IObserver<ITwitterStreamMessage> observer)
{
var cts = new CancellationTokenSource();
return new Unsubscriber(cts);
}
- private async void StreamLoop(IObserver<string> observer, CancellationToken cancellationToken)
+ private async void StreamLoop(IObserver<ITwitterStreamMessage> observer, CancellationToken cancellationToken)
{
try
{
var line = await reader.ReadLineAsync()
.ConfigureAwait(false);
- observer.OnNext(line);
+ var message = ParseLine(line);
+
+ observer.OnNext(message);
}
observer.OnCompleted();
}
}
}
+ public static ITwitterStreamMessage ParseLine(string line)
+ {
+ if (string.IsNullOrEmpty(line))
+ return new StreamMessageKeepAlive();
+
+ if (line.First() != '{' || line.Last() != '}')
+ {
+ MyCommon.TraceOut("Invalid JSON (ParseLine):" + Environment.NewLine + line);
+ return new StreamMessageUnknown(line);
+ }
+
+ try
+ {
+ var bytes = Encoding.UTF8.GetBytes(line);
+ using (var jsonReader = JsonReaderWriterFactory.CreateJsonReader(bytes, XmlDictionaryReaderQuotas.Max))
+ {
+ var xElm = XElement.Load(jsonReader);
+
+ if (xElm.Element("text") != null)
+ return StreamMessageStatus.ParseJson(line);
+
+ if (xElm.Element("delete") != null)
+ return StreamMessageDelete.ParseJson(line);
+
+ if (xElm.Element("event") != null)
+ return StreamMessageEvent.ParseJson(line);
+
+ if (xElm.Element("direct_message") != null)
+ return StreamMessageDirectMessage.ParseJson(line);
+
+ if (xElm.Element("scrub_geo") != null)
+ return StreamMessageScrubGeo.ParseJson(line);
+
+ return new StreamMessageUnknown(line);
+ }
+ }
+ catch (XmlException)
+ {
+ MyCommon.TraceOut("XmlException (ParseLine): " + line);
+ return new StreamMessageUnknown(line);
+ }
+ catch (SerializationException)
+ {
+ MyCommon.TraceOut("SerializationException (ParseLine): " + line);
+ return new StreamMessageUnknown(line);
+ }
+ }
+
private sealed class Unsubscriber : IDisposable
{
private readonly CancellationTokenSource cts;
using System.Net;
using System.Net.Http;
using System.Runtime.CompilerServices;
-using System.Runtime.Serialization;
-using System.Runtime.Serialization.Json;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
-using System.Web;
-using System.Xml;
-using System.Xml.Linq;
-using System.Xml.XPath;
using System;
using System.Reflection;
using System.Collections.Generic;
public bool IsUserstreamDataReceived
=> (DateTimeUtc.Now - this._lastUserstreamDataReceived).TotalSeconds < 31;
- private void userStream_StatusArrived(string line)
+ private void userStream_StatusArrived(ITwitterStreamMessage message)
{
this._lastUserstreamDataReceived = DateTimeUtc.Now;
- if (string.IsNullOrEmpty(line)) return;
- if (line.First() != '{' || line.Last() != '}')
+ switch (message)
{
- MyCommon.TraceOut("Invalid JSON (StatusArrived):" + Environment.NewLine + line);
- return;
- }
-
- var isDm = false;
-
- try
- {
- using (var jsonReader = JsonReaderWriterFactory.CreateJsonReader(Encoding.UTF8.GetBytes(line), XmlDictionaryReaderQuotas.Max))
- {
- var xElm = XElement.Load(jsonReader);
- if (xElm.Element("friends") != null)
- {
- Debug.WriteLine("friends");
- return;
- }
- else if (xElm.Element("delete") != null)
- {
- Debug.WriteLine("delete");
- Int64 id;
- XElement idElm;
- if ((idElm = xElm.Element("delete").Element("direct_message")?.Element("id")) != null)
- {
- id = 0;
- long.TryParse(idElm.Value, out id);
+ case StreamMessageStatus statusMessage:
+ var status = statusMessage.Status.Normalize();
- this.PostDeleted?.Invoke(this, new PostDeletedEventArgs(id));
- }
- else if ((idElm = xElm.Element("delete").Element("status")?.Element("id")) != null)
- {
- id = 0;
- long.TryParse(idElm.Value, out id);
-
- this.PostDeleted?.Invoke(this, new PostDeletedEventArgs(id));
- }
- else
- {
- MyCommon.TraceOut("delete:" + line);
- return;
- }
- for (int i = this.StoredEvent.Count - 1; i >= 0; i--)
- {
- var sEvt = this.StoredEvent[i];
- if (sEvt.Id == id && (sEvt.Event == "favorite" || sEvt.Event == "unfavorite"))
- {
- this.StoredEvent.RemoveAt(i);
- }
- }
- return;
- }
- else if (xElm.Element("limit") != null)
+ if (status.RetweetedStatus is TwitterStatus retweetedStatus)
{
- Debug.WriteLine(line);
- return;
- }
- else if (xElm.Element("event") != null)
- {
- Debug.WriteLine("event: " + xElm.Element("event").Value);
- CreateEventFromJson(line);
- return;
- }
- else if (xElm.Element("direct_message") != null)
- {
- Debug.WriteLine("direct_message");
- isDm = true;
- }
- else if (xElm.Element("retweeted_status") != null)
- {
- var sourceUserId = xElm.XPathSelectElement("/user/id_str").Value;
- var targetUserId = xElm.XPathSelectElement("/retweeted_status/user/id_str").Value;
+ var sourceUserId = statusMessage.Status.User.Id;
+ var targetUserId = retweetedStatus.User.Id;
// 自分に関係しないリツイートの場合は無視する
- var selfUserId = this.UserId.ToString();
+ var selfUserId = this.UserId;
if (sourceUserId == selfUserId || targetUserId == selfUserId)
{
// 公式 RT をイベントとしても扱う
- var evt = CreateEventFromRetweet(xElm);
- if (evt != null)
- {
- this.StoredEvent.Insert(0, evt);
-
- this.UserStreamEventReceived?.Invoke(this, new UserStreamEventReceivedEventArgs(evt));
- }
+ var evt = this.CreateEventFromRetweet(status);
+ this.StoredEvent.Insert(0, evt);
+ this.UserStreamEventReceived?.Invoke(this, new UserStreamEventReceivedEventArgs(evt));
}
-
- // 従来通り公式 RT の表示も行うため return しない
+ // 従来通り公式 RT の表示も行うため break しない
}
- else if (xElm.Element("scrub_geo") != null)
+
+ this.CreatePostsFromJson(new[] { status }, MyCommon.WORKERTYPE.UserStream, null, false);
+ this.NewPostFromStream?.Invoke(this, EventArgs.Empty);
+ break;
+
+ case StreamMessageDirectMessage dmMessage:
+ this.CreateDirectMessagesFromJson(new[] { dmMessage.DirectMessage }, MyCommon.WORKERTYPE.UserStream, false);
+ this.NewPostFromStream?.Invoke(this, EventArgs.Empty);
+ break;
+
+ case StreamMessageDelete deleteMessage:
+ var deletedId = deleteMessage.Status?.Id ?? deleteMessage.DirectMessage?.Id;
+ if (deletedId == null)
+ break;
+
+ this.PostDeleted?.Invoke(this, new PostDeletedEventArgs(deletedId.Value));
+
+ foreach (var index in MyCommon.CountDown(this.StoredEvent.Count - 1, 0))
{
- try
- {
- TabInformations.GetInstance().ScrubGeoReserve(long.Parse(xElm.Element("scrub_geo").Element("user_id").Value),
- long.Parse(xElm.Element("scrub_geo").Element("up_to_status_id").Value));
- }
- catch(Exception)
+ var evt = this.StoredEvent[index];
+ if (evt.Id == deletedId.Value && (evt.Event == "favorite" || evt.Event == "unfavorite"))
{
- MyCommon.TraceOut("scrub_geo:" + line);
+ this.StoredEvent.RemoveAt(index);
}
- return;
}
- }
+ break;
- if (isDm)
- {
- try
- {
- var message = TwitterStreamEventDirectMessage.ParseJson(line).DirectMessage;
- this.CreateDirectMessagesFromJson(new[] { message }, MyCommon.WORKERTYPE.UserStream, false);
- }
- catch (SerializationException ex)
- {
- throw TwitterApiException.CreateFromException(ex, line);
- }
- }
- else
- {
- try
- {
- var status = TwitterStatusCompat.ParseJson(line);
- this.CreatePostsFromJson(new[] { status.Normalize() }, MyCommon.WORKERTYPE.UserStream, null, false);
- }
- catch (SerializationException ex)
- {
- throw TwitterApiException.CreateFromException(ex, line);
- }
- }
- }
- catch (WebApiException ex)
- {
- MyCommon.TraceOut(ex);
- return;
- }
- catch (XmlException)
- {
- MyCommon.TraceOut("XmlException (StatusArrived): " + line);
- }
- catch(NullReferenceException)
- {
- MyCommon.TraceOut("NullRef StatusArrived: " + line);
- }
+ case StreamMessageEvent eventMessage:
+ this.CreateEventFromJson(eventMessage);
+ break;
+
+ case StreamMessageScrubGeo scrubGeoMessage:
+ TabInformations.GetInstance().ScrubGeoReserve(scrubGeoMessage.UserId, scrubGeoMessage.UpToStatusId);
+ break;
- this.NewPostFromStream?.Invoke(this, EventArgs.Empty);
+ default:
+ break;
+ }
}
/// <summary>
/// UserStreamsから受信した公式RTをイベントに変換します
/// </summary>
- private FormattedEvent CreateEventFromRetweet(XElement xElm)
+ private FormattedEvent CreateEventFromRetweet(TwitterStatus status)
{
return new FormattedEvent
{
Eventtype = MyCommon.EVENTTYPE.Retweet,
Event = "retweet",
- CreatedAt = MyCommon.DateTimeParse(xElm.XPathSelectElement("/created_at").Value),
- IsMe = xElm.XPathSelectElement("/user/id_str").Value == this.UserId.ToString(),
- Username = xElm.XPathSelectElement("/user/screen_name").Value,
+ CreatedAt = MyCommon.DateTimeParse(status.CreatedAt),
+ IsMe = status.User.Id == this.UserId,
+ Username = status.User.ScreenName,
Target = string.Format("@{0}:{1}", new[]
{
- xElm.XPathSelectElement("/retweeted_status/user/screen_name").Value,
- WebUtility.HtmlDecode(xElm.XPathSelectElement("/retweeted_status/text").Value),
+ status.RetweetedStatus.User.ScreenName,
+ WebUtility.HtmlDecode(status.RetweetedStatus.FullText),
}),
- Id = long.Parse(xElm.XPathSelectElement("/retweeted_status/id_str").Value),
+ Id = status.RetweetedStatus.Id,
};
}
- private void CreateEventFromJson(string content)
+ private void CreateEventFromJson(StreamMessageEvent message)
{
- TwitterStreamEvent eventData = null;
- try
- {
- eventData = TwitterStreamEvent.ParseJson(content);
- }
- catch(SerializationException ex)
- {
- MyCommon.TraceOut(ex, "Event Serialize Exception!" + Environment.NewLine + content);
- }
- catch(Exception ex)
- {
- MyCommon.TraceOut(ex, "Event Exception!" + Environment.NewLine + content);
- }
+ var eventData = message.Event;
var evt = new FormattedEvent
{
return;
case "favorite":
case "unfavorite":
- tweetEvent = TwitterStreamEvent<TwitterStatusCompat>.ParseJson(content);
+ tweetEvent = message.ParseTargetObjectAs<TwitterStatusCompat>();
tweet = tweetEvent.TargetObject.Normalize();
evt.Target = "@" + tweet.User.ScreenName + ":" + WebUtility.HtmlDecode(tweet.FullText);
evt.Id = tweet.Id;
case "quoted_tweet":
if (evt.IsMe) return;
- tweetEvent = TwitterStreamEvent<TwitterStatusCompat>.ParseJson(content);
+ tweetEvent = message.ParseTargetObjectAs<TwitterStatusCompat>();
tweet = tweetEvent.TargetObject.Normalize();
evt.Target = "@" + tweet.User.ScreenName + ":" + WebUtility.HtmlDecode(tweet.FullText);
evt.Id = tweet.Id;
case "list_updated":
case "list_user_subscribed":
case "list_user_unsubscribed":
- var listEvent = TwitterStreamEvent<TwitterList>.ParseJson(content);
+ var listEvent = message.ParseTargetObjectAs<TwitterList>();
evt.Target = listEvent.TargetObject.FullName;
break;
case "block":
break;
default:
- MyCommon.TraceOut("Unknown Event:" + evt.Event + Environment.NewLine + content);
+ MyCommon.TraceOut("Unknown Event:" + evt.Event + Environment.NewLine + message.Json);
break;
}
this.StoredEvent.Insert(0, evt);
public bool IsStreamActive { get; private set; }
- public event Action<string> StatusArrived;
+ public event Action<ITwitterStreamMessage> StatusArrived;
public event Action Stopped;
public event Action Started;