diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 30455181dcc3..c6b11ee57ef7 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -36,16 +36,16 @@ //! - The value is a [TableNameValue] struct; it contains the table id. //! - Used in the table name to table id lookup. //! -//! 6. Flow info key: `__flow/{catalog}/info/{flow_id}` +//! 6. Flow info key: `__flow/info/{flow_id}` //! - Stores metadata of the flow. //! -//! 7. Flow name key: `__flow/{catalog}/name/{flow_name}` +//! 7. Flow name key: `__flow/name/{catalog}/{flow_name}` //! - Mapping {catalog}/{flow_name} to {flow_id} //! -//! 8. Flownode flow key: `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}` +//! 8. Flownode flow key: `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping {flownode_id} to {flow_id} //! -//! 9. Table flow key: `__table_flow/{catalog}/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` +//! 9. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping source table's {table_id} to {flownode_id} //! - Used in `Flownode` booting. //! @@ -60,12 +60,12 @@ //! The whole picture of flow keys will be like this: //! //! __flow/ -//! {catalog}/ -//! info/ -//! {tsak_id} +//! info/ +//! {flow_id} //! //! name/ -//! {flow_name} +//! {catalog_name} +//! {flow_name} //! //! flownode/ //! {flownode_id}/ @@ -84,7 +84,6 @@ pub mod datanode_table; #[allow(unused)] pub mod flow; pub mod schema_name; -pub mod scope; pub mod table_info; pub mod table_name; // TODO(weny): removes it. @@ -194,6 +193,32 @@ pub trait TableMetaKey { fn as_raw_key(&self) -> Vec; } +/// The key of metadata. +pub trait MetaKey { + fn to_bytes(&self) -> Vec; + + fn from_bytes(bytes: &[u8]) -> Result; +} + +#[derive(Debug, Clone, PartialEq)] +pub struct BytesAdapter(Vec); + +impl From> for BytesAdapter { + fn from(value: Vec) -> Self { + Self(value) + } +} + +impl MetaKey for BytesAdapter { + fn to_bytes(&self) -> Vec { + self.0.clone() + } + + fn from_bytes(bytes: &[u8]) -> Result { + Ok(BytesAdapter(bytes.to_vec())) + } +} + pub(crate) trait TableMetaKeyGetTxnOp { fn build_get_op( &self, diff --git a/src/common/meta/src/key/catalog_name.rs b/src/common/meta/src/key/catalog_name.rs index 63873177b1b7..648d84f259cc 100644 --- a/src/common/meta/src/key/catalog_name.rs +++ b/src/common/meta/src/key/catalog_name.rs @@ -82,7 +82,7 @@ impl<'a> TryFrom<&'a str> for CatalogNameKey<'a> { } } -/// Decoder `KeyValue` to ({catalog},()) +/// Decoder `KeyValue` to {catalog} pub fn catalog_decoder(kv: KeyValue) -> Result { let str = std::str::from_utf8(&kv.key).context(error::ConvertRawKeySnafu)?; let catalog_name = CatalogNameKey::try_from(str)?; diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index cbda6aa88276..8d3660a42e51 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -30,9 +30,8 @@ use crate::key::flow::flow_info::FlowInfoManager; use crate::key::flow::flow_name::FlowNameManager; use crate::key::flow::flownode_flow::FlownodeFlowManager; use crate::key::flow::table_flow::TableFlowManager; -use crate::key::scope::MetaKey; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::FlowId; +use crate::key::{FlowId, MetaKey}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; @@ -139,20 +138,15 @@ impl FlowMetadataManager { .flow_name_manager .build_create_txn(&flow_value.catalog_name, &flow_value.flow_name, flow_id)?; - let (create_flow_txn, on_create_flow_failure) = self.flow_info_manager.build_create_txn( - &flow_value.catalog_name, - flow_id, - &flow_value, - )?; + let (create_flow_txn, on_create_flow_failure) = self + .flow_info_manager + .build_create_txn(flow_id, &flow_value)?; - let create_flownode_flow_txn = self.flownode_flow_manager.build_create_txn( - &flow_value.catalog_name, - flow_id, - flow_value.flownode_ids().clone(), - ); + let create_flownode_flow_txn = self + .flownode_flow_manager + .build_create_txn(flow_id, flow_value.flownode_ids().clone()); let create_table_flow_txn = self.table_flow_manager.build_create_txn( - &flow_value.catalog_name, flow_id, flow_value.flownode_ids().clone(), flow_value.source_table_ids(), @@ -222,7 +216,6 @@ mod tests { use super::*; use crate::key::flow::table_flow::TableFlowKey; - use crate::key::scope::CatalogScoped; use crate::kv_backend::memory::MemoryKvBackend; use crate::table_name::TableName; @@ -245,27 +238,23 @@ mod tests { #[test] fn test_flow_scoped_to_bytes() { - let key = FlowScoped::new(CatalogScoped::new( - "my_catalog".to_string(), - MockKey { - inner: b"hi".to_vec(), - }, - )); - assert_eq!(b"__flow/my_catalog/hi".to_vec(), key.to_bytes()); + let key = FlowScoped::new(MockKey { + inner: b"hi".to_vec(), + }); + assert_eq!(b"__flow/hi".to_vec(), key.to_bytes()); } #[test] fn test_flow_scoped_from_bytes() { - let bytes = b"__flow/my_catalog/hi"; - let key = FlowScoped::>::from_bytes(bytes).unwrap(); - assert_eq!(key.catalog(), "my_catalog"); + let bytes = b"__flow/hi"; + let key = FlowScoped::::from_bytes(bytes).unwrap(); assert_eq!(key.inner.inner, b"hi".to_vec()); } #[test] fn test_flow_scoped_from_bytes_mismatch() { - let bytes = b"__table/my_catalog/hi"; - let err = FlowScoped::>::from_bytes(bytes).unwrap_err(); + let bytes = b"__table/hi"; + let err = FlowScoped::::from_bytes(bytes).unwrap_err(); assert_matches!(err, error::Error::MismatchPrefix { .. }); } @@ -302,14 +291,14 @@ mod tests { .unwrap(); let got = flow_metadata_manager .flow_info_manager() - .get(catalog_name, flow_id) + .get(flow_id) .await .unwrap() .unwrap(); assert_eq!(got, flow_value); let flows = flow_metadata_manager .flownode_flow_manager() - .flows(catalog_name, 1) + .flows(1) .try_collect::>() .await .unwrap(); @@ -317,20 +306,11 @@ mod tests { for table_id in [1024, 1025, 1026] { let nodes = flow_metadata_manager .table_flow_manager() - .nodes(catalog_name, table_id) + .nodes(table_id) .try_collect::>() .await .unwrap(); - assert_eq!( - nodes, - vec![TableFlowKey::new( - catalog_name.to_string(), - table_id, - 1, - flow_id, - 0 - )] - ); + assert_eq!(nodes, vec![TableFlowKey::new(table_id, 1, flow_id, 0)]); } } diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index f9b9ae4b259d..b1f07845edb7 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -22,9 +22,10 @@ use table::metadata::TableId; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; -use crate::key::scope::{CatalogScoped, MetaKey}; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, TableMetaValue}; +use crate::key::{ + txn_helper, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetaKey, TableMetaValue, +}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; use crate::table_name::TableName; @@ -39,8 +40,8 @@ lazy_static! { /// The key stores the metadata of the flow. /// -/// The layout: `__flow/{catalog}/info/{flow_id}`. -pub struct FlowInfoKey(FlowScoped>); +/// The layout: `__flow/info/{flow_id}`. +pub struct FlowInfoKey(FlowScoped); impl MetaKey for FlowInfoKey { fn to_bytes(&self) -> Vec { @@ -48,22 +49,17 @@ impl MetaKey for FlowInfoKey { } fn from_bytes(bytes: &[u8]) -> Result { - Ok(FlowInfoKey( - FlowScoped::>::from_bytes(bytes)?, - )) + Ok(FlowInfoKey(FlowScoped::::from_bytes( + bytes, + )?)) } } impl FlowInfoKey { /// Returns the [FlowInfoKey]. - pub fn new(catalog: String, flow_id: FlowId) -> FlowInfoKey { + pub fn new(flow_id: FlowId) -> FlowInfoKey { let inner = FlowInfoKeyInner::new(flow_id); - FlowInfoKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) - } - - /// Returns the catalog. - pub fn catalog(&self) -> &str { - self.0.catalog() + FlowInfoKey(FlowScoped::new(inner)) } /// Returns the [FlowId]. @@ -159,8 +155,8 @@ impl FlowInfoManager { } /// Returns the [FlowInfoValue] of specified `flow_id`. - pub async fn get(&self, catalog: &str, flow_id: FlowId) -> Result> { - let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes(); + pub async fn get(&self, flow_id: FlowId) -> Result> { + let key = FlowInfoKey::new(flow_id).to_bytes(); self.kv_backend .get(&key) .await? @@ -169,11 +165,10 @@ impl FlowInfoManager { } /// Builds a create flow transaction. - /// It is expected that the `__flow/{catalog}/info/{flow_id}` wasn't occupied. + /// It is expected that the `__flow/info/{flow_id}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. pub(crate) fn build_create_txn( &self, - catalog: &str, flow_id: FlowId, flow_value: &FlowInfoValue, ) -> Result<( @@ -182,7 +177,7 @@ impl FlowInfoManager { &mut TxnOpGetResponseSet, ) -> Result>>, )> { - let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes(); + let key = FlowInfoKey::new(flow_id).to_bytes(); let txn = txn_helper::build_put_if_absent_txn(key.clone(), flow_value.try_as_raw_value()?); Ok(( @@ -198,15 +193,14 @@ mod tests { #[test] fn test_key_serialization() { - let flow_info = FlowInfoKey::new("my_catalog".to_string(), 2); - assert_eq!(b"__flow/my_catalog/info/2".to_vec(), flow_info.to_bytes()); + let flow_info = FlowInfoKey::new(2); + assert_eq!(b"__flow/info/2".to_vec(), flow_info.to_bytes()); } #[test] fn test_key_deserialization() { - let bytes = b"__flow/my_catalog/info/2".to_vec(); + let bytes = b"__flow/info/2".to_vec(); let key = FlowInfoKey::from_bytes(&bytes).unwrap(); - assert_eq!(key.catalog(), "my_catalog"); assert_eq!(key.flow_id(), 2); } } diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs index dbb6d81c35b1..031b19ce64af 100644 --- a/src/common/meta/src/key/flow/flow_name.rs +++ b/src/common/meta/src/key/flow/flow_name.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::flow::flow_server::Flow; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -19,34 +20,37 @@ use snafu::OptionExt; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; -use crate::key::scope::{CatalogScoped, MetaKey}; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{txn_helper, DeserializedValueWithBytes, FlowId, TableMetaValue, NAME_PATTERN}; +use crate::key::{ + txn_helper, DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN, +}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; const FLOW_NAME_KEY_PREFIX: &str = "name"; lazy_static! { - static ref FLOW_NAME_KEY_PATTERN: Regex = - Regex::new(&format!("^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})$")).unwrap(); + static ref FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!( + "^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$" + )) + .unwrap(); } /// The key of mapping {flow_name} to [FlowId]. /// -/// The layout: `__flow/{catalog}/name/{flow_name}`. -pub struct FlowNameKey(FlowScoped>); +/// The layout: `__flow/name/{catalog_name}/{flow_name}`. +pub struct FlowNameKey(FlowScoped); impl FlowNameKey { /// Returns the [FlowNameKey] pub fn new(catalog: String, flow_name: String) -> FlowNameKey { - let inner = FlowNameKeyInner::new(flow_name); - FlowNameKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) + let inner = FlowNameKeyInner::new(catalog, flow_name); + FlowNameKey(FlowScoped::new(inner)) } /// Returns the catalog. pub fn catalog(&self) -> &str { - self.0.catalog() + &self.0.catalog_name } /// Return the `flow_name` @@ -61,21 +65,26 @@ impl MetaKey for FlowNameKey { } fn from_bytes(bytes: &[u8]) -> Result { - Ok(FlowNameKey( - FlowScoped::>::from_bytes(bytes)?, - )) + Ok(FlowNameKey(FlowScoped::::from_bytes( + bytes, + )?)) } } /// The key of mapping name to [FlowId] #[derive(Debug, Clone, PartialEq, Eq)] pub struct FlowNameKeyInner { + pub catalog_name: String, pub flow_name: String, } impl MetaKey for FlowNameKeyInner { fn to_bytes(&self) -> Vec { - format!("{FLOW_NAME_KEY_PREFIX}/{}", self.flow_name).into_bytes() + format!( + "{FLOW_NAME_KEY_PREFIX}/{}/{}", + self.catalog_name, self.flow_name + ) + .into_bytes() } fn from_bytes(bytes: &[u8]) -> Result { @@ -95,15 +104,22 @@ impl MetaKey for FlowNameKeyInner { err_msg: format!("Invalid FlowNameKeyInner '{key}'"), })?; // Safety: pass the regex check above - let flow_name = captures[1].to_string(); - Ok(FlowNameKeyInner { flow_name }) + let catalog_name = captures[1].to_string(); + let flow_name = captures[2].to_string(); + Ok(FlowNameKeyInner { + catalog_name, + flow_name, + }) } } impl FlowNameKeyInner { /// Returns a [FlowNameKeyInner]. - pub fn new(flow_name: String) -> Self { - Self { flow_name } + pub fn new(catalog_name: String, flow_name: String) -> Self { + Self { + catalog_name, + flow_name, + } } } @@ -155,12 +171,12 @@ impl FlowNameManager { } /// Builds a create flow name transaction. - /// It's expected that the `__flow/{catalog}/name/{flow_name}` wasn't occupied. + /// It's expected that the `__flow/name/{catalog}/{flow_name}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. pub fn build_create_txn( &self, - catalog: &str, - name: &str, + catalog_name: &str, + flow_name: &str, flow_id: FlowId, ) -> Result<( Txn, @@ -168,7 +184,7 @@ impl FlowNameManager { &mut TxnOpGetResponseSet, ) -> Result>>, )> { - let key = FlowNameKey::new(catalog.to_string(), name.to_string()); + let key = FlowNameKey::new(catalog_name.to_string(), flow_name.to_string()); let raw_key = key.to_bytes(); let flow_flow_name_value = FlowNameValue::new(flow_id); let txn = txn_helper::build_put_if_absent_txn( @@ -190,12 +206,12 @@ mod tests { #[test] fn test_key_serialization() { let key = FlowNameKey::new("my_catalog".to_string(), "my_task".to_string()); - assert_eq!(b"__flow/my_catalog/name/my_task".to_vec(), key.to_bytes(),); + assert_eq!(b"__flow/name/my_catalog/my_task".to_vec(), key.to_bytes(),); } #[test] fn test_key_deserialization() { - let bytes = b"__flow/my_catalog/name/my_task".to_vec(); + let bytes = b"__flow/name/my_catalog/my_task".to_vec(); let key = FlowNameKey::from_bytes(&bytes).unwrap(); assert_eq!(key.catalog(), "my_catalog"); assert_eq!(key.flow_name(), "my_task"); diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs index 360b96b0f56f..d584c8b25776 100644 --- a/src/common/meta/src/key/flow/flownode_flow.rs +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -22,8 +22,7 @@ use snafu::OptionExt; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; -use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; -use crate::key::{FlowId, FlowPartitionId}; +use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; @@ -42,8 +41,8 @@ const FLOWNODE_FLOW_KEY_PREFIX: &str = "flownode"; /// The key of mapping [FlownodeId] to [FlowId]. /// -/// The layout `__flow/{catalog}/flownode/{flownode_id}/{flow_id}/{partition_id}` -pub struct FlownodeFlowKey(FlowScoped>); +/// The layout `__flow/flownode/{flownode_id}/{flow_id}/{partition_id}` +pub struct FlownodeFlowKey(FlowScoped); impl MetaKey for FlownodeFlowKey { fn to_bytes(&self) -> Vec { @@ -51,37 +50,29 @@ impl MetaKey for FlownodeFlowKey { } fn from_bytes(bytes: &[u8]) -> Result { - Ok(FlownodeFlowKey(FlowScoped::< - CatalogScoped, - >::from_bytes(bytes)?)) + Ok(FlownodeFlowKey( + FlowScoped::::from_bytes(bytes)?, + )) } } impl FlownodeFlowKey { /// Returns a new [FlownodeFlowKey]. pub fn new( - catalog: String, flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId, ) -> FlownodeFlowKey { let inner = FlownodeFlowKeyInner::new(flownode_id, flow_id, partition_id); - FlownodeFlowKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) + FlownodeFlowKey(FlowScoped::new(inner)) } /// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`. - pub fn range_start_key(catalog: String, flownode_id: FlownodeId) -> Vec { - let catalog_scoped_key = CatalogScoped::new( - catalog, - BytesAdapter::from(FlownodeFlowKeyInner::range_start_key(flownode_id).into_bytes()), - ); - - FlowScoped::new(catalog_scoped_key).to_bytes() - } + pub fn range_start_key(flownode_id: FlownodeId) -> Vec { + let inner = + BytesAdapter::from(FlownodeFlowKeyInner::range_start_key(flownode_id).into_bytes()); - /// Returns the catalog. - pub fn catalog(&self) -> &str { - self.0.catalog() + FlowScoped::new(inner).to_bytes() } /// Returns the [FlowId]. @@ -184,10 +175,9 @@ impl FlownodeFlowManager { /// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`. pub fn flows( &self, - catalog: &str, flownode_id: FlownodeId, ) -> BoxStream<'static, Result<(FlowId, FlowPartitionId)>> { - let start_key = FlownodeFlowKey::range_start_key(catalog.to_string(), flownode_id); + let start_key = FlownodeFlowKey::range_start_key(flownode_id); let req = RangeRequest::new().with_prefix(start_key); let stream = PaginationStream::new( @@ -205,16 +195,13 @@ impl FlownodeFlowManager { /// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys. pub(crate) fn build_create_txn>( &self, - catalog: &str, flow_id: FlowId, flownode_ids: I, ) -> Txn { let txns = flownode_ids .into_iter() .map(|(partition_id, flownode_id)| { - let key = - FlownodeFlowKey::new(catalog.to_string(), flownode_id, flow_id, partition_id) - .to_bytes(); + let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes(); TxnOp::Put(key, vec![]) }) .collect::>(); @@ -226,24 +213,20 @@ impl FlownodeFlowManager { #[cfg(test)] mod tests { use crate::key::flow::flownode_flow::FlownodeFlowKey; - use crate::key::scope::MetaKey; + use crate::key::MetaKey; #[test] fn test_key_serialization() { - let flownode_flow = FlownodeFlowKey::new("my_catalog".to_string(), 1, 2, 0); - assert_eq!( - b"__flow/my_catalog/flownode/1/2/0".to_vec(), - flownode_flow.to_bytes() - ); - let prefix = FlownodeFlowKey::range_start_key("my_catalog".to_string(), 1); - assert_eq!(b"__flow/my_catalog/flownode/1/".to_vec(), prefix); + let flownode_flow = FlownodeFlowKey::new(1, 2, 0); + assert_eq!(b"__flow/flownode/1/2/0".to_vec(), flownode_flow.to_bytes()); + let prefix = FlownodeFlowKey::range_start_key(1); + assert_eq!(b"__flow/flownode/1/".to_vec(), prefix); } #[test] fn test_key_deserialization() { - let bytes = b"__flow/my_catalog/flownode/1/2/0".to_vec(); + let bytes = b"__flow/flownode/1/2/0".to_vec(); let key = FlownodeFlowKey::from_bytes(&bytes).unwrap(); - assert_eq!(key.catalog(), "my_catalog"); assert_eq!(key.flownode_id(), 1); assert_eq!(key.flow_id(), 2); assert_eq!(key.partition_id(), 0); diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index d3cabd86f276..9eff61d10e81 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -22,8 +22,7 @@ use table::metadata::TableId; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; -use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey}; -use crate::key::{FlowId, FlowPartitionId}; +use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; @@ -51,9 +50,9 @@ struct TableFlowKeyInner { /// The key of mapping [TableId] to [FlownodeId] and [FlowId]. /// -/// The layout: `__flow/{catalog}/table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`. +/// The layout: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`. #[derive(Debug, PartialEq)] -pub struct TableFlowKey(FlowScoped>); +pub struct TableFlowKey(FlowScoped); impl MetaKey for TableFlowKey { fn to_bytes(&self) -> Vec { @@ -61,38 +60,29 @@ impl MetaKey for TableFlowKey { } fn from_bytes(bytes: &[u8]) -> Result { - Ok(TableFlowKey( - FlowScoped::>::from_bytes(bytes)?, - )) + Ok(TableFlowKey(FlowScoped::::from_bytes( + bytes, + )?)) } } impl TableFlowKey { /// Returns a new [TableFlowKey]. pub fn new( - catalog: String, table_id: TableId, flownode_id: FlownodeId, flow_id: FlowId, partition_id: FlowPartitionId, ) -> TableFlowKey { let inner = TableFlowKeyInner::new(table_id, flownode_id, flow_id, partition_id); - TableFlowKey(FlowScoped::new(CatalogScoped::new(catalog, inner))) + TableFlowKey(FlowScoped::new(inner)) } /// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`. - pub fn range_start_key(catalog: String, table_id: TableId) -> Vec { - let catalog_scoped_key = CatalogScoped::new( - catalog, - BytesAdapter::from(TableFlowKeyInner::range_start_key(table_id).into_bytes()), - ); - - FlowScoped::new(catalog_scoped_key).to_bytes() - } + pub fn range_start_key(table_id: TableId) -> Vec { + let inner = BytesAdapter::from(TableFlowKeyInner::range_start_key(table_id).into_bytes()); - /// Returns the catalog. - pub fn catalog(&self) -> &str { - self.0.catalog() + FlowScoped::new(inner).to_bytes() } /// Returns the source [TableId]. @@ -198,12 +188,8 @@ impl TableFlowManager { } /// Retrieves all [TableFlowKey]s of the specified `table_id`. - pub fn nodes( - &self, - catalog: &str, - table_id: TableId, - ) -> BoxStream<'static, Result> { - let start_key = TableFlowKey::range_start_key(catalog.to_string(), table_id); + pub fn nodes(&self, table_id: TableId) -> BoxStream<'static, Result> { + let start_key = TableFlowKey::range_start_key(table_id); let req = RangeRequest::new().with_prefix(start_key); let stream = PaginationStream::new( self.kv_backend.clone(), @@ -217,10 +203,9 @@ impl TableFlowManager { /// Builds a create table flow transaction. /// - /// Puts `__table_flow/{table_id}/{node_id}/{partition_id}` keys. + /// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys. pub fn build_create_txn>( &self, - catalog: &str, flow_id: FlowId, flownode_ids: I, source_table_ids: &[TableId], @@ -230,14 +215,7 @@ impl TableFlowManager { .flat_map(|(partition_id, flownode_id)| { source_table_ids.iter().map(move |table_id| { TxnOp::Put( - TableFlowKey::new( - catalog.to_string(), - *table_id, - flownode_id, - flow_id, - partition_id, - ) - .to_bytes(), + TableFlowKey::new(*table_id, flownode_id, flow_id, partition_id).to_bytes(), vec![], ) }) @@ -254,20 +232,19 @@ mod tests { #[test] fn test_key_serialization() { - let table_flow_key = TableFlowKey::new("my_catalog".to_string(), 1024, 1, 2, 0); + let table_flow_key = TableFlowKey::new(1024, 1, 2, 0); assert_eq!( - b"__flow/my_catalog/source_table/1024/1/2/0".to_vec(), + b"__flow/source_table/1024/1/2/0".to_vec(), table_flow_key.to_bytes(), ); - let prefix = TableFlowKey::range_start_key("my_catalog".to_string(), 1024); - assert_eq!(b"__flow/my_catalog/source_table/1024/".to_vec(), prefix); + let prefix = TableFlowKey::range_start_key(1024); + assert_eq!(b"__flow/source_table/1024/".to_vec(), prefix); } #[test] fn test_key_deserialization() { - let bytes = b"__flow/my_catalog/source_table/1024/1/2/0".to_vec(); + let bytes = b"__flow/source_table/1024/1/2/0".to_vec(); let key = TableFlowKey::from_bytes(&bytes).unwrap(); - assert_eq!(key.catalog(), "my_catalog"); assert_eq!(key.source_table_id(), 1024); assert_eq!(key.flownode_id(), 1); assert_eq!(key.flow_id(), 2); diff --git a/src/common/meta/src/key/scope.rs b/src/common/meta/src/key/scope.rs deleted file mode 100644 index 7f185a81d326..000000000000 --- a/src/common/meta/src/key/scope.rs +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::ops::Deref; - -use snafu::OptionExt; - -use crate::error::{self, Result}; - -/// The delimiter of key. -pub(crate) const DELIMITER: u8 = b'/'; - -/// The key of metadata. -pub trait MetaKey { - fn to_bytes(&self) -> Vec; - - fn from_bytes(bytes: &[u8]) -> Result; -} - -/// The key of `{catalog}/` scope. -#[derive(Debug, PartialEq)] -pub struct CatalogScoped { - inner: T, - catalog: String, -} - -impl Deref for CatalogScoped { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl CatalogScoped { - /// Returns a new [CatalogScoped] key. - pub fn new(catalog: String, inner: T) -> CatalogScoped { - CatalogScoped { inner, catalog } - } - - /// Returns the `catalog`. - pub fn catalog(&self) -> &str { - &self.catalog - } -} - -impl> MetaKey> for CatalogScoped { - fn to_bytes(&self) -> Vec { - let prefix = self.catalog.as_bytes(); - let inner = self.inner.to_bytes(); - let mut bytes = Vec::with_capacity(prefix.len() + inner.len() + 1); - bytes.extend(prefix); - bytes.push(DELIMITER); - bytes.extend(inner); - bytes - } - - fn from_bytes(bytes: &[u8]) -> Result> { - let pos = bytes - .iter() - .position(|c| *c == DELIMITER) - .with_context(|| error::DelimiterNotFoundSnafu { - key: String::from_utf8_lossy(bytes), - })?; - let catalog = String::from_utf8_lossy(&bytes[0..pos]).to_string(); - // Safety: We don't need the `DELIMITER` char. - let inner = T::from_bytes(&bytes[pos + 1..])?; - Ok(CatalogScoped { inner, catalog }) - } -} - -#[derive(Debug, Clone, PartialEq)] -pub struct BytesAdapter(Vec); - -impl From> for BytesAdapter { - fn from(value: Vec) -> Self { - Self(value) - } -} - -impl MetaKey for BytesAdapter { - fn to_bytes(&self) -> Vec { - self.0.clone() - } - - fn from_bytes(bytes: &[u8]) -> Result { - Ok(BytesAdapter(bytes.to_vec())) - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use super::*; - use crate::error::Result; - - #[derive(Debug)] - struct MockKey { - inner: Vec, - } - - impl MetaKey for MockKey { - fn to_bytes(&self) -> Vec { - self.inner.clone() - } - - fn from_bytes(bytes: &[u8]) -> Result { - Ok(MockKey { - inner: bytes.to_vec(), - }) - } - } - - #[test] - fn test_catalog_scoped_from_bytes() { - let key = "test_catalog_name/key"; - let scoped_key = CatalogScoped::::from_bytes(key.as_bytes()).unwrap(); - assert_eq!(scoped_key.catalog, "test_catalog_name"); - assert_eq!(scoped_key.inner.inner, b"key".to_vec()); - assert_eq!(key.as_bytes(), &scoped_key.to_bytes()); - } - - #[test] - fn test_catalog_scoped_from_bytes_delimiter_not_found() { - let key = "test_catalog_name"; - let err = CatalogScoped::::from_bytes(key.as_bytes()).unwrap_err(); - assert_matches!(err, error::Error::DelimiterNotFound { .. }); - } - - #[test] - fn test_catalog_scoped_to_bytes() { - let scoped_key = CatalogScoped { - inner: MockKey { - inner: b"hi".to_vec(), - }, - catalog: "test_catalog".to_string(), - }; - assert_eq!(b"test_catalog/hi".to_vec(), scoped_key.to_bytes()); - } -}