Skip to content

Commit

Permalink
fix: open region missing options (#2473)
Browse files Browse the repository at this point in the history
* fix: open region missing options

* refactor: remove redundant clone

* chore: apply suggestions from CR

* chore: apply suggestions

* chore: apply suggestions

* test: add test for initialize_region_server

* feat: introduce RegionInfo
  • Loading branch information
WenyXu authored Oct 7, 2023
1 parent fe783c7 commit f50f2a8
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 86 deletions.
9 changes: 6 additions & 3 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use client::api::v1::meta::TableRouteValue;
use common_meta::ddl::utils::region_storage_path;
use common_meta::error as MetaError;
use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue};
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::key::table_info::{TableInfoKey, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameValue};
Expand Down Expand Up @@ -405,8 +405,11 @@ impl MigrateTableMetadata {
DatanodeTableValue::new(
table_id,
regions,
engine.to_string(),
region_storage_path.clone(),
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.clone(),
region_options: (&value.table_info.meta.options).into(),
},
),
)
})
Expand Down
8 changes: 6 additions & 2 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{Display, Formatter};

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -73,13 +74,15 @@ impl Display for OpenRegion {
pub struct OpenRegion {
pub region_ident: RegionIdent,
pub region_storage_path: String,
pub options: HashMap<String, String>,
}

impl OpenRegion {
pub fn new(region_ident: RegionIdent, path: &str) -> Self {
pub fn new(region_ident: RegionIdent, path: &str, options: HashMap<String, String>) -> Self {
Self {
region_ident,
region_storage_path: path.to_string(),
options,
}
}
}
Expand Down Expand Up @@ -127,12 +130,13 @@ mod tests {
engine: "mito2".to_string(),
},
"test/foo",
HashMap::new(),
));

let serialized = serde_json::to_string(&open_region).unwrap();

assert_eq!(
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo"}}"#,
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","options":{}}}"#,
serialized
);

Expand Down
50 changes: 35 additions & 15 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub mod table_region;
#[allow(deprecated)]
pub mod table_route;

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
Expand All @@ -69,6 +69,7 @@ use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};

use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
use self::datanode_table::RegionInfo;
use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use crate::ddl::utils::region_storage_path;
Expand Down Expand Up @@ -256,6 +257,7 @@ impl TableMetadataManager {
.table_name_manager()
.build_create_txn(&table_name, table_id)?;

let region_options = (&table_info.meta.options).into();
// Creates table info.
let table_info_value = TableInfoValue::new(table_info);
let (create_table_info_txn, on_create_table_info_failure) = self
Expand All @@ -268,6 +270,7 @@ impl TableMetadataManager {
table_id,
&engine,
&region_storage_path,
region_options,
distribution,
)?;

Expand Down Expand Up @@ -446,10 +449,10 @@ impl TableMetadataManager {
pub async fn update_table_route(
&self,
table_id: TableId,
engine: &str,
region_storage_path: &str,
region_info: RegionInfo,
current_table_route_value: TableRouteValue,
new_region_routes: Vec<RegionRoute>,
new_region_options: &HashMap<String, String>,
) -> Result<()> {
// Updates the datanode table key value pairs.
let current_region_distribution =
Expand All @@ -458,10 +461,10 @@ impl TableMetadataManager {

let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
table_id,
engine,
region_storage_path,
region_info,
current_region_distribution,
new_region_distribution,
new_region_options,
)?;

// Updates the table_route.
Expand Down Expand Up @@ -553,7 +556,7 @@ impl_optional_meta_value! {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use datatypes::prelude::ConcreteDataType;
Expand All @@ -563,6 +566,7 @@ mod tests {

use super::datanode_table::DatanodeTableKey;
use crate::ddl::utils::region_storage_path;
use crate::key::datanode_table::RegionInfo;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
Expand Down Expand Up @@ -894,10 +898,14 @@ mod tests {
table_metadata_manager
.update_table_route(
table_id,
engine,
&region_storage_path,
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
new_region_routes.clone(),
&HashMap::new(),
)
.await
.unwrap();
Expand All @@ -907,10 +915,14 @@ mod tests {
table_metadata_manager
.update_table_route(
table_id,
engine,
&region_storage_path,
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
new_region_routes.clone(),
&HashMap::new(),
)
.await
.unwrap();
Expand All @@ -921,10 +933,14 @@ mod tests {
table_metadata_manager
.update_table_route(
table_id,
engine,
&region_storage_path,
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
new_region_routes.clone(),
&HashMap::new(),
)
.await
.unwrap();
Expand All @@ -941,10 +957,14 @@ mod tests {
assert!(table_metadata_manager
.update_table_route(
table_id,
engine,
&region_storage_path,
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
wrong_table_route_value,
new_region_routes
new_region_routes,
&HashMap::new(),
)
.await
.is_err());
Expand Down
84 changes: 41 additions & 43 deletions src/common/meta/src/key/datanode_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use futures::stream::BoxStream;
Expand All @@ -32,6 +33,21 @@ use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
use crate::DatanodeId;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
/// RegionInfo
/// For compatible reason, DON'T modify the field name.
pub struct RegionInfo {
#[serde(default)]
// The table engine, it SHOULD be immutable after created.
pub engine: String,
// The region storage path, it SHOULD be immutable after created.
#[serde(default)]
pub region_storage_path: String,
// The region options.
#[serde(default)]
pub region_options: HashMap<String, String>,
}

pub struct DatanodeTableKey {
datanode_id: DatanodeId,
table_id: TableId,
Expand Down Expand Up @@ -85,25 +101,17 @@ impl TableMetaKey for DatanodeTableKey {
pub struct DatanodeTableValue {
pub table_id: TableId,
pub regions: Vec<RegionNumber>,
#[serde(default)]
pub engine: String,
#[serde(default)]
pub region_storage_path: String,
#[serde(flatten)]
pub region_info: RegionInfo,
version: u64,
}

impl DatanodeTableValue {
pub fn new(
table_id: TableId,
regions: Vec<RegionNumber>,
engine: String,
region_storage_path: String,
) -> Self {
pub fn new(table_id: TableId, regions: Vec<RegionNumber>, region_info: RegionInfo) -> Self {
Self {
table_id,
regions,
engine,
region_storage_path,
region_info,
version: 0,
}
}
Expand Down Expand Up @@ -156,6 +164,7 @@ impl DatanodeTableManager {
table_id: TableId,
engine: &str,
region_storage_path: &str,
region_options: HashMap<String, String>,
distribution: RegionDistribution,
) -> Result<Txn> {
let txns = distribution
Expand All @@ -165,8 +174,11 @@ impl DatanodeTableManager {
let val = DatanodeTableValue::new(
table_id,
regions,
engine.to_string(),
region_storage_path.to_string(),
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.to_string(),
region_options: region_options.clone(),
},
);

Ok(TxnOp::Put(key.as_raw_key(), val.try_as_raw_value()?))
Expand All @@ -182,10 +194,10 @@ impl DatanodeTableManager {
pub(crate) fn build_update_txn(
&self,
table_id: TableId,
engine: &str,
region_storage_path: &str,
region_info: RegionInfo,
current_region_distribution: RegionDistribution,
new_region_distribution: RegionDistribution,
new_region_options: &HashMap<String, String>,
) -> Result<Txn> {
let mut opts = Vec::new();

Expand All @@ -197,33 +209,20 @@ impl DatanodeTableManager {
opts.push(TxnOp::Delete(raw_key))
}
}

let need_update_options = region_info.region_options != *new_region_options;
for (datanode, regions) in new_region_distribution.into_iter() {
if let Some(current_region) = current_region_distribution.get(&datanode) {
// Updates if need.
if *current_region != regions {
let key = DatanodeTableKey::new(datanode, table_id);
let raw_key = key.as_raw_key();
let val = DatanodeTableValue::new(
table_id,
regions,
engine.to_string(),
region_storage_path.to_string(),
)
.try_as_raw_value()?;
opts.push(TxnOp::Put(raw_key, val));
}
} else {
// New datanodes
let need_update =
if let Some(current_region) = current_region_distribution.get(&datanode) {
// Updates if need.
*current_region != regions || need_update_options
} else {
true
};
if need_update {
let key = DatanodeTableKey::new(datanode, table_id);
let raw_key = key.as_raw_key();
let val = DatanodeTableValue::new(
table_id,
regions,
engine.to_string(),
region_storage_path.to_string(),
)
.try_as_raw_value()?;
let val = DatanodeTableValue::new(table_id, regions, region_info.clone())
.try_as_raw_value()?;
opts.push(TxnOp::Put(raw_key, val));
}
}
Expand Down Expand Up @@ -270,11 +269,10 @@ mod tests {
let value = DatanodeTableValue {
table_id: 42,
regions: vec![1, 2, 3],
engine: Default::default(),
region_storage_path: Default::default(),
region_info: RegionInfo::default(),
version: 1,
};
let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","version":1}"#;
let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"version":1}"#;

let raw_value = value.try_as_raw_value().unwrap();
assert_eq!(raw_value, literal);
Expand Down
Loading

0 comments on commit f50f2a8

Please sign in to comment.