Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jul 29, 2024
2 parents 15ea806 + a601c7f commit f53a82c
Show file tree
Hide file tree
Showing 21 changed files with 359 additions and 143 deletions.
4 changes: 4 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ services:
- 8113:8113
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
PEERDB_ALLOWED_TARGETS:
PEERDB_CLICKHOUSE_ALLOWED_DOMAINS:
extra_hosts:
- "host.docker.internal:host-gateway"
depends_on:
Expand Down Expand Up @@ -197,6 +199,8 @@ services:
PEERDB_PASSWORD:
NEXTAUTH_SECRET: __changeme__
NEXTAUTH_URL: http://localhost:3000
PEERDB_ALLOWED_TARGETS:
PEERDB_CLICKHOUSE_ALLOWED_DOMAINS:
depends_on:
- flow-api

Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ services:
- 8113:8113
environment:
<<: [*catalog-config, *flow-worker-env, *minio-config]
PEERDB_ALLOWED_TARGETS:
extra_hosts:
- "host.docker.internal:host-gateway"
depends_on:
Expand Down Expand Up @@ -176,6 +177,7 @@ services:
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113
NEXTAUTH_SECRET: __changeme__
NEXTAUTH_URL: http://localhost:3000
PEERDB_ALLOWED_TARGETS:
depends_on:
- flow-api

Expand Down
36 changes: 34 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"log/slog"
"strings"
"time"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -99,8 +100,14 @@ func (h *FlowRequestHandler) ListPeers(
ctx context.Context,
req *protos.ListPeersRequest,
) (*protos.ListPeersResponse, error) {
rows, err := h.pool.Query(ctx, "select name, type from peers")
query := "SELECT name, type FROM peers"
if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
// only postgres and clickhouse peers
query += " WHERE type IN (3, 8)"
}
rows, err := h.pool.Query(ctx, query)
if err != nil {
slog.Error("failed to query for peers", slog.Any("error", err))
return nil, err
}
peers, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.PeerListItem, error) {
Expand All @@ -109,9 +116,34 @@ func (h *FlowRequestHandler) ListPeers(
return &peer, err
})
if err != nil {
slog.Error("failed to collect peers", slog.Any("error", err))
return nil, err
}
return &protos.ListPeersResponse{Items: peers}, nil

sourceItems := make([]*protos.PeerListItem, 0, len(peers))
for _, peer := range peers {
// only postgres peers
if peer.Type == 3 {
sourceItems = append(sourceItems, peer)
}
}

destinationItems := peers
if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
destinationItems = make([]*protos.PeerListItem, 0, len(peers))
for _, peer := range peers {
// only clickhouse peers
if peer.Type == 8 {
destinationItems = append(destinationItems, peer)
}
}
}

return &protos.ListPeersResponse{
Items: peers,
SourceItems: sourceItems,
DestinationItems: destinationItems,
}, nil
}

func (h *FlowRequestHandler) GetSchemas(
Expand Down
12 changes: 12 additions & 0 deletions flow/cmd/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log/slog"
"slices"
"strings"

"github.com/jackc/pgx/v5"

Expand Down Expand Up @@ -36,6 +37,17 @@ func (h *FlowRequestHandler) GetDynamicSettings(
return nil, err
}

if peerdbenv.PeerDBAllowedTargets() == strings.ToLower(protos.DBType_CLICKHOUSE.String()) {
filteredSettings := make([]*protos.DynamicSetting, 0)
for _, setting := range settings {
if setting.TargetForSetting == protos.DynconfTarget_ALL ||
setting.TargetForSetting == protos.DynconfTarget_CLICKHOUSE {
filteredSettings = append(filteredSettings, setting)
}
}
settings = filteredSettings
}

return &protos.GetDynamicSettingsResponse{Settings: settings}, nil
}

Expand Down
22 changes: 21 additions & 1 deletion flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,28 @@ func ValidateS3(ctx context.Context, creds *utils.ClickHouseS3Credentials) error
return utils.PutAndRemoveS3(ctx, s3Client, object.Bucket, object.Prefix)
}

// ValidateClickhouseHost checks that chHost falls under one of the
// comma-separated domains in allowedDomainString. An empty allow-list
// permits every host. Matching is by host suffix, e.g. "clickhouse.cloud"
// admits "abc.clickhouse.cloud".
// NOTE(review): plain suffix matching also admits hosts such as
// "evilclickhouse.cloud"; consider requiring a "." boundary.
// ctx is currently unused; kept for signature consistency.
func ValidateClickhouseHost(ctx context.Context, chHost string, allowedDomainString string) error {
	// strings.Split("", ",") yields [""], never an empty slice, so the
	// empty-config case must be handled before splitting.
	if strings.TrimSpace(allowedDomainString) == "" {
		return nil
	}
	allowedDomains := strings.Split(allowedDomainString, ",")
	// check if chHost ends with one of the allowed domains
	for _, domain := range allowedDomains {
		domain = strings.TrimSpace(domain)
		// Skip blank entries (e.g. from a trailing comma): an empty
		// suffix would otherwise match every host.
		if domain == "" {
			continue
		}
		if strings.HasSuffix(chHost, domain) {
			return nil
		}
	}
	return fmt.Errorf("invalid Clickhouse host domain: %s. Allowed domains: %s",
		chHost, strings.Join(allowedDomains, ","))
}

// Performs some checks on the Clickhouse peer to ensure it will work for mirrors
func (c *ClickhouseConnector) ValidateCheck(ctx context.Context) error {
// validate clickhouse host
allowedDomains := peerdbenv.PeerDBClickhouseAllowedDomains()
if err := ValidateClickhouseHost(ctx, c.config.Host, allowedDomains); err != nil {
return err
}
validateDummyTableName := "peerdb_validation_" + shared.RandomString(4)
// create a table
err := c.database.Exec(ctx, fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
Expand Down
8 changes: 8 additions & 0 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,11 @@ func PeerDBCurrentEncKey() (shared.PeerDBEncKey, error) {
encKeys := PeerDBEncKeys()
return encKeys.Get(encKeyID)
}

// PeerDBAllowedTargets returns the raw value of the PEERDB_ALLOWED_TARGETS
// environment variable, or the empty string when it is unset.
func PeerDBAllowedTargets() string {
	allowed := GetEnvString("PEERDB_ALLOWED_TARGETS", "")
	return allowed
}

// PeerDBClickhouseAllowedDomains returns the raw value of the
// PEERDB_CLICKHOUSE_ALLOWED_DOMAINS environment variable, or the empty
// string when it is unset.
func PeerDBClickhouseAllowedDomains() string {
	domains := GetEnvString("PEERDB_CLICKHOUSE_ALLOWED_DOMAINS", "")
	return domains
}
80 changes: 48 additions & 32 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,45 @@ import (
var DynamicSettings = [...]*protos.DynamicSetting{
{
Name: "PEERDB_MAX_SYNCS_PER_CDC_FLOW", DefaultValue: "32", ValueType: protos.DynconfValueType_UINT,
Description: "Experimental setting: changes number of syncs per workflow, affects frequency of replication slot disconnects",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Experimental setting: changes number of syncs per workflow, affects frequency of replication slot disconnects",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_CDC_CHANNEL_BUFFER_SIZE", DefaultValue: "262144", ValueType: protos.DynconfValueType_INT,
Description: "Advanced setting: changes buffer size of channel PeerDB uses while streaming rows read to destination in CDC",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Advanced setting: changes buffer size of channel PeerDB uses while streaming rows read to destination in CDC",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS", DefaultValue: "10", ValueType: protos.DynconfValueType_INT,
Description: "Frequency of flushing to queue, applicable for PeerDB Streams mirrors only",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Frequency of flushing to queue, applicable for PeerDB Streams mirrors only",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_QUEUES,
},
{
Name: "PEERDB_QUEUE_PARALLELISM", DefaultValue: "4", ValueType: protos.DynconfValueType_INT,
Description: "Parallelism for Lua script processing data, applicable for CDC mirrors to Kakfa and PubSub",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Parallelism for Lua script processing data, applicable for CDC mirrors to Kakfa and PubSub",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_QUEUES,
},
{
Name: "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", DefaultValue: "1000000", ValueType: protos.DynconfValueType_INT,
Description: "CDC: number of records beyond which records are written to disk instead",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "CDC: number of records beyond which records are written to disk instead",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", DefaultValue: "-1", ValueType: protos.DynconfValueType_INT,
Description: "CDC: worker memory usage (in %) beyond which records are written to disk instead, -1 disables",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "CDC: worker memory usage (in %) beyond which records are written to disk instead, -1 disables",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_ENABLE_WAL_HEARTBEAT", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL,
Description: "Enables WAL heartbeat to prevent replication slot lag from increasing during times of no activity",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Enables WAL heartbeat to prevent replication slot lag from increasing during times of no activity",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_WAL_HEARTBEAT_QUERY",
Expand All @@ -59,48 +66,57 @@ DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;`, ValueType: protos.DynconfValueType_STRING,
Description: "SQL to run during each WAL heartbeat",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "SQL to run during each WAL heartbeat",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL,
Description: "Enables parallel sync (moving rows to target) and normalize (updating rows in target table)",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
Description: "Enables parallel sync (moving rows to target) and normalize (updating rows in target table)",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", DefaultValue: "8", ValueType: protos.DynconfValueType_INT,
Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_SNOWFLAKE,
},
{
Name: "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME", DefaultValue: "", ValueType: protos.DynconfValueType_STRING,
Description: "S3 buckets to store Avro files for mirrors with ClickHouse target",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "S3 buckets to store Avro files for mirrors with ClickHouse target",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
},
{
Name: "PEERDB_QUEUE_FORCE_TOPIC_CREATION", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL,
Description: "Force auto topic creation in mirrors, applies to Kafka and PubSub mirrors",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
Description: "Force auto topic creation in mirrors, applies to Kafka and PubSub mirrors",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_QUEUES,
},
{
Name: "PEERDB_ALERTING_GAP_MINUTES", DefaultValue: "15", ValueType: protos.DynconfValueType_UINT,
Description: "Duration in minutes before reraising alerts, 0 disables all alerting entirely",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Duration in minutes before reraising alerts, 0 disables all alerting entirely",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", DefaultValue: "5000", ValueType: protos.DynconfValueType_UINT,
Description: "Lag (in MB) threshold on PeerDB slot to start sending alerts, 0 disables slot lag alerting entirely",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Lag (in MB) threshold on PeerDB slot to start sending alerts, 0 disables slot lag alerting entirely",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", DefaultValue: "5", ValueType: protos.DynconfValueType_UINT,
Description: "Open connections from PeerDB user threshold to start sending alerts, 0 disables open connections alerting entirely",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Description: "Open connections from PeerDB user threshold to start sending alerts, 0 disables open connections alerting entirely",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS", DefaultValue: "false", ValueType: protos.DynconfValueType_BOOL,
Description: "BigQuery only: create target tables with partitioning by _PEERDB_SYNCED_AT column",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
Description: "BigQuery only: create target tables with partitioning by _PEERDB_SYNCED_AT column",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR,
TargetForSetting: protos.DynconfTarget_BIGQUERY,
},
}

Expand Down
8 changes: 8 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,14 @@ enum DynconfApplyMode {
APPLY_MODE_NEW_MIRROR = 4;
}

// DynconfTarget scopes a dynamic setting to the destination peer type(s)
// it is relevant for; settings are filtered by this when only certain
// targets are allowed (e.g. ALL and CLICKHOUSE in ClickHouse-only mode).
enum DynconfTarget {
  // Applies regardless of destination peer type.
  ALL = 0;
  BIGQUERY = 1;
  SNOWFLAKE = 2;
  CLICKHOUSE = 3;
  // Queue-style destinations (e.g. Kafka, PubSub).
  QUEUES = 4;
}

message DropFlowActivityInput {
string flow_job_name = 1;
string peer_name = 2;
Expand Down
3 changes: 3 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ message DynamicSetting {
string description = 4;
peerdb_flow.DynconfValueType value_type = 5;
peerdb_flow.DynconfApplyMode apply_mode = 6;
peerdb_flow.DynconfTarget target_for_setting = 7;
}
message GetDynamicSettingsRequest {
}
Expand Down Expand Up @@ -245,6 +246,8 @@ message ListPeersRequest {
}
message ListPeersResponse {
  // All peers returned by the query (may already be filtered server-side).
  repeated PeerListItem items = 1;
  // Peers usable as mirror sources (currently Postgres peers).
  repeated PeerListItem source_items = 2;
  // Peers usable as mirror destinations; narrowed to ClickHouse peers
  // when PEERDB_ALLOWED_TARGETS is set to "clickhouse".
  repeated PeerListItem destination_items = 3;
}

message SlotInfo {
Expand Down
32 changes: 32 additions & 0 deletions ui/app/api/mirror-types/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { MirrorType } from '@/app/dto/MirrorsDTO';
import { GetPeerDBClickhouseMode } from '@/peerdb-env/allowed_targets';
import { NextRequest } from 'next/server';
export const dynamic = 'force-dynamic';
// GET /api/mirror-types: returns the mirror-type cards shown on the
// mirror creation page. The XMIN card is omitted when PeerDB runs in
// ClickHouse mode. `request` is unused but required by the route signature.
export async function GET(request: NextRequest) {
  const cards = [
    {
      title: MirrorType.CDC,
      description:
        'Change-data Capture or CDC refers to replication of changes on the source table to the target table with initial load. This is recommended.',
      link: 'https://docs.peerdb.io/usecases/Real-time%20CDC/overview',
    },
    {
      title: MirrorType.QRep,
      description:
        'Query Replication allows you to specify a set of rows to be synced via a SELECT query. Useful for replicating views and tables without primary keys.',
      link: 'https://docs.peerdb.io/usecases/Streaming%20Query%20Replication/overview',
    },
  ];
  const xminCard = {
    title: MirrorType.XMin,
    description:
      'XMIN mode uses the xmin system column of PostgreSQL as a watermark column for replication.',
    link: 'https://docs.peerdb.io/sql/commands/create-mirror#xmin-query-replication',
  };

  const isClickhouseMode = GetPeerDBClickhouseMode();
  if (!isClickhouseMode) {
    cards.push(xminCard);
  }
  // Response.json() sets the application/json Content-Type header, which
  // `new Response(JSON.stringify(...))` does not.
  return Response.json(cards);
}
Loading

0 comments on commit f53a82c

Please sign in to comment.