From 73514da38188551ec1a89865644566731b081a99 Mon Sep 17 00:00:00 2001 From: JohnsonLee <53596783+J0HN50N133@users.noreply.github.com> Date: Sun, 14 Apr 2024 01:00:44 +0800 Subject: [PATCH] feat: support Get command (#6) * refactor: split the runtime part and static part when implementing Command * feat: support get command * feat: command id will convert to uppercase at comptime when making command_type_stub * refactor: remove redundant code while registering mod --- src/commands/src/commands.rs | 140 ++++++++++++++++++++++------ src/commands/src/commands/string.rs | 97 ++++++++++++------- src/commands/src/error.rs | 10 +- src/commands/src/lib.rs | 2 +- src/server/src/connection.rs | 77 ++++++++------- src/server/src/error.rs | 26 ++++++ src/server/src/lib.rs | 3 + 7 files changed, 254 insertions(+), 101 deletions(-) create mode 100644 src/server/src/error.rs diff --git a/src/commands/src/commands.rs b/src/commands/src/commands.rs index bfb9cbe..7a89c77 100644 --- a/src/commands/src/commands.rs +++ b/src/commands/src/commands.rs @@ -23,49 +23,135 @@ use crate::error::Result; mod string; -pub static COMMANDS_TABLE: Lazy> = Lazy::new(|| { - let mut table = HashMap::new(); - table.insert(CommandId::SET, Command::new(CommandId::SET)); - table -}); - -/// ```plaintext -/// COMMAND: string => Command ID => Command --create--> CommandInstance -/// ``` -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, strum::Display, strum::EnumString)] -#[strum(serialize_all = "lowercase")] -#[strum(ascii_case_insensitive)] -pub enum CommandId { - SET, +// pub type GlobalCommandTable = HashMap; +#[derive(Default)] +pub struct GlobalCommandTable { + table: HashMap, Command>, } +impl GlobalCommandTable { + pub fn register(&mut self) { + let cmd = Command::new::(); + self.table.insert(cmd.id, cmd); + } + + pub fn get(&self, cmd_id: &CommandId) -> Option<&Command> { + self.table.get(cmd_id.to_ascii_uppercase().as_str()) + } + + pub fn new() -> GlobalCommandTable { + GlobalCommandTable { + table: HashMap::new(), + } + } +} + +macro_rules! register_mod { + ($($mod:ident),*) => { + Lazy::new(|| { + let mut table = Default::default(); + $( + $mod::register(&mut table); + );* + table + }) + }; +} + +#[macro_export] +macro_rules! register { + ($($cmd:ident),*) => { + pub(in $crate::commands) fn register(table: &mut GlobalCommandTable) { + static START: Once = Once::new(); + START.call_once(move || { + $( + table.register::<$cmd>(); + )* + }); + } + }; +} + +/// create a command type stub for a command +/// +/// [`cmd_id`] will be converted to uppercase +#[macro_export] +macro_rules! command_type_stub { + (id: $cmd_id:literal) => { + paste::paste! { + fn command() -> &'static Command { + static STUB: Lazy = + Lazy::new(|| Command::new_stub( + stringify!([< $cmd_id:upper >]) + )); + &STUB + } + } + }; +} + +pub static GLOBAL_COMMANDS_TABLE: Lazy = register_mod! {string}; + +/// Mapping relationship between command id and command instance +/// [`CommandId`] => [`Command`] --create--> [`CommandInstance`] +pub type CommandId = str; +pub type CommandIdRef<'a> = &'a CommandId; + +type CreateInstanceFn = fn() -> Box; + +#[derive(Clone)] pub struct Command { /// command id i.e. command name - id: CommandId, + id: CommandIdRef<'static>, + create_instance_fn: CreateInstanceFn, +} +impl Command { + pub fn create_instance(&self) -> Box { + (self.create_instance_fn)() + } } impl Command { - pub fn new(cmd_id: CommandId) -> Self { - Self { id: cmd_id } + pub(crate) fn new() -> Self { + let mut cmd = F::command().clone(); + cmd.create_instance_fn = F::boxed; + cmd } - pub fn new_instance(&self) -> impl CommandInstance { - match self.id { - CommandId::SET => string::Set::new(), + pub(crate) fn new_stub(cmd_id: CommandIdRef<'static>) -> Self { + /// [`dummy_create_inst`] is a dummy function to satisfy the type of `CommandInstance` + /// and prevent cyclic dependency between [`create_inst`] and [`new`] of CommandInst + fn dummy_create_inst() -> Box { + unreachable!() + } + Self { + id: cmd_id, + create_instance_fn: dummy_create_inst, } } } -pub trait CommandInstance { - fn get_attr(&self) -> &Command; +pub trait CommandInstance: Send { + /// Parse command arguments representing in an array of Bytes, since client can only send RESP3 Array frames + fn parse(&mut self, input: &[Bytes]) -> Result<()>; + fn execute(&mut self, storage: &Storage, namespace: Bytes) -> Result; +} + +pub trait CommandTypeInfo: CommandInstance + Sized + 'static { + /// Tell the system how to create instance + fn new() -> Self; + + /// Static typing infomation of command, which is used to register the command + fn command() -> &'static Command; - fn id(&self) -> CommandId { - self.get_attr().id + /// Boxed version of `new` + fn boxed() -> Box { + Box::new(Self::new()) } - /// Parse an array of Bytes, since client can only send RESP3 Array frames - fn parse(&mut self, input: Vec) -> Result<()>; - fn execute(self, storage: &Storage, namespace: Bytes) -> BytesFrame; + fn id() -> CommandIdRef<'static> { + Self::command().id + } } #[cfg(test)] diff --git a/src/commands/src/commands/string.rs b/src/commands/src/commands/string.rs index a4b97bc..7939abb 100644 --- a/src/commands/src/commands/string.rs +++ b/src/commands/src/commands/string.rs @@ -12,18 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Once; + use bytes::Bytes; use common_base::bytes::StringBytes; +use once_cell::sync::Lazy; use redis_protocol::resp3::types::{BytesFrame, FrameKind}; use roxy::datatypes::string::{RedisString, StringSetArgs, StringSetArgsBuilder, StringSetType}; use roxy::storage::Storage; use snafu::ResultExt; -use super::{Command, CommandId, CommandInstance}; -use crate::error::{InvalidCmdSyntaxSnafu, Result}; +use super::{Command, CommandInstance, CommandTypeInfo, GlobalCommandTable}; +use crate::error::{FailInStorageSnafu, InvalidCmdSyntaxSnafu, Result}; use crate::parser::{ chain, key, keyword, optional, string, ttl, value_of_type, Parser, TTLOption, Tokens, }; +use crate::{command_type_stub, register}; pub struct SetArgs { key: Bytes, @@ -33,33 +37,17 @@ pub struct SetArgs { /// [`SET`] is an instance of `SET` command pub struct Set { - cmd: Command, args: Option, } -impl Set { - pub fn new() -> Self { - Self { - cmd: Command { id: CommandId::SET }, - args: None, - } - } -} - impl CommandInstance for Set { - fn get_attr(&self) -> &Command { - &self.cmd - } - - fn parse(&mut self, input: Vec) -> Result<()> { + fn parse(&mut self, input: &[Bytes]) -> Result<()> { let mut set_args_builder = StringSetArgsBuilder::default(); - let mut tokens = Tokens::new(&input[..]); - // skip the command name which is already ensured by strum - tokens.advance(); + let mut tokens = Tokens::new(input); let (key, value) = chain(key(), string()) .parse(&mut tokens) - .context(InvalidCmdSyntaxSnafu { cmd_id: self.id() })?; + .context(InvalidCmdSyntaxSnafu { cmd_id: Self::id() })?; // TODO: Add permutation parser let set_type = optional(value_of_type::()) @@ -84,7 +72,7 @@ impl CommandInstance for Set { let set_args = set_args_builder.build().unwrap(); tokens .expect_eot() - .context(InvalidCmdSyntaxSnafu { cmd_id: self.id() })?; + .context(InvalidCmdSyntaxSnafu { cmd_id: Self::id() })?; self.args = Some(SetArgs { key: key.into(), value, @@ -93,27 +81,64 @@ impl CommandInstance for Set { Ok(()) } - fn execute(mut self, storage: &Storage, namespace: Bytes) -> BytesFrame { + fn execute(&mut self, storage: &Storage, namespace: Bytes) -> Result { let db = RedisString::new(storage, namespace.into()); let args = self.args.take().unwrap(); // TODO: handle ttl - let (Ok(res) | Err(res)) = db - .set(args.key, args.value, &args.set_args) + db.set(args.key, args.value, &args.set_args) .map(|opt_old| match opt_old { - Some(old) if args.set_args.get => (FrameKind::BlobString, old).try_into().unwrap(), - Some(_) => (FrameKind::SimpleString, "OK").try_into().unwrap(), + Some(old) => (FrameKind::BlobString, old).try_into().unwrap(), + None if args.set_args.get => (FrameKind::SimpleString, "OK").try_into().unwrap(), None => BytesFrame::Null, }) - .map_err(|err| { - (FrameKind::SimpleError, err.to_string()) - .try_into() - .unwrap() - }); - res + .context(FailInStorageSnafu { cmd_id: Self::id() }) + } +} + +impl CommandTypeInfo for Set { + fn new() -> Self { + Self { args: None } + } + + command_type_stub! { id: "Set" } +} + +pub struct Get { + key: Bytes, +} + +impl CommandInstance for Get { + fn parse(&mut self, input: &[Bytes]) -> Result<()> { + let key = key() + .parse(&mut Tokens::new(input)) + .context(InvalidCmdSyntaxSnafu { cmd_id: Self::id() })?; + self.key = key.into(); + Ok(()) + } + + fn execute(&mut self, storage: &Storage, namespace: Bytes) -> Result { + RedisString::new(storage, namespace.into()) + .get(self.key.clone()) + .map(|opt_value| { + opt_value + .map(|value| (FrameKind::BlobString, value).try_into().unwrap()) + .unwrap_or_else(|| BytesFrame::Null) + }) + .context(FailInStorageSnafu { cmd_id: Self::id() }) + } +} + +impl CommandTypeInfo for Get { + fn new() -> Self { + Self { key: Bytes::new() } } + + command_type_stub! { id: "Get" } } +register! {Set, Get} + #[cfg(test)] mod tests { use std::error::Error; @@ -125,7 +150,7 @@ mod tests { fn test_valid_set_cmd_parse() { let input = resp3_encode_command("SeT key value nX"); let mut set_cmd = Set::new(); - assert!(set_cmd.parse(input).is_ok()); + assert!(set_cmd.parse(&input[1..]).is_ok()); let args = set_cmd.args.as_ref().unwrap(); assert_eq!(args.key, &b"key"[..]); assert_eq!(args.value.as_utf8(), "value"); @@ -138,8 +163,8 @@ mod tests { let mut set_cmd = Set::new(); println!( "{}", - set_cmd.parse(input.clone()).unwrap_err().source().unwrap() + set_cmd.parse(&input[1..]).unwrap_err().source().unwrap() ); - assert!(set_cmd.parse(input).is_err()); + assert!(set_cmd.parse(&input[..]).is_err()); } } diff --git a/src/commands/src/error.rs b/src/commands/src/error.rs index a3c7b41..822c899 100644 --- a/src/commands/src/error.rs +++ b/src/commands/src/error.rs @@ -14,22 +14,22 @@ use snafu::Snafu; -use crate::commands::CommandId; +use crate::commands::CommandIdRef; use crate::parser::ParseError; #[derive(Debug, Snafu)] -#[snafu(visibility(pub))] +#[snafu(visibility(pub(crate)))] pub enum Error { #[snafu(display("Invalid '{}' command", cmd_id))] InvalidCmdSyntax { source: ParseError, - cmd_id: CommandId, + cmd_id: CommandIdRef<'static>, }, #[snafu(display("Fail to execute command '{}' because of storage error", cmd_id))] FailInStorage { source: roxy::error::Error, - cmd_id: CommandId, + cmd_id: CommandIdRef<'static>, }, } -pub type Result = std::result::Result; +pub(crate) type Result = std::result::Result; diff --git a/src/commands/src/lib.rs b/src/commands/src/lib.rs index e02604f..fd0548b 100644 --- a/src/commands/src/lib.rs +++ b/src/commands/src/lib.rs @@ -13,5 +13,5 @@ // limitations under the License. pub mod commands; -pub(crate) mod error; +pub mod error; pub mod parser; diff --git a/src/server/src/connection.rs b/src/server/src/connection.rs index bd6ae15..a737e41 100644 --- a/src/server/src/connection.rs +++ b/src/server/src/connection.rs @@ -1,14 +1,17 @@ use bytes::Bytes; -use commands::commands::{CommandId, CommandInstance, COMMANDS_TABLE}; +use commands::commands::GLOBAL_COMMANDS_TABLE; use common_base::bytes::StringBytes; use common_telemetry::log::debug; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt as _, StreamExt}; use redis_protocol::codec::Resp3; -use redis_protocol::resp3::types::BytesFrame; +use redis_protocol::resp3::types::{BytesFrame, FrameKind}; use roxy::storage::StorageRef; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; + +use crate::error::{Result, UnknownCommandSnafu}; + type Stream = SplitStream>; type Sink = SplitSink, BytesFrame>; @@ -17,6 +20,7 @@ pub struct Connection { writer: Sink, namespace: Bytes, storage: StorageRef, + command_tokens_buf: Vec, } impl Connection @@ -31,19 +35,22 @@ where writer, storage, namespace: Bytes::from_static(b"default"), + command_tokens_buf: Vec::with_capacity(64), } } - fn frame_as_bytes_array(frame: BytesFrame) -> Vec { + fn frame_as_bytes_array(&mut self, frame: BytesFrame) { + self.command_tokens_buf.clear(); match frame { - BytesFrame::Array { data, .. } => data - .iter() - .map(|b| match b { - BytesFrame::SimpleString { data, .. } => data.clone(), - BytesFrame::BlobString { data, .. } => data.clone(), - _ => unreachable!(), - }) - .collect(), + BytesFrame::Array { data, .. } => { + data.iter() + .map(|b| match b { + BytesFrame::SimpleString { data, .. } => data.clone(), + BytesFrame::BlobString { data, .. } => data.clone(), + _ => unreachable!(), + }) + .collect_into(&mut self.command_tokens_buf); + } _ => unreachable!(), } } @@ -52,31 +59,37 @@ where while let Some(frame) = self.reader.next().await { let frame = frame.unwrap(); debug!("Received: {:?}", frame); - let command_tokens = Self::frame_as_bytes_array(frame); - let command_id: CommandId = match StringBytes::new(command_tokens[0].clone()) - .as_utf8() - .parse() - { - Ok(id) => id, - Err(_) => { - let _ = self - .writer - .send(BytesFrame::SimpleString { - data: Bytes::from_static(b"Unimplemented"), - attributes: None, - }) - .await; - continue; - } - }; - let command = COMMANDS_TABLE.get(&command_id).unwrap(); - let mut command_inst = command.new_instance(); - command_inst.parse(command_tokens).unwrap(); - let response = command_inst.execute(&self.storage, self.namespace.clone()); + self.frame_as_bytes_array(frame); + let response = self.to_response(self.execute_command(&self.command_tokens_buf[..])); let res = self.writer.send(response).await; assert!(res.is_ok()); } } + fn to_response(&self, response: Result) -> BytesFrame { + match response { + Ok(frame) => frame, + Err(e) => { + debug!("Error: {:?}", e); + (FrameKind::SimpleError, e.to_string()).try_into().unwrap() + } + } + } + + fn execute_command(&self, command_tokens: &[Bytes]) -> Result { + if let Some(c) = command_tokens.first() { + let s = StringBytes::new(c.clone()); + if let Some(command) = GLOBAL_COMMANDS_TABLE.get(s.as_utf8()) { + let mut command_inst = command.create_instance(); + // Skip the first token, which is the command name + command_inst.parse(&command_tokens[1..]).unwrap(); + Ok(command_inst.execute(&self.storage, self.namespace.clone())?) + } else { + UnknownCommandSnafu { cmd: s.as_utf8() }.fail() + } + } else { + UnknownCommandSnafu { cmd: "" }.fail() + } + } } #[cfg(test)] diff --git a/src/server/src/error.rs b/src/server/src/error.rs new file mode 100644 index 0000000..7758d9b --- /dev/null +++ b/src/server/src/error.rs @@ -0,0 +1,26 @@ +// Copyright 2024 Rudeus Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::Snafu; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +pub enum Error { + #[snafu(display("Unknown command '{}'", cmd))] + UnknownCommand { cmd: String }, + #[snafu(transparent)] + ExecuteError { source: commands::error::Error }, +} + +pub type Result = std::result::Result; diff --git a/src/server/src/lib.rs b/src/server/src/lib.rs index 7c4cb5b..2bebe67 100644 --- a/src/server/src/lib.rs +++ b/src/server/src/lib.rs @@ -12,5 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(iter_collect_into)] + pub mod connection; +pub mod error; pub mod server;