Commit

almost
AR1011 committed Jan 24, 2024
1 parent 13d9d12 commit ca95976
Showing 13 changed files with 356 additions and 46 deletions.
6 changes: 3 additions & 3 deletions Makefile
@@ -5,13 +5,13 @@ build:
@go build -o ./bin/ingest-bsc ./cmd/ingest/bsc/main.go

run: build
- @./bin/api
+ @./bin/api --config config.toml

ingest-eth: build
- @./bin/ingest-eth
+ @./bin/ingest-eth --config config.toml

ingest-bsc: build
- @./bin/ingest-bsc
+ @./bin/ingest-bsc --config config.toml

postgres-up:
docker compose -f ./docker/postgres.yml up -d --remove-orphans
16 changes: 8 additions & 8 deletions adapter/adapter.go
@@ -117,13 +117,13 @@ func ReadTokens(loc string) ([]*types.Token, error) {
address := record[0]
if _, exists := tokenMap[address]; !exists {
tokenMap[address] = &types.Token{
- Address: address,
- Name: record[1],
- Symbol: record[2],
- Decimals: uint8(decimals),
- Creator: record[4],
- CreatedAtBlock: createdAtBlock,
- ChainID: 0, // Set ChainID to 0 for now
+ Address: address,
+ Name: record[1],
+ Symbol: record[2],
+ Decimals: uint8(decimals),
+ Creator: record[4],
+ CreatedAt: createdAtBlock,
+ ChainID: 0, // Set ChainID to 0 for now
}
}
}
@@ -135,7 +135,7 @@ func ReadTokens(loc string) ([]*types.Token, error) {

// sort by created at block asc
sort.Slice(tokens, func(i, j int) bool {
- return tokens[i].CreatedAtBlock < tokens[j].CreatedAtBlock
+ return tokens[i].CreatedAt < tokens[j].CreatedAt
})

return tokens, nil
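The rename from CreatedAtBlock to CreatedAt above implies a types.Token shape roughly like the following sketch; the type definition itself is not part of this diff, so the field types are assumptions inferred from how the fields are populated here and in eth/method_tokens.go:

type Token struct {
	Address      string
	Name         string
	Symbol       string
	Decimals     uint8
	Creator      string
	CreationHash string
	CreatedAt    int64 // creation block number, formerly CreatedAtBlock
	ChainID      int64
}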
8 changes: 7 additions & 1 deletion cmd/dev/main.go
@@ -34,8 +34,14 @@ func main() {
if err := eth.Init(); err != nil {
panic(err)
}
+ h, err := db.GetHeights()
+ if err != nil {
+ panic(err)
+ }
+
+ fmt.Printf("Heights %v\n", h)

- getPairs(eth)
+ //getPairs(eth)
// bts, err := eth.GetBlockTimestamps(0, 10)
// if err != nil {
// panic(err)
20 changes: 18 additions & 2 deletions cmd/ingest/bsc/main.go
@@ -1,7 +1,11 @@
package main

import (
"flag"
"fmt"
"log"
"log/slog"
"os"
"time"

"github.com/autoapev1/indexer/adapter"
@@ -10,11 +14,23 @@
)

func main() {
+ var (
+ configFile string
+ )
+ flagSet := flag.NewFlagSet("ingest-eth", flag.ExitOnError)
+ flagSet.StringVar(&configFile, "config", "config.toml", "")
+ flagSet.Parse(os.Args[1:])
+
+ slog.Info("config", "configFile", configFile)
+ err := config.Parse(configFile)
+ if err != nil {
+ log.Fatal(err)
+ }
conf := config.Get().Storage.Postgres
conf.Name = "BSC"

db := storage.NewPostgresDB(conf).WithChainID(56)
- err := db.Init()
+ err = db.Init()
if err != nil {
panic(err)
}
@@ -55,7 +71,7 @@ func main() {
}

func IngestPairs(pg *storage.PostgresStore) error {
- data, err := adapter.ReadPairs("./adapter/data/bsc.pairs.csv")
+ data, err := adapter.ReadPairs("./adapter/data/bsc.pairs2.csv")
if err != nil {
return err
}
19 changes: 18 additions & 1 deletion cmd/ingest/eth/main.go
@@ -1,7 +1,11 @@
package main

import (
"flag"
"fmt"
"log"
"log/slog"
"os"
"time"

"github.com/autoapev1/indexer/adapter"
@@ -10,11 +14,24 @@
)

func main() {
+ var (
+ configFile string
+ )
+ flagSet := flag.NewFlagSet("ingest-eth", flag.ExitOnError)
+ flagSet.StringVar(&configFile, "config", "config.toml", "")
+ flagSet.Parse(os.Args[1:])
+
+ slog.Info("config", "configFile", configFile)
+ err := config.Parse(configFile)
+ if err != nil {
+ log.Fatal(err)
+ }
+
conf := config.Get().Storage.Postgres
conf.Name = "ETH"

db := storage.NewPostgresDB(conf).WithChainID(1)
- err := db.Init()
+ err = db.Init()
if err != nil {
panic(err)
}
5 changes: 5 additions & 0 deletions eth/eth.go
@@ -16,6 +16,7 @@ type Network struct {
config config.Config
Client *ethclient.Client
Web3 *web3.Web3
+ ready bool
}

func NewNetwork(c types.Chain, conf config.Config) *Network {
@@ -25,6 +26,10 @@ func NewNetwork(c types.Chain, conf config.Config) *Network {
}
}

+ func (n *Network) Ready() bool {
+ return n.ready
+ }
+
func (n *Network) Init() error {
var err error

7 changes: 3 additions & 4 deletions eth/method_blocks.go
@@ -10,7 +10,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

- func (n *Network) GetBlockTimestamps(from int64, to int64) ([]*types.BlockTimestamp, error) {
+ func (n *Network) GetBlockTimestamps(ctx context.Context, from int64, to int64) ([]*types.BlockTimestamp, error) {

blockTimestamps := make([]*types.BlockTimestamp, 0, to-from)

@@ -46,7 +46,7 @@ func (n *Network) GetBlockTimestamps(from int64, to int64) ([]*types.BlockTimest
wg.Done()
}()

- bts, err := n.getBlockTimestampBatch(batch)
+ bts, err := n.getBlockTimestampBatch(ctx, batch)
if err != nil {
slog.Error("getBlockTimestampBatch", "err", err)
return
@@ -89,10 +89,9 @@ func (n *Network) makeBlockTimestampBatches(from int64, to int64, batchSize int6
return batches
}

- func (n *Network) getBlockTimestampBatch(batch []rpc.BatchElem) ([]*types.BlockTimestamp, error) {
+ func (n *Network) getBlockTimestampBatch(ctx context.Context, batch []rpc.BatchElem) ([]*types.BlockTimestamp, error) {
var blockTimestamps []*types.BlockTimestamp

- ctx := context.Background()
if err := n.Client.Client().BatchCallContext(ctx, batch); err != nil {
return nil, err
}
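With the context now threaded through GetBlockTimestamps and getBlockTimestampBatch, callers control cancellation and deadlines for the whole batched fetch. A minimal usage sketch, assuming an already-initialized *eth.Network and the module's import path (neither is shown in this diff):

package dev // hypothetical caller package

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/autoapev1/indexer/eth"
)

// fetchTimestamps is illustrative only; Network construction and Init are omitted.
func fetchTimestamps(network *eth.Network) {
	// bound the batched RPC calls with a 30-second deadline
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	bts, err := network.GetBlockTimestamps(ctx, 0, 10_000)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("fetched %d block timestamps\n", len(bts))
}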
2 changes: 1 addition & 1 deletion eth/method_pairs.go
@@ -165,7 +165,7 @@ func (n *Network) getPairs(ctx context.Context, decoder abi.ABI, signature commo
Token1Address: common.HexToAddress((l.Topics[2].String())).String(),
Fee: l.Topics[3].Big().Int64(),
PoolType: 3,
- PoolAddress: "unknown",
+ PoolAddress: "0x0000000000000000000000000000000000000000",
TickSpacing: 0,
}

4 changes: 2 additions & 2 deletions eth/method_tokens.go
@@ -193,7 +193,7 @@ func (n *Network) getStage1TokenInfoBatch(ctx context.Context, batch []rpc.Batch

token.Creator = creator.Creator
if token.Creator == "" {
- token.Creator = "unknown"
+ token.Creator = "0x0000000000000000000000000000000000000000"
}
token.CreationHash = creator.Hash
if token.CreationHash == "" {
@@ -284,7 +284,7 @@ func (n *Network) getStage2TokenInfoBatch(ctx context.Context, batch []rpc.Batch
slog.Error("getStage2TokenInfoBatch", "err", err)
blockNumberInt = 0
}
- tokens[i].CreatedAtBlock = blockNumberInt
+ tokens[i].CreatedAt = blockNumberInt
}

return nil
90 changes: 84 additions & 6 deletions storage/sql.go
@@ -20,6 +20,7 @@ type PostgresStore struct {
DB *bun.DB
ChainID int64
debug bool
+ ready bool
}

func NewPostgresDB(conf config.PostgresConfig) *PostgresStore {
@@ -46,6 +47,10 @@ func (p *PostgresStore) WithDebug() *PostgresStore {
return p
}

+ func (p *PostgresStore) Ready() bool {
+ return p.ready
+ }
+
func (p *PostgresStore) Init() error {
st := time.Now()
err := p.CreateTables()
@@ -160,7 +165,7 @@ func (p *PostgresStore) BulkInsertBlockTimestamp(blockTimestamps []*types.BlockT
batchSize := 10000

for i := 0; i < len(blockTimestamps); i += batchSize {
fmt.Printf("Inserting blocktimestamps %d to %d\n", i, i+batchSize)
fmt.Printf("inserting into block_timestamps \tfrom:%d \tto:%d\n", i, i+batchSize)
end := i + batchSize
if end > len(blockTimestamps) {
end = len(blockTimestamps)
@@ -182,10 +187,10 @@ func (p *PostgresStore) GetBlockTimestamps(to int64, from int64) ([]*types.Block
var blockTimestamps []*types.BlockTimestamp
ctx := context.Background()

- err := p.DB.NewSelect().Model(&blockTimestamps).
+ err := p.DB.NewSelect().
+ Table("block_timestamps").
Where("block >= ?", from).
Where("block <= ?", to).
- Limit(10000).
Scan(ctx)

if err != nil {
@@ -198,7 +203,10 @@ func (p *PostgresStore) GetHight() (int64, error) {
func (p *PostgresStore) GetHight() (int64, error) {
var block int64
ctx := context.Background()
- err := p.DB.NewSelect().Model(&types.BlockTimestamp{}).ColumnExpr("MAX(block)").Scan(ctx, &block)
+ err := p.DB.NewSelect().
+ Table("block_timestamps").
+ ColumnExpr("MAX(block)").
+ Scan(ctx, &block)
if err != nil {
return block, err
}
@@ -220,7 +228,11 @@ func (p *PostgresStore) BulkInsertTokenInfo(tokenInfos []*types.Token) error {
batchSize := 10000

for i := 0; i < len(tokenInfos); i += batchSize {
fmt.Printf("Inserting tokens %d to %d\n", i, i+batchSize)
tokenInfos[i].Lower()
}

for i := 0; i < len(tokenInfos); i += batchSize {
fmt.Printf("inserting into tokens \tfrom:%d \tto:%d\n", i, i+batchSize)
end := i + batchSize
if end > len(tokenInfos) {
end = len(tokenInfos)
@@ -341,7 +353,11 @@ func (p *PostgresStore) BulkInsertPairInfo(pairInfos []*types.Pair) error {
batchSize := 100000

for i := 0; i < len(pairInfos); i += batchSize {
fmt.Printf("Inserting pairs %d to %d\n", i, i+batchSize)
pairInfos[i].Lower()
}

for i := 0; i < len(pairInfos); i += batchSize {
fmt.Printf("inserting into pairs \tfrom:%d \tto:%d\n", i, i+batchSize)
end := i + batchSize
if end > len(pairInfos) {
end = len(pairInfos)
@@ -497,6 +513,68 @@ func (p *PostgresStore) GetUniqueAddressesFromTokens() ([]string, error) {
return addresses, nil
}

+ func (p *PostgresStore) GetPairsWithoutTokenInfo() ([]string, error) {
+ pairs, err := p.GetUniqueAddressesFromPairs()
+ if err != nil {
+ return nil, err
+ }
+
+ tokenAddresses, err := p.GetUniqueAddressesFromTokens()
+ if err != nil {
+ return nil, err
+ }
+
+ tokens := make(map[string]struct{})
+ for _, token := range tokenAddresses {
+ tokens[strings.ToLower(token)] = struct{}{}
+ }
+
+ var missing []string
+ for _, pair := range pairs {
+ if _, exists := tokens[strings.ToLower(pair)]; !exists {
+ missing = append(missing, pair)
+ }
+ }
+
+ return missing, nil
+ }
+
+ func (p *PostgresStore) GetHeights() (*types.Heights, error) {
+ heights := &types.Heights{
+ Blocks: 0,
+ Tokens: 0,
+ Pairs: 0,
+ }
+
+ ctx := context.Background()
+
+ err := p.DB.NewSelect().
+ ColumnExpr("MAX(block)").
+ Model(&types.BlockTimestamp{}).
+ Scan(ctx, &heights.Blocks)
+ if err != nil {
+ return heights, err
+ }
+
+ err = p.DB.NewSelect().
+ ColumnExpr("MAX(created_at)").
+ Model(&types.Token{}).
+ Scan(ctx, &heights.Tokens)
+ if err != nil {
+ return heights, err
+ }
+
+ err = p.DB.NewSelect().
+ ColumnExpr("MAX(created_at)").
+ Model(&types.Pair{}).
+ Scan(ctx, &heights.Pairs)
+ if err != nil {
+ return heights, err
+ }
+
+ return heights, nil
+ }
+
var _ Store = &PostgresStore{}

func fuzWrap(s *string) string {
4 changes: 4 additions & 0 deletions storage/store.go
@@ -3,6 +3,8 @@ package storage
import "github.com/autoapev1/indexer/types"

type Store interface {
+ Init() error
+ Ready() bool
GetChainID() int64
GetHight() (int64, error)

@@ -27,4 +29,6 @@ type Store interface {
// util
GetUniqueAddressesFromPairs() ([]string, error)
GetUniqueAddressesFromTokens() ([]string, error)
+ GetPairsWithoutTokenInfo() ([]string, error)
+ GetHeights() (*types.Heights, error)
}
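The new GetHeights method returns *types.Heights, which is not defined in this diff. A plausible sketch, assuming the fields simply mirror the MAX(block) / MAX(created_at) values that GetHeights scans into them in storage/sql.go:

package types

// Heights is assumed to hold the latest ingested height per table;
// the real definition lives outside this commit.
type Heights struct {
	Blocks int64 // MAX(block) from block_timestamps
	Tokens int64 // MAX(created_at) from tokens
	Pairs  int64 // MAX(created_at) from pairs
}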
