diff --git a/api/handlers.go b/api/handlers.go index 551aeec..8977462 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -2,7 +2,6 @@ package api import ( "encoding/json" - "fmt" "log/slog" "github.com/autoapev1/indexer/auth" @@ -150,21 +149,14 @@ func (s *Server) getChains(r *JRPCRequest) *types.GetChainsResponse { chains := []types.Chain{} for _, c := range s.config.Chains { tc := types.Chain{ - ChainID: c.ChainID, - Name: c.Name, - ShortName: c.ShortName, - ExplorerURL: c.ExplorerURL, - RouterV2: c.RouterV2Address, - FactoryV2: c.FactoryV2Address, - RouterV3: c.RouterV3Address, - FactoryV3: c.FactoryV3Address, - BlockDuration: int64(c.BlockDuration), + ChainID: c.ChainID, + Name: c.Name, + ShortName: c.ShortName, + ExplorerURL: c.ExplorerURL, } chains = append(chains, tc) } - fmt.Println(len(s.config.Chains)) - return &types.GetChainsResponse{ ID: r.ID, Method: r.Method, diff --git a/cmd/dev/main.go b/cmd/dev/main.go new file mode 100644 index 0000000..317b301 --- /dev/null +++ b/cmd/dev/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "fmt" + + "github.com/autoapev1/indexer/config" + "github.com/autoapev1/indexer/eth" + "github.com/autoapev1/indexer/types" +) + +func main() { + config.Parse("config.toml") + // conf := config.Get().Storage.Postgres + // conf.Name = "ETH" + + // db := storage.NewPostgresDB(conf).WithChainID(1) + // err := db.Init() + // if err != nil { + // panic(err) + // } + + chain := types.Chain{ + Name: "Ethereum", + Http: "http://localhost:7545", + ShortName: "ETH", + ChainID: 1, + } + + eth := eth.NewNetwork(chain, config.Get()) + + if err := eth.Init(); err != nil { + panic(err) + } + + bts, err := eth.GetBlockTimestamps(0, 10) + if err != nil { + panic(err) + } + + for _, v := range bts { + fmt.Printf("Block %d timestamp %d\n", v.Block, v.Timestamp) + } + + // pairAddrs, err := db.GetUniqueAddressesFromPairs() + // if err != nil { + // panic(err) + // } + + // tokenAddrs, err := db.GetUniqueAddressesFromTokens() + // if err != nil { + // panic(err) + // } + + // uniqueAddrs := make(map[string]struct{}) + // for _, v := range pairAddrs { + // uniqueAddrs[v] = struct{}{} + // } + + // for _, v := range tokenAddrs { + // uniqueAddrs[v] = struct{}{} + // } + + // // find addresses in pairs that are not in tokens + // for _, v := range pairAddrs { + // if _, ok := uniqueAddrs[v]; !ok { + // fmt.Printf("Pair address %s not found in tokens\n", v) + // } + // } + + // fmt.Printf("Found %d unique addresses\n", len(uniqueAddrs)) + // fmt.Printf("Found %d unique pair addresses\n", len(pairAddrs)) + // fmt.Printf("Found %d unique token addresses\n", len(tokenAddrs)) +} diff --git a/config.example.toml b/config.example.toml index 9b4cd1a..a70ab2f 100644 --- a/config.example.toml +++ b/config.example.toml @@ -3,24 +3,14 @@ chainID = 1 name = "Ethereum" shortName = "ETH" explorerURL = "https://etherscan.io" -routerV2Address = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D" -factoryV2Address = "0xc0a47dFe034B400B47bDaD5FecDa2621de6c4d95" -routerV3Address = "0xE592427A0AEce92De3Edee1F18E0157C05861564" -factoryV3Address = "0x1F98431c8aD98523631AE4a59f267346ea31F984" rpcURL = "http://localhost:8545" -blockDuration = 12 [[chains]] chainID = 56 name = "Binance Smart Chain" shortName = "BSC" explorerURL = "https://bscscan.com" -routerV2Address = "0x10ED43C718714eb63d5aA57B78B54704E256024E" -factoryV2Address = "0xcA143Ce32Fe78f1f7019d7d551a6402fC5350c73" -routerV3Address = "0xE592427A0AEce92De3Edee1F18E0157C05861564" -factoryV3Address = "0x6725F303b657a9451d8BA641348b6761A6CC7a17" rpcURL = "http://localhost:8546" -blockDuration = 3 [api] host = "localhost" diff --git a/config/config.go b/config/config.go index 1b11f8e..f4394f5 100644 --- a/config/config.go +++ b/config/config.go @@ -13,24 +13,14 @@ chainID = 1 name = "Ethereum" shortName = "ETH" explorerURL = "https://etherscan.io" -routerV2Address = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D" -factoryV2Address = "0xc0a47dFe034B400B47bDaD5FecDa2621de6c4d95" -routerV3Address = "0xE592427A0AEce92De3Edee1F18E0157C05861564" -factoryV3Address = "0x1F98431c8aD98523631AE4a59f267346ea31F984" rpcURL = "http://localhost:8545" -blockDuration = 12 [[chains]] chainID = 56 name = "Binance Smart Chain" shortName = "BSC" explorerURL = "https://bscscan.com" -routerV2Address = "0x10ED43C718714eb63d5aA57B78B54704E256024E" -factoryV2Address = "0xcA143Ce32Fe78f1f7019d7d551a6402fC5350c73" -routerV3Address = "0xE592427A0AEce92De3Edee1F18E0157C05861564" -factoryV3Address = "0x6725F303b657a9451d8BA641348b6761A6CC7a17" rpcURL = "http://localhost:8546" -blockDuration = 3 [api] host = "localhost" @@ -94,16 +84,11 @@ type Config struct { } type ChainConfig struct { - ChainID int - Name string - ShortName string - ExplorerURL string - RouterV2Address string - FactoryV2Address string - RouterV3Address string - FactoryV3Address string - RPCURL string - BlockDuration int + ChainID int + Name string + ShortName string + ExplorerURL string + RPCURL string } type SyncConfig struct { diff --git a/eth/eth.go b/eth/eth.go index e14d88f..ed6a143 100644 --- a/eth/eth.go +++ b/eth/eth.go @@ -1,18 +1,21 @@ package eth import ( + "github.com/autoapev1/indexer/config" "github.com/autoapev1/indexer/types" "github.com/ethereum/go-ethereum/ethclient" ) type Network struct { Chain types.Chain + config config.Config Client *ethclient.Client } -func NewNetwork(c types.Chain) *Network { +func NewNetwork(c types.Chain, conf config.Config) *Network { return &Network{ - Chain: c, + Chain: c, + config: conf, } } diff --git a/eth/methods.go b/eth/methods.go new file mode 100644 index 0000000..432824e --- /dev/null +++ b/eth/methods.go @@ -0,0 +1,114 @@ +package eth + +import ( + "context" + "log/slog" + "sync" + + "github.com/autoapev1/indexer/types" + etypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +func (n *Network) GetBlockTimestamps(from int64, to int64) ([]*types.BlockTimestamp, error) { + var blockTimestamps []*types.BlockTimestamp + + batchSize := n.config.Sync.BlockTimestamps.BatchSize + concurrency := n.config.Sync.BlockTimestamps.BatchSize + + if batchSize <= 0 { + batchSize = 100 + } + + if concurrency <= 0 { + concurrency = 2 + } + + batches := n.makeBlockTimestampBatches(from, to, int64(batchSize)) + + workers := make(chan int, concurrency) + var wg sync.WaitGroup + counter := 0 + for { + if counter >= len(batches) { + break + } + + workers <- 1 + wg.Add(1) + batch := batches[counter] + counter++ + + go func(batch []rpc.BatchElem) { + defer func() { + <-workers + wg.Done() + }() + + bts, err := n.getBlockTimestampBatch(batch) + if err != nil { + slog.Error("getBlockTimestampBatch", "err", err) + return + } + + blockTimestamps = append(blockTimestamps, bts...) + }(batch) + } + + wg.Wait() + + return blockTimestamps, nil +} + +func (n *Network) makeBlockTimestampBatches(from int64, to int64, batchSize int64) [][]rpc.BatchElem { + batchCount := (to - from) / batchSize + if (to-from)%batchSize != 0 { + batchCount++ + } + + batches := make([][]rpc.BatchElem, 0, batchCount) + + for i := from; i <= to; i += batchSize { + end := i + batchSize + if end > to+1 { + end = to + 1 + } + + batch := make([]rpc.BatchElem, 0, end-i) + for j := i; j < end; j++ { + batch = append(batch, rpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{j, false}, + Result: new(etypes.Header), + }) + } + batches = append(batches, batch) + } + + return batches +} + +func (n *Network) getBlockTimestampBatch(batch []rpc.BatchElem) ([]*types.BlockTimestamp, error) { + var blockTimestamps []*types.BlockTimestamp + + ctx := context.Background() + if err := n.Client.Client().BatchCallContext(ctx, batch); err != nil { + return nil, err + } + + for _, b := range batch { + if b.Error != nil { + return nil, b.Error + } + } + + for _, b := range batch { + header := b.Result.(*etypes.Header) + blockTimestamps = append(blockTimestamps, &types.BlockTimestamp{ + Block: header.Number.Int64(), + Timestamp: int64(header.Time), + }) + } + + return blockTimestamps, nil +} diff --git a/storage/sql.go b/storage/sql.go index 9236964..13fa666 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -450,12 +450,16 @@ func (p *PostgresStore) GetUniqueAddressesFromPairs() ([]string, error) { // Query to get distinct addresses from both token0 and token1 var addresses []string ctx := context.Background() - err := p.DB.NewSelect().ColumnExpr("DISTINCT token0_address").Scan(ctx, &addresses) + err := p.DB.NewSelect(). + Table("pairs"). + ColumnExpr("DISTINCT token0_address").Scan(ctx, &addresses) if err != nil { return addresses, err } - err = p.DB.NewSelect().ColumnExpr("DISTINCT token1_address").Scan(ctx, &addresses) + err = p.DB.NewSelect(). + Table("pairs"). + ColumnExpr("DISTINCT token1_address").Scan(ctx, &addresses) if err != nil { return addresses, err } @@ -477,7 +481,9 @@ func (p *PostgresStore) GetUniqueAddressesFromPairs() ([]string, error) { func (p *PostgresStore) GetUniqueAddressesFromTokens() ([]string, error) { var addresses []string ctx := context.Background() - err := p.DB.NewSelect().ColumnExpr("DISTINCT address").Scan(ctx, &addresses) + err := p.DB.NewSelect(). + Table("tokens"). + ColumnExpr("DISTINCT address").Scan(ctx, &addresses) if err != nil { return addresses, err }