diff --git a/nodedb-query/src/functions/datetime.rs b/nodedb-query/src/functions/datetime.rs index fc2b2952..7b1f4d48 100644 --- a/nodedb-query/src/functions/datetime.rs +++ b/nodedb-query/src/functions/datetime.rs @@ -159,7 +159,68 @@ pub(super) fn try_eval(name: &str, args: &[Value]) -> Option { Err(_) => Value::Null, } }), + "time_bucket" => eval_time_bucket(args), _ => return None, }; Some(v) } + +/// `time_bucket(interval, timestamp)` — truncate a millisecond timestamp +/// to the start of the given interval bucket. +/// +/// Accepts two argument orders (both common in SQL): +/// - `time_bucket('1 hour', timestamp_col)` — interval first +/// - `time_bucket(timestamp_col, '1 hour')` — timestamp first +/// +/// The interval is a string like `'1h'`, `'5m'`, `'1 hour'`, `'30 seconds'`. +/// The timestamp is an integer (epoch milliseconds). +fn eval_time_bucket(args: &[Value]) -> Value { + if args.len() < 2 { + return Value::Null; + } + + // Detect which arg is the interval string and which is the timestamp. + let (interval_ms, timestamp_ms) = match (&args[0], &args[1]) { + // time_bucket('1 hour', timestamp) + (Value::String(s), ts_val) => { + let interval = parse_interval_to_ms(s); + let ts = value_to_timestamp_ms(ts_val); + (interval, ts) + } + // time_bucket(timestamp, '1 hour') + (ts_val, Value::String(s)) => { + let interval = parse_interval_to_ms(s); + let ts = value_to_timestamp_ms(ts_val); + (interval, ts) + } + // time_bucket(3600, timestamp) — interval as integer seconds + (Value::Integer(interval_secs), ts_val) => { + let ts = value_to_timestamp_ms(ts_val); + (Some((*interval_secs) * 1000), ts) + } + _ => return Value::Null, + }; + + match (interval_ms, timestamp_ms) { + (Some(i), Some(ts)) if i > 0 => Value::Integer((ts / i) * i), + _ => Value::Null, + } +} + +fn value_to_timestamp_ms(v: &Value) -> Option { + match v { + Value::Integer(n) => Some(*n), + Value::Float(f) => Some(*f as i64), + _ => None, + } +} + +/// Parse an interval string like "1h", "5m", "1 hour", "30 seconds" to ms. +/// +/// Delegates to the canonical `nodedb_types::kv_parsing::parse_interval_to_ms`. +fn parse_interval_to_ms(s: &str) -> Option { + nodedb_types::kv_parsing::parse_interval_to_ms(s) + .ok() + .map(|ms| ms as i64) + .filter(|&ms| ms > 0) +} diff --git a/nodedb-sql/Cargo.toml b/nodedb-sql/Cargo.toml index 3a095139..980e98b0 100644 --- a/nodedb-sql/Cargo.toml +++ b/nodedb-sql/Cargo.toml @@ -7,5 +7,6 @@ license.workspace = true description = "SQL parser, planner, and optimizer for NodeDB" [dependencies] +nodedb-types = { path = "../nodedb-types" } sqlparser = "0.61" thiserror = { workspace = true } diff --git a/nodedb-sql/src/planner/aggregate.rs b/nodedb-sql/src/planner/aggregate.rs index 89e796ad..8764ce4d 100644 --- a/nodedb-sql/src/planner/aggregate.rs +++ b/nodedb-sql/src/planner/aggregate.rs @@ -32,6 +32,7 @@ pub fn plan_aggregate( &aggregates, filters, &select.group_by, + &select.projection, functions, ); } @@ -65,6 +66,7 @@ fn plan_timeseries_aggregate( aggregates: &[AggregateExpr], filters: &[Filter], raw_group_by: &GroupByExpr, + select_items: &[ast::SelectItem], functions: &FunctionRegistry, ) -> Result { let mut bucket_interval_ms: i64 = 0; @@ -73,22 +75,16 @@ fn plan_timeseries_aggregate( // Check for time_bucket in GROUP BY. if let GroupByExpr::Expressions(exprs, _) = raw_group_by { for expr in exprs { - if let ast::Expr::Function(func) = expr { - let name = func - .name - .0 - .iter() - .map(|p| match p { - ast::ObjectNamePart::Identifier(ident) => normalize_ident(ident), - _ => String::new(), - }) - .collect::>() - .join("."); - if functions.search_trigger(&name) == SearchTrigger::TimeBucket { - bucket_interval_ms = extract_bucket_interval(func)?; - continue; - } + // Resolve the expression: GROUP BY alias or GROUP BY ordinal (1-based) + // should resolve to the corresponding SELECT item expression. + let resolved = resolve_group_by_expr(expr, select_items); + let check_expr = resolved.unwrap_or(expr); + + if let Some(interval) = try_extract_time_bucket(check_expr, functions)? { + bucket_interval_ms = interval; + continue; } + // Non-time_bucket GROUP BY columns. if let ast::Expr::Identifier(ident) = expr { group_columns.push(normalize_ident(ident)); @@ -113,55 +109,108 @@ fn plan_timeseries_aggregate( }) } +/// Check if an expression is a time_bucket() call and extract the interval. +fn try_extract_time_bucket(expr: &ast::Expr, functions: &FunctionRegistry) -> Result> { + if let ast::Expr::Function(func) = expr { + let name = func + .name + .0 + .iter() + .map(|p| match p { + ast::ObjectNamePart::Identifier(ident) => normalize_ident(ident), + _ => String::new(), + }) + .collect::>() + .join("."); + if functions.search_trigger(&name) == SearchTrigger::TimeBucket { + return Ok(Some(extract_bucket_interval(func)?)); + } + } + Ok(None) +} + +/// Resolve a GROUP BY expression that references a SELECT alias or ordinal. +/// +/// `GROUP BY b` where `b` is an alias → returns the aliased expression. +/// `GROUP BY 1` → returns the 1st SELECT expression (0-indexed). +fn resolve_group_by_expr<'a>( + expr: &ast::Expr, + select_items: &'a [ast::SelectItem], +) -> Option<&'a ast::Expr> { + match expr { + ast::Expr::Identifier(ident) => { + let alias_name = normalize_ident(ident); + select_items.iter().find_map(|item| { + if let ast::SelectItem::ExprWithAlias { expr, alias } = item + && normalize_ident(alias) == alias_name + { + Some(expr) + } else { + None + } + }) + } + ast::Expr::Value(v) => { + if let ast::Value::Number(n, _) = &v.value + && let Ok(idx) = n.parse::() + && idx >= 1 + && idx <= select_items.len() + { + match &select_items[idx - 1] { + ast::SelectItem::UnnamedExpr(e) => Some(e), + ast::SelectItem::ExprWithAlias { expr, .. } => Some(expr), + _ => None, + } + } else { + None + } + } + _ => None, + } +} + /// Extract the bucket interval from a time_bucket() call. +/// +/// Handles both argument orders: +/// - `time_bucket('1 hour', timestamp)` — interval first +/// - `time_bucket(timestamp, '1 hour')` — timestamp first +/// - `time_bucket(3600, timestamp)` — integer seconds fn extract_bucket_interval(func: &ast::Function) -> Result { let args = match &func.args { ast::FunctionArguments::List(args) => &args.args, _ => return Ok(0), }; - if args.is_empty() { - return Ok(0); - } - let interval_str = match &args[0] { - ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(ast::Expr::Value(v))) => { + // Try each argument position for the interval literal. + for arg in args { + if let ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(ast::Expr::Value(v))) = arg { match &v.value { - ast::Value::SingleQuotedString(s) => s.clone(), - _ => return Ok(0), + ast::Value::SingleQuotedString(s) => { + let ms = parse_interval_to_ms(s); + if ms > 0 { + return Ok(ms); + } + } + ast::Value::Number(n, _) => { + if let Ok(secs) = n.parse::() + && secs > 0 + { + return Ok(secs * 1000); + } + } + _ => {} } } - _ => return Ok(0), - }; - Ok(parse_interval_to_ms(&interval_str)) + } + Ok(0) } /// Parse an interval string to milliseconds. /// -/// Accepted forms: `"1h"`, `"15m"`, `"30s"`, `"1d"`, `"1 hour"`, `"15 minutes"`, -/// `"30 seconds"`, `"7 days"`. Plural and singular word forms both work. +/// Delegates to the canonical `nodedb_types::kv_parsing::parse_interval_to_ms`. fn parse_interval_to_ms(s: &str) -> i64 { - let s = s.trim(); - if s.is_empty() { - return 0; - } - - // Split into numeric part and unit part (handles both "1h" and "1 hour"). - let num_end = s - .find(|c: char| !c.is_ascii_digit() && c != '.') - .unwrap_or(s.len()); - let num: i64 = s[..num_end].trim().parse().unwrap_or(0); - let unit = s[num_end..].trim(); - - match unit { - "s" | "sec" | "second" | "seconds" => num * 1_000, - "m" | "min" | "minute" | "minutes" => num * 60_000, - "h" | "hr" | "hour" | "hours" => num * 3_600_000, - "d" | "day" | "days" => num * 86_400_000, - "" => { - // Bare number — treat as seconds. - num * 1_000 - } - _ => 0, - } + nodedb_types::kv_parsing::parse_interval_to_ms(s) + .map(|ms| ms as i64) + .unwrap_or(0) } /// Extract time range from filters (timestamp >= X AND timestamp <= Y). @@ -279,4 +328,67 @@ mod tests { assert_eq!(parse_interval_to_ms("1 day"), 86_400_000); assert_eq!(parse_interval_to_ms("5 min"), 300_000); } + + /// Helper: parse a SQL SELECT and return the select body + projection. + fn parse_select(sql: &str) -> ast::Select { + use sqlparser::dialect::GenericDialect; + use sqlparser::parser::Parser; + let stmts = Parser::parse_sql(&GenericDialect {}, sql).unwrap(); + match stmts.into_iter().next().unwrap() { + ast::Statement::Query(q) => match *q.body { + ast::SetExpr::Select(s) => *s, + _ => panic!("expected SELECT"), + }, + _ => panic!("expected query"), + } + } + + #[test] + fn resolve_group_by_alias_to_time_bucket() { + let select = parse_select( + "SELECT time_bucket('1 hour', timestamp) AS b, COUNT(*) FROM t GROUP BY b", + ); + let functions = FunctionRegistry::new(); + + if let GroupByExpr::Expressions(exprs, _) = &select.group_by { + let resolved = resolve_group_by_expr(&exprs[0], &select.projection); + assert!(resolved.is_some(), "alias 'b' should resolve"); + let interval = try_extract_time_bucket(resolved.unwrap(), &functions).unwrap(); + assert_eq!(interval, Some(3_600_000)); + } else { + panic!("expected GROUP BY expressions"); + } + } + + #[test] + fn resolve_group_by_ordinal_to_time_bucket() { + let select = + parse_select("SELECT time_bucket('5 minutes', timestamp), COUNT(*) FROM t GROUP BY 1"); + let functions = FunctionRegistry::new(); + + if let GroupByExpr::Expressions(exprs, _) = &select.group_by { + let resolved = resolve_group_by_expr(&exprs[0], &select.projection); + assert!(resolved.is_some(), "ordinal 1 should resolve"); + let interval = try_extract_time_bucket(resolved.unwrap(), &functions).unwrap(); + assert_eq!(interval, Some(300_000)); + } else { + panic!("expected GROUP BY expressions"); + } + } + + #[test] + fn resolve_group_by_plain_column_not_time_bucket() { + let select = parse_select("SELECT qtype, COUNT(*) FROM t GROUP BY qtype"); + let functions = FunctionRegistry::new(); + + if let GroupByExpr::Expressions(exprs, _) = &select.group_by { + let resolved = resolve_group_by_expr(&exprs[0], &select.projection); + // 'qtype' is not an alias in SELECT, so resolve returns None. + assert!(resolved.is_none()); + let interval = try_extract_time_bucket(&exprs[0], &functions).unwrap(); + assert_eq!(interval, None); + } else { + panic!("expected GROUP BY expressions"); + } + } } diff --git a/nodedb/src/bridge/physical_plan/timeseries.rs b/nodedb/src/bridge/physical_plan/timeseries.rs index 4016b8f5..a9e30b52 100644 --- a/nodedb/src/bridge/physical_plan/timeseries.rs +++ b/nodedb/src/bridge/physical_plan/timeseries.rs @@ -26,6 +26,9 @@ pub enum TimeseriesOp { /// Empty = no gap-fill. Otherwise: "null", "prev", "linear", or literal value. /// Only applied when `bucket_interval_ms > 0`. gap_fill: String, + /// Serialized `Vec` for scalar projection expressions + /// (e.g. `time_bucket('1h', timestamp)`). Applied per-row in raw scan mode. + computed_columns: Vec, /// RLS post-scan filters (applied after time-range pruning). rls_filters: Vec, }, diff --git a/nodedb/src/control/planner/auto_tier.rs b/nodedb/src/control/planner/auto_tier.rs index 8098c28a..5ff60ed9 100644 --- a/nodedb/src/control/planner/auto_tier.rs +++ b/nodedb/src/control/planner/auto_tier.rs @@ -223,6 +223,7 @@ fn build_scan_task( group_by: group_by.to_vec(), aggregates: aggregates.to_vec(), gap_fill: gap_fill.to_string(), + computed_columns: Vec::new(), rls_filters: Vec::new(), }), post_set_op: PostSetOp::None, diff --git a/nodedb/src/control/planner/sql_plan_convert/aggregate.rs b/nodedb/src/control/planner/sql_plan_convert/aggregate.rs index b5d6afa2..17d057c3 100644 --- a/nodedb/src/control/planner/sql_plan_convert/aggregate.rs +++ b/nodedb/src/control/planner/sql_plan_convert/aggregate.rs @@ -110,6 +110,7 @@ pub(super) fn convert_aggregate( group_by: group_strs, aggregates: agg_pairs, gap_fill: String::new(), + computed_columns: Vec::new(), rls_filters: Vec::new(), }), post_set_op: PostSetOp::None, diff --git a/nodedb/src/control/planner/sql_plan_convert/scan.rs b/nodedb/src/control/planner/sql_plan_convert/scan.rs index 5932f136..7d2c7d6c 100644 --- a/nodedb/src/control/planner/sql_plan_convert/scan.rs +++ b/nodedb/src/control/planner/sql_plan_convert/scan.rs @@ -55,6 +55,7 @@ pub(super) fn convert_scan(p: ScanParams<'_>) -> crate::Result group_by: Vec::new(), aggregates: Vec::new(), gap_fill: String::new(), + computed_columns: computed_bytes, rls_filters: Vec::new(), }) } @@ -236,6 +237,7 @@ pub(super) fn convert_timeseries_scan( group_by: group_by.to_vec(), aggregates: agg_pairs, gap_fill: gap_fill.to_string(), + computed_columns: Vec::new(), rls_filters: Vec::new(), }), post_set_op: PostSetOp::None, diff --git a/nodedb/src/control/server/native/dispatch/plan_builder/timeseries.rs b/nodedb/src/control/server/native/dispatch/plan_builder/timeseries.rs index 88a54f11..9d22c623 100644 --- a/nodedb/src/control/server/native/dispatch/plan_builder/timeseries.rs +++ b/nodedb/src/control/server/native/dispatch/plan_builder/timeseries.rs @@ -29,6 +29,7 @@ pub(crate) fn build_scan(fields: &TextFields, collection: &str) -> crate::Result group_by: Vec::new(), aggregates: Vec::new(), gap_fill: String::new(), + computed_columns: Vec::new(), rls_filters: Vec::new(), })) } diff --git a/nodedb/src/data/executor/dispatch/other.rs b/nodedb/src/data/executor/dispatch/other.rs index 23758b40..d2084477 100644 --- a/nodedb/src/data/executor/dispatch/other.rs +++ b/nodedb/src/data/executor/dispatch/other.rs @@ -486,6 +486,7 @@ impl CoreLoop { group_by, aggregates, gap_fill, + computed_columns, .. }) => { let scoped_coll = format!("{tid}:{collection}"); @@ -500,6 +501,7 @@ impl CoreLoop { group_by, aggregates, gap_fill, + computed_columns, }, ) } diff --git a/nodedb/src/data/executor/handlers/point.rs b/nodedb/src/data/executor/handlers/point.rs index e8041369..27e010f6 100644 --- a/nodedb/src/data/executor/handlers/point.rs +++ b/nodedb/src/data/executor/handlers/point.rs @@ -652,34 +652,33 @@ impl CoreLoop { schemaless_keys.push((bare_key, "embedding".to_string())); } - if !schemaless_keys.is_empty() { - if let Ok(ndb_val) = nodedb_types::value_from_msgpack(value) - && let nodedb_types::Value::Object(ref obj) = ndb_val - { - for (params_key, field_name) in &schemaless_keys { - if let Some(nodedb_types::Value::Array(arr)) = obj.get(field_name) { - let floats: Vec = arr - .iter() - .filter_map(|v| match v { - nodedb_types::Value::Float(f) => Some(*f as f32), - nodedb_types::Value::Integer(i) => Some(*i as f32), - _ => None, - }) - .collect(); - if !floats.is_empty() { - let params = self - .vector_params - .get(params_key) - .cloned() - .unwrap_or_default(); - // Use field-qualified key so search can find it. - let store_key = Self::vector_index_key(tid, collection, field_name); - let coll = - self.vector_collections.entry(store_key).or_insert_with(|| { - nodedb_vector::VectorCollection::new(floats.len(), params) - }); - coll.insert_with_doc_id(floats, document_id.to_string()); - } + if !schemaless_keys.is_empty() + && let Ok(ndb_val) = nodedb_types::value_from_msgpack(value) + && let nodedb_types::Value::Object(ref obj) = ndb_val + { + for (params_key, field_name) in &schemaless_keys { + if let Some(nodedb_types::Value::Array(arr)) = obj.get(field_name) { + let floats: Vec = arr + .iter() + .filter_map(|v| match v { + nodedb_types::Value::Float(f) => Some(*f as f32), + nodedb_types::Value::Integer(i) => Some(*i as f32), + _ => None, + }) + .collect(); + if !floats.is_empty() { + let params = self + .vector_params + .get(params_key) + .cloned() + .unwrap_or_default(); + // Use field-qualified key so search can find it. + let store_key = Self::vector_index_key(tid, collection, field_name); + let coll = + self.vector_collections.entry(store_key).or_insert_with(|| { + nodedb_vector::VectorCollection::new(floats.len(), params) + }); + coll.insert_with_doc_id(floats, document_id.to_string()); } } } diff --git a/nodedb/src/data/executor/handlers/timeseries/mod.rs b/nodedb/src/data/executor/handlers/timeseries/mod.rs index 113632c3..ee6dfda4 100644 --- a/nodedb/src/data/executor/handlers/timeseries/mod.rs +++ b/nodedb/src/data/executor/handlers/timeseries/mod.rs @@ -24,6 +24,8 @@ pub(in crate::data::executor) struct TimeseriesScanParams<'a> { pub aggregates: &'a [(String, String)], /// Gap-fill strategy. Empty = no gap-fill. pub gap_fill: &'a str, + /// Serialized computed columns for scalar projection expressions. + pub computed_columns: &'a [u8], } impl CoreLoop { @@ -47,6 +49,7 @@ impl CoreLoop { group_by, aggregates, gap_fill, + computed_columns, } = params; // Lazy-load partition registry from disk if not yet loaded. @@ -112,14 +115,15 @@ impl CoreLoop { &needed_columns, ) } else { - self.execute_ts_raw_scan( + self.execute_ts_raw_scan(raw_scan::RawScanParams { task, collection, time_range, limit, - &filter_predicates, + filter_predicates: &filter_predicates, has_filters, - ) + computed_columns, + }) } } diff --git a/nodedb/src/data/executor/handlers/timeseries/raw_scan.rs b/nodedb/src/data/executor/handlers/timeseries/raw_scan.rs index 0a0261f9..2535281c 100644 --- a/nodedb/src/data/executor/handlers/timeseries/raw_scan.rs +++ b/nodedb/src/data/executor/handlers/timeseries/raw_scan.rs @@ -13,17 +13,32 @@ use crate::engine::timeseries::columnar_segment::ColumnarSegmentReader; use super::super::columnar_filter; +/// Parameters for a timeseries raw scan (no aggregation). +pub(in crate::data::executor) struct RawScanParams<'a> { + pub task: &'a ExecutionTask, + pub collection: &'a str, + pub time_range: (i64, i64), + pub limit: usize, + pub filter_predicates: &'a [crate::bridge::scan_filter::ScanFilter], + pub has_filters: bool, + pub computed_columns: &'a [u8], +} + impl CoreLoop { /// Raw scan mode: emit rows from memtable + partitions. pub(in crate::data::executor) fn execute_ts_raw_scan( &self, - task: &ExecutionTask, - collection: &str, - time_range: (i64, i64), - limit: usize, - filter_predicates: &[crate::bridge::scan_filter::ScanFilter], - has_filters: bool, + params: RawScanParams<'_>, ) -> Response { + let RawScanParams { + task, + collection, + time_range, + limit, + filter_predicates, + has_filters, + computed_columns: computed_columns_bytes, + } = params; let mut results: Vec = Vec::new(); // 1. Read from memtable. @@ -101,6 +116,22 @@ impl CoreLoop { } } + // Apply computed columns (e.g. time_bucket) if present. + let results = if !computed_columns_bytes.is_empty() { + let computed_cols: Vec = + zerompk::from_msgpack(computed_columns_bytes).unwrap_or_default(); + if computed_cols.is_empty() { + results + } else { + results + .into_iter() + .map(|row| apply_computed_columns_rmpv(row, &computed_cols)) + .collect() + } + } else { + results + }; + let array = rmpv::Value::Array(results); let mut buf = Vec::new(); rmpv::encode::write_value(&mut buf, &array).unwrap_or(()); @@ -371,6 +402,70 @@ fn extract_timestamp(row: &rmpv::Value) -> i64 { 0 } +/// Apply computed column expressions to an rmpv row. +/// +/// Converts the row to `nodedb_types::Value` for expression evaluation, +/// then produces a new row containing only the computed columns. +/// When computed columns are present, the output contains ONLY +/// computed columns (matching Document engine behavior for projection). +fn apply_computed_columns_rmpv( + row: rmpv::Value, + computed_cols: &[crate::bridge::expr_eval::ComputedColumn], +) -> rmpv::Value { + let doc = rmpv_to_nodedb_value(&row); + let mut fields: Vec<(rmpv::Value, rmpv::Value)> = Vec::with_capacity(computed_cols.len()); + for cc in computed_cols { + let result = cc.expr.eval(&doc); + fields.push(( + rmpv::Value::String(cc.alias.as_str().into()), + nodedb_value_to_rmpv(&result), + )); + } + rmpv::Value::Map(fields) +} + +/// Convert rmpv row to nodedb_types::Value for expression evaluation. +fn rmpv_to_nodedb_value(row: &rmpv::Value) -> nodedb_types::Value { + match row { + rmpv::Value::Map(fields) => { + let mut map = std::collections::HashMap::new(); + for (k, v) in fields { + let key = match k { + rmpv::Value::String(s) => s.as_str().unwrap_or("").to_string(), + _ => continue, + }; + let val = match v { + rmpv::Value::Integer(n) => { + nodedb_types::Value::Integer(n.as_i64().unwrap_or(0)) + } + rmpv::Value::F64(f) => nodedb_types::Value::Float(*f), + rmpv::Value::String(s) => { + nodedb_types::Value::String(s.as_str().unwrap_or("").to_string()) + } + rmpv::Value::Nil => nodedb_types::Value::Null, + rmpv::Value::Boolean(b) => nodedb_types::Value::Bool(*b), + _ => nodedb_types::Value::Null, + }; + map.insert(key, val); + } + nodedb_types::Value::Object(map) + } + _ => nodedb_types::Value::Null, + } +} + +/// Convert nodedb_types::Value back to rmpv::Value for response encoding. +fn nodedb_value_to_rmpv(v: &nodedb_types::Value) -> rmpv::Value { + match v { + nodedb_types::Value::Integer(n) => rmpv::Value::Integer((*n).into()), + nodedb_types::Value::Float(f) => rmpv::Value::F64(*f), + nodedb_types::Value::String(s) => rmpv::Value::String(s.as_str().into()), + nodedb_types::Value::Bool(b) => rmpv::Value::Boolean(*b), + nodedb_types::Value::Null => rmpv::Value::Nil, + _ => rmpv::Value::Nil, + } +} + /// Convert rmpv::Value row to serde_json::Value (fallback for complex filter eval). fn rmpv_to_json_value(row: &rmpv::Value) -> serde_json::Value { match row { diff --git a/nodedb/src/event/alert/executor.rs b/nodedb/src/event/alert/executor.rs index 195ea335..6513d9ac 100644 --- a/nodedb/src/event/alert/executor.rs +++ b/nodedb/src/event/alert/executor.rs @@ -170,6 +170,7 @@ async fn execute_aggregate_scan( alert.condition.column.clone(), )], gap_fill: String::new(), + computed_columns: Vec::new(), rls_filters: Vec::new(), });