-
Notifications
You must be signed in to change notification settings - Fork 374
/
Copy pathWebSocketServerImpl.cs
235 lines (207 loc) · 8.42 KB
/
WebSocketServerImpl.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Diagnostics.NETCore.Client.WebSocketServer;
using HttpContext = Microsoft.AspNetCore.Http.HttpContext;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;
namespace Microsoft.Diagnostics.WebSocketServer;
/// <summary>
/// Implements the <see cref="IWebSocketServer"/> interface exposed by the Microsoft.Diagnostics.NETCore.Client library.
/// It coordinates between an underlying web server that creates web socket connections and the diagnostic server
/// router that dotnet-dsrouter uses to pass the diagnostic server connections to the diagnostic clients.
/// </summary>
public class WebSocketServerImpl : IWebSocketServer
{
    // URI schemes accepted by ParseWebSocketURL.
    private static readonly string[] s_supportedSchemes = { "ws", "wss", "http", "https" };

    private EmbeddedWebSocketServer _server;

    // 0 until StartServer is first called, 1 afterwards. Nothing in this class resets it, so an instance
    // can be started at most once (even if that start attempt fails).
    private volatile int _started;

    // Used to coordinate between the web server accepting incoming websocket connections and the diagnostic server
    // waiting for a stream to be available. This could be a deeper queue if we wanted to somehow allow multiple
    // browser tabs to connect to the same dsrouter, but it's unclear what to do with them since on the other end
    // we have a single IpcStream with a single diagnostic client.
    private readonly Queue<Conn> _acceptQueue = new();

    private readonly LogLevel _logLevel;

    /// <summary>Creates a server wrapper whose embedded web server will log at <paramref name="logLevel"/>.</summary>
    /// <param name="logLevel">Log level forwarded to the embedded web server options.</param>
    public WebSocketServerImpl(LogLevel logLevel)
    {
        _logLevel = logLevel;
    }

    /// <summary>
    /// Parses <paramref name="endpoint"/>, creates the embedded web server for it and starts it.
    /// </summary>
    /// <param name="endpoint">Absolute ws/wss/http/https URL to listen on. Wildcard hosts ("//*") are rejected.</param>
    /// <param name="cancellationToken">Token passed to the web server start operation.</param>
    /// <exception cref="InvalidOperationException">StartServer was already called on this instance.</exception>
    /// <exception cref="ArgumentException">The endpoint cannot be parsed or uses an unsupported scheme.</exception>
    public async Task StartServer(string endpoint, CancellationToken cancellationToken)
    {
        // Atomically flip 0 -> 1 so that only the first caller proceeds.
        if (Interlocked.CompareExchange(ref _started, 1, 0) != 0)
        {
            throw new InvalidOperationException("Server already started");
        }

        ParseWebSocketURL(endpoint, out Uri uri);

        // The web server is configured with an http(s) scheme; map the websocket schemes onto those.
        string scheme = uri.Scheme switch
        {
            "ws" or "http" => "http",
            "wss" or "https" => "https",
            _ => throw new ArgumentException($"Unsupported Uri schema, \"{uri.Scheme}\"")
        };

        EmbeddedWebSocketServer.Options options = new()
        {
            Scheme = scheme,
            Host = uri.Host,
            Port = uri.Port.ToString(),
            Path = uri.PathAndQuery,
            LogLevel = _logLevel,
        };

        _server = EmbeddedWebSocketServer.CreateWebServer(options, HandleWebSocket);
        await _server.StartWebServer(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Stops the embedded web server if one is currently held. Does nothing when the server was already stopped
    /// (or was never successfully created).
    /// </summary>
    /// <param name="cancellationToken">Token passed to the web server stop operation.</param>
    /// <exception cref="InvalidOperationException">StartServer has never been called.</exception>
    public async Task StopServer(CancellationToken cancellationToken)
    {
        if (_started == 0)
        {
            throw new InvalidOperationException("Server not started");
        }

        // Snapshot the field so the null check and the call below operate on the same instance.
        EmbeddedWebSocketServer server = _server;
        if (server == null)
        {
            return;
        }

        await server.StopWebServer(cancellationToken).ConfigureAwait(false);
        _server = null;
    }

    /// <summary>
    /// Callback handed to the embedded web server; invoked for every newly established websocket connection.
    /// The returned task completes only once the connection has been consumed and its stream disposed,
    /// or the token is cancelled.
    /// </summary>
    public async Task HandleWebSocket(HttpContext context, WebSocket webSocket, CancellationToken cancellationToken)
    {
        // We put the connection into our queue of accepted connections and wait until someone uses it
        // and disposes of the connection.
        await QueueWebSocketUntilClose(context, webSocket, cancellationToken).ConfigureAwait(false);
    }

    // Queues the websocket for the diagnostic server and then "keeps the middleware alive" until the stream
    // created over the websocket is disposed. Throws OperationCanceledException if cancellationToken fires
    // while waiting for queue capacity or for the stream to be disposed.
    internal async Task QueueWebSocketUntilClose(HttpContext context, WebSocket webSocket, CancellationToken cancellationToken)
    {
        // Signaled by Conn when the stream is disposed. Continuations run asynchronously so that the code
        // disposing the stream does not end up executing the rest of the web server middleware inline.
        TaskCompletionSource streamDisposedTCS = new(TaskCreationOptions.RunContinuationsAsynchronously);

        await _acceptQueue.Enqueue(new Conn(context, webSocket, streamDisposedTCS), cancellationToken).ConfigureAwait(false);

        // TaskCompletionSource has no constructor that observes a CancellationToken (the single-object overload
        // treats its argument as opaque state), so cancellation has to be wired up explicitly.
        // NOTE: a cancelled connection may still be sitting in the accept queue; whoever dequeues it later gets
        // a stream over a websocket the web server has presumably already torn down.
        using (cancellationToken.Register(() => streamDisposedTCS.TrySetCanceled(cancellationToken)))
        {
            await streamDisposedTCS.Task.ConfigureAwait(false);
        }
    }

    // Called from the diagnostic server when it is ready to start talking to a connection. Hands back a connection
    // the web server has already accepted, or waits until the web server queues a new one.
    internal Task<Conn> GetOrRequestConnection(CancellationToken cancellationToken)
    {
        return _acceptQueue.Dequeue(cancellationToken);
    }

    /// <summary>Waits for the next accepted websocket connection and returns a stream over it.</summary>
    /// <param name="cancellationToken">Token that cancels the wait for a connection.</param>
    /// <returns>A stream wrapping the websocket; disposing it releases the pending web server request.</returns>
    public async Task<Stream> AcceptConnection(CancellationToken cancellationToken)
    {
        Conn conn = await GetOrRequestConnection(cancellationToken).ConfigureAwait(false);
        return conn.GetStream();
    }

    // Single-element queue where both queueing and dequeueing are async operations that wait until
    // the queue has capacity (or an item, respectively).
    internal sealed class Queue<T>
    {
        private T _obj;

        // Counts free slots (starts at 1); Enqueue takes a permit, Dequeue gives it back.
        private readonly SemaphoreSlim _empty;

        // Counts stored items (starts at 0); Enqueue gives a permit, Dequeue takes it.
        private readonly SemaphoreSlim _full;

        // Mutual exclusion around reads/writes of _obj.
        private readonly SemaphoreSlim _objLock;

        public Queue()
        {
            const int Capacity = 1;
            _obj = default;
            _empty = new SemaphoreSlim(Capacity, Capacity);
            _full = new SemaphoreSlim(0, Capacity);
            _objLock = new SemaphoreSlim(1, 1);
        }

        // Stores t once the slot is free. If cancelled, the queue is left unchanged.
        public async Task Enqueue(T t, CancellationToken cancellationToken)
        {
            await _empty.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                await _objLock.WaitAsync(cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                // We own the free slot but never filled it; hand it back, otherwise the queue would
                // look full forever and every later Enqueue would hang.
                _empty.Release();
                throw;
            }

            try
            {
                _obj = t;
            }
            finally
            {
                _objLock.Release();
            }

            // Publish the item to a waiting (or future) Dequeue.
            _full.Release();
        }

        // Removes and returns the stored item once one is available. If cancelled, the queue is left unchanged.
        public async Task<T> Dequeue(CancellationToken cancellationToken)
        {
            await _full.WaitAsync(cancellationToken).ConfigureAwait(false);
            try
            {
                await _objLock.WaitAsync(cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                // We claimed the item but never took it; give the permit back so the item stays
                // retrievable instead of being stranded in the slot.
                _full.Release();
                throw;
            }

            T item;
            try
            {
                item = _obj;
                _obj = default;
            }
            finally
            {
                _objLock.Release();
            }

            // The slot is free again; let the next Enqueue proceed.
            _empty.Release();
            return item;
        }
    }

    // Validates endPoint and parses it into an absolute Uri with a ws/wss/http/https scheme.
    // Throws ArgumentException for null/empty input, wildcard hosts, unparsable URLs and unsupported schemes.
    private static void ParseWebSocketURL(string endPoint, out Uri uri)
    {
        // Reject null/empty up front so the wildcard check below cannot dereference null.
        if (string.IsNullOrEmpty(endPoint))
        {
            throw new ArgumentException($"Could not parse {endPoint} into host, port");
        }

        // Host can contain a wildcard (*), which is a reserved character in URIs.
        // FIXME: wildcard hosts are currently rejected because Uri.Host is not set for a wildcard host.
        if (endPoint.Contains("//*"))
        {
            throw new ArgumentException("Wildcard host is not supported for WebSocket endpoints");
        }

        if (!Uri.TryCreate(endPoint, UriKind.Absolute, out uri))
        {
            throw new ArgumentException($"Could not parse {endPoint} into host, port");
        }

        // Copy to a local: an out parameter cannot be captured by the lambda below.
        string parsedScheme = uri.Scheme;
        bool supported = Array.Exists(
            s_supportedSchemes,
            candidate => string.Equals(parsedScheme, candidate, StringComparison.OrdinalIgnoreCase));
        if (!supported)
        {
            throw new ArgumentException($"Unsupported Uri schema, \"{parsedScheme}\"");
        }
    }

    // An abstraction encapsulating an open websocket connection.
    internal sealed class Conn
    {
        private readonly WebSocket _webSocket;

        // Stored but never read in this file; presumably retained to keep the request context referenced
        // for the lifetime of the connection - TODO confirm.
        private readonly HttpContext _context;

        // Completed when a stream handed out by GetStream is disposed.
        private readonly TaskCompletionSource _streamDisposed;

        public Conn(HttpContext context, WebSocket webSocket, TaskCompletionSource streamDisposed)
        {
            _context = context;
            _webSocket = webSocket;
            _streamDisposed = streamDisposed;
        }

        // Wraps the websocket in a Stream; OnStreamDispose is handed to the adapter as its dispose callback.
        public Stream GetStream()
        {
            return new WebSocketStreamAdapter(_webSocket, OnStreamDispose);
        }

        private void OnStreamDispose()
        {
            // TrySetResult rather than SetResult: the callback may run more than once (repeated Dispose,
            // several streams) or after the waiter was cancelled, and must not throw in those cases.
            _streamDisposed.TrySetResult();
        }
    }
}