// // Class: MultiplexManager // Current CLR: 4.0.30319.296 // System: Microsoft Visual Studio 10.0 // Author: dani // Created: 4/8/2013 9:02:48 PM // // Copyright (c) 2013 Informatikbüro Daniel Schick. All rights reserved. using System; using System.Text; using System.Collections.Generic; using System.Threading; using System.IO; using System.Net; using System.Net.Sockets; using System.Diagnostics; using log4net; namespace LS100PortProxy { public class MultiplexManager { private int ifIdx; private int clientPort; private int serverPort; private string remoteServerAddress; private bool shouldStop = false; private Thread _acceptThread; private Socket _serverSocket; private List _connections = new List(); private TcpClient client; private Thread _clientThread; private NetworkStream clientStream; private DateTime lastDataReceivedAt; private ILog _log = LogManager.GetLogger(typeof(MultiplexManager)); private class Chunk { public int length; public byte[] data; public Chunk DeepClone() { Chunk newChunk = new Chunk(); newChunk.length = this.length; newChunk.data = new byte[this.length]; Array.Copy(this.data, newChunk.data, this.length); return newChunk; } } private class ConnectionInfo { public Socket Socket; public Thread Thread; public Queue chunks = new Queue(); } public MultiplexManager(int ifIdx, string address, int cPort, int sPort) { this.ifIdx = ifIdx; this.remoteServerAddress = address; this.clientPort = cPort; this.serverPort = sPort; } public void StartServer() { this.SetupServerSocket(); _acceptThread = new Thread(AcceptConnections); _acceptThread.IsBackground = true; _acceptThread.Start(); } public void StopServer() { this.shouldStop = true; _serverSocket.Close(); _acceptThread.Join(); } public static string ListIfs() { StringBuilder sb = new StringBuilder(); IPHostEntry localMachineInfo = Dns.GetHostEntry(Dns.GetHostName()); for(int i= 0;i< localMachineInfo.AddressList.Length; i++) { IPAddress address = localMachineInfo.AddressList[i]; sb.AppendLine(string.Format("# {0}: {1}", i, address.ToString())); } return sb.ToString(); } private void SetupServerSocket() { // Resolving local machine information IPHostEntry localMachineInfo = Dns.GetHostEntry(Dns.GetHostName()); IPEndPoint myEndpoint = new IPEndPoint(IPAddress.Any, serverPort); //IPEndPoint myEndpoint = new IPEndPoint(localMachineInfo.AddressList[this.ifIdx], serverPort); _log.InfoFormat("listening on {0}, port {1}", myEndpoint.Address, serverPort); // Create the socket, bind it, and start listening _serverSocket = new Socket(myEndpoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp); _serverSocket.Blocking = true; _serverSocket.Bind(myEndpoint); _serverSocket.Listen(32); } private void AcceptConnections() { while (!shouldStop) { // Accept a connection try { Socket socket = _serverSocket.Accept(); ConnectionInfo connection = new ConnectionInfo(); connection.Socket = socket; // Create the thread for the receives. connection.Thread = new Thread(ProcessConnection); connection.Thread.IsBackground = true; connection.Thread.Start(connection); // Store the socket lock (_connections) _connections.Add(connection); _log.InfoFormat("new connection from {0}, total connections: {1}", socket.RemoteEndPoint.ToString(), _connections.Count); } catch (SocketException ex) { /* tu was? */ _log.WarnFormat("Exception on server socket:{0}", ex.Message); } Thread.Sleep(250); } _log.Info("Server connection closed"); this._serverSocket.Close(); } private void ProcessConnection(object state) { ConnectionInfo connection = (ConnectionInfo)state; try { while (!shouldStop) { // write all chunks in incoming chunk queue to this socket if (connection.Socket.Poll(500, SelectMode.SelectWrite)) { while (connection.chunks.Count > 0) { Chunk aChunk = null; lock (_connections) { aChunk = connection.chunks.Dequeue(); //Trace.WriteLine(string.Format("{0} : {1}", connection.Socket.RemoteEndPoint, connection.chunks.Count)); if (connection.chunks.Count % 10 == 0) _log.WarnFormat("Chunk count reached {0} elements for Endpoint {1}", connection.chunks.Count, connection.Socket.RemoteEndPoint); } int bytesToSend = aChunk.length; byte[] buffer = aChunk.data; while (bytesToSend > 0) { //string testOutput = Encoding.ASCII.GetString(aChunk.data); //string[] lines = testOutput.Split('\r'); //foreach(string line in lines) // _log.Debug(line); int actuallySentBytes = connection.Socket.Send(buffer, bytesToSend, SocketFlags.None); //Trace.WriteLine(string.Format("{0}/{1}", actuallySentBytes, bytesToSend)); bytesToSend -= actuallySentBytes; if (bytesToSend > 0) { byte[] newBuffer = new byte[bytesToSend]; Array.Copy(buffer, actuallySentBytes, newBuffer, 0, bytesToSend); buffer = newBuffer; } } aChunk = null; } } else if(connection.Socket.Poll(500, SelectMode.SelectError)) { _log.WarnFormat("This Socket has an error, breaking the " + "forever loop"); lock (_connections) { connection.chunks.Clear(); } break; } else { _log.WarnFormat("cannot write to reader, clearing chunks.."); // dump queue in this case, packets cannot be sent.. lock (_connections) { connection.chunks.Clear(); } // break; } Thread.Sleep(500); } } catch (SocketException exc) { Console.WriteLine("Socket exception: " + exc.SocketErrorCode); _log.WarnFormat("SocketException: {0}", exc.SocketErrorCode); } catch (Exception exc) { Console.WriteLine("Exception: " + exc); } finally { connection.Socket.Close(); lock (_connections) _connections.Remove(connection); _log.InfoFormat("connection closed, total connections: {0}", _connections.Count); GC.Collect(); } } internal void StartClient() { this._clientThread = new Thread(new ThreadStart(this.ClientConnection)); this._clientThread.Start(); } private void ConnectClient() { try { if ((this.client != null) && (this.client.Connected)) return; if (this.client != null) { this.client.Close(); GC.Collect(); } this.client = new TcpClient(this.remoteServerAddress, this.clientPort); this.clientStream = client.GetStream(); _log.InfoFormat("TCP stream connected ({0}:{1})", this.remoteServerAddress, this.clientPort); } catch (Exception ex) { _log.ErrorFormat("ConnectClient(): {0}", ex.Message); } } private void ClientConnection() { while (!this.shouldStop) { try { if ((this.client == null) || !this.client.Connected) { this.ConnectClient(); } if ((this.client != null) && this.client.Connected && this.clientStream.CanRead) { while (this.clientStream.CanRead && !this.shouldStop) { Chunk chunk = new Chunk(); chunk.data = new byte[1024]; chunk.length = this.clientStream.Read(chunk.data, 0, 1024); byte[] target = new byte[2048]; // einfach größer int j = 0; for (int i = 0; i < chunk.length; i++, j++) { target[j] = chunk.data[i]; if (chunk.data[i] == 10) // LF { target[j] = 13; // CR j++; target[j] = 10; //LF } } if(j > chunk.length) // es wurden CR eingefügt { chunk.data = null; chunk.data = target; chunk.length = j; _log.DebugFormat("Chunk data replaced, new length {0}", j); } if (chunk.length > 0 && (this._connections.Count > 0)) { this.lastDataReceivedAt = DateTime.Now; // reset data timeout // clone chunk for each connected client lock (this._connections) { foreach (ConnectionInfo connectionInfo in _connections) { connectionInfo.chunks.Enqueue(chunk.DeepClone()); } } chunk.data = null; } chunk = null; } } else { if ((DateTime.Now - lastDataReceivedAt) > TimeSpan.FromMinutes(10)) { // close client connection if no data is received for 10 minutes _log.Info("Server inactive for 10 minutes, disconnecting"); if(this.client != null) this.client.Close(); this.client = null; } Thread.Sleep(120000); // try connect again in 2 minutes } Thread.Sleep(50); } catch (Exception ex) { _log.ErrorFormat("client-side exception: {0}", ex.Message); if (this.client != null) { this.client.Close(); this.client = null; } Thread.Sleep(60000); // 60 Sekunden warten und neu verbinden } } if (this.client != null) { _log.Info("closing client connection"); this.client.Close(); } } } }