Avail:0:1st Proposal:Integration of Avail DA layer with Arbitrum nitro #6

Closed
wants to merge 18 commits into from
9 changes: 5 additions & 4 deletions Dockerfile
@@ -40,9 +40,9 @@ WORKDIR /workspace
RUN apt-get update && apt-get install -y curl build-essential=12.9

FROM wasm-base as wasm-libs-builder
# clang / lld used by soft-float wasm
# clang / lld used by soft-float wasm
RUN apt-get install -y clang=1:14.0-55.7~deb12u1 lld=1:14.0-55.7~deb12u1
# pinned rust 1.70.0
# pinned rust 1.70.0
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain 1.70.0 --target x86_64-unknown-linux-gnu wasm32-unknown-unknown wasm32-wasi
COPY ./Makefile ./
COPY arbitrator/arbutil arbitrator/arbutil
@@ -54,7 +54,7 @@ FROM scratch as wasm-libs-export
COPY --from=wasm-libs-builder /workspace/ /

FROM wasm-base as wasm-bin-builder
# pinned go version
# pinned go version
RUN curl -L https://golang.org/dl/go1.20.linux-`dpkg --print-architecture`.tar.gz | tar -C /usr/local -xzf -
COPY ./Makefile ./go.mod ./go.sum ./
COPY ./arbcompress ./arbcompress
@@ -66,6 +66,7 @@ COPY ./blsSignatures ./blsSignatures
COPY ./cmd/chaininfo ./cmd/chaininfo
COPY ./cmd/replay ./cmd/replay
COPY ./das/dastree ./das/dastree
COPY ./das/avail ./das/avail
COPY ./precompiles ./precompiles
COPY ./statetransfer ./statetransfer
COPY ./util ./util
@@ -103,7 +104,7 @@ WORKDIR /workspace
RUN export DEBIAN_FRONTEND=noninteractive && \
apt-get update && \
apt-get install -y make wget gpg software-properties-common zlib1g-dev \
libstdc++-11-dev wabt clang llvm-dev libclang-common-14-dev libpolly-14-dev
libstdc++-11-dev wabt clang llvm-dev libclang-common-14-dev libpolly-14-dev
COPY arbitrator/Cargo.* arbitrator/
COPY arbitrator/arbutil arbitrator/arbutil
COPY arbitrator/prover/Cargo.toml arbitrator/prover/
14 changes: 14 additions & 0 deletions arbnode/batch_poster.go
@@ -41,6 +41,7 @@ import (
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/das/avail"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util"
@@ -88,6 +89,7 @@ type BatchPoster struct {
gasRefunderAddr common.Address
building *buildingBatch
daWriter das.DataAvailabilityServiceWriter
availDAWriter avail.DataAvailabilityWriter
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
@@ -264,6 +266,7 @@ type BatchPosterOpts struct {
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAWriter das.DataAvailabilityServiceWriter
AvailDAWriter avail.DataAvailabilityWriter
ParentChainID *big.Int
}

@@ -310,6 +313,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
daWriter: opts.DAWriter,
availDAWriter: opts.AvailDAWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
@@ -1168,6 +1172,16 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
}
}

// ideally we make this part of the above statement by having everything under a single unified interface (soon TM)
if b.daWriter == nil && b.availDAWriter != nil {
// Store the data on Avail and return a marshalled BlobPointer, which is used as the sequencerMsg
// and is later used to retrieve the data from Avail
sequencerMsg, err = b.availDAWriter.Store(ctx, sequencerMsg)
if err != nil {
return false, err
}
}

data, kzgBlobs, err := b.encodeAddBatch(new(big.Int).SetUint64(batchPosition.NextSeqNum), batchPosition.MessageCount, b.building.msgCount, sequencerMsg, b.building.segments.delayedMsg, b.building.use4844)
if err != nil {
return false, err
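The write path added in batch_poster.go above works as follows: when no AnyTrust DAS writer is configured but an Avail writer is, the raw batch data is posted to Avail and the serialized BlobPointer returned by Store becomes the sequencer message posted on L1. Below is a minimal sketch of the interfaces this implies, inferred only from the call sites in this diff; the actual definitions live in das/avail in this PR and may differ.

package avail

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
)

// DataAvailabilityWriter is what the batch poster invokes instead of posting
// the batch data directly: Store persists data on Avail and returns a
// serialized BlobPointer, which then replaces the batch payload on L1.
type DataAvailabilityWriter interface {
	Store(ctx context.Context, data []byte) ([]byte, error)
}

// DataAvailabilityReader resolves a BlobPointer back into the original batch
// data when the sequencer inbox is parsed.
type DataAvailabilityReader interface {
	Read(ctx context.Context, pointer BlobPointer) ([]byte, error)
}

// BlobPointer identifies a blob stored on Avail. Only BlockHash and the
// binary (un)marshalling are visible in this diff; the real struct very
// likely carries additional lookup fields.
type BlobPointer struct {
	BlockHash common.Hash
}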
2 changes: 1 addition & 1 deletion arbnode/delayed_seq_reorg_test.go
@@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, nil)
tracker, err := NewInboxTracker(db, streamer, nil, nil, nil)
Require(t, err)

err = streamer.Start(ctx)
29 changes: 16 additions & 13 deletions arbnode/inbox_tracker.go
@@ -23,6 +23,7 @@ import (
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/das/avail"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
)
@@ -33,28 +34,30 @@ var (
)

type InboxTracker struct {
db ethdb.Database
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
blobReader arbstate.BlobReader
db ethdb.Database
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
das arbstate.DataAvailabilityReader
availDAReader avail.DataAvailabilityReader
blobReader arbstate.BlobReader

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, availDAReader avail.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
}
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
das: das,
blobReader: blobReader,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
db: db,
txStreamer: txStreamer,
das: das,
availDAReader: availDAReader,
blobReader: blobReader,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
}
return tracker, nil
}
@@ -613,7 +616,7 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
if t.blobReader != nil {
daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(t.blobReader))
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate)
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, t.availDAReader, arbstate.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
15 changes: 14 additions & 1 deletion arbnode/node.go
@@ -33,6 +33,7 @@ import (
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/das/avail"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
@@ -87,6 +88,7 @@ type Config struct {
Staker staker.L1ValidatorConfig `koanf:"staker" reload:"hot"`
SeqCoordinator SeqCoordinatorConfig `koanf:"seq-coordinator"`
DataAvailability das.DataAvailabilityConfig `koanf:"data-availability"`
Avail avail.DAConfig `koanf:"avail"`
SyncMonitor SyncMonitorConfig `koanf:"sync-monitor"`
Dangerous DangerousConfig `koanf:"dangerous"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
@@ -510,6 +512,8 @@ func createNodeImpl(
var daWriter das.DataAvailabilityServiceWriter
var daReader das.DataAvailabilityServiceReader
var dasLifecycleManager *das.LifecycleManager
var availDAWriter avail.DataAvailabilityWriter
var availDAReader avail.DataAvailabilityReader
if config.DataAvailability.Enable {
if config.BatchPoster.Enable {
daWriter, daReader, dasLifecycleManager, err = das.CreateBatchPosterDAS(ctx, &config.DataAvailability, dataSigner, l1client, deployInfo.SequencerInbox)
@@ -533,9 +537,16 @@ }
}
} else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee {
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
} else if config.Avail.Enable {
availService, err := avail.NewAvailDA(config.Avail)
if err != nil {
return nil, err
}
availDAWriter = availService
availDAReader = availService
}

inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, blobReader)
inboxTracker, err := NewInboxTracker(arbDb, txStreamer, daReader, availDAReader, blobReader)
if err != nil {
return nil, err
}
@@ -554,6 +565,7 @@
exec,
rawdb.NewTable(arbDb, storage.BlockValidatorPrefix),
daReader,
availDAReader,
blobReader,
func() *staker.BlockValidatorConfig { return &configFetcher.Get().BlockValidator },
stack,
@@ -672,6 +684,7 @@
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAWriter: daWriter,
AvailDAWriter: availDAWriter,
ParentChainID: parentChainID,
})
if err != nil {
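For context on the wiring in arbnode/node.go above: the Avail reader and writer are only constructed when the AnyTrust DAS path is not in use and config.Avail.Enable is set, and the single value returned by avail.NewAvailDA is assigned to both availDAWriter and availDAReader, so the same service handles Store and Read. The only parts of the configuration visible in this diff are the koanf group name ("avail") and the Enable flag; the sketch below fills in the shape of the struct, with every field other than Enable shown as a commented-out assumption for illustration.

package avail

// DAConfig is wired into the node config under the "avail" koanf group (see
// arbnode/node.go above). Only Enable is actually exercised in this diff; the
// commented fields are hypothetical connection settings one would expect an
// Avail RPC/light-client backend to need, not fields confirmed by this PR.
type DAConfig struct {
	Enable bool `koanf:"enable"`
	// ApiURL string `koanf:"api-url"` // hypothetical
	// Seed   string `koanf:"seed"`    // hypothetical
	// AppID  int    `koanf:"app-id"`  // hypothetical
}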
5 changes: 5 additions & 0 deletions arbstate/das_reader.go
@@ -16,6 +16,7 @@ import (

"github.com/offchainlabs/nitro/arbos/util"
"github.com/offchainlabs/nitro/blsSignatures"
"github.com/offchainlabs/nitro/das/avail"
"github.com/offchainlabs/nitro/das/dastree"
)

@@ -24,6 +25,10 @@ type DataAvailabilityReader interface {
ExpirationPolicy(ctx context.Context) (ExpirationPolicy, error)
}

type AvailDataAvailibilityReader interface {
avail.DataAvailabilityReader
}

var ErrHashMismatch = errors.New("result does not match expected hash")

// DASMessageHeaderFlag indicates that this data is a certificate for the data availability service,
73 changes: 70 additions & 3 deletions arbstate/inbox.go
@@ -23,6 +23,7 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbos/l1pricing"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/das/avail"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/blobs"
"github.com/offchainlabs/nitro/zeroheavy"
@@ -63,7 +64,7 @@ const maxZeroheavyDecompressedLen = 101*MaxDecompressedLen/100 + 64
const MaxSegmentsPerSequencerMessage = 100 * 1024
const MinLifetimeSecondsForDataAvailabilityCert = 7 * 24 * 60 * 60 // one week

func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash common.Hash, data []byte, daProviders []DataAvailabilityProvider, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash common.Hash, data []byte, daProviders []DataAvailabilityProvider, availDAReader avail.DataAvailabilityReader, keysetValidationMode KeysetValidationMode) (*sequencerMessage, error) {
if len(data) < 40 {
return nil, errors.New("sequencer message missing L1 header")
}
@@ -115,6 +116,21 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
}
}

if len(payload) > 0 && avail.IsAvailMessageHeaderByte(payload[0]) {
if availDAReader == nil {
log.Error("No Avail Reader configured, but sequencer message found with Avail header")
} else {
var err error
payload, err = RecoverPayloadFromAvailBatch(ctx, batchNum, data, availDAReader, nil)
if err != nil {
return nil, err
}
if payload == nil {
return parsedMsg, nil
}
}
}

// At this point, `payload` has not been validated by the sequencer inbox at all.
// It's not safe to trust any part of the payload from this point onwards.

@@ -165,6 +181,55 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
return parsedMsg, nil
}

func RecoverPayloadFromAvailBatch(ctx context.Context, batchNum uint64, sequencerMsg []byte, availDAReader avail.DataAvailabilityReader, preimages map[arbutil.PreimageType]map[common.Hash][]byte) ([]byte, error) {
var keccakPreimages map[common.Hash][]byte
if preimages != nil {
if preimages[arbutil.Keccak256PreimageType] == nil {
preimages[arbutil.Keccak256PreimageType] = make(map[common.Hash][]byte)
}
keccakPreimages = preimages[arbutil.Keccak256PreimageType]
}

buf := bytes.NewBuffer(sequencerMsg[40:])

header, err := buf.ReadByte()
if err != nil {
log.Error("Couldn't deserialize Avail header byte", "err", err)
return nil, err
}
if !avail.IsAvailMessageHeaderByte(header) {
return nil, errors.New("tried to deserialize a message that doesn't have the Avail header")
}

recordPreimage := func(key common.Hash, value []byte) {
keccakPreimages[key] = value
}

blobPointer := avail.BlobPointer{}
err = blobPointer.UnmarshalFromBinary(buf.Bytes())
if err != nil {
log.Error("Couldn't unmarshal Avail blob pointer", "err", err)
return nil, err
}

log.Info("Attempting to fetch data for", "batchNum", batchNum, "availBlockHash", blobPointer.BlockHash)
payload, err := availDAReader.Read(ctx, blobPointer)
if err != nil {
log.Error("Failed to resolve blob pointer from avail", "err", err)
return nil, err
}

log.Info("Succesfully fetched payload from Avail", "batchNum", batchNum, "availBlockHash", blobPointer.BlockHash)

log.Info("Recording Sha256 preimage for Avail data")

if keccakPreimages != nil {
log.Info("Data is being recorded into the orcale", "length", len(payload))
dastree.RecordHash(recordPreimage, payload)
}
return payload, nil
}

func RecoverPayloadFromDasBatch(
ctx context.Context,
batchNum uint64,
@@ -369,6 +434,7 @@ type inboxMultiplexer struct {
backend InboxBackend
delayedMessagesRead uint64
daProviders []DataAvailabilityProvider
availDAReader avail.DataAvailabilityReader
cachedSequencerMessage *sequencerMessage
cachedSequencerMessageNum uint64
cachedSegmentNum uint64
@@ -378,11 +444,12 @@
keysetValidationMode KeysetValidationMode
}

func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, daProviders []DataAvailabilityProvider, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
func NewInboxMultiplexer(backend InboxBackend, delayedMessagesRead uint64, daProviders []DataAvailabilityProvider, availDAReader avail.DataAvailabilityReader, keysetValidationMode KeysetValidationMode) arbostypes.InboxMultiplexer {
return &inboxMultiplexer{
backend: backend,
delayedMessagesRead: delayedMessagesRead,
daProviders: daProviders,
availDAReader: availDAReader,
keysetValidationMode: keysetValidationMode,
}
}
@@ -404,7 +471,7 @@ func (r *inboxMultiplexer) Pop(ctx context.Context) (*arbostypes.MessageWithMeta
}
r.cachedSequencerMessageNum = r.backend.GetSequencerInboxPosition()
var err error
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, batchBlockHash, bytes, r.daProviders, r.keysetValidationMode)
r.cachedSequencerMessage, err = parseSequencerMessage(ctx, r.cachedSequencerMessageNum, batchBlockHash, bytes, r.daProviders, r.availDAReader, r.keysetValidationMode)
if err != nil {
return nil, err
}
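In the arbstate/inbox.go changes above, parseSequencerMessage sniffs the first payload byte to decide whether the batch carries an Avail blob pointer instead of inline data, and RecoverPayloadFromAvailBatch then unmarshals the BlobPointer and reads the batch back from Avail. Below is a sketch of the header helper this implies, analogous to arbstate's existing IsDASMessageHeaderByte; the concrete flag value is not shown in this diff, so 0x0a is a placeholder, not the value the avail package actually uses.

package avail

// AvailMessageHeaderFlag marks a sequencer message whose payload is a
// serialized BlobPointer rather than the batch data itself.
// Placeholder value: the real constant is defined in das/avail.
const AvailMessageHeaderFlag byte = 0x0a

func hasBits(checking byte, bits byte) bool {
	return (checking & bits) == bits
}

// IsAvailMessageHeaderByte reports whether a payload's first byte carries the
// Avail header flag, mirroring how the DAS header flag is detected.
func IsAvailMessageHeaderByte(header byte) bool {
	return hasBits(header, AvailMessageHeaderFlag)
}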
2 changes: 1 addition & 1 deletion arbstate/inbox_fuzz_test.go
@@ -67,7 +67,7 @@ func FuzzInboxMultiplexer(f *testing.F) {
delayedMessage: delayedMsg,
positionWithinMessage: 0,
}
multiplexer := NewInboxMultiplexer(backend, 0, nil, KeysetValidate)
multiplexer := NewInboxMultiplexer(backend, 0, nil, nil, KeysetValidate)
_, err := multiplexer.Pop(context.TODO())
if err != nil {
panic(err)
2 changes: 1 addition & 1 deletion cmd/pruning/pruning.go
@@ -189,7 +189,7 @@ func findImportantRoots(ctx context.Context, chainDb ethdb.Database, stack *node
return nil, fmt.Errorf("failed to get finalized block: %w", err)
}
l1BlockNum := l1Block.NumberU64()
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil)
tracker, err := arbnode.NewInboxTracker(arbDb, nil, nil, nil, nil)
if err != nil {
return nil, err
}