Skip to content

Commit

Permalink
Quick fix to improve robustness (#133)
Browse files Browse the repository at this point in the history
* Experimental robustification

* Update readme for .net 6.0

* Update PowerShell path

Co-authored-by: Howard van Rooijen <[email protected]>
  • Loading branch information
idg10 and HowardvanRooijen authored Feb 15, 2022
1 parent 5390667 commit 764d8c8
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 31 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ This project contains a [NmeaReceiver](https://github.com/ais-dotnet/Ais.Net.Rec

The project also includes a [demo console](https://github.com/ais-dotnet/Ais.Net.Receiver/blob/master/Solutions/Ais.Net.Receiver.Host.Console/Program.cs) which shows how the various pieces can fit together, including subscribing to the `IObservable<string>` and `IObservable<IAisMessage>` streams and displaying the results or batch the AIVDM/AIVDO sentences and write them to Azure Blob Storage using the [Append Blob](https://docs.microsoft.com/en-us/rest/api/storageservices/append-block) feature, to create timestamped hour-long rolling logs.

The purpose of this application is to provide sample data for [Ais.Net](https://github.com/ais-dotnet/Ais.Net) - the .NET Standard, high performance, zero allocation AIS decoder. The majority of raw AIS data is only available via commerical sources, and thus creating AIS datasets large enough to test / benchmark [Ais.Net](https://github.com/ais-dotnet/Ais.Net) is almost impossible.
The purpose of this application is to provide sample data for [Ais.Net](https://github.com/ais-dotnet/Ais.Net) - the .NET Standard, high performance, zero allocation AIS decoder. The majority of raw AIS data is only available via commercial sources, and thus creating AIS datasets large enough to test / benchmark [Ais.Net](https://github.com/ais-dotnet/Ais.Net) is almost impossible.

The Norwegian Costal Administration TCP endpoint produces:

Expand Down Expand Up @@ -186,7 +186,7 @@ From the command line: `dotnet Ais.Net.Receiver.Host.Console.exe`

# Raspberry Pi

As the AIS.NET stack is written in .NET 5.0 and .NET Standard you can publish the Ais.Net.Recevier.Host.Console application with a target runtime of Portable. This will allow you to run the recevier on a Raspberry Pi if you want to capture your own AIS data.
As the AIS.NET stack is written in .NET 6.0 and .NET Standard you can publish the Ais.Net.Recevier.Host.Console application with a target runtime of Portable. This will allow you to run the recevier on a Raspberry Pi if you want to capture your own AIS data.

For reliability you can run `Ais.Net.Recevier.Host.Console.dll` as daemon.

Expand All @@ -198,11 +198,11 @@ Install [Windows Terminal](https://github.com/microsoft/terminal). You can downl

Open Windows Terminal and use `ssh pi@<Raspberry PI IP Address>` to connect to your Pi.

### Install .NET 5.0
### Install .NET 6.0

Use the following commands to install .NET 5.0 on your Pi.
Use the following commands to install .NET 6.0 on your Pi.

1. `curl -sSL https://dot.net/v1/dotnet-install.sh | bash /dev/stdin -c 5.0`
1. `curl -sSL https://dot.net/v1/dotnet-install.sh | bash /dev/stdin --channel Current`
1. `echo 'export DOTNET_ROOT=$HOME/.dotnet' >> ~/.bashrc`
1. `echo 'export PATH=$PATH:$HOME/.dotnet' >> ~/.bashrc`
1. `source ~/.bashrc`
Expand All @@ -212,9 +212,9 @@ Use the following commands to install .NET 5.0 on your Pi.

Use the following commands to install PowerShell on your Pi.

1. Download the latest package `wget https://github.com/PowerShell/PowerShell/releases/download/v7.1.2/powershell-7.1.2-linux-arm32.tar.gz`
1. Download the latest package `wget https://github.com/PowerShell/PowerShell/releases/download/v7.2.1/powershell-7.2.1-linux-arm32.tar.gz`
1. Create a directory for it to be unpacked into `mkdir ~/powershell`
1. Unpack `tar -xvf ./powershell-7.1.2-linux-arm32.tar.gz -C ~/powershell`
1. Unpack `tar -xvf ./powershell-7.2.1-linux-arm32.tar.gz -C ~/powershell`
1. Give it executable rights `sudo chmod +x /opt/microsoft/powershell/7/pwsh`
1. Create a symbolic link `sudo ln -s /opt/microsoft/powershell/7/pwsh /usr/bin/pwsh`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ namespace Ais.Net.Receiver.Receiver
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public class NetworkStreamNmeaReceiver : INmeaReceiver
{
private readonly TcpClient tcpClient = new();

public NetworkStreamNmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retryAttemptLimit = 100)
{
this.Host = host;
Expand All @@ -24,8 +23,6 @@ public NetworkStreamNmeaReceiver(string host, int port, TimeSpan? retryPeriodici
this.RetryAttemptLimit = retryAttemptLimit;
}

public bool Connected => this.tcpClient.Connected;

public string Host { get; }

public int Port { get; }
Expand All @@ -34,32 +31,66 @@ public NetworkStreamNmeaReceiver(string host, int port, TimeSpan? retryPeriodici

public TimeSpan RetryPeriodicity { get; }

public async IAsyncEnumerable<string> GetAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
//public IAsyncEnumerable<string> GetAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
public IAsyncEnumerable<string> GetAsync(CancellationToken cancellationToken = default)
{
await this.tcpClient.ConnectAsync(this.Host, this.Port, cancellationToken);
await using NetworkStream stream = this.tcpClient.GetStream();
using StreamReader reader = new(stream);

int retryAttempt = 0;
// We're letting Rx handle the retries for us. Since the rest of the code is currently written
// to assume we return an IAsyncEnumerable (which we used to) we convert to that, but it's now
// really all Rx. And since I think it's Rx above us too, we can probably remove IAsyncEnumerable
// from the picture completely. This is all reactive stuff, so I don't think it really belongs.
return this.GetObservable(cancellationToken).ToAsyncEnumerable();
}

while (this.tcpClient.Connected)
public IObservable<string> GetObservable(CancellationToken cancellationToken = default)
{
IObservable<string> withoutRetry = Observable.Create<string>(async (obs, innerCancel) =>
{
while (stream.DataAvailable && !cancellationToken.IsCancellationRequested)
{
string? line = await reader.ReadLineAsync().ConfigureAwait(false);
if (line is not null) { yield return line; }
retryAttempt = 0;
}
using CancellationTokenSource? cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, innerCancel);
CancellationToken mergedToken = cts.Token;

if (cancellationToken.IsCancellationRequested || retryAttempt == this.RetryAttemptLimit)
while (!mergedToken.IsCancellationRequested)
{
break;
}
// Seems like we need a new one each time we try to connect, because if we reuse
// the previous TcpClient after a failure, it tells us it's disposed even if we
// didn't dispose it directly. (Perhaps disposing the NetworkStream has that effect?)
using TcpClient tcpClient = new();
await tcpClient.ConnectAsync(this.Host, this.Port, mergedToken);
await using NetworkStream stream = tcpClient.GetStream();
using StreamReader reader = new(stream);

int retryAttempt = 0;

while (tcpClient.Connected)
{
while (stream.DataAvailable && !mergedToken.IsCancellationRequested)
{
string? line = await reader.ReadLineAsync().ConfigureAwait(false);
if (line is not null) { obs.OnNext(line); }
retryAttempt = 0;
}

if (mergedToken.IsCancellationRequested || retryAttempt == this.RetryAttemptLimit)
{
break;
}

await Task.Delay(this.RetryPeriodicity, cancellationToken).ConfigureAwait(false);
await Task.Delay(this.RetryPeriodicity, mergedToken).ConfigureAwait(false);

retryAttempt++;
}

// Sometimes if the network connection drops, the TcpClient will just calmly set its
// Connected property to false and it won't throw an exception. So we need a non-exception
// retry loop. If we hit this point we just go round the outer try loop one more time.
// (It's quite likely if we hit this point that the very next thing to happen will
// be that the attempt to reconnect fails with an exception, but at that point the
// Rx-based retry will save us.
}
});

retryAttempt++;
}
// Let Rx handle the retries for us in the event of a failure that produces
// an exception.
return withoutRetry.Retry();
}
}
}

0 comments on commit 764d8c8

Please sign in to comment.