Added Server Sent Events

This commit is contained in:
Michael Nolan 2022-09-01 00:26:26 -05:00
parent 3162360069
commit 9277fda0d5
4 changed files with 440 additions and 20 deletions

View File

@ -79,6 +79,53 @@ internal class SizedStream : Stream
/// Method (ex GET, POST, HEAD)
/// </summary>
public string Method { get; set; }
Func<bool> isConnected;
public bool Connected {
get{
if(isConnected != null)
{
return isConnected();
}
return true;
}
}
public ServerContext(string method,Stream strm,string path,Dictionary<string,List<string>> headers,Func<bool> isConnected)
{
Method = method;
NetworkStream = strm;
RequestHeaders = headers;
ResponseHeaders = new Dictionary<string, List<string>>();
QueryParams = new Dictionary<string, List<string>>();
ResponseHeaders.Add("Server","Tesses.WebServer");
ResponseHeaders.Add("Connection","close");
RawUrl=path;
StatusCode = 200;
// /joel/path/luigi?local=jim&john_surname=connor&demi_surname=lovato&local=tim
string[] splitUrl = path.Split(new char[] { '?' }, 2);
if (splitUrl.Length > 0)
{
UrlPath = splitUrl[0];
OriginalUrlPath=splitUrl[0];
if (splitUrl.Length == 2)
{
//local=jim&john_surname=connor&demi_surname=lovato&local=tim
//we want to split on &
q_parm = splitUrl[1];
}
else
{
q_parm = "";
}
ResetQueryParms();
}
this.isConnected=isConnected;
}
public ServerContext(string method,Stream strm,string path,Dictionary<string,List<string>> headers)
{
Method = method;
@ -111,7 +158,7 @@ internal class SizedStream : Stream
}
ResetQueryParms();
}
isConnected=null;
}
/// <summary>
/// Reset query parms (If api sets them)

View File

@ -78,8 +78,97 @@ namespace Tesses.WebServer
return str;
}
public static void SendNonSeekableStream(this ServerContext ctx,Stream strm,long readFor=-1,string contentType = "application/octet-stream")
{
try
{
long tread=0;
byte[] buffer=new byte[8*1024*1024];
int read=0;
do
{
if(readFor > -1){
read=(int)Math.Min(buffer.Length,readFor-tread);
}else{
read=buffer.Length;
}
if(read == 0) break;
read = strm.Read(buffer,0,read);
strm.Write(buffer,0,read);
}while(read > 0);
} finally {
strm.Close();
ctx.NetworkStream.Close();
}
}
public static async Task SendNonSeekableStreamAsync(this ServerContext ctx,Stream strm,long readFor=-1,string contentType="application/octet-stream")
{
try
{
long tread=0;
byte[] buffer=new byte[8*1024*1024];
int read=0;
do
{
if(readFor > -1){
read=(int)Math.Min(buffer.Length,readFor-tread);
}else{
read=buffer.Length;
}
if(read == 0) break;
read = await strm.ReadAsync(buffer,0,read);
await strm.WriteAsync(buffer,0,read);
}while(read > 0);
} finally {
strm.Close();
ctx.NetworkStream.Close();
}
}
public static void SendStream(this ServerContext ctx,Stream strm,string contentType="application/octet-stream")
{
//ctx.StatusCode = 200;
int start = 0, end = (int)strm.Length - 1;
if (ctx.RequestHeaders.ContainsKey(BYTES_RANGE_HEADER) && strm.CanSeek)
{
if (ctx.RequestHeaders[BYTES_RANGE_HEADER].Count > 1)
{
throw new NotSupportedException("Multiple 'Range' headers are not supported.");
}
var range = ctx.RequestHeaders[BYTES_RANGE_HEADER][0].Replace("bytes=", String.Empty)
.Split(new string[] { "-" }, StringSplitOptions.RemoveEmptyEntries)
.Select(x => Int32.Parse(x))
.ToArray();
start = (range.Length > 0) ? range[0] : 0;
end = (range.Length > 1) ? range[1] : (int)(strm.Length - 1);
var hdrs = ctx.ResponseHeaders;
hdrs.Add("Accept-Ranges", "bytes");
hdrs.Add("Content-Range", "bytes " + start + "-" + end + "/" + strm.Length);
ctx.StatusCode = 206;
}
ctx.ResponseHeaders.Add("Content-Length", (end - start + 1).ToString());
ctx.ResponseHeaders.Add("Content-Type", contentType);
ctx.WriteHeaders();
if (!ctx.Method.Equals("HEAD", StringComparison.Ordinal))
{
try
{
if(strm.CanSeek)
strm.Position = start;
strm.CopyTo(ctx.NetworkStream, Math.Min(8 * 1024 * 1024, end - start + 1));
}
finally
{
strm.Close();
ctx.NetworkStream.Close();
}
}
}
public static async Task SendStreamAsync(this ServerContext ctx, Stream strm, string contentType = "application/octet-stream")
{
//ctx.StatusCode = 200;
@ -116,7 +205,7 @@ namespace Tesses.WebServer
{
if(strm.CanSeek)
strm.Position = start;
strm.CopyTo(ctx.NetworkStream, Math.Min(8 * 1024 * 1024, end - start + 1));
await strm.CopyToAsync(ctx.NetworkStream, Math.Min(8 * 1024 * 1024, end - start + 1));
}
finally
{

View File

@ -5,9 +5,9 @@
<PackageId>Tesses.WebServer</PackageId>
<Author>Mike Nolan</Author>
<Company>Tesses</Company>
<Version>1.0.3.7</Version>
<AssemblyVersion>1.0.3.7</AssemblyVersion>
<FileVersion>1.0.3.7</FileVersion>
<Version>1.0.3.8</Version>
<AssemblyVersion>1.0.3.8</AssemblyVersion>
<FileVersion>1.0.3.8</FileVersion>
<Description>A TCP Listener HTTP(s) Server</Description>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageTags>HTTP, WebServer, Website</PackageTags>

View File

@ -15,9 +15,63 @@ using System.Security.Authentication;
namespace Tesses.WebServer
{
internal class SendEventArgs : EventArgs
{
public string Data {get;set;}
}
public class SendEvents
{
internal event EventHandler<SendEventArgs> EventReceived;
public void SendEvent(object data)
{
SendEvent(JsonConvert.SerializeObject(data));
}
public void SendEvent(string e)
{
try{
EventReceived?.Invoke(this,new SendEventArgs(){Data=e});
}catch(Exception ex)
{
_=ex;
}
}
}
public static class Extensions
{
{
public static async Task WriteAsync(this Stream strm,string text)
{
var data=Encoding.UTF8.GetBytes(text);
await strm.WriteAsync(data,0,data.Length);
await strm.FlushAsync();
}
public static void Write(this Stream strm,string text)
{
var data=Encoding.UTF8.GetBytes(text);
strm.Write(data,0,data.Length);
strm.Flush();
}
public static void ServerSentEvents(this ServerContext ctx,SendEvents evt)
{
bool __connected=true;
ctx.ResponseHeaders.Add("Content-Type","text/event-stream");
ctx.ResponseHeaders.Add("Cache-Control","no-cache");
ctx.WriteHeaders();
try{
EventHandler<SendEventArgs> cb= (sender,e0)=>{
if(__connected)
ctx.NetworkStream.Write($"data: {e0.Data}\n\n");
};
evt.EventReceived += cb;
while(ctx.Connected);
evt.EventReceived -= cb;
__connected=false;
}catch(Exception ex)
{
_=ex;
}
}
/// <summary>
/// Read string from request body
/// </summary>
@ -26,7 +80,7 @@ namespace Tesses.WebServer
public static async Task<string> ReadStringAsync(this ServerContext ctx)
{
string str = null;
using (var reader = new StreamReader(ctx.NetworkStream))
using (var reader = new StreamReader(ctx.GetRequestStream()))
{
str = await reader.ReadToEndAsync();
}
@ -34,6 +88,21 @@ namespace Tesses.WebServer
return str;
}
/// <summary>
/// Read string from request body
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <returns>the contents of request</returns>
public static string ReadString(this ServerContext ctx)
{
string str = null;
using (var reader = new StreamReader(ctx.GetRequestStream()))
{
str = reader.ReadToEnd();
}
return str;
}
/// <summary>
/// Read json from request body
/// </summary>
/// <param name="ctx">ServerContext</param>
@ -45,6 +114,17 @@ namespace Tesses.WebServer
return JsonConvert.DeserializeObject<T>(json);
}
/// <summary>
/// Read json from request body
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <typeparam name="T">type of object (for scema)</typeparam>
/// <returns>object of type T with deserialized json data</returns>
public static T ReadJson<T>(this ServerContext ctx)
{
var json= ctx.ReadString();
return JsonConvert.DeserializeObject<T>(json);
}
/// <summary>
/// Read request body to array
/// </summary>
/// <param name="ctx">ServerContext</param>
@ -54,6 +134,17 @@ namespace Tesses.WebServer
MemoryStream strm = new MemoryStream();
await ctx.ReadToStreamAsync(strm);
return strm.ToArray();
}
/// <summary>
/// Read request body to array
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <returns>Request body data</returns>
public static byte[] ReadBytes(this ServerContext ctx)
{
MemoryStream strm = new MemoryStream();
ctx.ReadToStream(strm);
return strm.ToArray();
}
/// <summary>
/// Read request body to stream
@ -63,7 +154,17 @@ namespace Tesses.WebServer
public static async Task ReadToStreamAsync(this ServerContext ctx,Stream strm)
{
await ctx.NetworkStream.CopyToAsync(strm);
await ctx.GetRequestStream().CopyToAsync(strm);
}
/// <summary>
/// Read request body to stream
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <param name="strm">Stream to write to</param>
public static void ReadToStream(this ServerContext ctx,Stream strm)
{
ctx.GetRequestStream().CopyTo(strm);
}
/// <summary>
/// Read request body to file
@ -84,10 +185,33 @@ namespace Tesses.WebServer
}
using(var f = File.Create(filename))
{
await ctx.NetworkStream.CopyToAsync(f);
await ctx.ReadToStreamAsync(f);
}
return filename;
}
/// <summary>
/// Read request body to file
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <param name="filename">name of file to write too, can be without extension</param>
/// <returns>file path with extension unless mimetype header is missing</returns>
public static string ReadToFile(this ServerContext ctx,string filename)
{
if(string.IsNullOrWhiteSpace(Path.GetExtension(filename)))
{
string val;
if(ctx.RequestHeaders.TryGetFirst("Content-Type",out val))
{
filename += $".{MimeTypesMap.GetExtension(val)}";
}
}
using(var f = File.Create(filename))
{
ctx.ReadToStream(f);
}
return filename;
}
/// <summary>
/// Write headers to stream
@ -110,6 +234,26 @@ namespace Tesses.WebServer
await ctx.NetworkStream.WriteAsync(data, 0, data.Length);
}
/// <summary>
/// Write headers to stream
/// </summary>
/// <param name="ctx">ServerContext</param>
public static void WriteHeaders(this ServerContext ctx)
{
string status_line = $"HTTP/1.1 {ctx.StatusCode} {StatusCodeMap.GetStatusString(ctx.StatusCode)}\r\n";
StringBuilder b = new StringBuilder(status_line);
foreach (var hdr in ctx.ResponseHeaders)
{
foreach (var v in hdr.Value)
{
b.Append($"{hdr.Key}: {v}\r\n");
}
}
b.Append("\r\n");
var data = Encoding.UTF8.GetBytes(b.ToString());
ctx.NetworkStream.Write(data, 0, data.Length);
}
/// <summary>
/// Send file to client (supports range partial content)
/// </summary>
/// <param name="ctx">ServerContext</param>
@ -121,6 +265,19 @@ namespace Tesses.WebServer
{
await ctx.SendStreamAsync( strm, MimeTypesMap.GetMimeType(file));
}
}
/// <summary>
/// Send file to client (supports range partial content)
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <param name="file">the file to serve</param>
public static void SendFile(this ServerContext ctx, string file)
{
using (var strm = File.OpenRead(file))
{
ctx.SendStream( strm, MimeTypesMap.GetMimeType(file));
}
}
/// <summary>
/// Send exception to client
@ -136,6 +293,19 @@ namespace Tesses.WebServer
await ctx.SendTextAsync(j);
}
/// <summary>
/// Send exception to client
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <param name="ex">the Exception</param>
public static void SendException(this ServerContext ctx, Exception ex)
{
string name = ex.GetType().FullName;
string j = $"<html><head><title>{WebUtility.HtmlEncode(name)} thrown</title></head><body><h1>{WebUtility.HtmlEncode(name)} thrown</h1><h3>Description: {WebUtility.HtmlEncode(ex.Message)}</h3></body></html>";
ctx.StatusCode = 500;
ctx.SendText(j);
}
/// <summary>
/// Send object as json to client
/// </summary>
/// <param name="ctx">ServerContext</param>
@ -145,6 +315,15 @@ namespace Tesses.WebServer
await ctx.SendTextAsync(JsonConvert.SerializeObject(value), "application/json");
}
/// <summary>
/// Send object as json to client
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <param name="value">an object to serialize with newtonsoft.json</param>
public static void SendJson(this ServerContext ctx,object value)
{
ctx.SendText(JsonConvert.SerializeObject(value), "application/json");
}
/// <summary>
/// Send text to client
/// </summary>
/// <param name="ctx">ServerContext</param>
@ -156,6 +335,17 @@ namespace Tesses.WebServer
await ctx.SendBytesAsync(Encoding.UTF8.GetBytes(data), contentType);
}
/// <summary>
/// Send text to client
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <param name="data">some text</param>
/// <param name="contentType">mime type</param>
public static void SendText(this ServerContext ctx, string data, string contentType = "text/html")
{
ctx.SendBytes(Encoding.UTF8.GetBytes(data), contentType);
}
/// <summary>
/// Send redirect
/// </summary>
/// <param name="ctx">ServerContext</param>
@ -168,6 +358,18 @@ namespace Tesses.WebServer
await ctx.WriteHeadersAsync();
}
/// <summary>
/// Send redirect
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <param name="url">Url to redirect to</param>
public static void SendRedirect(this ServerContext ctx,string url)
{
ctx.StatusCode = 301;
ctx.ResponseHeaders.Add("Cache-Control","no-cache");
ctx.ResponseHeaders.Add("Location",url);
ctx.WriteHeaders();
}
/// <summary>
/// Send byte[] to client
/// </summary>
/// <param name="ctx">ServerContext</param>
@ -181,6 +383,19 @@ namespace Tesses.WebServer
}
}
/// <summary>
/// Send byte[] to client
/// </summary>
/// <param name="ctx">ServerContext</param>
/// <param name="array">a byte[] array</param>
/// <param name="contentType">mime type</param>
public static void SendBytes(this ServerContext ctx, byte[] array, string contentType = "application/octet-stream")
{
using (var ms = new MemoryStream(array))
{
ctx.SendStream( ms, contentType);
}
}
/// <summary>
/// Get first item in Dictionary<T1,List<T2>> based on key
/// </summary>
/// <param name="args">the dictionary with list<T2> value</param>
@ -461,7 +676,7 @@ namespace Tesses.WebServer
{
ctx.StatusCode = (int)HttpStatusCode.MethodNotAllowed;
await ctx.SendTextAsync("Method Not Supported");
}
/// <summary>
/// Called on OPTIONS Request
@ -839,35 +1054,96 @@ namespace Tesses.WebServer
public bool PrintUrls {get;set;}
bool https;
X509Certificate cert;
IServer _server;
ChangeableServer _server;
TcpListener _listener;
SslProtocols protocols;
public HttpServerListener(int port)
{
_server=new ChangeableServer();
_listener = new TcpListener(new IPEndPoint(IPAddress.Any,port));
https = false;
PrintUrls=false;
}
public HttpServerListener(IPEndPoint endpoint)
{
_server=new ChangeableServer();
_listener = new TcpListener(endpoint);
https = false;
PrintUrls=false;
}
public HttpServerListener(int port,IServer server)
{
_server=new ChangeableServer();
_listener = new TcpListener(new IPEndPoint(IPAddress.Any,port));
_server.Server=server;
https = false;
PrintUrls=false;
}
public HttpServerListener(IPEndPoint endPoint,IServer server)
{
_server=new ChangeableServer();
_listener = new TcpListener(endPoint);
_server = server;
_server.Server=server;
https = false;
PrintUrls=false;
}
public HttpServerListener(IServer server)
{
_server=new ChangeableServer();
_listener = new TcpListener(new IPEndPoint(IPAddress.Any, 3251));
_server = server;
_server.Server = server;
https = false;
PrintUrls=false;
}
public HttpServerListener()
{
_server=new ChangeableServer();
_listener = new TcpListener(new IPEndPoint(IPAddress.Any, 3251));
https = false;
PrintUrls=false;
}
public HttpServerListener(int port,IServer server,X509Certificate cert,SslProtocols protocols=SslProtocols.Default)
{
_server=new ChangeableServer();
_listener = new TcpListener(new IPEndPoint(IPAddress.Any,port));
_server.Server = server;
https = cert != null;
this.cert = cert;
this.protocols=protocols;
PrintUrls=false;
}
public HttpServerListener(IPEndPoint endpoint,IServer server,X509Certificate cert,SslProtocols protocols=SslProtocols.Default)
{
_server=new ChangeableServer();
_listener = new TcpListener(endpoint);
_server = server;
_server.Server = server;
https = cert != null;
this.cert = cert;
this.protocols=protocols;
PrintUrls=false;
}
public void Listen()
{
ListenAsync().Wait();
}
public void Listen(CancellationToken token)
{
ListenAsync(token).Wait();
}
public async Task ListenAsync()
{
await ListenAsync(CancellationToken.None);
}
public async Task ListenAsync(CancellationToken token)
{
_listener.Start();
using (var r = token.Register(() => _listener.Stop())) {
while (!token.IsCancellationRequested)
@ -876,7 +1152,9 @@ namespace Tesses.WebServer
var socket=await _listener.AcceptTcpClientAsync();
Task.Factory.StartNew(async()=>{
try{
await CommunicateHostAsync(socket);
await CommunicateHostAsync(socket,()=>{
return socket.Connected;
});
}catch(Exception ex)
{
_=ex;
@ -891,6 +1169,10 @@ namespace Tesses.WebServer
}
}
public async Task PushAsync(Stream strm,EndPoint local,EndPoint remote)
{
await PushAsync(strm,local,remote,null);
}
public async Task PushAsync(Stream strm,EndPoint local,EndPoint remote,Func<bool> isConnected)
{
string request_line = "";
string res=ReadHeaders(strm);
@ -904,7 +1186,7 @@ namespace Tesses.WebServer
{
string path = request[1];
string ver = request[2];
ctx = new ServerContext(method, strm, path, headers);
ctx = new ServerContext(method, strm, path, headers,isConnected);
ctx.Server =local as IPEndPoint;
ctx.Client = remote as IPEndPoint;
_server.AddCors(ctx);
@ -956,7 +1238,9 @@ namespace Tesses.WebServer
{
try{
var socket=await _listener.AcceptTcpClientAsync();
await CommunicateHostAsync(socket).ConfigureAwait(false);
await CommunicateHostAsync(socket,()=>{
return socket.Connected;
}).ConfigureAwait(false);
}catch(Exception ex)
{
_=ex;
@ -1023,7 +1307,7 @@ namespace Tesses.WebServer
}
return clt.GetStream();
}
private async Task CommunicateHostAsync(TcpClient clt)
private async Task CommunicateHostAsync(TcpClient clt,Func<bool> isConnected)
{
try{
//<METHOD> <PATH> HTTP/1.1\r\n
@ -1039,7 +1323,7 @@ namespace Tesses.WebServer
using (Stream strm = GetStream(clt))
{
await PushAsync(strm,clt.Client.LocalEndPoint,clt.Client.RemoteEndPoint);
await PushAsync(strm,clt.Client.LocalEndPoint,clt.Client.RemoteEndPoint,isConnected);
}
}catch(Exception ex)
{