Skip to content

Commit

Permalink
fix: parse large timestamp (#2185)
Browse files Browse the repository at this point in the history
* feat: support parsing large timestamp values

* chore: update sqlness tests

* fix: tests

* fix: allow larger window
  • Loading branch information
v0y4g3r authored Aug 24, 2023
1 parent 4ee1034 commit 19d2d77
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 48 deletions.
121 changes: 112 additions & 9 deletions src/common/time/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ use crate::error::{ArithmeticOverflowSnafu, Error, ParseTimestampSnafu, Timestam
use crate::timezone::TimeZone;
use crate::util::{div_ceil, format_utc_datetime, local_datetime_to_utc};

/// Timestamp represents the value of units(seconds/milliseconds/microseconds/nanoseconds) elapsed
/// since UNIX epoch. The valid value range of [Timestamp] depends on it's unit (all in UTC time zone):
/// - for [TimeUnit::Second]: [-262144-01-01 00:00:00, +262143-12-31 23:59:59]
/// - for [TimeUnit::Millisecond]: [-262144-01-01 00:00:00.000, +262143-12-31 23:59:59.999]
/// - for [TimeUnit::Microsecond]: [-262144-01-01 00:00:00.000000, +262143-12-31 23:59:59.999999]
/// - for [TimeUnit::Nanosecond]: [1677-09-21 00:12:43.145225, 2262-04-11 23:47:16.854775807]
///
/// # Note:
/// For values out of range, you can still store these timestamps, but while performing arithmetic
/// or formatting operations, it will return an error or just overflow.
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)]
pub struct Timestamp {
value: i64,
Expand Down Expand Up @@ -169,6 +179,28 @@ impl Timestamp {
(sec_div, nsec)
}

/// Creates a new Timestamp instance from seconds and nanoseconds parts.
/// Returns None if overflow.
fn from_splits(sec: i64, nsec: u32) -> Option<Self> {
if nsec == 0 {
Some(Timestamp::new_second(sec))
} else if nsec % 1_000_000 == 0 {
let millis = nsec / 1_000_000;
sec.checked_mul(1000)
.and_then(|v| v.checked_add(millis as i64))
.map(Timestamp::new_millisecond)
} else if nsec % 1000 == 0 {
let micros = nsec / 1000;
sec.checked_mul(1_000_000)
.and_then(|v| v.checked_add(micros as i64))
.map(Timestamp::new_microsecond)
} else {
sec.checked_mul(1_000_000_000)
.and_then(|v| v.checked_add(nsec as i64))
.map(Timestamp::new_nanosecond)
}
}

/// Format timestamp to ISO8601 string. If the timestamp exceeds what chrono timestamp can
/// represent, this function simply print the timestamp unit and value in plain string.
pub fn to_iso8601_string(&self) -> String {
Expand Down Expand Up @@ -205,6 +237,12 @@ impl Timestamp {
let (sec, nsec) = self.split();
NaiveDateTime::from_timestamp_opt(sec, nsec)
}

pub fn from_chrono_datetime(ndt: NaiveDateTime) -> Option<Self> {
let sec = ndt.timestamp();
let nsec = ndt.timestamp_subsec_nanos();
Timestamp::from_splits(sec, nsec)
}
}

impl FromStr for Timestamp {
Expand All @@ -225,13 +263,16 @@ impl FromStr for Timestamp {
// RFC3339 timestamp (with a T)
let s = s.trim();
if let Ok(ts) = DateTime::parse_from_rfc3339(s) {
return Ok(Timestamp::new(ts.timestamp_nanos(), TimeUnit::Nanosecond));
return Timestamp::from_chrono_datetime(ts.naive_utc())
.context(ParseTimestampSnafu { raw: s });
}
if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") {
return Ok(Timestamp::new(ts.timestamp_nanos(), TimeUnit::Nanosecond));
return Timestamp::from_chrono_datetime(ts.naive_utc())
.context(ParseTimestampSnafu { raw: s });
}
if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") {
return Ok(Timestamp::new(ts.timestamp_nanos(), TimeUnit::Nanosecond));
return Timestamp::from_chrono_datetime(ts.naive_utc())
.context(ParseTimestampSnafu { raw: s });
}

if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
Expand Down Expand Up @@ -264,7 +305,7 @@ fn naive_datetime_to_timestamp(
match local_datetime_to_utc(&datetime) {
LocalResult::None => ParseTimestampSnafu { raw: s }.fail(),
LocalResult::Single(utc) | LocalResult::Ambiguous(utc, _) => {
Ok(Timestamp::new(utc.timestamp_nanos(), TimeUnit::Nanosecond))
Timestamp::from_chrono_datetime(utc).context(ParseTimestampSnafu { raw: s })
}
}
}
Expand Down Expand Up @@ -608,11 +649,7 @@ mod tests {
// but expected timestamp is in UTC timezone
fn check_from_str(s: &str, expect: &str) {
let ts = Timestamp::from_str(s).unwrap();
let time = NaiveDateTime::from_timestamp_opt(
ts.value / 1_000_000_000,
(ts.value % 1_000_000_000) as u32,
)
.unwrap();
let time = ts.to_chrono_datetime().unwrap();
assert_eq!(expect, time.to_string());
}

Expand Down Expand Up @@ -1049,4 +1086,70 @@ mod tests {
TimeUnit::from(ArrowTimeUnit::Nanosecond)
);
}

fn check_conversion(ts: Timestamp, valid: bool) {
let Some(t2) = ts.to_chrono_datetime() else {
if valid {
panic!("Cannot convert {:?} to Chrono NaiveDateTime", ts);
}
return;
};
let Some(t3) = Timestamp::from_chrono_datetime(t2) else {
if valid {
panic!("Cannot convert Chrono NaiveDateTime {:?} to Timestamp", t2);
}
return;
};

assert_eq!(t3, ts);
}

#[test]
fn test_from_naive_date_time() {
let min_sec = Timestamp::new_second(-8334632851200);
let max_sec = Timestamp::new_second(8210298412799);
check_conversion(min_sec, true);
check_conversion(Timestamp::new_second(min_sec.value - 1), false);
check_conversion(max_sec, true);
check_conversion(Timestamp::new_second(max_sec.value + 1), false);

let min_millis = Timestamp::new_millisecond(-8334632851200000);
let max_millis = Timestamp::new_millisecond(8210298412799999);
check_conversion(min_millis, true);
check_conversion(Timestamp::new_millisecond(min_millis.value - 1), false);
check_conversion(max_millis, true);
check_conversion(Timestamp::new_millisecond(max_millis.value + 1), false);

let min_micros = Timestamp::new_microsecond(-8334632851200000000);
let max_micros = Timestamp::new_microsecond(8210298412799999999);
check_conversion(min_micros, true);
check_conversion(Timestamp::new_microsecond(min_micros.value - 1), false);
check_conversion(max_micros, true);
check_conversion(Timestamp::new_microsecond(max_micros.value + 1), false);

let min_nanos = Timestamp::new_nanosecond(-9223372036854775000);
let max_nanos = Timestamp::new_nanosecond(i64::MAX);
check_conversion(min_nanos, true);
check_conversion(Timestamp::new_nanosecond(min_nanos.value - 1), false);
check_conversion(max_nanos, true);
}

#[test]
fn test_parse_timestamp_range() {
let valid_strings = vec![
"-262144-01-01 00:00:00Z",
"+262143-12-31 23:59:59Z",
"-262144-01-01 00:00:00Z",
"+262143-12-31 23:59:59.999Z",
"-262144-01-01 00:00:00Z",
"+262143-12-31 23:59:59.999999Z",
"1677-09-21 00:12:43.145225Z",
"2262-04-11 23:47:16.854775807Z",
"+100000-01-01 00:00:01.5Z",
];

for s in valid_strings {
Timestamp::from_str(s).unwrap();
}
}
}
31 changes: 17 additions & 14 deletions src/query/src/optimizer/type_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,16 @@ fn timestamp_to_timestamp_ms_expr(val: i64, unit: TimeUnit) -> Expr {
}

fn string_to_timestamp_ms(string: &str) -> Result<ScalarValue> {
Ok(ScalarValue::TimestampMillisecond(
Some(
Timestamp::from_str(string)
.map(|t| t.value() / 1_000_000)
.map_err(|e| DataFusionError::External(Box::new(e)))?,
),
None,
))
let ts = Timestamp::from_str(string).map_err(|e| DataFusionError::External(Box::new(e)))?;

let value = Some(ts.value());
let scalar = match ts.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(value, None),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(value, None),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(value, None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(value, None),
};
Ok(scalar)
}

#[cfg(test)]
Expand All @@ -302,11 +304,11 @@ mod tests {
fn test_string_to_timestamp_ms() {
assert_eq!(
string_to_timestamp_ms("2022-02-02 19:00:00+08:00").unwrap(),
ScalarValue::TimestampMillisecond(Some(1643799600000), None)
ScalarValue::TimestampSecond(Some(1643799600), None)
);
assert_eq!(
string_to_timestamp_ms("2009-02-13 23:31:30Z").unwrap(),
ScalarValue::TimestampMillisecond(Some(1234567890000), None)
ScalarValue::TimestampSecond(Some(1234567890), None)
);
}

Expand Down Expand Up @@ -366,9 +368,10 @@ mod tests {
let mut converter = TypeConverter { schema };

assert_eq!(
Expr::Column(Column::from_name("ts")).gt(Expr::Literal(
ScalarValue::TimestampMillisecond(Some(1599514949000), None)
)),
Expr::Column(Column::from_name("ts")).gt(Expr::Literal(ScalarValue::TimestampSecond(
Some(1599514949),
None
))),
converter
.mutate(
Expr::Column(Column::from_name("ts")).gt(Expr::Literal(ScalarValue::Utf8(
Expand Down Expand Up @@ -440,7 +443,7 @@ mod tests {
.unwrap();
let expected = String::from(
"Aggregate: groupBy=[[]], aggr=[[COUNT(column1)]]\
\n Filter: column3 > TimestampMillisecond(-28800000, None)\
\n Filter: column3 > TimestampSecond(-28800, None)\
\n Values: (Int64(1), Float64(1), TimestampMillisecond(1, None))",
);
assert_eq!(format!("{}", transformed_plan.display_indent()), expected);
Expand Down
36 changes: 17 additions & 19 deletions src/storage/src/window_infer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@ use crate::memtable::MemtableStats;
use crate::sst::FileMeta;

/// A set of predefined time windows.
const TIME_WINDOW_SIZE: [i64; 10] = [
1, // 1 second
60, // 1 minute
60 * 10, // 10 minutes
60 * 30, // 30 minutes
60 * 60, // 1 hour
2 * 60 * 60, // 2 hours
6 * 60 * 60, // 6 hours
12 * 60 * 60, // 12 hours
24 * 60 * 60, // 1 day
7 * 24 * 60 * 60, // 1 week
const TIME_WINDOW_SIZE: [i64; 14] = [
1, // 1 second
60, // 1 minute
60 * 10, // 10 minutes
60 * 30, // 30 minutes
60 * 60, // 1 hour
2 * 60 * 60, // 2 hours
6 * 60 * 60, // 6 hours
12 * 60 * 60, // 12 hours
24 * 60 * 60, // 1 day
7 * 24 * 60 * 60, // 1 week
30 * 24 * 60 * 60, // 1 month
12 * 30 * 24 * 60 * 60, // 1 year
10 * 12 * 30 * 24 * 60 * 60, // 10 years
100 * 12 * 30 * 24 * 60 * 60, // 100 years
];

/// [WindowInfer] infers the time windows that can be used to optimize table scans ordered by
Expand Down Expand Up @@ -180,14 +184,8 @@ mod tests {
assert_eq!(12 * 60 * 60, duration_to_window_size(21601, 21601));
assert_eq!(24 * 60 * 60, duration_to_window_size(43201, 43201));
assert_eq!(7 * 24 * 60 * 60, duration_to_window_size(604799, 604799));
assert_eq!(
7 * 24 * 60 * 60,
duration_to_window_size(31535999, 31535999)
);
assert_eq!(
7 * 24 * 60 * 60,
duration_to_window_size(i64::MAX, i64::MAX)
);
assert_eq!(311040000, duration_to_window_size(31535999, 31535999));
assert_eq!(3110400000, duration_to_window_size(i64::MAX, i64::MAX));
}

#[test]
Expand Down
55 changes: 49 additions & 6 deletions tests/cases/standalone/common/timestamp/timestamp.result
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,57 @@ INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('2023-04-04 08:00:00.0052+0

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('+100000-01-01 00:00:01.5Z', 3);

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('-262144-01-01 00:00:00Z', 4);

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('+262143-12-31 23:59:59Z', 5);

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('-262144-01-01 00:00:00Z', 6);

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('+262143-12-31 23:59:59.999Z', 7);

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('-262144-01-01 00:00:00Z', 8);

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('+262143-12-31 23:59:59.999999Z', 9);

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('1677-09-21 00:12:43.145225Z', 10);

Affected Rows: 1

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('2262-04-11 23:47:16.854775807Z', 11);

Affected Rows: 1

SELECT * FROM timestamp_with_precision ORDER BY ts ASC;

+----------------------------+-----+
| ts | cnt |
+----------------------------+-----+
| 2023-04-04T00:00:00.005200 | 2 |
| 2023-04-04T08:00:00.005200 | 1 |
+----------------------------+-----+
+-------------------------------+-----+
| ts | cnt |
+-------------------------------+-----+
| -262144-01-01T00:00:00 | 8 |
| 1677-09-21T00:12:43.145225 | 10 |
| 2023-04-04T00:00:00.005200 | 2 |
| 2023-04-04T08:00:00.005200 | 1 |
| 2262-04-11T23:47:16.854775 | 11 |
| +100000-01-01T00:00:01.500 | 3 |
| +262143-12-31T23:59:59 | 5 |
| +262143-12-31T23:59:59.999 | 7 |
| +262143-12-31T23:59:59.999999 | 9 |
+-------------------------------+-----+

DROP TABLE timestamp_with_precision;

Expand Down
18 changes: 18 additions & 0 deletions tests/cases/standalone/common/timestamp/timestamp.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('2023-04-04 08:00:00.0052+0

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('2023-04-04 08:00:00.0052+0800', 2);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('+100000-01-01 00:00:01.5Z', 3);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('-262144-01-01 00:00:00Z', 4);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('+262143-12-31 23:59:59Z', 5);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('-262144-01-01 00:00:00Z', 6);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('+262143-12-31 23:59:59.999Z', 7);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('-262144-01-01 00:00:00Z', 8);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('+262143-12-31 23:59:59.999999Z', 9);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('1677-09-21 00:12:43.145225Z', 10);

INSERT INTO timestamp_with_precision(ts,cnt) VALUES ('2262-04-11 23:47:16.854775807Z', 11);

SELECT * FROM timestamp_with_precision ORDER BY ts ASC;

DROP TABLE timestamp_with_precision;

0 comments on commit 19d2d77

Please sign in to comment.