Skip to content

Commit

Permalink
Merge pull request #63 from supabase/text-format
Browse files Browse the repository at this point in the history
add support for parsing values from text format
  • Loading branch information
imor authored Nov 27, 2024
2 parents 1db8e39 + fe419f2 commit 30bb8d7
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 69 deletions.
41 changes: 39 additions & 2 deletions pg_replicate/src/clients/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,47 @@ impl ReplicationClient {
publication: &str,
slot_name: &str,
start_lsn: PgLsn,
) -> Result<(LogicalReplicationStream, bool), ReplicationClientError> {
match self
.get_stream(publication, slot_name, start_lsn, true)
.await
{
Ok(stream) => {
info!("binary format supported by logical replication");
Ok((stream, true))
}
Err(rce) => {
if let ReplicationClientError::TokioPostgresError(e) = &rce {
if let Some(dbe) = e.as_db_error() {
//TODO: use a more robust method of recognizing whether the server supports binary option or not
if dbe.message() == "unrecognized pgoutput option: binary" {
info!(
"binary format not supported by logical replication, trying text"
);
return self
.get_stream(publication, slot_name, start_lsn, false)
.await
.map(|s| (s, false));
}
}
}
Err(rce)
}
}
}

pub async fn get_stream(
&self,
publication: &str,
slot_name: &str,
start_lsn: PgLsn,
binary_format: bool,
) -> Result<LogicalReplicationStream, ReplicationClientError> {
let binary_option = if binary_format { r#", "binary""# } else { "" };
let options = format!(
r#"("proto_version" '1', "publication_names" {}, "binary")"#,
quote_literal(publication)
r#"("proto_version" '1', "publication_names" {}{})"#,
quote_literal(publication),
binary_option
);

let query = format!(
Expand Down
17 changes: 17 additions & 0 deletions pg_replicate/src/conversions/bool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ParseBoolError {
#[error("invalid input value: {0}")]
InvalidInput(String),
}

pub fn parse_bool(s: &str) -> Result<bool, ParseBoolError> {
if s == "t" {
Ok(true)
} else if s == "f" {
Ok(false)
} else {
Err(ParseBoolError::InvalidInput(s.to_string()))
}
}
Loading

0 comments on commit 30bb8d7

Please sign in to comment.