Skip to content

Commit

Permalink
Support .NET 8.0 and improve serialization methods
Browse files Browse the repository at this point in the history
Updated `ConsumeOutboundEventsAsync` in `BufferedChannelBase.cs` to conditionally cancel `_exitCancelSource` using `CancelAsync` for .NET 8.0 or greater, and `Cancel` for other versions. Modified `ToString` method to return a nullable string (`string?`).

Updated target frameworks in `Elastic.Channels.csproj` and `Elastic.Ingest.Elasticsearch.csproj` to include `net8.0`.

Enhanced `WriteBufferToStreamAsync` in `ElasticsearchChannelBase.Bytes.cs` to handle `IndexOp.CreateNoParams` and use `SerializePlainIndexHeaderAsync` for serialization.

Removed several `using` directives in `ElasticsearchChannelBase.Serialization.cs` and introduced static read-only spans for `PlainIndexBytesSpan` and `PlainCreateBytesSpan`. Added conditional compilation for serialization methods to use `ValueTask` and `ReadOnlySpan<byte>` for .NET 8.0 or greater, and `byte[]` for other versions.
  • Loading branch information
stevejgordon committed Oct 8, 2024
1 parent be24c65 commit 7d2f9be
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 16 deletions.
6 changes: 5 additions & 1 deletion src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,11 @@ private async Task ConsumeOutboundEventsAsync()
}
}
await Task.WhenAll(_taskList).ConfigureAwait(false);
#if NET8_0_OR_GREATER
await _exitCancelSource.CancelAsync().ConfigureAwait(false);
#else
_exitCancelSource.Cancel();
#endif
_callbacks.OutboundChannelExitedCallback?.Invoke();
}

Expand Down Expand Up @@ -422,7 +426,7 @@ async Task<bool> AsyncSlowPath(IOutboundBuffer<TEvent> b)
}

/// <inheritdoc cref="object.ToString"/>>
public override string ToString()
public override string? ToString()
{
if (DiagnosticsListener == null) return base.ToString();
var sb = new StringBuilder();
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Channels/Elastic.Channels.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<Description>Provides components to build a buffer-backed channel that flushes batches of data in a controlled (Max N || Max Duration) manner.</Description>
<PackageTags>elastic, channels, buffer</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<Description>Offers an easy to use ChannelWriter implementation to push data concurrently to Elasticsearch using Elastic.Transport</Description>
<PackageTags>elastic, channels, elasticsearch, ingest</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ public async Task WriteBufferToStreamAsync(
switch (op)
{
case IndexOp.IndexNoParams:
await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false);
await ElasticsearchChannelBase<TEvent, TChannelOptions>.SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case IndexOp.CreateNoParams:
await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case IndexOp.Index:
case IndexOp.Create:
Expand All @@ -150,8 +153,6 @@ public async Task WriteBufferToStreamAsync(
MutateHeader(ref header);
await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false);
break;


}

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,14 @@
// See the LICENSE file in the project root for more information

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics;

namespace Elastic.Ingest.Elasticsearch;

Expand All @@ -30,9 +22,46 @@ public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
: TransportChannelBase<TChannelOptions, TEvent, BulkResponse, BulkResponseItem>
where TChannelOptions : ElasticsearchChannelOptionsBase<TEvent>
{
private static ReadOnlySpan<byte> PlainIndexBytesSpan => """
{"index":{}}
"""u8;

private static ReadOnlySpan<byte> PlainCreateBytesSpan => """
{"create":{}}
"""u8;

#if NETSTANDARD

private static byte[] PlainIndexBytes => PlainIndexBytesSpan.ToArray();

private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray();
#endif

private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader header, JsonSerializerOptions serializerOptions, CancellationToken ctx) =>
throw new NotImplementedException();

private Task SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) =>
throw new NotImplementedException();

#if NET8_0_OR_GREATER
private static ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx = default)
{
stream.Write(PlainIndexBytesSpan);
return ValueTask.CompletedTask;
}
#else
private static async ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) =>
await stream.WriteAsync(PlainIndexBytes, 0, PlainIndexBytes.Length, ctx).ConfigureAwait(false);
#endif

#if NET8_0_OR_GREATER
private static ValueTask SerializePlainCreateHeaderAsync(Stream stream, CancellationToken ctx = default)
{
stream.Write(PlainCreateBytesSpan);
return ValueTask.CompletedTask;
}
#else
private static async ValueTask SerializePlainCreateHeaderAsync(Stream stream, CancellationToken ctx) =>
await stream.WriteAsync(PlainCreateBytes, 0, PlainCreateBytes.Length, ctx).ConfigureAwait(false);
#endif
}

0 comments on commit 7d2f9be

Please sign in to comment.