Skip to content

Commit

Permalink
refactor: passing QueryContext to RegionServer
Browse files Browse the repository at this point in the history
  • Loading branch information
Kelvinyu1117 committed Apr 28, 2024
1 parent dadee99 commit a8eff60
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2c14c6e22dfe957f40bb88dd01fb8530656de89b" }
greptime-proto = { git = "https://github.com/Kelvinyu1117/greptime-proto", branch = "refactor/kelvin-add-query-context"}
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
14 changes: 11 additions & 3 deletions src/query/src/dist_plan/merge_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::EquivalenceProperties;
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
use greptime_proto::v1::region::{QueryContext, QueryRequest, RegionRequestHeader};
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -179,7 +179,10 @@ impl MergeScanExec {

let dbname = context.task_id().unwrap_or_default();
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let tz = self.query_ctx.timezone().to_string();
let current_catalog = self.query_ctx.current_catalog().to_string();
let current_schema = self.query_ctx.current_schema().to_string();
let timezone = self.query_ctx.timezone().to_string();
let extension = self.query_ctx.to_extension();

let stream = Box::pin(stream!({
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
Expand All @@ -192,7 +195,12 @@ impl MergeScanExec {
header: Some(RegionRequestHeader {
tracing_context: tracing_context.to_w3c(),
dbname: dbname.clone(),
timezone: tz.clone(),
query_context: Some(QueryContext {
current_catalog: current_catalog.clone(),
current_schema: current_schema.clone(),
timezone: timezone.clone(),
extension: extension.clone(),
}),
}),
region_id: region_id.into(),
plan: substrait_plan.clone(),
Expand Down
32 changes: 23 additions & 9 deletions src/session/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,25 @@ impl Clone for QueryContext {

impl From<&RegionRequestHeader> for QueryContext {
fn from(value: &RegionRequestHeader) -> Self {
let (catalog, schema) = parse_catalog_and_schema_from_db_string(&value.dbname);
QueryContext {
current_catalog: catalog.to_string(),
current_schema: schema.to_string(),
current_user: Default::default(),
timezone: ArcSwap::new(Arc::new(parse_timezone(Some(&value.timezone)))),
sql_dialect: Arc::new(GreptimeDbDialect {}),
extension: Default::default(),
configuration_parameter: Default::default(),
match &value.query_context {
Some(ctx) => QueryContext {
current_catalog: ctx.current_catalog.clone(),
current_schema: ctx.current_schema.clone(),
current_user: Default::default(),
timezone: ArcSwap::new(Arc::new(parse_timezone(Some(&ctx.timezone)))),
sql_dialect: Arc::new(GreptimeDbDialect {}),
extension: ctx.extension.clone(),
configuration_parameter: Default::default(),
},
None => QueryContext {
current_catalog: DEFAULT_CATALOG_NAME.to_string(),
current_schema: DEFAULT_SCHEMA_NAME.to_string(),
current_user: ArcSwap::new(Arc::new(None)),
timezone: ArcSwap::new(Arc::new(get_timezone(None).clone())),
sql_dialect: Arc::new(GreptimeDbDialect {}),
extension: HashMap::new(),
configuration_parameter: Default::default(),
},
}
}
}
Expand Down Expand Up @@ -169,6 +179,10 @@ impl QueryContext {
self.extension.get(key.as_ref()).map(|v| v.as_str())
}

pub fn to_extension(&self) -> HashMap<String, String> {
self.extension.clone()
}

/// SQL like `set variable` may change timezone or other info in `QueryContext`.
/// We need persist these change in `Session`.
pub fn update_session(&self, session: &SessionRef) {
Expand Down

0 comments on commit a8eff60

Please sign in to comment.