diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs
index 05d71946a16..f33950ddcad 100644
--- a/java/lance-jni/src/fragment.rs
+++ b/java/lance-jni/src/fragment.rs
@@ -650,6 +650,7 @@ impl FromJObjectWithEnv for JObject<'_> {
             row_id_meta,
             created_at_version_meta: None,
             last_updated_at_version_meta: None,
+            clustering_metadata: None,
         })
     }
 }
diff --git a/protos/table.proto b/protos/table.proto
index e7de867e46e..dccc59abab3 100644
--- a/protos/table.proto
+++ b/protos/table.proto
@@ -205,6 +205,16 @@ message Manifest {
 
   // The branch of the dataset. None means main branch.
   optional string branch = 20;
+
+  message ClusteringFormat {
+    // The clustering provider (e.g. "hilbert", "zorder").
+    string provider = 1;
+    // The columns by which data should be clustered.
+    repeated string columns = 2;
+  }
+
+  // The clustering format for this dataset.
+  optional ClusteringFormat clustering_format = 22;
 } // Manifest
 
 // external dataset base path
@@ -332,8 +342,22 @@ message DataFragment {
   // deletion tombstones. To compute the current number of rows, subtract
   // `deletion_file.num_deleted_rows` from this value.
   uint64 physical_rows = 4;
+
+  // Clustering metadata for this fragment, if it was produced by a clustered compaction.
+  optional ClusteringMetadata clustering_metadata = 11;
 }
 
+// Metadata about how a fragment's data was clustered during compaction.
+message ClusteringMetadata {
+  // The columns by which this fragment's data is clustered.
+  repeated string columns = 1;
+  // The provider that performed the clustering (e.g. "hilbert", "zorder").
+  string provider = 2;
+  // A UUID identifying the compaction group — all fragments written in the same
+  // compaction task share this value.
+  string group = 3;
+}
+
 message DataFile {
   // Path to the root relative to the dataset's URI.
   string path = 1;
diff --git a/python/src/fragment.rs b/python/src/fragment.rs
index aec4bfac58b..f70cf8624aa 100644
--- a/python/src/fragment.rs
+++ b/python/src/fragment.rs
@@ -766,6 +766,7 @@ impl FromPyObject<'_> for PyLance {
             row_id_meta,
             last_updated_at_version_meta,
             created_at_version_meta,
+            clustering_metadata: None,
         }))
     }
 }
diff --git a/rust/lance-table/src/format.rs b/rust/lance-table/src/format.rs
index db065199532..5d6e662c299 100644
--- a/rust/lance-table/src/format.rs
+++ b/rust/lance-table/src/format.rs
@@ -16,8 +16,8 @@ pub use fragment::*;
 pub use index::IndexMetadata;
 
 pub use manifest::{
-    BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, SelfDescribingFileReader,
-    WriterVersion, is_detached_version,
+    BasePath, ClusteringFormat, DETACHED_VERSION_MASK, DataStorageFormat, Manifest,
+    SelfDescribingFileReader, WriterVersion, is_detached_version,
 };
 pub use transaction::Transaction;
 
diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs
index 01c5b535f8e..0d95ef55878 100644
--- a/rust/lance-table/src/format/fragment.rs
+++ b/rust/lance-table/src/format/fragment.rs
@@ -271,6 +271,18 @@ impl TryFrom for RowIdMeta {
     }
 }
 
+/// Metadata about how a fragment's data was clustered during compaction.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
+pub struct ClusteringMetadata {
+    /// The columns by which this fragment's data is clustered.
+    pub columns: Vec<String>,
+    /// The provider that performed the clustering (e.g. "hilbert", "zorder").
+    pub provider: String,
+    /// A UUID identifying the compaction group — all fragments written in the same
+    /// compaction task share this value.
+    pub group: String,
+}
+
 /// Data fragment.
 ///
 /// A fragment is a set of files which represent the different columns of the same rows.
@@ -303,6 +315,10 @@ pub struct Fragment {
     /// Created at version metadata
     #[serde(skip_serializing_if = "Option::is_none")]
     pub created_at_version_meta: Option,
+
+    /// Clustering metadata for this fragment, if it was produced by a clustered compaction.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub clustering_metadata: Option<ClusteringMetadata>,
 }
 
 impl Fragment {
@@ -315,6 +331,7 @@ impl Fragment {
             physical_rows: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }
     }
 
@@ -354,6 +371,7 @@ impl Fragment {
             row_id_meta: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }
     }
 
@@ -480,6 +498,11 @@ impl TryFrom for Fragment {
                 .created_at_version_sequence
                 .map(RowDatasetVersionMeta::try_from)
                 .transpose()?,
+            clustering_metadata: p.clustering_metadata.map(|cm| ClusteringMetadata {
+                columns: cm.columns,
+                provider: cm.provider,
+                group: cm.group,
+            }),
         })
     }
 }
@@ -521,6 +544,14 @@ impl From<&Fragment> for pb::DataFragment {
             physical_rows: f.physical_rows.unwrap_or_default() as u64,
             last_updated_at_version_sequence,
             created_at_version_sequence,
+            clustering_metadata: f
+                .clustering_metadata
+                .as_ref()
+                .map(|cm| pb::ClusteringMetadata {
+                    columns: cm.columns.clone(),
+                    provider: cm.provider.clone(),
+                    group: cm.group.clone(),
+                }),
         }
     }
 }
@@ -638,4 +669,27 @@ mod tests {
             .validate(&base_path)
             .expect("validation should allow extra columns without field ids");
     }
+
+    #[test]
+    fn test_roundtrip_fragment_with_clustering_metadata() {
+        let mut fragment = Fragment::new(42);
+        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
+        fragment.add_file_legacy("clustered.lance", &Schema::try_from(&schema).unwrap());
+        fragment.physical_rows = Some(100);
+        fragment.clustering_metadata = Some(ClusteringMetadata {
+            columns: vec!["col_a".to_string(), "col_b".to_string()],
+            provider: "hilbert".to_string(),
+            group: "550e8400-e29b-41d4-a716-446655440000".to_string(),
+        });
+
+        let proto = pb::DataFragment::from(&fragment);
+        let roundtripped = Fragment::try_from(proto).unwrap();
+        assert_eq!(fragment, roundtripped);
+
+        // Also verify None round-trips correctly
+        fragment.clustering_metadata = None;
+        let proto = pb::DataFragment::from(&fragment);
+        let roundtripped = Fragment::try_from(proto).unwrap();
+        assert_eq!(fragment, roundtripped);
+    }
 }
diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs
index 71de80c547f..bb424e3e582 100644
--- a/rust/lance-table/src/format/manifest.rs
+++ b/rust/lance-table/src/format/manifest.rs
@@ -23,6 +23,18 @@ use lance_core::datatypes::Schema;
 use lance_core::{Error, Result};
 use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
 use lance_io::utils::read_struct;
+use serde::{Deserialize, Serialize};
+
+/// Clustering configuration for a dataset.
+///
+/// Specifies which columns and provider to use when clustering data.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
+pub struct ClusteringFormat {
+    /// The clustering provider (e.g. "hilbert", "zorder").
+    pub provider: String,
+    /// The columns by which data should be clustered.
+    pub columns: Vec<String>,
+}
 
 /// Manifest of a dataset
 ///
@@ -98,6 +110,9 @@ pub struct Manifest {
     /// is used to tell libraries how to read, write, or manage the table.
     pub table_metadata: HashMap,
 
+    /// The clustering format for this dataset.
+    pub clustering_format: Option<ClusteringFormat>,
+
     /* external base paths */
     pub base_paths: HashMap,
 }
@@ -194,6 +209,7 @@ impl Manifest {
             data_storage_format,
             config: HashMap::new(),
             table_metadata: HashMap::new(),
+            clustering_format: None,
             base_paths,
         }
     }
@@ -225,6 +241,7 @@ impl Manifest {
             data_storage_format: previous.data_storage_format.clone(),
             config: previous.config.clone(),
             table_metadata: previous.table_metadata.clone(),
+            clustering_format: previous.clustering_format.clone(),
             base_paths: previous.base_paths.clone(),
         }
     }
@@ -288,6 +305,7 @@ impl Manifest {
                 base_paths
             },
             table_metadata: self.table_metadata.clone(),
+            clustering_format: self.clustering_format.clone(),
         }
     }
 
@@ -924,6 +942,10 @@ impl TryFrom for Manifest {
             data_storage_format,
             config: p.config,
             table_metadata: p.table_metadata,
+            clustering_format: p.clustering_format.map(|cf| ClusteringFormat {
+                provider: cf.provider,
+                columns: cf.columns,
+            }),
             base_paths: p
                 .base_paths
                 .iter()
@@ -992,6 +1014,12 @@ impl From<&Manifest> for pb::Manifest {
                 })
                 .collect(),
             transaction_section: m.transaction_section.map(|i| i as u64),
+            clustering_format: m.clustering_format.as_ref().map(|cf| {
+                pb::manifest::ClusteringFormat {
+                    provider: cf.provider.clone(),
+                    columns: cf.columns.clone(),
+                }
+            }),
         }
     }
 }
@@ -1319,6 +1347,7 @@ mod tests {
                 physical_rows: None,
                 created_at_version_meta: None,
                 last_updated_at_version_meta: None,
+                clustering_metadata: None,
             },
             Fragment {
                 id: 1,
@@ -1331,6 +1360,7 @@ mod tests {
                 physical_rows: None,
                 created_at_version_meta: None,
                 last_updated_at_version_meta: None,
+                clustering_metadata: None,
             },
         ];
 
@@ -1485,4 +1515,34 @@ mod tests {
         let stats_map: BTreeMap = deletion_summary.into();
         assert_eq!(stats_map.len(), 7)
     }
+
+    #[test]
+    fn test_roundtrip_manifest_clustering_format() {
+        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
+            "a",
+            arrow_schema::DataType::Int64,
+            false,
+        )]);
+        let schema = Schema::try_from(&arrow_schema).unwrap();
+        let mut manifest = Manifest::new(
+            schema,
+            Arc::new(vec![]),
+            DataStorageFormat::default(),
+            HashMap::new(),
+        );
+        manifest.clustering_format = Some(ClusteringFormat {
+            provider: "hilbert".to_string(),
+            columns: vec!["col_a".to_string(), "col_b".to_string()],
+        });
+
+        let proto = pb::Manifest::from(&manifest);
+        let roundtripped = Manifest::try_from(proto).unwrap();
+        assert_eq!(manifest.clustering_format, roundtripped.clustering_format);
+
+        // Also verify None round-trips correctly
+        manifest.clustering_format = None;
+        let proto = pb::Manifest::from(&manifest);
+        let roundtripped = Manifest::try_from(proto).unwrap();
+        assert_eq!(roundtripped.clustering_format, None);
+    }
 }
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index 8bbc8d177cd..f9fe1663d53 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -111,12 +111,18 @@ use serde::{Deserialize, Serialize};
 use tracing::info;
 
 mod binary_copy;
+pub mod clustering;
 pub mod remapping;
+pub mod rewrite_strategy;
 
 use crate::index::frag_reuse::build_new_frag_reuse_index;
 use crate::io::deletion::read_dataset_deletion_file;
 use binary_copy::rewrite_files_binary_copy;
+pub use clustering::ClusteringRewriteStrategy;
 pub use remapping::{IgnoreRemap, IndexRemapper, IndexRemapperOptions, RemappedIndex};
+pub use rewrite_strategy::{
+    ClusteringMode, DefaultRewriteStrategy, RewriteStrategy, RewriteStrategyConfig,
+};
 
 /// Controls how data is rewritten during compaction.
 #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
@@ -201,6 +207,15 @@ pub struct CompactionOptions {
     /// Controls how much data is read at once when performing binary copy.
     /// Defaults to 16MB (16 * 1024 * 1024).
     pub binary_copy_read_batch_bytes: Option,
+    /// The rewrite strategy to apply during compaction.
+    ///
+    /// When set to [`RewriteStrategyConfig::Clustering`], data will be sorted
+    /// or reordered by the specified columns instead of preserving insertion
+    /// order. Clustering forces the reencode path (binary copy cannot be used).
+    ///
+    /// Defaults to [`RewriteStrategyConfig::Default`] (preserve insertion order).
+    #[serde(default)]
+    pub rewrite_strategy: RewriteStrategyConfig,
 }
 
 #[allow(deprecated)]
@@ -220,6 +235,7 @@ impl Default for CompactionOptions {
             enable_binary_copy: false,
             enable_binary_copy_force: false,
             binary_copy_read_batch_bytes: Some(16 * 1024 * 1024),
+            rewrite_strategy: RewriteStrategyConfig::Default,
         }
     }
 }
@@ -567,6 +583,84 @@ impl CompactionPlanner for DefaultCompactionPlanner {
     }
 }
 
+/// Compaction planner that selects all (or a filtered subset of) fragments
+/// for clustering. Unlike [`DefaultCompactionPlanner`], which only selects
+/// small or deletion-heavy fragments, this planner selects fragments for
+/// data reorganization (sorting / z-ordering / hilbert ordering).
+///
+/// The planner produces one task per chunk of approximately
+/// `target_rows_per_fragment` rows so that memory usage stays bounded.
+/// Within each task the [`RewriteStrategy`] (carried on the
+/// [`CompactionOptions`]) determines how data is reordered.
+#[derive(Debug, Clone)]
+pub struct ClusteringCompactionPlanner {
+    options: CompactionOptions,
+    /// Optional filter: only include fragments with these IDs.
+    fragment_ids: Option<Vec<u64>>,
+}
+
+impl ClusteringCompactionPlanner {
+    pub fn new(mut options: CompactionOptions, fragment_ids: Option<Vec<u64>>) -> Self {
+        options.validate();
+        Self {
+            options,
+            fragment_ids,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl CompactionPlanner for ClusteringCompactionPlanner {
+    async fn plan(&self, dataset: &Dataset) -> Result<CompactionPlan> {
+        let all_fragments = dataset.get_fragments();
+        let fragments: Vec<Fragment> = if let Some(ids) = &self.fragment_ids {
+            all_fragments
+                .into_iter()
+                .filter(|f| ids.contains(&(f.id() as u64)))
+                .map(|f| f.metadata)
+                .collect()
+        } else {
+            all_fragments.into_iter().map(|f| f.metadata).collect()
+        };
+
+        if fragments.is_empty() {
+            return Ok(CompactionPlan::new(
+                dataset.manifest.version,
+                self.options.clone(),
+            ));
+        }
+
+        // Group fragments into tasks so each task has roughly
+        // target_rows_per_fragment rows. This keeps memory bounded when
+        // sorting. For exact global ordering the caller should set a very
+        // large target.
+        let target = self.options.target_rows_per_fragment as u64;
+        let mut plan = CompactionPlan::new(dataset.manifest.version, self.options.clone());
+        let mut current_task = Vec::new();
+        let mut current_rows: u64 = 0;
+
+        for frag in fragments {
+            let frag_rows = frag.physical_rows.unwrap_or(0) as u64;
+            current_task.push(frag);
+            current_rows += frag_rows;
+
+            if current_rows >= target {
+                plan.extend_tasks(std::iter::once(TaskData {
+                    fragments: std::mem::take(&mut current_task),
+                }));
+                current_rows = 0;
+            }
+        }
+        if !current_task.is_empty() {
+            plan.extend_tasks(std::iter::once(TaskData {
+                fragments: current_task,
+            }));
+        }
+
+        Ok(plan)
+    }
+}
+
 /// Compacts the files in the dataset without reordering them.
 ///
 /// By default, this does a few things:
@@ -709,6 +803,10 @@ impl CompactionPlan {
 ///   in-order reading.
 /// - `capture_row_ids`: When index remapping is needed, include and capture the
 ///   `_rowid` column from the stream.
+/// - `include_row_id_column`: When true, include `_rowid` as a regular column in
+///   the stream without capturing it. Used by the clustering rewrite strategy so
+///   that row IDs flow through the sort and can be captured afterwards.
+///   Mutually exclusive with `capture_row_ids`.
 ///
 /// Returns:
 /// - `SendableRecordBatchStream`: The batch stream (with `_rowid` removed if captured)
@@ -721,10 +819,15 @@ async fn prepare_reader(
     batch_size: Option,
     with_frags: bool,
     capture_row_ids: bool,
+    include_row_id_column: bool,
 ) -> Result<(
     SendableRecordBatchStream,
     Option>,
 )> {
+    debug_assert!(
+        !(capture_row_ids && include_row_id_column),
+        "capture_row_ids and include_row_id_column are mutually exclusive"
+    );
     let mut scanner = dataset.scan();
     let has_blob_columns = dataset
         .schema()
@@ -741,8 +844,10 @@ async fn prepare_reader(
             .with_fragments(fragments.to_vec())
             .scan_in_order(true);
     }
-    if capture_row_ids {
+    if capture_row_ids || include_row_id_column {
         scanner.with_row_id();
+    }
+    if capture_row_ids {
         let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?);
         let (data_no_row_ids, rx) =
             make_rowid_capture_stream(data, dataset.manifest.uses_stable_row_ids())?;
@@ -1001,8 +1106,14 @@ async fn rewrite_files(
         num_rows,
         fragments.len()
     );
+    let is_clustering = !options.rewrite_strategy.is_default();
     let mode = options.compaction_mode();
-    let can_binary_copy = can_use_binary_copy(dataset.as_ref(), options, &fragments).await;
+    // Clustering requires re-encoding (cannot binary-copy and sort).
+    let can_binary_copy = if is_clustering {
+        false
+    } else {
+        can_use_binary_copy(dataset.as_ref(), options, &fragments).await
+    };
     if !can_binary_copy && matches!(mode, CompactionMode::ForceBinaryCopy) {
         return Err(Error::not_supported_source(
             format!("compaction task {}: binary copy is not supported", task_id).into(),
@@ -1012,12 +1123,19 @@ async fn rewrite_files(
     let mut reader: Option = None;
 
     if !can_binary_copy {
+        // When clustering, we need _rowid to flow through the sort so we
+        // can capture the addresses in the sorted (output) order. So we
+        // include _rowid as a regular column and defer capture until after
+        // the rewrite strategy transforms the stream.
+        let capture_now = needs_remapping && !is_clustering;
+        let include_rowid_col = needs_remapping && is_clustering;
         let (prepared_reader, rx_initial) = prepare_reader(
             dataset.as_ref(),
             &fragments,
             options.batch_size,
             true,
-            needs_remapping,
+            capture_now,
+            include_rowid_col,
         )
         .await?;
         row_ids_rx = rx_initial;
@@ -1033,10 +1151,24 @@ async fn rewrite_files(
                 num_rows,
             );
         });
-        reader = Some(Box::pin(RecordBatchStreamAdapter::new(
-            schema,
-            reader_with_progress,
-        )));
+        let mut stream: SendableRecordBatchStream =
+            Box::pin(RecordBatchStreamAdapter::new(schema, reader_with_progress));
+
+        // Apply the rewrite strategy (sort / z-order / hilbert).
+        if is_clustering {
+            let strategy = options.rewrite_strategy.to_strategy()?;
+            stream = strategy.transform(stream).await?;
+
+            // Now capture _rowid from the (reordered) stream.
+            if needs_remapping {
+                let (data_no_row_ids, rx) =
+                    make_rowid_capture_stream(stream, dataset.manifest.uses_stable_row_ids())?;
+                stream = data_no_row_ids;
+                row_ids_rx = Some(rx);
+            }
+        }
+
+        reader = Some(stream);
     }
 
     let mut params = WriteParams {
@@ -1501,6 +1633,7 @@ mod tests {
             physical_rows: Some(0),
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         };
         let single_bin = CandidateBin {
             fragments: vec![fragment.clone()],
diff --git a/rust/lance/src/dataset/optimize/clustering.rs b/rust/lance/src/dataset/optimize/clustering.rs
new file mode 100644
index 00000000000..c570aefd2c6
--- /dev/null
+++ b/rust/lance/src/dataset/optimize/clustering.rs
@@ -0,0 +1,237 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+//! Clustering rewrite strategy for compaction.
+//!
+//! Sorts data by one or more columns during compaction to improve query
+//! locality. Three ordering modes are supported:
+//!
+//! * **Sort** – plain multi-column sort (ORDER BY col1, col2, …).
+//! * **ZOrder** – bit-interleaved z-order curve (TODO: Phase 2).
+//! * **Hilbert** – Hilbert curve ordering (TODO: Phase 3).
+//!
+//! The implementation follows the same DataFusion-based pattern used by
+//! `lance-index`'s Hilbert sorter: wrap the input stream in [`OneShotExec`],
+//! optionally add a computed column, sort via [`SortExec`], and execute with
+//! [`execute_plan`].
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion_physical_expr::expressions::Column as DFColumn;
+use lance_core::{Error, Result};
+use lance_datafusion::exec::{LanceExecutionOptions, OneShotExec, execute_plan};
+
+use super::rewrite_strategy::{ClusteringMode, RewriteStrategy};
+
+/// Rewrite strategy that clusters data by the given columns.
+pub struct ClusteringRewriteStrategy {
+    columns: Vec<String>,
+    mode: ClusteringMode,
+}
+
+impl ClusteringRewriteStrategy {
+    pub fn new(columns: Vec<String>, mode: ClusteringMode) -> Result<Self> {
+        if columns.is_empty() {
+            return Err(Error::invalid_input(
+                "clustering requires at least one column",
+            ));
+        }
+        Ok(Self { columns, mode })
+    }
+}
+
+#[async_trait]
+impl RewriteStrategy for ClusteringRewriteStrategy {
+    async fn transform(
+        &self,
+        data: SendableRecordBatchStream,
+    ) -> Result<SendableRecordBatchStream> {
+        match self.mode {
+            ClusteringMode::Sort => self.sort_transform(data),
+            ClusteringMode::ZOrder => Err(Error::not_supported(
+                "z-order clustering is not yet implemented",
+            )),
+            ClusteringMode::Hilbert => Err(Error::not_supported(
+                "hilbert clustering is not yet implemented",
+            )),
+        }
+    }
+}
+
+impl ClusteringRewriteStrategy {
+    /// Plain multi-column sort using DataFusion's SortExec.
+    fn sort_transform(&self, data: SendableRecordBatchStream) -> Result<SendableRecordBatchStream> {
+        let schema = data.schema();
+        let source = Arc::new(OneShotExec::new(data));
+
+        let sort_exprs: Vec<PhysicalSortExpr> = self
+            .columns
+            .iter()
+            .map(|col_name| {
+                let idx = schema.index_of(col_name).map_err(|_| {
+                    Error::invalid_input(format!(
+                        "clustering column \"{}\" not found in schema: {:?}",
+                        col_name,
+                        schema
+                            .fields()
+                            .iter()
+                            .map(|f| f.name().as_str())
+                            .collect::<Vec<_>>()
+                    ))
+                })?;
+                Ok(PhysicalSortExpr {
+                    expr: Arc::new(DFColumn::new(col_name, idx)),
+                    options: arrow_schema::SortOptions::default(),
+                })
+            })
+            .collect::<Result<_>>()?;
+
+        let lex_ordering = LexOrdering::new(sort_exprs).ok_or_else(|| {
+            Error::invalid_input("clustering columns produced an empty sort ordering")
+        })?;
+        let sort_exec = Arc::new(SortExec::new(
+            lex_ordering,
+            source as Arc<dyn ExecutionPlan>,
+        ));
+
+        let sorted_stream = execute_plan(
+            sort_exec,
+            LanceExecutionOptions {
+                use_spilling: true,
+                ..Default::default()
+            },
+        )?;
+
+        Ok(sorted_stream)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
+    use arrow_schema::{DataType, Field, Schema};
+    use datafusion::physical_plan::common::collect;
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn test_sort_single_column() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("value", DataType::Utf8, false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![3, 1, 4, 1, 5])),
+                Arc::new(StringArray::from(vec!["c", "a", "d", "b", "e"])),
+            ],
+        )
+        .unwrap();
+
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
+
+        let strategy =
+            ClusteringRewriteStrategy::new(vec!["id".to_string()], ClusteringMode::Sort).unwrap();
+        let sorted = strategy.transform(stream).await.unwrap();
+        let batches = collect(sorted).await.unwrap();
+
+        let ids: Vec<i32> = batches
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+        assert_eq!(ids, vec![1, 1, 3, 4, 5]);
+    }
+
+    #[tokio::test]
+    async fn test_sort_multi_column() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![2, 1, 2, 1])),
+                Arc::new(Int32Array::from(vec![2, 2, 1, 1])),
+            ],
+        )
+        .unwrap();
+
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
+
+        let strategy = ClusteringRewriteStrategy::new(
+            vec!["a".to_string(), "b".to_string()],
+            ClusteringMode::Sort,
+        )
+        .unwrap();
+        let sorted = strategy.transform(stream).await.unwrap();
+        let batches = collect(sorted).await.unwrap();
+
+        let a_vals: Vec<i32> = batches
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+        let b_vals: Vec<i32> = batches
+            .iter()
+            .flat_map(|b| {
+                b.column(1)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .values()
+                    .iter()
+                    .copied()
+            })
+            .collect();
+        assert_eq!(a_vals, vec![1, 1, 2, 2]);
+        assert_eq!(b_vals, vec![1, 2, 1, 2]);
+    }
+
+    #[tokio::test]
+    async fn test_empty_columns_rejected() {
+        let result = ClusteringRewriteStrategy::new(vec![], ClusteringMode::Sort);
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_invalid_column_name() {
+        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))])
+            .unwrap();
+
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
+
+        let strategy =
+            ClusteringRewriteStrategy::new(vec!["nonexistent".to_string()], ClusteringMode::Sort)
+                .unwrap();
+        let result = strategy.transform(stream).await;
+        assert!(result.is_err());
+    }
+}
diff --git a/rust/lance/src/dataset/optimize/rewrite_strategy.rs b/rust/lance/src/dataset/optimize/rewrite_strategy.rs
new file mode 100644
index 00000000000..7f055ad858e
--- /dev/null
+++ b/rust/lance/src/dataset/optimize/rewrite_strategy.rs
@@ -0,0 +1,83 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use async_trait::async_trait;
+use datafusion::execution::SendableRecordBatchStream;
+use lance_core::Result;
+use serde::{Deserialize, Serialize};
+
+/// Controls how data is reorganized during compaction rewrites.
+///
+/// A rewrite strategy transforms the data stream between reading fragments
+/// and writing new fragments. The default strategy passes data through
+/// unchanged (preserving insertion order). A clustering strategy sorts or
+/// reorders data for improved query locality.
+#[async_trait]
+pub trait RewriteStrategy: Send + Sync {
+    /// Transform the data stream before writing.
+    ///
+    /// The input stream contains all rows from the fragments being rewritten
+    /// (after deletion filtering). The output stream will be written to new
+    /// fragments.
+    async fn transform(&self, data: SendableRecordBatchStream)
+    -> Result<SendableRecordBatchStream>;
+}
+
+/// Serializable specification of a rewrite strategy.
+///
+/// This enum is stored in [`super::CompactionOptions`] so that it can be
+/// serialized alongside [`super::CompactionTask`] for distributed execution.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
+pub enum RewriteStrategyConfig {
+    /// Pass data through unchanged (default / current behavior).
+    #[default]
+    Default,
+    /// Cluster data by the specified columns using the given ordering mode.
+    Clustering {
+        columns: Vec<String>,
+        mode: ClusteringMode,
+    },
+}
+
+/// Algorithm used to order data during clustering.
+#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
+pub enum ClusteringMode {
+    /// Sort rows by the given columns in order (like ORDER BY col1, col2, …).
+    Sort,
+    /// Z-order (bit-interleave) across columns for multi-dimensional locality.
+    ZOrder,
+    /// Hilbert curve ordering across columns for multi-dimensional locality.
+    Hilbert,
+}
+
+impl RewriteStrategyConfig {
+    /// Instantiate the concrete [`RewriteStrategy`] described by this config.
+    pub fn to_strategy(&self) -> Result<Box<dyn RewriteStrategy>> {
+        match self {
+            Self::Default => Ok(Box::new(DefaultRewriteStrategy)),
+            Self::Clustering { columns, mode } => {
+                let strategy =
+                    super::clustering::ClusteringRewriteStrategy::new(columns.clone(), *mode)?;
+                Ok(Box::new(strategy))
+            }
+        }
+    }
+
+    /// Returns true if this is the default (passthrough) strategy.
+    pub fn is_default(&self) -> bool {
+        matches!(self, Self::Default)
+    }
+}
+
+/// The default rewrite strategy: passes data through unchanged.
+pub struct DefaultRewriteStrategy;
+
+#[async_trait]
+impl RewriteStrategy for DefaultRewriteStrategy {
+    async fn transform(
+        &self,
+        data: SendableRecordBatchStream,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(data)
+    }
+}
diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs
index b79cb283956..8e6e0858594 100644
--- a/rust/lance/src/dataset/schema_evolution.rs
+++ b/rust/lance/src/dataset/schema_evolution.rs
@@ -1045,6 +1045,7 @@ mod test {
                     physical_rows: Some(50),
                     last_updated_at_version_meta: None,
                     created_at_version_meta: None,
+                    clustering_metadata: None,
                 }))
             } else {
                 Ok(None)
diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs
index fc0d4f4dc19..b2ec9634839 100644
--- a/rust/lance/src/dataset/transaction.rs
+++ b/rust/lance/src/dataset/transaction.rs
@@ -3681,6 +3681,7 @@ mod tests {
             deletion_file: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }];
 
         let mut next_row_id = 0;
@@ -3713,6 +3714,7 @@ mod tests {
             deletion_file: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }];
 
         let mut next_row_id = 100;
@@ -3745,6 +3747,7 @@ mod tests {
             deletion_file: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }];
 
         let mut next_row_id = 100;
@@ -3780,6 +3783,7 @@ mod tests {
             deletion_file: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }];
 
         let mut next_row_id = 100;
@@ -3808,6 +3812,7 @@ mod tests {
                 deletion_file: None,
                 last_updated_at_version_meta: None,
                 created_at_version_meta: None,
+                clustering_metadata: None,
             },
             Fragment {
                 id: 2,
@@ -3817,6 +3822,7 @@ mod tests {
                 deletion_file: None,
                 last_updated_at_version_meta: None,
                 created_at_version_meta: None,
+                clustering_metadata: None,
             },
         ];
         let mut next_row_id = 1000;
@@ -3861,6 +3867,7 @@ mod tests {
             deletion_file: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }];
 
         let mut next_row_id = 0;
diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs
index 0a85d27ce99..54af98f228e 100644
--- a/rust/lance/src/dataset/write/commit.rs
+++ b/rust/lance/src/dataset/write/commit.rs
@@ -497,6 +497,7 @@ mod tests {
             physical_rows: Some(10),
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }
     }
 
diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs
index ebab7eb6b4d..5b9a4b0bd5e 100644
--- a/rust/lance/src/io/commit.rs
+++ b/rust/lance/src/io/commit.rs
@@ -1535,6 +1535,7 @@ mod tests {
                 physical_rows: None,
                 last_updated_at_version_meta: None,
                 created_at_version_meta: None,
+                clustering_metadata: None,
             },
             Fragment {
                 id: 1,
@@ -1547,6 +1548,7 @@ mod tests {
                 physical_rows: None,
                 last_updated_at_version_meta: None,
                 created_at_version_meta: None,
+                clustering_metadata: None,
             },
         ];
 
@@ -1584,6 +1586,7 @@ mod tests {
                 physical_rows: None,
                 last_updated_at_version_meta: None,
                 created_at_version_meta: None,
+                clustering_metadata: None,
             },
             Fragment {
                 id: 1,
@@ -1596,6 +1599,7 @@ mod tests {
                 physical_rows: None,
                 last_updated_at_version_meta: None,
                 created_at_version_meta: None,
+                clustering_metadata: None,
             },
         ];
         assert_eq!(manifest.fragments.as_ref(), &expected_fragments);
diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs
index f3b037aa02e..77352f471de 100644
--- a/rust/lance/src/utils/test.rs
+++ b/rust/lance/src/utils/test.rs
@@ -247,6 +247,7 @@ impl TestDatasetGenerator {
             physical_rows: Some(batch.num_rows()),
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            clustering_metadata: None,
         }
     }
 }