From e01b712c5aae3d134921a55afec77d41cada3a46 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 12 Jul 2023 06:52:58 -0400 Subject: [PATCH 1/3] use short sha for dev builds (#208) - prevents issues and confusion when testing deployments --- .github/workflows/dev-docker.yml | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/workflows/dev-docker.yml b/.github/workflows/dev-docker.yml index 93072f24a2..fa96e1e6f9 100644 --- a/.github/workflows/dev-docker.yml +++ b/.github/workflows/dev-docker.yml @@ -28,6 +28,10 @@ jobs: username: ${{github.actor}} password: ${{secrets.GITHUB_TOKEN}} + - name: Set Short Commit Hash + id: vars + run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" + - name: Build (optionally publish) PeerDB Dev Image uses: depot/build-push-action@v1 with: @@ -36,7 +40,7 @@ jobs: file: stacks/nexus.Dockerfile push: ${{ github.ref == 'refs/heads/main' }} tags: | - ghcr.io/peerdb-io/peerdb-server:dev + ghcr.io/peerdb-io/peerdb-server:dev-${{ steps.vars.outputs.sha_short }} - name: Build (optionally publish) Flow API Dev Image uses: depot/build-push-action@v1 @@ -46,7 +50,7 @@ jobs: file: stacks/flow-api.Dockerfile push: ${{ github.ref == 'refs/heads/main' }} tags: | - ghcr.io/peerdb-io/flow-api:dev + ghcr.io/peerdb-io/flow-api:dev-${{ steps.vars.outputs.sha_short }} - name: Build (optionally publish) Flow Worker Dev Image uses: depot/build-push-action@v1 @@ -56,4 +60,4 @@ jobs: file: stacks/flow-worker.Dockerfile push: ${{ github.ref == 'refs/heads/main' }} tags: | - ghcr.io/peerdb-io/flow-worker:dev + ghcr.io/peerdb-io/flow-worker:dev-${{ steps.vars.outputs.sha_short }} From 81d0921f37146f576df19341e159ee05c4aba880 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Wed, 12 Jul 2023 20:40:03 +0530 Subject: [PATCH 2/3] Fixes long wait for PG peer creation with invalid host (#210) Sets timeout for PG peer connection to 15 seconds. 
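For illustration, the keyword/value connection string built by
get_connection_string now carries the timeout alongside the other
parameters; with made-up host/user/dbname values it looks roughly like:

    host=unreachable.example user=peerdb dbname=postgres connect_timeout=15

so a connection attempt against an invalid or unreachable host fails after
15 seconds instead of hanging until the OS-level TCP timeout.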
Fixes #209 --- nexus/peer-postgres/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nexus/peer-postgres/src/lib.rs b/nexus/peer-postgres/src/lib.rs index 35fb5ad38f..b5aa7843b8 100644 --- a/nexus/peer-postgres/src/lib.rs +++ b/nexus/peer-postgres/src/lib.rs @@ -36,6 +36,8 @@ fn get_connection_string(config: &PostgresConfig) -> String { } connection_string.push_str(" dbname="); connection_string.push_str(&config.database); + connection_string.push_str(" connect_timeout="); + connection_string.push_str("15"); connection_string } From b25ec4382ca7a6847f09a5b63dfff5fda2db96af Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 12 Jul 2023 14:51:30 -0400 Subject: [PATCH 3/3] add the option to batch by num rows (#211) --- flow/cmd/api.go | 46 ------- flow/cmd/qrep_api.go | 89 ++++++++++++ flow/connectors/postgres/qrep.go | 129 ++++++++++++++++++ .../postgres/qrep_partition_test.go | 72 +++++++++- flow/generated/protos/flow.pb.go | 70 ++++++---- nexus/Cargo.lock | 1 + nexus/analyzer/src/lib.rs | 1 + nexus/analyzer/src/qrep.rs | 6 + nexus/pt/Cargo.toml | 1 + nexus/pt/build.rs | 9 +- protos/flow.proto | 5 + stacks/flow-api.Dockerfile | 2 +- stacks/flow-worker.Dockerfile | 1 + stacks/nexus.Dockerfile | 1 + 14 files changed, 349 insertions(+), 84 deletions(-) create mode 100644 flow/cmd/qrep_api.go diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 560d9ecbb8..c70511fdf7 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -127,52 +127,6 @@ func (a *APIServer) StartPeerFlowWithConfig( return workflowID, nil } -func genConfigForQRepFlow(config *protos.QRepConfig, flowOptions map[string]interface{}, - queryString string, destinationTableIdentifier string) error { - config.InitialCopyOnly = false - config.MaxParallelWorkers = uint32(flowOptions["parallelism"].(float64)) - config.DestinationTableIdentifier = destinationTableIdentifier - config.Query = queryString - config.WatermarkColumn = flowOptions["watermark_column"].(string) - config.WatermarkTable = flowOptions["watermark_table_name"].(string) - config.BatchSizeInt = uint32(flowOptions["batch_size_int"].(float64)) - config.BatchDurationSeconds = uint32(flowOptions["batch_duration_timestamp"].(float64)) - config.WaitBetweenBatchesSeconds = uint32(flowOptions["refresh_interval"].(float64)) - if flowOptions["sync_data_format"].(string) == "avro" { - config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO - if _, ok := flowOptions["staging_path"]; ok { - config.StagingPath = flowOptions["staging_path"].(string) - } else { - // if staging_path is not present, set it to empty string - config.StagingPath = "" - } - } else if flowOptions["sync_data_format"].(string) == "default" { - config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT - } else { - return fmt.Errorf("unsupported sync data format: %s", flowOptions["sync_data_format"].(string)) - } - if flowOptions["mode"].(string) == "append" { - tempWriteMode := &protos.QRepWriteMode{ - WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, - } - config.WriteMode = tempWriteMode - } else if flowOptions["mode"].(string) == "upsert" { - upsertKeyColumns := make([]string, 0) - for _, column := range flowOptions["unique_key_columns"].([]interface{}) { - upsertKeyColumns = append(upsertKeyColumns, column.(string)) - } - - tempWriteMode := &protos.QRepWriteMode{ - WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, - UpsertKeyColumns: upsertKeyColumns, - } - config.WriteMode = tempWriteMode - } else { - return fmt.Errorf("unsupported write mode: %s", 
flowOptions["mode"].(string)) - } - return nil -} - func (a *APIServer) StartQRepFlow(reqCtx context.Context, config *protos.QRepConfig) (string, error) { workflowID := fmt.Sprintf("%s-qrepflow-%s", config.FlowJobName, uuid.New()) workflowOptions := client.StartWorkflowOptions{ diff --git a/flow/cmd/qrep_api.go b/flow/cmd/qrep_api.go new file mode 100644 index 0000000000..e179d7b2b8 --- /dev/null +++ b/flow/cmd/qrep_api.go @@ -0,0 +1,89 @@ +package main + +import ( + "errors" + "fmt" + + "github.com/PeerDB-io/peer-flow/generated/protos" +) + +const ( + SyncDataFormatAvro = "avro" + SyncDataFormatDefault = "default" + WriteModeAppend = "append" + WriteModeUpsert = "upsert" +) + +type UnsupportedOptionError struct { + OptionName string + OptionValue string +} + +func (e *UnsupportedOptionError) Error() string { + return fmt.Sprintf("unsupported %s: %s", e.OptionName, e.OptionValue) +} + +func createWriteMode(writeType protos.QRepWriteType, upsertKeyColumns []string) *protos.QRepWriteMode { + return &protos.QRepWriteMode{ + WriteType: writeType, + UpsertKeyColumns: upsertKeyColumns, + } +} + +func genConfigForQRepFlow( + config *protos.QRepConfig, + flowOptions map[string]interface{}, + queryString string, + destinationTableIdentifier string, +) error { + config.InitialCopyOnly = false + config.MaxParallelWorkers = uint32(flowOptions["parallelism"].(float64)) + config.DestinationTableIdentifier = destinationTableIdentifier + config.Query = queryString + config.WatermarkColumn = flowOptions["watermark_column"].(string) + config.WatermarkTable = flowOptions["watermark_table_name"].(string) + + // TODO (kaushik): investigate why this is a float64 in the first place + config.BatchSizeInt = uint32(flowOptions["batch_size_int"].(float64)) + config.BatchDurationSeconds = uint32(flowOptions["batch_duration_timestamp"].(float64)) + config.WaitBetweenBatchesSeconds = uint32(flowOptions["refresh_interval"].(float64)) + config.NumRowsPerPartition = uint32(flowOptions["num_rows_per_partition"].(float64)) + + syncDataFormat, ok := flowOptions["sync_data_format"].(string) + if !ok { + return errors.New("sync_data_format must be a string") + } + + switch syncDataFormat { + case SyncDataFormatAvro: + config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO + if _, ok := flowOptions["staging_path"]; ok { + config.StagingPath = flowOptions["staging_path"].(string) + } else { + config.StagingPath = "" + } + case SyncDataFormatDefault: + config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT + default: + return &UnsupportedOptionError{"sync_data_format", syncDataFormat} + } + + mode, ok := flowOptions["mode"].(string) + if !ok { + return errors.New("mode must be a string") + } + + switch mode { + case WriteModeAppend: + config.WriteMode = createWriteMode(protos.QRepWriteType_QREP_WRITE_MODE_APPEND, nil) + case WriteModeUpsert: + upsertKeyColumns := make([]string, 0) + for _, column := range flowOptions["unique_key_columns"].([]interface{}) { + upsertKeyColumns = append(upsertKeyColumns, column.(string)) + } + config.WriteMode = createWriteMode(protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, upsertKeyColumns) + default: + return &UnsupportedOptionError{"mode", mode} + } + return nil +} diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 85021edbad..b6d11bc1b1 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -9,6 +9,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" 
"github.com/google/uuid" + "github.com/jackc/pgx/v5" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -19,6 +20,10 @@ func (c *PostgresConnector) GetQRepPartitions( config *protos.QRepConfig, last *protos.QRepPartition, ) ([]*protos.QRepPartition, error) { + if config.NumRowsPerPartition > 0 { + return c.getNumRowsPartitions(config, last) + } + minValue, maxValue, err := c.getMinMaxValues(config, last) if err != nil { return nil, err @@ -47,6 +52,130 @@ func (c *PostgresConnector) GetQRepPartitions( return partitions, nil } +func (c *PostgresConnector) getNumRowsPartitions( + config *protos.QRepConfig, + last *protos.QRepPartition, +) ([]*protos.QRepPartition, error) { + numRows := int64(config.NumRowsPerPartition) + quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn) + + whereClause := "" + if last != nil && last.Range != nil { + whereClause = fmt.Sprintf(`WHERE %s > $1`, quotedWatermarkColumn) + } + + // Query to get the total number of rows in the table + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s %s", config.WatermarkTable, whereClause) + var row pgx.Row + var minVal interface{} + if last != nil && last.Range != nil { + switch lastRange := last.Range.Range.(type) { + case *protos.PartitionRange_IntRange: + minVal = lastRange.IntRange.End + case *protos.PartitionRange_TimestampRange: + minVal = lastRange.TimestampRange.End.AsTime() + } + row = c.pool.QueryRow(c.ctx, countQuery, minVal) + } else { + row = c.pool.QueryRow(c.ctx, countQuery) + } + + var totalRows int64 + if err := row.Scan(&totalRows); err != nil { + return nil, fmt.Errorf("failed to query for total rows: %w", err) + } + + // Calculate the number of partitions + numPartitions := totalRows / numRows + if totalRows%numRows != 0 { + numPartitions++ + } + + // Query to get partitions using window functions + var rows pgx.Rows + var err error + if minVal != nil { + partitionsQuery := fmt.Sprintf( + `SELECT bucket, MIN(%[2]s), MAX(%[2]s) + FROM ( + SELECT NTILE(%[1]d) OVER (ORDER BY %[2]s) AS bucket, %[2]s + FROM %[3]s WHERE %[2]s > $1 + ) subquery + GROUP BY bucket`, + numPartitions, + quotedWatermarkColumn, + config.WatermarkTable, + ) + rows, err = c.pool.Query(c.ctx, partitionsQuery, minVal) + } else { + partitionsQuery := fmt.Sprintf( + `SELECT bucket, MIN(%[2]s), MAX(%[2]s) + FROM ( + SELECT NTILE(%[1]d) OVER (ORDER BY %[2]s) AS bucket, %[2]s FROM %[3]s + ) subquery + GROUP BY bucket`, + numPartitions, + quotedWatermarkColumn, + config.WatermarkTable, + ) + rows, err = c.pool.Query(c.ctx, partitionsQuery) + } + if err != nil { + return nil, fmt.Errorf("failed to query for partitions: %w", err) + } + + var partitions []*protos.QRepPartition + for rows.Next() { + var bucket int64 + var start, end interface{} + if err := rows.Scan(&bucket, &start, &end); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + switch v := start.(type) { + case int64: + partitions = append(partitions, createIntPartition(v, end.(int64))) + case int32: + endVal := int64(end.(int32)) + partitions = append(partitions, createIntPartition(int64(v), endVal)) + case time.Time: + partitions = append(partitions, createTimePartition(v, end.(time.Time))) + default: + return nil, fmt.Errorf("unsupported type: %T", v) + } + } + + return partitions, nil +} + +func createIntPartition(start int64, end int64) *protos.QRepPartition { + return &protos.QRepPartition{ + PartitionId: uuid.New().String(), + Range: &protos.PartitionRange{ + Range: &protos.PartitionRange_IntRange{ 
+ IntRange: &protos.IntPartitionRange{ + Start: start, + End: end, + }, + }, + }, + } +} + +func createTimePartition(start time.Time, end time.Time) *protos.QRepPartition { + return &protos.QRepPartition{ + PartitionId: uuid.New().String(), + Range: &protos.PartitionRange{ + Range: &protos.PartitionRange_TimestampRange{ + TimestampRange: &protos.TimestampPartitionRange{ + Start: timestamppb.New(start), + End: timestamppb.New(end), + }, + }, + }, + } +} + func (c *PostgresConnector) getMinMaxValues( config *protos.QRepConfig, last *protos.QRepPartition, diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index 72c956cee1..60cabede57 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -14,11 +14,12 @@ import ( ) type testCase struct { - name string - config *protos.QRepConfig - last *protos.QRepPartition - want []*protos.QRepPartition - wantErr bool + name string + config *protos.QRepConfig + last *protos.QRepPartition + want []*protos.QRepPartition + expectedNumPartitions int + wantErr bool } func newTestCase(schema string, name string, duration uint32, wantErr bool) *testCase { @@ -40,6 +41,25 @@ func newTestCase(schema string, name string, duration uint32, wantErr bool) *tes } } +func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum int) *testCase { + schemaQualifiedTable := fmt.Sprintf("%s.test", schema) + query := fmt.Sprintf( + `SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`, + schemaQualifiedTable) + return &testCase{ + name: name, + config: &protos.QRepConfig{ + FlowJobName: "test_flow_job", + NumRowsPerPartition: rows, + Query: query, + WatermarkTable: schemaQualifiedTable, + WatermarkColumn: "from", + }, + want: []*protos.QRepPartition{}, + expectedNumPartitions: expectedNum, + } +} + func (tc *testCase) appendPartition(start time.Time, end time.Time) *testCase { tsRange := &protos.PartitionRange_TimestampRange{ TimestampRange: &protos.TimestampPartitionRange{ @@ -107,7 +127,7 @@ func TestGetQRepPartitions(t *testing.T) { } // from 2010 Jan 1 10:00 AM UTC to 2010 Jan 30 10:00 AM UTC - prepareTestData(t, pool, schemaName) + numRows := prepareTestData(t, pool, schemaName) secondsInADay := uint32(24 * time.Hour / time.Second) fmt.Printf("secondsInADay: %d\n", secondsInADay) @@ -155,6 +175,31 @@ func TestGetQRepPartitions(t *testing.T) { 0, true, ), + newTestCaseForNumRows( + schemaName, + "ensure all rows are in 1 partition if num_rows_per_partition is size of table", + uint32(numRows), + 1, + ), + newTestCaseForNumRows( + schemaName, + "ensure all rows are in 2 partitions if num_rows_per_partition is half the size of table", + uint32(numRows)/2, + 2, + ), + newTestCaseForNumRows( + schemaName, + "ensure all rows are in 3 partitions if num_rows_per_partition is 1/3 the size of table", + uint32(numRows)/3, + 3, + ), + // this is 5 partitions 30 rows and 7 rows per partition, would be 7, 7, 7, 7, 2 + newTestCaseForNumRows( + schemaName, + "ensure all rows are in 5 partitions if num_rows_per_partition is 1/4 the size of table", + uint32(numRows)/4, + 5, + ), } // Run the test cases @@ -176,6 +221,16 @@ func TestGetQRepPartitions(t *testing.T) { return } + // If the expected number of partitions is set, just check that + // the number of partitions is equal to the expected number of + // partitions, we don't care about the actual partition ranges + // for now, but ideally we should check that the partition ranges + // 
are correct as well. + if tc.expectedNumPartitions != 0 { + assert.Equal(t, tc.expectedNumPartitions, len(got)) + return + } + expected := tc.want assert.Equal(t, len(expected), len(got)) @@ -195,7 +250,8 @@ func TestGetQRepPartitions(t *testing.T) { } } -func prepareTestData(test *testing.T, pool *pgxpool.Pool, schema string) { +// returns the number of rows inserted +func prepareTestData(test *testing.T, pool *pgxpool.Pool, schema string) int { // Define the start and end times startTime := time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC) endTime := time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC) @@ -215,4 +271,6 @@ func prepareTestData(test *testing.T, pool *pgxpool.Pool, schema string) { test.Fatalf("Failed to insert test data: %v", err) } } + + return len(times) } diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index 0c1c05a2cb..55bc13bbb1 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -1507,6 +1507,10 @@ type QRepConfig struct { // if nothing is specified then it will be written to local disk // if using GCS or S3 make sure your instance has the correct permissions. StagingPath string `protobuf:"bytes,15,opt,name=staging_path,json=stagingPath,proto3" json:"staging_path,omitempty"` + // This setting overrides batch_size_int and batch_duration_seconds + // and instead uses the number of rows per partition to determine + // how many rows to process per batch. + NumRowsPerPartition uint32 `protobuf:"varint,16,opt,name=num_rows_per_partition,json=numRowsPerPartition,proto3" json:"num_rows_per_partition,omitempty"` } func (x *QRepConfig) Reset() { @@ -1646,6 +1650,13 @@ func (x *QRepConfig) GetStagingPath() string { return "" } +func (x *QRepConfig) GetNumRowsPerPartition() uint32 { + if x != nil { + return x.NumRowsPerPartition + } + return 0 +} + type QRepPartition struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2058,7 +2069,7 @@ var file_flow_proto_rawDesc = []byte{ 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x2c, 0x0a, 0x12, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x4b, 0x65, 0x79, 0x43, - 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x22, 0xe1, 0x05, 0x0a, 0x0a, 0x51, 0x52, 0x65, 0x70, 0x43, + 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x22, 0x96, 0x06, 0x0a, 0x0a, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x33, 0x0a, 0x0b, 0x73, 0x6f, 0x75, @@ -2104,33 +2115,36 @@ var file_flow_proto_rawDesc = []byte{ 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4d, 0x6f, 0x64, 0x65, 0x52, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, - 0x74, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x50, 0x61, 0x74, 0x68, 0x22, 0x65, 0x0a, 0x0d, 0x51, 0x52, - 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x70, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x31, - 0x0a, 0x05, 0x72, 
0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x05, 0x72, 0x61, 0x6e, 0x67, - 0x65, 0x22, 0x50, 0x0a, 0x12, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x3a, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x22, 0x2c, 0x0a, 0x0d, 0x44, 0x72, 0x6f, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x49, - 0x6e, 0x70, 0x75, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6e, 0x61, 0x6d, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x6c, 0x6f, 0x77, 0x4e, 0x61, 0x6d, - 0x65, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x4d, 0x6f, 0x64, - 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, - 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x5f, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, - 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, 0x59, 0x4e, 0x43, 0x5f, - 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x41, 0x47, 0x45, 0x5f, 0x41, 0x56, 0x52, - 0x4f, 0x10, 0x01, 0x2a, 0x47, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x57, 0x72, 0x69, 0x74, 0x65, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, - 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x00, - 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, - 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, 0x01, 0x42, 0x12, 0x5a, 0x10, - 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x50, 0x61, 0x74, 0x68, 0x12, 0x33, 0x0a, 0x16, 0x6e, 0x75, + 0x6d, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x10, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x13, 0x6e, 0x75, 0x6d, 0x52, + 0x6f, 0x77, 0x73, 0x50, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, + 0x65, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x49, 0x64, 0x12, 0x31, 0x0a, 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x2e, 0x66, 0x6c, 0x6f, 0x77, + 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, + 0x05, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x22, 0x50, 0x0a, 0x12, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, + 0x72, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x3a, 0x0a, 0x0a, + 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 
0x51, + 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x2c, 0x0a, 0x0d, 0x44, 0x72, 0x6f, 0x70, + 0x46, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x6c, 0x6f, + 0x77, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x6c, + 0x6f, 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0x50, 0x0a, 0x0c, 0x51, 0x52, 0x65, 0x70, 0x53, 0x79, + 0x6e, 0x63, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x53, + 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x5f, 0x49, + 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x00, 0x12, 0x1f, 0x0a, 0x1b, 0x51, 0x52, 0x45, 0x50, 0x5f, + 0x53, 0x59, 0x4e, 0x43, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x52, 0x41, 0x47, + 0x45, 0x5f, 0x41, 0x56, 0x52, 0x4f, 0x10, 0x01, 0x2a, 0x47, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, + 0x50, 0x5f, 0x57, 0x52, 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x50, + 0x45, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x51, 0x52, 0x45, 0x50, 0x5f, 0x57, 0x52, + 0x49, 0x54, 0x45, 0x5f, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x50, 0x53, 0x45, 0x52, 0x54, 0x10, + 0x01, 0x42, 0x12, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index 08eb0d872d..e2a7310f2e 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -2386,6 +2386,7 @@ dependencies = [ "prost", "prost-build", "prost-types", + "serde", "sqlparser", ] diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 4490e9943a..c85f2be0a6 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -164,6 +164,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer { } let processed_options = process_options(raw_options)?; + let qrep_flow_job = QRepFlowJob { name: select.mirror_name.to_string().to_lowercase(), source_peer: select.source_peer.to_string().to_lowercase(), diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index 5f9aebdf5e..2a61774f2d 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -87,6 +87,12 @@ lazy_static::lazy_static! { default_value: 60, required: false, }, + QRepOptionType::Int { + name: "num_rows_per_partition", + min_value: Some(0), + default_value: 0, + required: false, + }, ] }; } diff --git a/nexus/pt/Cargo.toml b/nexus/pt/Cargo.toml index d0d2878b55..0ccde379fc 100644 --- a/nexus/pt/Cargo.toml +++ b/nexus/pt/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" bytes = "1.1" prost = "0.11" prost-types = "0.11" +serde = { version = "1.0", features = ["derive"] } sqlparser = { path = "../sqlparser-rs" } [build-dependencies] diff --git a/nexus/pt/build.rs b/nexus/pt/build.rs index b5bf355de3..e70fbaa610 100644 --- a/nexus/pt/build.rs +++ b/nexus/pt/build.rs @@ -27,8 +27,13 @@ fn main() -> Result<()> { println!("cargo:warning={}", proto.display()); } - // compile all protos in /protos by iterating over the directory - prost_build::compile_protos(&proto_files, &[root.join("protos")])?; + // see: https://github.com/tokio-rs/prost/issues/75 for future. 
+ let mut config = prost_build::Config::new(); + config.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]"); + config.type_attribute(".", "#[serde(rename_all = \"camelCase\")]"); + + // generate rust code for all protos in /protos + config.compile_protos(&proto_files, &[root.join("protos")])?; Ok(()) } diff --git a/protos/flow.proto b/protos/flow.proto index ee0a5cfede..cbd554dab8 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -179,6 +179,11 @@ message QRepConfig { // if nothing is specified then it will be written to local disk // if using GCS or S3 make sure your instance has the correct permissions. string staging_path = 15; + + // This setting overrides batch_size_int and batch_duration_seconds + // and instead uses the number of rows per partition to determine + // how many rows to process per batch. + uint32 num_rows_per_partition = 16; } message QRepPartition { diff --git a/stacks/flow-api.Dockerfile b/stacks/flow-api.Dockerfile index eb061a93f2..1374bd9111 100644 --- a/stacks/flow-api.Dockerfile +++ b/stacks/flow-api.Dockerfile @@ -21,7 +21,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \ FROM ubuntu:20.04 RUN apt-get update && apt-get install -y ca-certificates curl -WORKDIR /root/ +WORKDIR /root COPY --from=builder /root/peer-flow . EXPOSE 8112 ENTRYPOINT ["./peer-flow", "api", "--port", "8112"] diff --git a/stacks/flow-worker.Dockerfile b/stacks/flow-worker.Dockerfile index 8b2f16e7bf..b5226ff9ed 100644 --- a/stacks/flow-worker.Dockerfile +++ b/stacks/flow-worker.Dockerfile @@ -22,6 +22,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \ FROM ubuntu:20.04 RUN apt-get update && apt-get install -y ca-certificates +WORKDIR /root COPY --from=builder /root/peer-flow . EXPOSE 8112 ENTRYPOINT ["./peer-flow", "worker"] diff --git a/stacks/nexus.Dockerfile b/stacks/nexus.Dockerfile index c7474782f0..b0fc64b59f 100644 --- a/stacks/nexus.Dockerfile +++ b/stacks/nexus.Dockerfile @@ -26,5 +26,6 @@ RUN CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse cargo build --release --bin peerd FROM ubuntu:20.04 RUN apt-get update && apt-get install -y ca-certificates RUN mkdir -p /var/log/peerdb +WORKDIR /root COPY --from=builder /root/nexus/target/release/peerdb-server . CMD ["./peerdb-server"]
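
A note on the num-rows partitioning added in PATCH 3/3: getNumRowsPartitions
first counts the qualifying rows, then derives the partition count by
ceiling division (numPartitions = totalRows / numRows, plus one if there is
a remainder). For the test data this gives 30 / 7 = 4 remainder 2, hence the
5 partitions the last test case expects. The per-partition bounds are then
computed inside Postgres; for a hypothetical watermark column "id" on a
table public.test with numPartitions = 3, the generated query expands to:

    SELECT bucket, MIN("id"), MAX("id")
    FROM (
        SELECT NTILE(3) OVER (ORDER BY "id") AS bucket, "id"
        FROM public.test
    ) subquery
    GROUP BY bucket

Over, say, 10 rows, NTILE(3) assigns bucket sizes 4, 3, and 3, and each
bucket's MIN/MAX become the inclusive start/end of one QRepPartition range.
Because NTILE balances bucket sizes, no bucket ever exceeds
num_rows_per_partition: the 30-row, 7-per-partition case above actually
yields five buckets of 6 rows each rather than the sequential 7, 7, 7, 7, 2
split sketched in the test's comment.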