Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: handle expire with literally expire directly #9

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion example/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
stdout = true
level = "debug"
[server]
bind = "127.0.0.1:6666"
bind = "127.0.0.1:6667"
[storage]
path = "/tmp/roxy"
secondary_path = "/tmp/roxy2"
Expand Down
28 changes: 20 additions & 8 deletions src/commands/src/commands/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ use std::sync::Once;

use bytes::Bytes;
use common_base::bytes::StringBytes;
use common_base::resp3;
use common_base::timestamp::timestamp_ms;
use once_cell::sync::Lazy;
use redis_protocol::resp3::types::{BytesFrame, FrameKind};
use roxy::database::Database;
use roxy::datatypes::string::{RedisString, StringSetArgs, StringSetArgsBuilder, StringSetType};
use roxy::storage::Storage;
use snafu::ResultExt;

use super::{Command, CommandInstance, CommandTypeInfo, GlobalCommandTable};
use crate::error::{FailInStorageSnafu, InvalidCmdSyntaxSnafu, Result};
use crate::parser::{
chain, key, keyword, optional, string, ttl, value_of_type, Parser, TTLOption, Tokens,
chain, expire, key, keyword, optional, string, value_of_type, ExpireOption, Parser, Tokens,
};
use crate::{command_type_stub, register};

Expand Down Expand Up @@ -61,11 +64,11 @@ impl CommandInstance for Set {
.is_some();
set_args_builder.get(get);

let ttl = optional(ttl()).parse(&mut tokens).unwrap();
if let Some(ttl) = ttl {
match ttl {
TTLOption::TTL(ttl) => set_args_builder.ttl(ttl),
TTLOption::KeepTTL => set_args_builder.keep_ttl(true),
let expire = optional(expire()).parse(&mut tokens).unwrap();
if let Some(expire) = expire {
match expire {
ExpireOption::Expire(expire) => set_args_builder.expire(Some(expire)),
ExpireOption::KeepTTL => set_args_builder.keep_ttl(true),
};
}

Expand All @@ -84,12 +87,21 @@ impl CommandInstance for Set {
fn execute(&mut self, storage: &Storage, namespace: Bytes) -> Result<BytesFrame> {
let db = RedisString::new(storage, namespace.into());
let args = self.args.take().unwrap();
// TODO: handle ttl
if args
.set_args
.expire
.is_some_and(|expire| expire < timestamp_ms())
{
db.db()
.delete(args.key.clone().into())
.context(FailInStorageSnafu { cmd_id: Self::id() })?;
return Ok(resp3::ok());
}

db.set(args.key, args.value, &args.set_args)
.map(|opt_old| match opt_old {
Some(old) => (FrameKind::BlobString, old).try_into().unwrap(),
None if args.set_args.get => (FrameKind::SimpleString, "OK").try_into().unwrap(),
None if args.set_args.get => resp3::ok(),
None => BytesFrame::Null,
})
.context(FailInStorageSnafu { cmd_id: Self::id() })
Expand Down
41 changes: 29 additions & 12 deletions src/commands/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub enum ParseError {
source: Box<ParseError>,
ttl_type: String,
},
#[snafu(display("out of numeric range"))]
OutOfNumericRange,
}
pub struct Tokens<'a> {
tokens: &'a [Token],
Expand Down Expand Up @@ -118,6 +120,7 @@ where
}

mod operand {
use std::ops::Range;
use std::rc::Rc;
use std::str::FromStr;

Expand All @@ -126,8 +129,8 @@ mod operand {
use snafu::{OptionExt as _, ResultExt};

use super::{
alt, optional, InvalidValueOfTypeSnafu, MismatchedKeywordSnafu, MissKeySnafu, Parser,
TTLMissTimeSnafu, Tokens,
alt, optional, InvalidValueOfTypeSnafu, MismatchedKeywordSnafu, MissKeySnafu,
OutOfNumericRangeSnafu, Parser, TTLMissTimeSnafu, Tokens,
};

/// expect a keyword(ignore case)
Expand Down Expand Up @@ -175,6 +178,18 @@ mod operand {
}
}

pub fn number<'a, N: FromStr + PartialOrd + Copy>(
rng: Range<N>,
) -> impl Parser<'a, Output = N> {
move |s: &mut Tokens<'a>| {
let value = value_of_type::<N>().parse(s)?;
if !rng.contains(&value) {
OutOfNumericRangeSnafu.fail()?;
}
Ok(value)
}
}

pub fn key<'a>() -> impl Parser<'a, Output = Bytes> {
move |s: &mut Tokens<'a>| match s.peek() {
Some(first) => s.yield_value(first.clone().into()),
Expand All @@ -198,11 +213,11 @@ mod operand {
}
}

pub enum TTLOption {
TTL(u64),
pub enum ExpireOption {
Expire(u64),
KeepTTL,
}
pub fn ttl<'a>() -> impl Parser<'a, Output = TTLOption> {
pub fn expire<'a>() -> impl Parser<'a, Output = ExpireOption> {
move |s: &mut Tokens<'a>| {
let ttl_type = optional(alt((
keyword("EX"),
Expand All @@ -214,14 +229,16 @@ mod operand {
.parse(s)
.unwrap();
if let Some(ttl_type) = ttl_type {
let ttl = value_of_type::<u64>().parse(s).context(TTLMissTimeSnafu {
ttl_type: ttl_type.clone(),
})?;
let ttl = number::<u64>(1u64..u64::MAX)
.parse(s)
.context(TTLMissTimeSnafu {
ttl_type: ttl_type.clone(),
})?;
Ok(match ttl_type.as_utf8() {
"EX" => TTLOption::TTL(ttl * 1000),
"PX" => TTLOption::TTL(ttl),
"PXAT" => TTLOption::TTL(ttl - timestamp_ms()),
"EXAT" => TTLOption::TTL(ttl * 1000 - timestamp_ms()),
"EX" => ExpireOption::Expire(ttl * 1000 + timestamp_ms()),
"EXAT" => ExpireOption::Expire(ttl * 1000),
"PX" => ExpireOption::Expire(ttl + timestamp_ms()),
"PXAT" => ExpireOption::Expire(ttl),
_ => unreachable!(),
})
} else {
Expand Down
9 changes: 5 additions & 4 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ edition.workspace = true
license.workspace = true

[dependencies]
binrw.workspace = true
bytes.workspace = true
parking_lot.workspace = true
serde.workspace = true
binrw.workspace = true
bytes.workspace = true
parking_lot.workspace = true
redis-protocol.workspace = true
serde.workspace = true
6 changes: 6 additions & 0 deletions src/common/base/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ impl PartialEq<Bytes> for [u8] {
}
}

impl AsRef<[u8]> for Bytes {
fn as_ref(&self) -> &[u8] {
&self.0
}
}

pub struct BytesReader(NoSeek<Reader<bytes::Bytes>>);

impl std::io::Read for BytesReader {
Expand Down
1 change: 1 addition & 0 deletions src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@

pub mod bytes;
pub mod lock_pool;
pub mod resp3;
pub mod timestamp;
25 changes: 25 additions & 0 deletions src/common/base/src/resp3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 Rudeus Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes::Bytes;
use redis_protocol::resp3::types::BytesFrame;

/// Create a simple string response with "OK".
pub fn ok() -> BytesFrame {
static OK: bytes::Bytes = Bytes::from_static(b"OK");
BytesFrame::SimpleString {
data: OK.clone(),
attributes: None,
}
}
2 changes: 1 addition & 1 deletion src/common/base/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ fn timestamp() -> Duration {
}

pub fn timestamp_ms() -> u64 {
timestamp().as_micros() as u64
timestamp().as_millis() as u64
}
43 changes: 34 additions & 9 deletions src/roxy/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
use common_base::bytes::Bytes;
use common_base::lock_pool;
use rocksdb::{AsColumnFamilyRef, WriteBatch, WriteOptions};
use snafu::OptionExt;
use strum::VariantArray;

use crate::error::{DatatypeMismatchedSnafu, KeyExpiredSnafu, Result};
use crate::error::{DatatypeMismatchedSnafu, KeyExpiredSnafu, KeyNotFoundSnafu, Result};
use crate::metadata::{self, Metadata, RedisType};
use crate::storage::{ColumnFamilyId, Storage};

Expand Down Expand Up @@ -53,9 +55,9 @@ pub trait Database {
types: &[RedisType],
ns_key: Bytes,
) -> Result<Option<(metadata::Metadata, Bytes)>> {
let raw_metadata = self.get_raw_metadata(options, ns_key)?;
if let Some(raw_metadata) = raw_metadata {
Ok(Some(self.parse_metadata(types, raw_metadata)?))
let raw_data = self.get_raw_data(options, ns_key)?;
if let Some(raw_data) = raw_data {
Ok(self.validate_metadata(types, raw_data)?)
} else {
Ok(None)
}
Expand All @@ -65,16 +67,23 @@ pub trait Database {
/// "raw metadata" from the storage engine without parsing
/// it to [`Metadata`] type.
///
fn get_raw_metadata(&self, options: GetOptions, ns_key: Bytes) -> Result<Option<Bytes>>;
fn get_raw_data(&self, options: GetOptions, ns_key: Bytes) -> Result<Option<Bytes>>;

/// [`parse_metadata`] parse the [`Metadata`] from input bytes and return
/// the rest part of the input bytes.
fn parse_metadata(&self, types: &[RedisType], input: Bytes) -> Result<(Metadata, Bytes)> {
/// if the input bytes is not a valid metadata, it will return an error.
/// if the key is expired, it will return None.
fn validate_metadata(
&self,
types: &[RedisType],
input: Bytes,
) -> Result<Option<(Metadata, Bytes)>> {
let mut reader = input.reader();
let metadata = Metadata::decode_from(&mut reader)?;
let rest = reader.into_inner();

if metadata.expired() {
return KeyExpiredSnafu.fail();
return Ok(None);
}

if !types.contains(&metadata.datatype()) {
Expand All @@ -84,7 +93,7 @@ pub trait Database {
if metadata.size() == 0 && !metadata.datatype().is_emptyable() {
return DatatypeMismatchedSnafu.fail();
}
Ok((metadata, rest))
Ok(Some((metadata, rest)))
}

fn lock_key(&self, key: Bytes) -> Self::KeyLockGuard<'_>;
Expand All @@ -94,6 +103,8 @@ pub trait Database {
fn get_cf_ref(&self) -> impl AsColumnFamilyRef;

fn write(&self, opts: &WriteOptions, batch: WriteBatch) -> Result<()>;

fn delete(&self, key: Bytes) -> Result<()>;
}

/// [`Roxy`] is a wrapper of storage engine, it provides
Expand Down Expand Up @@ -123,7 +134,7 @@ impl<'s> Database for Roxy<'s> {
type KeyLockGuard<'a> = lock_pool::MutexGuard<'a>
where Self: 'a;

fn get_raw_metadata(&self, options: GetOptions, ns_key: Bytes) -> Result<Option<Bytes>> {
fn get_raw_data(&self, options: GetOptions, ns_key: Bytes) -> Result<Option<Bytes>> {
let mut opts = rocksdb::ReadOptions::default();
if options.with_snapshot {
opts.set_snapshot(&self.storage.db().snapshot());
Expand All @@ -150,6 +161,20 @@ impl<'s> Database for Roxy<'s> {
fn write(&self, opts: &WriteOptions, updates: WriteBatch) -> Result<()> {
self.storage.write(opts, updates)
}

fn delete(&self, key: Bytes) -> Result<()> {
let ns_key = self.encode_namespace_prefix(key);
let _guard = self.lock_key(ns_key.clone());
let metadata = self
.get_metadata(GetOptions::default(), RedisType::VARIANTS, ns_key.clone())?
.context(KeyNotFoundSnafu)?;
if metadata.expired() {
return KeyExpiredSnafu.fail();
}

self.storage
.delete(&WriteOptions::default(), self.get_cf_ref(), ns_key)
}
}

#[derive(Default)]
Expand Down
Loading