3 using System
.Threading
.Tasks
;
4 using Newtonsoft
.Json
.Linq
;
6 using System
.Net
.WebSockets
;
7 using System
.Threading
;
10 using System
.Collections
.Generic
;
13 public class WsClient
: IDisposable
{
14 ClientWebSocket socket
;
15 List
<Task
> pending_ops
= new List
<Task
> ();
16 TaskCompletionSource
<bool> side_exit
= new TaskCompletionSource
<bool> ();
17 List
<byte []> pending_writes
= new List
<byte []> ();
27 public void Dispose() {
31 protected virtual void Dispose (bool disposing
) {
36 Task
Pump (Task task
, CancellationToken token
)
38 if (task
!= current_write
)
42 pending_writes
.RemoveAt (0);
44 if (pending_writes
.Count
> 0) {
45 current_write
= socket
.SendAsync (new ArraySegment
<byte> (pending_writes
[0]), WebSocketMessageType
.Text
, true, token
);
51 async Task
<string> ReadOne (CancellationToken token
)
53 byte [] buff
= new byte [4000];
54 var mem
= new MemoryStream ();
56 var result
= await this.socket
.ReceiveAsync (new ArraySegment
<byte> (buff
), token
);
57 if (result
.MessageType
== WebSocketMessageType
.Close
) {
61 if (result
.EndOfMessage
) {
62 mem
.Write (buff
, 0, result
.Count
);
63 return Encoding
.UTF8
.GetString (mem
.GetBuffer (), 0, (int)mem
.Length
);
65 mem
.Write (buff
, 0, result
.Count
);
70 protected void Send (byte [] bytes
, CancellationToken token
)
72 pending_writes
.Add (bytes
);
73 if (pending_writes
.Count
== 1) {
74 if (current_write
!= null)
75 throw new Exception ("Internal state is bad. current_write must be null if there are no pending writes");
77 current_write
= socket
.SendAsync (new ArraySegment
<byte> (bytes
), WebSocketMessageType
.Text
, true, token
);
78 pending_ops
.Add (current_write
);
82 async Task
MarkCompleteAfterward (Func
<CancellationToken
, Task
> send
, CancellationToken token
)
86 side_exit
.SetResult (true);
87 } catch (Exception e
) {
88 side_exit
.SetException (e
);
91 protected async Task
<bool> ConnectWithMainLoops(
93 Func
<string, CancellationToken
, Task
> receive
,
94 Func
<CancellationToken
, Task
> send
,
95 CancellationToken token
) {
97 Console
.WriteLine ("connecting to {0}", uri
);
98 this.socket
= new ClientWebSocket ();
99 this.socket
.Options
.KeepAliveInterval
= Timeout
.InfiniteTimeSpan
;
101 await this.socket
.ConnectAsync (uri
, token
);
102 pending_ops
.Add (ReadOne (token
));
103 pending_ops
.Add (side_exit
.Task
);
104 pending_ops
.Add (MarkCompleteAfterward (send
, token
));
106 while (!token
.IsCancellationRequested
) {
107 var task
= await Task
.WhenAny (pending_ops
);
108 if (task
== pending_ops
[0]) { //pending_ops[0] is for message reading
109 var msg
= ((Task
<string>)task
).Result
;
110 pending_ops
[0] = ReadOne (token
);
111 Task tsk
= receive (msg
, token
);
113 pending_ops
.Add (tsk
);
114 } else if (task
== pending_ops
[1]) {
115 var res
= ((Task
<bool>)task
).Result
;
116 //it might not throw if exiting successfull
118 } else { //must be a background task
119 pending_ops
.Remove (task
);
120 var tsk
= Pump (task
, token
);
122 pending_ops
.Add (tsk
);