Skip to content

Commit

Permalink
KAFKA-12772: Move all transaction state transition rules into their s…
Browse files Browse the repository at this point in the history
…tates (#10667)

Co-authored-by: dengziming <[email protected]>
  • Loading branch information
dengziming and dengziming authored May 12, 2021
1 parent 7ef3879 commit 2debbb9
Showing 1 changed file with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ private[transaction] sealed trait TransactionState {
* Get the name of this state. This is exposed through the `DescribeTransactions` API.
*/
def name: String

def validPreviousStates: Set[TransactionState]
}

/**
Expand All @@ -75,6 +77,7 @@ private[transaction] sealed trait TransactionState {
private[transaction] case object Empty extends TransactionState {
val id: Byte = 0
val name: String = "Empty"
val validPreviousStates: Set[TransactionState] = Set(Empty, CompleteCommit, CompleteAbort)
}

/**
Expand All @@ -88,6 +91,7 @@ private[transaction] case object Empty extends TransactionState {
private[transaction] case object Ongoing extends TransactionState {
val id: Byte = 1
val name: String = "Ongoing"
val validPreviousStates: Set[TransactionState] = Set(Ongoing, Empty, CompleteCommit, CompleteAbort)
}

/**
Expand All @@ -98,6 +102,7 @@ private[transaction] case object Ongoing extends TransactionState {
private[transaction] case object PrepareCommit extends TransactionState {
val id: Byte = 2
val name: String = "PrepareCommit"
val validPreviousStates: Set[TransactionState] = Set(Ongoing)
}

/**
Expand All @@ -108,6 +113,7 @@ private[transaction] case object PrepareCommit extends TransactionState {
private[transaction] case object PrepareAbort extends TransactionState {
val id: Byte = 3
val name: String = "PrepareAbort"
val validPreviousStates: Set[TransactionState] = Set(Ongoing, PrepareEpochFence)
}

/**
Expand All @@ -118,6 +124,7 @@ private[transaction] case object PrepareAbort extends TransactionState {
private[transaction] case object CompleteCommit extends TransactionState {
val id: Byte = 4
val name: String = "CompleteCommit"
val validPreviousStates: Set[TransactionState] = Set(PrepareCommit)
}

/**
Expand All @@ -128,6 +135,7 @@ private[transaction] case object CompleteCommit extends TransactionState {
private[transaction] case object CompleteAbort extends TransactionState {
val id: Byte = 5
val name: String = "CompleteAbort"
val validPreviousStates: Set[TransactionState] = Set(PrepareAbort)
}

/**
Expand All @@ -136,6 +144,7 @@ private[transaction] case object CompleteAbort extends TransactionState {
private[transaction] case object Dead extends TransactionState {
val id: Byte = 6
val name: String = "Dead"
val validPreviousStates: Set[TransactionState] = Set(Empty, CompleteAbort, CompleteCommit)
}

/**
Expand All @@ -145,6 +154,7 @@ private[transaction] case object Dead extends TransactionState {
private[transaction] case object PrepareEpochFence extends TransactionState {
val id: Byte = 7
val name: String = "PrepareEpochFence"
val validPreviousStates: Set[TransactionState] = Set(Ongoing)
}

private[transaction] object TransactionMetadata {
Expand All @@ -162,20 +172,6 @@ private[transaction] object TransactionMetadata {
new TransactionMetadata(transactionalId, producerId, lastProducerId, producerEpoch, lastProducerEpoch,
txnTimeoutMs, state, collection.mutable.Set.empty[TopicPartition], timestamp, timestamp)

def isValidTransition(oldState: TransactionState, newState: TransactionState): Boolean =
TransactionMetadata.validPreviousStates(newState).contains(oldState)

private val validPreviousStates: Map[TransactionState, Set[TransactionState]] =
Map(Empty -> Set(Empty, CompleteCommit, CompleteAbort),
Ongoing -> Set(Ongoing, Empty, CompleteCommit, CompleteAbort),
PrepareCommit -> Set(Ongoing),
PrepareAbort -> Set(Ongoing, PrepareEpochFence),
CompleteCommit -> Set(PrepareCommit),
CompleteAbort -> Set(PrepareAbort),
Dead -> Set(Empty, CompleteAbort, CompleteCommit),
PrepareEpochFence -> Set(Ongoing)
)

def isEpochExhausted(producerEpoch: Short): Boolean = producerEpoch >= Short.MaxValue - 1
}

Expand Down Expand Up @@ -385,7 +381,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
throw new IllegalArgumentException(s"Illegal new producer epoch $newEpoch")

// check that the new state transition is valid and update the pending state if necessary
if (TransactionMetadata.validPreviousStates(newState).contains(state)) {
if (newState.validPreviousStates.contains(state)) {
val transitMetadata = TxnTransitMetadata(newProducerId, producerId, newEpoch, newLastEpoch, newTxnTimeoutMs, newState,
newTopicPartitions, newTxnStartTimestamp, updateTimestamp)
debug(s"TransactionalId $transactionalId prepare transition from $state to $transitMetadata")
Expand Down

0 comments on commit 2debbb9

Please sign in to comment.