Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions nodedb-query/src/functions/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,68 @@ pub(super) fn try_eval(name: &str, args: &[Value]) -> Option<Value> {
Err(_) => Value::Null,
}
}),
"time_bucket" => eval_time_bucket(args),
_ => return None,
};
Some(v)
}

/// `time_bucket(interval, timestamp)` — truncate a millisecond timestamp
/// to the start of the given interval bucket.
///
/// Accepts two argument orders (both common in SQL):
/// - `time_bucket('1 hour', timestamp_col)` — interval first
/// - `time_bucket(timestamp_col, '1 hour')` — timestamp first
///
/// The interval is a string like `'1h'`, `'5m'`, `'1 hour'`, `'30 seconds'`,
/// or a bare integer interpreted as whole seconds.
/// The timestamp is an integer (epoch milliseconds).
///
/// Returns `Value::Null` for missing/non-numeric arguments or a
/// non-positive interval.
fn eval_time_bucket(args: &[Value]) -> Value {
    if args.len() < 2 {
        return Value::Null;
    }

    // Detect which arg is the interval and which is the timestamp.
    let (interval_ms, timestamp_ms) = match (&args[0], &args[1]) {
        // time_bucket('1 hour', timestamp)
        (Value::String(s), ts_val) => (parse_interval_to_ms(s), value_to_timestamp_ms(ts_val)),
        // time_bucket(timestamp, '1 hour')
        (ts_val, Value::String(s)) => (parse_interval_to_ms(s), value_to_timestamp_ms(ts_val)),
        // time_bucket(3600, timestamp) — interval as integer seconds.
        // checked_mul guards against overflow on absurd interval values
        // instead of wrapping/panicking.
        (Value::Integer(interval_secs), ts_val) => {
            (interval_secs.checked_mul(1000), value_to_timestamp_ms(ts_val))
        }
        _ => return Value::Null,
    };

    match (interval_ms, timestamp_ms) {
        // div_euclid floors toward negative infinity, so pre-epoch
        // (negative) timestamps land in the correct bucket; plain `/`
        // truncates toward zero and would round them up instead.
        (Some(i), Some(ts)) if i > 0 => Value::Integer(ts.div_euclid(i) * i),
        _ => Value::Null,
    }
}

/// Coerce a `Value` to an epoch-milliseconds timestamp, if it is numeric.
///
/// Integers pass through unchanged; floats are converted with `as i64`
/// (truncating, saturating at the i64 bounds). Any other variant yields
/// `None`.
fn value_to_timestamp_ms(v: &Value) -> Option<i64> {
    if let Value::Integer(n) = v {
        Some(*n)
    } else if let Value::Float(f) = v {
        Some(*f as i64)
    } else {
        None
    }
}

/// Parse an interval string like "1h", "5m", "1 hour", "30 seconds" to ms.
///
/// Delegates to the canonical `nodedb_types::kv_parsing::parse_interval_to_ms`.
/// Returns `None` on parse failure or a non-positive result.
fn parse_interval_to_ms(s: &str) -> Option<i64> {
    match nodedb_types::kv_parsing::parse_interval_to_ms(s) {
        Ok(ms) if (ms as i64) > 0 => Some(ms as i64),
        _ => None,
    }
}
1 change: 1 addition & 0 deletions nodedb-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ license.workspace = true
description = "SQL parser, planner, and optimizer for NodeDB"

[dependencies]
nodedb-types = { path = "../nodedb-types" }
sqlparser = "0.61"
thiserror = { workspace = true }
212 changes: 162 additions & 50 deletions nodedb-sql/src/planner/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub fn plan_aggregate(
&aggregates,
filters,
&select.group_by,
&select.projection,
functions,
);
}
Expand Down Expand Up @@ -65,6 +66,7 @@ fn plan_timeseries_aggregate(
aggregates: &[AggregateExpr],
filters: &[Filter],
raw_group_by: &GroupByExpr,
select_items: &[ast::SelectItem],
functions: &FunctionRegistry,
) -> Result<SqlPlan> {
let mut bucket_interval_ms: i64 = 0;
Expand All @@ -73,22 +75,16 @@ fn plan_timeseries_aggregate(
// Check for time_bucket in GROUP BY.
if let GroupByExpr::Expressions(exprs, _) = raw_group_by {
for expr in exprs {
if let ast::Expr::Function(func) = expr {
let name = func
.name
.0
.iter()
.map(|p| match p {
ast::ObjectNamePart::Identifier(ident) => normalize_ident(ident),
_ => String::new(),
})
.collect::<Vec<_>>()
.join(".");
if functions.search_trigger(&name) == SearchTrigger::TimeBucket {
bucket_interval_ms = extract_bucket_interval(func)?;
continue;
}
// Resolve the expression: GROUP BY alias or GROUP BY ordinal (1-based)
// should resolve to the corresponding SELECT item expression.
let resolved = resolve_group_by_expr(expr, select_items);
let check_expr = resolved.unwrap_or(expr);

if let Some(interval) = try_extract_time_bucket(check_expr, functions)? {
bucket_interval_ms = interval;
continue;
}

// Non-time_bucket GROUP BY columns.
if let ast::Expr::Identifier(ident) = expr {
group_columns.push(normalize_ident(ident));
Expand All @@ -113,55 +109,108 @@ fn plan_timeseries_aggregate(
})
}

/// Check if an expression is a time_bucket() call and extract the interval.
fn try_extract_time_bucket(expr: &ast::Expr, functions: &FunctionRegistry) -> Result<Option<i64>> {
    let ast::Expr::Function(func) = expr else {
        return Ok(None);
    };

    // Reassemble the (possibly qualified) function name from its parts.
    let mut parts: Vec<String> = Vec::with_capacity(func.name.0.len());
    for part in &func.name.0 {
        parts.push(match part {
            ast::ObjectNamePart::Identifier(ident) => normalize_ident(ident),
            _ => String::new(),
        });
    }
    let name = parts.join(".");

    if functions.search_trigger(&name) == SearchTrigger::TimeBucket {
        extract_bucket_interval(func).map(Some)
    } else {
        Ok(None)
    }
}

/// Resolve a GROUP BY expression that references a SELECT alias or ordinal.
///
/// `GROUP BY b` where `b` is a SELECT alias → returns the aliased expression.
/// `GROUP BY 1` → returns the first SELECT item's expression (ordinals are
/// 1-based, per SQL convention).
fn resolve_group_by_expr<'a>(
    expr: &ast::Expr,
    select_items: &'a [ast::SelectItem],
) -> Option<&'a ast::Expr> {
    match expr {
        // GROUP BY <alias> — scan the SELECT list for a matching alias.
        ast::Expr::Identifier(ident) => {
            let wanted = normalize_ident(ident);
            for item in select_items {
                if let ast::SelectItem::ExprWithAlias { expr, alias } = item {
                    if normalize_ident(alias) == wanted {
                        return Some(expr);
                    }
                }
            }
            None
        }
        // GROUP BY <ordinal> — 1-based index into the SELECT list.
        ast::Expr::Value(v) => {
            let ast::Value::Number(n, _) = &v.value else {
                return None;
            };
            let idx = n.parse::<usize>().ok()?;
            if idx < 1 || idx > select_items.len() {
                return None;
            }
            match &select_items[idx - 1] {
                ast::SelectItem::UnnamedExpr(e) => Some(e),
                ast::SelectItem::ExprWithAlias { expr, .. } => Some(expr),
                _ => None,
            }
        }
        _ => None,
    }
}

/// Extract the bucket interval from a time_bucket() call.
///
/// Handles both argument orders:
/// - `time_bucket('1 hour', timestamp)` — interval first
/// - `time_bucket(timestamp, '1 hour')` — timestamp first
/// - `time_bucket(3600, timestamp)` — integer seconds
fn extract_bucket_interval(func: &ast::Function) -> Result<i64> {
let args = match &func.args {
ast::FunctionArguments::List(args) => &args.args,
_ => return Ok(0),
};
if args.is_empty() {
return Ok(0);
}
let interval_str = match &args[0] {
ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(ast::Expr::Value(v))) => {
// Try each argument position for the interval literal.
for arg in args {
if let ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(ast::Expr::Value(v))) = arg {
match &v.value {
ast::Value::SingleQuotedString(s) => s.clone(),
_ => return Ok(0),
ast::Value::SingleQuotedString(s) => {
let ms = parse_interval_to_ms(s);
if ms > 0 {
return Ok(ms);
}
}
ast::Value::Number(n, _) => {
if let Ok(secs) = n.parse::<i64>()
&& secs > 0
{
return Ok(secs * 1000);
}
}
_ => {}
}
}
_ => return Ok(0),
};
Ok(parse_interval_to_ms(&interval_str))
}
Ok(0)
}

/// Parse an interval string to milliseconds.
///
/// Accepted forms: `"1h"`, `"15m"`, `"30s"`, `"1d"`, `"1 hour"`, `"15 minutes"`,
/// `"30 seconds"`, `"7 days"`. Plural and singular word forms both work.
/// Delegates to the canonical `nodedb_types::kv_parsing::parse_interval_to_ms`.
fn parse_interval_to_ms(s: &str) -> i64 {
let s = s.trim();
if s.is_empty() {
return 0;
}

// Split into numeric part and unit part (handles both "1h" and "1 hour").
let num_end = s
.find(|c: char| !c.is_ascii_digit() && c != '.')
.unwrap_or(s.len());
let num: i64 = s[..num_end].trim().parse().unwrap_or(0);
let unit = s[num_end..].trim();

match unit {
"s" | "sec" | "second" | "seconds" => num * 1_000,
"m" | "min" | "minute" | "minutes" => num * 60_000,
"h" | "hr" | "hour" | "hours" => num * 3_600_000,
"d" | "day" | "days" => num * 86_400_000,
"" => {
// Bare number — treat as seconds.
num * 1_000
}
_ => 0,
}
nodedb_types::kv_parsing::parse_interval_to_ms(s)
.map(|ms| ms as i64)
.unwrap_or(0)
}

/// Extract time range from filters (timestamp >= X AND timestamp <= Y).
Expand Down Expand Up @@ -279,4 +328,67 @@ mod tests {
assert_eq!(parse_interval_to_ms("1 day"), 86_400_000);
assert_eq!(parse_interval_to_ms("5 min"), 300_000);
}

/// Helper: parse a single SQL statement and return its SELECT body.
fn parse_select(sql: &str) -> ast::Select {
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    let mut stmts = Parser::parse_sql(&GenericDialect {}, sql).unwrap();
    let ast::Statement::Query(q) = stmts.remove(0) else {
        panic!("expected query");
    };
    match *q.body {
        ast::SetExpr::Select(s) => *s,
        _ => panic!("expected SELECT"),
    }
}

#[test]
fn resolve_group_by_alias_to_time_bucket() {
    let select = parse_select(
        "SELECT time_bucket('1 hour', timestamp) AS b, COUNT(*) FROM t GROUP BY b",
    );
    let functions = FunctionRegistry::new();

    // GROUP BY b: alias 'b' must resolve back to the time_bucket() call.
    let GroupByExpr::Expressions(exprs, _) = &select.group_by else {
        panic!("expected GROUP BY expressions");
    };
    let resolved = resolve_group_by_expr(&exprs[0], &select.projection);
    assert!(resolved.is_some(), "alias 'b' should resolve");
    let interval = try_extract_time_bucket(resolved.unwrap(), &functions).unwrap();
    assert_eq!(interval, Some(3_600_000));
}

#[test]
fn resolve_group_by_ordinal_to_time_bucket() {
    let select =
        parse_select("SELECT time_bucket('5 minutes', timestamp), COUNT(*) FROM t GROUP BY 1");
    let functions = FunctionRegistry::new();

    // GROUP BY 1: the ordinal must resolve to the first SELECT expression.
    let GroupByExpr::Expressions(exprs, _) = &select.group_by else {
        panic!("expected GROUP BY expressions");
    };
    let resolved = resolve_group_by_expr(&exprs[0], &select.projection);
    assert!(resolved.is_some(), "ordinal 1 should resolve");
    let interval = try_extract_time_bucket(resolved.unwrap(), &functions).unwrap();
    assert_eq!(interval, Some(300_000));
}

#[test]
fn resolve_group_by_plain_column_not_time_bucket() {
    let select = parse_select("SELECT qtype, COUNT(*) FROM t GROUP BY qtype");
    let functions = FunctionRegistry::new();

    let GroupByExpr::Expressions(exprs, _) = &select.group_by else {
        panic!("expected GROUP BY expressions");
    };
    // 'qtype' is not an alias in SELECT, so resolve returns None.
    let resolved = resolve_group_by_expr(&exprs[0], &select.projection);
    assert!(resolved.is_none());
    // And the raw column reference is not a time_bucket() call.
    let interval = try_extract_time_bucket(&exprs[0], &functions).unwrap();
    assert_eq!(interval, None);
}
}
3 changes: 3 additions & 0 deletions nodedb/src/bridge/physical_plan/timeseries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub enum TimeseriesOp {
/// Empty = no gap-fill. Otherwise: "null", "prev", "linear", or literal value.
/// Only applied when `bucket_interval_ms > 0`.
gap_fill: String,
/// Serialized `Vec<ComputedColumn>` for scalar projection expressions
/// (e.g. `time_bucket('1h', timestamp)`). Applied per-row in raw scan mode.
computed_columns: Vec<u8>,
/// RLS post-scan filters (applied after time-range pruning).
rls_filters: Vec<u8>,
},
Expand Down
1 change: 1 addition & 0 deletions nodedb/src/control/planner/auto_tier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ fn build_scan_task(
group_by: group_by.to_vec(),
aggregates: aggregates.to_vec(),
gap_fill: gap_fill.to_string(),
computed_columns: Vec::new(),
rls_filters: Vec::new(),
}),
post_set_op: PostSetOp::None,
Expand Down
1 change: 1 addition & 0 deletions nodedb/src/control/planner/sql_plan_convert/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub(super) fn convert_aggregate(
group_by: group_strs,
aggregates: agg_pairs,
gap_fill: String::new(),
computed_columns: Vec::new(),
rls_filters: Vec::new(),
}),
post_set_op: PostSetOp::None,
Expand Down
2 changes: 2 additions & 0 deletions nodedb/src/control/planner/sql_plan_convert/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub(super) fn convert_scan(p: ScanParams<'_>) -> crate::Result<Vec<PhysicalTask>
group_by: Vec::new(),
aggregates: Vec::new(),
gap_fill: String::new(),
computed_columns: computed_bytes,
rls_filters: Vec::new(),
})
}
Expand Down Expand Up @@ -236,6 +237,7 @@ pub(super) fn convert_timeseries_scan(
group_by: group_by.to_vec(),
aggregates: agg_pairs,
gap_fill: gap_fill.to_string(),
computed_columns: Vec::new(),
rls_filters: Vec::new(),
}),
post_set_op: PostSetOp::None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub(crate) fn build_scan(fields: &TextFields, collection: &str) -> crate::Result
group_by: Vec::new(),
aggregates: Vec::new(),
gap_fill: String::new(),
computed_columns: Vec::new(),
rls_filters: Vec::new(),
}))
}
Expand Down
Loading
Loading