Skip to content

Commit

Permalink
getPairs
Browse files Browse the repository at this point in the history
  • Loading branch information
AR1011 committed Jan 23, 2024
1 parent 0f7f59b commit 7444cd1
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 125 deletions.
52 changes: 35 additions & 17 deletions cmd/dev/main.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package main

import (
"context"
"fmt"
"time"

"github.com/autoapev1/indexer/config"
"github.com/autoapev1/indexer/eth"
"github.com/autoapev1/indexer/storage"
"github.com/autoapev1/indexer/types"
)

func main() {
config.Parse("config.toml")
// conf := config.Get().Storage.Postgres
// conf.Name = "ETH"
conf := config.Get().Storage.Postgres
conf.Name = "ETH"

// db := storage.NewPostgresDB(conf).WithChainID(1)
// err := db.Init()
// if err != nil {
// panic(err)
// }
db := storage.NewPostgresDB(conf).WithChainID(1)
err := db.Init()
if err != nil {
panic(err)
}

chain := types.Chain{
Name: "Ethereum",
Http: "http://localhost:7545",
Http: "https://rpc.ankr.com/eth/1f648783cc10d97d45439523ee0b0348d124bd6eaf13fa0664e1d15063e16679",
ShortName: "ETH",
ChainID: 1,
}
Expand All @@ -32,15 +35,7 @@ func main() {
panic(err)
}

tokens, err := eth.GetTokenInfo([]string{"0xE0B7927c4aF23765Cb51314A0E0521A9645F0E2A"})
if err != nil {
panic(err)
}

for _, v := range tokens {
fmt.Printf("Token %v\n", v)
}

getPairs(eth)
// bts, err := eth.GetBlockTimestamps(0, 10)
// if err != nil {
// panic(err)
Expand Down Expand Up @@ -80,3 +75,26 @@ func main() {
// fmt.Printf("Found %d unique pair addresses\n", len(pairAddrs))
// fmt.Printf("Found %d unique token addresses\n", len(tokenAddrs))
}

func getTokens(n *eth.Network) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
tokens, err := n.GetTokenInfo(ctx, []string{"0xE0B7927c4aF23765Cb51314A0E0521A9645F0E2A"})
if err != nil {
panic(err)
}

for _, v := range tokens {
fmt.Printf("Token %v\n", v)
}
}

func getPairs(n *eth.Network) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
_, err := n.GetPairs(ctx, 18000010, 18000000)
if err != nil {
panic(err)
}

}
115 changes: 115 additions & 0 deletions eth/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package eth

import (
"context"
"log/slog"
"sync"

"github.com/autoapev1/indexer/types"
etypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)

func (n *Network) GetBlockTimestamps(from int64, to int64) ([]*types.BlockTimestamp, error) {

blockTimestamps := make([]*types.BlockTimestamp, 0, to-from)

batchSize := n.config.Sync.BlockTimestamps.BatchSize
concurrency := n.config.Sync.BlockTimestamps.BatchConcurrency

if batchSize <= 0 {
batchSize = 100
}

if concurrency <= 0 {
concurrency = 2
}

batches := n.makeBlockTimestampBatches(from, to, int64(batchSize))

workers := make(chan int, concurrency)
var wg sync.WaitGroup
counter := 0
for {
if counter >= len(batches) {
break
}

workers <- 1
wg.Add(1)
batch := batches[counter]
counter++

go func(batch []rpc.BatchElem) {
defer func() {
<-workers
wg.Done()
}()

bts, err := n.getBlockTimestampBatch(batch)
if err != nil {
slog.Error("getBlockTimestampBatch", "err", err)
return
}

blockTimestamps = append(blockTimestamps, bts...)
}(batch)
}

wg.Wait()

return blockTimestamps, nil
}

func (n *Network) makeBlockTimestampBatches(from int64, to int64, batchSize int64) [][]rpc.BatchElem {
batchCount := (to - from) / batchSize
if (to-from)%batchSize != 0 {
batchCount++
}

batches := make([][]rpc.BatchElem, 0, batchCount)

for i := from; i <= to; i += batchSize {
end := i + batchSize
if end > to+1 {
end = to + 1
}

batch := make([]rpc.BatchElem, 0, end-i)
for j := i; j < end; j++ {
batch = append(batch, rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{j, false},
Result: new(etypes.Header),
})
}
batches = append(batches, batch)
}

return batches
}

func (n *Network) getBlockTimestampBatch(batch []rpc.BatchElem) ([]*types.BlockTimestamp, error) {
var blockTimestamps []*types.BlockTimestamp

ctx := context.Background()
if err := n.Client.Client().BatchCallContext(ctx, batch); err != nil {
return nil, err
}

for _, b := range batch {
if b.Error != nil {
return nil, b.Error
}
}

for _, b := range batch {
header := b.Result.(*etypes.Header)
blockTimestamps = append(blockTimestamps, &types.BlockTimestamp{
Block: header.Number.Int64(),
Timestamp: int64(header.Time),
})
}

return blockTimestamps, nil
}
14 changes: 13 additions & 1 deletion eth/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package eth

import (
"fmt"
"log/slog"

"github.com/autoapev1/indexer/config"
"github.com/autoapev1/indexer/types"
"github.com/chenzhijie/go-web3"
"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/crypto/sha3"
)
Expand All @@ -13,6 +15,7 @@ type Network struct {
Chain types.Chain
config config.Config
Client *ethclient.Client
Web3 *web3.Web3
}

func NewNetwork(c types.Chain, conf config.Config) *Network {
Expand All @@ -27,9 +30,18 @@ func (n *Network) Init() error {

n.Client, err = ethclient.Dial(n.Chain.Http)
if err != nil {
panic(err)
slog.Error("Error initilizing eth client", "error", err)
return err
}

w3, err := web3.NewWeb3(n.Chain.Http)
if err != nil {
slog.Error("Error initilizing web3 client", "error", err)
return err
}

n.Web3 = w3

return nil
}

Expand Down
Loading

0 comments on commit 7444cd1

Please sign in to comment.