batcher: use abstract Queue type for blocks state (ethereum-optimism#12180)

* op-service: add queue package

* batcher: use Queue type for blocks

* revert changes to errors.As/Is

* implement and use Peek operation

* queue: add unit tests

* add godoc

* add more test cases

permute expected / got

* ensure enqueue and prepend are noops when args is empty

* use queue.PeekN and queue.DequeueN

* typo

* queue: simplify method implementations

* revert to old dequeue impl
geoknee authored Oct 2, 2024
1 parent 1217d4a commit d1f6501
Showing 4 changed files with 225 additions and 23 deletions.
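
For orientation before the diffs: the new queue package replaces ad-hoc slice manipulation in the batcher with named FIFO operations. A minimal sketch of the API as it is used below (illustrative only; the batcher instantiates it as queue.Queue[*types.Block], not queue.Queue[int]):

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/queue"
)

func main() {
	var blocks queue.Queue[int] // zero value is an empty, ready-to-use queue

	blocks.Enqueue(3, 4) // replaces: s.blocks = append(s.blocks, ...)
	blocks.Prepend(1, 2) // replaces: s.blocks = append(newBlocks, s.blocks...)

	head, ok := blocks.Peek() // non-destructive read of the front element
	fmt.Println(head, ok)     // 1 true

	front, _ := blocks.DequeueN(2)   // replaces: s.blocks = s.blocks[2:]
	fmt.Println(front, blocks.Len()) // [1 2] 2
}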
45 changes: 24 additions & 21 deletions op-batcher/batcher/channel_manager.go
@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
@@ -32,7 +33,7 @@ type channelManager struct {
rollupCfg *rollup.Config

// All blocks since the last request for new tx data.
blocks []*types.Block
blocks queue.Queue[*types.Block]
// The latest L1 block from all the L2 blocks in the most recently closed channel
l1OriginLastClosedChannel eth.BlockID
// The default ChannelConfig to use for the next channel
@@ -68,7 +69,7 @@ func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
s.mu.Lock()
defer s.mu.Unlock()
s.log.Trace("clearing channel manager state")
s.blocks = s.blocks[:0]
s.blocks.Clear()
s.l1OriginLastClosedChannel = l1OriginLastClosedChannel
s.tip = common.Hash{}
s.closed = false
@@ -106,9 +107,11 @@ func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
done, blocks := channel.TxConfirmed(id, inclusionBlock)
s.blocks = append(blocks, s.blocks...)
if done {
s.removePendingChannel(channel)
if len(blocks) > 0 {
s.blocks.Prepend(blocks...)
}
}
} else {
s.log.Warn("transaction from unknown channel marked as confirmed", "id", id)
@@ -208,7 +211,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
}

dataPending := firstWithTxData != nil
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", s.blocks.Len())

// Short circuit if there is pending tx data or the channel manager is closed
if dataPending {
@@ -222,7 +225,7 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID) (*channel, error) {
// No pending tx data, so we have to add new blocks to the channel

// If we have no saved blocks, we will not be able to create valid frames
if len(s.blocks) == 0 {
if s.blocks.Len() == 0 {
return nil, io.EOF
}

@@ -274,14 +277,14 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"id", pc.ID(),
"l1Head", l1Head,
"l1OriginLastClosedChannel", s.l1OriginLastClosedChannel,
"blocks_pending", len(s.blocks),
"blocks_pending", s.blocks.Len(),
"batch_type", cfg.BatchType,
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", cfg.TargetNumFrames,
"max_frame_size", cfg.MaxFrameSize,
"use_blobs", cfg.UseBlobs,
)
s.metr.RecordChannelOpened(pc.ID(), len(s.blocks))
s.metr.RecordChannelOpened(pc.ID(), s.blocks.Len())

return nil
}
@@ -304,7 +307,13 @@ func (s *channelManager) processBlocks() error {
_chFullErr *ChannelFullError // throw away, just for type checking
latestL2ref eth.L2BlockRef
)
for i, block := range s.blocks {

for i := 0; ; i++ {
block, ok := s.blocks.PeekN(i)
if !ok {
break
}

l1info, err := s.currentChannel.AddBlock(block)
if errors.As(err, &_chFullErr) {
// current block didn't get added because channel is already full
@@ -323,22 +332,16 @@
}
}

if blocksAdded == len(s.blocks) {
// all blocks processed, reuse slice
s.blocks = s.blocks[:0]
} else {
// remove processed blocks
s.blocks = s.blocks[blocksAdded:]
}
_, _ = s.blocks.DequeueN(blocksAdded)

s.metr.RecordL2BlocksAdded(latestL2ref,
blocksAdded,
len(s.blocks),
s.blocks.Len(),
s.currentChannel.InputBytes(),
s.currentChannel.ReadyBytes())
s.log.Debug("Added blocks to channel",
"blocks_added", blocksAdded,
"blocks_pending", len(s.blocks),
"blocks_pending", s.blocks.Len(),
"channel_full", s.currentChannel.IsFull(),
"input_bytes", s.currentChannel.InputBytes(),
"ready_bytes", s.currentChannel.ReadyBytes(),
@@ -363,7 +366,7 @@ func (s *channelManager) outputFrames() error {
inBytes, outBytes := s.currentChannel.InputBytes(), s.currentChannel.OutputBytes()
s.metr.RecordChannelClosed(
s.currentChannel.ID(),
len(s.blocks),
s.blocks.Len(),
s.currentChannel.TotalFrames(),
inBytes,
outBytes,
@@ -377,7 +380,7 @@

s.log.Info("Channel closed",
"id", s.currentChannel.ID(),
"blocks_pending", len(s.blocks),
"blocks_pending", s.blocks.Len(),
"num_frames", s.currentChannel.TotalFrames(),
"input_bytes", inBytes,
"output_bytes", outBytes,
@@ -404,7 +407,7 @@ func (s *channelManager) AddL2Block(block *types.Block) error {
}

s.metr.RecordL2BlockInPendingQueue(block)
s.blocks = append(s.blocks, block)
s.blocks.Enqueue(block)
s.tip = block.Hash()

return nil
Expand Down Expand Up @@ -489,7 +492,7 @@ func (s *channelManager) Requeue(newCfg ChannelConfig) {
}

// We put the blocks back at the front of the queue:
s.blocks = append(blocksToRequeue, s.blocks...)
s.blocks.Prepend(blocksToRequeue...)
// Channels which were already being submitted are put back
s.channelQueue = newChannelQueue
s.currentChannel = nil
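
Two patterns in this file are worth calling out. TxConfirmed now guards Prepend behind a len(blocks) > 0 check (the queue's Prepend is itself a no-op for empty input, per the commit list above, so the guard is belt-and-braces), and processBlocks switches from ranging over a slice to a peek-then-dequeue loop. A condensed sketch of the latter shape, where accept is a hypothetical stand-in (not from the diff) for s.currentChannel.AddBlock and its error handling:

package sketch

import "github.com/ethereum-optimism/optimism/op-service/queue"

// drainInto mirrors the shape of processBlocks: read pending blocks
// non-destructively with PeekN, stop when one is rejected (e.g. the
// channel is full), then remove the accepted prefix with a single
// DequeueN call. Rejected blocks stay queued for the next channel.
func drainInto(blocks *queue.Queue[int], accept func(int) bool) int {
	blocksAdded := 0
	for i := 0; ; i++ {
		block, ok := blocks.PeekN(i) // inspect without removing
		if !ok {
			break // queue exhausted
		}
		if !accept(block) {
			break // stop; unaccepted blocks remain in the queue
		}
		blocksAdded++
	}
	_, _ = blocks.DequeueN(blocksAdded) // drop exactly what was accepted
	return blocksAdded
}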
6 changes: 4 additions & 2 deletions op-batcher/batcher/channel_manager_test.go
@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/queue"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -87,7 +88,7 @@ func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
require.NoError(t, m.AddL2Block(c))
require.ErrorIs(t, m.AddL2Block(x), ErrReorg)

require.Equal(t, []*types.Block{a, b, c}, m.blocks)
require.Equal(t, queue.Queue[*types.Block]{a, b, c}, m.blocks)
}

// ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
@@ -626,7 +627,7 @@ func TestChannelManager_Requeue(t *testing.T) {

// This is the snapshot of channel manager state we want to reinstate
// when we requeue
stateSnapshot := []*types.Block{blockA, blockB}
stateSnapshot := queue.Queue[*types.Block]{blockA, blockB}
m.blocks = stateSnapshot
require.Empty(t, m.channelQueue)

@@ -664,5 +665,6 @@

// The requeue shouldn't affect the pending channel
require.Contains(t, m.channelQueue, channel0)

require.NotContains(t, m.blocks, blockA)
}
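
Because Queue is a named slice type, the tests can build expected values with ordinary composite literals (queue.Queue[*types.Block]{a, b, c}) and compare them with require.Equal and require.Contains exactly as the previous []*types.Block assertions did; no constructor or conversion is needed.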
75 changes: 75 additions & 0 deletions op-service/queue/queue.go
@@ -0,0 +1,75 @@
package queue

// Queue implements a FIFO queue.
type Queue[T any] []T

// Enqueue adds the elements to the back of the queue.
func (q *Queue[T]) Enqueue(t ...T) {
if len(t) == 0 {
return
}
*q = append(*q, t...)
}

// Dequeue removes a single element from the front of the queue
// (if there is one) and returns it. Returns a zero value and false
// if there is no element to dequeue.
func (q *Queue[T]) Dequeue() (T, bool) {
if len(*q) == 0 {
var zeroValue T
return zeroValue, false
}
t := (*q)[0]
*q = (*q)[1:]
return t, true
}

// DequeueN removes N elements from the front of the queue
// (if there are enough) and returns a slice of those elements. Returns
// a nil slice and false if there are insufficient elements to dequeue.
func (q *Queue[T]) DequeueN(N int) ([]T, bool) {
if len(*q) < N {
return nil, false
}
t := (*q)[0:N]
*q = (*q)[N:]
return t, true
}

// Prepend inserts the elements at the front of the queue,
// preserving their order. A noop if t is empty.
func (q *Queue[T]) Prepend(t ...T) {
if len(t) == 0 {
return
}
*q = append(t, *q...)
}

// Clear removes all elements from the queue.
func (q *Queue[T]) Clear() {
*q = (*q)[:0]
}

// Len returns the number of elements in the queue.
func (q *Queue[T]) Len() int {
return len(*q)
}

// Peek returns the single element at the front of the queue
// (if there is one) without removing it. Returns a zero value and
// false if there is no element to peek at.
func (q *Queue[T]) Peek() (T, bool) {
return q.PeekN(0)
}

// PeekN returns the element in the Nth position in the queue
// (if there is one) without removing it. Returns a zero value and
// false if there are insufficient elements in the queue.
func (q *Queue[T]) PeekN(N int) (T, bool) {
if len(*q) <= N {
var zeroValue T
return zeroValue, false
}
t := (*q)[N]
return t, true
}
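
Two properties of the implementation above are worth noting: the zero value of Queue is an empty, usable queue, and DequeueN is all-or-nothing. An illustrative sketch of both (not part of the change):

package sketch

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/queue"
)

func Example() {
	var q queue.Queue[string] // zero value works; no constructor needed
	q.Enqueue("a", "b", "c")

	// DequeueN is all-or-nothing: asking for more elements than exist
	// fails and leaves the queue untouched.
	if _, ok := q.DequeueN(5); !ok {
		fmt.Println("insufficient elements; still have", q.Len()) // 3
	}

	got, _ := q.DequeueN(2)
	fmt.Println(got, q.Len()) // [a b] 1
}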
122 changes: 122 additions & 0 deletions op-service/queue/queue_test.go
@@ -0,0 +1,122 @@
package queue

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestQueue(t *testing.T) {
t.Run("enqueue amd dequeue", func(t *testing.T) {
q := Queue[int]{}
q.Enqueue(1, 2, 3, 4)

p, peekOk := q.Peek()
require.True(t, peekOk)
require.Equal(t, 1, p)

d, dequeueOk := q.Dequeue()
require.Equal(t, 1, d)
require.True(t, dequeueOk)
require.Equal(t, 3, q.Len())
p, peekOk = q.Peek()
require.True(t, peekOk)
require.Equal(t, 2, p)

d, dequeueOk = q.Dequeue()
require.Equal(t, 2, d)
require.True(t, dequeueOk)
require.Equal(t, 2, q.Len())
p, peekOk = q.Peek()
require.True(t, peekOk)
require.Equal(t, 3, p)

d, dequeueOk = q.Dequeue()
require.Equal(t, 3, d)
require.True(t, dequeueOk)
require.Equal(t, 1, q.Len())
p, peekOk = q.Peek()
require.True(t, peekOk)
require.Equal(t, 4, p)

d, dequeueOk = q.Dequeue()
require.Equal(t, 4, d)
require.True(t, dequeueOk)
require.Equal(t, 0, q.Len())
p, peekOk = q.Peek()
require.False(t, peekOk)
require.Equal(t, 0, p)

d, dequeueOk = q.Dequeue()
require.Equal(t, 0, d)
require.False(t, dequeueOk)
require.Equal(t, 0, q.Len())
p, peekOk = q.Peek()
require.False(t, peekOk)
require.Equal(t, 0, p)
p, peekOk = q.Peek()
require.False(t, peekOk)
require.Equal(t, 0, p)
})

t.Run("peekN and deqeueueN", func(t *testing.T) {
q := Queue[int]{}
q.Enqueue(1, 2, 3, 4)

p, peekOk := q.PeekN(1)
require.True(t, peekOk)
require.Equal(t, 2, p)

p, peekOk = q.PeekN(2)
require.Equal(t, 3, p)
require.True(t, peekOk)
require.Equal(t, 4, q.Len())

p, peekOk = q.PeekN(4)
require.Equal(t, 0, p)
require.False(t, peekOk)

d, dequeueOk := q.DequeueN(1)
require.Equal(t, []int{1}, d)
require.True(t, dequeueOk)
require.Equal(t, 3, q.Len())

d, dequeueOk = q.DequeueN(3)
require.Equal(t, []int{2, 3, 4}, d)
require.True(t, dequeueOk)
require.Equal(t, 0, q.Len())
})

t.Run("enqueue and clear", func(t *testing.T) {
q := Queue[int]{}
q.Enqueue(5, 6, 7)

q.Clear()
require.Equal(t, 0, q.Len())

d, ok := q.Dequeue()
require.Equal(t, 0, d)
require.False(t, ok)
})

t.Run("prepend", func(t *testing.T) {
var q, r Queue[int]
q.Enqueue(5, 6, 7)
r.Enqueue(8, 9)

q.Prepend(r...)
require.Equal(t, 5, q.Len())

d, ok := q.Dequeue()
require.Equal(t, 8, d)
require.True(t, ok)
require.Equal(t, 4, q.Len())

q.Prepend()
require.Equal(t, 4, q.Len())

d, ok = q.Dequeue()
require.Equal(t, 9, d)
require.True(t, ok)
})
}
