beta3 release changes (#767)
* beta3 release changes

* small readme fixes

* review changes
mhowlett authored Feb 7, 2019
1 parent eee0de8 commit 400125f
Showing 15 changed files with 55 additions and 43 deletions.
28 changes: 17 additions & 11 deletions CHANGELOG.md
@@ -3,21 +3,23 @@
## New Features

- Revamped producer and consumer serialization functionality.
- All producer functionality except the public methods used to produce messages is now provided by `ProducerBase`.
- There are two producer classes deriving from `ProducerBase`: `Producer` and `Producer<TKey, TValue>`.
- `Producer` is specialized for the case of producing messages with `byte[]` keys and values.
- `Producer<TKey, TValue>` provides flexible integration with serialization functionality.
- On the consumer side, there are analogous classes: `ConsumerBase`, `Consumer` and `Consumer<TKey, TValue>`.
- - There are two types of serializer and deserializer: `ISerializer<T>` / `IAsyncSerializer<T>` and `IDeserializer<T>` / `IAsyncDeserializer<T>`.
+ - There are now two types of serializer and deserializer: `ISerializer<T>` / `IAsyncSerializer<T>` and `IDeserializer<T>` / `IAsyncDeserializer<T>`.
- `ISerializer<T>`/`IDeserializer<T>` are appropriate for most use cases.
- - `IAsyncSerializer<T>`/`IAsyncDeserializer<T>` are more general, but less performant (they return `Task`s).
- - The generic producer and consumer can be used with both types of serializer.
+ - `IAsyncSerializer<T>`/`IAsyncDeserializer<T>` are async friendly, but less performant (they return `Task`s).
- Changed the name of `Confluent.Kafka.Avro` to `Confluent.SchemaRegistry.Serdes` (Schema Registry may support other serialization formats in the future).
- - Added a example demonstrating working with protobuf serialized data.
+ - Added an example demonstrating working with protobuf serialized data.
- `Consumer`s, `Producer`s and `AdminClient`s are now constructed using builder classes.
- This is more verbose, but provides a sufficiently flexible and future proof API for specifying serdes and other configuration information.
- All `event`s on the client classes have been replaced with corresponding `Set...Handler` methods on the builder classes.
- This ensures (indeed, enforces) that handlers are set at librdkafka initialization, which is important for some handlers, particularly the log handler.
- `event`s allow more than one handler to be set, but this is often not appropriate (e.g. `OnPartitionsAssigned`) and never necessary. This is no longer possible.
- `event`s are also not async friendly (handlers can't return `Task`). The `Set...Handler` approach can be extended in such a way that it is.
- Avro serdes no longer make blocking calls to `ICachedSchemaRegistryClient` - everything is `await`ed.
- - References librdkafka.redist [1.0.0-RC5](https://github.com/edenhill/librdkafka/releases/tag/v1.0.0-RC5)
+ - Note: The `Consumer` implementation still calls async deserializers synchronously because the `Consumer` API is still otherwise fully synchronous.
+ - References librdkafka.redist [1.0.0-RC7](https://github.com/edenhill/librdkafka/releases/tag/v1.0.0-RC7)
+ - Notable features: idempotent producer, sparse connections, KIP-62 (max.poll.interval.ms).
- Note: End of partition notification is now disabled by default (enable using the `EnablePartitionEof` config property).
- - Removed `Consumer.OnPartitionEOF` in favor of `ConsumeResult.IsPartitionEOF`.
+ - Removed the `Consumer.OnPartitionEOF` event in favor of notifying of partition EOF via `ConsumeResult.IsPartitionEOF`.
- Removed `ErrorEvent` class and added `IsFatal` to `Error` class.
- The `IsFatal` flag is now set appropriately for all errors (previously it was always set to `false`).
- Added `PersistenceStatus` property to `DeliveryResult`, which provides information on the persistence status of the message.
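Taken together, the builder and `Set...Handler` changes described in this list mean client construction now looks roughly as follows (an illustrative sketch against the 1.0.0-beta3 API — the topic name and handler bodies are placeholders):

```csharp
using System;
using Confluent.Kafka;

class BuilderExample
{
    public static void Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // Handlers are supplied to the builder, so they are guaranteed to be
        // in place when the underlying librdkafka handle is created.
        using (var producer = new ProducerBuilder<Null, string>(config)
            .SetErrorHandler((p, error) => Console.WriteLine($"Error: {error.Reason}"))
            .SetLogHandler((p, logMessage) => Console.WriteLine($"Log: {logMessage.Message}"))
            .Build())
        {
            producer.ProduceAsync("my-topic", new Message<Null, string> { Value = "hello" })
                .GetAwaiter().GetResult();
        }
    }
}
```

Because handlers can only be attached before `Build()`, exactly one handler of each kind exists for the lifetime of the client, which is what the changelog entries above refer to.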
@@ -26,6 +28,10 @@

- Added `Close` method to `IConsumer` interface.
- Changed the name of `ProduceException.DeliveryReport` to `ProduceException.DeliveryResult`.
+ - Fixed bug where enum config property couldn't be read after setting it.
+ - Added `SchemaRegistryBasicAuthCredentialsSource` back into `SchemaRegistryConfig` (#679).
+ - Fixed schema registry client failover connection issue (#737).
+ - Improvements to librdkafka dependency discovery (#743).


# 1.0.0-beta2
42 changes: 24 additions & 18 deletions README.md
@@ -42,16 +42,16 @@ confluent-kafka-dotnet is distributed via NuGet. We provide three packages:
To install Confluent.Kafka from within Visual Studio, search for Confluent.Kafka in the NuGet Package Manager UI, or run the following command in the Package Manager Console:

```
- Install-Package Confluent.Kafka -Version 1.0-beta2
+ Install-Package Confluent.Kafka -Version 1.0.0-beta3
```

To add a reference to a dotnet core project, execute the following at the command line:

```
- dotnet add package -v 1.0-beta2 Confluent.Kafka
+ dotnet add package -v 1.0.0-beta3 Confluent.Kafka
```

- **Note:** We recommend using the `1.0-beta2` version of Confluent.Kafka for new projects in preference to the most recent stable release (0.11.5).
+ **Note:** We recommend using the `1.0.0-beta3` version of Confluent.Kafka for new projects in preference to the most recent stable release (0.11.6).
The 1.0 API provides more features, is considerably improved, and is more performant than the 0.11.x releases. In choosing the label 'beta',
we are signaling that we do not anticipate making any high-impact changes to the API before the 1.0 release; however, be warned that some
breaking changes are still planned. You can track progress and provide feedback on the new 1.0 API
@@ -90,10 +90,10 @@ class Program
{
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

- // If serializers are not specified as constructor arguments, default
- // serializers from `Confluent.Kafka.Serializers` will be automatically
- // used where available. Note: by default strings are encoded as UTF8.
- using (var p = new Producer<Null, string>(config))
+ // If serializers are not specified, default serializers from
+ // `Confluent.Kafka.Serializers` will be automatically used where
+ // available. Note: by default strings are encoded as UTF8.
+ using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
@@ -125,12 +125,12 @@ class Program
{
var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

- Action<DeliveryReportResult<Null, string>> handler = r =>
+ Action<DeliveryReport<Null, string>> handler = r =>
Console.WriteLine(!r.Error.IsError
? $"Delivered message to {r.TopicPartitionOffset}"
: $"Delivery Error: {r.Error.Reason}");

- using (var p = new Producer<Null, string>(conf))
+ using (var p = new ProducerBuilder<Null, string>(conf).Build())
{
for (int i=0; i<100; ++i)
{
@@ -148,6 +148,7 @@ class Program

```csharp
using System;
+ using System.Threading;
using Confluent.Kafka;

class Program
@@ -163,29 +164,34 @@ class Program
// topic/partitions of interest. By default, offsets are committed
// automatically, so in this example, consumption will only start from the
// earliest message in the topic 'my-topic' the first time you run the program.
- AutoOffsetReset = AutoOffsetResetType.Earliest
+ AutoOffsetReset = AutoOffsetReset.Earliest
};

- using (var c = new Consumer<Ignore, string>(conf))
+ using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("my-topic");

- bool consuming = true;
- // The client will automatically recover from non-fatal errors. You typically
- // don't need to take any action unless an error is marked as fatal.
- c.OnError += (_, e) => consuming = !e.IsFatal;
+ CancellationTokenSource cts = new CancellationTokenSource();
+ Console.CancelKeyPress += (_, e) => {
+     e.Cancel = true; // prevent the process from terminating.
+     cts.Cancel();
+ };

- while (consuming)
+ while (!cts.IsCancellationRequested)
{
try
{
- var cr = c.Consume();
+ var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occurred: {e.Error.Reason}");
}
+ catch (OperationCanceledException)
+ {
+     break;
+ }
}

// Ensure the consumer leaves the group cleanly and final offsets are committed.
@@ -219,7 +225,7 @@ For more information about working with Avro in .NET, refer to the blog post

### Error Handling

- Errors raised via a client's `OnError` event should be considered informational except when the `IsFatal` flag
+ Errors delivered to a client's error handler should be considered informational except when the `IsFatal` flag
is set to `true`, indicating that the client is in an unrecoverable state. Currently, this can only happen on
the producer, and only when `enable.idempotence` has been set to `true`. In all other scenarios, clients are
able to recover from all errors automatically.
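A minimal sketch of this error-handling convention under the builder API (the handler bodies, topic, and group id here are illustrative, not prescriptive):

```csharp
using System;
using Confluent.Kafka;

class ErrorHandlingExample
{
    public static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "example-group"
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetErrorHandler((c, error) =>
            {
                if (error.IsFatal)
                {
                    // Unrecoverable: the only option is to tear down and
                    // recreate the client.
                    Console.WriteLine($"Fatal error: {error.Reason}");
                }
                else
                {
                    // Informational: the client recovers automatically.
                    Console.WriteLine($"Transient error: {error.Reason}");
                }
            })
            .Build())
        {
            consumer.Subscribe("my-topic");
            // ... consume loop as in the examples above ...
        }
    }
}
```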
2 changes: 1 addition & 1 deletion examples/AdminClient/AdminClient.csproj
@@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

2 changes: 1 addition & 1 deletion examples/AvroBlogExamples/AvroBlogExamples.csproj
@@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes/Confluent.SchemaRegistry.Serdes.csproj" />
</ItemGroup>

2 changes: 1 addition & 1 deletion examples/AvroGeneric/AvroGeneric.csproj
@@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes/Confluent.SchemaRegistry.Serdes.csproj" />
</ItemGroup>

2 changes: 1 addition & 1 deletion examples/AvroSpecific/AvroSpecific.csproj
@@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes/Confluent.SchemaRegistry.Serdes.csproj" />
</ItemGroup>

2 changes: 1 addition & 1 deletion examples/ConfluentCloud/ConfluentCloud.csproj
@@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

2 changes: 1 addition & 1 deletion examples/Consumer/Consumer.csproj
@@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

2 changes: 1 addition & 1 deletion examples/MultiProducer/MultiProducer.csproj
@@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

2 changes: 1 addition & 1 deletion examples/Producer/Producer.csproj
@@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
</ItemGroup>

2 changes: 1 addition & 1 deletion examples/Protobuf/Protobuf.csproj
@@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
- <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0-beta2" /> -->
+ <!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.0.0-beta3" /> -->
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
<PackageReference Include="Grpc.Tools" Version="1.16.0" />
<PackageReference Include="Google.Protobuf" Version="3.6.1" />
4 changes: 2 additions & 2 deletions src/Confluent.Kafka/Confluent.Kafka.csproj
@@ -12,14 +12,14 @@
<PackageId>Confluent.Kafka</PackageId>
<Title>Confluent.Kafka</Title>
<AssemblyName>Confluent.Kafka</AssemblyName>
- <VersionPrefix>1.0-beta2</VersionPrefix>
+ <VersionPrefix>1.0.0-beta3</VersionPrefix>
<TargetFrameworks>net45;net46;netcoreapp2.1;netstandard1.3;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

<ItemGroup>
- <PackageReference Include="librdkafka.redist" Version="1.0.0-RC5">
+ <PackageReference Include="librdkafka.redist" Version="1.0.0-RC7">
<PrivateAssets Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">None</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="4.5.0" />
src/Confluent.SchemaRegistry.Serdes/Confluent.SchemaRegistry.Serdes.csproj
@@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry.Serdes</PackageId>
<Title>Confluent.SchemaRegistry.Serdes</Title>
<AssemblyName>Confluent.SchemaRegistry.Serdes</AssemblyName>
- <VersionPrefix>1.0-beta2</VersionPrefix>
+ <VersionPrefix>1.0.0-beta3</VersionPrefix>
<TargetFrameworks>netstandard2.0;net452;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj
@@ -13,7 +13,7 @@
<PackageId>Confluent.SchemaRegistry</PackageId>
<Title>Confluent.SchemaRegistry</Title>
<AssemblyName>Confluent.SchemaRegistry</AssemblyName>
- <VersionPrefix>1.0-beta2</VersionPrefix>
+ <VersionPrefix>1.0.0-beta3</VersionPrefix>
<TargetFrameworks>net452;netstandard1.4;</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
@@ -30,7 +30,7 @@
</ItemGroup>

<ItemGroup>
- <PackageReference Include="librdkafka.redist" Version="1.0.0-RC5" />
+ <PackageReference Include="librdkafka.redist" Version="1.0.0-RC7" />
</ItemGroup>

<ItemGroup>