Skip to content

Commit

Permalink
add the option to batch by num rows (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Jul 12, 2023
1 parent 81d0921 commit b25ec43
Show file tree
Hide file tree
Showing 14 changed files with 349 additions and 84 deletions.
46 changes: 0 additions & 46 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,52 +127,6 @@ func (a *APIServer) StartPeerFlowWithConfig(
return workflowID, nil
}

func genConfigForQRepFlow(config *protos.QRepConfig, flowOptions map[string]interface{},
queryString string, destinationTableIdentifier string) error {
config.InitialCopyOnly = false
config.MaxParallelWorkers = uint32(flowOptions["parallelism"].(float64))
config.DestinationTableIdentifier = destinationTableIdentifier
config.Query = queryString
config.WatermarkColumn = flowOptions["watermark_column"].(string)
config.WatermarkTable = flowOptions["watermark_table_name"].(string)
config.BatchSizeInt = uint32(flowOptions["batch_size_int"].(float64))
config.BatchDurationSeconds = uint32(flowOptions["batch_duration_timestamp"].(float64))
config.WaitBetweenBatchesSeconds = uint32(flowOptions["refresh_interval"].(float64))
if flowOptions["sync_data_format"].(string) == "avro" {
config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO
if _, ok := flowOptions["staging_path"]; ok {
config.StagingPath = flowOptions["staging_path"].(string)
} else {
// if staging_path is not present, set it to empty string
config.StagingPath = ""
}
} else if flowOptions["sync_data_format"].(string) == "default" {
config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT
} else {
return fmt.Errorf("unsupported sync data format: %s", flowOptions["sync_data_format"].(string))
}
if flowOptions["mode"].(string) == "append" {
tempWriteMode := &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
}
config.WriteMode = tempWriteMode
} else if flowOptions["mode"].(string) == "upsert" {
upsertKeyColumns := make([]string, 0)
for _, column := range flowOptions["unique_key_columns"].([]interface{}) {
upsertKeyColumns = append(upsertKeyColumns, column.(string))
}

tempWriteMode := &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
UpsertKeyColumns: upsertKeyColumns,
}
config.WriteMode = tempWriteMode
} else {
return fmt.Errorf("unsupported write mode: %s", flowOptions["mode"].(string))
}
return nil
}

func (a *APIServer) StartQRepFlow(reqCtx context.Context, config *protos.QRepConfig) (string, error) {
workflowID := fmt.Sprintf("%s-qrepflow-%s", config.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand Down
89 changes: 89 additions & 0 deletions flow/cmd/qrep_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"errors"
"fmt"

"github.com/PeerDB-io/peer-flow/generated/protos"
)

const (
SyncDataFormatAvro = "avro"
SyncDataFormatDefault = "default"
WriteModeAppend = "append"
WriteModeUpsert = "upsert"
)

type UnsupportedOptionError struct {
OptionName string
OptionValue string
}

func (e *UnsupportedOptionError) Error() string {
return fmt.Sprintf("unsupported %s: %s", e.OptionName, e.OptionValue)
}

func createWriteMode(writeType protos.QRepWriteType, upsertKeyColumns []string) *protos.QRepWriteMode {
return &protos.QRepWriteMode{
WriteType: writeType,
UpsertKeyColumns: upsertKeyColumns,
}
}

func genConfigForQRepFlow(
config *protos.QRepConfig,
flowOptions map[string]interface{},
queryString string,
destinationTableIdentifier string,
) error {
config.InitialCopyOnly = false
config.MaxParallelWorkers = uint32(flowOptions["parallelism"].(float64))
config.DestinationTableIdentifier = destinationTableIdentifier
config.Query = queryString
config.WatermarkColumn = flowOptions["watermark_column"].(string)
config.WatermarkTable = flowOptions["watermark_table_name"].(string)

// TODO (kaushik): investigate why this is a float64 in the first place
config.BatchSizeInt = uint32(flowOptions["batch_size_int"].(float64))
config.BatchDurationSeconds = uint32(flowOptions["batch_duration_timestamp"].(float64))
config.WaitBetweenBatchesSeconds = uint32(flowOptions["refresh_interval"].(float64))
config.NumRowsPerPartition = uint32(flowOptions["num_rows_per_partition"].(float64))

syncDataFormat, ok := flowOptions["sync_data_format"].(string)
if !ok {
return errors.New("sync_data_format must be a string")
}

switch syncDataFormat {
case SyncDataFormatAvro:
config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO
if _, ok := flowOptions["staging_path"]; ok {
config.StagingPath = flowOptions["staging_path"].(string)
} else {
config.StagingPath = ""
}
case SyncDataFormatDefault:
config.SyncMode = protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT
default:
return &UnsupportedOptionError{"sync_data_format", syncDataFormat}
}

mode, ok := flowOptions["mode"].(string)
if !ok {
return errors.New("mode must be a string")
}

switch mode {
case WriteModeAppend:
config.WriteMode = createWriteMode(protos.QRepWriteType_QREP_WRITE_MODE_APPEND, nil)
case WriteModeUpsert:
upsertKeyColumns := make([]string, 0)
for _, column := range flowOptions["unique_key_columns"].([]interface{}) {
upsertKeyColumns = append(upsertKeyColumns, column.(string))
}
config.WriteMode = createWriteMode(protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, upsertKeyColumns)
default:
return &UnsupportedOptionError{"mode", mode}
}
return nil
}
129 changes: 129 additions & 0 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand All @@ -19,6 +20,10 @@ func (c *PostgresConnector) GetQRepPartitions(
config *protos.QRepConfig,
last *protos.QRepPartition,
) ([]*protos.QRepPartition, error) {
if config.NumRowsPerPartition > 0 {
return c.getNumRowsPartitions(config, last)
}

minValue, maxValue, err := c.getMinMaxValues(config, last)
if err != nil {
return nil, err
Expand Down Expand Up @@ -47,6 +52,130 @@ func (c *PostgresConnector) GetQRepPartitions(
return partitions, nil
}

func (c *PostgresConnector) getNumRowsPartitions(
config *protos.QRepConfig,
last *protos.QRepPartition,
) ([]*protos.QRepPartition, error) {
numRows := int64(config.NumRowsPerPartition)
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)

whereClause := ""
if last != nil && last.Range != nil {
whereClause = fmt.Sprintf(`WHERE %s > $1`, quotedWatermarkColumn)
}

// Query to get the total number of rows in the table
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s %s", config.WatermarkTable, whereClause)
var row pgx.Row
var minVal interface{}
if last != nil && last.Range != nil {
switch lastRange := last.Range.Range.(type) {
case *protos.PartitionRange_IntRange:
minVal = lastRange.IntRange.End
case *protos.PartitionRange_TimestampRange:
minVal = lastRange.TimestampRange.End.AsTime()
}
row = c.pool.QueryRow(c.ctx, countQuery, minVal)
} else {
row = c.pool.QueryRow(c.ctx, countQuery)
}

var totalRows int64
if err := row.Scan(&totalRows); err != nil {
return nil, fmt.Errorf("failed to query for total rows: %w", err)
}

// Calculate the number of partitions
numPartitions := totalRows / numRows
if totalRows%numRows != 0 {
numPartitions++
}

// Query to get partitions using window functions
var rows pgx.Rows
var err error
if minVal != nil {
partitionsQuery := fmt.Sprintf(
`SELECT bucket, MIN(%[2]s), MAX(%[2]s)
FROM (
SELECT NTILE(%[1]d) OVER (ORDER BY %[2]s) AS bucket, %[2]s
FROM %[3]s WHERE %[2]s > $1
) subquery
GROUP BY bucket`,
numPartitions,
quotedWatermarkColumn,
config.WatermarkTable,
)
rows, err = c.pool.Query(c.ctx, partitionsQuery, minVal)
} else {
partitionsQuery := fmt.Sprintf(
`SELECT bucket, MIN(%[2]s), MAX(%[2]s)
FROM (
SELECT NTILE(%[1]d) OVER (ORDER BY %[2]s) AS bucket, %[2]s FROM %[3]s
) subquery
GROUP BY bucket`,
numPartitions,
quotedWatermarkColumn,
config.WatermarkTable,
)
rows, err = c.pool.Query(c.ctx, partitionsQuery)
}
if err != nil {
return nil, fmt.Errorf("failed to query for partitions: %w", err)
}

var partitions []*protos.QRepPartition
for rows.Next() {
var bucket int64
var start, end interface{}
if err := rows.Scan(&bucket, &start, &end); err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}

switch v := start.(type) {
case int64:
partitions = append(partitions, createIntPartition(v, end.(int64)))
case int32:
endVal := int64(end.(int32))
partitions = append(partitions, createIntPartition(int64(v), endVal))
case time.Time:
partitions = append(partitions, createTimePartition(v, end.(time.Time)))
default:
return nil, fmt.Errorf("unsupported type: %T", v)
}
}

return partitions, nil
}

func createIntPartition(start int64, end int64) *protos.QRepPartition {
return &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{
Start: start,
End: end,
},
},
},
}
}

func createTimePartition(start time.Time, end time.Time) *protos.QRepPartition {
return &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_TimestampRange{
TimestampRange: &protos.TimestampPartitionRange{
Start: timestamppb.New(start),
End: timestamppb.New(end),
},
},
},
}
}

func (c *PostgresConnector) getMinMaxValues(
config *protos.QRepConfig,
last *protos.QRepPartition,
Expand Down
Loading

0 comments on commit b25ec43

Please sign in to comment.