Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix open region missing path #2441

Merged
merged 4 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,34 @@ impl Display for SimpleReply {
}
}

impl Display for OpenRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"OpenRegion(region_ident={}, region_storage_path={})",
self.region_ident, self.region_storage_path
)
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpenRegion {
pub region_ident: RegionIdent,
pub region_storage_path: String,
}

impl OpenRegion {
pub fn new(region_ident: RegionIdent, path: &str) -> Self {
Self {
region_ident,
region_storage_path: path.to_string(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Display)]
pub enum Instruction {
OpenRegion(RegionIdent),
OpenRegion(OpenRegion),
CloseRegion(RegionIdent),
InvalidateTableIdCache(TableId),
InvalidateTableNameCache(TableName),
Expand Down Expand Up @@ -93,18 +118,21 @@ mod tests {

#[test]
fn test_serialize_instruction() {
let open_region = Instruction::OpenRegion(RegionIdent {
cluster_id: 1,
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
});
let open_region = Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
cluster_id: 1,
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
},
"test/foo",
));

let serialized = serde_json::to_string(&open_region).unwrap();

assert_eq!(
r#"{"OpenRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo"}}"#,
serialized
);

Expand Down
10 changes: 7 additions & 3 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::RegionIdent;
use common_query::Output;
use common_telemetry::error;
use snafu::OptionExt;
use store_api::path_utils::region_dir;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;

Expand All @@ -43,11 +44,14 @@ impl RegionHeartbeatResponseHandler {

fn instruction_to_request(instruction: Instruction) -> MetaResult<(RegionId, RegionRequest)> {
match instruction {
Instruction::OpenRegion(region_ident) => {
Instruction::OpenRegion(OpenRegion {
region_ident,
region_storage_path,
}) => {
let region_id = Self::region_ident_to_region_id(&region_ident);
let open_region_req = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: "".to_string(),
region_dir: region_dir(&region_storage_path, region_id),
options: HashMap::new(),
});
Ok((region_id, open_region_req))
Expand Down
19 changes: 11 additions & 8 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent};
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, RegionIdent};
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::Runtime;
Expand Down Expand Up @@ -90,13 +90,16 @@ fn close_region_instruction() -> Instruction {
}

fn open_region_instruction() -> Instruction {
Instruction::OpenRegion(RegionIdent {
table_id: 1024,
region_number: 0,
cluster_id: 1,
datanode_id: 2,
engine: "mito2".to_string(),
})
Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
table_id: 1024,
region_number: 0,
cluster_id: 1,
datanode_id: 2,
engine: "mito2".to_string(),
},
"path/dir",
))
}

pub struct MockQueryEngine;
Expand Down
19 changes: 14 additions & 5 deletions src/meta-srv/src/procedure/region_failover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ mod tests {

use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::TableMetadataManager;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
Expand Down Expand Up @@ -426,6 +428,7 @@ mod tests {
pub context: RegionFailoverContext,
pub heartbeat_receivers: HashMap<DatanodeId, Receiver<tonic::Result<HeartbeatResponse>>>,
pub pushers: Pushers,
pub path: String,
}

impl TestingEnv {
Expand Down Expand Up @@ -549,6 +552,7 @@ mod tests {
},
pushers,
heartbeat_receivers,
path: region_storage_path(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME).to_string(),
}
}
}
Expand Down Expand Up @@ -606,17 +610,22 @@ mod tests {
let (candidate_tx, mut candidate_rx) = tokio::sync::mpsc::channel(1);
for (datanode_id, mut recv) in env.heartbeat_receivers.into_iter() {
let mailbox_clone = env.context.mailbox.clone();
let failed_region_clone = failed_region.clone();
let opening_region = RegionIdent {
datanode_id,
..failed_region.clone()
};
let path = env.path.to_string();
let candidate_tx = candidate_tx.clone();
let _handle = common_runtime::spawn_bg(async move {
let resp = recv.recv().await.unwrap().unwrap();
let received = &resp.mailbox_message.unwrap();
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(
failed_region_clone.clone()
))
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
opening_region,
&path
)))
.unwrap(),
))
);
Expand Down
84 changes: 65 additions & 19 deletions src/meta-srv/src/procedure/region_failover/activate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ use std::time::Duration;

use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::peer::Peer;
use common_meta::RegionIdent;
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};

use super::update_metadata::UpdateRegionMetadata;
use super::{RegionFailoverContext, State};
use crate::error::{
Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::inactive_region_manager::InactiveRegionManager;
Expand All @@ -36,20 +37,49 @@ use crate::service::mailbox::{Channel, MailboxReceiver};
#[derive(Serialize, Deserialize, Debug)]
pub(super) struct ActivateRegion {
candidate: Peer,
region_storage_path: Option<String>,
}

impl ActivateRegion {
pub(super) fn new(candidate: Peer) -> Self {
Self { candidate }
Self {
candidate,
region_storage_path: None,
}
}

async fn send_open_region_message(
&self,
&mut self,
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
timeout: Duration,
) -> Result<MailboxReceiver> {
let instruction = Instruction::OpenRegion(failed_region.clone());
let table_id = failed_region.table_id;
// TODO(weny): considers fetching table info only once.
let table_info = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)?
.context(error::TableInfoNotFoundSnafu { table_id })?
.table_info;

let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
WenyXu marked this conversation as resolved.
Show resolved Hide resolved

let candidate_ident = RegionIdent {
datanode_id: self.candidate.id,
..failed_region.clone()
};
info!("Activating region: {candidate_ident:?}");

let instruction = Instruction::OpenRegion(OpenRegion::new(
candidate_ident.clone(),
&region_storage_path,
));

self.region_storage_path = Some(region_storage_path);

let msg = MailboxMessage::json_message(
"Activate Region",
Expand All @@ -72,20 +102,16 @@ impl ActivateRegion {
// command in time, it was considered an inactive node by metasrv, then it replied, and the
// current region failed over again, and the node was selected as a candidate, so it needs
// to clear its previous state first.
let candidate = RegionIdent {
datanode_id: self.candidate.id,
..failed_region.clone()
};
InactiveRegionManager::new(&ctx.in_memory)
.deregister_inactive_region(&candidate)
.deregister_inactive_region(&candidate_ident)
.await?;

let ch = Channel::Datanode(self.candidate.id);
ctx.mailbox.send(&ch, msg, timeout).await
}

async fn handle_response(
&self,
&mut self,
mailbox_receiver: MailboxReceiver,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
Expand All @@ -102,7 +128,14 @@ impl ActivateRegion {
.fail();
};
if result {
Ok(Box::new(UpdateRegionMetadata::new(self.candidate.clone())))
Ok(Box::new(UpdateRegionMetadata::new(
self.candidate.clone(),
self.region_storage_path
.clone()
.context(error::UnexpectedSnafu {
violated: "expected region_storage_path",
})?,
)))
} else {
// The region could be just indeed cannot be opened by the candidate, retry
// would be in vain. Then why not just end the failover procedure? Because we
Expand Down Expand Up @@ -135,7 +168,6 @@ impl State for ActivateRegion {
ctx: &RegionFailoverContext,
failed_region: &RegionIdent,
) -> Result<Box<dyn State>> {
info!("Activating region: {failed_region:?}");
let mailbox_receiver = self
.send_open_region_message(ctx, failed_region, OPEN_REGION_MESSAGE_TIMEOUT)
.await?;
Expand All @@ -160,7 +192,7 @@ mod tests {
let failed_region = env.failed_region(1).await;

let candidate = 2;
let state = ActivateRegion::new(Peer::new(candidate, ""));
let mut state = ActivateRegion::new(Peer::new(candidate, ""));
let mailbox_receiver = state
.send_open_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await
Expand All @@ -179,7 +211,14 @@ mod tests {
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(failed_region.clone())).unwrap(),
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
datanode_id: candidate,
..failed_region.clone()
},
&env.path
)))
.unwrap(),
))
);

Expand Down Expand Up @@ -212,7 +251,7 @@ mod tests {
.unwrap();
assert_eq!(
format!("{next_state:?}"),
r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" } }"#
r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public" }"#
);
}

Expand All @@ -224,7 +263,7 @@ mod tests {
let failed_region = env.failed_region(1).await;

let candidate = 2;
let state = ActivateRegion::new(Peer::new(candidate, ""));
let mut state = ActivateRegion::new(Peer::new(candidate, ""));
let mailbox_receiver = state
.send_open_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await
Expand All @@ -241,7 +280,14 @@ mod tests {
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::OpenRegion(failed_region.clone())).unwrap()
serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
datanode_id: candidate,
..failed_region.clone()
},
&env.path
)))
.unwrap(),
))
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ mod tests {
mut heartbeat_receivers,
context,
pushers,
..
} = env;

for frontend_id in 4..=7 {
Expand Down
Loading
Loading