Skip to content

Commit

Permalink
Merge pull request #927 from pastelnetwork/PSL-1257_enablePragmasOnMe…
Browse files Browse the repository at this point in the history
…taDB

[PSL-1257] enable pragma to handle db locking on meta-migration db
  • Loading branch information
j-rafique authored Aug 19, 2024
2 parents 494ae2c + c2a6f56 commit f8a1218
Showing 1 changed file with 53 additions and 10 deletions.
63 changes: 53 additions & 10 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
migrationMetaDB = "data001-migration-meta.sqlite3"
accessUpdateBufferSize = 100000
commitInsertsInterval = 10 * time.Second
metaSyncBatchSize = 10000
metaSyncBatchSize = 5000
lowSpaceThresholdGB = 50 // in GB
minKeysToMigrate = 100

Expand Down Expand Up @@ -78,6 +78,10 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(10)

if err := setPragmas(db); err != nil {
log.WithContext(ctx).WithError(err).Error("error executing pragmas")
}

p2pDataStore, err := connectP2PDataStore(dataDir)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error connecting p2p store from meta-migration store")
Expand Down Expand Up @@ -121,6 +125,33 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
return handler, nil
}

func setPragmas(db *sqlx.DB) error {
// Set journal mode to WAL
_, err := db.Exec("PRAGMA journal_mode=WAL;")
if err != nil {
return err
}
// Set synchronous to NORMAL
_, err = db.Exec("PRAGMA synchronous=NORMAL;")
if err != nil {
return err
}

// Set cache size
_, err = db.Exec("PRAGMA cache_size=-262144;")
if err != nil {
return err
}

// Set busy timeout
_, err = db.Exec("PRAGMA busy_timeout=5000;")
if err != nil {
return err
}

return nil
}

func (d *MigrationMetaStore) migrateMeta() error {
query := `
CREATE TABLE IF NOT EXISTS meta (
Expand Down Expand Up @@ -205,17 +236,23 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
return err
}

var batchProcessed bool

tx, err := d.db.Beginx()
if err != nil {
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to start transaction")
return err
}

stmt, err := tx.Prepare(insertQuery)
if err != nil {
tx.Rollback()
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to prepare statement")
return err
}

var batchProcessed bool
for rows.Next() {
batchProcessed = true
var r Record
var t *time.Time

Expand All @@ -227,14 +264,15 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
r.UpdatedAt = *t
}

if _, err := tx.Exec(insertQuery, r.Key, r.UpdatedAt, len(r.Data)); err != nil {
tx.Rollback()
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to execute batch insert")
return err
if _, err := stmt.Exec(r.Key, r.UpdatedAt, len(r.Data)); err != nil {
log.WithContext(ctx).WithField("key", r.Key).WithError(err).Error("error inserting key to meta")
continue
}

batchProcessed = true
}

stmt.Close()
if err := rows.Err(); err != nil {
tx.Rollback()
rows.Close()
Expand All @@ -255,7 +293,12 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
}

rows.Close()
offset += metaSyncBatchSize
if !batchProcessed {
tx.Rollback()
log.WithContext(ctx).Info("no rows processed, rolling back and breaking.")
break
}
offset += metaSyncBatchSize //
}

return nil
Expand Down

0 comments on commit f8a1218

Please sign in to comment.