Skip to content

Commit

Permalink
[PSL-1254] Revert the channel mechanism; use batching for meta sync instead
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique authored Aug 15, 2024
1 parent 5dfbeec commit d479a9b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 36 deletions.
77 changes: 53 additions & 24 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
migrationMetaDB = "data001-migration-meta.sqlite3"
accessUpdateBufferSize = 100000
commitInsertsInterval = 60 * time.Second
metaSyncBatchSize = 10000
metaSyncBatchSize = 5000
lowSpaceThresholdGB = 50 // in GB
minKeysToMigrate = 100

Expand All @@ -39,7 +39,7 @@ func init() {

type UpdateMessages []UpdateMessage

// AccessUpdate holds the key and the last accessed time.
// UpdateMessage holds the key and the last accessed time.
type UpdateMessage struct {
Key string
LastAccessTime time.Time
Expand Down Expand Up @@ -104,12 +104,14 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
log.P2P().WithContext(ctx).Errorf("cannot create migration table in sqlite database: %s", err.Error())
}

if handler.isMetaSyncRequired() {
err := handler.syncMetaWithData(ctx)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
go func() {
if handler.isMetaSyncRequired() {
err := handler.syncMetaWithData(ctx)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
}
}
}
}()

go handler.startLastAccessedUpdateWorker(ctx)
go handler.startInsertWorker(ctx)
Expand Down Expand Up @@ -184,21 +186,36 @@ func (d *MigrationMetaStore) isMetaSyncRequired() bool {
}

func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
query := `SELECT key, data, updatedAt FROM data LIMIT ? OFFSET ?`
var offset int

query := `SELECT key, data, updatedAt FROM data LIMIT ? OFFSET ?`
insertQuery := `
INSERT INTO meta (key, last_accessed, access_count, data_size)
VALUES (?, ?, 1, ?)
ON CONFLICT(key) DO
UPDATE SET
last_accessed = EXCLUDED.last_accessed,
data_size = EXCLUDED.data_size,
access_count = access_count + 1`

for {
rows, err := d.p2pDataStore.Queryx(query, metaSyncBatchSize, offset)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error querying p2p data store")
return err
}

log.WithContext(ctx).WithField("offset", offset).Info("Syncing meta with p2p data store")
var batchUpdates []UpdateMessage
found := false
var batchProcessed bool

tx, err := d.db.Beginx()
if err != nil {
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to start transaction")
return err
}

for rows.Next() {
found = true
batchProcessed = true
var r Record
var t *time.Time

Expand All @@ -210,23 +227,35 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
r.UpdatedAt = *t
}

dataSize := len(r.Data)
batchUpdates = append(batchUpdates, UpdateMessage{
Key: r.Key,
LastAccessTime: r.UpdatedAt,
Size: dataSize,
})
if _, err := tx.Exec(insertQuery, r.Key, r.UpdatedAt, len(r.Data)); err != nil {
tx.Rollback()
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to execute batch insert")
return err
}
}

if err := rows.Err(); err != nil {
tx.Rollback()
rows.Close()
log.WithContext(ctx).WithError(err).Error("Error iterating rows")
return err
}
rows.Close()

if !found {
if batchProcessed {
if err := tx.Commit(); err != nil {
rows.Close()
log.WithContext(ctx).WithError(err).Error("Failed to commit transaction")
return err
}
} else {
tx.Rollback()
rows.Close()
break
}

// Send batch for insertion using the existing channel-based mechanism.
PostKeysInsert(batchUpdates)

offset += len(batchUpdates) // Move the offset forward by the number of items processed.
rows.Close()
offset += metaSyncBatchSize
}

return nil
Expand Down
7 changes: 1 addition & 6 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,7 @@ func (s *p2p) configure(ctx context.Context) error {
}

// New returns a new p2p instance.
func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store, cloud cloud.Storage) (P2P, error) {
mst, err := sqlite.NewMigrationMetaStore(ctx, config.DataDir, cloud)
if err != nil {
return nil, fmt.Errorf("cannot create meta store: %w", err)
}

func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (P2P, error) {
store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
if err != nil {
return nil, errors.Errorf("new kademlia store: %w", err)
Expand Down
13 changes: 7 additions & 6 deletions supernode/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,24 +229,25 @@ func runApp(ctx context.Context, config *configs.Config) error {
config.P2P.ID = config.PastelID

var cloudStorage *cloud.RcloneStorage
var metaMigratorStore *sqlite.MigrationMetaStore
if config.RcloneStorageConfig != nil {
cloudStorage = cloud.NewRcloneStorage(config.RcloneStorageConfig.BucketName, config.RcloneStorageConfig.SpecName)
if config.RcloneStorageConfig.BucketName != "" && config.RcloneStorageConfig.SpecName != "" {
if err := cloudStorage.CheckCloudConnection(); err != nil {
log.WithContext(ctx).WithError(err).Fatal("error establishing connection with the cloud")
return fmt.Errorf("rclone connection check failed: %w", err)
}

metaMigratorStore, err = sqlite.NewMigrationMetaStore(ctx, config.P2P.DataDir, cloudStorage)
if err != nil {
return errors.Errorf("could not create p2p service, %w", err)
}
}
} else {
log.WithContext(ctx).Info("cloud backup unavailable")
}

p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore, cloudStorage)
if err != nil {
return errors.Errorf("could not create p2p service, %w", err)
}

metaMigratorStore, err := sqlite.NewMigrationMetaStore(ctx, config.P2P.DataDir, cloudStorage)
p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore, cloudStorage, metaMigratorStore)
if err != nil {
return errors.Errorf("could not create p2p service, %w", err)
}
Expand Down

0 comments on commit d479a9b

Please sign in to comment.