From 3919c3b8dac45b22cb9d2a831e6a8f90943af76d Mon Sep 17 00:00:00 2001 From: Nick Pillitteri Date: Sun, 31 Dec 2023 11:05:14 -0500 Subject: [PATCH] Add support for incr/decr commands --- mtop-client/src/core.rs | 195 +++++++++++++++++++++++++++++++--------- mtop/src/bin/mc.rs | 44 ++++++++- 2 files changed, 196 insertions(+), 43 deletions(-) diff --git a/mtop-client/src/core.rs b/mtop-client/src/core.rs index 2e9f5e4..3675d15 100644 --- a/mtop-client/src/core.rs +++ b/mtop-client/src/core.rs @@ -641,8 +641,10 @@ impl error::Error for ProtocolError {} enum Command<'a> { Add(&'a str, u64, u32, &'a [u8]), CrawlerMetadump, + Decr(&'a str, u64), Delete(&'a str), Gets(&'a [String]), + Incr(&'a str, u64), Replace(&'a str, u64, u32, &'a [u8]), Stats, StatsItems, @@ -657,8 +659,10 @@ impl<'a> From> for Vec { match value { Command::Add(key, flags, ttl, data) => storage_command("add", key, flags, ttl, data), Command::CrawlerMetadump => "lru_crawler metadump hash\r\n".to_owned().into_bytes(), + Command::Decr(key, delta) => format!("decr {} {}\r\n", key, delta).into_bytes(), Command::Delete(key) => format!("delete {}\r\n", key).into_bytes(), Command::Gets(keys) => format!("gets {}\r\n", keys.join(" ")).into_bytes(), + Command::Incr(key, delta) => format!("incr {} {}\r\n", key, delta).into_bytes(), Command::Replace(key, flags, ttl, data) => storage_command("replace", key, flags, ttl, data), Command::Stats => "stats\r\n".to_owned().into_bytes(), Command::StatsItems => "stats items\r\n".to_owned().into_bytes(), @@ -899,6 +903,51 @@ impl Memcached { } } + /// Increment the value of a key by the given delta if the value is numeric returning + /// the new value. Returns an error if the value is _not_ numeric. + pub async fn incr(&mut self, key: K, delta: u64) -> Result + where + K: AsRef, + { + if !validate_key(key.as_ref()) { + return Err(MtopError::runtime("invalid key")); + } + + self.send(Command::Incr(key.as_ref(), delta)).await?; + if let Some(v) = self.read.next_line().await? { + Self::parse_numeric_response(&v) + } else { + Err(MtopError::runtime("unexpected empty response")) + } + } + + /// Decrement the value of a key by the given delta if the value is numeric returning + /// the new value with a minimum of 0. Returns an error if the value is _not_ numeric. + pub async fn decr(&mut self, key: K, delta: u64) -> Result + where + K: AsRef, + { + if !validate_key(key.as_ref()) { + return Err(MtopError::runtime("invalid key")); + } + + self.send(Command::Decr(key.as_ref(), delta)).await?; + if let Some(v) = self.read.next_line().await? { + Self::parse_numeric_response(&v) + } else { + Err(MtopError::runtime("unexpected empty response")) + } + } + + fn parse_numeric_response(line: &str) -> Result { + if let Some(err) = Self::parse_error(line) { + Err(MtopError::from(err)) + } else { + line.parse() + .map_err(|_e| MtopError::runtime(format!("unable to parse '{}'", line))) + } + } + /// Send a simple command to verify our connection to the server is working. pub async fn ping(&mut self) -> Result<(), MtopError> { self.send(Command::Version).await?; @@ -1102,7 +1151,7 @@ mod test { #[test] fn test_validate_key_success() { - let key = "a-long-but-reasonable-key"; + let key = "a-reasonable-key"; assert!(validate_key(key)); } @@ -1125,22 +1174,16 @@ mod test { } } - /// Create a new `Memcached` instance to read the provided server response. + /// Create a new receiver channel and `Memcached` instance to read the provided server + /// response. Anything written by the client is able to be read from the receiver channel. + /// NOTE that it is important that the receiver not be dropped by the caller since this will + /// cause writes to the channel to fail from within the client. macro_rules! client { () => ({ - Memcached::new(Cursor::new(Vec::new()), Vec::new()) + let (tx, rx) = mpsc::unbounded_channel(); + let reads = Vec::new(); + (rx, Memcached::new(Cursor::new(reads), WriteAdapter { tx })) }); - ($($line:expr),+ $(,)?) => ({ - let writes = Vec::new(); - let mut reads = Vec::new(); - $(reads.extend_from_slice($line.as_bytes());)+ - Memcached::new(Cursor::new(reads), writes) - }) - } - - /// Create a new receiver channel and `Memcached` instance to read the provided server - /// response. Anything written by the client is able to be read from the receiver channel. - macro_rules! client_rw { ($($line:expr),+ $(,)?) => ({ let (tx, rx) = mpsc::unbounded_channel(); let mut reads = Vec::new(); @@ -1151,8 +1194,8 @@ mod test { #[tokio::test] async fn test_get_no_key() { - let mut client = client!(); - let keys: Vec = Vec::new(); + let (_rx, mut client) = client!(); + let keys: Vec = vec![]; let res = client.get(keys).await; assert!(res.is_err()); @@ -1162,7 +1205,7 @@ mod test { #[tokio::test] async fn test_get_bad_key() { - let mut client = client!(); + let (_rx, mut client) = client!(); let res = client.get(&["bad key".repeat(MAX_KEY_LENGTH)]).await; assert!(res.is_err()); @@ -1172,7 +1215,7 @@ mod test { #[tokio::test] async fn test_get_error() { - let mut client = client!("SERVER_ERROR backend failure\r\n"); + let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n"); let keys = vec!["foo".to_owned(), "baz".to_owned()]; let res = client.get(&keys).await; @@ -1183,7 +1226,7 @@ mod test { #[tokio::test] async fn test_get_miss() { - let mut client = client!("END\r\n"); + let (_rx, mut client) = client!("END\r\n"); let keys = vec!["foo".to_owned(), "baz".to_owned()]; let res = client.get(&keys).await.unwrap(); @@ -1192,7 +1235,7 @@ mod test { #[tokio::test] async fn test_get_hit() { - let mut client = client!( + let (_rx, mut client) = client!( "VALUE foo 32 3 1\r\n", "bar\r\n", "VALUE baz 64 3 2\r\n", @@ -1215,9 +1258,79 @@ mod test { assert_eq!(2, val2.cas); } + #[tokio::test] + async fn test_incr_bad_key() { + let (_rx, mut client) = client!(); + let res = client.incr("bad key", 1).await; + + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(ErrorKind::Runtime, err.kind()); + } + + #[tokio::test] + async fn test_incr_bad_val() { + let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n"); + let res = client.incr("test", 2).await; + + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(ErrorKind::Protocol, err.kind()); + + let bytes = rx.recv().await.unwrap(); + let command = String::from_utf8(bytes).unwrap(); + assert_eq!("incr test 2\r\n", command); + } + + #[tokio::test] + async fn test_incr_success() { + let (mut rx, mut client) = client!("3\r\n"); + let res = client.incr("test", 2).await.unwrap(); + + assert_eq!(3, res); + let bytes = rx.recv().await.unwrap(); + let command = String::from_utf8(bytes).unwrap(); + assert_eq!("incr test 2\r\n", command); + } + + #[tokio::test] + async fn test_decr_bad_key() { + let (_rx, mut client) = client!(); + let res = client.decr("bad key", 1).await; + + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(ErrorKind::Runtime, err.kind()); + } + + #[tokio::test] + async fn test_decr_bad_val() { + let (mut rx, mut client) = client!("CLIENT_ERROR cannot increment or decrement non-numeric value\r\n"); + let res = client.decr("test", 1).await; + + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(ErrorKind::Protocol, err.kind()); + + let bytes = rx.recv().await.unwrap(); + let command = String::from_utf8(bytes).unwrap(); + assert_eq!("decr test 1\r\n", command); + } + + #[tokio::test] + async fn test_decr_success() { + let (mut rx, mut client) = client!("3\r\n"); + let res = client.decr("test", 1).await.unwrap(); + + assert_eq!(3, res); + let bytes = rx.recv().await.unwrap(); + let command = String::from_utf8(bytes).unwrap(); + assert_eq!("decr test 1\r\n", command); + } + macro_rules! test_store_command_success { ($method:ident, $verb:expr) => { - let (mut rx, mut client) = client_rw!("STORED\r\n"); + let (mut rx, mut client) = client!("STORED\r\n"); let res = client.$method("test", 0, 300, "val".as_bytes()).await; assert!(res.is_ok()); @@ -1229,7 +1342,7 @@ mod test { macro_rules! test_store_command_bad_key { ($method:ident) => { - let mut client = client!(); + let (_rx, mut client) = client!(); let res = client.$method("bad key", 0, 300, "val".as_bytes()).await; assert!(res.is_err()); @@ -1240,7 +1353,7 @@ mod test { macro_rules! test_store_command_error { ($method:ident, $verb:expr) => { - let (mut rx, mut client) = client_rw!("NOT_STORED\r\n"); + let (mut rx, mut client) = client!("NOT_STORED\r\n"); let res = client.$method("test", 0, 300, "val".as_bytes()).await; assert!(res.is_err()); @@ -1300,7 +1413,7 @@ mod test { #[tokio::test] async fn test_touch_success() { - let (mut rx, mut client) = client_rw!("TOUCHED\r\n"); + let (mut rx, mut client) = client!("TOUCHED\r\n"); let res = client.touch("test", 300).await; assert!(res.is_ok()); @@ -1311,7 +1424,7 @@ mod test { #[tokio::test] async fn test_touch_bad_key() { - let mut client = client!(); + let (_rx, mut client) = client!(); let res = client.touch("bad key", 300).await; assert!(res.is_err()); @@ -1321,7 +1434,7 @@ mod test { #[tokio::test] async fn test_touch_error() { - let (mut rx, mut client) = client_rw!("NOT_FOUND\r\n"); + let (mut rx, mut client) = client!("NOT_FOUND\r\n"); let res = client.touch("test", 300).await; assert!(res.is_err()); @@ -1335,7 +1448,7 @@ mod test { #[tokio::test] async fn test_delete_success() { - let (mut rx, mut client) = client_rw!("DELETED\r\n"); + let (mut rx, mut client) = client!("DELETED\r\n"); let res = client.delete("test").await; assert!(res.is_ok()); @@ -1346,7 +1459,7 @@ mod test { #[tokio::test] async fn test_delete_bad_key() { - let mut client = client!(); + let (_rx, mut client) = client!(); let res = client.delete("bad key").await; assert!(res.is_err()); @@ -1356,7 +1469,7 @@ mod test { #[tokio::test] async fn test_delete_error() { - let (mut rx, mut client) = client_rw!("NOT_FOUND\r\n"); + let (mut rx, mut client) = client!("NOT_FOUND\r\n"); let res = client.delete("test").await; assert!(res.is_err()); @@ -1370,7 +1483,7 @@ mod test { #[tokio::test] async fn test_stats_empty() { - let mut client = client!("END\r\n"); + let (_rx, mut client) = client!("END\r\n"); let res = client.stats().await; assert!(res.is_err()); @@ -1380,7 +1493,7 @@ mod test { #[tokio::test] async fn test_stats_error() { - let mut client = client!("SERVER_ERROR backend failure\r\n"); + let (_rx, mut client) = client!("SERVER_ERROR backend failure\r\n"); let res = client.stats().await; assert!(res.is_err()); @@ -1390,7 +1503,7 @@ mod test { #[tokio::test] async fn test_stats_success() { - let mut client = client!( + let (_rx, mut client) = client!( "STAT pid 1525\r\n", "STAT uptime 271984\r\n", "STAT time 1687212809\r\n", @@ -1494,7 +1607,7 @@ mod test { #[tokio::test] async fn test_slabs_empty() { - let mut client = client!("STAT active_slabs 0\r\n", "STAT total_malloced 0\r\n", "END\r\n"); + let (_rx, mut client) = client!("STAT active_slabs 0\r\n", "STAT total_malloced 0\r\n", "END\r\n"); let res = client.slabs().await.unwrap(); assert!(res.slabs.is_empty()); @@ -1502,7 +1615,7 @@ mod test { #[tokio::test] async fn test_slabs_error() { - let mut client = client!("ERROR Too many open connections\r\n"); + let (_rx, mut client) = client!("ERROR Too many open connections\r\n"); let res = client.slabs().await; assert!(res.is_err()); @@ -1512,7 +1625,7 @@ mod test { #[tokio::test] async fn test_slabs_success() { - let mut client = client!( + let (_rx, mut client) = client!( "STAT 6:chunk_size 304\r\n", "STAT 6:chunks_per_page 3449\r\n", "STAT 6:total_pages 1\r\n", @@ -1590,7 +1703,7 @@ mod test { #[tokio::test] async fn test_items_empty() { - let mut client = client!(); + let (_rx, mut client) = client!(); let res = client.items().await.unwrap(); assert!(res.is_empty()); @@ -1598,7 +1711,7 @@ mod test { #[tokio::test] async fn test_items_error() { - let mut client = client!("ERROR Too many open connections\r\n"); + let (_rx, mut client) = client!("ERROR Too many open connections\r\n"); let res = client.items().await; assert!(res.is_err()); @@ -1608,7 +1721,7 @@ mod test { #[tokio::test] async fn test_items_success() { - let mut client = client!( + let (_rx, mut client) = client!( "STAT items:39:number 3\r\n", "STAT items:39:number_hot 0\r\n", "STAT items:39:number_warm 1\r\n", @@ -1680,7 +1793,7 @@ mod test { #[tokio::test] async fn test_metas_empty() { - let mut client = client!(); + let (_rx, mut client) = client!(); let res = client.metas().await.unwrap(); assert!(res.is_empty()); @@ -1688,7 +1801,7 @@ mod test { #[tokio::test] async fn test_metas_error() { - let mut client = client!("BUSY crawler is busy\r\n",); + let (_rx, mut client) = client!("BUSY crawler is busy\r\n",); let res = client.metas().await; assert!(res.is_err()); @@ -1698,7 +1811,7 @@ mod test { #[tokio::test] async fn test_metas_success() { - let mut client = client!( + let (_rx, mut client) = client!( "key=memcached%2Fmurmur3_hash.c exp=1687216956 la=1687216656 cas=259502 fetch=yes cls=17 size=2912\r\n", "key=memcached%2Fmd5.h exp=1687216956 la=1687216656 cas=259731 fetch=yes cls=17 size=3593\r\n", "END\r\n", diff --git a/mtop/src/bin/mc.rs b/mtop/src/bin/mc.rs index e4bcc39..f1a42f5 100644 --- a/mtop/src/bin/mc.rs +++ b/mtop/src/bin/mc.rs @@ -55,13 +55,15 @@ struct McConfig { #[derive(Debug, Subcommand)] enum Action { Add(AddCommand), + Check(CheckCommand), + Decr(DecrCommand), Delete(DeleteCommand), Get(GetCommand), + Incr(IncrCommand), Keys(KeysCommand), Replace(ReplaceCommand), Set(SetCommand), Touch(TouchCommand), - Check(CheckCommand), } /// Add a value to the cache only if it does not already exist. @@ -97,10 +99,23 @@ struct CheckCommand { delay_millis: u64, } +/// Decrement the value of an item in the cache. +#[derive(Debug, Args)] +struct DecrCommand { + /// Key of the value to decrement. If the value does not exist the command will exit with + /// and error status. + #[arg(required = true)] + key: String, + + /// Amount to decrement the value by. + #[arg(required = true)] + delta: u64, +} + /// Delete an item in the cache. #[derive(Debug, Args)] struct DeleteCommand { - /// Key of the item to delete. If the item does not exist the command will exit with an + /// Key of the item to delete. If the item does not exist the command will exit with an /// error status. #[arg(required = true)] key: String, @@ -116,6 +131,19 @@ struct GetCommand { key: String, } +/// Increment the value of an item in the cach +#[derive(Debug, Args)] +struct IncrCommand { + /// Key of the value to increment. If the value does not exist the command will exit with + /// and error status. + #[arg(required = true)] + key: String, + + /// Amount to increment the value by. + #[arg(required = true)] + delta: u64, +} + /// Show keys for all items in the cache. #[derive(Debug, Args)] struct KeysCommand { @@ -228,6 +256,12 @@ async fn main() -> Result<(), Box> { tracing::warn!(message = "error writing output", error = %e); } } + Action::Decr(c) => { + if let Err(e) = client.decr(&c.key, c.delta).await { + tracing::error!(message = "unable to decrement value", key = c.key, host = opts.host, error = %e); + process::exit(1); + } + } Action::Delete(c) => { if let Err(e) = client.delete(&c.key).await { tracing::error!(message = "unable to delete item", key = c.key, host = opts.host, error = %e); @@ -246,6 +280,12 @@ async fn main() -> Result<(), Box> { } } } + Action::Incr(c) => { + if let Err(e) = client.incr(&c.key, c.delta).await { + tracing::error!(message = "unable to increment value", key = c.key, host = opts.host, error = %e); + process::exit(1); + } + } Action::Keys(c) => { let mut metas = client.metas().await.unwrap_or_else(|e| { tracing::error!(message = "unable to list keys", host = opts.host, error = %e);