Skip to content

Commit

Permalink
[PSL-1252] fix bugs, and handle edge cases for the migration-data ide…
Browse files Browse the repository at this point in the history
…ntifier
  • Loading branch information
j-rafique committed Aug 14, 2024
1 parent b41d407 commit 5db6b5c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 8 deletions.
31 changes: 25 additions & 6 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,18 @@ type MetaStoreInterface interface {
// GetCountOfStaleData returns the count of stale data where last_accessed is 3 months before.
func (d *MigrationMetaStore) GetCountOfStaleData(ctx context.Context, staleTime time.Time) (int, error) {
var count int
query := `SELECT COUNT(*) FROM meta WHERE last_accessed < ?`
query := `SELECT COUNT(*)
FROM meta m
LEFT JOIN (
SELECT DISTINCT mm.key
FROM meta_migration mm
JOIN migration mg ON mm.migration_id = mg.id
WHERE mg.migration_started_at > $1
AND mm.is_migrated = true
OR mm.created_at > $1
) AS recent_migrations ON m.key = recent_migrations.key
WHERE m.last_accessed < $1
AND recent_migrations.key IS NULL;`

err := d.db.GetContext(ctx, &count, query, staleTime)
if err != nil {
Expand All @@ -622,11 +633,19 @@ func (d *MigrationMetaStore) GetStaleDataInBatches(ctx context.Context, batchSiz
offset := batchNumber * batchSize

query := `
SELECT key
FROM meta
WHERE last_accessed < ?
LIMIT ? OFFSET ?
`
SELECT m.key
FROM meta m
LEFT JOIN (
SELECT DISTINCT mm.key
FROM meta_migration mm
JOIN migration mg ON mm.migration_id = mg.id
WHERE mg.migration_started_at > $1
AND mm.is_migrated = true
OR mm.created_at > $1
) AS recent_migrations ON m.key = recent_migrations.key
WHERE m.last_accessed < $1
AND recent_migrations.key IS NULL
LIMIT $2 OFFSET $3;`

rows, err := d.db.QueryxContext(ctx, query, duration, batchSize, offset)
if err != nil {
Expand Down
14 changes: 12 additions & 2 deletions supernode/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"context"
"fmt"
"github.com/pastelnetwork/gonode/p2p/kademlia/store/sqlite"
"github.com/pastelnetwork/gonode/supernode/services/metamigrator"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -225,8 +227,10 @@ func runApp(ctx context.Context, config *configs.Config) error {
config.P2P.SetWorkDir(config.WorkDir)
config.P2P.ID = config.PastelID

cloudStorage := cloud.NewRcloneStorage(config.RcloneStorageConfig.BucketName, config.RcloneStorageConfig.SpecName)
var cloudStorage cloud.Storage

if config.RcloneStorageConfig.BucketName != "" && config.RcloneStorageConfig.SpecName != "" {
cloudStorage = cloud.NewRcloneStorage(config.RcloneStorageConfig.BucketName, config.RcloneStorageConfig.SpecName)
if err := cloudStorage.CheckCloudConnection(); err != nil {
log.WithContext(ctx).WithError(err).Fatal("error establishing connection with the cloud")
return fmt.Errorf("rclone connection check failed: %w", err)
Expand All @@ -238,6 +242,11 @@ func runApp(ctx context.Context, config *configs.Config) error {
return errors.Errorf("could not create p2p service, %w", err)
}

metaMigratorStore, err := sqlite.NewMigrationMetaStore(ctx, config.P2P.DataDir, cloudStorage)
if err != nil {
return errors.Errorf("could not create p2p service, %w", err)
}

rqAddr := fmt.Sprint(config.RaptorQ.Host, ":", config.RaptorQ.Port)
// raptorq client
config.NftRegister.RaptorQServiceAddress = rqAddr
Expand Down Expand Up @@ -298,6 +307,7 @@ func runApp(ctx context.Context, config *configs.Config) error {
storageChallenger := storagechallenge.NewService(&config.StorageChallenge, fileStorage, pastelClient, nodeClient, p2p, hDB, sDB)
healthCheckChallenger := healthcheckchallenge.NewService(&config.HealthCheckChallenge, fileStorage, pastelClient, nodeClient, p2p, nil, hDB, sDB)
selfHealing := selfhealing.NewService(&config.SelfHealingChallenge, fileStorage, pastelClient, nodeClient, p2p, hDB, nftDownload, rqstore)
metaMigratorWorker := metamigrator.NewService(metaMigratorStore)
// // ----Userdata Services----
// userdataNodeClient := client.New(pastelClient, secInfo)
// userdataProcess := userdataprocess.NewService(&config.UserdataProcess, pastelClient, userdataNodeClient, database)
Expand Down Expand Up @@ -340,5 +350,5 @@ func runApp(ctx context.Context, config *configs.Config) error {
_ = http.ListenAndServe(fmt.Sprintf(":%s", profilingPort), nil)
}()

return runServices(ctx, grpc, p2p, nftRegister, nftDownload, senseRegister, cascadeRegister, statsMngr, debugSerivce, storageChallenger, selfHealing, collectionRegister, healthCheckChallenger)
return runServices(ctx, grpc, p2p, nftRegister, nftDownload, senseRegister, cascadeRegister, statsMngr, debugSerivce, storageChallenger, selfHealing, collectionRegister, healthCheckChallenger, metaMigratorWorker)
}
2 changes: 2 additions & 0 deletions supernode/services/metamigrator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (service *MetaMigratorService) Run(ctx context.Context) error {
for {
select {
case <-time.After(defaultMetaMigratorDataIdentifier):
log.WithContext(ctx).Info("meta-migrator-worker run() has been invoked")

isLow, err := utils.CheckDiskSpace(lowSpaceThresholdGB)
if err != nil {
Expand All @@ -36,6 +37,7 @@ func (service *MetaMigratorService) Run(ctx context.Context) error {
}

if !isLow {
log.WithContext(ctx).Info("disk space is not lower than threshold, not proceeding further")
continue
}

Expand Down

0 comments on commit 5db6b5c

Please sign in to comment.