diff --git a/p2p/kademlia/store/sqlite/meta_worker.go b/p2p/kademlia/store/sqlite/meta_worker.go
index be30f89d3..b10402510 100644
--- a/p2p/kademlia/store/sqlite/meta_worker.go
+++ b/p2p/kademlia/store/sqlite/meta_worker.go
@@ -24,7 +24,7 @@ var (
 	migrationMetaDB        = "data001-migration-meta.sqlite3"
 	accessUpdateBufferSize = 100000
 	commitInsertsInterval  = 60 * time.Second
-	metaSyncBatchSize      = 10000
+	metaSyncBatchSize      = 5000
 	lowSpaceThresholdGB    = 50 // in GB
 	minKeysToMigrate       = 100
 
@@ -39,7 +39,7 @@ func init() {
 
 type UpdateMessages []UpdateMessage
 
-// AccessUpdate holds the key and the last accessed time.
+// UpdateMessage holds the key and the last accessed time.
 type UpdateMessage struct {
 	Key            string
 	LastAccessTime time.Time
@@ -104,12 +104,14 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
 		log.P2P().WithContext(ctx).Errorf("cannot create migration table in sqlite database: %s", err.Error())
 	}
 
-	if handler.isMetaSyncRequired() {
-		err := handler.syncMetaWithData(ctx)
-		if err != nil {
-			log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
+	go func() {
+		if handler.isMetaSyncRequired() {
+			err := handler.syncMetaWithData(ctx)
+			if err != nil {
+				log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
+			}
 		}
-	}
+	}()
 
 	go handler.startLastAccessedUpdateWorker(ctx)
 	go handler.startInsertWorker(ctx)
@@ -184,9 +186,18 @@ func (d *MigrationMetaStore) isMetaSyncRequired() bool {
 }
 
 func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
-	query := `SELECT key, data, updatedAt FROM data LIMIT ? OFFSET ?`
 	var offset int
+	query := `SELECT key, data, updatedAt FROM data LIMIT ? OFFSET ?`
+	insertQuery := `
+	INSERT INTO meta (key, last_accessed, access_count, data_size)
+	VALUES (?, ?, 1, ?)
+	ON CONFLICT(key) DO
+	UPDATE SET
+		last_accessed = EXCLUDED.last_accessed,
+		data_size = EXCLUDED.data_size,
+		access_count = access_count + 1`
+
 
 	for {
 		rows, err := d.p2pDataStore.Queryx(query, metaSyncBatchSize, offset)
 		if err != nil {
@@ -194,11 +205,17 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
 			return err
 		}
 
-		log.WithContext(ctx).WithField("offset", offset).Info("Syncing meta with p2p data store")
-		var batchUpdates []UpdateMessage
-		found := false
+		var batchProcessed bool
+
+		tx, err := d.db.Beginx()
+		if err != nil {
+			rows.Close()
+			log.WithContext(ctx).WithError(err).Error("Failed to start transaction")
+			return err
+		}
+
 		for rows.Next() {
-			found = true
+			batchProcessed = true
 			var r Record
 			var t *time.Time
 
@@ -210,23 +227,35 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
 				r.UpdatedAt = *t
 			}
 
-			dataSize := len(r.Data)
-			batchUpdates = append(batchUpdates, UpdateMessage{
-				Key:            r.Key,
-				LastAccessTime: r.UpdatedAt,
-				Size:           dataSize,
-			})
+			if _, err := tx.Exec(insertQuery, r.Key, r.UpdatedAt, len(r.Data)); err != nil {
+				tx.Rollback()
+				rows.Close()
+				log.WithContext(ctx).WithError(err).Error("Failed to execute batch insert")
+				return err
+			}
+		}
+
+		if err := rows.Err(); err != nil {
+			tx.Rollback()
+			rows.Close()
+			log.WithContext(ctx).WithError(err).Error("Error iterating rows")
+			return err
 		}
-		rows.Close()
 
-		if !found {
+		if batchProcessed {
+			if err := tx.Commit(); err != nil {
+				rows.Close()
+				log.WithContext(ctx).WithError(err).Error("Failed to commit transaction")
+				return err
+			}
+		} else {
+			tx.Rollback()
+			rows.Close()
 			break
 		}
 
-		// Send batch for insertion using the existing channel-based mechanism.
-		PostKeysInsert(batchUpdates)
-
-		offset += len(batchUpdates) // Move the offset forward by the number of items processed.
+		rows.Close()
+		offset += metaSyncBatchSize
 	}
 
 	return nil
diff --git a/p2p/p2p.go b/p2p/p2p.go
index 58a89d532..d1ee12c48 100644
--- a/p2p/p2p.go
+++ b/p2p/p2p.go
@@ -262,12 +262,7 @@ func (s *p2p) configure(ctx context.Context) error {
 }
 
 // New returns a new p2p instance.
-func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store, cloud cloud.Storage) (P2P, error) {
-	mst, err := sqlite.NewMigrationMetaStore(ctx, config.DataDir, cloud)
-	if err != nil {
-		return nil, fmt.Errorf("cannot create meta store: %w", err)
-	}
-
+func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (P2P, error) {
 	store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
 	if err != nil {
 		return nil, errors.Errorf("new kademlia store: %w", err)
diff --git a/supernode/cmd/app.go b/supernode/cmd/app.go
index 9e0fd56b7..221b6fef3 100644
--- a/supernode/cmd/app.go
+++ b/supernode/cmd/app.go
@@ -229,6 +229,7 @@ func runApp(ctx context.Context, config *configs.Config) error {
 	config.P2P.ID = config.PastelID
 
 	var cloudStorage *cloud.RcloneStorage
+	var metaMigratorStore *sqlite.MigrationMetaStore
 	if config.RcloneStorageConfig != nil {
 		cloudStorage = cloud.NewRcloneStorage(config.RcloneStorageConfig.BucketName, config.RcloneStorageConfig.SpecName)
 		if config.RcloneStorageConfig.BucketName != "" && config.RcloneStorageConfig.SpecName != "" {
@@ -236,17 +237,17 @@ func runApp(ctx context.Context, config *configs.Config) error {
 				log.WithContext(ctx).WithError(err).Fatal("error establishing connection with the cloud")
 				return fmt.Errorf("rclone connection check failed: %w", err)
 			}
+
+			metaMigratorStore, err = sqlite.NewMigrationMetaStore(ctx, config.P2P.DataDir, cloudStorage)
+			if err != nil {
+				return errors.Errorf("could not create p2p service, %w", err)
+			}
 		}
 	} else {
 		log.WithContext(ctx).Info("cloud backup unavailable")
 	}
 
-	p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore, cloudStorage)
-	if err != nil {
-		return errors.Errorf("could not create p2p service, %w", err)
-	}
-
-	metaMigratorStore, err := sqlite.NewMigrationMetaStore(ctx, config.P2P.DataDir, cloudStorage)
+	p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore, cloudStorage, metaMigratorStore)
 	if err != nil {
 		return errors.Errorf("could not create p2p service, %w", err)
 	}