Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jun 24, 2024
2 parents 751e6a0 + 3e1ddef commit b7b4ccf
Show file tree
Hide file tree
Showing 93 changed files with 1,271 additions and 1,438 deletions.
2 changes: 1 addition & 1 deletion flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ linters:
- misspell
- musttag
- nakedret
- nolintlint
# TODO bring back in 0.15 - nolintlint
- nonamedreturns
- perfsprint
- prealloc
Expand Down
223 changes: 70 additions & 153 deletions flow/activities/flowable.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
})
defer shutdown()

dstConn, err := connectors.GetAs[TSync](ctx, config.Destination)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
hasRecords := !recordBatchSync.WaitAndCheckEmpty()
logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
dstConn, err = connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
var res *model.SyncResponse
errGroup.Go(func() error {
syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil && config.Destination.Type != protos.DBType_EVENTHUBS {
if err != nil {
return err
}
syncBatchID += 1
Expand Down Expand Up @@ -332,14 +332,14 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

srcConn, err := connectors.GetAs[TPull](ctx, config.SourcePeer)
srcConn, err := connectors.GetByNameAs[TPull](ctx, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetAs[TSync](ctx, config.DestinationPeer)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep destination connector: %w", err)
Expand Down Expand Up @@ -431,13 +431,13 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
logger := activity.GetLogger(ctx)

startTime := time.Now()
srcConn, err := connectors.GetAs[*connpostgres.PostgresConnector](ctx, config.SourcePeer)
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, a.CatalogPool, config.SourceName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetAs[TSync](ctx, config.DestinationPeer)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
Expand Down
25 changes: 11 additions & 14 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package activities

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/alerting"
Expand All @@ -29,6 +31,7 @@ type TxSnapshotState struct {

type SnapshotActivity struct {
Alerter *alerting.Alerter
CatalogPool *pgxpool.Pool
SlotSnapshotStates map[string]SlotSnapshotState
TxSnapshotStates map[string]TxSnapshotState
SnapshotStatesMutex sync.Mutex
Expand Down Expand Up @@ -56,16 +59,14 @@ func (a *SnapshotActivity) SetupReplication(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

dbType := config.PeerConnectionConfig.Type
if dbType != protos.DBType_POSTGRES {
logger.Info(fmt.Sprintf("setup replication is no-op for %s", dbType))
return nil, nil
}

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, a.CatalogPool, config.PeerName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
logger.Info("setup replication is no-op for non-postgres source")
return nil, nil
}
return nil, fmt.Errorf("failed to get connector: %w", err)
}

Expand All @@ -80,14 +81,10 @@ func (a *SnapshotActivity) SetupReplication(
connectors.CloseConnector(ctx, conn)
}

// This now happens in a goroutine
go func() {
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(ctx, slotSignal, config)
if err != nil {
if err := conn.SetupReplication(ctx, slotSignal, config); err != nil {
closeConnectionForError(err)
replicationErr <- err
return
}
}()

Expand Down Expand Up @@ -122,8 +119,8 @@ func (a *SnapshotActivity) SetupReplication(
}, nil
}

func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer *protos.Peer) error {
conn, err := connectors.GetCDCPullConnector(ctx, peer)
func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string) error {
conn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, a.CatalogPool, peer)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -60,16 +61,16 @@ func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string,
) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.SourceName)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
req.ConnectionConfigs.Source.Name, srcErr)
req.ConnectionConfigs.SourceName, srcErr)
}

destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.Destination.Name)
destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.DestinationName)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
req.ConnectionConfigs.Destination.Name, srcErr)
req.ConnectionConfigs.DestinationName, srcErr)
}

for _, v := range req.ConnectionConfigs.TableMappings {
Expand All @@ -92,14 +93,14 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context,
req *protos.CreateQRepFlowRequest, workflowID string,
) error {
sourcePeerName := req.QrepConfig.SourcePeer.Name
sourcePeerName := req.QrepConfig.SourceName
sourcePeerID, _, srcErr := h.getPeerID(ctx, sourcePeerName)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
sourcePeerName, srcErr)
}

destinationPeerName := req.QrepConfig.DestinationPeer.Name
destinationPeerName := req.QrepConfig.DestinationName
destinationPeerID, _, dstErr := h.getPeerID(ctx, destinationPeerName)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
Expand Down Expand Up @@ -167,10 +168,7 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
ctx context.Context,
cfg *protos.FlowConnectionConfigs,
) error {
var cfgBytes []byte
var err error

cfgBytes, err = proto.Marshal(cfg)
cfgBytes, err := proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}
Expand Down Expand Up @@ -208,15 +206,18 @@ func (h *FlowRequestHandler) CreateQRepFlow(
},
}
if req.CreateCatalogEntry {
err := h.createQRepJobEntry(ctx, req, workflowID)
if err != nil {
if err := h.createQRepJobEntry(ctx, req, workflowID); err != nil {
slog.Error("unable to create flow job entry",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}
dbtype, err := connectors.LoadPeerType(ctx, h.pool, cfg.SourceName)
if err != nil {
return nil, err
}
var workflowFn interface{}
if cfg.SourcePeer.Type == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" {
if dbtype == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" {
workflowFn = peerflow.XminFlowWorkflow
} else {
workflowFn = peerflow.QRepFlowWorkflow
Expand All @@ -226,8 +227,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
}

_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil)
if err != nil {
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil); err != nil {
slog.Error("unable to start QRepFlow workflow",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
Expand Down
43 changes: 30 additions & 13 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//nolint:staticcheck // TODO remove in 0.15
package cmd

import (
Expand All @@ -10,6 +11,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -120,27 +122,45 @@ func (h *FlowRequestHandler) CDCFlowStatus(
return nil, err
}

// TODO remove in 0.15
// patching config to use new fields on ui
if config.Source != nil {
config.SourceName = config.Source.Name
config.Source = nil
}
if config.Destination != nil {
config.DestinationName = config.Destination.Name
config.Destination = nil
}

// patching config to show latest values from state
if state.SyncFlowOptions != nil {
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
config.TableMappings = state.SyncFlowOptions.TableMappings
}

var initialCopyStatus *protos.SnapshotStatus

cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName)
if err != nil {
return nil, err
}
dstType, err := connectors.LoadPeerType(ctx, h.pool, config.DestinationName)
if err != nil {
return nil, err
}

initialCopyStatus = &protos.SnapshotStatus{
Clones: cloneStatuses,
cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

return &protos.CDCMirrorStatus{
Config: config,
SnapshotStatus: initialCopyStatus,
Config: config,
SourceType: srcType,
DestinationType: dstType,
SnapshotStatus: &protos.SnapshotStatus{
Clones: cloneStatuses,
},
}, nil
}

Expand Down Expand Up @@ -316,16 +336,14 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
flowJobName string,
) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
var err error
var config protos.FlowConnectionConfigs

err = h.pool.QueryRow(ctx,
err := h.pool.QueryRow(ctx,
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
slog.Error("unable to query flow config from catalog", slog.Any("error", err))
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
}

var config protos.FlowConnectionConfigs
err = proto.Unmarshal(configBytes, &config)
if err != nil {
slog.Error("unable to unmarshal flow config", slog.Any("error", err))
Expand Down Expand Up @@ -386,8 +404,7 @@ func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state peerflow.CDCFlowWorkflowState
err = res.Get(&state)
if err != nil {
if err := res.Get(&state); err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
Expand Down
1 change: 1 addition & 0 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
SlotSnapshotStates: make(map[string]activities.SlotSnapshotState),
TxSnapshotStates: make(map[string]activities.TxSnapshotState),
Alerter: alerting.NewAlerter(context.Background(), conn),
CatalogPool: conn,
})

return c, w, nil
Expand Down
14 changes: 11 additions & 3 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jackc/pgx/v5/pgtype"

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -43,12 +44,19 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
Ok: false,
}, errors.New("connection configs is nil")
}
sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig()
sourcePeer, err := connectors.LoadPeer(ctx, h.pool, req.ConnectionConfigs.SourceName)
if err != nil {
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, err
}

sourcePeerConfig := sourcePeer.GetPostgresConfig()
if sourcePeerConfig == nil {
slog.Error("/validatecdc source peer config is nil", slog.Any("peer", req.ConnectionConfigs.Source))
slog.Error("/validatecdc source peer config is not postgres", slog.String("peer", req.ConnectionConfigs.SourceName))
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, errors.New("source peer config is nil")
}, errors.New("source peer config is not postgres")
}

pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
Expand Down
10 changes: 3 additions & 7 deletions flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (h *FlowRequestHandler) ValidatePeer(
Message: displayErr,
}, nil
}

defer conn.Close()

if req.Peer.Type == protos.DBType_POSTGRES {
Expand All @@ -58,10 +57,8 @@ func (h *FlowRequestHandler) ValidatePeer(
}
}

validationConn, ok := conn.(connectors.ValidationConnector)
if ok {
validErr := validationConn.ValidateCheck(ctx)
if validErr != nil {
if validationConn, ok := conn.(connectors.ValidationConnector); ok {
if validErr := validationConn.ValidateCheck(ctx); validErr != nil {
displayErr := fmt.Sprintf("failed to validate peer %s: %v", req.Peer.Name, validErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
displayErr,
Expand All @@ -73,8 +70,7 @@ func (h *FlowRequestHandler) ValidatePeer(
}
}

connErr := conn.ConnectionActive(ctx)
if connErr != nil {
if connErr := conn.ConnectionActive(ctx); connErr != nil {
displayErr := fmt.Sprintf("failed to establish active connection to %s peer %s: %v", req.Peer.Type, req.Peer.Name, connErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
displayErr,
Expand Down
Loading

0 comments on commit b7b4ccf

Please sign in to comment.