Skip to content

Commit

Permalink
refactor: use MetaKey trait instead of TableMetaKey
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 30, 2024
1 parent 20cbc03 commit 9ea8af4
Show file tree
Hide file tree
Showing 17 changed files with 323 additions and 163 deletions.
18 changes: 9 additions & 9 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
use common_meta::key::table_region::{TableRegionKey, TableRegionValue};
use common_meta::key::table_route::{TableRouteKey, TableRouteValue as NextTableRouteValue};
use common_meta::key::{RegionDistribution, TableMetaKey, TableMetaValue};
use common_meta::key::{MetaKey, RegionDistribution, TableMetaValue};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
Expand Down Expand Up @@ -137,7 +137,7 @@ impl MigrateTableMetadata {
while let Some((key, value)) = stream.try_next().await.context(error::IterStreamSnafu)? {
let table_id = self.migrate_table_route_key(value).await?;
keys.push(key);
keys.push(TableRegionKey::new(table_id).as_raw_key())
keys.push(TableRegionKey::new(table_id).to_bytes())
}

info!("Total migrated TableRouteKeys: {}", keys.len() / 2);
Expand Down Expand Up @@ -165,7 +165,7 @@ impl MigrateTableMetadata {
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_key(new_key.to_bytes())
.with_value(new_table_value.try_as_raw_value().unwrap()),
)
.await
Expand Down Expand Up @@ -217,7 +217,7 @@ impl MigrateTableMetadata {
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_key(new_key.to_bytes())
.with_value(schema_name_value.try_as_raw_value().unwrap()),
)
.await
Expand Down Expand Up @@ -269,7 +269,7 @@ impl MigrateTableMetadata {
self.etcd_store
.put(
PutRequest::new()
.with_key(new_key.as_raw_key())
.with_key(new_key.to_bytes())
.with_value(catalog_name_value.try_as_raw_value().unwrap()),
)
.await
Expand Down Expand Up @@ -346,11 +346,11 @@ impl MigrateTableMetadata {
.batch_put(
BatchPutRequest::new()
.add_kv(
table_info_key.as_raw_key(),
table_info_key.to_bytes(),
table_info_value.try_as_raw_value().unwrap(),
)
.add_kv(
table_region_key.as_raw_key(),
table_region_key.to_bytes(),
table_region_value.try_as_raw_value().unwrap(),
),
)
Expand Down Expand Up @@ -378,7 +378,7 @@ impl MigrateTableMetadata {
self.etcd_store
.put(
PutRequest::new()
.with_key(table_name_key.as_raw_key())
.with_key(table_name_key.to_bytes())
.with_value(table_name_value.try_as_raw_value().unwrap()),
)
.await
Expand Down Expand Up @@ -425,7 +425,7 @@ impl MigrateTableMetadata {
} else {
let mut req = BatchPutRequest::new();
for (key, value) in datanode_table_kvs {
req = req.add_kv(key.as_raw_key(), value.try_as_raw_value().unwrap());
req = req.add_kv(key.to_bytes(), value.try_as_raw_value().unwrap());
}
self.etcd_store.batch_put(req).await.unwrap();
}
Expand Down
10 changes: 5 additions & 5 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteKey;
use crate::key::TableMetaKey;
use crate::key::MetaKey;

/// KvBackend cache invalidator
#[async_trait::async_trait]
Expand Down Expand Up @@ -99,18 +99,18 @@ where
match cache {
CacheIdent::TableId(table_id) => {
let key = TableInfoKey::new(table_id);
self.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.to_bytes()).await;

let key = &TableRouteKey { table_id };
self.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::TableName(table_name) => {
let key: TableNameKey = (&table_name).into();
self.invalidate_key(&key.as_raw_key()).await
self.invalidate_key(&key.to_bytes()).await
}
CacheIdent::SchemaName(schema_name) => {
let key: SchemaNameKey = (&schema_name).into();
self.invalidate_key(&key.as_raw_key()).await;
self.invalidate_key(&key.to_bytes()).await;
}
}
}
Expand Down
69 changes: 21 additions & 48 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,16 @@ pub type FlowId = u32;
/// The partition of flow.
pub type FlowPartitionId = u32;

lazy_static! {
static ref TABLE_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}

lazy_static! {
static ref TABLE_ROUTE_KEY_PATTERN: Regex =
Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
}

lazy_static! {
static ref DATANODE_TABLE_KEY_PATTERN: Regex =
Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
Expand Down Expand Up @@ -189,15 +199,11 @@ lazy_static! {
.unwrap();
}

pub trait TableMetaKey {
fn as_raw_key(&self) -> Vec<u8>;
}

/// The key of metadata.
pub trait MetaKey<T> {
pub trait MetaKey<'a, T> {
fn to_bytes(&self) -> Vec<u8>;

fn from_bytes(bytes: &[u8]) -> Result<T>;
fn from_bytes(bytes: &'a [u8]) -> Result<T>;
}

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -209,12 +215,12 @@ impl From<Vec<u8>> for BytesAdapter {
}
}

impl MetaKey<BytesAdapter> for BytesAdapter {
impl<'a> MetaKey<'a, BytesAdapter> for BytesAdapter {
fn to_bytes(&self) -> Vec<u8> {
self.0.clone()
}

fn from_bytes(bytes: &[u8]) -> Result<BytesAdapter> {
fn from_bytes(bytes: &'a [u8]) -> Result<BytesAdapter> {
Ok(BytesAdapter(bytes.to_vec()))
}
}
Expand All @@ -228,24 +234,6 @@ pub(crate) trait TableMetaKeyGetTxnOp {
);
}

impl TableMetaKey for String {
fn as_raw_key(&self) -> Vec<u8> {
self.as_bytes().to_vec()
}
}

impl TableMetaKeyGetTxnOp for String {
fn build_get_op(
&self,
) -> (
TxnOp,
impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = self.as_raw_key();
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
}

pub trait TableMetaValue {
fn try_from_raw_value(raw_value: &[u8]) -> Result<Self>
where
Expand Down Expand Up @@ -675,11 +663,11 @@ impl TableMetadataManager {
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<HashSet<_>>();

keys.push(table_name.as_raw_key());
keys.push(table_info_key.as_raw_key());
keys.push(table_route_key.as_raw_key());
keys.push(table_name.to_bytes());
keys.push(table_info_key.to_bytes());
keys.push(table_route_key.to_bytes());
for key in &datanode_table_keys {
keys.push(key.as_raw_key());
keys.push(key.to_bytes());
}
Ok(keys)
}
Expand Down Expand Up @@ -992,21 +980,6 @@ impl TableMetadataManager {
}
}

#[macro_export]
macro_rules! impl_table_meta_key {
($($val_ty: ty), *) => {
$(
impl std::fmt::Display for $val_ty {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", String::from_utf8_lossy(&self.as_raw_key()))
}
}
)*
}
}

impl_table_meta_key!(TableNameKey<'_>, TableInfoKey, DatanodeTableKey);

#[macro_export]
macro_rules! impl_table_meta_value {
($($val_ty: ty), *) => {
Expand All @@ -1024,7 +997,7 @@ macro_rules! impl_table_meta_value {
}
}

macro_rules! impl_table_meta_key_get_txn_op {
macro_rules! impl_meta_key_get_txn_op {
($($key: ty), *) => {
$(
impl $crate::key::TableMetaKeyGetTxnOp for $key {
Expand All @@ -1038,7 +1011,7 @@ macro_rules! impl_table_meta_key_get_txn_op {
&'a mut TxnOpGetResponseSet,
) -> Option<Vec<u8>>,
) {
let raw_key = self.as_raw_key();
let raw_key = self.to_bytes();
(
TxnOp::Get(raw_key.clone()),
TxnOpGetResponseSet::filter(raw_key),
Expand All @@ -1049,7 +1022,7 @@ macro_rules! impl_table_meta_key_get_txn_op {
}
}

impl_table_meta_key_get_txn_op! {
impl_meta_key_get_txn_op! {
TableNameKey<'_>,
TableInfoKey,
TableRouteKey,
Expand Down
34 changes: 25 additions & 9 deletions src/common/meta/src/key/catalog_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use crate::error::{self, Error, InvalidTableMetadataSnafu, Result};
use crate::key::{TableMetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX};
use crate::key::{MetaKey, CATALOG_NAME_KEY_PATTERN, CATALOG_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;

/// The catalog name key, indices all catalog names
///
/// The layout: `__catalog_name/{catalog_name}`
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CatalogNameKey<'a> {
pub catalog: &'a str,
Expand All @@ -53,15 +56,28 @@ impl<'a> CatalogNameKey<'a> {
}
}

impl Display for CatalogNameKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog)
impl<'a> MetaKey<'a, CatalogNameKey<'a>> for CatalogNameKey<'_> {
fn to_bytes(&self) -> Vec<u8> {
self.to_string().into_bytes()
}

fn from_bytes(bytes: &'a [u8]) -> Result<CatalogNameKey<'a>> {
let key = std::str::from_utf8(bytes).map_err(|e| {
InvalidTableMetadataSnafu {
err_msg: format!(
"CatalogNameKey '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
CatalogNameKey::try_from(key)
}
}

impl TableMetaKey for CatalogNameKey<'_> {
fn as_raw_key(&self) -> Vec<u8> {
self.to_string().into_bytes()
impl Display for CatalogNameKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", CATALOG_NAME_KEY_PREFIX, self.catalog)
}
}

Expand Down Expand Up @@ -103,7 +119,7 @@ impl CatalogManager {
pub async fn create(&self, catalog: CatalogNameKey<'_>, if_not_exists: bool) -> Result<()> {
let _timer = crate::metrics::METRIC_META_CREATE_CATALOG.start_timer();

let raw_key = catalog.as_raw_key();
let raw_key = catalog.to_bytes();
let raw_value = CatalogNameValue.try_as_raw_value()?;
if self
.kv_backend
Expand All @@ -117,7 +133,7 @@ impl CatalogManager {
}

pub async fn exists(&self, catalog: CatalogNameKey<'_>) -> Result<bool> {
let raw_key = catalog.as_raw_key();
let raw_key = catalog.to_bytes();

self.kv_backend.exists(&raw_key).await
}
Expand Down
Loading

0 comments on commit 9ea8af4

Please sign in to comment.