Added multiplayer plugin

This commit is contained in:
Anthony Berg
2020-11-30 08:12:07 +00:00
parent 9cb342dd42
commit f64cf54803
450 changed files with 33131 additions and 10 deletions

View File

@@ -0,0 +1,213 @@
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
public class Client : Common
{
public TcpClient client;
Thread receiveThread;
Thread sendThread;
// TcpClient.Connected doesn't check if socket != null, which
// results in NullReferenceExceptions if connection was closed.
// -> let's check it manually instead
public bool Connected => client != null &&
client.Client != null &&
client.Client.Connected;
// TcpClient has no 'connecting' state to check. We need to keep track
// of it manually.
// -> checking 'thread.IsAlive && !Connected' is not enough because the
// thread is alive and connected is false for a short moment after
// disconnecting, so this would cause race conditions.
// -> we use a threadsafe bool wrapper so that ThreadFunction can remain
// static (it needs a common lock)
// => Connecting is true from first Connect() call in here, through the
// thread start, until TcpClient.Connect() returns. Simple and clear.
// => bools are atomic according to
// https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/language-specification/variables
// made volatile so the compiler does not reorder access to it
volatile bool _Connecting;
public bool Connecting => _Connecting;
// send queue
// => SafeQueue is twice as fast as ConcurrentQueue, see SafeQueue.cs!
SafeQueue<byte[]> sendQueue = new SafeQueue<byte[]>();
// ManualResetEvent to wake up the send thread. better than Thread.Sleep
// -> call Set() if everything was sent
// -> call Reset() if there is something to send again
// -> call WaitOne() to block until Reset was called
ManualResetEvent sendPending = new ManualResetEvent(false);
// the thread function
void ReceiveThreadFunction(string ip, int port)
{
// absolutely must wrap with try/catch, otherwise thread
// exceptions are silent
try
{
// connect (blocking)
client.Connect(ip, port);
_Connecting = false;
// set socket options after the socket was created in Connect()
// (not after the constructor because we clear the socket there)
client.NoDelay = NoDelay;
client.SendTimeout = SendTimeout;
// start send thread only after connected
sendThread = new Thread(() => { SendLoop(0, client, sendQueue, sendPending); });
sendThread.IsBackground = true;
sendThread.Start();
// run the receive loop
ReceiveLoop(0, client, receiveQueue, MaxMessageSize);
}
catch (SocketException exception)
{
// this happens if (for example) the ip address is correct
// but there is no server running on that ip/port
Logger.LogError($"Client Recv: failed to connect to ip={ip} port={port} reason={exception}");
// add 'Disconnected' event to message queue so that the caller
// knows that the Connect failed. otherwise they will never know
receiveQueue.Enqueue(new Message(0, EventType.Disconnected, null));
}
catch (ThreadInterruptedException)
{
// expected if Disconnect() aborts it
}
catch (ThreadAbortException)
{
// expected if Disconnect() aborts it
}
catch (Exception exception)
{
// something went wrong. probably important.
Logger.LogError($"Client Recv Exception: {exception}");
}
// sendthread might be waiting on ManualResetEvent,
// so let's make sure to end it if the connection
// closed.
// otherwise the send thread would only end if it's
// actually sending data while the connection is
// closed.
sendThread?.Interrupt();
// Connect might have failed. thread might have been closed.
// let's reset connecting state no matter what.
_Connecting = false;
// if we got here then we are done. ReceiveLoop cleans up already,
// but we may never get there if connect fails. so let's clean up
// here too.
client?.Close();
}
public void Connect(string ip, int port)
{
// not if already started
if (Connecting || Connected)
{
Logger.LogWarning("Telepathy Client can not create connection because an existing connection is connecting or connected");
return;
}
// We are connecting from now until Connect succeeds or fails
_Connecting = true;
// create a TcpClient with perfect IPv4, IPv6 and hostname resolving
// support.
//
// * TcpClient(hostname, port): works but would connect (and block)
// already
// * TcpClient(AddressFamily.InterNetworkV6): takes Ipv4 and IPv6
// addresses but only connects to IPv6 servers (e.g. Telepathy).
// does NOT connect to IPv4 servers (e.g. Mirror Booster), even
// with DualMode enabled.
// * TcpClient(): creates IPv4 socket internally, which would force
// Connect() to only use IPv4 sockets.
//
// => the trick is to clear the internal IPv4 socket so that Connect
// resolves the hostname and creates either an IPv4 or an IPv6
// socket as needed (see TcpClient source)
// creates IPv4 socket
client = new TcpClient();
// clear internal IPv4 socket until Connect()
client.Client = null;
// clear old messages in queue, just to be sure that the caller
// doesn't receive data from last time and gets out of sync.
// -> calling this in Disconnect isn't smart because the caller may
// still want to process all the latest messages afterwards
receiveQueue = new ConcurrentQueue<Message>();
sendQueue.Clear();
// client.Connect(ip, port) is blocking. let's call it in the thread
// and return immediately.
// -> this way the application doesn't hang for 30s if connect takes
// too long, which is especially good in games
// -> this way we don't async client.BeginConnect, which seems to
// fail sometimes if we connect too many clients too fast
receiveThread = new Thread(() => { ReceiveThreadFunction(ip, port); });
receiveThread.IsBackground = true;
receiveThread.Start();
}
public void Disconnect()
{
// only if started
if (Connecting || Connected)
{
// close client
client.Close();
// wait until thread finished. this is the only way to guarantee
// that we can call Connect() again immediately after Disconnect
// -> calling .Join would sometimes wait forever, e.g. when
// calling Disconnect while trying to connect to a dead end
receiveThread?.Interrupt();
// we interrupted the receive Thread, so we can't guarantee that
// connecting was reset. let's do it manually.
_Connecting = false;
// clear send queues. no need to hold on to them.
// (unlike receiveQueue, which is still needed to process the
// latest Disconnected message, etc.)
sendQueue.Clear();
// let go of this one completely. the thread ended, no one uses
// it anymore and this way Connected is false again immediately.
client = null;
}
}
public bool Send(byte[] data)
{
if (Connected)
{
// respect max message size to avoid allocation attacks.
if (data.Length <= MaxMessageSize)
{
// add to send queue and return immediately.
// calling Send here would be blocking (sometimes for long times
// if other side lags or wire was disconnected)
sendQueue.Enqueue(data);
// interrupt SendThread WaitOne()
sendPending.Set();
return true;
}
Logger.LogError($"Client.Send: message too big: {data.Length}. Limit: {MaxMessageSize}");
return false;
}
Logger.LogWarning("Client.Send: not connected!");
return false;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: a5b95294cc4ec4b15aacba57531c7985
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,296 @@
// common code used by server and client
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
public abstract class Common
{
/////////////////////////////////////////////////////////
// common code
// incoming message queue of <connectionId, message>
// (not a HashSet because one connection can have multiple new messages)
protected ConcurrentQueue<Message> receiveQueue = new ConcurrentQueue<Message>();
// queue count, useful for debugging / benchmarks
public int ReceiveQueueCount => receiveQueue.Count;
// warning if message queue gets too big
// if the average message is about 20 bytes then:
// - 1k messages are 20KB
// - 10k messages are 200KB
// - 100k messages are 1.95MB
// 2MB are not that much, but it is a bad sign if the caller process
// can't call GetNextMessage faster than the incoming messages.
public static int messageQueueSizeWarning = 100000;
// removes and returns the oldest message from the message queue.
// (might want to call this until it doesn't return anything anymore)
// -> Connected, Data, Disconnected events are all added here
// -> bool return makes while (GetMessage(out Message)) easier!
// -> no 'is client connected' check because we still want to read the
// Disconnected message after a disconnect
public bool GetNextMessage(out Message message)
{
return receiveQueue.TryDequeue(out message);
}
// NoDelay disables nagle algorithm. lowers CPU% and latency but
// increases bandwidth
public bool NoDelay = true;
// Prevent allocation attacks. Each packet is prefixed with a length
// header, so an attacker could send a fake packet with length=2GB,
// causing the server to allocate 2GB and run out of memory quickly.
// -> simply increase max packet size if you want to send around bigger
// files!
// -> 16KB per message should be more than enough.
public int MaxMessageSize = 16 * 1024;
// Send would stall forever if the network is cut off during a send, so
// we need a timeout (in milliseconds)
public int SendTimeout = 5000;
// avoid header[4] allocations but don't use one buffer for all threads
[ThreadStatic] static byte[] header;
// avoid payload[packetSize] allocations but don't use one buffer for
// all threads
[ThreadStatic] static byte[] payload;
/////////////////////////////////////////////
// static helper functions
// send message (via stream) with the <size,content> message structure
// this function is blocking sometimes!
// (e.g. if someone has high latency or wire was cut off)
protected static bool SendMessagesBlocking(NetworkStream stream, byte[][] messages)
{
// stream.Write throws exceptions if client sends with high
// frequency and the server stops
try
{
// we might have multiple pending messages. merge into one
// packet to avoid TCP overheads and improve performance.
int packetSize = 0;
for (int i = 0; i < messages.Length; ++i)
// header + content
packetSize += sizeof(int) + messages[i].Length;
// create payload buffer if not created yet or previous one is
// too small
// IMPORTANT: payload.Length might be > packetSize! don't use it!
if (payload == null || payload.Length < packetSize)
payload = new byte[packetSize];
// create the packet
int position = 0;
for (int i = 0; i < messages.Length; ++i)
{
// create header buffer if not created yet
if (header == null)
header = new byte[4];
// construct header (size)
Utils.IntToBytesBigEndianNonAlloc(messages[i].Length, header);
// copy header + message into buffer
Array.Copy(header, 0, payload, position, header.Length);
Array.Copy(messages[i], 0, payload, position + header.Length, messages[i].Length);
position += header.Length + messages[i].Length;
}
// write the whole thing
stream.Write(payload, 0, packetSize);
return true;
}
catch (Exception exception)
{
// log as regular message because servers do shut down sometimes
Logger.Log("Send: stream.Write exception: " + exception);
return false;
}
}
// read message (via stream) with the <size,content> message structure
protected static bool ReadMessageBlocking(NetworkStream stream, int MaxMessageSize, out byte[] content)
{
content = null;
// create header buffer if not created yet
if (header == null)
header = new byte[4];
// read exactly 4 bytes for header (blocking)
if (!stream.ReadExactly(header, 4))
return false;
// convert to int
int size = Utils.BytesToIntBigEndian(header);
// protect against allocation attacks. an attacker might send
// multiple fake '2GB header' packets in a row, causing the server
// to allocate multiple 2GB byte arrays and run out of memory.
if (size <= MaxMessageSize)
{
// read exactly 'size' bytes for content (blocking)
content = new byte[size];
return stream.ReadExactly(content, size);
}
Logger.LogWarning("ReadMessageBlocking: possible allocation attack with a header of: " + size + " bytes.");
return false;
}
// thread receive function is the same for client and server's clients
// (static to reduce state for maximum reliability)
protected static void ReceiveLoop(int connectionId, TcpClient client, ConcurrentQueue<Message> receiveQueue, int MaxMessageSize)
{
// get NetworkStream from client
NetworkStream stream = client.GetStream();
// keep track of last message queue warning
DateTime messageQueueLastWarning = DateTime.Now;
// absolutely must wrap with try/catch, otherwise thread exceptions
// are silent
try
{
// add connected event to queue with ip address as data in case
// it's needed
receiveQueue.Enqueue(new Message(connectionId, EventType.Connected, null));
// let's talk about reading data.
// -> normally we would read as much as possible and then
// extract as many <size,content>,<size,content> messages
// as we received this time. this is really complicated
// and expensive to do though
// -> instead we use a trick:
// Read(2) -> size
// Read(size) -> content
// repeat
// Read is blocking, but it doesn't matter since the
// best thing to do until the full message arrives,
// is to wait.
// => this is the most elegant AND fast solution.
// + no resizing
// + no extra allocations, just one for the content
// + no crazy extraction logic
while (true)
{
// read the next message (blocking) or stop if stream closed
byte[] content;
if (!ReadMessageBlocking(stream, MaxMessageSize, out content))
// break instead of return so stream close still happens!
break;
// queue it
receiveQueue.Enqueue(new Message(connectionId, EventType.Data, content));
// and show a warning if the queue gets too big
// -> we don't want to show a warning every single time,
// because then a lot of processing power gets wasted on
// logging, which will make the queue pile up even more.
// -> instead we show it every 10s, so that the system can
// use most it's processing power to hopefully process it.
if (receiveQueue.Count > messageQueueSizeWarning)
{
TimeSpan elapsed = DateTime.Now - messageQueueLastWarning;
if (elapsed.TotalSeconds > 10)
{
Logger.LogWarning("ReceiveLoop: messageQueue is getting big(" + receiveQueue.Count + "), try calling GetNextMessage more often. You can call it more than once per frame!");
messageQueueLastWarning = DateTime.Now;
}
}
}
}
catch (Exception exception)
{
// something went wrong. the thread was interrupted or the
// connection closed or we closed our own connection or ...
// -> either way we should stop gracefully
Logger.Log("ReceiveLoop: finished receive function for connectionId=" + connectionId + " reason: " + exception);
}
finally
{
// clean up no matter what
stream.Close();
client.Close();
// add 'Disconnected' message after disconnecting properly.
// -> always AFTER closing the streams to avoid a race condition
// where Disconnected -> Reconnect wouldn't work because
// Connected is still true for a short moment before the stream
// would be closed.
receiveQueue.Enqueue(new Message(connectionId, EventType.Disconnected, null));
}
}
// thread send function
// note: we really do need one per connection, so that if one connection
// blocks, the rest will still continue to get sends
protected static void SendLoop(int connectionId, TcpClient client, SafeQueue<byte[]> sendQueue, ManualResetEvent sendPending)
{
// get NetworkStream from client
NetworkStream stream = client.GetStream();
try
{
// try this. client will get closed eventually.
while (client.Connected)
{
// reset ManualResetEvent before we do anything else. this
// way there is no race condition. if Send() is called again
// while in here then it will be properly detected next time
// -> otherwise Send might be called right after dequeue but
// before .Reset, which would completely ignore it until
// the next Send call.
// WaitOne() blocks until .Set() again
sendPending.Reset();
// dequeue all
// SafeQueue.TryDequeueAll is twice as fast as
// ConcurrentQueue, see SafeQueue.cs!
byte[][] messages;
if (sendQueue.TryDequeueAll(out messages))
{
// send message (blocking) or stop if stream is closed
if (!SendMessagesBlocking(stream, messages))
// break instead of return so stream close still happens!
break;
}
// don't choke up the CPU: wait until queue not empty anymore
sendPending.WaitOne();
}
}
catch (ThreadAbortException)
{
// happens on stop. don't log anything.
}
catch (ThreadInterruptedException)
{
// happens if receive thread interrupts send thread.
}
catch (Exception exception)
{
// something went wrong. the thread was interrupted or the
// connection closed or we closed our own connection or ...
// -> either way we should stop gracefully
Logger.Log("SendLoop Exception: connectionId=" + connectionId + " reason: " + exception);
}
finally
{
// clean up no matter what
// we might get SocketExceptions when sending if the 'host has
// failed to respond' - in which case we should close the connection
// which causes the ReceiveLoop to end and fire the Disconnected
// message. otherwise the connection would stay alive forever even
// though we can't send anymore.
stream.Close();
client.Close();
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: c4d56322cf0e248a89103c002a505dab
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,9 @@
namespace Telepathy
{
public enum EventType
{
Connected,
Data,
Disconnected
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 49f1a330755814803be5f27f493e1910
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2018, vis2k
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 0ba11103b95fd4721bffbb08440d5b8e
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,15 @@
// A simple logger class that uses Console.WriteLine by default.
// Can also do Logger.LogMethod = Debug.Log for Unity etc.
// (this way we don't have to depend on UnityEngine.DLL and don't need a
// different version for every UnityEngine version here)
using System;
namespace Telepathy
{
public static class Logger
{
public static Action<string> Log = Console.WriteLine;
public static Action<string> LogWarning = Console.WriteLine;
public static Action<string> LogError = Console.Error.WriteLine;
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: aa8d703f0b73f4d6398b76812719b68b
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,18 @@
// incoming message queue of <connectionId, message>
// (not a HashSet because one connection can have multiple new messages)
// -> a struct to minimize GC
namespace Telepathy
{
public struct Message
{
public readonly int connectionId;
public readonly EventType eventType;
public readonly byte[] data;
public Message(int connectionId, EventType eventType, byte[] data)
{
this.connectionId = connectionId;
this.eventType = eventType;
this.data = data;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: aedf812e9637b4f92a35db1aedca8c92
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,59 @@
using System.IO;
using System.Net.Sockets;
namespace Telepathy
{
public static class NetworkStreamExtensions
{
// .Read returns '0' if remote closed the connection but throws an
// IOException if we voluntarily closed our own connection.
//
// let's add a ReadSafely method that returns '0' in both cases so we don't
// have to worry about exceptions, since a disconnect is a disconnect...
public static int ReadSafely(this NetworkStream stream, byte[] buffer, int offset, int size)
{
try
{
return stream.Read(buffer, offset, size);
}
catch (IOException)
{
return 0;
}
}
// helper function to read EXACTLY 'n' bytes
// -> default .Read reads up to 'n' bytes. this function reads exactly 'n'
// bytes
// -> this is blocking until 'n' bytes were received
// -> immediately returns false in case of disconnects
public static bool ReadExactly(this NetworkStream stream, byte[] buffer, int amount)
{
// there might not be enough bytes in the TCP buffer for .Read to read
// the whole amount at once, so we need to keep trying until we have all
// the bytes (blocking)
//
// note: this just is a faster version of reading one after another:
// for (int i = 0; i < amount; ++i)
// if (stream.Read(buffer, i, 1) == 0)
// return false;
// return true;
int bytesRead = 0;
while (bytesRead < amount)
{
// read up to 'remaining' bytes with the 'safe' read extension
int remaining = amount - bytesRead;
int result = stream.ReadSafely(buffer, bytesRead, remaining);
// .Read returns 0 if disconnected
if (result == 0)
return false;
// otherwise add to bytes read
bytesRead += result;
}
return true;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 7a8076c43fa8d4d45831adae232d4d3c
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,75 @@
// Net 4.X has ConcurrentQueue, but ConcurrentQueue has no TryDequeueAll method,
// which makes SafeQueue twice as fast for the send thread.
//
// uMMORPG 450 CCU
// SafeQueue: 900-1440ms latency
// ConcurrentQueue: 2000ms latency
//
// It's also noticeable in the LoadTest project, which hardly handles 300 CCU
// with ConcurrentQueue!
using System.Collections.Generic;
namespace Telepathy
{
public class SafeQueue<T>
{
readonly Queue<T> queue = new Queue<T>();
// for statistics. don't call Count and assume that it's the same after the
// call.
public int Count
{
get
{
lock (queue)
{
return queue.Count;
}
}
}
public void Enqueue(T item)
{
lock (queue)
{
queue.Enqueue(item);
}
}
// can't check .Count before doing Dequeue because it might change inbetween,
// so we need a TryDequeue
public bool TryDequeue(out T result)
{
lock (queue)
{
result = default(T);
if (queue.Count > 0)
{
result = queue.Dequeue();
return true;
}
return false;
}
}
// for when we want to dequeue and remove all of them at once without
// locking every single TryDequeue.
public bool TryDequeueAll(out T[] result)
{
lock (queue)
{
result = queue.ToArray();
queue.Clear();
return result.Length > 0;
}
}
public void Clear()
{
lock (queue)
{
queue.Clear();
}
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 8fc06e2fb29854a0c9e90c0188d36a08
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,298 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
public class Server : Common
{
// listener
public TcpListener listener;
Thread listenerThread;
// class with all the client's data. let's call it Token for consistency
// with the async socket methods.
class ClientToken
{
public TcpClient client;
// send queue
// SafeQueue is twice as fast as ConcurrentQueue, see SafeQueue.cs!
public SafeQueue<byte[]> sendQueue = new SafeQueue<byte[]>();
// ManualResetEvent to wake up the send thread. better than Thread.Sleep
// -> call Set() if everything was sent
// -> call Reset() if there is something to send again
// -> call WaitOne() to block until Reset was called
public ManualResetEvent sendPending = new ManualResetEvent(false);
public ClientToken(TcpClient client)
{
this.client = client;
}
}
// clients with <connectionId, ClientData>
readonly ConcurrentDictionary<int, ClientToken> clients = new ConcurrentDictionary<int, ClientToken>();
// connectionId counter
int counter;
// public next id function in case someone needs to reserve an id
// (e.g. if hostMode should always have 0 connection and external
// connections should start at 1, etc.)
public int NextConnectionId()
{
int id = Interlocked.Increment(ref counter);
// it's very unlikely that we reach the uint limit of 2 billion.
// even with 1 new connection per second, this would take 68 years.
// -> but if it happens, then we should throw an exception because
// the caller probably should stop accepting clients.
// -> it's hardly worth using 'bool Next(out id)' for that case
// because it's just so unlikely.
if (id == int.MaxValue)
{
throw new Exception("connection id limit reached: " + id);
}
return id;
}
// check if the server is running
public bool Active => listenerThread != null && listenerThread.IsAlive;
// the listener thread's listen function
// note: no maxConnections parameter. high level API should handle that.
// (Transport can't send a 'too full' message anyway)
void Listen(int port)
{
// absolutely must wrap with try/catch, otherwise thread
// exceptions are silent
try
{
// start listener on all IPv4 and IPv6 address via .Create
listener = TcpListener.Create(port);
listener.Server.NoDelay = NoDelay;
listener.Server.SendTimeout = SendTimeout;
listener.Start();
Logger.Log("Server: listening port=" + port);
// keep accepting new clients
while (true)
{
// wait and accept new client
// note: 'using' sucks here because it will try to
// dispose after thread was started but we still need it
// in the thread
TcpClient client = listener.AcceptTcpClient();
// set socket options
client.NoDelay = NoDelay;
client.SendTimeout = SendTimeout;
// generate the next connection id (thread safely)
int connectionId = NextConnectionId();
// add to dict immediately
ClientToken token = new ClientToken(client);
clients[connectionId] = token;
// spawn a send thread for each client
Thread sendThread = new Thread(() =>
{
// wrap in try-catch, otherwise Thread exceptions
// are silent
try
{
// run the send loop
SendLoop(connectionId, client, token.sendQueue, token.sendPending);
}
catch (ThreadAbortException)
{
// happens on stop. don't log anything.
// (we catch it in SendLoop too, but it still gets
// through to here when aborting. don't show an
// error.)
}
catch (Exception exception)
{
Logger.LogError("Server send thread exception: " + exception);
}
});
sendThread.IsBackground = true;
sendThread.Start();
// spawn a receive thread for each client
Thread receiveThread = new Thread(() =>
{
// wrap in try-catch, otherwise Thread exceptions
// are silent
try
{
// run the receive loop
ReceiveLoop(connectionId, client, receiveQueue, MaxMessageSize);
// remove client from clients dict afterwards
clients.TryRemove(connectionId, out ClientToken _);
// sendthread might be waiting on ManualResetEvent,
// so let's make sure to end it if the connection
// closed.
// otherwise the send thread would only end if it's
// actually sending data while the connection is
// closed.
sendThread.Interrupt();
}
catch (Exception exception)
{
Logger.LogError("Server client thread exception: " + exception);
}
});
receiveThread.IsBackground = true;
receiveThread.Start();
}
}
catch (ThreadAbortException exception)
{
// UnityEditor causes AbortException if thread is still
// running when we press Play again next time. that's okay.
Logger.Log("Server thread aborted. That's okay. " + exception);
}
catch (SocketException exception)
{
// calling StopServer will interrupt this thread with a
// 'SocketException: interrupted'. that's okay.
Logger.Log("Server Thread stopped. That's okay. " + exception);
}
catch (Exception exception)
{
// something went wrong. probably important.
Logger.LogError("Server Exception: " + exception);
}
}
// start listening for new connections in a background thread and spawn
// a new thread for each one.
public bool Start(int port)
{
// not if already started
if (Active)
return false;
// clear old messages in queue, just to be sure that the caller
// doesn't receive data from last time and gets out of sync.
// -> calling this in Stop isn't smart because the caller may
// still want to process all the latest messages afterwards
receiveQueue = new ConcurrentQueue<Message>();
// start the listener thread
// (on low priority. if main thread is too busy then there is not
// much value in accepting even more clients)
Logger.Log("Server: Start port=" + port);
listenerThread = new Thread(() => { Listen(port); });
listenerThread.IsBackground = true;
listenerThread.Priority = ThreadPriority.BelowNormal;
listenerThread.Start();
return true;
}
public void Stop()
{
// only if started
if (!Active)
return;
Logger.Log("Server: stopping...");
// stop listening to connections so that no one can connect while we
// close the client connections
// (might be null if we call Stop so quickly after Start that the
// thread was interrupted before even creating the listener)
listener?.Stop();
// kill listener thread at all costs. only way to guarantee that
// .Active is immediately false after Stop.
// -> calling .Join would sometimes wait forever
listenerThread?.Interrupt();
listenerThread = null;
// close all client connections
foreach (KeyValuePair<int, ClientToken> kvp in clients)
{
TcpClient client = kvp.Value.client;
// close the stream if not closed yet. it may have been closed
// by a disconnect already, so use try/catch
try { client.GetStream().Close(); } catch { }
client.Close();
}
// clear clients list
clients.Clear();
// reset the counter in case we start up again so
// clients get connection ID's starting from 1
counter = 0;
}
// send message to client using socket connection.
public bool Send(int connectionId, byte[] data)
{
// respect max message size to avoid allocation attacks.
if (data.Length <= MaxMessageSize)
{
// find the connection
ClientToken token;
if (clients.TryGetValue(connectionId, out token))
{
// add to send queue and return immediately.
// calling Send here would be blocking (sometimes for long times
// if other side lags or wire was disconnected)
token.sendQueue.Enqueue(data);
// interrupt SendThread WaitOne()
token.sendPending.Set();
return true;
}
// sending to an invalid connectionId is expected sometimes.
// for example, if a client disconnects, the server might still
// try to send for one frame before it calls GetNextMessages
// again and realizes that a disconnect happened.
// so let's not spam the console with log messages.
//Logger.Log("Server.Send: invalid connectionId: " + connectionId);
return false;
}
Logger.LogError("Client.Send: message too big: " + data.Length + ". Limit: " + MaxMessageSize);
return false;
}
// client's ip is sometimes needed by the server, e.g. for bans
public string GetClientAddress(int connectionId)
{
// find the connection
ClientToken token;
if (clients.TryGetValue(connectionId, out token))
{
return ((IPEndPoint)token.client.Client.RemoteEndPoint).Address.ToString();
}
return "";
}
// disconnect (kick) a client
public bool Disconnect(int connectionId)
{
// find the connection
ClientToken token;
if (clients.TryGetValue(connectionId, out token))
{
// just close it. client thread will take care of the rest.
token.client.Close();
Logger.Log("Server.Disconnect connectionId:" + connectionId);
return true;
}
return false;
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: fb98a16841ccc4338a7e0b4e59136563
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,12 @@
{
"name": "Telepathy",
"references": [],
"optionalUnityReferences": [],
"includePlatforms": [],
"excludePlatforms": [],
"allowUnsafeCode": false,
"overrideReferences": false,
"precompiledReferences": [],
"autoReferenced": true,
"defineConstraints": []
}

View File

@@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 725ee7191c021de4dbf9269590ded755
AssemblyDefinitionImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,26 @@
using System.Threading;
namespace Telepathy
{
public static class ThreadExtensions
{
// helper function to abort a thread and not return until it's fully done
public static void AbortAndJoin(this Thread thread)
{
// kill thread at all costs
// -> calling .Join would sometimes wait forever
// -> calling .Interrupt only interrupts certain states.
// => Abort() is the better solution.
thread.Abort();
// wait until thread is TRULY finished. this is the only way
// to guarantee that everything was properly cleaned up before
// returning.
// => this means that this function may sometimes block for a while
// but there is no other way to guarantee that everything is
// cleaned up properly by the time Stop() returns.
// we have to live with the wait time.
thread.Join();
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 64df4eaebe4ff9a43a9fb318c3e8e321
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@@ -0,0 +1,44 @@
namespace Telepathy
{
public static class Utils
{
// fast int to byte[] conversion and vice versa
// -> test with 100k conversions:
// BitConverter.GetBytes(ushort): 144ms
// bit shifting: 11ms
// -> 10x speed improvement makes this optimization actually worth it
// -> this way we don't need to allocate BinaryWriter/Reader either
// -> 4 bytes because some people may want to send messages larger than
// 64K bytes
// => big endian is standard for network transmissions, and necessary
// for compatibility with erlang
public static byte[] IntToBytesBigEndian(int value)
{
return new byte[] {
(byte)(value >> 24),
(byte)(value >> 16),
(byte)(value >> 8),
(byte)value
};
}
// IntToBytes version that doesn't allocate a new byte[4] each time.
// -> important for MMO scale networking performance.
public static void IntToBytesBigEndianNonAlloc(int value, byte[] bytes)
{
bytes[0] = (byte)(value >> 24);
bytes[1] = (byte)(value >> 16);
bytes[2] = (byte)(value >> 8);
bytes[3] = (byte)value;
}
public static int BytesToIntBigEndian(byte[] bytes)
{
return
(bytes[0] << 24) |
(bytes[1] << 16) |
(bytes[2] << 8) |
bytes[3];
}
}
}

View File

@@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 951d08c05297f4b3e8feb5bfcab86531
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant: