From 317ead593e1c5d309da833357a3245a58788bc7f Mon Sep 17 00:00:00 2001
From: Ononiwu Maureen
Date: Tue, 12 Mar 2024 13:50:22 +0100
Subject: [PATCH] chaindataloader: Added new package

Signed-off-by: Ononiwu Maureen
---
 chaindataloader/binary.go      | 257 ++++++++++++++++++++++++++++++
 chaindataloader/binary_test.go | 205 ++++++++++++++++++++++++
 chaindataloader/dataloader.go  | 283 +++++++++++++++++++++++++++++++++
 3 files changed, 745 insertions(+)
 create mode 100644 chaindataloader/binary.go
 create mode 100644 chaindataloader/binary_test.go
 create mode 100644 chaindataloader/dataloader.go

diff --git a/chaindataloader/binary.go b/chaindataloader/binary.go
new file mode 100644
index 000000000..572f82aa2
--- /dev/null
+++ b/chaindataloader/binary.go
@@ -0,0 +1,257 @@
+package chaindataloader
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/btcsuite/btcd/chaincfg/chainhash"
+	"github.com/btcsuite/btcd/wire"
+	"github.com/lightningnetwork/lnd/tlv"
+)
+
+const (
+	// blockHeaderSize is the size one block header occupies in bytes.
+	blockHeaderSize dataSize = 80
+
+	// filterHeaderSize is the size one filter header occupies in bytes.
+	filterHeaderSize dataSize = 32
+)
+
+// binReader is an internal struct that holds all the data the loader needs
+// to fetch headers.
+// Each file begins with a varint-encoded metadata header consisting of,
+//
+//	dataType || extra metadata (e.g. filterType) || startHeight || endHeight
+//	|| chain
+//
+// in that order.
+type binReader struct {
+	// reader represents the source to be read.
+	reader io.ReadSeeker
+
+	// startHeight represents the height of the first header in the file.
+	startHeight int32
+
+	// endHeight represents the height of the last header in the file.
+	endHeight int32
+
+	// initialOffset is the number of bytes occupied by the file's
+	// metadata; the first header starts at this offset.
+	initialOffset uint32
+
+	// chain represents the bitcoin network the headers in the file belong
+	// to.
+	chain wire.BitcoinNet
+
+	// dataSize is the size in bytes of a single header in the file.
+	dataSize dataSize
+}
+
+// binConfig houses the arguments required to initialize a binary loader.
+type binConfig struct {
+	reader     io.ReadSeeker
+	headerType dataType
+}
+
+// blkHdrBinReader is a binary loader that decodes block headers.
+type blkHdrBinReader struct {
+	*binReader
+	headerDecoder[*wire.BlockHeader]
+}
+
+// cfHdrBinReader is a binary loader that decodes filter header hashes.
+type cfHdrBinReader struct {
+	*binReader
+	headerDecoder[*chainhash.Hash]
+}
+
+// newBinaryBlkHdrLoader initializes a binary loader that fetches block
+// headers.
+func newBinaryBlkHdrLoader(c *binConfig) (*blkHdrBinReader, error) {
+	b, err := newBinaryLoader(c)
+	if err != nil {
+		return nil, err
+	}
+
+	return &blkHdrBinReader{
+		binReader:     b,
+		headerDecoder: blkHdrDecoder,
+	}, nil
+}
+
+// newBinaryFilterHdrLoader initializes a binary loader that fetches filter
+// header hashes.
+func newBinaryFilterHdrLoader(c *binConfig) (*cfHdrBinReader, error) {
+	b, err := newBinaryLoader(c)
+	if err != nil {
+		return nil, err
+	}
+
+	return &cfHdrBinReader{
+		binReader:     b,
+		headerDecoder: filterHdrDecoder,
+	}, nil
+}
+
+// newBinaryLoader initializes a binary Loader by reading the metadata at the
+// start of the file.
+func newBinaryLoader(c *binConfig) (*binReader, error) {
+	// Create scratch buffer.
+	scratch := [8]byte{}
+
+	typeOfData, err := tlv.ReadVarInt(c.reader, &scratch)
+	if err != nil {
+		return nil, fmt.Errorf("error obtaining data type "+
+			"of file: %w", err)
+	}
+
+	if dataType(typeOfData) != c.headerType {
+		return nil, fmt.Errorf("data type mismatch: got %v but "+
+			"expected %v", dataType(typeOfData), c.headerType)
+	}
+
+	var headerSize dataSize
+	switch c.headerType {
+	case blockHeaders:
+		headerSize = blockHeaderSize
+	case filterHeaders:
+		headerSize = filterHeaderSize
+	default:
+		return nil, errors.New("unsupported header type")
+	}
+
+	// Read the start height of the header file.
+	start, err := tlv.ReadVarInt(c.reader, &scratch)
+	if err != nil {
+		return nil, fmt.Errorf("error obtaining start height "+
+			"of file: %w", err)
+	}
+
+	// Read the end height of the header file.
+	end, err := tlv.ReadVarInt(c.reader, &scratch)
+	if err != nil {
+		return nil, fmt.Errorf("error obtaining end height of file: "+
+			"%w", err)
+	}
+
+	// Read the bitcoin network the headers in the file belong to.
+	chainChar, err := tlv.ReadVarInt(c.reader, &scratch)
+	if err != nil {
+		return nil, fmt.Errorf("error obtaining chain of file: %w",
+			err)
+	}
+
+	// Obtain the space occupied by the metadata as the initial offset.
+	initialOffset, err := c.reader.Seek(0, io.SeekCurrent)
+	if err != nil {
+		return nil, fmt.Errorf("unable to determine initial offset: "+
+			"%w", err)
+	}
+
+	return &binReader{
+		reader:        c.reader,
+		startHeight:   int32(start),
+		endHeight:     int32(end),
+		initialOffset: uint32(initialOffset),
+		chain:         wire.BitcoinNet(chainChar),
+		dataSize:      headerSize,
+	}, nil
+}
+
+// NeutrinoHeader is a type constraint covering the header types the loader
+// can fetch: block headers and filter header hashes.
+type NeutrinoHeader interface {
+	*wire.BlockHeader | *chainhash.Hash
+}
+
+// headerDecoder deserializes the passed bytes into the required header type.
+type headerDecoder[T NeutrinoHeader] func([]byte) (T, error)
+
+// blkHdrDecoder deserializes the passed bytes into a *wire.BlockHeader.
+func blkHdrDecoder(data []byte) (*wire.BlockHeader, error) {
+	var blockHeader wire.BlockHeader
+
+	headerReader := bytes.NewReader(data)
+
+	// Decode the raw bytes into a proper bitcoin header.
+	if err := blockHeader.Deserialize(headerReader); err != nil {
+		return nil, fmt.Errorf("error deserializing block header: %w",
+			err)
+	}
+
+	return &blockHeader, nil
+}
+
+// filterHdrDecoder deserializes the passed bytes into a filter header hash.
+func filterHdrDecoder(data []byte) (*chainhash.Hash, error) {
+	return chainhash.NewHash(data)
+}
+
+// readHeaders reads and decodes up to numHeaders headers from the reader,
+// starting at its current position.
+func readHeaders[T NeutrinoHeader](numHeaders int32, reader io.ReadSeeker,
+	dataTypeSize dataSize, decoder headerDecoder[T]) ([]T, error) {
+
+	hdrs := make([]T, 0, numHeaders)
+	for i := int32(0); i < numHeaders; i++ {
+		rawData := make([]byte, dataTypeSize)
+
+		if _, err := io.ReadFull(reader, rawData); err != nil {
+			// A clean EOF simply means the file contains fewer
+			// headers than requested.
+			if errors.Is(err, io.EOF) {
+				break
+			}
+
+			return nil, err
+		}
+
+		hdr, err := decoder(rawData)
+		if err != nil {
+			return nil, err
+		}
+
+		hdrs = append(hdrs, hdr)
+	}
+
+	return hdrs, nil
+}
+
+// FetchHeaders retrieves numHeaders block headers from the file, starting at
+// the loader's current offset.
+func (b *blkHdrBinReader) FetchHeaders(numHeaders int32) ([]*wire.BlockHeader,
+	error) {
+
+	return readHeaders[*wire.BlockHeader](
+		numHeaders, b.reader, b.dataSize, b.headerDecoder,
+	)
+}
+
+// FetchHeaders retrieves numHeaders filter header hashes from the file,
+// starting at the loader's current offset.
+func (b *cfHdrBinReader) FetchHeaders(numHeaders int32) ([]*chainhash.Hash,
+	error) {
+
+	return readHeaders[*chainhash.Hash](
+		numHeaders, b.reader, b.dataSize, b.headerDecoder,
+	)
+}
+
+// EndHeight returns the height of the last header in the file.
+func (b *binReader) EndHeight() int32 {
+	return b.endHeight
+}
+
+// StartHeight returns the height of the first header in the file.
+func (b *binReader) StartHeight() int32 {
+	return b.startHeight
+}
+
+// HeadersChain returns the bitcoin network the headers in the file belong to.
+func (b *binReader) HeadersChain() wire.BitcoinNet {
+	return b.chain
+}
+
+// SetHeight receives the height of the last header the caller has and uses it
+// to set the appropriate offset for fetching the headers that follow.
+func (b *binReader) SetHeight(height int32) error {
+	if b.startHeight > height || height > b.endHeight {
+		return errors.New("unable to set height as file does not " +
+			"contain requested height")
+	}
+
+	// Compute the offset of the header at `height` relative to the first
+	// header in the file.
+	offset := (height - b.startHeight) * int32(b.dataSize)
+
+	// Compute the total offset of the header immediately *after* `height`,
+	// accounting for the metadata at the start of the file.
+	totalOffset := (offset + int32(b.initialOffset)) + int32(b.dataSize)
+
+	_, err := b.reader.Seek(int64(totalOffset), io.SeekStart)
+	if err != nil {
+		return fmt.Errorf("unable to set seek for Loader: %w", err)
+	}
+
+	return nil
+}
diff --git a/chaindataloader/binary_test.go b/chaindataloader/binary_test.go
new file mode 100644
index 000000000..db3cec9d7
--- /dev/null
+++ b/chaindataloader/binary_test.go
@@ -0,0 +1,205 @@
+package chaindataloader
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/btcsuite/btcd/wire"
+	"github.com/lightninglabs/neutrino/headerfs"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	testDataDir = "testdata/"
+)
+
+// testSideLoadConfig holds the parameters for a single sideload test case.
+type testSideLoadConfig struct {
+	name         string
+	filePath     string
+	endHeight    uint32
+	startHeight  uint32
+	network      wire.BitcoinNet
+	tipHeight    uint32
+	err          bool
+	tipErr       bool
+	dataType     dataType
+	dataTypeSize uint32
+}
+
+// testBinaryBlockHeader returns the test cases used to exercise the binary
+// block header loader.
+func testBinaryBlockHeader() []testSideLoadConfig {
+	testCases := []testSideLoadConfig{
+		{
+			name:         "invalid network in header file",
+			filePath:     "start_2_end_10_encoded_8_invalidNetwork_headers.bin",
+			err:          true,
+			dataType:     blockHeaders,
+			dataTypeSize: uint32(headerfs.BlockHeaderSize),
+		},
+		{
+			name: "valid regtest headers with start height, 2 and end " +
+				"height, 10",
+			filePath:     "start_2_end_10_encoded_8_valid_regtest_headers.bin",
+			endHeight:    10,
+			startHeight:  2,
+			network:      wire.TestNet,
+			tipHeight:    1,
+			tipErr:       true,
+			err:          false,
+			dataType:     blockHeaders,
+			dataTypeSize: uint32(headerfs.BlockHeaderSize),
+		},
+		{
+			name: "valid testnet headers with start height, 0 and end " +
+				"height, 8",
+			filePath:     "start_0_end_8_encoded_8_valid_testnet_headers.bin",
+			endHeight:    8,
+			startHeight:  0,
+			network:      wire.TestNet3,
+			tipHeight:    3,
+			tipErr:       false,
+			err:          false,
+			dataType:     blockHeaders,
+			dataTypeSize: uint32(headerfs.BlockHeaderSize),
+		},
+		{
+			name: "invalid data type in source",
+			filePath: "start_0_end_10_encoded_10_invalidDataType_testnet_" +
+				"headers.bin",
+			endHeight:    10,
+			startHeight:  0,
+			tipHeight:    0,
+			tipErr:       false,
+			network:      wire.TestNet3,
+			err:          true,
+			dataType:     blockHeaders,
+			dataTypeSize: uint32(headerfs.BlockHeaderSize),
+		},
+		{
+			name: "valid testnet headers with start height, 0 and end " +
+				"height, 10",
+			filePath:     "start_0_end_10_encoded_10_valid_testnet_headers.bin",
+			endHeight:    10,
+			startHeight:  0,
+			tipHeight:    0,
+			tipErr:       false,
+			network:      wire.TestNet3,
+			err:          false,
+			dataType:     blockHeaders,
+			dataTypeSize: uint32(headerfs.BlockHeaderSize),
+		},
+	}
+
+	return testCases
+}
+
+// TestNewBinarySideLoad tests that the Loader returns the appropriate data
+// required for fetching headers when initialized.
+func TestNewBinarySideLoad(t *testing.T) {
+	testCases := append([]testSideLoadConfig{},
+		testBinaryBlockHeader()...,
+	)
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			testFile := filepath.Join(testDataDir, tc.filePath)
+
+			r, err := os.Open(testFile)
+			require.NoError(t, err)
+
+			bReader, err := newBinaryLoader(&binConfig{
+				reader:     r,
+				headerType: tc.dataType,
+			})
+
+			if tc.err {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+
+			if err != nil {
+				return
+			}
+
+			require.EqualValues(t, tc.startHeight,
+				bReader.startHeight)
+
+			require.EqualValues(t, tc.endHeight, bReader.endHeight)
+
+			require.Equal(t, tc.network, bReader.chain)
+
+			require.EqualValues(t, tc.dataTypeSize,
+				bReader.dataSize)
+
+			tipErr := bReader.SetHeight(int32(tc.tipHeight))
+
+			if tc.tipErr {
+				require.Error(t, tipErr)
+			} else {
+				require.NoError(t, tipErr)
+			}
+		})
+	}
+}
+
+// TestGenerateBlockHeaders tests fetching headers from the binary file.
+func TestGenerateBlockHeaders(t *testing.T) {
+	testFile := filepath.Join(testDataDir,
+		"start_0_end_10_encoded_10_valid_testnet_headers.bin")
+
+	r, err := os.Open(testFile)
+	require.NoError(t, err)
+
+	reader, err := newBinaryBlkHdrLoader(&binConfig{
+		reader:     r,
+		headerType: blockHeaders,
+	})
+	require.NoError(t, err)
+	require.NotNil(t, reader)
+
+	// Set the height so that the next fetch returns the header at
+	// height 1.
+	err = reader.SetHeight(0)
+	require.NoError(t, err)
+
+	firstHeader, err := reader.FetchHeaders(1)
+	require.NoError(t, err)
+	require.Len(t, firstHeader, 1)
+
+	// After fetching the first header the tracker should advance so that
+	// we can fetch the next one.
+	secondHeader, err := reader.FetchHeaders(1)
+	require.NoError(t, err)
+	require.Len(t, secondHeader, 1)
+
+	// Assuming the source file has valid headers, ensure the loader
+	// fetches the expected headers.
+	require.NotEqual(t, firstHeader[0], secondHeader[0])
+	require.Equal(t, firstHeader[0].BlockHash(), secondHeader[0].PrevBlock)
+
+	// Though the tracker would fetch the header at height 3 at this point,
+	// setting the height to zero should reset the tracker and compute the
+	// right offset to obtain the header at height one again.
+	err = reader.SetHeight(0)
+	require.NoError(t, err)
+
+	header, err := reader.FetchHeaders(1)
+	require.NoError(t, err)
+	require.Len(t, header, 1)
+
+	require.Equal(t, firstHeader[0], header[0])
+}
diff --git a/chaindataloader/dataloader.go b/chaindataloader/dataloader.go
new file mode 100644
index 000000000..de9ac37f3
--- /dev/null
+++ b/chaindataloader/dataloader.go
@@ -0,0 +1,283 @@
+package chaindataloader
+
+import (
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/btcsuite/btcd/chaincfg/chainhash"
+	"github.com/btcsuite/btcd/wire"
+)
+
+// cfHeaderValidator verifies a batch of filter headers against the
+// checkpoint that follows them.
+type cfHeaderValidator interface {
+	VerifyCheckpoint(prevCheckpoint *chainhash.Hash,
+		headers []*chainhash.Hash, nextCheckpoint *chainhash.Hash) bool
+}
+
+// headerWriter abstracts the header store that sideloaded headers are
+// written to.
+type headerWriter[T NeutrinoHeader] interface {
+	chainTip() (int32, chainhash.Hash)
+	write([]T) error
+	lastHeight() int32
+	lastHeader() chainhash.Hash
+}
+
+// blockHeaderValidator verifies a batch of sideloaded block headers.
+type blockHeaderValidator interface {
+	verify(headers []*wire.BlockHeader) bool
+}
+
+// Loader is the interface a caller fetches data from.
+type Loader[T NeutrinoHeader] interface {
+	// EndHeight indicates the height of the last header in the
+	// chaindataloader source.
+	EndHeight() int32
+
+	// StartHeight indicates the height of the first header in the
+	// chaindataloader source.
+	StartHeight() int32
+
+	// HeadersChain returns the bitcoin network the headers in the
+	// chaindataloader source belong to.
+	HeadersChain() wire.BitcoinNet
+
+	// FetchHeaders fetches and decodes the passed number of headers from
+	// the chaindataloader source.
+	FetchHeaders(numHeaders int32) ([]T, error)
+
+	// SetHeight tells the chaindataloader the point from which the caller
+	// wants to start reading.
+	SetHeight(int32) error
+}
+
+// SourceType is a type that indicates the encoding format of the
+// chaindataloader source.
+type SourceType uint8
+
+const (
+	// Binary is a SourceType that stores headers as raw binary-encoded
+	// data.
+	Binary SourceType = 0
+)
+
+// dataType indicates the type of data stored by the side load source.
+type dataType uint8
+
+const (
+	// blockHeaders is a data type that indicates the data stored is
+	// *wire.BlockHeader values.
+	blockHeaders dataType = 0
+
+	// filterHeaders is a data type that indicates the data stored is
+	// filter header hashes.
+	filterHeaders dataType = 1
+)
+
+// dataSize is a type indicating the size in bytes a single unit of data
+// handled by the chaindataloader occupies.
+type dataSize uint32
+
+// ReaderConfig is a generic struct that contains configuration details
+// required to initialize a Loader.
+type ReaderConfig struct {
+	// SourceType is the format type of the chaindataloader source.
+	SourceType SourceType
+
+	// Reader is the chaindataloader's source.
+	Reader io.ReadSeeker
+
+	// HeaderType indicates the type of headers stored in the source.
+	HeaderType dataType
+}
+
+// newBlockHeaderReader initializes a block header Loader based on the source
+// type of the reader config.
+func newBlockHeaderReader(cfg *ReaderConfig) (Loader[*wire.BlockHeader],
+	error) {
+
+	var (
+		reader Loader[*wire.BlockHeader]
+		err    error
+	)
+
+	switch cfg.SourceType {
+	case Binary:
+		reader, err = newBinaryBlkHdrLoader(&binConfig{
+			reader:     cfg.Reader,
+			headerType: cfg.HeaderType,
+		})
+	default:
+		return nil, errors.New("unsupported source type")
+	}
+
+	return reader, err
+}
+
+// newFilterHeaderReader initializes a filter header Loader based on the
+// source type of the reader config.
+func newFilterHeaderReader(cfg *ReaderConfig) (Loader[*chainhash.Hash],
+	error) {
+
+	var (
+		reader Loader[*chainhash.Hash]
+		err    error
+	)
+
+	switch cfg.SourceType {
+	case Binary:
+		reader, err = newBinaryFilterHdrLoader(&binConfig{
+			reader:     cfg.Reader,
+			headerType: cfg.HeaderType,
+		})
+		if err != nil {
+			return nil, err
+		}
+	default:
+		return nil, errors.New("unsupported source type")
+	}
+
+	return reader, err
+}
+
+// checkpoints abstracts access to the chain's hard-coded header checkpoints.
+type checkpoints interface {
+	FetchCheckpoint(idx int32) (uint32, chainhash.Hash)
+	FindNextHeaderCheckpoint(height int32) (int32, int32, chainhash.Hash)
+	FindPreviousHeaderCheckpoint(height int32) (int32, int32,
+		chainhash.Hash)
+	Len() int32
+}
+
+// obtainRangeFetchFunc returns a closure that, given the current height and
+// checkpoint index, reports how many headers to fetch next and, when
+// verification is enabled, the checkpoint the fetched batch must be verified
+// against.
+func obtainRangeFetchFunc(skipVerify bool, chkpt checkpoints, curHeight int32,
+	endHeight int32, sideloadRange int32) func(height,
+	idx int32) (int32, int32, *chainhash.Hash) {

+	var fetchRange func(height, idx int32) (int32, int32, *chainhash.Hash)
+	if !skipVerify {
+		fetchRange = func(height, idx int32) (int32, int32,
+			*chainhash.Hash) {
+
+			// Check if idx is out of range.
+			if idx >= chkpt.Len() {
+				return 0, 0, nil
+			}
+
+			checkpointHeight, checkpointHash :=
+				chkpt.FetchCheckpoint(idx)
+
+			return int32(checkpointHeight) - height,
+				int32(checkpointHeight), &checkpointHash
+		}
+	} else {
+		fetchRange = func(height, idx int32) (int32, int32,
+			*chainhash.Hash) {
+
+			if height >= endHeight {
+				return 0, 0, nil
+			}
+
+			fetchSize := sideloadRange
+			if height+sideloadRange > endHeight {
+				fetchSize = endHeight - height
+			}
+
+			return fetchSize, 0, nil
+		}
+	}
+
+	return fetchRange
+}
+
+// SideloadCfHeaders sideloads filter headers from the loader into the filter
+// header store until the store catches up with the block header store's tip,
+// optionally verifying every batch against the chain's filter header
+// checkpoints.
+func SideloadCfHeaders(loader Loader[*chainhash.Hash],
+	blockHeaderStore headerWriter[*wire.BlockHeader],
+	filterHeaderStore headerWriter[*chainhash.Hash],
+	cfheaderVerifier cfHeaderValidator, skipVerify bool,
+	chkpt checkpoints, sideloadRange int32) error {
+
+	bTip, _ := blockHeaderStore.chainTip()
+
+	fTip, fHeader := filterHeaderStore.chainTip()
+
+	idx, _, _ := chkpt.FindNextHeaderCheckpoint(fTip)
+
+	fetchRange := obtainRangeFetchFunc(skipVerify, chkpt, fTip, bTip,
+		sideloadRange)
+
+	for fTip < bTip {
+		fetchSize, _, chkpointHash := fetchRange(fTip, idx)
+		if fetchSize == 0 {
+			return nil
+		}
+
+		headers, err := loader.FetchHeaders(fetchSize)
+		if err != nil {
+			return err
+		}
+
+		if !skipVerify {
+			if !cfheaderVerifier.VerifyCheckpoint(
+				&fHeader, headers, chkpointHash) {
+
+				return fmt.Errorf(
+					"headers failed verification at "+
						"height: %v", fTip)
+			}
+
+			idx++
+		}
+
+		err = filterHeaderStore.write(headers)
+		if err != nil {
+			return fmt.Errorf("error writing filter headers: %w",
+				err)
+		}
+
+		fTip = filterHeaderStore.lastHeight()
+		fHeader = filterHeaderStore.lastHeader()
+	}
+
+	return nil
+}
+
+// SideloadBlockHeaders sideloads block headers from the loader into the block
+// header store until either the store catches up with the loader's end
+// height or the source is exhausted, optionally verifying every batch.
+func SideloadBlockHeaders(loader Loader[*wire.BlockHeader],
+	blockHeaderStore headerWriter[*wire.BlockHeader],
+	blockHeaderVerifier blockHeaderValidator, skipVerify bool,
+	chkpt checkpoints, sideloadRange int32) error {
+
+	bTip, _ := blockHeaderStore.chainTip()
+
+	idx, _, _ := chkpt.FindNextHeaderCheckpoint(bTip)
+
+	fetchRange := obtainRangeFetchFunc(skipVerify, chkpt, bTip,
+		loader.EndHeight(), sideloadRange)
+
+	for bTip <= loader.EndHeight() {
+		fetchSize, _, _ := fetchRange(bTip, idx)
+		if fetchSize == 0 {
+			return nil
+		}
+
+		headers, err := loader.FetchHeaders(fetchSize)
+		if err != nil {
+			return err
+		}
+
+		if !skipVerify {
+			if !blockHeaderVerifier.verify(headers) {
+				return fmt.Errorf(
+					"headers failed verification at "+
						"height: %v", bTip)
+			}
+
+			idx++
+		}
+
+		err = blockHeaderStore.write(headers)
+		if err != nil {
+			return err
+		}
+
+		bTip = blockHeaderStore.lastHeight()
+	}
+
+	return nil
+}
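
As a usage reference, the following in-package sketch walks a binary block header dump from start to end with the loader added above. It is illustrative only and not part of the diff: the fixture name is one of the testdata files added by this patch, the batch size of 2 is arbitrary, and exampleFetchBlockHeaders is a hypothetical helper.

package chaindataloader

import (
	"fmt"
	"os"
)

// exampleFetchBlockHeaders walks a binary header dump from start to end,
// printing each header hash. Illustrative only.
func exampleFetchBlockHeaders() error {
	f, err := os.Open(
		"testdata/start_0_end_10_encoded_10_valid_testnet_headers.bin",
	)
	if err != nil {
		return err
	}
	defer f.Close()

	loader, err := newBinaryBlkHdrLoader(&binConfig{
		reader:     f,
		headerType: blockHeaders,
	})
	if err != nil {
		return err
	}

	// Position the reader just after the first header so the next fetch
	// returns the header at StartHeight()+1.
	if err := loader.SetHeight(loader.StartHeight()); err != nil {
		return err
	}

	height := loader.StartHeight() + 1
	for height <= loader.EndHeight() {
		// Fetch in small batches; the loader advances its internal
		// offset after every call.
		hdrs, err := loader.FetchHeaders(2)
		if err != nil {
			return err
		}
		if len(hdrs) == 0 {
			break
		}

		for _, hdr := range hdrs {
			fmt.Printf("height %d: %s\n", height, hdr.BlockHash())
			height++
		}
	}

	return nil
}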
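
A second illustrative sketch, also outside the diff, shows how SideloadBlockHeaders could be wired against the small interfaces declared in dataloader.go. memStore, noopValidator, noCheckpoints and exampleSideloadBlockHeaders are hypothetical stand-ins assumed for this sketch; a real caller would back them with a headerfs block header store, a real proof-of-work validator and the chain's hard-coded checkpoints.

package chaindataloader

import (
	"os"

	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/wire"
)

// memStore is an illustrative in-memory headerWriter[*wire.BlockHeader].
type memStore struct {
	height int32
	hash   chainhash.Hash
	hdrs   []*wire.BlockHeader
}

func (m *memStore) chainTip() (int32, chainhash.Hash) {
	return m.height, m.hash
}

func (m *memStore) write(hdrs []*wire.BlockHeader) error {
	m.hdrs = append(m.hdrs, hdrs...)
	m.height += int32(len(hdrs))
	if len(hdrs) > 0 {
		m.hash = hdrs[len(hdrs)-1].BlockHash()
	}
	return nil
}

func (m *memStore) lastHeight() int32          { return m.height }
func (m *memStore) lastHeader() chainhash.Hash { return m.hash }

// noopValidator accepts every batch; stands in for a real
// blockHeaderValidator.
type noopValidator struct{}

func (noopValidator) verify(_ []*wire.BlockHeader) bool { return true }

// noCheckpoints reports an empty checkpoint list.
type noCheckpoints struct{}

func (noCheckpoints) FetchCheckpoint(int32) (uint32, chainhash.Hash) {
	return 0, chainhash.Hash{}
}

func (noCheckpoints) FindNextHeaderCheckpoint(int32) (int32, int32,
	chainhash.Hash) {

	return 0, 0, chainhash.Hash{}
}

func (noCheckpoints) FindPreviousHeaderCheckpoint(int32) (int32, int32,
	chainhash.Hash) {

	return 0, 0, chainhash.Hash{}
}

func (noCheckpoints) Len() int32 { return 0 }

// exampleSideloadBlockHeaders pulls every header out of a binary dump and
// writes it into the in-memory store, skipping verification. Illustrative
// only.
func exampleSideloadBlockHeaders(path string) (*memStore, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	loader, err := newBlockHeaderReader(&ReaderConfig{
		SourceType: Binary,
		Reader:     f,
		HeaderType: blockHeaders,
	})
	if err != nil {
		return nil, err
	}

	store := &memStore{height: loader.StartHeight()}

	// Position the loader at the store's current tip before sideloading.
	if err := loader.SetHeight(store.height); err != nil {
		return nil, err
	}

	err = SideloadBlockHeaders(
		loader, store, noopValidator{}, true, noCheckpoints{}, 2016,
	)
	if err != nil {
		return nil, err
	}

	return store, nil
}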