diff --git a/src/kafunk/Protocol.fs b/src/kafunk/Protocol.fs
index beb5990..fc77e43 100644
--- a/src/kafunk/Protocol.fs
+++ b/src/kafunk/Protocol.fs
@@ -796,18 +796,30 @@ module Protocol =
         let numRecords = buf.ReadInt32()
         if numRecords < 0 then
           failwithf "invalid_record_count|num_records=%i compression=%i mb=%i first_offset=%i mc=%i" numRecords compression magicByte firstOffset mss.Count
-        match compression with
-        | CompressionCodec.None ->
-          MessageSet.ReadRecords (buf,magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
-        | compression ->
-          let recordsLength = sizeInBytes - RecordBatch.RECORD_BATCH_OVERHEAD
-          if buf.Buffer.Count < recordsLength then
-            buf.ShiftOffset buf.Buffer.Count
-          else
-            let compressedValue = buf.Slice recordsLength
-            let decompressedValue = CompressionCodec.decompress compression compressedValue
-            MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
-            buf.ShiftOffset recordsLength
+        let lastOffset : Offset =
+          match compression with
+          | CompressionCodec.None ->
+            //let c0 = mss.Count
+            MessageSet.ReadRecords (buf,magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
+            if mss.Count > 0 then
+              let lastMessage = mss.[mss.Count - 1]
+              //let count = mss.Count - c0
+              //if lastMessage.offset <> lastOffset then
+              //  failwithf "unmatched_offsets|batch_last_offset=%i message_last_offset=%i count=%i num_records=%i" lastOffset lastMessage.offset count numRecords
+              lastMessage.offset
+              //lastOffset
+            else
+              lastOffset
+          | compression ->
+            let recordsLength = sizeInBytes - RecordBatch.RECORD_BATCH_OVERHEAD
+            if buf.Buffer.Count < recordsLength then
+              buf.ShiftOffset buf.Buffer.Count
+            else
+              let compressedValue = buf.Slice recordsLength
+              let decompressedValue = CompressionCodec.decompress compression compressedValue
+              MessageSet.ReadRecords (BinaryZipper(decompressedValue),magicByte,numRecords,firstOffset,timestampType,firstTimestamp,maxTimestamp,mss)
+              buf.ShiftOffset recordsLength
+            lastOffset
         if checkCrc then
           let crcCount = buf.Buffer.Count - attributesOffset
           let crc = Crc.crc32C buf.Buffer.Array attributesOffset crcCount
diff --git a/tests/kafunk.Tests/AsyncTest.fsx b/tests/kafunk.Tests/AsyncTest.fsx
index 575ec2f..9d53353 100644
--- a/tests/kafunk.Tests/AsyncTest.fsx
+++ b/tests/kafunk.Tests/AsyncTest.fsx
@@ -123,4 +123,17 @@ let go3 = async {
     |> Async.parallelThrottledIgnore 1000
 }
 
-Async.RunSynchronously go2
\ No newline at end of file
+let go4 = async {
+  let t : Task<unit> = Task.never
+  let N = 1000000
+  return!
+    Seq.init N id
+    |> Seq.map (fun i -> async {
+      let delay = Async.Sleep 100 |> Async.StartAsTask
+      let! t = Task.WhenAny [| t ; delay |] |> Async.AwaitTask
+      let r = t.Result
+      return () })
+    |> Async.parallelThrottledIgnore 1000
+}
+
+Async.RunSynchronously go4
\ No newline at end of file
diff --git a/tests/kafunk.Tests/ConfluentConsumer.fsx b/tests/kafunk.Tests/ConfluentConsumer.fsx
index bc13771..0fedac2 100644
--- a/tests/kafunk.Tests/ConfluentConsumer.fsx
+++ b/tests/kafunk.Tests/ConfluentConsumer.fsx
@@ -5,6 +5,7 @@
 open System
 open System.Text
 open System.Collections.Generic
+open System.Collections.Concurrent
 open System.Diagnostics
 open System.Threading
 open Kafunk
@@ -69,8 +70,19 @@ let go = async {
   let md = consumer.GetMetadata(true)
   Log.info "metadata|%A" md.Topics
 
+  let partitionOffsets = new ConcurrentDictionary<int, int64> ()
+
   let handle (m:Message) = async {
-    Log.info "handing message|p=%i key=%s" m.Partition (Encoding.UTF8.GetString m.Key)
+    //Log.info "handing message|p=%i key=%s" m.Partition (Encoding.UTF8.GetString m.Key)
+    let offset = m.Offset.Value
+    match partitionOffsets.TryGetValue (m.Partition) with
+    | true, lastOffset ->
+      if (lastOffset + 1L < offset) then
+        let gap = offset - (lastOffset + 1L)
+        failwithf "non_contig_offsets_detected|partition=%i last_offset=%i current_offset=%i gap=%i" m.Partition lastOffset offset gap
+    | _ -> ()
+    partitionOffsets.[m.Partition] <- offset
+
     return () }
 
   use counter = Metrics.counter Log 5000
diff --git a/tests/kafunk.Tests/Consumer.fsx b/tests/kafunk.Tests/Consumer.fsx
index 89b3594..144eb72 100644
--- a/tests/kafunk.Tests/Consumer.fsx
+++ b/tests/kafunk.Tests/Consumer.fsx
@@ -4,6 +4,7 @@
 open FSharp.Control
 open Kafunk
 open System
+open System.Collections.Concurrent
 
 //Log.MinLevel <- LogLevel.Trace
 let Log = Log.create __SOURCE_FILE__
@@ -31,7 +32,7 @@
       tcpConfig = chanConfig,
      requestRetryPolicy = KafkaConfig.DefaultRequestRetryPolicy,
      version = Versions.V_0_10_1,
-      autoApiVersions = true,
+      //autoApiVersions = true,
      //version = Versions.V_0_9_0,
      //autoApiVersions = false,
      clientId = "leo")
@@ -68,20 +69,33 @@ let go = async {
 
   let! _ = Async.StartChild showProgress
 
+  let partitionOffsets = new ConcurrentDictionary<int, int64> ()
+
   let handle (s:ConsumerState) (ms:ConsumerMessageSet) = async {
-    //use! _cnc = Async.OnCancel (fun () -> Log.warn "cancelling_handler")
-    //for m in ms.messageSet.messages do
-    //  Log.info "key=%s" (Binary.toString m.message.key)
-    Log.trace "consuming_message_set|topic=%s partition=%i count=%i size=%i os=[%i-%i] ts=[%O] hwo=%i lag=%i"
-      ms.topic
-      ms.partition
-      (ms.messageSet.messages.Length)
-      (ConsumerMessageSet.size ms)
-      (ConsumerMessageSet.firstOffset ms)
-      (ConsumerMessageSet.lastOffset ms)
-      (ConsumerMessageSet.firstTimestamp ms)
-      (ms.highWatermarkOffset)
-      (ConsumerMessageSet.lag ms)
+
+    do! Async.Sleep 5000
+
+    if ms.partition = 0 then
+      Log.info "consuming_message_set|topic=%s partition=%i count=%i size=%i os=[%i-%i] ts=[%O] hwo=%i lag=%i"
+        ms.topic
+        ms.partition
+        (ms.messageSet.messages.Length)
+        (ConsumerMessageSet.size ms)
+        (ConsumerMessageSet.firstOffset ms)
+        (ConsumerMessageSet.lastOffset ms)
+        (ConsumerMessageSet.firstTimestamp ms)
+        (ms.highWatermarkOffset)
+        (ConsumerMessageSet.lag ms)
+
+    for msi in ms.messageSet.messages do
+      match partitionOffsets.TryGetValue (ms.partition) with
+      | true, lastOffset ->
+        if (lastOffset + 1L < msi.offset) then
+          let gap = msi.offset - (lastOffset + 1L)
+          failwithf "non_contig_offsets_detected|partition=%i last_offset=%i current_offset=%i gap=%i" ms.partition lastOffset msi.offset gap
+      | _ -> ()
+      partitionOffsets.[ms.partition] <- msi.offset
+
     return () }
 
   use counter = Metrics.counter Log 5000
diff --git a/tests/kafunk.Tests/ProducerConsumer.fsx b/tests/kafunk.Tests/ProducerConsumer.fsx
index 576a26e..ed5b42d 100644
--- a/tests/kafunk.Tests/ProducerConsumer.fsx
+++ b/tests/kafunk.Tests/ProducerConsumer.fsx
@@ -20,6 +20,8 @@
 let batchSize = argiDefault 4 "1000" |> Int32.Parse
 let consumerCount = argiDefault 5 "1" |> Int32.Parse
 let producerThreads = argiDefault 6 "100" |> Int32.Parse
 
+let contigDeltaThreshold = 200000
+
 let testId = Guid.NewGuid().ToString("n")
 let consumerGroup = "kafunk-producer-consumer-test-" + testId
@@ -153,7 +155,7 @@ module Reporter =
 
   let report () =
     let r = new Report(ack.Received, ack.Duplicates, ack.Sent, ack.Contig)
-    printReport r
+    //printReport r
     r
 
   let rec loop () = async {
@@ -196,14 +198,13 @@
   do! Async.Sleep 5000
   let! report = Reporter.report reporter
   printReport report
-  if (report.received - report.contigCount) > 100000 then
+  if (report.received - report.contigCount) > contigDeltaThreshold then
    Log.error "contig_delta_surpassed_threshold"
    IVar.tryPut () completed |> ignore
 }
 
 // ----------------------------------------------------------------------------------------------------------------------------------
 
-
 let producer = async {
 
   let message (messageNumber:int) =
@@ -218,7 +219,12 @@
 
   Log.info "starting_producer_process|batch_count=%i" batchCount
 
-  let connCfg = KafkaConfig.create ([KafkaUri.parse host], tcpConfig = chanConfig)
+  let connCfg =
+    KafkaConfig.create (
+      [KafkaUri.parse host],
+      tcpConfig = chanConfig,
+      version = Versions.V_0_10_1)
+
   use! conn = Kafka.connAsync connCfg
 
   let producerCfg =
@@ -252,8 +258,37 @@
 
 let consumer = async {
 
+  let partitionOffsets = new ConcurrentDictionary<int, int64> ()
+
   let handle (_:ConsumerState) (ms:ConsumerMessageSet) = async {
+    //failwithf "testing ERRORs!"
     //Log.error "testing error!"
+
+
+    //let firstOffset' = ConsumerMessageSet.firstOffset ms
+
+    //let mutable lastOffset = 0L
+    //if partitionOffsets.TryGetValue (ms.partition, &lastOffset) then
+    //  if firstOffset' > lastOffset + 1L then
+    //    failwithf "offset_gap_detected|partition=%i last_offset=%i first_offset_next_batch=%i" ms.partition lastOffset firstOffset'
+
+    //partitionOffsets.[ms.partition] <- ConsumerMessageSet.lastOffset ms
+
+    //ms.messageSet.messages
+    //|> Seq.pairwise
+    //|> Seq.iter (fun (msi1,msi2) ->
+    //  if msi1.offset + 1L <> msi2.offset then
+    //    failwithf "non_contiguous_offsets_detected|offset1=%i offset2=%i" msi1.offset msi2.offset
+    //  ())
+
+    for msi in ms.messageSet.messages do
+      match partitionOffsets.TryGetValue (ms.partition) with
+      | true, lastOffset ->
+        if (lastOffset + 1L <> msi.offset) then
+          failwithf "non_contig_offsets|partition=%i last_offset=%i current_offset=%i" ms.partition lastOffset msi.offset
+      | _ -> ()
+      partitionOffsets.[ms.partition] <- msi.offset
+
     let values =
      ms.messageSet.messages
      |> Seq.choose (fun m ->
@@ -270,7 +305,8 @@ let consumer = async {
   let connCfg =
    KafkaConfig.create (
      [KafkaUri.parse host],
-      tcpConfig = chanConfig)
+      tcpConfig = chanConfig,
+      version = Versions.V_0_10_1)
   use! conn = Kafka.connAsync connCfg
 
   let consumerCfg =