diff --git a/.circleci/config.yml b/.circleci/config.yml index 329dec4f..96bcf028 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -54,6 +54,9 @@ jobs: key: kafka-go-mod-{{ checksum "go.sum" }}-1 paths: - /go/pkg/mod + - run: + name: Wait for kafka + command: ./scripts/wait-for-kafka.sh - run: name: Test kafka-go command: go test -race -cover ./... diff --git a/addoffsetstotxn_test.go b/addoffsetstotxn_test.go index 56d6ee46..d86a5fd9 100644 --- a/addoffsetstotxn_test.go +++ b/addoffsetstotxn_test.go @@ -22,11 +22,13 @@ func TestClientAddOffsetsToTxn(t *testing.T) { defer shutdown() err := clientCreateTopic(client, topic, 3) + defer deleteTopic(t, topic) if err != nil { t.Fatal(err) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + waitForTopic(ctx, t, topic) defer cancel() respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{ Addr: client.Addr, diff --git a/reader_test.go b/reader_test.go index f413d742..64d45190 100644 --- a/reader_test.go +++ b/reader_test.go @@ -309,7 +309,7 @@ func createTopic(t *testing.T, topic string, partitions int) { ReplicationFactor: 1, }, }, - Timeout: milliseconds(time.Second), + Timeout: milliseconds(5 * time.Second), }) if err != nil { if !errors.Is(err, TopicAlreadyExists) { @@ -364,8 +364,8 @@ func waitForTopic(ctx context.Context, t *testing.T, topic string) { } } - t.Logf("retrying after 1s") - time.Sleep(time.Second) + t.Logf("retrying after 100ms") + time.Sleep(100 * time.Millisecond) continue } } @@ -1559,17 +1559,22 @@ func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) { } } -func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() client, shutdown := newLocalClient() defer shutdown() - + t1 := makeTopic() + createTopic(t, t1, 1) + defer deleteTopic(t, t1) + t2 := makeTopic() + createTopic(t, t2, 1) + defer deleteTopic(t, t2) conf := ReaderConfig{ Brokers: []string{"localhost:9092"}, GroupID: makeGroupID(), - GroupTopics: []string{makeTopic(), makeTopic()}, + GroupTopics: []string{t1, t2}, MaxWait: time.Second, PartitionWatchInterval: 100 * time.Millisecond, WatchPartitionChanges: true, diff --git a/scripts/wait-for-kafka.sh b/scripts/wait-for-kafka.sh new file mode 100755 index 00000000..5cd33655 --- /dev/null +++ b/scripts/wait-for-kafka.sh @@ -0,0 +1,20 @@ +#/bin/bash + +COUNTER=0; +echo foo | nc localhost 9092 +STATUS=$? +ATTEMPTS=60 +until [ ${STATUS} -eq 0 ] || [ "$COUNTER" -ge "${ATTEMPTS}" ]; +do + let COUNTER=$COUNTER+1; + sleep 1; + echo "[$COUNTER] waiting for 9092 port to be open"; + echo foo | nc localhost 9092 + STATUS=$? +done + +if [ "${COUNTER}" -gt "${ATTEMPTS}" ]; +then + echo "Kafka is not running, failing" + exit 1 +fi \ No newline at end of file diff --git a/txnoffsetcommit_test.go b/txnoffsetcommit_test.go index eb3d33cb..8562e896 100644 --- a/txnoffsetcommit_test.go +++ b/txnoffsetcommit_test.go @@ -21,6 +21,8 @@ func TestClientTxnOffsetCommit(t *testing.T) { client, shutdown := newLocalClientWithTopic(topic, 1) defer shutdown() + waitForTopic(context.TODO(), t, topic) + defer deleteTopic(t, topic) now := time.Now()