@zhidongqu-db zhidongqu-db commented Jan 27, 2026

What changes were proposed in this pull request?

This PR adds support for vector aggregation functions to Spark SQL, enabling element-wise sum and average computations across groups of vectors.

  • vector_sum(vectors) - Returns the element-wise sum of float vectors in a group. Each element in the result is the sum of the corresponding elements across all input vectors.
  • vector_avg(vectors) - Returns the element-wise average of float vectors in a group. Each element in the result is the arithmetic mean of the corresponding elements across all input vectors.
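As a rough reference model of the two definitions above, the following Python sketch computes the element-wise sum and mean of a small group. The helper names `elementwise_sum` and `elementwise_avg` are hypothetical and exist only for illustration; this is not the Spark implementation, and it assumes all vectors are non-NULL and of equal length.

```python
from typing import List, Sequence


def elementwise_sum(vectors: Sequence[Sequence[float]]) -> List[float]:
    """Sum corresponding elements across equal-length vectors."""
    if not vectors:
        return []
    result = [0.0] * len(vectors[0])
    for vec in vectors:
        for i, x in enumerate(vec):
            result[i] += x
    return result


def elementwise_avg(vectors: Sequence[Sequence[float]]) -> List[float]:
    """Arithmetic mean of corresponding elements across equal-length vectors."""
    n = len(vectors)
    return [s / n for s in elementwise_sum(vectors)]


# Category 'A' from the SQL example in this description.
group_a = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
print(elementwise_sum(group_a))  # [5.0, 7.0, 9.0]
print(elementwise_avg(group_a))  # [2.5, 3.5, 4.5]
```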

Key implementation details:

  • Type Safety: Functions accept only ARRAY<FLOAT> for vectors. No implicit type casting is performed; passing an array with any other element type results in a DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE error.
  • Dimension Validation: All vectors in a group must have the same dimension; throws VECTOR_DIMENSION_MISMATCH error if dimensions do not match.
  • NULL Handling: NULL vectors are skipped in aggregation. Non-NULL vectors containing NULL elements are also treated as NULL and skipped.
  • Edge Cases: Returns NULL if every vector in the group is skipped (NULL, or containing a NULL element). Returns an empty array [] if all input vectors are empty.
  • Compact Buffer Storage: Aggregate state uses BINARY format (dim * 4 bytes) instead of ARRAY<FLOAT> for more efficient storage without null field overhead.
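The rules above can be sketched in Python as a minimal model of the sum aggregate. Everything here is an assumption made for illustration: the function `sum_group`, the `DimensionMismatch` class (a stand-in for VECTOR_DIMENSION_MISMATCH), the little-endian byte order, and the choice to check each vector against the dimension of the first accepted one. The actual buffer layout in the patch may differ, for example by carrying a count for vector_avg.

```python
import struct
from typing import List, Optional, Sequence

Vector = Optional[Sequence[Optional[float]]]


class DimensionMismatch(ValueError):
    """Hypothetical stand-in for the VECTOR_DIMENSION_MISMATCH error."""


def sum_group(vectors: Sequence[Vector]) -> Optional[List[float]]:
    buffer: Optional[bytes] = None  # packed float32 state, dim * 4 bytes
    for vec in vectors:
        # Skip NULL vectors and vectors containing a NULL element.
        if vec is None or any(x is None for x in vec):
            continue
        if buffer is None:
            # First accepted vector fixes the dimension of the group.
            buffer = struct.pack(f"<{len(vec)}f", *vec)
            continue
        dim = len(buffer) // 4
        if len(vec) != dim:
            raise DimensionMismatch(f"expected dimension {dim}, got {len(vec)}")
        acc = struct.unpack(f"<{dim}f", buffer)
        buffer = struct.pack(f"<{dim}f", *(a + x for a, x in zip(acc, vec)))
    if buffer is None:
        return None  # every input was skipped
    return list(struct.unpack(f"<{len(buffer) // 4}f", buffer))


rows = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], None, [100.0, None, 100.0]]
print(sum_group(rows))  # [5.0, 7.0, 9.0]
```

Note that an all-empty group yields a zero-length buffer rather than no buffer, which is how this sketch distinguishes the `[]` result from the NULL result.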

This PR only includes SQL language support; DataFrame API will be added in a separate PR.

Why are the changes needed?

Vector aggregation functions are fundamental operations for:

  • Clustering workloads: Computing cluster centroids by averaging member vectors
  • RAG applications: Aggregating embeddings across document chunks
  • Distributed ML: Gradient accumulation and combining pre-aggregated vectors across partitions
  • Recommendation systems: Computing user preference vectors from interaction history

These functions complement the vector distance/similarity functions and are commonly available in other systems (Snowflake's VECTOR_SUM/VECTOR_AVG, PostgreSQL pgvector's SUM/AVG over vectors).

Does this PR introduce any user-facing change?

Yes, this PR introduces 2 new SQL aggregate functions:

-- Setup example table
CREATE TABLE vector_data (category STRING, embedding ARRAY<FLOAT>);
INSERT INTO vector_data VALUES 
      ('A', array(1.0F, 2.0F, 3.0F)), 
      ('A', array(4.0F, 5.0F, 6.0F)), 
      ('B', array(2.0F, 1.0F, 4.0F)), 
      ('B', array(3.0F, 2.0F, 1.0F));

-- Element-wise sum per category                                   
SELECT category, vector_sum(embedding) AS sum_vector 
  FROM vector_data  
  GROUP BY category 
  ORDER BY category; 
-- category: A, sum_vector: [5.0, 7.0, 9.0] 
-- category: B, sum_vector: [5.0, 3.0, 5.0] 

-- Element-wise average per category (centroid computation) 
SELECT category, vector_avg(embedding) AS centroid 
  FROM vector_data 
  GROUP BY category 
  ORDER BY category;
-- category: A, centroid: [2.5, 3.5, 4.5] 
-- category: B, centroid: [2.5, 1.5, 2.5] 

-- Scalar aggregation (no GROUP BY) 
SELECT vector_sum(embedding) AS total_sum, vector_avg(embedding) AS overall_centroid 
  FROM vector_data; 
-- total_sum: [10.0, 10.0, 14.0] 
-- overall_centroid: [2.5, 2.5, 3.5] 

-- NULL vectors are skipped 
INSERT INTO vector_data VALUES ('A', NULL); 
SELECT category, vector_avg(embedding) FROM vector_data WHERE category = 'A' GROUP BY category; 
-- Returns: [2.5, 3.5, 4.5] (unchanged, NULL skipped) 
                                                                     
-- Vectors with NULL elements are skipped 
INSERT INTO vector_data VALUES ('A', array(100.0F, NULL, 100.0F)); 
SELECT category, vector_avg(embedding) FROM vector_data WHERE category = 'A' GROUP BY category;
-- Returns: [2.5, 3.5, 4.5] (unchanged, vector with NULL element skipped)

How was this patch tested?

SQL Golden File Tests: Added vector-agg.sql with test coverage:

  • Basic functionality tests for both vector_sum and vector_avg
  • GROUP BY aggregation and scalar aggregation (no GROUP BY)
  • Mathematical correctness validation
  • Empty vector handling (returns empty array)
  • NULL vector handling (skipped in aggregation)
  • NULL element within vector handling (entire vector skipped)
  • All-NULL group handling (returns NULL)
  • Dimension mismatch error cases
  • Type mismatch error cases
  • Single element vectors
  • Large vectors (16 elements)
  • Window function aggregation (PARTITION BY)
  • Special float values: NaN, Infinity, -Infinity (IEEE 754 propagation)
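For readers less familiar with IEEE 754 propagation, the short Python sketch below shows what element-wise addition does with NaN and infinities. It uses Python's 64-bit floats rather than Spark's 32-bit FLOAT, but the special-value rules are the same. The helper `elementwise_sum` is hypothetical and is not taken from the patch or its tests.

```python
import math
from typing import List, Sequence


def elementwise_sum(vectors: Sequence[Sequence[float]]) -> List[float]:
    """Plain element-wise addition with no special-casing of NaN or Infinity."""
    result = [0.0] * len(vectors[0])
    for vec in vectors:
        for i, x in enumerate(vec):
            result[i] += x
    return result


inf = float("inf")
nan = float("nan")

result = elementwise_sum([[1.0, inf, inf], [nan, 2.0, -inf]])
# Position 0: a NaN input makes the sum NaN.
# Position 1: Infinity plus a finite value stays Infinity.
# Position 2: Infinity plus -Infinity is NaN.
print(result)
```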

Unit tests: Added VectorAggSuite.scala to test aggregate lifecycle phases:

  • initialize(): Empty buffer returns null
  • update(): Single/multiple vectors, NULL handling, special floats
  • merge(): Two buffers, empty buffers, different counts (for weighted average)
  • eval(): Result extraction from binary buffer
  • Numerical stability test for running average algorithm
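The lifecycle phases above can be illustrated with a small Python model of a running average that merges partial states weighted by their counts. The `State` tuple and the exact update and merge formulas are assumptions chosen for illustration; the patch stores its state in a binary buffer, and its arithmetic may be organized differently.

```python
from typing import List, Tuple

# (running average, number of vectors folded in so far)
State = Tuple[List[float], int]


def initialize() -> State:
    return [], 0


def update(state: State, vec: List[float]) -> State:
    avg, count = state
    count += 1
    if count == 1:
        return list(vec), 1
    # Incremental mean: avg_new = avg + (x - avg) / count
    return [a + (x - a) / count for a, x in zip(avg, vec)], count


def merge(left: State, right: State) -> State:
    (avg_l, n_l), (avg_r, n_r) = left, right
    if n_l == 0:
        return right
    if n_r == 0:
        return left
    total = n_l + n_r
    # Weighted combination: shift the left mean toward the right mean
    # in proportion to the right side's share of the total count.
    return [a + (b - a) * n_r / total for a, b in zip(avg_l, avg_r)], total


# Partition 1 sees two vectors, partition 2 sees one.
p1 = update(update(initialize(), [1.0, 2.0, 3.0]), [4.0, 5.0, 6.0])
p2 = update(initialize(), [7.0, 8.0, 9.0])
merged_avg, merged_count = merge(p1, p2)
print(merged_avg, merged_count)
```

Keeping a running mean instead of a running sum avoids accumulating a large intermediate total, which is one common motivation for this style of algorithm.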

Was this patch authored or co-authored using generative AI tooling?

Yes, code assistance with Claude Opus 4.5 in combination with manual editing by the author.

@github-actions
JIRA Issue Information

=== New Feature SPARK-55031 ===
Summary: Add support for vector_sum, vector_avg functions
Assignee: Zhidong Qu
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

@github-actions github-actions bot added the SQL label Jan 27, 2026