if (ResetTimers.UserStream || usCounter <= 0 && this._cfgCommon.UserstreamPeriod > 0)
{
Interlocked.Exchange(ref usCounter, this._cfgCommon.UserstreamPeriod);
- if (this._isActiveUserstream)
+ if (this.tw.UserStreamActive)
this.RefreshTimeline();
ResetTimers.UserStream = false;
}
if (this._cfgCommon.PostAndGet)
{
- if (this._isActiveUserstream)
+ if (this.tw.UserStreamActive)
this.RefreshTimeline();
else
await this.GetHomeTimelineAsync();
this._postTimestamps.RemoveAt(i);
}
- if (this._cfgCommon.PostAndGet && !this._isActiveUserstream)
+ if (this._cfgCommon.PostAndGet && !this.tw.UserStreamActive)
await this.GetHomeTimelineAsync();
}
tw.PostDeleted += tw_PostDeleted;
tw.UserStreamEventReceived += tw_UserStreamEventArrived;
- MenuItemUserStream.Text = "&UserStream ■";
- MenuItemUserStream.Enabled = true;
- StopToolStripMenuItem.Text = "&Start";
- StopToolStripMenuItem.Enabled = true;
- if (this._cfgCommon.UserstreamStartup) tw.StartUserStream();
+ this.RefreshUserStreamsMenu();
+
+ if (this._cfgCommon.UserstreamStartup)
+ tw.StartUserStream();
}
private async void TweenMain_Shown(object sender, EventArgs e)
}
#region "Userstream"
- private bool _isActiveUserstream = false;
-
private void tw_PostDeleted(object sender, PostDeletedEventArgs e)
{
try
private void tw_UserStreamStarted(object sender, EventArgs e)
{
- this._isActiveUserstream = true;
try
{
if (InvokeRequired && !IsDisposed)
return;
}
- MenuItemUserStream.Text = "&UserStream ▶";
- MenuItemUserStream.Enabled = true;
- StopToolStripMenuItem.Text = "&Stop";
- StopToolStripMenuItem.Enabled = true;
+ this.RefreshUserStreamsMenu();
+ this.MenuItemUserStream.Enabled = true;
StatusLabel.Text = "UserStream Started.";
}
private void tw_UserStreamStopped(object sender, EventArgs e)
{
- this._isActiveUserstream = false;
try
{
if (InvokeRequired && !IsDisposed)
return;
}
- MenuItemUserStream.Text = "&UserStream ■";
- MenuItemUserStream.Enabled = true;
- StopToolStripMenuItem.Text = "&Start";
- StopToolStripMenuItem.Enabled = true;
+ this.RefreshUserStreamsMenu();
+ this.MenuItemUserStream.Enabled = true;
StatusLabel.Text = "UserStream Stopped.";
}
+ private void RefreshUserStreamsMenu()
+ {
+ if (this.tw.UserStreamActive)
+ {
+ this.MenuItemUserStream.Text = "&UserStream ▶";
+ this.StopToolStripMenuItem.Text = "&Stop";
+ }
+ else
+ {
+ this.MenuItemUserStream.Text = "&UserStream ■";
+ this.StopToolStripMenuItem.Text = "&Start";
+ }
+ }
+
private void tw_UserStreamEventArrived(object sender, UserStreamEventReceivedEventArgs e)
{
try
StopRefreshAllMenuItem.Checked = false;
return;
}
- if (this._isActiveUserstream)
+ if (this.tw.UserStreamActive)
{
tw.StopUserStream();
}
using System.IO;
using System.Linq;
using System.Net;
+using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
this.UserStreamStopped?.Invoke(this, EventArgs.Empty);
}
- public bool UserStreamEnabled
- {
- get
- {
- return userStream == null ? false : userStream.Enabled;
- }
- }
+ public bool UserStreamActive
+ => this.userStream == null ? false : this.userStream.IsStreamActive;
public void StartUserStream()
{
- if (userStream != null)
- {
- StopUserStream();
- }
- userStream = new TwitterUserstream(twCon);
- userStream.StatusArrived += userStream_StatusArrived;
- userStream.Started += userStream_Started;
- userStream.Stopped += userStream_Stopped;
- userStream.Start(this.AllAtReply, this.TrackWord);
+ var newStream = new TwitterUserstream(this.Api);
+
+ newStream.StatusArrived += userStream_StatusArrived;
+ newStream.Started += userStream_Started;
+ newStream.Stopped += userStream_Stopped;
+
+ newStream.Start(this.AllAtReply, this.TrackWord);
+
+ var oldStream = Interlocked.Exchange(ref this.userStream, newStream);
+ oldStream?.Dispose();
}
public void StopUserStream()
{
- userStream?.Dispose();
- userStream = null;
- if (!MyCommon._endingFlag)
- {
- this.UserStreamStopped?.Invoke(this, EventArgs.Empty);
- }
+ var oldStream = Interlocked.Exchange(ref this.userStream, null);
+ oldStream?.Dispose();
}
public void ReconnectUserStream()
{
- if (userStream != null)
- {
- this.StartUserStream();
- }
+ this.StartUserStream();
}
private class TwitterUserstream : IDisposable
{
+ public bool AllAtReplies { get; private set; }
+ public string TrackWords { get; private set; }
+
+ public bool IsStreamActive { get; private set; }
+
public event Action<string> StatusArrived;
public event Action Stopped;
public event Action Started;
- private HttpTwitter twCon;
- private Thread _streamThread;
- private bool _streamActive;
+ private TwitterApi twitterApi;
- private bool _allAtreplies = false;
- private string _trackwords = "";
+ private Task streamTask;
+ private CancellationTokenSource streamCts;
- public TwitterUserstream(HttpTwitter twitterConnection)
+ public TwitterUserstream(TwitterApi twitterApi)
{
- twCon = (HttpTwitter)twitterConnection.Clone();
+ this.twitterApi = twitterApi;
}
public void Start(bool allAtReplies, string trackwords)
{
this.AllAtReplies = allAtReplies;
this.TrackWords = trackwords;
- _streamActive = true;
- if (_streamThread != null && _streamThread.IsAlive) return;
- _streamThread = new Thread(UserStreamLoop);
- _streamThread.Name = "UserStreamReceiver";
- _streamThread.IsBackground = true;
- _streamThread.Start();
- }
- public bool Enabled
- {
- get
- {
- return _streamActive;
- }
- }
+ var cts = new CancellationTokenSource();
- public bool AllAtReplies
- {
- get
- {
- return _allAtreplies;
- }
- set
+ this.streamCts = cts;
+ this.streamTask = Task.Run(async () =>
{
- _allAtreplies = value;
- }
+ try
+ {
+ await this.UserStreamLoop(cts.Token)
+ .ConfigureAwait(false);
+ }
+ catch (OperationCanceledException) { }
+ });
}
- public string TrackWords
+ public void Stop()
{
- get
- {
- return _trackwords;
- }
- set
- {
- _trackwords = value;
- }
+ this.streamCts?.Cancel();
+
+ // streamTask の完了を待たずに IsStreamActive を false にセットする
+ this.IsStreamActive = false;
+ this.Stopped?.Invoke();
}
- private void UserStreamLoop()
+ private async Task UserStreamLoop(CancellationToken cancellationToken)
{
- var sleepSec = 0;
- do
+ TimeSpan? sleep = null;
+ for (;;)
{
- Stream st = null;
- StreamReader sr = null;
- try
+ if (sleep != null)
{
- if (!MyCommon.IsNetworkAvailable())
- {
- sleepSec = 30;
- continue;
- }
+ await Task.Delay(sleep.Value, cancellationToken)
+ .ConfigureAwait(false);
+ sleep = null;
+ }
- Started?.Invoke();
+ if (!MyCommon.IsNetworkAvailable())
+ {
+ sleep = TimeSpan.FromSeconds(30);
+ continue;
+ }
- var res = twCon.UserStream(ref st, _allAtreplies, _trackwords, Networking.GetUserAgentString());
+ this.IsStreamActive = true;
+ this.Started?.Invoke();
- switch (res)
- {
- case HttpStatusCode.OK:
- Twitter.AccountState = MyCommon.ACCOUNT_STATE.Valid;
- break;
- case HttpStatusCode.Unauthorized:
- Twitter.AccountState = MyCommon.ACCOUNT_STATE.Invalid;
- sleepSec = 120;
- continue;
- }
+ try
+ {
+ var replies = this.AllAtReplies ? "all" : null;
- if (st == null)
+ using (var stream = await this.twitterApi.UserStreams(replies, this.TrackWords)
+ .ConfigureAwait(false))
+ using (var reader = new StreamReader(stream))
{
- sleepSec = 30;
- //MyCommon.TraceOut("Stop:stream is null")
- continue;
- }
+ while (!reader.EndOfStream)
+ {
+ var line = await reader.ReadLineAsync()
+ .ConfigureAwait(false);
- sr = new StreamReader(st);
+ cancellationToken.ThrowIfCancellationRequested();
- while (_streamActive && !sr.EndOfStream && Twitter.AccountState == MyCommon.ACCOUNT_STATE.Valid)
- {
- StatusArrived?.Invoke(sr.ReadLine());
- //this.LastTime = Now;
+ this.StatusArrived?.Invoke(line);
+ }
}
- if (sr.EndOfStream || Twitter.AccountState == MyCommon.ACCOUNT_STATE.Invalid)
- {
- sleepSec = 30;
- //MyCommon.TraceOut("Stop:EndOfStream")
- continue;
- }
- break;
- }
- catch(WebException ex)
- {
- if (ex.Status == WebExceptionStatus.Timeout)
- {
- sleepSec = 30; //MyCommon.TraceOut("Stop:Timeout")
- }
- else if (ex.Response != null && (int)((HttpWebResponse)ex.Response).StatusCode == 420)
- {
- //MyCommon.TraceOut("Stop:Connection Limit")
- break;
- }
- else
- {
- sleepSec = 30;
- //MyCommon.TraceOut("Stop:WebException " + ex.Status.ToString())
- }
- }
- catch(ThreadAbortException)
- {
- break;
- }
- catch(IOException)
- {
- sleepSec = 30;
- //MyCommon.TraceOut("Stop:IOException with Active." + Environment.NewLine + ex.Message)
+ // キャンセルされていないのにストリームが終了した場合
+ sleep = TimeSpan.FromSeconds(30);
}
- catch(ArgumentException ex)
+ catch (HttpRequestException) { sleep = TimeSpan.FromSeconds(30); }
+ catch (IOException) { sleep = TimeSpan.FromSeconds(30); }
+ catch (OperationCanceledException)
{
- //System.ArgumentException: ストリームを読み取れませんでした。
- //サーバー側もしくは通信経路上で切断された場合?タイムアウト頻発後発生
- sleepSec = 30;
- MyCommon.TraceOut(ex, "Stop:ArgumentException");
+ if (cancellationToken.IsCancellationRequested)
+ throw;
+
+ // cancellationToken によるキャンセルではない(=タイムアウトエラー)
+ sleep = TimeSpan.FromSeconds(30);
}
- catch(Exception ex)
+ catch (Exception ex)
{
- MyCommon.TraceOut("Stop:Exception." + Environment.NewLine + ex.Message);
MyCommon.ExceptionOut(ex);
- sleepSec = 30;
+ sleep = TimeSpan.FromSeconds(30);
}
finally
{
- if (_streamActive)
- {
- Stopped?.Invoke();
- }
- twCon.RequestAbort();
- sr?.Close();
- if (sleepSec > 0)
- {
- var ms = 0;
- while (_streamActive && ms < sleepSec * 1000)
- {
- Thread.Sleep(500);
- ms += 500;
- }
- }
- sleepSec = 0;
+ this.IsStreamActive = false;
+ this.Stopped?.Invoke();
}
- } while (this._streamActive);
-
- if (_streamActive)
- {
- Stopped?.Invoke();
}
- MyCommon.TraceOut("Stop:EndLoop");
}
-#region "IDisposable Support"
- private bool disposedValue; // 重複する呼び出しを検出するには
+ private bool disposed = false;
- // IDisposable
- protected virtual void Dispose(bool disposing)
+ public void Dispose()
{
- if (!this.disposedValue)
- {
- if (disposing)
- {
- _streamActive = false;
- if (_streamThread != null && _streamThread.IsAlive)
- {
- _streamThread.Abort();
- }
- }
- }
- this.disposedValue = true;
- }
+ if (this.disposed)
+ return;
- //protected Overrides void Finalize()
- //{
- // // このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
- // Dispose(false)
- // MyBase.Finalize()
- //}
+ this.disposed = true;
- // このコードは、破棄可能なパターンを正しく実装できるように Visual Basic によって追加されました。
- public void Dispose()
- {
- // このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-#endregion
+ this.Stop();
+ this.Started = null;
+ this.Stopped = null;
+ this.StatusArrived = null;
+ }
}
#endregion