Added SQLite storage logic

This commit is contained in:
Daniel Schick 2022-10-13 11:30:00 +02:00
parent c1337e3134
commit e3e829b006
11 changed files with 393 additions and 17 deletions

Binary file not shown.

View File

@ -15,8 +15,7 @@ namespace bsmd.AIS2Service
private readonly ConcurrentQueue<string> _inputLines;
private readonly ConcurrentQueue<AISClass> _outputAISClasses;
private Thread _thread;
private readonly Dictionary<string, List<AISQueueElement>> fragmentDict = new Dictionary<string, List<AISQueueElement>>();
private const int sleepMS = 250;
private readonly Dictionary<string, List<AISQueueElement>> fragmentDict = new Dictionary<string, List<AISQueueElement>>();
private bool _stopFlag = false;
private static readonly ILog _log = LogManager.GetLogger(typeof(AISDecoder));
@ -99,7 +98,7 @@ namespace bsmd.AIS2Service
}
else
{
Thread.Sleep(sleepMS);
Thread.Sleep(Properties.Settings.Default.ThreadSleepMS);
}
}
}

View File

@ -16,25 +16,41 @@ namespace bsmd.AIS2Service
private static readonly ConcurrentQueue<AISClass> _decodedClasses = new ConcurrentQueue<AISClass>();
private static readonly ILog _log = LogManager.GetLogger(typeof(AISManager));
private static readonly ConcurrentDictionary<int, AIS_Target> _sitRepList = new ConcurrentDictionary<int, AIS_Target>();
private static readonly ConcurrentQueue<AIS_Target> _dbSaveTargets = new ConcurrentQueue<AIS_Target>();
private static Timer _staleTargetTimer;
public static void Start()
public static async void Start()
{
_tasks.Add(new SerialTCPReader(Properties.Settings.Default.DataSourceHost, Properties.Settings.Default.DataSourcePort, _inputLines));
_tasks.Add(new AISDecoder(_inputLines, _decodedClasses));
_tasks.Add(new SitRep(_decodedClasses, _sitRepList));
_tasks.Add(new SitRep(_decodedClasses, _sitRepList, _dbSaveTargets));
AIS_SQLiteStorage sqliteStorage = new AIS_SQLiteStorage(_dbSaveTargets);
_tasks.Add(sqliteStorage);
// preload sit rep
Dictionary<int, AIS_Target> targets = await sqliteStorage.LoadTargets();
foreach(int key in targets.Keys)
{
_sitRepList.TryAdd(key, targets[key]);
}
foreach (var task in _tasks)
{
task.Start();
_log.InfoFormat("{0} started", task.Name);
task.FatalErrorOccurred += Task_FatalErrorOccurred;
}
_staleTargetTimer = new Timer(staleTimerCheck, null, 0, 60000); // check every minute
_staleTargetTimer = new Timer(StaleTimerCheck, null, 0, 60000); // check every minute
}
private static void staleTimerCheck(object state)
private static void Task_FatalErrorOccurred(object sender, EventArgs e)
{
throw new NotImplementedException("TBD: shutdown the whole operation?");
}
private static void StaleTimerCheck(object state)
{
List<int> removeKeyList = new List<int>();
foreach (int key in _sitRepList.Keys)

View File

@ -0,0 +1,256 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using System.Threading;
using System.Data.SQLite;
using log4net;
using System.Collections.Concurrent;
namespace bsmd.AIS2Service
{
/// <summary>
/// This class handles the (short-time) storage of AIS targets with a limited
/// past track. It is just intended to function as a "saving the state" of the AIS situation.
/// </summary>
internal class AIS_SQLiteStorage : IAISThread
{
#region Fields
private readonly SQLiteConnection _connection;
private Thread _thread;
private bool _stopFlag = false;
private static readonly ILog _log = LogManager.GetLogger(typeof(AIS_SQLiteStorage));
private readonly ConcurrentQueue<AIS_Target> _inputQueue;
private readonly Dictionary<int, AIS_Target> _storageTargets = new Dictionary<int, AIS_Target>();
#endregion
#region Construction
public AIS_SQLiteStorage (ConcurrentQueue<AIS_Target> inputQueue)
{
_inputQueue = inputQueue;
_connection = new SQLiteConnection(Properties.Settings.Default.SQLiteDBConnectionString);
_connection.Open();
}
#endregion
#region private methods
private void ReadQueue()
{
try
{
// we create "common" commands and just reload the parameters after each shot :P
string updateStaticString = "UPDATE staticdata SET destination = ?, eta = ?, version = ?, imo = ?, callsign = ?, name = ?, shiptype = ?, typeofdevice = ?, maxpresetstaticdraught = ?, dte = ?, classb = ?, breadth = ?, length = ? WHERE mmsi = ?";
SQLiteCommand updateStaticCmd = new SQLiteCommand(updateStaticString, _connection);
updateStaticCmd.Parameters.Add("DESTINATION", DbType.String);
updateStaticCmd.Parameters.Add("ETA", DbType.DateTime);
updateStaticCmd.Parameters.Add("VERSION", DbType.Int32);
updateStaticCmd.Parameters.Add("IMO", DbType.Int32);
updateStaticCmd.Parameters.Add("CALLSIGN", DbType.String);
updateStaticCmd.Parameters.Add("NAME", DbType.String);
updateStaticCmd.Parameters.Add("SHIPTYPE", DbType.Int32);
updateStaticCmd.Parameters.Add("TYPEOFDEVICE", DbType.Int32);
updateStaticCmd.Parameters.Add("DRAUGHT", DbType.Int32);
updateStaticCmd.Parameters.Add("DTE", DbType.Boolean);
updateStaticCmd.Parameters.Add("CLASSB", DbType.Boolean);
updateStaticCmd.Parameters.Add("BREADTH", DbType.Int32);
updateStaticCmd.Parameters.Add("LENGTH", DbType.Int32);
updateStaticCmd.Parameters.Add("MMSI", DbType.Int32);
string insertStaticString = "INSERT INTO staticdata (mmsi, version, imo, callsign, name, shiptype, typeofdevice, maxpresetstaticdraught, destination, dte, classb, breadth, length, eta) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
SQLiteCommand insertStaticCmd = new SQLiteCommand(insertStaticString, _connection);
insertStaticCmd.Parameters.Add("MMSI", DbType.Int32);
insertStaticCmd.Parameters.Add("VERSION", DbType.Int32);
insertStaticCmd.Parameters.Add("IMO", DbType.Int32);
insertStaticCmd.Parameters.Add("CALLSIGN", DbType.String);
insertStaticCmd.Parameters.Add("NAME", DbType.String);
insertStaticCmd.Parameters.Add("SHIPTYPE", DbType.Int32);
insertStaticCmd.Parameters.Add("TYPEOFDEVICE", DbType.Int32);
insertStaticCmd.Parameters.Add("DRAUGHT", DbType.Int32);
insertStaticCmd.Parameters.Add("DESTINATION", DbType.String);
insertStaticCmd.Parameters.Add("DTE", DbType.Boolean);
insertStaticCmd.Parameters.Add("CLASSB", DbType.Boolean);
insertStaticCmd.Parameters.Add("BREADTH", DbType.Int32);
insertStaticCmd.Parameters.Add("LENGTH", DbType.Int32);
insertStaticCmd.Parameters.Add("ETA", DbType.DateTime);
string insertPosReportString = "INSERT INTO posreport (mmsi, navstatus, rot, cog, sog, accuracy, longitude, latitude, heading, timestamp, raim, commstate) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
SQLiteCommand insertPosReportCmd = new SQLiteCommand(insertPosReportString, _connection);
insertPosReportCmd.Parameters.Add("MMSI", DbType.Int32);
insertPosReportCmd.Parameters.Add("NAVSTATUS", DbType.Int32);
insertPosReportCmd.Parameters.Add("ROT", DbType.Int32);
insertPosReportCmd.Parameters.Add("COG", DbType.Int32);
insertPosReportCmd.Parameters.Add("SOG", DbType.Int32);
insertPosReportCmd.Parameters.Add("ACCURACY", DbType.Boolean);
insertPosReportCmd.Parameters.Add("LONGITUDE", DbType.Int32);
insertPosReportCmd.Parameters.Add("LATITUDE", DbType.Int32);
insertPosReportCmd.Parameters.Add("HEADING", DbType.Int32);
insertPosReportCmd.Parameters.Add("TIMESTAMP", DbType.DateTime);
insertPosReportCmd.Parameters.Add("RAIM", DbType.Boolean);
insertPosReportCmd.Parameters.Add("COMMSTATE", DbType.Int32);
while (!_stopFlag)
{
if(_inputQueue.TryDequeue(out AIS_Target target))
{
if(!_storageTargets.ContainsKey(target.MMSI))
{
// insert
insertStaticCmd.Parameters["MMSI"].Value = target.MMSI;
insertStaticCmd.Parameters["VERSION"].Value = target.AISVersion ?? (object)DBNull.Value;
insertStaticCmd.Parameters["IMO"].Value = target.IMO ?? (object) DBNull.Value;
insertStaticCmd.Parameters["CALLSIGN"].Value = target.Callsign ?? "";
insertStaticCmd.Parameters["NAME"].Value = target.Name ?? "";
insertStaticCmd.Parameters["SHIPTYPE"].Value = target.ShipType ?? (object) DBNull.Value;
insertStaticCmd.Parameters["TYPEOFDEVICE"].Value = target.TypeOfDevice;
if (target.Draught.HasValue)
insertStaticCmd.Parameters["DRAUGHT"].Value = (int) (target.Draught * 10);
else
insertStaticCmd.Parameters["DRAUGHT"].Value = DBNull.Value;
insertStaticCmd.Parameters["DESTINATION"].Value = target.Destination ?? "";
insertStaticCmd.Parameters["DTE"].Value = target.DTE ?? (object)DBNull.Value ;
insertStaticCmd.Parameters["CLASSB"].Value = target.IsClassB ?? (object)DBNull.Value;
insertStaticCmd.Parameters["BREADTH"].Value = target.Breadth ?? (object)DBNull.Value;
insertStaticCmd.Parameters["LENGTH"].Value = target.Length ?? (object)DBNull.Value;
insertStaticCmd.Parameters["ETA"].Value = target.ETA ?? (object)DBNull.Value;
if (_stopFlag) break;
if (insertStaticCmd.ExecuteNonQuery() != 1)
{
_log.WarnFormat("Query didn't affect any rows");
}
_storageTargets.Add(target.MMSI, target);
}
else
{
// update
updateStaticCmd.Parameters["DESTINATION"].Value = target.Destination ?? "";
updateStaticCmd.Parameters["ETA"].Value = target.ETA ?? (object)DBNull.Value;
updateStaticCmd.Parameters["MMSI"].Value = target.MMSI;
updateStaticCmd.Parameters["VERSION"].Value = target.AISVersion ?? (object)DBNull.Value;
updateStaticCmd.Parameters["IMO"].Value = target.IMO ?? (object)DBNull.Value;
updateStaticCmd.Parameters["CALLSIGN"].Value = target.Callsign ?? "";
updateStaticCmd.Parameters["NAME"].Value = target.Name ?? "";
updateStaticCmd.Parameters["SHIPTYPE"].Value = target.ShipType ?? (object)DBNull.Value;
updateStaticCmd.Parameters["TYPEOFDEVICE"].Value = target.TypeOfDevice ?? (object)DBNull.Value;
if (target.Draught.HasValue)
updateStaticCmd.Parameters["DRAUGHT"].Value = (int) (target.Draught * 10);
else
updateStaticCmd.Parameters["DRAUGHT"].Value = DBNull.Value;
updateStaticCmd.Parameters["DTE"].Value = target.DTE ?? (object)DBNull.Value;
updateStaticCmd.Parameters["CLASSB"].Value = target.IsClassB ?? (object)DBNull.Value;
updateStaticCmd.Parameters["BREADTH"].Value = target.Breadth ?? (object) DBNull.Value;
updateStaticCmd.Parameters["LENGTH"].Value = target.Length ?? (object)DBNull.Value;
if (_stopFlag) break;
if (updateStaticCmd.ExecuteNonQuery() != 1)
{
_log.WarnFormat("Query didn't affect any rows");
}
}
// now create a position report
insertPosReportCmd.Parameters["MMSI"].Value = target.MMSI;
insertPosReportCmd.Parameters["NAVSTATUS"].Value = target.NavStatus;
insertPosReportCmd.Parameters["ROT"].Value = target.ROT;
if(target.COG.HasValue)
insertPosReportCmd.Parameters["COG"].Value = (int) (target.COG * 10);
else
insertPosReportCmd.Parameters["COG"].Value = DBNull.Value;
if (target.SOG.HasValue)
insertPosReportCmd.Parameters["SOG"].Value = (int) (target.SOG * 10);
else
insertPosReportCmd.Parameters["SOG"].Value = DBNull.Value;
insertPosReportCmd.Parameters["ACCURACY"].Value = target.Accuracy;
if (target.Longitude.HasValue)
insertPosReportCmd.Parameters["LONGITUDE"].Value = (int)(target.Longitude.Value * 1000);
else
insertPosReportCmd.Parameters["LONGITUDE"].Value = DBNull.Value;
if (target.Latitude.HasValue)
insertPosReportCmd.Parameters["LATITUDE"].Value = (int)(target.Latitude.Value * 1000);
else
insertPosReportCmd.Parameters["LATITUDE"].Value = DBNull.Value;
insertPosReportCmd.Parameters["HEADING"].Value = target.Heading;
insertPosReportCmd.Parameters["TIMESTAMP"].Value = target.LastUpdate;
insertPosReportCmd.Parameters["RAIM"].Value = target.RAIM;
insertPosReportCmd.Parameters["COMMSTATE"].Value = target.Radio;
if (_stopFlag) break;
if (insertPosReportCmd.ExecuteNonQuery() != 1)
{
_log.WarnFormat("Query didn't affect any rows");
}
}
else
{
Thread.Sleep(Properties.Settings.Default.ThreadSleepMS);
}
}
}
catch (Exception ex)
{
_log.ErrorFormat("Something bad has happened: {0}", ex.Message);
this.FatalErrorOccurred?.Invoke(this, new EventArgs());
}
}
/// <summary>
/// pre-load all targets already stored in the sqlite database
/// </summary>
public async Task<Dictionary<int, AIS_Target>> LoadTargets()
{
await Task.Run(() =>
{
string staticQuery = "SELECT mmsi, version, imo, callsign, name, shiptype, typeofdevice, maxpresetstaticdraught, destination, dte, classb, breadth, length, eta, changed, updatecount FROM staticdata";
SQLiteCommand cmd = new SQLiteCommand(staticQuery, _connection);
SQLiteDataReader reader = cmd.ExecuteReader();
while(reader.Read())
{
AIS_Target target = AIS_Target.CreateFromReader(reader);
_storageTargets.Add(target.MMSI, target);
}
reader.Close();
});
return _storageTargets;
}
#endregion
#region IAISThread implementation
public string Name { get { return "SQLite storage"; } }
public event EventHandler FatalErrorOccurred;
public void Start()
{
if (_thread != null) return; // may not run twice
ThreadStart runReader = new ThreadStart(this.ReadQueue);
_thread = new Thread(runReader);
_thread.Start();
}
public void Stop()
{
if (_thread == null) return;
_connection.Close();
_stopFlag = true;
_thread.Join();
_thread = null;
}
#endregion
}
}

View File

@ -1,4 +1,5 @@
using System;
using System.Data;
namespace bsmd.AIS2Service
{
@ -7,7 +8,7 @@ namespace bsmd.AIS2Service
#region private members
private int _mmsi;
private readonly int _mmsi;
private bool? _isClassB = false;
private DateTime? _lastUpdate;
private string _name;
@ -204,6 +205,8 @@ namespace bsmd.AIS2Service
get { return _dte; } private set { _dte = value; }
}
public int UpdateCount { get; private set; }
#endregion
#region public methods
@ -239,10 +242,51 @@ namespace bsmd.AIS2Service
this.Draught = staticData.Draught;
this.Destination = staticData.Destination;
this.DTE = staticData.DTE == 0;
this.IsClassB = false;
this.LastUpdate = DateTime.Now;
}
internal static AIS_Target CreateFromReader(IDataReader reader)
{
int mmsi = reader.GetInt32(0);
AIS_Target result = new AIS_Target(mmsi);
if (!reader.IsDBNull(1))
result.AISVersion = reader.GetInt32(1);
if (!reader.IsDBNull(2))
result.IMO = reader.GetInt32(2);
if (!reader.IsDBNull(3))
result.Callsign = reader.GetString(3);
if (!reader.IsDBNull(4))
result.Name = reader.GetString(4);
if (!reader.IsDBNull(5))
result.ShipType = reader.GetInt32(5);
if (!reader.IsDBNull(6))
result.TypeOfDevice = reader.GetInt32(6);
if (!reader.IsDBNull(7))
result.Draught = reader.GetInt32(7) / 10.0;
if (!reader.IsDBNull(8))
result.Destination = reader.GetString(8);
if (!reader.IsDBNull(9))
result.DTE = reader.GetBoolean(9);
if (!reader.IsDBNull(10))
result.IsClassB = reader.GetBoolean(10);
if (!reader.IsDBNull(11))
result.Breadth = reader.GetInt32(11);
if (!reader.IsDBNull(12))
result.Length = reader.GetInt32(12);
if (!reader.IsDBNull(13))
result.ETA = DateTime.Parse(reader.GetString(13));
if (!reader.IsDBNull(14))
result.LastUpdate = DateTime.Parse(reader.GetString(14));
result.UpdateCount = reader.GetInt32(15);
return result;
}
#endregion
#region overrides

View File

@ -43,6 +43,15 @@
<setting name="StaleTargetTimeoutMins" serializeAs="String">
<value>60</value>
</setting>
<setting name="ThreadSleepMS" serializeAs="String">
<value>250</value>
</setting>
<setting name="PosReportMinTimeDiffSecs" serializeAs="String">
<value>60</value>
</setting>
<setting name="SQLiteDBConnectionString" serializeAs="String">
<value>Data Source=ais_initial.db;Version=3;</value>
</setting>
</bsmd.AIS2Service.Properties.Settings>
</applicationSettings>
</configuration>

View File

@ -49,5 +49,32 @@ namespace bsmd.AIS2Service.Properties {
return ((uint)(this["StaleTargetTimeoutMins"]));
}
}
[global::System.Configuration.ApplicationScopedSettingAttribute()]
[global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
[global::System.Configuration.DefaultSettingValueAttribute("250")]
public int ThreadSleepMS {
get {
return ((int)(this["ThreadSleepMS"]));
}
}
[global::System.Configuration.ApplicationScopedSettingAttribute()]
[global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
[global::System.Configuration.DefaultSettingValueAttribute("60")]
public uint PosReportMinTimeDiffSecs {
get {
return ((uint)(this["PosReportMinTimeDiffSecs"]));
}
}
[global::System.Configuration.ApplicationScopedSettingAttribute()]
[global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
[global::System.Configuration.DefaultSettingValueAttribute("Data Source=ais_initial.db;Version=3;")]
public string SQLiteDBConnectionString {
get {
return ((string)(this["SQLiteDBConnectionString"]));
}
}
}
}

View File

@ -11,5 +11,14 @@
<Setting Name="StaleTargetTimeoutMins" Type="System.UInt32" Scope="Application">
<Value Profile="(Default)">60</Value>
</Setting>
<Setting Name="ThreadSleepMS" Type="System.Int32" Scope="Application">
<Value Profile="(Default)">250</Value>
</Setting>
<Setting Name="PosReportMinTimeDiffSecs" Type="System.UInt32" Scope="Application">
<Value Profile="(Default)">60</Value>
</Setting>
<Setting Name="SQLiteDBConnectionString" Type="System.String" Scope="Application">
<Value Profile="(Default)">Data Source=ais_initial.db;Version=3;</Value>
</Setting>
</Settings>
</SettingsFile>

View File

@ -3,11 +3,9 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace bsmd.AIS2Service
{
@ -49,8 +47,7 @@ namespace bsmd.AIS2Service
{
_log.ErrorFormat("Something bad has happened: {0}", ex.Message);
this.FatalErrorOccurred?.Invoke(this, new EventArgs());
}
}
}
private void Connect()
@ -69,6 +66,7 @@ namespace bsmd.AIS2Service
{
yield return line;
}
_log.InfoFormat("passed behind yield and closing StreamReader");
}
}

View File

@ -17,18 +17,27 @@ namespace bsmd.AIS2Service
/// </summary>
internal class SitRep : IAISThread
{
#region Fields
private Thread _thread;
private bool _stopFlag = false;
private static readonly ILog _log = LogManager.GetLogger(typeof(SitRep));
private readonly ConcurrentQueue<AISClass> _inputQueue;
private readonly ConcurrentDictionary<int, AIS_Target> _sitRep;
private const int sleepMS = 250;
private readonly ConcurrentQueue<AIS_Target> _dbQueue;
public SitRep(ConcurrentQueue<AISClass> inputQueue, ConcurrentDictionary<int, AIS_Target> sitRep)
#endregion
#region Construction
public SitRep(ConcurrentQueue<AISClass> inputQueue, ConcurrentDictionary<int, AIS_Target> sitRep, ConcurrentQueue<AIS_Target> dbQueue)
{
_inputQueue = inputQueue; _sitRep = sitRep;
_inputQueue = inputQueue; _sitRep = sitRep; _dbQueue = dbQueue;
}
#endregion
private void ReadMessages()
{
try
@ -49,7 +58,11 @@ namespace bsmd.AIS2Service
AIS_Target target = new AIS_Target(posReport.MMSI);
_sitRep[posReport.MMSI] = target;
}
DateTime? lastUpdate = _sitRep[posReport.MMSI].LastUpdate;
_sitRep[posReport.MMSI].Update(posReport);
if (!lastUpdate.HasValue ||
((DateTime.Now - lastUpdate.Value).TotalSeconds > Properties.Settings.Default.PosReportMinTimeDiffSecs))
_dbQueue.Enqueue(_sitRep[posReport.MMSI]);
}
break;
case AISClass.AISType.STATIC_VOYAGE_DATA:
@ -61,6 +74,7 @@ namespace bsmd.AIS2Service
_sitRep[staticData.MMSI] = target;
}
_sitRep[staticData.MMSI].Update(staticData);
_dbQueue.Enqueue(_sitRep[staticData.MMSI]);
}
break;
default:
@ -70,8 +84,7 @@ namespace bsmd.AIS2Service
}
else
{
Thread.Sleep(sleepMS);
_log.DebugFormat("Targets: {0}", _sitRep.Count);
Thread.Sleep(Properties.Settings.Default.ThreadSleepMS);
}
}
}

View File

@ -68,6 +68,7 @@
<Compile Include="AIS_ClassBExt.cs" />
<Compile Include="AIS_ClassBStatic.cs" />
<Compile Include="AIS_PosReport.cs" />
<Compile Include="AIS_SQLiteStorage.cs" />
<Compile Include="AIS_StaticData.cs" />
<Compile Include="AIS_Target.cs" />
<Compile Include="IAISThread.cs" />
@ -86,6 +87,10 @@
<Compile Include="SitRep.cs" />
</ItemGroup>
<ItemGroup>
<None Include="..\SQL\ais_initial.db">
<Link>ais_initial.db</Link>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Include="App.config" />
<None Include="packages.config" />
<None Include="Properties\Settings.settings">