Skip to content

Aggregate Functions#21

Open
gatesn wants to merge 19 commits intodevelopfrom
ngates/aggregates
Open

Aggregate Functions#21
gatesn wants to merge 19 commits intodevelopfrom
ngates/aggregates

Conversation

@gatesn
Copy link
Contributor

@gatesn gatesn commented Feb 28, 2026

No description provided.

Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
let agg = &options.aggregate_fn;

// Try encoding-specific fast path first.
if let Some(states) = list.elements().aggregate_list(&list, agg)? {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also the wrong type

let list = args.inputs[0].to_listview()?;
let agg = &options.aggregate_fn;

// Try encoding-specific fast path first.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do the encoding specific stuff happen?

Comment on lines +56 to +62
fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()> {
for i in 0..list.len() {
self.accumulate(&list.list_elements_at(i)?)?;
self.flush()?;
}
Ok(())
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might want to use a array + offset + len, approach to avoid list construction at each step?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean each step?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I way thinking as you do pushdown or reduce you will need to unwrap the elements, unwrap an encodings and wrap that up with offset + len

Comment on lines +323 to +324
**Self-reduce** (`ScalarFnVTable::reduce`): constant list folding, count from list sizes,
min/max from statistics, sum of constant elements.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what method on Aggregate is called here?

**Parent-reduce** (encoding-specific): child encodings match on `ExactScalarFn<ListAggregate>`
to optimize specific aggregate + encoding combinations. For example:

- **Dict**: `ListAggregate(Min/Max, List(Dict(codes, values)))` pushes down to values.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the check applied for this?

like `list_sum`, `list_min`, etc. don't yet exist — and implementing each one separately would
duplicate the underlying aggregation logic.

The key observation is that a list column stored as `(offsets, elements)` is a pre-materialized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

infrastructure enables serializing intermediate state for distributed execution. A
`state()` export method on `Accumulator` would complete this.

- **Aggregate push-down in Scan**: using reduce rules to push aggregates into `LayoutReader`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So for now ListScalarFn can't be pushed into scan either then, until we figure out how to pushdown aggregates into scans

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can? This comment is more about SELECT MIN(a + 10) FROM .../foo.vortex` being pushed into Vortex

gatesn added 3 commits March 2, 2026 21:36
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
Signed-off-by: Nicholas Gates <nick@nickgates.com>
/// Merge a partial state scalar into the current group state.
fn merge(
&self, options: &Self::Options, state: &mut Self::GroupState, partial: &Scalar,
) -> VortexResult<()>;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you define merge in this way? It could be (GroupState, GroupState) -> GroupState

```rust
pub trait DynAccumulator: Send {
fn accumulate(&mut self, batch: &ArrayRef) -> VortexResult<()>;
fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()>;
Copy link

@joseph-isaacs joseph-isaacs Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()>;
fn accumulate_group(&mut self, list: &ListViewArray) -> VortexResult<()>;

fn accumulate(&mut self, batch: &ArrayRef) -> VortexResult<()>;
fn accumulate_list(&mut self, list: &ListViewArray) -> VortexResult<()>;
fn merge(&mut self, state: &Scalar) -> VortexResult<()>;
fn merge_list(&mut self, states: &ArrayRef) -> VortexResult<()>;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the List DType?

| ------------ | ---------------------------------------- | ----------------------------------------- |
| `Sum` | `i64` (or widened input type) | `SumState::I64(Some(42))` |
| `Count` | `u64` | `u64` |
| `Min` | input element type | `MinState::I32(Some(3))` |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has to be Optional incase there is no min

fn state_dtype(&self, options: &Self::Options, input_dtype: &DType) -> VortexResult<DType>;

fn identity(&self, options: &Self::Options, input_dtype: &DType)
-> VortexResult<Self::GroupState>;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like we actually want None? Since stats might actually be None not the Group identity?


/// Accumulate a canonical batch into the current group state.
fn accumulate(
&self, options: &Self::Options, state: &mut Self::GroupState, batch: &Canonical,
Copy link

@joseph-isaacs joseph-isaacs Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fallback and we have encoding specific kernels?

-> VortexResult<Self::GroupState>;

/// Accumulate a canonical batch into the current group state.
fn accumulate(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trying to pull out of stats happens here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants