Skip to content

Commit

Permalink
remove old BulkOperationHeader.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Oct 8, 2024
1 parent 6453743 commit 9991c38
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public class BulkRequestCreationForDataStreamBenchmarks
private ITransport? _transport;
private TransportConfiguration? _transportConfiguration;
private StockData[] _data = Array.Empty<StockData>();
private readonly BulkOperationHeader _bulkOperationHeader = new CreateOperation();

public Stream MemoryStream { get; } = new MemoryStream();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace Elastic.Ingest.Elasticsearch.DataStreams;
/// <summary> A channel to push messages to Elasticsearch data streams </summary>
public class DataStreamChannel<TEvent> : ElasticsearchChannelBase<TEvent, DataStreamChannelOptions<TEvent>>
{
private readonly CreateOperation _fixedHeader;
private readonly string _url;

/// <inheritdoc cref="DataStreamChannel{TEvent}"/>
Expand All @@ -25,12 +24,10 @@ public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<I
var dataStream = Options.DataStream.ToString();

_url = $"{dataStream}/{base.BulkUrl}";

_fixedHeader = new CreateOperation();
}

/// <inheritdoc cref="GetIndexOp"/>
protected override IndexOp GetIndexOp(TEvent @event) => IndexOp.CreateNoParams;
protected override HeaderSerialization GetIndexOp(TEvent @event) => HeaderSerialization.CreateNoParams;

/// <inheritdoc cref="MutateHeader"/>
protected override void MutateHeader(TEvent @event, ref BulkHeader header) { }
Expand Down
88 changes: 10 additions & 78 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,6 @@

namespace Elastic.Ingest.Elasticsearch;

/// <summary> TODO </summary>
public enum IndexOp
{
/// <summary> </summary>
Index,
/// <summary> </summary>
IndexNoParams,
/// <summary> </summary>
Create,
/// <summary> </summary>
CreateNoParams,
/// <summary> </summary>
Delete,
/// <summary> </summary>
Update,
}

/// <summary> TODO </summary>
public struct BulkHeader
{

/// <summary> TODO </summary>
public string Index { get; set; }

/// <summary> TODO </summary>
public string? Id { get; set; }
}

/// <summary>
/// An abstract base class for both <see cref="DataStreamChannel{TEvent}"/> and <see cref="IndexChannel{TEvent}"/>
/// <para>Coordinates most of the sending to- and bootstrapping of Elasticsearch</para>
Expand All @@ -53,7 +25,7 @@ public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
where TChannelOptions : ElasticsearchChannelOptionsBase<TEvent>
{
/// <summary> TODO </summary>
protected abstract IndexOp GetIndexOp(TEvent @event);
protected abstract HeaderSerialization GetIndexOp(TEvent @event);

/// <summary> </summary>
/// <param name="event"></param>
Expand Down Expand Up @@ -88,16 +60,16 @@ public async Task WriteBufferToStreamAsync(
var op = GetIndexOp(@event);
switch (op)
{
case IndexOp.IndexNoParams:
await ElasticsearchChannelBase<TEvent, TChannelOptions>.SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false);
case HeaderSerialization.IndexNoParams:
await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case IndexOp.CreateNoParams:
case HeaderSerialization.CreateNoParams:
await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case IndexOp.Index:
case IndexOp.Create:
case IndexOp.Delete:
case IndexOp.Update:
case HeaderSerialization.Index:
case HeaderSerialization.Create:
case HeaderSerialization.Delete:
case HeaderSerialization.Update:
var header = new BulkHeader();
MutateHeader(@event, ref header);
await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false);
Expand All @@ -106,7 +78,7 @@ public async Task WriteBufferToStreamAsync(

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);

if (op == IndexOp.Update)
if (op == HeaderSerialization.Update)
await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false);

if (options.EventWriter?.WriteToStreamAsync != null)
Expand All @@ -115,50 +87,10 @@ public async Task WriteBufferToStreamAsync(
await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx)
.ConfigureAwait(false);

if (op == IndexOp.Update)
if (op == HeaderSerialization.Update)
await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false);

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);
}
}

/// <summary>
/// Create the bulk operation header with the appropriate action and meta data for a bulk request targeting an index.
/// </summary>
/// <param name="event">The <typeparamref name="TEvent"/> for which the header will be produced.</param>
/// <param name="options">The <see cref="IndexChannelOptions{TEvent}"/> for the channel.</param>
/// <param name="skipIndexName">Control whether the index name is included in the meta data for the operation.</param>
/// <returns>A <see cref="BulkOperationHeader"/> instance.</returns>
public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEvent @event, IndexChannelOptions<TEvent> options,
bool skipIndexName = false)
{
var indexTime = options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now;
if (options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(options.IndexOffset.Value);

var index = skipIndexName ? string.Empty : string.Format(options.IndexFormat, indexTime);

var id = options.BulkOperationIdLookup?.Invoke(@event);

if (options.OperationMode == OperationMode.Index)
{
return skipIndexName
? !string.IsNullOrWhiteSpace(id) ? new IndexOperation { Id = id } : new IndexOperation()
: !string.IsNullOrWhiteSpace(id) ? new IndexOperation { Index = index, Id = id } : new IndexOperation { Index = index };
}

if (options.OperationMode == OperationMode.Create)
{
return skipIndexName
? !string.IsNullOrWhiteSpace(id) ? new CreateOperation { Id = id } : new CreateOperation()
: !string.IsNullOrWhiteSpace(id) ? new CreateOperation { Index = index, Id = id } : new CreateOperation { Index = index };
}

if (!string.IsNullOrWhiteSpace(id) && id != null && (options.BulkUpsertLookup?.Invoke(@event, id) ?? false))
return skipIndexName ? new UpdateOperation { Id = id } : new UpdateOperation { Id = id, Index = index };

return
!string.IsNullOrWhiteSpace(id)
? skipIndexName ? new IndexOperation { Id = id } : new IndexOperation { Index = index, Id = id }
: skipIndexName ? new CreateOperation() : new CreateOperation { Index = index };
}
}
10 changes: 5 additions & 5 deletions src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public IndexChannel(IndexChannelOptions<TEvent> options, ICollection<IChannelCal
protected override string BulkUrl => _url;

/// <inheritdoc cref="GetIndexOp"/>
protected override IndexOp GetIndexOp(TEvent @event)
protected override HeaderSerialization GetIndexOp(TEvent @event)
{
var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now;
if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value);
Expand All @@ -52,12 +52,12 @@ protected override IndexOp GetIndexOp(TEvent @event)
var id = Options.BulkOperationIdLookup?.Invoke(@event);
if (string.IsNullOrWhiteSpace(index) && string.IsNullOrWhiteSpace(id))
return Options.OperationMode == OperationMode.Index
? IndexOp.IndexNoParams
: IndexOp.CreateNoParams;
? HeaderSerialization.IndexNoParams
: HeaderSerialization.CreateNoParams;

return Options.OperationMode == OperationMode.Index
? IndexOp.Index
: IndexOp.Create;
? HeaderSerialization.Index
: HeaderSerialization.Create;
}

/// <inheritdoc cref="MutateHeader"/>
Expand Down
25 changes: 25 additions & 0 deletions src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Collections.Generic;
using Elastic.Ingest.Elasticsearch.DataStreams;

namespace Elastic.Ingest.Elasticsearch.Serialization;

/// <summary> TODO </summary>
public struct BulkHeader
{

/// <summary> The index to write to, never set when writing using <see cref="DataStreamChannel{TEvent}"/> </summary>
public string? Index { get; set; }

/// <summary> The id of the object being written, never set when writing using <see cref="DataStreamChannel{TEvent}"/> </summary>
public string? Id { get; set; }

/// <summary> Require <see cref="Index"/> to point to an alias, never set when writing using <see cref="DataStreamChannel{TEvent}"/> </summary>
public bool? RequireAlias { get; set; }

/// <summary> TODO </summary>
public Dictionary<string, string>? DynamicTemplates { get; init; }
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

namespace Elastic.Ingest.Elasticsearch.Serialization;

/// <summary> TODO </summary>
public enum HeaderSerialization
{
/// <summary> </summary>
Index,
/// <summary> </summary>
IndexNoParams,
/// <summary> </summary>
Create,
/// <summary> </summary>
CreateNoParams,
/// <summary> </summary>
Delete,
/// <summary> </summary>
Update,
}

0 comments on commit 9991c38

Please sign in to comment.