OSDN Git Service

Twitter.TwitterUserstreamを非同期なメソッドで書き換え
authorKimura Youichi <kim.upsilon@bucyou.net>
Mon, 2 May 2016 18:52:32 +0000 (03:52 +0900)
committerKimura Youichi <kim.upsilon@bucyou.net>
Mon, 2 May 2016 18:56:39 +0000 (03:56 +0900)
OpenTween.Tests/Api/TwitterApiTest.cs
OpenTween/Api/TwitterApi.cs
OpenTween/AppendSettingDialog.cs
OpenTween/Connection/HttpTwitter.cs
OpenTween/Tween.cs
OpenTween/Twitter.cs

index eaf4331..db7a11d 100644 (file)
@@ -21,6 +21,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
 using System.Net.Http;
 using System.Reflection;
@@ -829,5 +830,32 @@ namespace OpenTween.Api
                 mock.VerifyAll();
             }
         }
+
+        [Fact]
+        public async Task UserStreams_Test()
+        {
+            using (var twitterApi = new TwitterApi())
+            {
+                var mock = new Mock<IApiConnection>();
+                mock.Setup(x =>
+                    x.GetStreamAsync(
+                        new Uri("https://userstream.twitter.com/1.1/user.json", UriKind.Absolute),
+                        new Dictionary<string, string> {
+                            { "replies", "all" },
+                            { "track", "OpenTween" },
+                        })
+                )
+                .ReturnsAsync(new MemoryStream());
+
+                twitterApi.apiConnection = mock.Object;
+
+                var stream = await twitterApi.UserStreams(replies: "all", track: "OpenTween")
+                    .ConfigureAwait(false);
+
+                stream.Dispose();
+
+                mock.VerifyAll();
+            }
+        }
     }
 }
index eab35a0..3362c8a 100644 (file)
@@ -21,6 +21,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
 using System.Text;
 using System.Threading;
@@ -451,6 +452,19 @@ namespace OpenTween.Api
             return this.apiConnection.PostLazyAsync<TwitterUploadMediaResult>(endpoint, null, paramMedia);
         }
 
+        public Task<Stream> UserStreams(string replies = null, string track = null)
+        {
+            var endpoint = new Uri("https://userstream.twitter.com/1.1/user.json");
+            var param = new Dictionary<string, string>();
+
+            if (replies != null)
+                param["replies"] = replies;
+            if (track != null)
+                param["track"] = track;
+
+            return this.apiConnection.GetStreamAsync(endpoint, param);
+        }
+
         public void Dispose()
         {
             this.apiConnection?.Dispose();
index 9d3115f..bcc7de6 100644 (file)
@@ -342,7 +342,7 @@ namespace OpenTween
 
         private void CheckPostAndGet_CheckedChanged(object sender, EventArgs e)
         {
-            this.GetPeriodPanel.LabelPostAndGet.Visible = this.GetPeriodPanel.CheckPostAndGet.Checked && !tw.UserStreamEnabled;
+            this.GetPeriodPanel.LabelPostAndGet.Visible = this.GetPeriodPanel.CheckPostAndGet.Checked && !tw.UserStreamActive;
         }
 
         private void Setting_Shown(object sender, EventArgs e)
@@ -354,8 +354,8 @@ namespace OpenTween
             } while (!this.IsHandleCreated);
             this.TopMost = this.PreviewPanel.CheckAlwaysTop.Checked;
 
-            this.GetPeriodPanel.LabelPostAndGet.Visible = this.GetPeriodPanel.CheckPostAndGet.Checked && !tw.UserStreamEnabled;
-            this.GetPeriodPanel.LabelUserStreamActive.Visible = tw.UserStreamEnabled;
+            this.GetPeriodPanel.LabelPostAndGet.Visible = this.GetPeriodPanel.CheckPostAndGet.Checked && !tw.UserStreamActive;
+            this.GetPeriodPanel.LabelUserStreamActive.Visible = tw.UserStreamActive;
         }
 
         private bool BitlyValidation(string id, string apikey)
index ef44e73..a4c0775 100644 (file)
@@ -437,26 +437,6 @@ namespace OpenTween
             };
         }
 
-        public HttpStatusCode UserStream(ref Stream content,
-                                         bool allAtReplies,
-                                         string trackwords,
-                                         string userAgent)
-        {
-            Dictionary<string, string> param = new Dictionary<string, string>();
-
-            if (allAtReplies)
-                param.Add("replies", "all");
-
-            if (!string.IsNullOrEmpty(trackwords))
-                param.Add("track", trackwords);
-
-            return httpCon.GetContent(GetMethod,
-                this.CreateTwitterUserStreamUri("/1.1/user.json"),
-                param,
-                ref content,
-                userAgent);
-        }
-
         public HttpStatusCode FilterStream(ref Stream content,
                                            string trackwords,
                                            string userAgent)
index 16c8a78..cba318e 100644 (file)
@@ -1383,7 +1383,7 @@ namespace OpenTween
             if (ResetTimers.UserStream || usCounter <= 0 && this._cfgCommon.UserstreamPeriod > 0)
             {
                 Interlocked.Exchange(ref usCounter, this._cfgCommon.UserstreamPeriod);
-                if (this._isActiveUserstream)
+                if (this.tw.UserStreamActive)
                     this.RefreshTimeline();
                 ResetTimers.UserStream = false;
             }
@@ -3187,7 +3187,7 @@ namespace OpenTween
 
             if (this._cfgCommon.PostAndGet)
             {
-                if (this._isActiveUserstream)
+                if (this.tw.UserStreamActive)
                     this.RefreshTimeline();
                 else
                     await this.GetHomeTimelineAsync();
@@ -3251,7 +3251,7 @@ namespace OpenTween
                     this._postTimestamps.RemoveAt(i);
             }
 
-            if (this._cfgCommon.PostAndGet && !this._isActiveUserstream)
+            if (this._cfgCommon.PostAndGet && !this.tw.UserStreamActive)
                 await this.GetHomeTimelineAsync();
         }
 
@@ -11020,11 +11020,10 @@ namespace OpenTween
             tw.PostDeleted += tw_PostDeleted;
             tw.UserStreamEventReceived += tw_UserStreamEventArrived;
 
-            MenuItemUserStream.Text = "&UserStream ■";
-            MenuItemUserStream.Enabled = true;
-            StopToolStripMenuItem.Text = "&Start";
-            StopToolStripMenuItem.Enabled = true;
-            if (this._cfgCommon.UserstreamStartup) tw.StartUserStream();
+            this.RefreshUserStreamsMenu();
+
+            if (this._cfgCommon.UserstreamStartup)
+                tw.StartUserStream();
         }
 
         private async void TweenMain_Shown(object sender, EventArgs e)
@@ -12825,8 +12824,6 @@ namespace OpenTween
         }
 
 #region "Userstream"
-        private bool _isActiveUserstream = false;
-
         private void tw_PostDeleted(object sender, PostDeletedEventArgs e)
         {
             try
@@ -12889,7 +12886,6 @@ namespace OpenTween
 
         private void tw_UserStreamStarted(object sender, EventArgs e)
         {
-            this._isActiveUserstream = true;
             try
             {
                 if (InvokeRequired && !IsDisposed)
@@ -12907,17 +12903,14 @@ namespace OpenTween
                 return;
             }
 
-            MenuItemUserStream.Text = "&UserStream ▶";
-            MenuItemUserStream.Enabled = true;
-            StopToolStripMenuItem.Text = "&Stop";
-            StopToolStripMenuItem.Enabled = true;
+            this.RefreshUserStreamsMenu();
+            this.MenuItemUserStream.Enabled = true;
 
             StatusLabel.Text = "UserStream Started.";
         }
 
         private void tw_UserStreamStopped(object sender, EventArgs e)
         {
-            this._isActiveUserstream = false;
             try
             {
                 if (InvokeRequired && !IsDisposed)
@@ -12935,14 +12928,26 @@ namespace OpenTween
                 return;
             }
 
-            MenuItemUserStream.Text = "&UserStream ■";
-            MenuItemUserStream.Enabled = true;
-            StopToolStripMenuItem.Text = "&Start";
-            StopToolStripMenuItem.Enabled = true;
+            this.RefreshUserStreamsMenu();
+            this.MenuItemUserStream.Enabled = true;
 
             StatusLabel.Text = "UserStream Stopped.";
         }
 
+        private void RefreshUserStreamsMenu()
+        {
+            if (this.tw.UserStreamActive)
+            {
+                this.MenuItemUserStream.Text = "&UserStream ▶";
+                this.StopToolStripMenuItem.Text = "&Stop";
+            }
+            else
+            {
+                this.MenuItemUserStream.Text = "&UserStream ■";
+                this.StopToolStripMenuItem.Text = "&Start";
+            }
+        }
+
         private void tw_UserStreamEventArrived(object sender, UserStreamEventReceivedEventArgs e)
         {
             try
@@ -13071,7 +13076,7 @@ namespace OpenTween
                 StopRefreshAllMenuItem.Checked = false;
                 return;
             }
-            if (this._isActiveUserstream)
+            if (this.tw.UserStreamActive)
             {
                 tw.StopUserStream();
             }
index f8bf3f6..205b727 100644 (file)
@@ -29,6 +29,7 @@ using System.Diagnostics;
 using System.IO;
 using System.Linq;
 using System.Net;
+using System.Net.Http;
 using System.Runtime.CompilerServices;
 using System.Runtime.Serialization;
 using System.Runtime.Serialization.Json;
@@ -2678,263 +2679,164 @@ namespace OpenTween
             this.UserStreamStopped?.Invoke(this, EventArgs.Empty);
         }
 
-        public bool UserStreamEnabled
-        {
-            get
-            {
-                return userStream == null ? false : userStream.Enabled;
-            }
-        }
+        public bool UserStreamActive
+            => this.userStream == null ? false : this.userStream.IsStreamActive;
 
         public void StartUserStream()
         {
-            if (userStream != null)
-            {
-                StopUserStream();
-            }
-            userStream = new TwitterUserstream(twCon);
-            userStream.StatusArrived += userStream_StatusArrived;
-            userStream.Started += userStream_Started;
-            userStream.Stopped += userStream_Stopped;
-            userStream.Start(this.AllAtReply, this.TrackWord);
+            var newStream = new TwitterUserstream(this.Api);
+
+            newStream.StatusArrived += userStream_StatusArrived;
+            newStream.Started += userStream_Started;
+            newStream.Stopped += userStream_Stopped;
+
+            newStream.Start(this.AllAtReply, this.TrackWord);
+
+            var oldStream = Interlocked.Exchange(ref this.userStream, newStream);
+            oldStream?.Dispose();
         }
 
         public void StopUserStream()
         {
-            userStream?.Dispose();
-            userStream = null;
-            if (!MyCommon._endingFlag)
-            {
-                this.UserStreamStopped?.Invoke(this, EventArgs.Empty);
-            }
+            var oldStream = Interlocked.Exchange(ref this.userStream, null);
+            oldStream?.Dispose();
         }
 
         public void ReconnectUserStream()
         {
-            if (userStream != null)
-            {
-                this.StartUserStream();
-            }
+            this.StartUserStream();
         }
 
         private class TwitterUserstream : IDisposable
         {
+            public bool AllAtReplies { get; private set; }
+            public string TrackWords { get; private set; }
+
+            public bool IsStreamActive { get; private set; }
+
             public event Action<string> StatusArrived;
             public event Action Stopped;
             public event Action Started;
-            private HttpTwitter twCon;
 
-            private Thread _streamThread;
-            private bool _streamActive;
+            private TwitterApi twitterApi;
 
-            private bool _allAtreplies = false;
-            private string _trackwords = "";
+            private Task streamTask;
+            private CancellationTokenSource streamCts;
 
-            public TwitterUserstream(HttpTwitter twitterConnection)
+            public TwitterUserstream(TwitterApi twitterApi)
             {
-                twCon = (HttpTwitter)twitterConnection.Clone();
+                this.twitterApi = twitterApi;
             }
 
             public void Start(bool allAtReplies, string trackwords)
             {
                 this.AllAtReplies = allAtReplies;
                 this.TrackWords = trackwords;
-                _streamActive = true;
-                if (_streamThread != null && _streamThread.IsAlive) return;
-                _streamThread = new Thread(UserStreamLoop);
-                _streamThread.Name = "UserStreamReceiver";
-                _streamThread.IsBackground = true;
-                _streamThread.Start();
-            }
 
-            public bool Enabled
-            {
-                get
-                {
-                    return _streamActive;
-                }
-            }
+                var cts = new CancellationTokenSource();
 
-            public bool AllAtReplies
-            {
-                get
-                {
-                    return _allAtreplies;
-                }
-                set
+                this.streamCts = cts;
+                this.streamTask = Task.Run(async () =>
                 {
-                    _allAtreplies = value;
-                }
+                    try
+                    {
+                        await this.UserStreamLoop(cts.Token)
+                            .ConfigureAwait(false);
+                    }
+                    catch (OperationCanceledException) { }
+                });
             }
 
-            public string TrackWords
+            public void Stop()
             {
-                get
-                {
-                    return _trackwords;
-                }
-                set
-                {
-                    _trackwords = value;
-                }
+                this.streamCts?.Cancel();
+
+                // streamTask の完了を待たずに IsStreamActive を false にセットする
+                this.IsStreamActive = false;
+                this.Stopped?.Invoke();
             }
 
-            private void UserStreamLoop()
+            private async Task UserStreamLoop(CancellationToken cancellationToken)
             {
-                var sleepSec = 0;
-                do
+                TimeSpan? sleep = null;
+                for (;;)
                 {
-                    Stream st = null;
-                    StreamReader sr = null;
-                    try
+                    if (sleep != null)
                     {
-                        if (!MyCommon.IsNetworkAvailable())
-                        {
-                            sleepSec = 30;
-                            continue;
-                        }
+                        await Task.Delay(sleep.Value, cancellationToken)
+                            .ConfigureAwait(false);
+                        sleep = null;
+                    }
 
-                        Started?.Invoke();
+                    if (!MyCommon.IsNetworkAvailable())
+                    {
+                        sleep = TimeSpan.FromSeconds(30);
+                        continue;
+                    }
 
-                        var res = twCon.UserStream(ref st, _allAtreplies, _trackwords, Networking.GetUserAgentString());
+                    this.IsStreamActive = true;
+                    this.Started?.Invoke();
 
-                        switch (res)
-                        {
-                            case HttpStatusCode.OK:
-                                Twitter.AccountState = MyCommon.ACCOUNT_STATE.Valid;
-                                break;
-                            case HttpStatusCode.Unauthorized:
-                                Twitter.AccountState = MyCommon.ACCOUNT_STATE.Invalid;
-                                sleepSec = 120;
-                                continue;
-                        }
+                    try
+                    {
+                        var replies = this.AllAtReplies ? "all" : null;
 
-                        if (st == null)
+                        using (var stream = await this.twitterApi.UserStreams(replies, this.TrackWords)
+                            .ConfigureAwait(false))
+                        using (var reader = new StreamReader(stream))
                         {
-                            sleepSec = 30;
-                            //MyCommon.TraceOut("Stop:stream is null")
-                            continue;
-                        }
+                            while (!reader.EndOfStream)
+                            {
+                                var line = await reader.ReadLineAsync()
+                                    .ConfigureAwait(false);
 
-                        sr = new StreamReader(st);
+                                cancellationToken.ThrowIfCancellationRequested();
 
-                        while (_streamActive && !sr.EndOfStream && Twitter.AccountState == MyCommon.ACCOUNT_STATE.Valid)
-                        {
-                            StatusArrived?.Invoke(sr.ReadLine());
-                            //this.LastTime = Now;
+                                this.StatusArrived?.Invoke(line);
+                            }
                         }
 
-                        if (sr.EndOfStream || Twitter.AccountState == MyCommon.ACCOUNT_STATE.Invalid)
-                        {
-                            sleepSec = 30;
-                            //MyCommon.TraceOut("Stop:EndOfStream")
-                            continue;
-                        }
-                        break;
-                    }
-                    catch(WebException ex)
-                    {
-                        if (ex.Status == WebExceptionStatus.Timeout)
-                        {
-                            sleepSec = 30;                        //MyCommon.TraceOut("Stop:Timeout")
-                        }
-                        else if (ex.Response != null && (int)((HttpWebResponse)ex.Response).StatusCode == 420)
-                        {
-                            //MyCommon.TraceOut("Stop:Connection Limit")
-                            break;
-                        }
-                        else
-                        {
-                            sleepSec = 30;
-                            //MyCommon.TraceOut("Stop:WebException " + ex.Status.ToString())
-                        }
-                    }
-                    catch(ThreadAbortException)
-                    {
-                        break;
-                    }
-                    catch(IOException)
-                    {
-                        sleepSec = 30;
-                        //MyCommon.TraceOut("Stop:IOException with Active." + Environment.NewLine + ex.Message)
+                        // キャンセルされていないのにストリームが終了した場合
+                        sleep = TimeSpan.FromSeconds(30);
                     }
-                    catch(ArgumentException ex)
+                    catch (HttpRequestException) { sleep = TimeSpan.FromSeconds(30); }
+                    catch (IOException) { sleep = TimeSpan.FromSeconds(30); }
+                    catch (OperationCanceledException)
                     {
-                        //System.ArgumentException: ストリームを読み取れませんでした。
-                        //サーバー側もしくは通信経路上で切断された場合?タイムアウト頻発後発生
-                        sleepSec = 30;
-                        MyCommon.TraceOut(ex, "Stop:ArgumentException");
+                        if (cancellationToken.IsCancellationRequested)
+                            throw;
+
+                        // cancellationToken によるキャンセルではない(=タイムアウトエラー)
+                        sleep = TimeSpan.FromSeconds(30);
                     }
-                    catch(Exception ex)
+                    catch (Exception ex)
                     {
-                        MyCommon.TraceOut("Stop:Exception." + Environment.NewLine + ex.Message);
                         MyCommon.ExceptionOut(ex);
-                        sleepSec = 30;
+                        sleep = TimeSpan.FromSeconds(30);
                     }
                     finally
                     {
-                        if (_streamActive)
-                        {
-                            Stopped?.Invoke();
-                        }
-                        twCon.RequestAbort();
-                        sr?.Close();
-                        if (sleepSec > 0)
-                        {
-                            var ms = 0;
-                            while (_streamActive && ms < sleepSec * 1000)
-                            {
-                                Thread.Sleep(500);
-                                ms += 500;
-                            }
-                        }
-                        sleepSec = 0;
+                        this.IsStreamActive = false;
+                        this.Stopped?.Invoke();
                     }
-                } while (this._streamActive);
-
-                if (_streamActive)
-                {
-                    Stopped?.Invoke();
                 }
-                MyCommon.TraceOut("Stop:EndLoop");
             }
 
-#region "IDisposable Support"
-            private bool disposedValue; // 重複する呼び出しを検出するには
+            private bool disposed = false;
 
-            // IDisposable
-            protected virtual void Dispose(bool disposing)
+            public void Dispose()
             {
-                if (!this.disposedValue)
-                {
-                    if (disposing)
-                    {
-                        _streamActive = false;
-                        if (_streamThread != null && _streamThread.IsAlive)
-                        {
-                            _streamThread.Abort();
-                        }
-                    }
-                }
-                this.disposedValue = true;
-            }
+                if (this.disposed)
+                    return;
 
-            //protected Overrides void Finalize()
-            //{
-            //    // このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
-            //    Dispose(false)
-            //    MyBase.Finalize()
-            //}
+                this.disposed = true;
 
-            // このコードは、破棄可能なパターンを正しく実装できるように Visual Basic によって追加されました。
-            public void Dispose()
-            {
-                // このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
-                Dispose(true);
-                GC.SuppressFinalize(this);
-            }
-#endregion
+                this.Stop();
 
+                this.Started = null;
+                this.Stopped = null;
+                this.StatusArrived = null;
+            }
         }
 #endregion