// git_bsmd/AIS/bsmd.AISService/AIS/AIS_QueueManager.cs
// (repository listing metadata: 227 lines, 7.8 KiB, C#)

using System;
using System.Collections.Generic;
using System.Text;
using System.Timers;
using System.Diagnostics;
using System.Threading;
namespace bsmd.AISService.AIS
{
/// <summary>
/// This is where all the threads come together. The class owns the data handlers,
/// decoders and target collections and can be used directly from console
/// applications, services and Windows programs.
/// </summary>
/// <remarks>
/// Decoder callbacks and the database update timer run concurrently. The lock order
/// used throughout this class is <c>activeTargets</c> first, then either
/// <c>activeTargetList</c> or <c>dbUpdateQueue</c>.
/// </remarks>
public class AIS_QueueManager
{
    /// <summary>Signature shared by the target notification events of this class.</summary>
    /// <param name="target">The target the notification refers to.</param>
    public delegate void AISQueueChangedHandler(AIS_Target target);

    /// <summary>
    /// Raised after a target has been updated from a decoded message. Handlers are
    /// invoked while the internal target dictionary is locked.
    /// </summary>
    public event AISQueueChangedHandler AISQueueChanged;

    /// <summary>
    /// Raised from the timer callback, once per queued target, when a target has to be
    /// written to the database.
    /// </summary>
    public event AISQueueChangedHandler DBUpdateRequired;

    // Targets indexed by MMSI; also serves as the outermost lock object.
    private readonly Dictionary<int, AIS_Target> activeTargets = new Dictionary<int, AIS_Target>();
    // Same targets as a list; this instance is handed out through ActiveTargets.
    private readonly List<AIS_Target> activeTargetList = new List<AIS_Target>();
    // NOTE(review): the following two lists are not used anywhere in this class.
    private List<AIS_Target> databaseTargets = new List<AIS_Target>();
    private List<AIS_Target> watchkeeperTargets = new List<AIS_Target>();
    private readonly AIS_Configuration configuration;
    private readonly List<SerialDataHandler> serialHandlerList = new List<SerialDataHandler>();
    private readonly List<TelnetDataHandler> telnetHandlerList = new List<TelnetDataHandler>();
    // Targets waiting for a DBUpdateRequired notification (FIFO, no duplicates).
    private readonly List<AIS_Target> dbUpdateQueue = new List<AIS_Target>();
    private readonly System.Timers.Timer dbUpdateTimer = new System.Timers.Timer();
    private bool isStarted = false;
    // Ensures that only one timer callback is processed at a time.
    private readonly Mutex dbSingleMutex = new Mutex();

    #region Construction

    /// <summary>
    /// Creates one decoder and data handler per serial port and per telnet connection
    /// and configures the database update timer. Nothing is started yet.
    /// </summary>
    /// <param name="configuration">Configuration supplying the timer interval, the minimum
    /// position report time difference and the stale timeout.</param>
    /// <param name="serialIOs">Serial inputs to read from.</param>
    /// <param name="ais_Telnets">Telnet inputs to read from.</param>
    public AIS_QueueManager(AIS_Configuration configuration, List<Serial_IO> serialIOs, List<AIS_Telnet> ais_Telnets)
    {
        this.configuration = configuration;

        foreach (Serial_IO serialIO in serialIOs)
        {
            AIS_Decoder decoder = new AIS_Decoder();
            decoder.AISMessageReceived += new AIS_Decoder.AISMessageHandler(this.decoder_AISMessageReceived);
            this.serialHandlerList.Add(new SerialDataHandler(serialIO, decoder));
        }

        foreach (AIS_Telnet aisTelnet in ais_Telnets)
        {
            AIS_Decoder decoder = new AIS_Decoder();
            decoder.AISMessageReceived += new AIS_Decoder.AISMessageHandler(this.decoder_AISMessageReceived);
            this.telnetHandlerList.Add(new TelnetDataHandler(aisTelnet, decoder));
        }

        // DBMinPosReportTimeDifference is passed as the seconds component of the TimeSpan.
        AIS_Target.dbUpdateInterval = new TimeSpan(0, 0, configuration.DBMinPosReportTimeDifference);
        this.dbUpdateTimer.Interval = configuration.DBUpdateInterval;
        this.dbUpdateTimer.Elapsed += new ElapsedEventHandler(dbUpdateTimer_Elapsed);
    }

    #endregion

    #region Properties

    /// <summary>
    /// The list of currently active targets. This is the internal list instance, not a
    /// copy; the manager modifies it while holding a lock on the list object itself.
    /// </summary>
    public List<AIS_Target> ActiveTargets
    {
        get
        {
            return this.activeTargetList;
        }
    }

    /// <summary>True after a successful <see cref="Start"/> and until <see cref="Stop"/> is called.</summary>
    public bool IsStarted
    {
        get { return this.isStarted; }
    }

    #endregion

    #region event handler

    /// <summary>
    /// Timer callback: raises <see cref="DBUpdateRequired"/> for every queued target and
    /// then drops stale targets. If another callback is still running, this one returns
    /// immediately.
    /// </summary>
    void dbUpdateTimer_Elapsed(object sender, ElapsedEventArgs e)
    {
        if (!dbSingleMutex.WaitOne(0))
            return;

        try
        {
            this.DrainDbUpdateQueue();
            this.RemoveStaleTargets();
        }
        finally
        {
            // Always release: a Mutex is owned by the acquiring thread, so an exception
            // from a subscriber must not leave it held.
            dbSingleMutex.ReleaseMutex();
        }
    }

    /// <summary>Dequeues all pending targets and raises DBUpdateRequired for each of them.</summary>
    private void DrainDbUpdateQueue()
    {
        while (true)
        {
            AIS_Target currentTarget;
            lock (this.dbUpdateQueue)
            {
                // The emptiness check happens under the same lock that guards the writers.
                if (this.dbUpdateQueue.Count == 0)
                    break;

                // Trace.WriteLine(string.Format("Update queue size: {0}", this.dbUpdateQueue.Count));
                currentTarget = this.dbUpdateQueue[0];
                this.dbUpdateQueue.RemoveAt(0);
            }

            // Raised outside the queue lock so subscribers do not block the decoder callbacks.
            this.OnDBUpdateRequired(currentTarget);
        }
    }

    /// <summary>
    /// Removes every target whose last update is older than the configured
    /// TargetStaleMins from both the list and the MMSI dictionary. Targets without a
    /// last update time are kept.
    /// </summary>
    private void RemoveStaleTargets()
    {
        // Same lock order as decoder_AISMessageReceived: dictionary first, then list.
        lock (this.activeTargets)
        {
            lock (this.activeTargetList)
            {
                HashSet<AIS_Target> removedTargets = null;

                for (int i = this.activeTargetList.Count - 1; i >= 0; i--)
                {
                    AIS_Target candidate = this.activeTargetList[i];
                    if (!candidate.LastUpdate.HasValue)
                        continue;

                    int diffmin = (int)(DateTime.Now - candidate.LastUpdate.Value).TotalMinutes;
                    if (diffmin > this.configuration.TargetStaleMins)
                    {
                        this.activeTargetList.RemoveAt(i);
                        if (removedTargets == null)
                            removedTargets = new HashSet<AIS_Target>();
                        removedTargets.Add(candidate);
                    }
                }

                if (removedTargets == null)
                    return;

                // Also forget the dictionary entries. Otherwise a vessel that reports again
                // would be found in the dictionary and never be re-added to the list.
                List<int> staleKeys = new List<int>();
                foreach (KeyValuePair<int, AIS_Target> entry in this.activeTargets)
                {
                    if (removedTargets.Contains(entry.Value))
                        staleKeys.Add(entry.Key);
                }
                foreach (int key in staleKeys)
                {
                    this.activeTargets.Remove(key);
                }
            }
        }
    }

    /// <summary>
    /// Decoder callback: creates the target for the message's MMSI if it is not known yet,
    /// applies the message, raises <see cref="AISQueueChanged"/> and queues the target for
    /// a database update when the target requests one.
    /// </summary>
    /// <param name="message">The decoded AIS message.</param>
    void decoder_AISMessageReceived(AIS message)
    {
        lock (this.activeTargets)
        {
            // Trace.WriteLine(string.Format("Queue manager: AIS message received, queue size: {0}", activeTargets.Count));
            AIS_Target target;
            if (!this.activeTargets.TryGetValue(message.MMSI, out target))
            {
                target = new AIS_Target(message.MMSI);
                this.activeTargets.Add(message.MMSI, target);
                lock (this.activeTargetList)
                {
                    this.activeTargetList.Add(target);
                }
            }

            target.Update(message);
            this.OnAISQueueChanged(target);

            if (target.UpdateDB)
            {
                lock (this.dbUpdateQueue)
                {
                    // A target is queued at most once.
                    if (!this.dbUpdateQueue.Contains(target))
                        this.dbUpdateQueue.Add(target);
                }
            }
        }
    }

    #endregion

    #region public methods

    /// <summary>
    /// Starts all serial and telnet data handlers, their decoders and the database
    /// update timer.
    /// </summary>
    /// <param name="message">Receives one line per handler that failed to start, or
    /// "Queue manager already started" if the manager is already running.</param>
    /// <returns>True if the manager was already started or every handler started
    /// successfully; otherwise false.</returns>
    /// <remarks>
    /// Every handler is asked to start even after an earlier failure, but decoders are
    /// only started as long as no handler has failed so far.
    /// </remarks>
    public bool Start(ref string message)
    {
        if (this.isStarted)
        {
            message = "Queue manager already started";
            return true;
        }

        bool retval = true;

        foreach (SerialDataHandler sdh in this.serialHandlerList)
        {
            string messagePart = "";
            bool handlerStarted = sdh.Start(ref messagePart);
            retval &= handlerStarted;
            // Only report the handler that actually failed, not every handler after it.
            if (!handlerStarted)
                message += messagePart + Environment.NewLine;
            if (retval)
                sdh.AIS_Decoder.Start();
        }

        foreach (TelnetDataHandler tdh in this.telnetHandlerList)
        {
            string messagePart = "";
            bool handlerStarted = tdh.Start(ref messagePart);
            retval &= handlerStarted;
            if (!handlerStarted)
                message += messagePart + Environment.NewLine;
            if (retval)
                tdh.AIS_Decoder.Start();
        }

        if (retval)
        {
            this.dbUpdateTimer.Start();
            this.isStarted = true;
        }

        return retval;
    }

    /// <summary>
    /// Stops all data handlers, their decoders and the database update timer. Does
    /// nothing if the manager is not started.
    /// </summary>
    public void Stop()
    {
        if (!this.isStarted)
            return;

        foreach (SerialDataHandler sdh in this.serialHandlerList)
        {
            sdh.Stop();
            sdh.AIS_Decoder.Stop();
        }

        foreach (TelnetDataHandler tdh in this.telnetHandlerList)
        {
            tdh.Stop();
            tdh.AIS_Decoder.Stop();
        }

        this.dbUpdateTimer.Stop();
        this.isStarted = false;
    }

    #endregion

    #region OnEvent methods

    /// <summary>Raises <see cref="AISQueueChanged"/> if there are subscribers.</summary>
    /// <param name="target">The target that was updated.</param>
    protected void OnAISQueueChanged(AIS_Target target)
    {
        this.AISQueueChanged?.Invoke(target);
    }

    /// <summary>Raises <see cref="DBUpdateRequired"/> if there are subscribers.</summary>
    /// <param name="target">The target to be written to the database.</param>
    protected void OnDBUpdateRequired(AIS_Target target)
    {
        this.DBUpdateRequired?.Invoke(target);
    }

    #endregion
}
}