Skip to content

Commit

Permalink
no peer config stored in temporal state (#1844)
Browse files Browse the repository at this point in the history
Also api uses peer names more too instead of passing configs around

This will need two version bumps to sanitize:
- on 0.13 user pauses their mirrors. This is important, failure to do so will break mirror when upgrading
- after upgrading to 0.14 unpause mirror. Temporal data will be sanitized at this point
- in 0.15 sanitization logic will be removed & protobufs can remove fields. Mirror should be paused in 0.14 to prevent replay using outdated protobufs
  • Loading branch information
serprex authored Jun 24, 2024
1 parent 078cfee commit 3e1ddef
Show file tree
Hide file tree
Showing 76 changed files with 1,241 additions and 1,345 deletions.
2 changes: 1 addition & 1 deletion flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ linters:
- misspell
- musttag
- nakedret
- nolintlint
# TODO bring back in 0.15 - nolintlint
- nonamedreturns
- perfsprint
- prealloc
Expand Down
223 changes: 70 additions & 153 deletions flow/activities/flowable.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
})
defer shutdown()

dstConn, err := connectors.GetAs[TSync](ctx, config.Destination)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
hasRecords := !recordBatchSync.WaitAndCheckEmpty()
logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
dstConn, err = connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
var res *model.SyncResponse
errGroup.Go(func() error {
syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil && config.Destination.Type != protos.DBType_EVENTHUBS {
if err != nil {
return err
}
syncBatchID += 1
Expand Down Expand Up @@ -332,14 +332,14 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

srcConn, err := connectors.GetAs[TPull](ctx, config.SourcePeer)
srcConn, err := connectors.GetByNameAs[TPull](ctx, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetAs[TSync](ctx, config.DestinationPeer)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep destination connector: %w", err)
Expand Down Expand Up @@ -431,13 +431,13 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
logger := activity.GetLogger(ctx)

startTime := time.Now()
srcConn, err := connectors.GetAs[*connpostgres.PostgresConnector](ctx, config.SourcePeer)
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, a.CatalogPool, config.SourceName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetAs[TSync](ctx, config.DestinationPeer)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
Expand Down
25 changes: 11 additions & 14 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package activities

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/alerting"
Expand All @@ -29,6 +31,7 @@ type TxSnapshotState struct {

type SnapshotActivity struct {
Alerter *alerting.Alerter
CatalogPool *pgxpool.Pool
SlotSnapshotStates map[string]SlotSnapshotState
TxSnapshotStates map[string]TxSnapshotState
SnapshotStatesMutex sync.Mutex
Expand Down Expand Up @@ -56,16 +59,14 @@ func (a *SnapshotActivity) SetupReplication(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

dbType := config.PeerConnectionConfig.Type
if dbType != protos.DBType_POSTGRES {
logger.Info(fmt.Sprintf("setup replication is no-op for %s", dbType))
return nil, nil
}

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, a.CatalogPool, config.PeerName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
logger.Info("setup replication is no-op for non-postgres source")
return nil, nil
}
return nil, fmt.Errorf("failed to get connector: %w", err)
}

Expand All @@ -80,14 +81,10 @@ func (a *SnapshotActivity) SetupReplication(
connectors.CloseConnector(ctx, conn)
}

// This now happens in a goroutine
go func() {
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(ctx, slotSignal, config)
if err != nil {
if err := conn.SetupReplication(ctx, slotSignal, config); err != nil {
closeConnectionForError(err)
replicationErr <- err
return
}
}()

Expand Down Expand Up @@ -122,8 +119,8 @@ func (a *SnapshotActivity) SetupReplication(
}, nil
}

func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer *protos.Peer) error {
conn, err := connectors.GetCDCPullConnector(ctx, peer)
func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string) error {
conn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, a.CatalogPool, peer)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -60,16 +61,16 @@ func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string,
) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.SourceName)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
req.ConnectionConfigs.Source.Name, srcErr)
req.ConnectionConfigs.SourceName, srcErr)
}

destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.Destination.Name)
destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.DestinationName)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
req.ConnectionConfigs.Destination.Name, srcErr)
req.ConnectionConfigs.DestinationName, srcErr)
}

for _, v := range req.ConnectionConfigs.TableMappings {
Expand All @@ -92,14 +93,14 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context,
req *protos.CreateQRepFlowRequest, workflowID string,
) error {
sourcePeerName := req.QrepConfig.SourcePeer.Name
sourcePeerName := req.QrepConfig.SourceName
sourcePeerID, _, srcErr := h.getPeerID(ctx, sourcePeerName)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
sourcePeerName, srcErr)
}

destinationPeerName := req.QrepConfig.DestinationPeer.Name
destinationPeerName := req.QrepConfig.DestinationName
destinationPeerID, _, dstErr := h.getPeerID(ctx, destinationPeerName)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
Expand Down Expand Up @@ -167,10 +168,7 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
ctx context.Context,
cfg *protos.FlowConnectionConfigs,
) error {
var cfgBytes []byte
var err error

cfgBytes, err = proto.Marshal(cfg)
cfgBytes, err := proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}
Expand Down Expand Up @@ -208,15 +206,18 @@ func (h *FlowRequestHandler) CreateQRepFlow(
},
}
if req.CreateCatalogEntry {
err := h.createQRepJobEntry(ctx, req, workflowID)
if err != nil {
if err := h.createQRepJobEntry(ctx, req, workflowID); err != nil {
slog.Error("unable to create flow job entry",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}
dbtype, err := connectors.LoadPeerType(ctx, h.pool, cfg.SourceName)
if err != nil {
return nil, err
}
var workflowFn interface{}
if cfg.SourcePeer.Type == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" {
if dbtype == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" {
workflowFn = peerflow.XminFlowWorkflow
} else {
workflowFn = peerflow.QRepFlowWorkflow
Expand All @@ -226,8 +227,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
}

_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil)
if err != nil {
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil); err != nil {
slog.Error("unable to start QRepFlow workflow",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
Expand Down
43 changes: 30 additions & 13 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//nolint:staticcheck // TODO remove in 0.15
package cmd

import (
Expand All @@ -10,6 +11,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -120,27 +122,45 @@ func (h *FlowRequestHandler) CDCFlowStatus(
return nil, err
}

// TODO remove in 0.15
// patching config to use new fields on ui
if config.Source != nil {
config.SourceName = config.Source.Name
config.Source = nil
}
if config.Destination != nil {
config.DestinationName = config.Destination.Name
config.Destination = nil
}

// patching config to show latest values from state
if state.SyncFlowOptions != nil {
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
config.TableMappings = state.SyncFlowOptions.TableMappings
}

var initialCopyStatus *protos.SnapshotStatus

cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName)
if err != nil {
return nil, err
}
dstType, err := connectors.LoadPeerType(ctx, h.pool, config.DestinationName)
if err != nil {
return nil, err
}

initialCopyStatus = &protos.SnapshotStatus{
Clones: cloneStatuses,
cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

return &protos.CDCMirrorStatus{
Config: config,
SnapshotStatus: initialCopyStatus,
Config: config,
SourceType: srcType,
DestinationType: dstType,
SnapshotStatus: &protos.SnapshotStatus{
Clones: cloneStatuses,
},
}, nil
}

Expand Down Expand Up @@ -316,16 +336,14 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
flowJobName string,
) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
var err error
var config protos.FlowConnectionConfigs

err = h.pool.QueryRow(ctx,
err := h.pool.QueryRow(ctx,
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
slog.Error("unable to query flow config from catalog", slog.Any("error", err))
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
}

var config protos.FlowConnectionConfigs
err = proto.Unmarshal(configBytes, &config)
if err != nil {
slog.Error("unable to unmarshal flow config", slog.Any("error", err))
Expand Down Expand Up @@ -386,8 +404,7 @@ func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state peerflow.CDCFlowWorkflowState
err = res.Get(&state)
if err != nil {
if err := res.Get(&state); err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
Expand Down
1 change: 1 addition & 0 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
SlotSnapshotStates: make(map[string]activities.SlotSnapshotState),
TxSnapshotStates: make(map[string]activities.TxSnapshotState),
Alerter: alerting.NewAlerter(context.Background(), conn),
CatalogPool: conn,
})

return c, w, nil
Expand Down
14 changes: 11 additions & 3 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jackc/pgx/v5/pgtype"

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -43,12 +44,19 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
Ok: false,
}, errors.New("connection configs is nil")
}
sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig()
sourcePeer, err := connectors.LoadPeer(ctx, h.pool, req.ConnectionConfigs.SourceName)
if err != nil {
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, err
}

sourcePeerConfig := sourcePeer.GetPostgresConfig()
if sourcePeerConfig == nil {
slog.Error("/validatecdc source peer config is nil", slog.Any("peer", req.ConnectionConfigs.Source))
slog.Error("/validatecdc source peer config is not postgres", slog.String("peer", req.ConnectionConfigs.SourceName))
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, errors.New("source peer config is nil")
}, errors.New("source peer config is not postgres")
}

pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
Expand Down
10 changes: 3 additions & 7 deletions flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (h *FlowRequestHandler) ValidatePeer(
Message: displayErr,
}, nil
}

defer conn.Close()

if req.Peer.Type == protos.DBType_POSTGRES {
Expand All @@ -58,10 +57,8 @@ func (h *FlowRequestHandler) ValidatePeer(
}
}

validationConn, ok := conn.(connectors.ValidationConnector)
if ok {
validErr := validationConn.ValidateCheck(ctx)
if validErr != nil {
if validationConn, ok := conn.(connectors.ValidationConnector); ok {
if validErr := validationConn.ValidateCheck(ctx); validErr != nil {
displayErr := fmt.Sprintf("failed to validate peer %s: %v", req.Peer.Name, validErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
displayErr,
Expand All @@ -73,8 +70,7 @@ func (h *FlowRequestHandler) ValidatePeer(
}
}

connErr := conn.ConnectionActive(ctx)
if connErr != nil {
if connErr := conn.ConnectionActive(ctx); connErr != nil {
displayErr := fmt.Sprintf("failed to establish active connection to %s peer %s: %v", req.Peer.Type, req.Peer.Name, connErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
displayErr,
Expand Down
Loading

0 comments on commit 3e1ddef

Please sign in to comment.