From 7ef38794291fc5d2317a3c3e33b463e2525b12a9 Mon Sep 17 00:00:00 2001 From: Satish Duggana Date: Tue, 11 May 2021 22:28:28 +0530 Subject: [PATCH] KAFKA-12758 Added `server-common` module to have server side common classes. (#10638) Added server-common module to have server side common classes. Moved ApiMessageAndVersion, RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to server-common module. Reivewers: Kowshik Prakasam , Jun Rao --- build.gradle | 64 ++++++++++++++++++- checkstyle/import-control-core.xml | 1 + checkstyle/import-control.xml | 18 ++++-- .../main/scala/kafka/raft/RaftManager.scala | 5 +- .../scala/kafka/server/ControllerApis.scala | 4 +- .../scala/kafka/server/ControllerServer.scala | 4 +- .../scala/kafka/server/KafkaRaftServer.scala | 3 +- .../scala/kafka/tools/TestRaftServer.scala | 3 +- .../kafka/testkit/KafkaClusterTestKit.java | 2 +- .../kafka/raft/KafkaMetadataLogTest.scala | 4 +- .../kafka/server/ControllerApisTest.scala | 5 +- .../kafka/tools/DumpLogSegmentsTest.scala | 3 +- .../controller/ClientQuotaControlManager.java | 2 +- .../controller/ClusterControlManager.java | 2 +- .../ConfigurationControlManager.java | 2 +- .../kafka/controller/ControllerResult.java | 2 +- .../controller/ControllerResultAndOffset.java | 2 +- .../controller/FeatureControlManager.java | 2 +- .../kafka/controller/NoOpSnapshotWriter.java | 2 +- .../kafka/controller/QuorumController.java | 2 +- .../controller/ReplicationControlManager.java | 2 +- .../kafka/controller/SnapshotGenerator.java | 2 +- .../kafka/controller/SnapshotWriter.java | 2 +- .../apache/kafka/metalog/MetaLogManager.java | 2 +- .../ClientQuotaControlManagerTest.java | 2 +- .../controller/ClusterControlManagerTest.java | 2 +- .../ConfigurationControlManagerTest.java | 2 +- .../kafka/controller/ControllerTestUtils.java | 2 +- .../controller/FeatureControlManagerTest.java | 2 +- .../kafka/controller/MockSnapshotWriter.java | 2 +- .../controller/QuorumControllerTest.java | 2 +- .../ReplicationControlManagerTest.java | 2 +- .../controller/SnapshotGeneratorTest.java | 2 +- .../apache/kafka/metalog/LocalLogManager.java | 2 +- .../kafka/metalog/LocalLogManagerTest.java | 2 +- .../org/apache/kafka/raft/BatchReader.java | 2 +- .../apache/kafka/raft/KafkaRaftClient.java | 1 + .../raft/internals/BatchAccumulator.java | 2 +- .../kafka/raft/internals/BatchBuilder.java | 2 +- .../raft/internals/RecordsBatchReader.java | 2 +- .../kafka/raft/internals/RecordsIterator.java | 2 +- .../kafka/raft/internals/StringSerde.java | 2 +- .../kafka/raft/metadata/MetaLogRaftShim.java | 2 +- .../raft/metadata/MetadataRecordSerde.java | 1 + .../apache/kafka/snapshot/SnapshotReader.java | 2 +- .../apache/kafka/snapshot/SnapshotWriter.java | 2 +- .../kafka/raft/RaftClientTestContext.java | 1 + .../kafka/raft/RaftEventSimulationTest.java | 1 + .../raft/internals/RecordsIteratorTest.java | 2 +- .../metadata/MetadataRecordSerdeTest.java | 2 +- .../server/common}/ApiMessageAndVersion.java | 2 +- .../AbstractApiMessageSerde.java | 5 +- .../serialization/BytesApiMessageSerde.java | 14 ++-- .../common/serialization}/RecordSerde.java | 2 +- settings.gradle | 1 + .../kafka/shell/MetadataNodeManager.java | 2 +- .../org/apache/kafka/shell/MetadataShell.java | 2 +- .../kafka/shell/SnapshotFileReader.java | 2 +- .../serialization/RemoteLogMetadataSerde.java | 3 +- .../RemoteLogMetadataTransform.java | 2 +- .../RemoteLogSegmentMetadataTransform.java | 2 +- ...moteLogSegmentMetadataUpdateTransform.java | 2 +- ...emotePartitionDeleteMetadataTransform.java | 2 +- .../RemoteLogMetadataTransformTest.java | 2 +- 64 files changed, 152 insertions(+), 79 deletions(-) rename {metadata/src/main/java/org/apache/kafka/metadata => server-common/src/main/java/org/apache/kafka/server/common}/ApiMessageAndVersion.java (97%) rename {raft/src/main/java/org/apache/kafka/raft/metadata => server-common/src/main/java/org/apache/kafka/server/common/serialization}/AbstractApiMessageSerde.java (96%) rename {storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage => server-common/src/main/java/org/apache/kafka/server/common}/serialization/BytesApiMessageSerde.java (84%) rename {raft/src/main/java/org/apache/kafka/raft => server-common/src/main/java/org/apache/kafka/server/common/serialization}/RecordSerde.java (97%) diff --git a/build.gradle b/build.gradle index c01d99490336..b10fc53eab3c 100644 --- a/build.gradle +++ b/build.gradle @@ -779,6 +779,7 @@ project(':core') { api project(':clients') api libs.scalaLibrary + implementation project(':server-common') implementation project(':metadata') implementation project(':raft') implementation project(':storage') @@ -1066,6 +1067,7 @@ project(':metadata') { archivesBaseName = "kafka-metadata" dependencies { + implementation project(':server-common') implementation project(':clients') implementation libs.jacksonDatabind implementation libs.jacksonJDK8Datatypes @@ -1263,11 +1265,13 @@ project(':raft') { archivesBaseName = "kafka-raft" dependencies { + implementation project(':server-common') implementation project(':clients') implementation project(':metadata') implementation libs.slf4jApi implementation libs.jacksonDatabind + testImplementation project(':server-common') testImplementation project(':clients') testImplementation project(':clients').sourceSets.test.output testImplementation libs.junitJupiter @@ -1341,6 +1345,62 @@ project(':raft') { } } +project(':server-common') { + archivesBaseName = "kafka-server-common" + + dependencies { + api project(':clients') + implementation libs.slf4jApi + + testImplementation project(':clients') + testImplementation project(':clients').sourceSets.test.output + testImplementation libs.junitJupiter + testImplementation libs.mockitoCore + + testRuntimeOnly libs.slf4jlog4j + } + + task createVersionFile(dependsOn: determineCommitId) { + ext.receiptFile = file("$buildDir/kafka/$buildVersionFileName") + outputs.file receiptFile + outputs.upToDateWhen { false } + doLast { + def data = [ + commitId: commitId, + version: version, + ] + + receiptFile.parentFile.mkdirs() + def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n") + receiptFile.setText(content, "ISO-8859-1") + } + } + + sourceSets { + main { + java { + srcDirs = ["src/main/java"] + } + } + test { + java { + srcDirs = ["src/test/java"] + } + } + } + + jar { + dependsOn createVersionFile + from("$buildDir") { + include "kafka/$buildVersionFileName" + } + } + + clean.doFirst { + delete "$buildDir/kafka/" + } +} + project(':storage:api') { archivesBaseName = "kafka-storage-api" @@ -1406,9 +1466,8 @@ project(':storage') { dependencies { implementation project(':storage:api') + implementation project(':server-common') implementation project(':clients') - implementation project(':metadata') - implementation project(':raft') implementation libs.slf4jApi implementation libs.jacksonDatabind @@ -1581,6 +1640,7 @@ project(':shell') { implementation libs.jacksonJDK8Datatypes implementation libs.jline implementation libs.slf4jApi + implementation project(':server-common') implementation project(':clients') implementation project(':core') implementation project(':log4j-appender') diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index 7d6374c5b0ad..b2037b784404 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -54,6 +54,7 @@ + diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 38db3966fac0..318384fca9c0 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -221,6 +221,7 @@ + @@ -231,6 +232,7 @@ + @@ -240,6 +242,7 @@ + @@ -266,13 +269,16 @@ - - + + + + + + - - + @@ -287,6 +293,7 @@ + @@ -410,6 +417,8 @@ + + @@ -418,6 +427,7 @@ + diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index ee06a95a78ab..f0d37be8a683 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -21,7 +21,6 @@ import java.nio.file.Files import java.util import java.util.OptionalInt import java.util.concurrent.CompletableFuture - import kafka.log.Log import kafka.raft.KafkaRaftManager.RaftIoThread import kafka.server.{KafkaConfig, MetaProperties} @@ -37,8 +36,8 @@ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec} -import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde} - +import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest} +import org.apache.kafka.server.common.serialization.RecordSerde import scala.jdk.CollectionConverters._ object KafkaRaftManager { diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index cf5be56a9540..b68da34b0084 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -21,7 +21,6 @@ import java.util import java.util.Collections import java.util.Map.Entry import java.util.concurrent.{CompletableFuture, ExecutionException} - import kafka.network.RequestChannel import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers @@ -50,8 +49,9 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{Node, Uuid} import org.apache.kafka.controller.Controller -import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange} +import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange} import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.common.ApiMessageAndVersion import scala.jdk.CollectionConverters._ diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 6a2844af8a25..638e23ac84c4 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -20,7 +20,6 @@ package kafka.server import java.util.concurrent.{CompletableFuture, TimeUnit} import java.util import java.util.concurrent.locks.ReentrantLock - import kafka.cluster.Broker.ServerInfo import kafka.log.LogConfig import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} @@ -37,11 +36,12 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{ClusterResource, Endpoint} import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics} -import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange} +import org.apache.kafka.metadata.VersionRange import org.apache.kafka.metalog.MetaLogManager import org.apache.kafka.raft.RaftConfig import org.apache.kafka.raft.RaftConfig.AddressSpec import org.apache.kafka.server.authorizer.Authorizer +import org.apache.kafka.server.common.ApiMessageAndVersion import scala.jdk.CollectionConverters._ diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index e1fc81f09ab8..1f96eaa7bacf 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -18,7 +18,6 @@ package kafka.server import java.io.File import java.util.concurrent.CompletableFuture - import kafka.common.{InconsistentNodeIdException, KafkaException} import kafka.log.Log import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics} @@ -27,9 +26,9 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole} import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties} import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.utils.{AppInfoParser, Time} -import org.apache.kafka.metadata.ApiMessageAndVersion import org.apache.kafka.raft.RaftConfig import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde} +import org.apache.kafka.server.common.ApiMessageAndVersion import scala.collection.Seq diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 3d432159ce4c..c21f4b7e86c6 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -35,7 +35,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{TopicPartition, Uuid, protocol} -import org.apache.kafka.raft.{Batch, BatchReader, RaftClient, RaftConfig, RecordSerde} +import org.apache.kafka.raft.{Batch, BatchReader, RaftClient, RaftConfig} +import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.snapshot.SnapshotReader import scala.jdk.CollectionConverters._ diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 77e6a54fff8b..87083a240c2d 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.controller.Controller; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metalog.MetaLogManager; import org.apache.kafka.raft.RaftConfig; import org.apache.kafka.raft.metadata.MetaLogRaftShim; diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index c54fd4780b62..f9ec73e453f7 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala @@ -20,7 +20,6 @@ import java.io.File import java.nio.ByteBuffer import java.nio.file.{Files, Path} import java.util.{Collections, Optional} - import kafka.log.Log import kafka.server.KafkaRaftServer import kafka.utils.{MockTime, TestUtils} @@ -30,7 +29,8 @@ import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.utils.Utils import org.apache.kafka.raft.internals.BatchBuilder -import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog, ValidOffsetAndEpoch} +import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, ReplicatedLog, ValidOffsetAndEpoch} +import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.snapshot.{SnapshotPath, Snapshots} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 3ed4faa2a01a..b0fa2b36b940 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -21,7 +21,6 @@ import java.net.InetAddress import java.util import java.util.Properties import java.util.concurrent.ExecutionException - import kafka.network.RequestChannel import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers @@ -43,7 +42,7 @@ import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicRe import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult import org.apache.kafka.common.message._ -import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection, AlterConfigsResource} +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection} import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => OldAlterConfigsResourceCollection} import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource} import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => OldAlterableConfigCollection} @@ -56,8 +55,8 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.controller.Controller -import org.apache.kafka.metadata.ApiMessageAndVersion import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} +import org.apache.kafka.server.common.ApiMessageAndVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.mockito.ArgumentMatchers._ diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 256262a99b4e..965dd2d64fb7 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -21,7 +21,6 @@ import java.io.{ByteArrayOutputStream, File, PrintWriter} import java.nio.ByteBuffer import java.util import java.util.Properties - import kafka.log.{Log, LogConfig, LogManager, LogTest} import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.tools.DumpLogSegments.TimeIndexDumpErrors @@ -31,8 +30,8 @@ import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRe import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache} import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.metadata.ApiMessageAndVersion import org.apache.kafka.raft.metadata.MetadataRecordSerde +import org.apache.kafka.server.common.ApiMessageAndVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java index c1a98016f4e4..317d58e097db 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java @@ -26,7 +26,7 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.requests.ApiError; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 312d788ef930..6d34024ced92 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -33,7 +33,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FeatureMapAndEpoch; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index b53926e21fb6..6820d6b00a45 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.slf4j.Logger; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java index e6ae031b9b3b..d130de59282d 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java @@ -17,7 +17,7 @@ package org.apache.kafka.controller; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import java.util.Collections; import java.util.List; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java index 8b8ca8dea80d..1b725653d3af 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java @@ -17,7 +17,7 @@ package org.apache.kafka.controller; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import java.util.Objects; import java.util.stream.Collectors; diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java index fb6d7dbcce74..42a82502a22f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java @@ -31,7 +31,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.FeatureMap; import org.apache.kafka.metadata.FeatureMapAndEpoch; import org.apache.kafka.metadata.VersionRange; diff --git a/metadata/src/main/java/org/apache/kafka/controller/NoOpSnapshotWriter.java b/metadata/src/main/java/org/apache/kafka/controller/NoOpSnapshotWriter.java index 0263dd556a58..1a4d1825e0c2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/NoOpSnapshotWriter.java +++ b/metadata/src/main/java/org/apache/kafka/controller/NoOpSnapshotWriter.java @@ -19,7 +19,7 @@ import java.io.IOException; import java.util.List; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; /** diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 86faa5ede8e6..2645b8615ce6 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -74,7 +74,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.controller.SnapshotGenerator.Section; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metadata.FeatureMapAndEpoch; diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index ea94a0052d34..b270e35205ad 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -60,7 +60,7 @@ import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.timeline.SnapshotRegistry; diff --git a/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java b/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java index 91295c2a2f76..f182d28f158f 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java +++ b/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.ExponentialBackoff; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.slf4j.Logger; diff --git a/metadata/src/main/java/org/apache/kafka/controller/SnapshotWriter.java b/metadata/src/main/java/org/apache/kafka/controller/SnapshotWriter.java index 595f4c181b2f..93a429d97642 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/SnapshotWriter.java +++ b/metadata/src/main/java/org/apache/kafka/controller/SnapshotWriter.java @@ -19,7 +19,7 @@ import java.io.IOException; import java.util.List; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; interface SnapshotWriter extends AutoCloseable { diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java index 9126245ef385..1ba358c4f06c 100644 --- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java +++ b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java @@ -17,7 +17,7 @@ package org.apache.kafka.metalog; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import java.util.List; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java index 47726964c2e1..ffe891d09768 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index 9b9ddb0cc1a7..04fc8168a952 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 608b428f68a8..4afbf7d2c880 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import java.util.AbstractMap.SimpleImmutableEntry; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ControllerTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/ControllerTestUtils.java index 10def7ecb352..5ad037a647a2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ControllerTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ControllerTestUtils.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Message; import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import java.lang.reflect.Field; import java.lang.reflect.Method; diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index 0ee5ee1eb3f2..f417be6aed9f 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.FeatureMap; import org.apache.kafka.metadata.FeatureMapAndEpoch; import org.apache.kafka.metadata.VersionRange; diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockSnapshotWriter.java b/metadata/src/test/java/org/apache/kafka/controller/MockSnapshotWriter.java index 76b493448239..de481a0f7f93 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/MockSnapshotWriter.java +++ b/metadata/src/test/java/org/apache/kafka/controller/MockSnapshotWriter.java @@ -18,7 +18,7 @@ package org.apache.kafka.controller; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import java.io.IOException; import java.util.ArrayList; diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 8c64cec8bd71..b6eea362c902 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -48,7 +48,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationReply; import org.apache.kafka.metalog.LocalLogManagerTestEnv; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 2a456be830bb..f7e0277eb87b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -43,7 +43,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.timeline.SnapshotRegistry; diff --git a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java index e8346211f161..a2e703153b55 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.ExponentialBackoff; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.SnapshotGenerator.Section; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index e58848eda4ed..6442bb9dbaad 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; import org.slf4j.Logger; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java index ac578fb63580..e13ebfe95352 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java @@ -18,7 +18,7 @@ package org.apache.kafka.metalog; import org.apache.kafka.common.metadata.RegisterBrokerRecord; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; diff --git a/raft/src/main/java/org/apache/kafka/raft/BatchReader.java b/raft/src/main/java/org/apache/kafka/raft/BatchReader.java index 6469af2095f5..79e6614e2e7c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/BatchReader.java +++ b/raft/src/main/java/org/apache/kafka/raft/BatchReader.java @@ -28,7 +28,7 @@ * of the Raft IO thread. This helps to ensure that a slow state machine will not * affect replication. * - * @param record type (see {@link org.apache.kafka.raft.RecordSerde}) + * @param record type (see {@link org.apache.kafka.server.common.serialization.RecordSerde}) */ public interface BatchReader extends Iterator>, AutoCloseable { diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 1c759514cd6c..190f39bb7207 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -68,6 +68,7 @@ import org.apache.kafka.raft.internals.MemoryBatchReader; import org.apache.kafka.raft.internals.RecordsBatchReader; import org.apache.kafka.raft.internals.ThresholdPurgatory; +import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RawSnapshotReader; import org.apache.kafka.snapshot.RawSnapshotWriter; import org.apache.kafka.snapshot.SnapshotReader; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java index 84f744fc7826..939cfc118f25 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.common.message.LeaderChangeMessage; import java.io.Closeable; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java index 93dc6546eb73..4a7a24398db1 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.ByteBufferOutputStream; import org.apache.kafka.common.utils.ByteUtils; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; import java.io.DataOutputStream; import java.nio.ByteBuffer; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java index a55815d0b5e5..e95206100a30 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.BatchReader; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; import java.util.NoSuchElementException; import java.util.Optional; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java index 46155b57ee4f..c12a0849915b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java @@ -33,7 +33,7 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.raft.Batch; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; public final class RecordsIterator implements Iterator>, AutoCloseable { private final Records records; diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java index cf096dfe69a9..c2a011a687db 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Writable; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; public class StringSerde implements RecordSerde { diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java index 0bcf2c67055b..dba84123466a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java +++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java @@ -17,7 +17,7 @@ package org.apache.kafka.raft.metadata; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.metalog.MetaLogLeader; import org.apache.kafka.metalog.MetaLogListener; import org.apache.kafka.metalog.MetaLogManager; diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java index 5250d3927b2d..123de4c4853d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java +++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.server.common.serialization.AbstractApiMessageSerde; public class MetadataRecordSerde extends AbstractApiMessageSerde { diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java index af00cdb2286b..0ddecd704e6a 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.OffsetAndEpoch; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.raft.internals.RecordsIterator; /** diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java index e9b3c64e0362..a61b333c3bb8 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.OffsetAndEpoch; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index a70e5b638225..c4bf9199e5b1 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -55,6 +55,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.internals.BatchBuilder; import org.apache.kafka.raft.internals.StringSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RawSnapshotWriter; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.test.TestCondition; diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 32e701a3854e..d0016614b389 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.raft.MockLog.LogBatch; import org.apache.kafka.raft.MockLog.LogEntry; import org.apache.kafka.raft.internals.BatchMemoryPool; +import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.SnapshotReader; import org.junit.jupiter.api.Tag; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java index e450b5266440..4852c1f21035 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.raft.Batch; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; diff --git a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java b/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java index 2071814ed953..de844e6e75ff 100644 --- a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.utils.ByteUtils; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ApiMessageAndVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/ApiMessageAndVersion.java similarity index 97% rename from metadata/src/main/java/org/apache/kafka/metadata/ApiMessageAndVersion.java rename to server-common/src/main/java/org/apache/kafka/server/common/ApiMessageAndVersion.java index 75ccb4807b1d..66a625be5ef3 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/ApiMessageAndVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/ApiMessageAndVersion.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.metadata; +package org.apache.kafka.server.common; import org.apache.kafka.common.protocol.ApiMessage; diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java similarity index 96% rename from raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java rename to server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java index fea70a7ce41c..67c067d1a2b0 100644 --- a/raft/src/main/java/org/apache/kafka/raft/metadata/AbstractApiMessageSerde.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.raft.metadata; +package org.apache.kafka.server.common.serialization; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.protocol.ApiMessage; @@ -22,8 +22,7 @@ import org.apache.kafka.common.protocol.Readable; import org.apache.kafka.common.protocol.Writable; import org.apache.kafka.common.utils.ByteUtils; -import org.apache.kafka.metadata.ApiMessageAndVersion; -import org.apache.kafka.raft.RecordSerde; +import org.apache.kafka.server.common.ApiMessageAndVersion; /** * This is an implementation of {@code RecordSerde} with {@link ApiMessageAndVersion} but implementors need to implement diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java similarity index 84% rename from storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java rename to server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java index 65973b0e920c..668bbfb24886 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/BytesApiMessageSerde.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/BytesApiMessageSerde.java @@ -14,14 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.server.log.remote.metadata.storage.serialization; +package org.apache.kafka.server.common.serialization; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Readable; -import org.apache.kafka.metadata.ApiMessageAndVersion; -import org.apache.kafka.raft.metadata.AbstractApiMessageSerde; +import org.apache.kafka.server.common.ApiMessageAndVersion; + import java.nio.ByteBuffer; /** @@ -35,7 +35,7 @@ */ public abstract class BytesApiMessageSerde { - private final AbstractApiMessageSerde metadataRecordSerde = new AbstractApiMessageSerde() { + private final AbstractApiMessageSerde apiMessageSerde = new AbstractApiMessageSerde() { @Override public ApiMessage apiMessageFor(short apiKey) { return BytesApiMessageSerde.this.apiMessageFor(apiKey); @@ -44,9 +44,9 @@ public ApiMessage apiMessageFor(short apiKey) { public byte[] serialize(ApiMessageAndVersion messageAndVersion) { ObjectSerializationCache cache = new ObjectSerializationCache(); - int size = metadataRecordSerde.recordSize(messageAndVersion, cache); + int size = apiMessageSerde.recordSize(messageAndVersion, cache); ByteBufferAccessor writable = new ByteBufferAccessor(ByteBuffer.allocate(size)); - metadataRecordSerde.write(messageAndVersion, cache, writable); + apiMessageSerde.write(messageAndVersion, cache, writable); return writable.buffer().array(); } @@ -54,7 +54,7 @@ public byte[] serialize(ApiMessageAndVersion messageAndVersion) { public ApiMessageAndVersion deserialize(byte[] data) { Readable readable = new ByteBufferAccessor(ByteBuffer.wrap(data)); - return metadataRecordSerde.read(readable, data.length); + return apiMessageSerde.read(readable, data.length); } /** diff --git a/raft/src/main/java/org/apache/kafka/raft/RecordSerde.java b/server-common/src/main/java/org/apache/kafka/server/common/serialization/RecordSerde.java similarity index 97% rename from raft/src/main/java/org/apache/kafka/raft/RecordSerde.java rename to server-common/src/main/java/org/apache/kafka/server/common/serialization/RecordSerde.java index 90a5e1ed286e..70642c652e97 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RecordSerde.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/serialization/RecordSerde.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.raft; +package org.apache.kafka.server.common.serialization; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.protocol.Readable; diff --git a/settings.gradle b/settings.gradle index 8ed1d0eb19d0..f29866136b73 100644 --- a/settings.gradle +++ b/settings.gradle @@ -29,6 +29,7 @@ include 'clients', 'log4j-appender', 'metadata', 'raft', + 'server-common', 'shell', 'storage', 'storage:api', diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index dde0e40defb2..55986f3c28d8 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -36,7 +36,6 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.metadata.ApiMessageAndVersion; import org.apache.kafka.metalog.MetaLogLeader; import org.apache.kafka.metalog.MetaLogListener; import org.apache.kafka.queue.EventQueue; @@ -44,6 +43,7 @@ import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.BatchReader; import org.apache.kafka.raft.RaftClient; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.shell.MetadataNode.DirectoryNode; import org.apache.kafka.shell.MetadataNode.FileNode; diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java index 9ba70d83064e..1d99623044e0 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java @@ -24,7 +24,7 @@ import net.sourceforge.argparse4j.inf.Namespace; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java index 907b4db467ae..3078505980cc 100644 --- a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java +++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java @@ -26,12 +26,12 @@ import org.apache.kafka.common.record.Record; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.metadata.ApiMessageAndVersion; import org.apache.kafka.metalog.MetaLogLeader; import org.apache.kafka.metalog.MetaLogListener; import org.apache.kafka.raft.metadata.MetadataRecordSerde; import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java index 4e68cf0b97f3..94975b91ea86 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java @@ -17,7 +17,8 @@ package org.apache.kafka.server.log.remote.metadata.storage.serialization; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.serialization.BytesApiMessageSerde; import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType; import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataTransform.java index 8cb4c61af20f..b6e35820ae81 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataTransform.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataTransform.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.server.log.remote.metadata.storage.serialization; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; /** diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java index 29e7fc83e1cb..375c533d176e 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java index c0f5b2efa258..4ad827725784 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java index 09d142c3d1f7..27f4f1f4c8b2 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState; diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java index ea7dae820c16..87e768332933 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataTransform; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataUpdateTransform; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemotePartitionDeleteMetadataTransform;