Skip to content

Commit

Permalink
feat: added s3 fallback service and made changes in avail config with…
Browse files Browse the repository at this point in the history
… node options
  • Loading branch information
RISHABHAGRAWALZRA committed Nov 27, 2024
1 parent 21590fd commit 1c9e182
Show file tree
Hide file tree
Showing 8 changed files with 428 additions and 166 deletions.
1 change: 1 addition & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ var ConfigDefault = Config{
Staker: staker.DefaultL1ValidatorConfig,
SeqCoordinator: DefaultSeqCoordinatorConfig,
DataAvailability: das.DefaultDataAvailabilityConfig,
Avail: avail.DefaultAvailDAConfig,
SyncMonitor: DefaultSyncMonitorConfig,
Dangerous: DefaultDangerousConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
Expand Down
113 changes: 70 additions & 43 deletions das/avail/avail.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@ import (
"context"
"errors"
"fmt"
"strings"
"math"

"time"

gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/signature"
gsrpc_types "github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/das/avail/vectorx"
"github.com/offchainlabs/nitro/das/avail/availDABridge"
s3_storage_service "github.com/offchainlabs/nitro/das/avail/s3StorageService"
"github.com/offchainlabs/nitro/das/dastree"
)

Expand All @@ -40,19 +37,23 @@ func IsAvailMessageHeaderByte(header byte) bool {
}

type AvailDA struct {
// Config
enable bool
vectorx vectorx.VectorX
finalizationTimeout time.Duration
appID int
api *gsrpc.SubstrateAPI
meta *gsrpc_types.Metadata
genesisHash gsrpc_types.Hash
rv *gsrpc_types.RuntimeVersion
keyringPair signature.KeyringPair
key gsrpc_types.StorageKey
bridgeApiBaseURL string
bridgeApiTimeout time.Duration
vectorXTimeout time.Duration

// Client
api *gsrpc.SubstrateAPI
meta *gsrpc_types.Metadata
genesisHash gsrpc_types.Hash
rv *gsrpc_types.RuntimeVersion
keyringPair signature.KeyringPair
key gsrpc_types.StorageKey

// Fallback
fallbackS3Service *s3_storage_service.S3StorageService
// AvailDABridge
availDABridge availDABridge.AvailDABridge
}

func NewAvailDA(cfg DAConfig, l1Client arbutil.L1Interface) (*AvailDA, error) {
Expand Down Expand Up @@ -97,30 +98,16 @@ func NewAvailDA(cfg DAConfig, l1Client arbutil.L1Interface) (*AvailDA, error) {
return nil, fmt.Errorf("AvailDAError: ⚠️ cannot create storage key, %w. %w", err, ErrAvailDAClientInit)
}

// Contract address
contractAddress := common.HexToAddress(cfg.VectorX)

// Parse the contract ABI
abi, err := abi.JSON(strings.NewReader(vectorx.VectorxABI))
if err != nil {
return nil, fmt.Errorf("AvailDAError: ⚠️ cannot create abi for vectorX, %w. %w", err, ErrAvailDAClientInit)
}

// Connect to L1 node thru web socket
client, err := ethclient.Dial(cfg.ArbSepoliaRPC)
if err != nil {
return nil, fmt.Errorf("AvailDAError: %w. %w", err, ErrAvailDAClientInit)
}

// Create a filter query to listen for events
query := ethereum.FilterQuery{
Addresses: []common.Address{contractAddress},
Topics: [][]common.Hash{{abi.Events["HeadUpdate"].ID}},
var fallbackS3Service *s3_storage_service.S3StorageService
if cfg.FallbackS3ServiceConfig.Enable {
fallbackS3Service, err = s3_storage_service.NewS3StorageService(cfg.FallbackS3ServiceConfig)
if err != nil {
return nil, fmt.Errorf("AvailDAError: unable to intialize s3 storage service for fallback, %w. %w", err, ErrAvailDAClientInit)
}
}

return &AvailDA{
enable: cfg.Enable,
vectorx: vectorx.VectorX{Abi: abi, Client: client, Query: query},
finalizationTimeout: time.Duration(cfg.Timeout),
appID: appID,
api: api,
Expand All @@ -129,9 +116,8 @@ func NewAvailDA(cfg DAConfig, l1Client arbutil.L1Interface) (*AvailDA, error) {
rv: rv,
keyringPair: keyringPair,
key: key,
bridgeApiBaseURL: cfg.BridgeApiBaseURL,
bridgeApiTimeout: BridgeApiTimeout,
vectorXTimeout: VectorXTimeout,
fallbackS3Service: fallbackS3Service,
availDABridge: availDABridge.AvailDABridge{},
}, nil
}

Expand All @@ -147,20 +133,20 @@ func (a *AvailDA) Store(ctx context.Context, message []byte) ([]byte, error) {
return nil, fmt.Errorf("AvailDAError: cannot get header for finalized block: %w", err)
}

extrinsicIndex, err := GetExtrinsicIndex(a.api, finalizedblockHash, a.keyringPair.Address, nonce)
extrinsicIndex, err := getExtrinsicIndex(a.api, finalizedblockHash, a.keyringPair.Address, nonce)
if err != nil {
return nil, err
}
log.Info("AvailDAInfo: 🏆 Data included in Avail's finalised block", "blockHash", finalizedblockHash.Hex(), "extrinsicIndex", extrinsicIndex)

blobProof, err := QueryBlobProof(a.api, extrinsicIndex, finalizedblockHash)
blobProof, err := queryBlobProof(a.api, extrinsicIndex, finalizedblockHash)
if err != nil {
return nil, err
}

// validation of blobProof in respect of submitted data
blobDataKeccak256H := crypto.Keccak256Hash(message)
if !ValidateBlobProof(blobProof, blobDataKeccak256H) {
if !validateBlobProof(blobProof, blobDataKeccak256H) {
return nil, fmt.Errorf("AvailDAError: BlobProof is invalid, BlobProof:%s", blobProof.String())
}

Expand All @@ -172,6 +158,14 @@ func (a *AvailDA) Store(ctx context.Context, message []byte) ([]byte, error) {
return nil, fmt.Errorf("AvailDAError: ⚠️ BlobPointer MashalBinary error, %w", err)
}

// fallback
if a.fallbackS3Service != nil {
err := a.fallbackS3Service.Put(ctx, message, 0)
if err != nil {
log.Error("AvailDAError: failed to put data on s3 storage service: %w", err)
}
}

return blobPointerData, nil
}

Expand All @@ -182,7 +176,41 @@ func (a *AvailDA) Read(ctx context.Context, blobPointer BlobPointer) ([]byte, er
blockHeight := blobPointer.BlockHeight
extrinsicIndex := blobPointer.ExtrinsicIndex

var data []byte
for i := 0; i < 3; i++ {
var err error
data, err = readData(a, blockHeight, extrinsicIndex)
if err == nil {
log.Info("AvailInfo: ✅ Succesfully fetched data from Avail")
break
} else if i == 2 {
log.Info("AvailInfo: ❌ failed to fetched data from Avail, err: %w", err)

if a.fallbackS3Service != nil {
data, err = a.fallbackS3Service.GetByHash(ctx, blobPointer.DasTreeRootHash)
if err != nil {
log.Info("AvailInfo: ❌ failed to read data from fallback s3 storage, err: %w", err)
return nil, fmt.Errorf("AvailDAError: unable to read data from AvailDA & Fallback s3 storage")
}
log.Info("AvailInfo: ✅ Succesfully fetched data from Avail using fallbackS3Service")
break
} else {
return nil, fmt.Errorf("AvailDAError: unable to read data from AvailDA & Fallback s3 storage is not enabled")
}

}
sleepDuration := time.Duration(math.Pow(2, float64(i))) * time.Second
time.Sleep(sleepDuration)
}

return data, nil
}

func readData(a *AvailDA, blockHeight, extrinsicIndex uint32) ([]byte, error) {
latestHeader, err := a.api.RPC.Chain.GetHeaderLatest()
if err != nil {
return nil, fmt.Errorf("AvailDAError: cannot get latest header, %w", err)
}

if latestHeader.Number < gsrpc_types.BlockNumber(blockHeight) {
return nil, fmt.Errorf("AvailDAError: %w: %w", err, ErrWrongAvailDAPointer)
Expand All @@ -205,7 +233,6 @@ func (a *AvailDA) Read(ctx context.Context, blobPointer BlobPointer) ([]byte, er
return nil, err
}

log.Info("AvailInfo: ✅ Succesfully fetched data from Avail")
return data, nil
}

Expand Down
145 changes: 145 additions & 0 deletions das/avail/availDABridge/availDABridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package availDABridge

import (
"context"
"strings"

"fmt"
"time"

flag "github.com/spf13/pflag"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)

const VectorxABI = `[
{
"type": "event",
"name": "HeadUpdate",
"inputs": [
{
"name": "blockNumber",
"type": "uint32",
"indexed": false,
"internalType": "uint32"
},
{
"name": "headerHash",
"type": "bytes32",
"indexed": false,
"internalType": "bytes32"
}
],
"anonymous": false
}
]`

type AvailDABridgeConfig struct {
VectorXAddress string `koanf:"vectorx-address"`
AvailBridgeApiURL string `koanf:"avail-bridge-api-url"`
ArbSepoliaRPC string `koanf:"arbsepolia-rpc"`
}

var DefaultAvailDABridgeConfig = AvailDABridgeConfig{
AvailBridgeApiURL: "https://turing-bridge-api.fra.avail.so/",
VectorXAddress: "0xA712dfec48AF3a78419A8FF90fE8f97Ae74680F0",
ArbSepoliaRPC: "wss://sepolia-rollup.arbitrum.io/rpc",
}

func AvailDABridgeAddOptions(prefix string, f *flag.FlagSet) {
f.String(prefix+".avail-bridge-api-url", DefaultAvailDABridgeConfig.AvailBridgeApiURL, "Avail bridge api offered over the HTTP-RPC interface")
f.String(prefix+".vectorx-address", DefaultAvailDABridgeConfig.VectorXAddress, "vectorx contract address for event notification")
f.String(prefix+".arbsepolia-rpc", DefaultAvailDABridgeConfig.ArbSepoliaRPC, "ArbSepolia api offered over the WS-RPC interface")
}

type AvailDABridge struct {
abi abi.ABI
client *ethclient.Client
query ethereum.FilterQuery
availbridgeApiBaseURL string
bridgeApiTimeout time.Duration
vectorXTimeout time.Duration
}

func InitializeAvailDABridge(cfg AvailDABridgeConfig, bridgeApiTimeout, vectorXTimeout time.Duration) (*AvailDABridge, error) {
// Contract address
contractAddress := common.HexToAddress(cfg.VectorXAddress)

// Parse the contract ABI
abi, err := abi.JSON(strings.NewReader(VectorxABI))
if err != nil {
return nil, fmt.Errorf("AvailDAError: ⚠️ cannot create abi for vectorX, %w", err)
}

// Connect to L1 node thru web socket
client, err := ethclient.Dial(cfg.ArbSepoliaRPC)
if err != nil {
return nil, fmt.Errorf("AvailDAError: %w", err)
}

// Create a filter query to listen for events
query := ethereum.FilterQuery{
Addresses: []common.Address{contractAddress},
Topics: [][]common.Hash{{abi.Events["HeadUpdate"].ID}},
}

return &AvailDABridge{
abi: abi,
client: client,
query: query,
availbridgeApiBaseURL: cfg.AvailBridgeApiURL,
bridgeApiTimeout: bridgeApiTimeout,
vectorXTimeout: vectorXTimeout,
}, nil
}

func (b *AvailDABridge) SubscribeForHeaderUpdate(finalizedBlockNumber uint32, t time.Duration) error {
// Subscribe to the event stream
logs := make(chan types.Log)
sub, err := b.client.SubscribeFilterLogs(context.Background(), b.query, logs)
if err != nil {
return err
}
defer sub.Unsubscribe()

// Keep the connection alive with a ping mechanism
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
go func(t *time.Ticker) {
for range t.C {
// Send a dummy request to keep the connection alive
_, err := b.client.BlockNumber(context.Background())
if err != nil {
log.Warn("Failed to send ping to keep the connection alive for vectorx events", "err", err)
}
}
}(ticker)

log.Info("🎧 Listening for vectorx HeadUpdate event with", "blockNumber", finalizedBlockNumber)
timeout := time.After(t * time.Second)
// Loop to process incoming events
for {
select {
case err := <-sub.Err():
return err
case vLog := <-logs:
event, err := b.abi.Unpack("HeadUpdate", vLog.Data)
if err != nil {
return err
}

log.Info("🤝 New HeadUpdate event from vecotorx", "blockNumber", event[0])
val, _ := event[0].(uint32)
if val >= uint32(finalizedBlockNumber) {
return nil
}
case <-timeout:
return fmt.Errorf("⌛️ Timeout of %d seconds reached without getting HeadUpdate event from vectorx for blockNumber %v", t, finalizedBlockNumber)
}
}
}
Loading

0 comments on commit 1c9e182

Please sign in to comment.