`pg_replicate` is a Rust crate for quickly building replication solutions for Postgres. It provides building blocks to construct data pipelines that continually copy data from Postgres to other systems. It builds abstractions on top of Postgres's logical streaming replication protocol and steers users towards the pit of success without making them worry about low-level details of the protocol.
To quickly try out `pg_replicate`, you can run the `stdout` example, which replicates data to standard output. First, create a publication in Postgres that includes the tables you want to replicate:
```sql
create publication my_publication
for table table1, table2;
```
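If you prefer to do this from Rust, here is a minimal sketch using the tokio-postgres crate. This is an assumption of convenience: `pg_replicate` does not require creating the publication this way, and `psql` works just as well.

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connection parameters match the quickstart example below.
    let (client, connection) = tokio_postgres::connect(
        "host=localhost port=5432 user=postgres password=password dbname=postgres",
        NoTls,
    )
    .await?;

    // The connection object drives the socket and must be polled on its own task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // Same statement as the SQL snippet above.
    client
        .batch_execute("create publication my_publication for table table1, table2;")
        .await?;

    Ok(())
}
```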
Then run the `stdout` example:

```sh
cargo run -p pg_replicate --example stdout --features="stdout" -- --db-host localhost --db-port 5432 --db-name postgres --db-username postgres --db-password password cdc my_publication stdout_slot
```
In the above example, `pg_replicate` connects to a Postgres database named `postgres` running on `localhost:5432` with username `postgres` and password `password`. The slot `stdout_slot` will be created by `pg_replicate` automatically.
Refer to the examples folder to run examples for sinks other than `stdout` (currently only `bigquery` and `duckdb` are supported). A quick tip: to see all the command line options, run the example without any options specified, e.g. `cargo run --example bigquery` will print detailed usage instructions for the `bigquery` sink.
To use `pg_replicate` in your Rust project, add it via a git dependency in `Cargo.toml`:

```toml
[dependencies]
pg_replicate = { git = "https://github.com/supabase/pg_replicate", features = ["stdout"] }
```
Each sink is behind a feature of the same name, so remember to enable the right feature. The git dependency is needed for now because `pg_replicate` is not yet published on crates.io. You'll also need to add a dependency on tokio:

```toml
[dependencies]
...
tokio = { version = "1.38" }
```
Now your `main.rs` can have code like the following:

```rust
use std::error::Error;

use pg_replicate::pipeline::{
    data_pipeline::DataPipeline,
    sinks::stdout::StdoutSink,
    sources::postgres::{PostgresSource, TableNamesFrom},
    PipelineAction,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let host = "localhost";
    let port = 5432;
    let database = "postgres";
    let username = "postgres";
    let password = Some("password".to_string());
    let slot_name = Some("my_slot".to_string());
    let table_names = TableNamesFrom::Publication("my_publication".to_string());

    // Create a PostgresSource
    let postgres_source = PostgresSource::new(
        host,
        port,
        database,
        username,
        password,
        slot_name,
        table_names,
    )
    .await?;

    // Create a StdoutSink. This sink just prints out the events it receives to stdout
    let stdout_sink = StdoutSink;

    // Create a `DataPipeline` to connect the source to the sink
    let mut pipeline = DataPipeline::new(postgres_source, stdout_sink, PipelineAction::Both);

    // Start the `DataPipeline` to start copying data from Postgres to stdout
    pipeline.start().await?;

    Ok(())
}
```
For more examples, please refer to the examples folder in the source.
The `pg_replicate` crate has the following features:

- `duckdb`
- `bigquery`
- `stdout`

Each feature enables the corresponding sink of the same name.
To run the `pg_replicate` examples from the root of the repository, use the following command:

```sh
cargo run -p pg_replicate --example <example_name> --features="<example_name>" ...
```

In the above command we ask cargo to run an example from the `pg_replicate` crate in the workspace, enabling the features the example needs. Usually the feature is named the same as the example. E.g.:

```sh
cargo run -p pg_replicate --example duckdb --features="duckdb"
```
If you are inside the `pg_replicate` folder in the repository root, you can omit the `-p pg_replicate` part of the command.
The repository is a cargo workspace. Each of the sub-folders is a crate in that workspace. A brief explanation of each crate is as follows:

- `api` - REST API used for hosting `pg_replicate` in a cloud environment.
- `pg_replicate` - The main library crate containing the core logic.
- `replicator` - A binary crate using `pg_replicate`. Packaged as a Docker container for use in cloud hosting.
`pg_replicate` is still under heavy development, so expect bugs and papercuts, but over time we plan to support the following sinks:
- Add BigQuery Sink
- Add DuckDb Sink
- Add MotherDuck Sink
- Add Snowflake Sink
- Add ClickHouse Sink
- Many more to come...
Note: the DuckDb and MotherDuck sinks do not use the batched pipeline, hence they currently perform poorly. A batched pipeline version of these sinks is planned.
See the open issues for a full list of proposed features (and known issues).
Distributed under the Apache-2.0 License. See `LICENSE` for more information.
To create the docker image for `replicator`, run `docker build -f ./replicator/Dockerfile .` from the root of the repo. Similarly, to create the docker image for `api`, run `docker build -f ./api/Dockerfile .`.
Applications can use data sources and sinks from `pg_replicate` to build a data pipeline that continually copies data from the source to the sink. For example, a data pipeline to copy data from Postgres to DuckDB takes about 100 lines of Rust.
There are three components in a data pipeline:

- A data source
- A data sink
- A pipeline
The data source is the object data is copied from. The data sink is the object data is copied to. The pipeline is the object which drives the data copy operations from the source to the sink.
```
+----------+                       +----------+
|          |                       |          |
|  Source  |---- Data Pipeline --->|   Sink   |
|          |                       |          |
+----------+                       +----------+
```
So roughly you write code like this:

```rust
let postgres_source = PostgresSource::new(...);
let duckdb_sink = DuckDbSink::new(...);
let pipeline = DataPipeline::new(postgres_source, duckdb_sink);
pipeline.start();
```
Of course, the real code is more than these four lines, but this is the basic idea. For a complete example look at the duckdb example.
A data source is where the data that the pipeline copies to the data sink comes from. Currently, the repository has only one data source: `PostgresSource`. `PostgresSource` is the primary data source; data in any other source or sink will have originated from it.
A data sink is where the data from a data source is copied. There are two kinds of data sinks: those which retain the essential nature of the data coming out of a `PostgresSource`, and those which don't. The former kind can act as a data source in the future; the latter kind can't, and is the data's final resting place.
For instance, `DuckDbSink` ensures that the change data capture (CDC) stream coming in from a source is materialized into tables in a DuckDB database. Once this lossy data transformation is done, it can not be used as a CDC stream again.
Contrast this with a potential future sink like `S3Sink` or `KafkaSink`, which just copies the CDC stream as-is. The data deposited in such a sink can later be used as if it were coming from Postgres directly.
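To make the distinction concrete, here is a minimal sketch; the event and function shapes below are hypothetical simplifications for illustration, not pg_replicate's actual API:

```rust
// Hypothetical, simplified CDC event; the real crate's events carry typed
// rows, LSNs, and transaction boundaries.
struct Event {
    lsn: u64,
    payload: String, // e.g. "insert into table1 values (1, 'a')"
}

// Lossy sink: the event is applied to a table. Ordering and stream metadata
// are discarded, so the result cannot be replayed as a CDC stream.
fn materialize_into_table(event: &Event) {
    println!("applying change to table: {}", event.payload);
}

// Lossless sink: the event is stored verbatim (think S3 or Kafka), so the
// stored stream can later be consumed as if it came from Postgres directly.
fn append_verbatim(log: &mut Vec<Event>, event: Event) {
    log.push(event); // LSN and payload preserved as-is
}
```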
A data pipeline encapsulates the business logic to copy the data from the source to the sink. It also orchestrates resumption of the CDC stream from the exact location it was last stopped at. The data sink participates in this by persisting the resumption state and returning it to the pipeline when it restarts.
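A hypothetical, simplified view of that contract (the real trait in pg_replicate differs; this only illustrates the idea):

```rust
// Sketch only: the sink persists the position of the last event it durably
// wrote, and hands it back to the pipeline on restart so the CDC stream can
// resume from exactly that point.
trait ResumptionState {
    /// Persist the LSN of the last event durably written to the sink.
    fn save_lsn(&mut self, lsn: u64);

    /// Return the persisted LSN, if any, so the pipeline knows where to resume.
    fn last_lsn(&self) -> Option<u64>;
}
```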
If a data sink is not transactional (e.g. `S3Sink`), it is not always possible to keep the CDC stream and the resumption state consistent with each other. As a result, non-transactional sinks can end up with duplicate portions of the CDC stream. The data pipeline helps deduplicate these CDC events when the data is later copied over to a transactional store like DuckDB.
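Building on the sketch above, deduplication can be as simple as dropping events at or below the LSN already applied to the transactional store (again an illustration, not the crate's actual code):

```rust
// Keep only events newer than the last LSN applied to the transactional
// store; replayed duplicates from a non-transactional sink are dropped.
fn dedup_events(events: Vec<(u64, String)>, last_applied_lsn: u64) -> Vec<(u64, String)> {
    events
        .into_iter()
        .filter(|(lsn, _)| *lsn > last_applied_lsn)
        .collect()
}
```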
Finally, the data pipeline reports back to the `PostgresSource` the log sequence number (LSN) up to which the CDC stream has been copied into the sink. This allows the Postgres database to reclaim disk space by removing WAL segment files which are no longer required by the data sink.
```
+----------+                       +----------+
|          |                       |          |
|  Source  |<---- LSN Numbers -----|   Sink   |
|          |                       |          |
+----------+                       +----------+
```
A CDC stream is not the only kind of copy a data pipeline performs. There's also full table copy, aka backfill. The two can be performed either together or separately. For example, a one-off data copy can use just the backfill. But if you want to regularly copy data out of Postgres into your OLAP database, both backfill and the CDC stream should be used: backfill to get the initial copies of the data, and the CDC stream to keep those copies up to date as changes happen to the copied tables in Postgres.
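In code this choice is expressed through the pipeline action. `PipelineAction::Both` appears in the quickstart example above; the other variant names in this sketch are assumptions, so check the crate's `PipelineAction` enum for the exact spelling:

```rust
use pg_replicate::pipeline::PipelineAction;

// Pick what the pipeline should do. Variant names other than `Both` are
// assumed for illustration and may differ in the actual crate.
fn choose_action(one_off_copy: bool) -> PipelineAction {
    if one_off_copy {
        PipelineAction::TableCopiesOnly // backfill only: one-time snapshot
    } else {
        PipelineAction::Both // backfill first, then continuous CDC
    }
}
```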
Currently the data sources and sinks copy table rows and CDC events one at a time, which is expected to be slow. Batching and other strategies will likely improve performance drastically, but at this early stage the focus is on correctness rather than performance. There are also zero benchmarks at this stage, so any commentary about performance is closer to speculation than reality.
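As a sketch of the batching idea (not pg_replicate's planned batched pipeline), buffering events and flushing them in groups replaces one round-trip per event with one per batch:

```rust
// Illustrative batcher: accumulate items and hand back a full batch to flush.
struct Batcher<T> {
    buf: Vec<T>,
    max: usize,
}

impl<T> Batcher<T> {
    fn new(max: usize) -> Self {
        Self { buf: Vec::with_capacity(max), max }
    }

    /// Push an item; returns a batch to flush once `max` items accumulate.
    fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.buf.push(item);
        if self.buf.len() >= self.max {
            Some(std::mem::take(&mut self.buf))
        } else {
            None
        }
    }
}
```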
If you see the following error when running tests with `cargo test` on macOS:

```
called `Result::unwrap()` on an `Err` value: Os { code: 24, kind: Uncategorized, message: "Too many open files" }
```

then raise the limit of open files per process with this command:

```sh
ulimit -n 10000
```

Now the tests should pass again.