Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PSL-1252] fix bugs, and handle edge cases for the migration-data identifier #922

Merged
merged 1 commit into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,18 @@ type MetaStoreInterface interface {
// GetCountOfStaleData returns the count of stale data where last_accessed is 3 months before.
func (d *MigrationMetaStore) GetCountOfStaleData(ctx context.Context, staleTime time.Time) (int, error) {
var count int
query := `SELECT COUNT(*) FROM meta WHERE last_accessed < ?`
query := `SELECT COUNT(*)
FROM meta m
LEFT JOIN (
SELECT DISTINCT mm.key
FROM meta_migration mm
JOIN migration mg ON mm.migration_id = mg.id
WHERE mg.migration_started_at > $1
AND mm.is_migrated = true
OR mm.created_at > $1
) AS recent_migrations ON m.key = recent_migrations.key
WHERE m.last_accessed < $1
AND recent_migrations.key IS NULL;`

err := d.db.GetContext(ctx, &count, query, staleTime)
if err != nil {
Expand All @@ -622,11 +633,19 @@ func (d *MigrationMetaStore) GetStaleDataInBatches(ctx context.Context, batchSiz
offset := batchNumber * batchSize

query := `
SELECT key
FROM meta
WHERE last_accessed < ?
LIMIT ? OFFSET ?
`
SELECT m.key
FROM meta m
LEFT JOIN (
SELECT DISTINCT mm.key
FROM meta_migration mm
JOIN migration mg ON mm.migration_id = mg.id
WHERE mg.migration_started_at > $1
AND mm.is_migrated = true
OR mm.created_at > $1
) AS recent_migrations ON m.key = recent_migrations.key
WHERE m.last_accessed < $1
AND recent_migrations.key IS NULL
LIMIT $2 OFFSET $3;`

rows, err := d.db.QueryxContext(ctx, query, duration, batchSize, offset)
if err != nil {
Expand Down
14 changes: 12 additions & 2 deletions supernode/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cmd
import (
"context"
"fmt"
"github.com/pastelnetwork/gonode/p2p/kademlia/store/sqlite"
"github.com/pastelnetwork/gonode/supernode/services/metamigrator"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -225,8 +227,10 @@ func runApp(ctx context.Context, config *configs.Config) error {
config.P2P.SetWorkDir(config.WorkDir)
config.P2P.ID = config.PastelID

cloudStorage := cloud.NewRcloneStorage(config.RcloneStorageConfig.BucketName, config.RcloneStorageConfig.SpecName)
var cloudStorage cloud.Storage

if config.RcloneStorageConfig.BucketName != "" && config.RcloneStorageConfig.SpecName != "" {
cloudStorage = cloud.NewRcloneStorage(config.RcloneStorageConfig.BucketName, config.RcloneStorageConfig.SpecName)
if err := cloudStorage.CheckCloudConnection(); err != nil {
log.WithContext(ctx).WithError(err).Fatal("error establishing connection with the cloud")
return fmt.Errorf("rclone connection check failed: %w", err)
Expand All @@ -238,6 +242,11 @@ func runApp(ctx context.Context, config *configs.Config) error {
return errors.Errorf("could not create p2p service, %w", err)
}

metaMigratorStore, err := sqlite.NewMigrationMetaStore(ctx, config.P2P.DataDir, cloudStorage)
if err != nil {
return errors.Errorf("could not create p2p service, %w", err)
}

rqAddr := fmt.Sprint(config.RaptorQ.Host, ":", config.RaptorQ.Port)
// raptorq client
config.NftRegister.RaptorQServiceAddress = rqAddr
Expand Down Expand Up @@ -298,6 +307,7 @@ func runApp(ctx context.Context, config *configs.Config) error {
storageChallenger := storagechallenge.NewService(&config.StorageChallenge, fileStorage, pastelClient, nodeClient, p2p, hDB, sDB)
healthCheckChallenger := healthcheckchallenge.NewService(&config.HealthCheckChallenge, fileStorage, pastelClient, nodeClient, p2p, nil, hDB, sDB)
selfHealing := selfhealing.NewService(&config.SelfHealingChallenge, fileStorage, pastelClient, nodeClient, p2p, hDB, nftDownload, rqstore)
metaMigratorWorker := metamigrator.NewService(metaMigratorStore)
// // ----Userdata Services----
// userdataNodeClient := client.New(pastelClient, secInfo)
// userdataProcess := userdataprocess.NewService(&config.UserdataProcess, pastelClient, userdataNodeClient, database)
Expand Down Expand Up @@ -340,5 +350,5 @@ func runApp(ctx context.Context, config *configs.Config) error {
_ = http.ListenAndServe(fmt.Sprintf(":%s", profilingPort), nil)
}()

return runServices(ctx, grpc, p2p, nftRegister, nftDownload, senseRegister, cascadeRegister, statsMngr, debugSerivce, storageChallenger, selfHealing, collectionRegister, healthCheckChallenger)
return runServices(ctx, grpc, p2p, nftRegister, nftDownload, senseRegister, cascadeRegister, statsMngr, debugSerivce, storageChallenger, selfHealing, collectionRegister, healthCheckChallenger, metaMigratorWorker)
}
2 changes: 2 additions & 0 deletions supernode/services/metamigrator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (service *MetaMigratorService) Run(ctx context.Context) error {
for {
select {
case <-time.After(defaultMetaMigratorDataIdentifier):
log.WithContext(ctx).Info("meta-migrator-worker run() has been invoked")

isLow, err := utils.CheckDiskSpace(lowSpaceThresholdGB)
if err != nil {
Expand All @@ -36,6 +37,7 @@ func (service *MetaMigratorService) Run(ctx context.Context) error {
}

if !isLow {
log.WithContext(ctx).Info("disk space is not lower than threshold, not proceeding further")
continue
}

Expand Down
Loading