Skip to content

Commit

Permalink
e2e test CustomSyncs (#2333)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 9, 2024
1 parent e868b3b commit b71aa99
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 33 deletions.
89 changes: 56 additions & 33 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,17 +412,13 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
// insert 1 row into the source table
testKey := fmt.Sprintf("test_key_%d", 1)
testValue := fmt.Sprintf("test_value_%d", 1)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(
"INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName))
e2e.EnvNoError(s.t, env, err)

// delete that row
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1
`, srcTableName))
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(
"DELETE FROM %s WHERE id=1", srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted and deleted a row for peerdb column check")

Expand Down Expand Up @@ -904,26 +900,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
s.t.Logf("Inserted %d rows into the source table", numRows)
}

getWorkflowState := func() peerflow.CDCFlowWorkflowState {
var state peerflow.CDCFlowWorkflowState
val, err := env.Query(shared.CDCFlowStateQuery)
e2e.EnvNoError(s.t, env, err)
err = val.Get(&state)
e2e.EnvNoError(s.t, env, err)

return state
}

getFlowStatus := func() protos.FlowStatus {
var flowStatus protos.FlowStatus
val, err := env.Query(shared.FlowStatusQuery)
e2e.EnvNoError(s.t, env, err)
err = val.Get(&flowStatus)
e2e.EnvNoError(s.t, env, err)

return flowStatus
}

// add before to test initial load too.
addRows(18)
e2e.SetupCDCFlowStatusQuery(s.t, env, config)
Expand All @@ -934,7 +910,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
})

workflowState := getWorkflowState()
workflowState := e2e.EnvGetWorkflowState(s.t, env)
assert.EqualValues(s.t, 7, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 6, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 1)
Expand All @@ -943,8 +919,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
if !s.t.Failed() {
e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
flowStatus := getFlowStatus()
return flowStatus == protos.FlowStatus_STATUS_PAUSED
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})

_, err = s.Conn().Exec(context.Background(),
Expand All @@ -968,7 +943,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
})

e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return getFlowStatus() == protos.FlowStatus_STATUS_RUNNING
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING
})
e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize 18 records - first table", func() bool {
return s.comparePGTables(srcTable1Name, dstTable1Name, "id,t") == nil
Expand All @@ -977,7 +952,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
return s.comparePGTables(srcTable2Name, dstTable2Name, "id,t") == nil
})

workflowState = getWorkflowState()
workflowState = e2e.EnvGetWorkflowState(s.t, env)
assert.EqualValues(s.t, 14, workflowState.SyncFlowOptions.IdleTimeoutSeconds)
assert.EqualValues(s.t, 12, workflowState.SyncFlowOptions.BatchSize)
assert.Len(s.t, workflowState.SyncFlowOptions.TableMappings, 2)
Expand All @@ -988,6 +963,54 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_CustomSync() {
srcTableName := s.attachSchemaSuffix("test_customsync")
dstTableName := s.attachSchemaSuffix("test_customsync_dst")

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_customsync_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
value TEXT NOT NULL
);
`, srcTableName))

require.NoError(s.t, err)
tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

e2e.SignalWorkflow(env, model.FlowSignal, model.PauseSignal)
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})

e2e.SignalWorkflow(env, model.CDCDynamicPropertiesSignal, &protos.CDCFlowConfigUpdate{
NumberOfSyncs: 1,
})
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "resumed workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_RUNNING
})

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(
"INSERT INTO %s(key, value) VALUES ('test_key', 'test_value')", srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, 1*time.Minute, "paused workflow", func() bool {
return e2e.EnvGetFlowStatus(s.t, env) == protos.FlowStatus_STATUS_PAUSED
})

require.NoError(s.t, s.comparePGTables(srcTableName, dstTableName, "id,key,value"))
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_TypeSystem_PG() {
srcTableName := s.attachSchemaSuffix("test_typesystem_pg")
dstTableName := s.attachSchemaSuffix("test_typesystem_pg_dst")
Expand Down
18 changes: 18 additions & 0 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,3 +717,21 @@ func EnvWaitForFinished(t *testing.T, env WorkflowRun, timeout time.Duration) {
return false
})
}

func EnvGetWorkflowState(t *testing.T, env WorkflowRun) peerflow.CDCFlowWorkflowState {
t.Helper()
var state peerflow.CDCFlowWorkflowState
val, err := env.Query(shared.CDCFlowStateQuery)
EnvNoError(t, env, err)
EnvNoError(t, env, val.Get(&state))
return state
}

func EnvGetFlowStatus(t *testing.T, env WorkflowRun) protos.FlowStatus {
t.Helper()
var flowStatus protos.FlowStatus
val, err := env.Query(shared.FlowStatusQuery)
EnvNoError(t, env, err)
EnvNoError(t, env, val.Get(&flowStatus))
return flowStatus
}

0 comments on commit b71aa99

Please sign in to comment.