Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PSL-1240] write unit tests, bug fixes #921

Merged
merged 3 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions p2p/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
github.com/pastelnetwork/gonode/pastel v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.9.0
github.com/tj/assert v0.0.3
github.com/vmihailenco/msgpack/v5 v5.4.1
go.uber.org/ratelimit v0.3.0
golang.org/x/sync v0.6.0
google.golang.org/grpc v1.62.0
Expand Down Expand Up @@ -49,7 +48,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/sys v0.16.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions p2p/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,6 @@ github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW
github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
Expand Down
6 changes: 1 addition & 5 deletions p2p/kademlia/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,7 @@ func (ts *testSuite) SetupSuite() {
ts.ctx, ts.cancel = context.WithCancel(context.Background())
ts.ctx = log.ContextWithPrefix(ts.ctx, "p2p-test")

// init the badger store
defaultReplicateInterval := time.Second * 3600
defaultRepublishInterval := time.Second * 3600 * 24

dbStore, err := sqlite.NewStore(ts.ctx, filepath.Join(workDir, "p2p"), defaultReplicateInterval, defaultRepublishInterval, nil)
dbStore, err := sqlite.NewStore(ts.ctx, filepath.Join(workDir, "p2p"), nil, nil)
if err != nil {
ts.T().Fatalf("new sqlite store: %v", err)
}
Expand Down
16 changes: 11 additions & 5 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
)

var (
commitLastAccessedInterval = 60 * time.Second
commitLastAccessedInterval = 90 * time.Second
migrationExecutionTicker = 12 * time.Hour
migrationMetaDB = "data001-migration-meta.sqlite3"
accessUpdateBufferSize = 100000
commitInsertsInterval = 90 * time.Second
commitInsertsInterval = 60 * time.Second
metaSyncBatchSize = 10000
lowSpaceThresholdGB = 50 // in GB
minKeysToMigrate = 100
Expand Down Expand Up @@ -258,7 +258,7 @@ func PostAccessUpdate(updates []string) {
select {
case updateChannel <- UpdateMessage{
Key: update,
LastAccessTime: time.Now(),
LastAccessTime: time.Now().UTC(),
}:
// Inserted
default:
Expand Down Expand Up @@ -291,7 +291,13 @@ func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) {
return
}

stmt, err := tx.Prepare("INSERT OR REPLACE INTO meta (key, last_accessed) VALUES (?, ?)")
stmt, err := tx.Prepare(`
INSERT INTO meta (key, last_accessed, access_count)
VALUES (?, ?, 1)
ON CONFLICT(key) DO
UPDATE SET
last_accessed = EXCLUDED.last_accessed,
access_count = access_count + 1`)
if err != nil {
tx.Rollback() // Roll back the transaction on error
log.WithContext(ctx).WithError(err).Error("Error preparing statement (commitLastAccessedUpdates)")
Expand All @@ -309,7 +315,7 @@ func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) {
if !ok {
return false
}
_, err := stmt.Exec(v, k)
_, err := stmt.Exec(k, v)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("key", key).Error("Error executing statement (commitLastAccessedUpdates)")
return true // continue
Expand Down
14 changes: 6 additions & 8 deletions p2p/kademlia/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type Worker struct {

// Store is the main struct
type Store struct {
db *sqlx.DB
worker *Worker
cloud cloud.Storage
db *sqlx.DB
worker *Worker
cloud cloud.Storage
migrationStore *MigrationMetaStore
}

// Record is a data record
Expand All @@ -67,7 +68,7 @@ type Record struct {
}

// NewStore returns a new store
func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Duration, cloud cloud.Storage) (*Store, error) {
func NewStore(ctx context.Context, dataDir string, cloud cloud.Storage, mst *MigrationMetaStore) (*Store, error) {
worker := &Worker{
JobQueue: make(chan Job, 500),
quit: make(chan bool),
Expand Down Expand Up @@ -165,10 +166,7 @@ func NewStore(ctx context.Context, dataDir string, _ time.Duration, _ time.Durat
go s.startCheckpointWorker(ctx)

if s.IsCloudBackupOn() {
_, err = NewMigrationMetaStore(ctx, dataDir, cloud)
if err != nil {
return nil, fmt.Errorf("cannot create meta store: %w", err)
}
s.migrationStore = mst
}

return s, nil
Expand Down
130 changes: 129 additions & 1 deletion p2p/kademlia/store/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
//go:build !race
// +build !race

package sqlite

import (
"context"
"crypto/rand"
"encoding/hex"

"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/pastelnetwork/gonode/common/utils"
"github.com/pastelnetwork/gonode/p2p/kademlia/store/cloud.go"
"github.com/stretchr/testify/assert"
)

Expand All @@ -21,7 +27,7 @@ func TestStoreAndRetrieve(t *testing.T) {
defer os.RemoveAll(tempDir)

dbPath := filepath.Join(tempDir, "test.db")
store, err := NewStore(context.Background(), dbPath, time.Minute, time.Minute, nil)
store, err := NewStore(context.Background(), dbPath, nil, nil)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}
Expand Down Expand Up @@ -56,3 +62,125 @@ func generateRandomBytes(n int) []byte {
_, _ = rand.Read(b)
return b
}

func TestStore(t *testing.T) {
cloud := cloud.NewRcloneStorage("test", "test")
ctx, cancel := context.WithCancel(context.Background())

mst, err := NewMigrationMetaStore(ctx, ".", cloud)

mst.updateTicker.Stop()
mst.insertTicker.Stop()

// override the tickers for testing
mst.updateTicker = time.NewTicker(2 * time.Second)
mst.insertTicker = time.NewTicker(2 * time.Second)

if err != nil {
t.Fatalf("failed to create store: %v", err)
}
store, err := NewStore(ctx, ".", cloud, mst)
if err != nil {
t.Fatalf("failed to create store: %v", err)
}

r1 := []byte("test-record-1")
r2 := []byte("test-record-2")
r3 := []byte("test-record-3")

hashed, err := utils.Sha3256hash(r1)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r1Key := hex.EncodeToString(hashed)

hashed, err = utils.Sha3256hash(r2)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r2Key := hex.EncodeToString(hashed)

hashed, err = utils.Sha3256hash(r3)
if err != nil {
t.Fatalf("failed to hash record: %v", err)
}

r3Key := hex.EncodeToString(hashed)

err = store.storeBatchRecord([][]byte{r1, r2, r3}, 0, true)
if err != nil {
t.Fatalf("failed to store record: %v", err)
}

time.Sleep(3 * time.Second)

type record struct {
Key string `db:"key"`
LastAcccessed time.Time `db:"last_accessed"`
AccessCount int `db:"access_count"`
DataSize int `db:"data_size"`
}

var keys []record
err = store.migrationStore.db.Select(&keys, "SELECT key,last_accessed,access_count,data_size FROM meta where key in (?, ?, ?)", r1Key, r2Key, r3Key)
if err != nil {
t.Fatalf("failed to retrieve record: %v", err)
}

if len(keys) != 3 {
t.Fatalf("expected 3 records, got %d", len(keys))
}

time.Sleep(1 * time.Second)

_, _, err = store.RetrieveBatchValues(context.Background(), []string{r1Key, r2Key, r3Key}, true)
if err != nil {
t.Fatalf("failed to retrieve record: %v", err)
}

time.Sleep(3 * time.Second)

var nkeys []record
err = store.migrationStore.db.Select(&nkeys, "SELECT key,last_accessed,access_count,data_size FROM meta where key in (?, ?, ?)", r1Key, r2Key, r3Key)
if err != nil {
t.Fatalf("failed to retrieve record: %v", err)
}

if len(nkeys) != 3 {
t.Fatalf("expected 3 records, got %d", len(nkeys))
}

for _, key := range nkeys {
for _, k := range keys {
if key.Key == k.Key {

if !key.LastAcccessed.After(k.LastAcccessed) {
t.Fatalf("last accessed time not updated")
}

if key.AccessCount != k.AccessCount+1 {
t.Fatalf("access count not updated")
}

if key.DataSize != len(r1) {
t.Fatalf("data size not updated")
}
}
}
}

cancel() // Signal all contexts to finish
mst.updateTicker.Stop()
mst.insertTicker.Stop()

// Allow some time for goroutines to exit
time.Sleep(100 * time.Millisecond)

os.Remove("data001.sqlite3")
os.Remove("data001-migration-meta.sqlite3")
os.Remove("data001.sqlite3-shm")
os.Remove("data001.sqlite3-wal")

}
7 changes: 6 additions & 1 deletion p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,12 @@ func (s *p2p) configure(ctx context.Context) error {

// New returns a new p2p instance.
func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store, cloud cloud.Storage) (P2P, error) {
store, err := sqlite.NewStore(ctx, config.DataDir, defaultReplicateInterval, defaultRepublishInterval, cloud)
mst, err := sqlite.NewMigrationMetaStore(ctx, config.DataDir, cloud)
if err != nil {
return nil, fmt.Errorf("cannot create meta store: %w", err)
}

store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
if err != nil {
return nil, errors.Errorf("new kademlia store: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions supernode/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/urfave/cli/v2 v2.25.7 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/ratelimit v0.3.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions supernode/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,6 @@ github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
Expand Down
Loading