diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index ae2128c..1deaf41 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -310,7 +310,11 @@ private async Task ConsumeOutboundEventsAsync() } } await Task.WhenAll(_taskList).ConfigureAwait(false); +#if NET8_0_OR_GREATER + await _exitCancelSource.CancelAsync().ConfigureAwait(false); +#else _exitCancelSource.Cancel(); +#endif _callbacks.OutboundChannelExitedCallback?.Invoke(); } @@ -422,7 +426,7 @@ async Task AsyncSlowPath(IOutboundBuffer b) } /// > - public override string ToString() + public override string? ToString() { if (DiagnosticsListener == null) return base.ToString(); var sb = new StringBuilder(); diff --git a/src/Elastic.Channels/Elastic.Channels.csproj b/src/Elastic.Channels/Elastic.Channels.csproj index 4f0e70a..165103f 100644 --- a/src/Elastic.Channels/Elastic.Channels.csproj +++ b/src/Elastic.Channels/Elastic.Channels.csproj @@ -1,7 +1,7 @@ - netstandard2.0;netstandard2.1 + netstandard2.0;netstandard2.1;net8.0 Provides components to build a buffer-backed channel that flushes batches of data in a controlled (Max N || Max Duration) manner. elastic, channels, buffer latest diff --git a/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj b/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj index 6e537ed..730ec27 100644 --- a/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj +++ b/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj @@ -1,7 +1,7 @@ - netstandard2.0;netstandard2.1 + netstandard2.0;netstandard2.1;net8.0 Offers an easy to use ChannelWriter implementation to push data concurrently to Elasticsearch using Elastic.Transport elastic, channels, elasticsearch, ingest latest diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs index 4db13a1..779bba8 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -140,7 +140,10 @@ public async Task WriteBufferToStreamAsync( switch (op) { case IndexOp.IndexNoParams: - await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); + await ElasticsearchChannelBase.SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); + break; + case IndexOp.CreateNoParams: + await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false); break; case IndexOp.Index: case IndexOp.Create: @@ -150,8 +153,6 @@ public async Task WriteBufferToStreamAsync( MutateHeader(ref header); await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false); break; - - } await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs index 6358864..953a360 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -3,22 +3,14 @@ // See the LICENSE file in the project root for more information using System; -using System.Buffers; -using System.Collections.Generic; using System.IO; -using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Elastic.Channels; -using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Ingest.Transport; -using Elastic.Transport; -using Elastic.Transport.Products.Elasticsearch; -using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; namespace Elastic.Ingest.Elasticsearch; @@ -30,9 +22,46 @@ public abstract partial class ElasticsearchChannelBase : TransportChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { + private static ReadOnlySpan PlainIndexBytesSpan => """ + {"index":{}} + + """u8; + + private static ReadOnlySpan PlainCreateBytesSpan => """ + {"create":{}} + + """u8; + +#if NETSTANDARD + + private static byte[] PlainIndexBytes => PlainIndexBytesSpan.ToArray(); + + private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray(); +#endif + private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader header, JsonSerializerOptions serializerOptions, CancellationToken ctx) => throw new NotImplementedException(); - private Task SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) => - throw new NotImplementedException(); + +#if NET8_0_OR_GREATER + private static ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx = default) + { + stream.Write(PlainIndexBytesSpan); + return ValueTask.CompletedTask; + } +#else + private static async ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) => + await stream.WriteAsync(PlainIndexBytes, 0, PlainIndexBytes.Length, ctx).ConfigureAwait(false); +#endif + +#if NET8_0_OR_GREATER + private static ValueTask SerializePlainCreateHeaderAsync(Stream stream, CancellationToken ctx = default) + { + stream.Write(PlainCreateBytesSpan); + return ValueTask.CompletedTask; + } +#else + private static async ValueTask SerializePlainCreateHeaderAsync(Stream stream, CancellationToken ctx) => + await stream.WriteAsync(PlainCreateBytes, 0, PlainCreateBytes.Length, ctx).ConfigureAwait(false); +#endif }