diff --git a/Makefile b/Makefile index 1c01268..6b4b90e 100644 --- a/Makefile +++ b/Makefile @@ -5,13 +5,13 @@ build: @go build -o ./bin/ingest-bsc ./cmd/ingest/bsc/main.go run: build - @./bin/api + @./bin/api --config config.toml ingest-eth: build - @./bin/ingest-eth + @./bin/ingest-eth --config config.toml ingest-bsc: build - @./bin/ingest-bsc + @./bin/ingest-bsc --config config.toml postgres-up: docker compose -f ./docker/postgres.yml up -d --remove-orphans diff --git a/adapter/adapter.go b/adapter/adapter.go index 4f2c916..18fc3e0 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -117,13 +117,13 @@ func ReadTokens(loc string) ([]*types.Token, error) { address := record[0] if _, exists := tokenMap[address]; !exists { tokenMap[address] = &types.Token{ - Address: address, - Name: record[1], - Symbol: record[2], - Decimals: uint8(decimals), - Creator: record[4], - CreatedAtBlock: createdAtBlock, - ChainID: 0, // Set ChainID to 0 for now + Address: address, + Name: record[1], + Symbol: record[2], + Decimals: uint8(decimals), + Creator: record[4], + CreatedAt: createdAtBlock, + ChainID: 0, // Set ChainID to 0 for now } } } @@ -135,7 +135,7 @@ func ReadTokens(loc string) ([]*types.Token, error) { // sort by created at block asc sort.Slice(tokens, func(i, j int) bool { - return tokens[i].CreatedAtBlock < tokens[j].CreatedAtBlock + return tokens[i].CreatedAt < tokens[j].CreatedAt }) return tokens, nil diff --git a/cmd/dev/main.go b/cmd/dev/main.go index b61ddc8..10bd564 100644 --- a/cmd/dev/main.go +++ b/cmd/dev/main.go @@ -34,8 +34,14 @@ func main() { if err := eth.Init(); err != nil { panic(err) } + h, err := db.GetHeights() + if err != nil { + panic(err) + } + + fmt.Printf("Heights %v\n", h) - getPairs(eth) + //getPairs(eth) // bts, err := eth.GetBlockTimestamps(0, 10) // if err != nil { // panic(err) diff --git a/cmd/ingest/bsc/main.go b/cmd/ingest/bsc/main.go index b721965..e5bd603 100644 --- a/cmd/ingest/bsc/main.go +++ b/cmd/ingest/bsc/main.go @@ -1,7 
+1,11 @@ package main import ( + "flag" "fmt" + "log" + "log/slog" + "os" "time" "github.com/autoapev1/indexer/adapter" @@ -10,11 +14,23 @@ import ( ) func main() { + var ( + configFile string + ) + flagSet := flag.NewFlagSet("ingest-bsc", flag.ExitOnError) + flagSet.StringVar(&configFile, "config", "config.toml", "") + flagSet.Parse(os.Args[1:]) + + slog.Info("config", "configFile", configFile) + err := config.Parse(configFile) + if err != nil { + log.Fatal(err) + } conf := config.Get().Storage.Postgres conf.Name = "BSC" db := storage.NewPostgresDB(conf).WithChainID(56) - err := db.Init() + err = db.Init() if err != nil { panic(err) } @@ -55,7 +71,7 @@ } func IngestPairs(pg *storage.PostgresStore) error { - data, err := adapter.ReadPairs("./adapter/data/bsc.pairs.csv") + data, err := adapter.ReadPairs("./adapter/data/bsc.pairs2.csv") if err != nil { return err } diff --git a/cmd/ingest/eth/main.go b/cmd/ingest/eth/main.go index a321404..020ddd6 100644 --- a/cmd/ingest/eth/main.go +++ b/cmd/ingest/eth/main.go @@ -1,7 +1,11 @@ package main import ( + "flag" "fmt" + "log" + "log/slog" + "os" "time" "github.com/autoapev1/indexer/adapter" @@ -10,11 +14,24 @@ import ( ) func main() { + var ( + configFile string + ) + flagSet := flag.NewFlagSet("ingest-eth", flag.ExitOnError) + flagSet.StringVar(&configFile, "config", "config.toml", "") + flagSet.Parse(os.Args[1:]) + + slog.Info("config", "configFile", configFile) + err := config.Parse(configFile) + if err != nil { + log.Fatal(err) + } + conf := config.Get().Storage.Postgres conf.Name = "ETH" db := storage.NewPostgresDB(conf).WithChainID(1) - err := db.Init() + err = db.Init() if err != nil { panic(err) } diff --git a/eth/eth.go b/eth/eth.go index 8b34a0c..0033325 100644 --- a/eth/eth.go +++ b/eth/eth.go @@ -16,6 +16,7 @@ type Network struct { config config.Config Client *ethclient.Client Web3 *web3.Web3 + ready bool } func NewNetwork(c types.Chain, conf config.Config) *Network { @@ -25,6 +26,10 @@ func 
NewNetwork(c types.Chain, conf config.Config) *Network { } } +func (n *Network) Ready() bool { + return n.ready +} + func (n *Network) Init() error { var err error diff --git a/eth/method_blocks.go b/eth/method_blocks.go index 413ebbe..5025648 100644 --- a/eth/method_blocks.go +++ b/eth/method_blocks.go @@ -10,7 +10,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -func (n *Network) GetBlockTimestamps(from int64, to int64) ([]*types.BlockTimestamp, error) { +func (n *Network) GetBlockTimestamps(ctx context.Context, from int64, to int64) ([]*types.BlockTimestamp, error) { blockTimestamps := make([]*types.BlockTimestamp, 0, to-from) @@ -46,7 +46,7 @@ func (n *Network) GetBlockTimestamps(from int64, to int64) ([]*types.BlockTimest wg.Done() }() - bts, err := n.getBlockTimestampBatch(batch) + bts, err := n.getBlockTimestampBatch(ctx, batch) if err != nil { slog.Error("getBlockTimestampBatch", "err", err) return @@ -89,10 +89,9 @@ func (n *Network) makeBlockTimestampBatches(from int64, to int64, batchSize int6 return batches } -func (n *Network) getBlockTimestampBatch(batch []rpc.BatchElem) ([]*types.BlockTimestamp, error) { +func (n *Network) getBlockTimestampBatch(ctx context.Context, batch []rpc.BatchElem) ([]*types.BlockTimestamp, error) { var blockTimestamps []*types.BlockTimestamp - ctx := context.Background() if err := n.Client.Client().BatchCallContext(ctx, batch); err != nil { return nil, err } diff --git a/eth/method_pairs.go b/eth/method_pairs.go index 19190ec..ef8c7fa 100644 --- a/eth/method_pairs.go +++ b/eth/method_pairs.go @@ -165,7 +165,7 @@ func (n *Network) getPairs(ctx context.Context, decoder abi.ABI, signature commo Token1Address: common.HexToAddress((l.Topics[2].String())).String(), Fee: l.Topics[3].Big().Int64(), PoolType: 3, - PoolAddress: "unknown", + PoolAddress: "0x0000000000000000000000000000000000000000", TickSpacing: 0, } diff --git a/eth/method_tokens.go b/eth/method_tokens.go index 6321700..8f24da6 100644 --- a/eth/method_tokens.go 
+++ b/eth/method_tokens.go @@ -193,7 +193,7 @@ func (n *Network) getStage1TokenInfoBatch(ctx context.Context, batch []rpc.Batch token.Creator = creator.Creator if token.Creator == "" { - token.Creator = "unknown" + token.Creator = "0x0000000000000000000000000000000000000000" } token.CreationHash = creator.Hash if token.CreationHash == "" { @@ -284,7 +284,7 @@ func (n *Network) getStage2TokenInfoBatch(ctx context.Context, batch []rpc.Batch slog.Error("getStage2TokenInfoBatch", "err", err) blockNumberInt = 0 } - tokens[i].CreatedAtBlock = blockNumberInt + tokens[i].CreatedAt = blockNumberInt } return nil diff --git a/storage/sql.go b/storage/sql.go index c0d9389..f4eb83a 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -20,6 +20,7 @@ type PostgresStore struct { DB *bun.DB ChainID int64 debug bool + ready bool } func NewPostgresDB(conf config.PostgresConfig) *PostgresStore { @@ -46,6 +47,10 @@ func (p *PostgresStore) WithDebug() *PostgresStore { return p } +func (p *PostgresStore) Ready() bool { + return p.ready +} + func (p *PostgresStore) Init() error { st := time.Now() err := p.CreateTables() @@ -160,7 +165,7 @@ func (p *PostgresStore) BulkInsertBlockTimestamp(blockTimestamps []*types.BlockT batchSize := 10000 for i := 0; i < len(blockTimestamps); i += batchSize { - fmt.Printf("Inserting blocktimestamps %d to %d\n", i, i+batchSize) + fmt.Printf("inserting into block_timestamps \tfrom:%d \tto:%d\n", i, i+batchSize) end := i + batchSize if end > len(blockTimestamps) { end = len(blockTimestamps) @@ -182,10 +187,10 @@ func (p *PostgresStore) GetBlockTimestamps(to int64, from int64) ([]*types.Block var blockTimestamps []*types.BlockTimestamp ctx := context.Background() - err := p.DB.NewSelect().Model(&blockTimestamps). + err := p.DB.NewSelect(). + Table("block_timestamps"). Where("block >= ?", from). Where("block <= ?", to). - Limit(10000). 
Scan(ctx) if err != nil { @@ -198,7 +203,10 @@ func (p *PostgresStore) GetBlockTimestamps(to int64, from int64) ([]*types.Block func (p *PostgresStore) GetHight() (int64, error) { var block int64 ctx := context.Background() - err := p.DB.NewSelect().Model(&types.BlockTimestamp{}).ColumnExpr("MAX(block)").Scan(ctx, &block) + err := p.DB.NewSelect(). + Table("block_timestamps"). + ColumnExpr("MAX(block)"). + Scan(ctx, &block) if err != nil { return block, err } @@ -220,7 +228,11 @@ func (p *PostgresStore) BulkInsertTokenInfo(tokenInfos []*types.Token) error { batchSize := 10000 for i := 0; i < len(tokenInfos); i += batchSize { - fmt.Printf("Inserting tokens %d to %d\n", i, i+batchSize) + tokenInfos[i].Lower() + } + + for i := 0; i < len(tokenInfos); i += batchSize { + fmt.Printf("inserting into tokens \tfrom:%d \tto:%d\n", i, i+batchSize) end := i + batchSize if end > len(tokenInfos) { end = len(tokenInfos) @@ -341,7 +353,11 @@ func (p *PostgresStore) BulkInsertPairInfo(pairInfos []*types.Pair) error { batchSize := 100000 for i := 0; i < len(pairInfos); i += batchSize { - fmt.Printf("Inserting pairs %d to %d\n", i, i+batchSize) + pairInfos[i].Lower() + } + + for i := 0; i < len(pairInfos); i += batchSize { + fmt.Printf("inserting into pairs \tfrom:%d \tto:%d\n", i, i+batchSize) end := i + batchSize if end > len(pairInfos) { end = len(pairInfos) @@ -497,6 +513,68 @@ func (p *PostgresStore) GetUniqueAddressesFromTokens() ([]string, error) { return addresses, nil } +func (p *PostgresStore) GetPairsWithoutTokenInfo() ([]string, error) { + pairs, err := p.GetUniqueAddressesFromPairs() + if err != nil { + return nil, err + } + + tokenAddresses, err := p.GetUniqueAddressesFromTokens() + if err != nil { + return nil, err + } + + tokens := make(map[string]struct{}) + for _, token := range tokenAddresses { + tokens[strings.ToLower(token)] = struct{}{} + } + + var missing []string + for _, pair := range pairs { + if _, exists := tokens[strings.ToLower(pair)]; !exists { + 
missing = append(missing, pair) + } + } + + return missing, nil +} + +func (p *PostgresStore) GetHeights() (*types.Heights, error) { + heights := &types.Heights{ + Blocks: 0, + Tokens: 0, + Pairs: 0, + } + + ctx := context.Background() + + err := p.DB.NewSelect(). + ColumnExpr("MAX(block)"). + Model(&types.BlockTimestamp{}). + Scan(ctx, &heights.Blocks) + if err != nil { + return heights, err + } + + err = p.DB.NewSelect(). + ColumnExpr("MAX(created_at)"). + Model(&types.Token{}). + Scan(ctx, &heights.Tokens) + if err != nil { + return heights, err + } + + err = p.DB.NewSelect(). + ColumnExpr("MAX(created_at)"). + Model(&types.Pair{}). + Scan(ctx, &heights.Pairs) + if err != nil { + return heights, err + } + + return heights, nil +} + var _ Store = &PostgresStore{} func fuzWrap(s *string) string { diff --git a/storage/store.go b/storage/store.go index 2752bfb..ec5a4fa 100644 --- a/storage/store.go +++ b/storage/store.go @@ -3,6 +3,8 @@ package storage import "github.com/autoapev1/indexer/types" type Store interface { + Init() error + Ready() bool GetChainID() int64 GetHight() (int64, error) @@ -27,4 +29,6 @@ type Store interface { // util GetUniqueAddressesFromPairs() ([]string, error) GetUniqueAddressesFromTokens() ([]string, error) + GetPairsWithoutTokenInfo() ([]string, error) + GetHeights() (*types.Heights, error) } diff --git a/syncer/syncer.go b/syncer/syncer.go new file mode 100644 index 0000000..adec917 --- /dev/null +++ b/syncer/syncer.go @@ -0,0 +1,159 @@ +package syncer + +import ( + "context" + "errors" + "log/slog" + + "github.com/autoapev1/indexer/config" + "github.com/autoapev1/indexer/eth" + "github.com/autoapev1/indexer/storage" +) + +var ( + ErrNoNetwork = errors.New("no network provided") + ErrNoStore = errors.New("no store provided") +) + +type Syncer struct { + config config.Config + network *eth.Network + store storage.Store + ctx context.Context +} + +func NewSyncer(conf config.Config) *Syncer { + return &Syncer{ + config: conf, + } +} + 
+func (s *Syncer) WithNetwork(n *eth.Network) *Syncer { + s.network = n + return s +} + +func (s *Syncer) WithStore(st storage.Store) *Syncer { + s.store = st + return s +} + +func (s *Syncer) WithContext(ctx context.Context) *Syncer { + s.ctx = ctx + return s +} + +func (s *Syncer) Init() error { + if s.network == nil { + return ErrNoNetwork + } + + if s.store == nil { + return ErrNoStore + } + + if s.ctx == nil { + s.ctx = context.Background() + } + + if !s.network.Ready() { + slog.Warn("network not ready, initializing") + err := s.network.Init() + if err != nil { + return err + } + } + + if !s.store.Ready() { + slog.Warn("store not ready, initializing") + err := s.store.Init() + if err != nil { + return err + } + } + + return nil +} + +func (s *Syncer) Sync(ctx context.Context) error { + if err := s.Init(); err != nil { + return err + } + + if err := s.ArchiveSync(ctx); err != nil { + return err + } + + select { + case <-ctx.Done(): + return nil + default: + } + + return nil +} + +func (s *Syncer) ArchiveSync(ctx context.Context) error { + + chainHeight, err := s.network.Web3.Eth.GetBlockNumber() + if err != nil { + return err + } + + heights, err := s.store.GetHeights() + if err != nil { + slog.Error("failed to get db heights") + return err + } + + if (int64(chainHeight) - heights.Blocks) > 0 { + slog.Info("chain height is higher than db blocktimestamp height, syncing block timestamps", "chainHeight", chainHeight, "dbHeight", heights.Blocks) + bts, err := s.network.GetBlockTimestamps(ctx, heights.Blocks, int64(chainHeight)) + if err != nil { + slog.Error("failed to get block timestamps", "error", err) + return err + } + + err = s.store.BulkInsertBlockTimestamp(bts) + if err != nil { + slog.Error("failed to ingest block timestamps", "error", err) + return err + } + } + + if (int64(chainHeight) - heights.Pairs) > 0 { + slog.Info("chain height is higher than db pair height, syncing pairs", "chainHeight", chainHeight, "dbHeight", heights.Pairs) + pairs, err := 
s.network.GetPairs(ctx, heights.Pairs, int64(chainHeight)) + if err != nil { + slog.Error("failed to get pairs", "error", err) + return err + } + + err = s.store.BulkInsertPairInfo(pairs) + if err != nil { + slog.Error("failed to ingest pairs", "error", err) + return err + } + + toFetchTokens, err := s.store.GetPairsWithoutTokenInfo() + if err != nil { + slog.Error("failed to get pairs without token info", "error", err) + return err + } + + tokens, err := s.network.GetTokenInfo(ctx, toFetchTokens) + if err != nil { + slog.Error("failed to get tokens", "error", err) + return err + } + + err = s.store.BulkInsertTokenInfo(tokens) + if err != nil { + slog.Error("failed to ingest tokens", "error", err) + return err + } + } + return nil +} +func (s *Syncer) LiveSync(ctx context.Context, bn int64) {} +func (s *Syncer) BlockOracle(ctx context.Context) {} diff --git a/types/tokens.go b/types/tokens.go index bc71338..124749e 100644 --- a/types/tokens.go +++ b/types/tokens.go @@ -1,21 +1,40 @@ package types -import "strings" +import ( + "strings" + + "github.com/uptrace/bun" +) type Token struct { - Address string `json:"address" bun:",pk"` - Name string `json:"name"` - Symbol string `json:"symbol"` - Decimals uint8 `json:"decimals"` - Creator string `json:"creator"` - CreatedAtBlock int64 `json:"created_at"` - CreationHash string `json:"creation_hash"` - ChainID int16 `json:"chain_id"` + bun.BaseModel `bun:"table:tokens,alias:tokens" json:"-"` + Address string `json:"address" bun:",pk,type:varchar(42),unique"` + Name string `json:"name"` + Symbol string `json:"symbol"` + Decimals uint8 `json:"decimals"` + Creator string `json:"creator" bun:",type:varchar(42),default:'0x0000000000000000000000000000000000000000'"` + CreatedAt int64 `json:"created_at"` + CreationHash string `json:"creation_hash" bun:",type:varchar(66),default:'0x0000000000000000000000000000000000000000000000000000000000000000'"` + ChainID int16 `json:"chain_id"` +} + +func (p *Token) Lower() { + if p.Name == 
"_Unknown" { + p.Name = "unknown" + } + if p.Symbol == "_Unknown" { + p.Symbol = "unknown" + } + + p.Address = strings.ToLower(p.Address) + p.Creator = strings.ToLower(p.Creator) + p.CreationHash = strings.ToLower(p.CreationHash) } type BlockTimestamp struct { - Block int64 `json:"block" bun:",pk"` - Timestamp int64 `json:"timestamp"` + bun.BaseModel `bun:"table:block_timestamps,alias:block_timestamps" json:"-"` + Block int64 `json:"block" bun:",pk,notnull,unique"` + Timestamp int64 `json:"timestamp" bun:",notnull,default:0"` } type Creator struct { @@ -28,14 +47,15 @@ type BlockNumber struct { } type Pair struct { - Token0Address string `json:"token0_address"` - Token1Address string `json:"token1_address"` - Fee int64 `json:"fee"` - TickSpacing int64 `json:"tick_spacing"` - PoolAddress string `json:"pool_address"` - PoolType uint8 `json:"pool_type"` + bun.BaseModel `bun:"table:pairs,alias:pairs" json:"-"` + Token0Address string `json:"token0_address" bun:",type:varchar(42),notnull"` + Token1Address string `json:"token1_address" bun:",type:varchar(42),notnull"` + Fee int64 `json:"fee" bun:",notnull,default:0"` + TickSpacing int64 `json:"tick_spacing" bun:",notnull,default:0"` + PoolAddress string `json:"pool_address" bun:",notnull,type:varchar(42),unique"` + PoolType uint8 `json:"pool_type" bun:",notnull,default:0"` CreatedAt int64 `json:"created_at"` - Hash string `json:"hash" bun:",pk"` + Hash string `json:"hash" bun:",pk,type:varchar(66)"` ChainID int16 `json:"chain_id"` } @@ -58,3 +78,9 @@ type OHLC struct { NB uint32 `json:"nb"` // number of buy NS uint32 `json:"ns"` // number of sells } + +type Heights struct { + Blocks int64 + Tokens int64 + Pairs int64 +}