// git_bsmd/AIS/LS100PortProxy/MultiplexManager.cs
//
// 338 lines
// 14 KiB
// C#

//
// Class: MultiplexManager
// Current CLR: 4.0.30319.296
// System: Microsoft Visual Studio 10.0
// Author: dani
// Created: 4/8/2013 9:02:48 PM
//
// Copyright (c) 2013 Informatikbüro Daniel Schick. All rights reserved.
using System;
using System.Text;
using System.Collections.Generic;
using System.Threading;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Diagnostics;
using log4net;
namespace LS100PortProxy
{
public class MultiplexManager
{
// Interface index supplied by the caller; stored but not read by any active code in this class.
private int ifIdx;
// Port of the remote server the client side connects to.
private int clientPort;
// Local port the server side listens on for readers.
private int serverPort;
// Host name or address of the remote server the client side connects to.
private string remoteServerAddress;
// Stop flag: set by StopServer on the caller's thread and polled by the accept,
// per-connection and client threads. Marked volatile so those loops always observe
// the write instead of a cached value.
private volatile bool shouldStop = false;
// Thread running AcceptConnections.
private Thread _acceptThread;
// Listening socket created by SetupServerSocket.
private Socket _serverSocket;
// All currently registered reader connections. The list instance also serves as the
// lock object guarding the list and every per-connection chunk queue, so it must
// never be reassigned.
private readonly List<ConnectionInfo> _connections = new List<ConnectionInfo>();
// Outgoing connection to the remote server (null while disconnected).
private TcpClient client;
// Thread running ClientConnection.
private Thread _clientThread;
// Stream of the current outgoing connection.
private NetworkStream clientStream;
// Time at which data was last forwarded to at least one reader.
// Starts at default(DateTime) until the first chunk is forwarded.
private DateTime lastDataReceivedAt;
private ILog _log = LogManager.GetLogger(typeof(MultiplexManager));
private class Chunk
{
    // Number of valid bytes at the start of data (data itself may be larger).
    public int length;
    // Backing buffer holding the payload.
    public byte[] data;

    /// <summary>
    /// Creates an independent copy of this chunk whose buffer is trimmed to
    /// exactly <c>length</c> bytes.
    /// </summary>
    /// <returns>A new chunk that shares no storage with this one.</returns>
    public Chunk DeepClone()
    {
        byte[] copiedBytes = new byte[this.length];
        Array.Copy(this.data, 0, copiedBytes, 0, this.length);
        return new Chunk { length = this.length, data = copiedBytes };
    }
}
// State kept for one accepted reader connection.
private class ConnectionInfo
{
    // Chunks waiting to be written to Socket; filled by the client thread and
    // drained by the thread servicing this connection.
    public Queue<Chunk> chunks = new Queue<Chunk>();

    // The accepted socket for this reader.
    public Socket Socket;

    // The thread that services this connection.
    public Thread Thread;
}
/// <summary>
/// Stores the connection parameters; no sockets are opened here.
/// </summary>
/// <param name="ifIdx">Interface index (stored only).</param>
/// <param name="address">Host name or address of the remote server.</param>
/// <param name="cPort">Port of the remote server to connect to.</param>
/// <param name="sPort">Local port to listen on.</param>
public MultiplexManager(int ifIdx, string address, int cPort, int sPort)
{
    remoteServerAddress = address;
    clientPort = cPort;
    serverPort = sPort;
    this.ifIdx = ifIdx;
}
/// <summary>
/// Opens the listening socket and launches the background thread that accepts readers.
/// </summary>
public void StartServer()
{
    SetupServerSocket();

    Thread listener = new Thread(AcceptConnections) { IsBackground = true };
    _acceptThread = listener;
    listener.Start();
}
/// <summary>
/// Signals all worker loops to stop, closes the listening socket and waits for the
/// accept thread to finish.
/// </summary>
/// <remarks>
/// Safe to call even if <c>StartServer</c> was never called or failed part-way:
/// the socket and thread are only touched when they exist.
/// </remarks>
public void StopServer()
{
    this.shouldStop = true;

    // Closing the listener unblocks a pending Accept() on the accept thread.
    if (_serverSocket != null)
    {
        _serverSocket.Close();
    }

    if (_acceptThread != null)
    {
        _acceptThread.Join();
    }
}
/// <summary>
/// Builds a listing of the addresses the local host name resolves to,
/// one "# index: address" line per entry.
/// </summary>
/// <returns>The listing text (empty when no addresses are returned).</returns>
public static string ListIfs()
{
    IPAddress[] addresses = Dns.GetHostEntry(Dns.GetHostName()).AddressList;
    StringBuilder listing = new StringBuilder();

    int index = 0;
    foreach (IPAddress address in addresses)
    {
        listing.AppendLine(string.Format("# {0}: {1}", index, address.ToString()));
        index++;
    }

    return listing.ToString();
}
/// <summary>
/// Creates the listening socket, binds it to all local interfaces on
/// <c>serverPort</c> and starts listening.
/// </summary>
/// <remarks>
/// The socket is published in <c>_serverSocket</c> only after Bind/Listen succeeded;
/// on failure it is closed and the original exception is rethrown, so no
/// half-initialised socket is left behind.
/// </remarks>
private void SetupServerSocket()
{
    // Listen on every local interface. (The former DNS lookup of the local host
    // was unused here and could fail or delay startup, so it has been removed.)
    IPEndPoint myEndpoint = new IPEndPoint(IPAddress.Any, serverPort);
    _log.InfoFormat("listening on {0}, port {1}", myEndpoint.Address, serverPort);

    // Create the socket, bind it, and start listening
    Socket listener = new Socket(myEndpoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        listener.Blocking = true;
        listener.Bind(myEndpoint);
        listener.Listen(32); // backlog of pending connections
    }
    catch
    {
        listener.Close();
        throw;
    }

    _serverSocket = listener;
}
/// <summary>
/// Accept loop: waits for readers on the listening socket and starts one
/// background thread per accepted connection until a stop is requested.
/// </summary>
private void AcceptConnections()
{
    while (!shouldStop)
    {
        try
        {
            // Accept a connection
            Socket socket = _serverSocket.Accept();

            ConnectionInfo connection = new ConnectionInfo();
            connection.Socket = socket;
            // Create the thread that services this connection.
            connection.Thread = new Thread(ProcessConnection);
            connection.Thread.IsBackground = true;

            // Register the connection BEFORE its thread runs: the thread removes the
            // entry when it ends, so registering afterwards could leave a dead entry
            // in the list that keeps collecting chunks.
            int totalConnections;
            lock (_connections)
            {
                _connections.Add(connection);
                totalConnections = _connections.Count;
            }

            connection.Thread.Start(connection);
            _log.InfoFormat("new connection from {0}, total connections: {1}", socket.RemoteEndPoint.ToString(), totalConnections);
        }
        catch (SocketException ex)
        {
            // Nothing to recover here; log and keep accepting.
            _log.WarnFormat("Exception on server socket:{0}", ex.Message);
        }
        catch (ObjectDisposedException ex)
        {
            // Raised when a socket was closed underneath us: the listener during
            // shutdown, or a just-accepted socket whose thread already ended.
            // The loop condition decides whether to continue.
            _log.WarnFormat("Socket already closed:{0}", ex.Message);
        }
        Thread.Sleep(250);
    }
    _log.Info("Server connection closed");
    this._serverSocket.Close();
}
/// <summary>
/// Per-connection worker: repeatedly drains the connection's chunk queue to its
/// socket until a stop is requested or the socket fails, then closes the socket
/// and unregisters the connection.
/// </summary>
/// <param name="state">The <c>ConnectionInfo</c> to service.</param>
private void ProcessConnection(object state)
{
    ConnectionInfo connection = (ConnectionInfo)state;
    try
    {
        while (!shouldStop)
        {
            // write all chunks in incoming chunk queue to this socket
            // (Poll timeouts are given in microseconds)
            if (connection.Socket.Poll(500, SelectMode.SelectWrite))
            {
                Chunk pending;
                while ((pending = DequeueChunk(connection)) != null)
                {
                    SendAll(connection.Socket, pending.data, pending.length);
                }
            }
            else if (connection.Socket.Poll(500, SelectMode.SelectError))
            {
                _log.Warn("This Socket has an error, breaking the " +
                    "forever loop");
                lock (_connections)
                {
                    connection.chunks.Clear();
                }
                break;
            }
            else
            {
                _log.Warn("cannot write to reader, clearing chunks..");
                // dump queue in this case, packets cannot be sent..
                lock (_connections)
                {
                    connection.chunks.Clear();
                }
            }
            Thread.Sleep(500);
        }
    }
    catch (SocketException exc)
    {
        Console.WriteLine("Socket exception: " + exc.SocketErrorCode);
        _log.WarnFormat("SocketException: {0}", exc.SocketErrorCode);
    }
    catch (Exception exc)
    {
        Console.WriteLine("Exception: " + exc);
        // Also record it in the log; the console may not be visible.
        _log.WarnFormat("Exception: {0}", exc);
    }
    finally
    {
        connection.Socket.Close();
        int remainingConnections;
        lock (_connections)
        {
            _connections.Remove(connection);
            remainingConnections = _connections.Count;
        }
        _log.InfoFormat("connection closed, total connections: {0}", remainingConnections);
    }
}

// Takes the next queued chunk for the connection, or returns null when the queue is
// empty. The emptiness check and the dequeue happen under the same lock the
// producer uses for enqueueing.
private Chunk DequeueChunk(ConnectionInfo connection)
{
    Chunk next;
    int backlog;
    lock (_connections)
    {
        if (connection.chunks.Count == 0)
        {
            return null;
        }
        next = connection.chunks.Dequeue();
        backlog = connection.chunks.Count;
    }

    // Warn about a backlog at every multiple of ten, but not for an empty queue.
    if (backlog > 0 && backlog % 10 == 0)
    {
        _log.WarnFormat("Chunk count reached {0} elements for Endpoint {1}",
            backlog,
            connection.Socket.RemoteEndPoint);
    }
    return next;
}

// Sends the first `length` bytes of `buffer`, continuing from the right offset after
// a partial send instead of copying the remainder into a new array.
private static void SendAll(Socket socket, byte[] buffer, int length)
{
    int offset = 0;
    while (offset < length)
    {
        offset += socket.Send(buffer, offset, length - offset, SocketFlags.None);
    }
}
/// <summary>
/// Launches the thread that runs the client side (ClientConnection).
/// </summary>
internal void StartClient()
{
    Thread worker = new Thread(ClientConnection);
    _clientThread = worker;
    worker.Start();
}
/// <summary>
/// Ensures there is a connection to the remote server: does nothing when already
/// connected, otherwise discards the old client and opens a new one.
/// </summary>
/// <remarks>
/// Failures are logged and swallowed; afterwards <c>client</c> is either a live
/// connection or null, never a disposed instance.
/// </remarks>
private void ConnectClient()
{
    try
    {
        if ((this.client != null) && this.client.Connected)
        {
            return;
        }

        if (this.client != null)
        {
            // Drop the stale connection and forget it, so a failed reconnect below
            // does not leave disposed objects in the fields.
            this.client.Close();
            this.client = null;
            this.clientStream = null;
        }

        TcpClient connected = new TcpClient(this.remoteServerAddress, this.clientPort);
        NetworkStream stream;
        try
        {
            stream = connected.GetStream();
        }
        catch
        {
            connected.Close();
            throw;
        }

        this.client = connected;
        this.clientStream = stream;
        _log.InfoFormat("TCP stream connected ({0}:{1})", this.remoteServerAddress, this.clientPort);
    }
    catch (Exception ex)
    {
        _log.ErrorFormat("ConnectClient(): {0}", ex.Message);
    }
}
/// <summary>
/// Client loop: keeps a connection to the remote server open, reads data from it,
/// expands every LF to CR LF and queues a copy of each chunk for every registered
/// reader connection. Runs until a stop is requested.
/// </summary>
/// <remarks>
/// NOTE(review): the 10-minute inactivity check sits in the branch that is only
/// taken while NOT connected, so it never interrupts a blocking read.
/// </remarks>
private void ClientConnection()
{
    const int ReadBufferSize = 1024;
    const int ReconnectDelayMs = 60000;

    while (!this.shouldStop)
    {
        try
        {
            if ((this.client == null) || !this.client.Connected)
            {
                this.ConnectClient();
            }
            if ((this.client != null) && this.client.Connected && this.clientStream.CanRead)
            {
                // Both buffers can be reused across reads because every reader gets
                // its own DeepClone of the chunk.
                byte[] readBuffer = new byte[ReadBufferSize];
                byte[] expanded = new byte[2 * ReadBufferSize]; // worst case: every byte is LF

                while (this.clientStream.CanRead && !this.shouldStop)
                {
                    int bytesRead = this.clientStream.Read(readBuffer, 0, ReadBufferSize);
                    if (bytesRead == 0)
                    {
                        // Read returns 0 once the remote side has closed the connection.
                        // Without this check the loop would spin forever on a dead
                        // stream and never reconnect.
                        _log.Info("remote server closed the connection, reconnecting in 60 seconds");
                        this.client.Close();
                        this.client = null;
                        Thread.Sleep(ReconnectDelayMs);
                        break;
                    }

                    int expandedLength = InsertCarriageReturns(readBuffer, bytesRead, expanded);

                    Chunk chunk = new Chunk();
                    if (expandedLength > bytesRead) // CRs were inserted
                    {
                        chunk.data = expanded;
                        chunk.length = expandedLength;
                        _log.DebugFormat("Chunk data replaced, new length {0}", expandedLength);
                    }
                    else
                    {
                        chunk.data = readBuffer;
                        chunk.length = bytesRead;
                    }

                    if (chunk.length > 0 && (this._connections.Count > 0))
                    {
                        this.lastDataReceivedAt = DateTime.Now; // reset data timeout
                        // clone chunk for each connected client
                        lock (this._connections)
                        {
                            foreach (ConnectionInfo connectionInfo in _connections)
                            {
                                connectionInfo.chunks.Enqueue(chunk.DeepClone());
                            }
                        }
                    }
                }
            }
            else
            {
                if ((DateTime.Now - lastDataReceivedAt) > TimeSpan.FromMinutes(10))
                {
                    // close client connection if no data is received for 10 minutes
                    _log.Info("Server inactive for 10 minutes, disconnecting");
                    if (this.client != null)
                    {
                        this.client.Close();
                    }
                    this.client = null;
                }
                Thread.Sleep(120000); // try connect again in 2 minutes
            }
            Thread.Sleep(50);
        }
        catch (Exception ex)
        {
            _log.ErrorFormat("client-side exception: {0}", ex.Message);
            if (this.client != null)
            {
                this.client.Close();
                this.client = null;
            }
            Thread.Sleep(ReconnectDelayMs); // wait 60 seconds, then reconnect
        }
    }
    if (this.client != null)
    {
        _log.Info("closing client connection");
        this.client.Close();
    }
}

// Copies the first `count` bytes of `source` into `target`, writing CR (13) in front
// of every LF (10). Returns the number of bytes written; `target` must be able to
// hold 2 * count bytes.
private static int InsertCarriageReturns(byte[] source, int count, byte[] target)
{
    int written = 0;
    for (int i = 0; i < count; i++)
    {
        if (source[i] == 10) // LF
        {
            target[written++] = 13; // CR
        }
        target[written++] = source[i];
    }
    return written;
}
}
}