Skip to content
This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

Commit

Permalink
fix: update to use latest clickhouse version
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo <[email protected]>
  • Loading branch information
gusinacio committed Jul 3, 2024
1 parent 5bdd188 commit 2fdc5fb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 27 deletions.
26 changes: 11 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use futures03::future::join_all;
use futures03::StreamExt;
use hyper_rustls::HttpsConnectorBuilder;
use loader::Cursor;
use tracing::{error, info};
use logging::LogConfig;
use pb::sf::substreams::v1::Package;
use tracing::{error, info};
use tracing_core::LevelFilter;
use tracing_subscriber::{prelude::*, Registry, EnvFilter};
use tracing_subscriber::{prelude::*, EnvFilter, Registry};
use url::Url;

use prost::Message;
Expand Down Expand Up @@ -78,10 +78,10 @@ pub enum ElricError {
PackageFileError(#[from] std::io::Error),
#[error("Could not decode package")]
PackageDecodeError(#[from] prost::DecodeError),
#[error("Could not load cursor")]
CursorError,
#[error("Could not load schema")]
LoadSchemaError,
#[error("Could not load cursor: {0}")]
CursorError(anyhow::Error),
#[error("Could not load schema: {0}")]
LoadSchemaError(clickhouse::error::Error),
#[error("Could not insert cursor")]
InsertCursorError,
#[error("Could not insert row")]
Expand Down Expand Up @@ -129,7 +129,9 @@ async fn main() -> Result<(), Error> {
Some(token) => token,
None => token.ok_or(ElricError::TokenNotFound)?,
};
let cursor = load_persisted_cursor(&client, &id).await.map_err(|_| ElricError::CursorError)?;
let cursor = load_persisted_cursor(&client, &id)
.await
.map_err(|e| ElricError::CursorError(e))?;
let stream = create_stream(
cursor,
package_file,
Expand All @@ -138,13 +140,8 @@ async fn main() -> Result<(), Error> {
token,
start_block,
end_block,
)?;
run(
id,
stream,
client,
)
.await?;
)?;
run(id, stream, client).await?;
}
}
Ok(())
Expand Down Expand Up @@ -177,7 +174,6 @@ async fn run(
mut stream: SubstreamsStream,
client: clickhouse::Client,
) -> Result<(), ElricError> {

let table_info = get_table_information(&client).await?;

let dynamic_tables = table_info
Expand Down
22 changes: 10 additions & 12 deletions src/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,15 @@ pub async fn get_table_information(client: &Client) -> Result<Vec<TableInfo>, El
let query = client.query(
format!(
"
SELECT
table_schema,
table_name
FROM
information_schema.tables
WHERE
table_type = 1 AND
table_schema = '{}'
ORDER BY
table_schema,
table_name
SELECT database AS table_schema,
name AS table_name
FROM system.tables
WHERE NOT is_temporary
AND engine NOT LIKE '%View'
AND engine NOT LIKE 'System%'
AND has_own_data != 0
AND database = '{}'
ORDER BY database, name
",
client.database().unwrap_or("default")
)
Expand All @@ -264,6 +262,6 @@ pub async fn get_table_information(client: &Client) -> Result<Vec<TableInfo>, El
let result = query
.fetch_all()
.await
.map_err(|_| ElricError::LoadSchemaError)?;
.map_err(|e| ElricError::LoadSchemaError(e))?;
Ok(result)
}

0 comments on commit 2fdc5fb

Please sign in to comment.