Skip to content

Commit

Permalink
KAFKA-12758 Added server-common module to have server side common c…
Browse files Browse the repository at this point in the history
…lasses. (#10638)

Added server-common module to have server side common classes. Moved ApiMessageAndVersion, RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to server-common module.

Reivewers:  Kowshik Prakasam <[email protected]>, Jun Rao <[email protected]>
  • Loading branch information
satishd authored May 11, 2021
1 parent 80a468e commit 7ef3879
Show file tree
Hide file tree
Showing 64 changed files with 152 additions and 79 deletions.
64 changes: 62 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ project(':core') {
api project(':clients')
api libs.scalaLibrary

implementation project(':server-common')
implementation project(':metadata')
implementation project(':raft')
implementation project(':storage')
Expand Down Expand Up @@ -1066,6 +1067,7 @@ project(':metadata') {
archivesBaseName = "kafka-metadata"

dependencies {
implementation project(':server-common')
implementation project(':clients')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
Expand Down Expand Up @@ -1263,11 +1265,13 @@ project(':raft') {
archivesBaseName = "kafka-raft"

dependencies {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation libs.slf4jApi
implementation libs.jacksonDatabind

testImplementation project(':server-common')
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.junitJupiter
Expand Down Expand Up @@ -1341,6 +1345,62 @@ project(':raft') {
}
}

project(':server-common') {
archivesBaseName = "kafka-server-common"

dependencies {
api project(':clients')
implementation libs.slf4jApi

testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore

testRuntimeOnly libs.slf4jlog4j
}

task createVersionFile(dependsOn: determineCommitId) {
ext.receiptFile = file("$buildDir/kafka/$buildVersionFileName")
outputs.file receiptFile
outputs.upToDateWhen { false }
doLast {
def data = [
commitId: commitId,
version: version,
]

receiptFile.parentFile.mkdirs()
def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n")
receiptFile.setText(content, "ISO-8859-1")
}
}

sourceSets {
main {
java {
srcDirs = ["src/main/java"]
}
}
test {
java {
srcDirs = ["src/test/java"]
}
}
}

jar {
dependsOn createVersionFile
from("$buildDir") {
include "kafka/$buildVersionFileName"
}
}

clean.doFirst {
delete "$buildDir/kafka/"
}
}

project(':storage:api') {
archivesBaseName = "kafka-storage-api"

Expand Down Expand Up @@ -1406,9 +1466,8 @@ project(':storage') {

dependencies {
implementation project(':storage:api')
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation project(':raft')
implementation libs.slf4jApi
implementation libs.jacksonDatabind

Expand Down Expand Up @@ -1581,6 +1640,7 @@ project(':shell') {
implementation libs.jacksonJDK8Datatypes
implementation libs.jline
implementation libs.slf4jApi
implementation project(':server-common')
implementation project(':clients')
implementation project(':core')
implementation project(':log4j-appender')
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
<allow pkg="org.apache.kafka.test"/>
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.server.common" />
</subpackage>

<subpackage name="tools">
Expand Down
18 changes: 14 additions & 4 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>
Expand All @@ -231,6 +232,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>

Expand All @@ -240,6 +242,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>

Expand All @@ -266,13 +269,16 @@

<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="com.fasterxml.jackson" />

<subpackage name="common">
<allow pkg="org.apache.kafka.server.common" />
</subpackage>

<subpackage name="log">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.log" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.raft.metadata" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
</subpackage>

Expand All @@ -287,6 +293,7 @@
<allow pkg="org.apache.kafka.metalog"/>
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.shell"/>
<allow pkg="org.apache.kafka.snapshot"/>
<allow pkg="org.jline"/>
Expand Down Expand Up @@ -410,6 +417,8 @@
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
<allow pkg="net.jqwik"/>
Expand All @@ -418,6 +427,7 @@
<subpackage name="snapshot">
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test"/>
</subpackage>

Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.nio.file.Files
import java.util
import java.util.OptionalInt
import java.util.concurrent.CompletableFuture

import kafka.log.Log
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{KafkaConfig, MetaProperties}
Expand All @@ -37,8 +36,8 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde}

import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest}
import org.apache.kafka.server.common.serialization.RecordSerde
import scala.jdk.CollectionConverters._

object KafkaRaftManager {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util
import java.util.Collections
import java.util.Map.Entry
import java.util.concurrent.{CompletableFuture, ExecutionException}

import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
Expand Down Expand Up @@ -50,8 +49,9 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.controller.Controller
import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange}
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply, VersionRange}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion

import scala.jdk.CollectionConverters._

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package kafka.server
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util
import java.util.concurrent.locks.ReentrantLock

import kafka.cluster.Broker.ServerInfo
import kafka.log.LogConfig
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
Expand All @@ -37,11 +36,12 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
import org.apache.kafka.metadata.VersionRange
import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion

import scala.jdk.CollectionConverters._

Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package kafka.server

import java.io.File
import java.util.concurrent.CompletableFuture

import kafka.common.{InconsistentNodeIdException, KafkaException}
import kafka.log.Log
import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
Expand All @@ -27,9 +26,9 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.metadata.ApiMessageAndVersion
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}
import org.apache.kafka.server.common.ApiMessageAndVersion

import scala.collection.Seq

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.{Batch, BatchReader, RaftClient, RaftConfig, RecordSerde}
import org.apache.kafka.raft.{Batch, BatchReader, RaftClient, RaftConfig}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.snapshot.SnapshotReader

import scala.jdk.CollectionConverters._
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogManager;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.raft.metadata.MetaLogRaftShim;
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.io.File
import java.nio.ByteBuffer
import java.nio.file.{Files, Path}
import java.util.{Collections, Optional}

import kafka.log.Log
import kafka.server.KafkaRaftServer
import kafka.utils.{MockTime, TestUtils}
Expand All @@ -30,7 +29,8 @@ import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.raft.internals.BatchBuilder
import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, RecordSerde, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, OffsetAndEpoch, ReplicatedLog, ValidOffsetAndEpoch}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.snapshot.{SnapshotPath, Snapshots}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.net.InetAddress
import java.util
import java.util.Properties
import java.util.concurrent.ExecutionException

import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
Expand All @@ -43,7 +42,7 @@ import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicRe
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
import org.apache.kafka.common.message._
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection, AlterConfigsResource}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => OldAlterConfigsResourceCollection}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource}
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => OldAlterableConfigCollection}
Expand All @@ -56,8 +55,8 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.controller.Controller
import org.apache.kafka.metadata.ApiMessageAndVersion
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.io.{ByteArrayOutputStream, File, PrintWriter}
import java.nio.ByteBuffer
import java.util
import java.util.Properties

import kafka.log.{Log, LogConfig, LogManager, LogTest}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
Expand All @@ -31,8 +30,8 @@ import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRe
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.ApiMessageAndVersion
import org.apache.kafka.raft.metadata.MetadataRecordSerde
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kafka.controller;

import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kafka.controller;

import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.server.common.ApiMessageAndVersion;

import java.util.Objects;
import java.util.stream.Collectors;
Expand Down
Loading

0 comments on commit 7ef3879

Please sign in to comment.