Skip to content

Commit

Permalink
get timestamps method
Browse files Browse the repository at this point in the history
  • Loading branch information
AR1011 committed Jan 20, 2024
1 parent cac0659 commit 560f16d
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 47 deletions.
16 changes: 4 additions & 12 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"encoding/json"
"fmt"
"log/slog"

"github.com/autoapev1/indexer/auth"
Expand Down Expand Up @@ -150,21 +149,14 @@ func (s *Server) getChains(r *JRPCRequest) *types.GetChainsResponse {
chains := []types.Chain{}
for _, c := range s.config.Chains {
tc := types.Chain{
ChainID: c.ChainID,
Name: c.Name,
ShortName: c.ShortName,
ExplorerURL: c.ExplorerURL,
RouterV2: c.RouterV2Address,
FactoryV2: c.FactoryV2Address,
RouterV3: c.RouterV3Address,
FactoryV3: c.FactoryV3Address,
BlockDuration: int64(c.BlockDuration),
ChainID: c.ChainID,
Name: c.Name,
ShortName: c.ShortName,
ExplorerURL: c.ExplorerURL,
}
chains = append(chains, tc)
}

fmt.Println(len(s.config.Chains))

return &types.GetChainsResponse{
ID: r.ID,
Method: r.Method,
Expand Down
73 changes: 73 additions & 0 deletions cmd/dev/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"fmt"

"github.com/autoapev1/indexer/config"
"github.com/autoapev1/indexer/eth"
"github.com/autoapev1/indexer/types"
)

func main() {
config.Parse("config.toml")
// conf := config.Get().Storage.Postgres
// conf.Name = "ETH"

// db := storage.NewPostgresDB(conf).WithChainID(1)
// err := db.Init()
// if err != nil {
// panic(err)
// }

chain := types.Chain{
Name: "Ethereum",
Http: "http://localhost:7545",
ShortName: "ETH",
ChainID: 1,
}

eth := eth.NewNetwork(chain, config.Get())

if err := eth.Init(); err != nil {
panic(err)
}

bts, err := eth.GetBlockTimestamps(0, 10)
if err != nil {
panic(err)
}

for _, v := range bts {
fmt.Printf("Block %d timestamp %d\n", v.Block, v.Timestamp)
}

// pairAddrs, err := db.GetUniqueAddressesFromPairs()
// if err != nil {
// panic(err)
// }

// tokenAddrs, err := db.GetUniqueAddressesFromTokens()
// if err != nil {
// panic(err)
// }

// uniqueAddrs := make(map[string]struct{})
// for _, v := range pairAddrs {
// uniqueAddrs[v] = struct{}{}
// }

// for _, v := range tokenAddrs {
// uniqueAddrs[v] = struct{}{}
// }

// // find addresses in pairs that are not in tokens
// for _, v := range pairAddrs {
// if _, ok := uniqueAddrs[v]; !ok {
// fmt.Printf("Pair address %s not found in tokens\n", v)
// }
// }

// fmt.Printf("Found %d unique addresses\n", len(uniqueAddrs))
// fmt.Printf("Found %d unique pair addresses\n", len(pairAddrs))
// fmt.Printf("Found %d unique token addresses\n", len(tokenAddrs))
}
10 changes: 0 additions & 10 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,14 @@ chainID = 1
name = "Ethereum"
shortName = "ETH"
explorerURL = "https://etherscan.io"
routerV2Address = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
factoryV2Address = "0xc0a47dFe034B400B47bDaD5FecDa2621de6c4d95"
routerV3Address = "0xE592427A0AEce92De3Edee1F18E0157C05861564"
factoryV3Address = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
rpcURL = "http://localhost:8545"
blockDuration = 12

[[chains]]
chainID = 56
name = "Binance Smart Chain"
shortName = "BSC"
explorerURL = "https://bscscan.com"
routerV2Address = "0x10ED43C718714eb63d5aA57B78B54704E256024E"
factoryV2Address = "0xcA143Ce32Fe78f1f7019d7d551a6402fC5350c73"
routerV3Address = "0xE592427A0AEce92De3Edee1F18E0157C05861564"
factoryV3Address = "0x6725F303b657a9451d8BA641348b6761A6CC7a17"
rpcURL = "http://localhost:8546"
blockDuration = 3

[api]
host = "localhost"
Expand Down
25 changes: 5 additions & 20 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,14 @@ chainID = 1
name = "Ethereum"
shortName = "ETH"
explorerURL = "https://etherscan.io"
routerV2Address = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
factoryV2Address = "0xc0a47dFe034B400B47bDaD5FecDa2621de6c4d95"
routerV3Address = "0xE592427A0AEce92De3Edee1F18E0157C05861564"
factoryV3Address = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
rpcURL = "http://localhost:8545"
blockDuration = 12
[[chains]]
chainID = 56
name = "Binance Smart Chain"
shortName = "BSC"
explorerURL = "https://bscscan.com"
routerV2Address = "0x10ED43C718714eb63d5aA57B78B54704E256024E"
factoryV2Address = "0xcA143Ce32Fe78f1f7019d7d551a6402fC5350c73"
routerV3Address = "0xE592427A0AEce92De3Edee1F18E0157C05861564"
factoryV3Address = "0x6725F303b657a9451d8BA641348b6761A6CC7a17"
rpcURL = "http://localhost:8546"
blockDuration = 3
[api]
host = "localhost"
Expand Down Expand Up @@ -94,16 +84,11 @@ type Config struct {
}

type ChainConfig struct {
ChainID int
Name string
ShortName string
ExplorerURL string
RouterV2Address string
FactoryV2Address string
RouterV3Address string
FactoryV3Address string
RPCURL string
BlockDuration int
ChainID int
Name string
ShortName string
ExplorerURL string
RPCURL string
}

type SyncConfig struct {
Expand Down
7 changes: 5 additions & 2 deletions eth/eth.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package eth

import (
"github.com/autoapev1/indexer/config"
"github.com/autoapev1/indexer/types"
"github.com/ethereum/go-ethereum/ethclient"
)

type Network struct {
Chain types.Chain
config config.Config
Client *ethclient.Client
}

func NewNetwork(c types.Chain) *Network {
func NewNetwork(c types.Chain, conf config.Config) *Network {
return &Network{
Chain: c,
Chain: c,
config: conf,
}
}

Expand Down
114 changes: 114 additions & 0 deletions eth/methods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package eth

import (
"context"
"log/slog"
"sync"

"github.com/autoapev1/indexer/types"
etypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)

func (n *Network) GetBlockTimestamps(from int64, to int64) ([]*types.BlockTimestamp, error) {
var blockTimestamps []*types.BlockTimestamp

batchSize := n.config.Sync.BlockTimestamps.BatchSize
concurrency := n.config.Sync.BlockTimestamps.BatchSize

if batchSize <= 0 {
batchSize = 100
}

if concurrency <= 0 {
concurrency = 2
}

batches := n.makeBlockTimestampBatches(from, to, int64(batchSize))

workers := make(chan int, concurrency)
var wg sync.WaitGroup
counter := 0
for {
if counter >= len(batches) {
break
}

workers <- 1
wg.Add(1)
batch := batches[counter]
counter++

go func(batch []rpc.BatchElem) {
defer func() {
<-workers
wg.Done()
}()

bts, err := n.getBlockTimestampBatch(batch)
if err != nil {
slog.Error("getBlockTimestampBatch", "err", err)
return
}

blockTimestamps = append(blockTimestamps, bts...)
}(batch)
}

wg.Wait()

return blockTimestamps, nil
}

func (n *Network) makeBlockTimestampBatches(from int64, to int64, batchSize int64) [][]rpc.BatchElem {
batchCount := (to - from) / batchSize
if (to-from)%batchSize != 0 {
batchCount++
}

batches := make([][]rpc.BatchElem, 0, batchCount)

for i := from; i <= to; i += batchSize {
end := i + batchSize
if end > to+1 {
end = to + 1
}

batch := make([]rpc.BatchElem, 0, end-i)
for j := i; j < end; j++ {
batch = append(batch, rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{j, false},
Result: new(etypes.Header),
})
}
batches = append(batches, batch)
}

return batches
}

func (n *Network) getBlockTimestampBatch(batch []rpc.BatchElem) ([]*types.BlockTimestamp, error) {
var blockTimestamps []*types.BlockTimestamp

ctx := context.Background()
if err := n.Client.Client().BatchCallContext(ctx, batch); err != nil {
return nil, err
}

for _, b := range batch {
if b.Error != nil {
return nil, b.Error
}
}

for _, b := range batch {
header := b.Result.(*etypes.Header)
blockTimestamps = append(blockTimestamps, &types.BlockTimestamp{
Block: header.Number.Int64(),
Timestamp: int64(header.Time),
})
}

return blockTimestamps, nil
}
12 changes: 9 additions & 3 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,16 @@ func (p *PostgresStore) GetUniqueAddressesFromPairs() ([]string, error) {
// Query to get distinct addresses from both token0 and token1
var addresses []string
ctx := context.Background()
err := p.DB.NewSelect().ColumnExpr("DISTINCT token0_address").Scan(ctx, &addresses)
err := p.DB.NewSelect().
Table("pairs").
ColumnExpr("DISTINCT token0_address").Scan(ctx, &addresses)
if err != nil {
return addresses, err
}

err = p.DB.NewSelect().ColumnExpr("DISTINCT token1_address").Scan(ctx, &addresses)
err = p.DB.NewSelect().
Table("pairs").
ColumnExpr("DISTINCT token1_address").Scan(ctx, &addresses)
if err != nil {
return addresses, err
}
Expand All @@ -477,7 +481,9 @@ func (p *PostgresStore) GetUniqueAddressesFromPairs() ([]string, error) {
func (p *PostgresStore) GetUniqueAddressesFromTokens() ([]string, error) {
var addresses []string
ctx := context.Background()
err := p.DB.NewSelect().ColumnExpr("DISTINCT address").Scan(ctx, &addresses)
err := p.DB.NewSelect().
Table("tokens").
ColumnExpr("DISTINCT address").Scan(ctx, &addresses)
if err != nil {
return addresses, err
}
Expand Down

0 comments on commit 560f16d

Please sign in to comment.