Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ impl FromJObjectWithEnv<Fragment> for JObject<'_> {
row_id_meta,
created_at_version_meta: None,
last_updated_at_version_meta: None,
clustering_metadata: None,
})
}
}
Expand Down
23 changes: 23 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ message Manifest {

// The branch of the dataset. None means main branch.
optional string branch = 20;

// Clustering configuration for a dataset.
//
// Nested inside Manifest; describes how data in this dataset should be
// clustered (which provider algorithm and over which columns).
message ClusteringFormat {
// The clustering provider (e.g. "hilbert", "zorder").
string provider = 1;
// The columns by which data should be clustered.
repeated string columns = 2;
}

// The clustering format for this dataset.
// Unset means no clustering is configured for the dataset.
optional ClusteringFormat clustering_format = 22;
} // Manifest

// external dataset base path
Expand Down Expand Up @@ -332,8 +342,21 @@ message DataFragment {
// deletion tombstones. To compute the current number of rows, subtract
// `deletion_file.num_deleted_rows` from this value.
uint64 physical_rows = 4;

// Clustering metadata for this fragment, if it was produced by a clustered compaction.
optional ClusteringMetadata clustering_metadata = 11;
}

// Metadata about how a fragment's data was clustered during compaction.
//
// Attached per fragment (see DataFragment.clustering_metadata); fragments
// written by the same compaction task share the same `group` value.
message ClusteringMetadata {
  // The columns by which this fragment's data is clustered.
  repeated string columns = 1;
  // The provider that performed the clustering (e.g. "hilbert", "zorder").
  string provider = 2;
  // A UUID identifying the compaction group — all fragments written in the same
  // compaction task share this value.
  string group = 3;
}

message DataFile {
// Path to the root relative to the dataset's URI.
string path = 1;
Expand Down
1 change: 1 addition & 0 deletions python/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ impl FromPyObject<'_> for PyLance<Fragment> {
row_id_meta,
last_updated_at_version_meta,
created_at_version_meta,
clustering_metadata: None,
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions rust/lance-table/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub use fragment::*;
pub use index::IndexMetadata;

pub use manifest::{
BasePath, DETACHED_VERSION_MASK, DataStorageFormat, Manifest, SelfDescribingFileReader,
WriterVersion, is_detached_version,
BasePath, ClusteringFormat, DETACHED_VERSION_MASK, DataStorageFormat, Manifest,
SelfDescribingFileReader, WriterVersion, is_detached_version,
};
pub use transaction::Transaction;

Expand Down
54 changes: 54 additions & 0 deletions rust/lance-table/src/format/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,18 @@ impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
}
}

/// Metadata about how a fragment's data was clustered during compaction.
///
/// Stored per [`Fragment`] (`Fragment::clustering_metadata`) and converted
/// to/from `pb::ClusteringMetadata` when the manifest is (de)serialized.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ClusteringMetadata {
    /// The columns by which this fragment's data is clustered.
    pub columns: Vec<String>,
    /// The provider that performed the clustering (e.g. "hilbert", "zorder").
    pub provider: String,
    /// A UUID identifying the compaction group — all fragments written in the same
    /// compaction task share this value.
    pub group: String,
}

/// Data fragment.
///
/// A fragment is a set of files which represent the different columns of the same rows.
Expand Down Expand Up @@ -303,6 +315,10 @@ pub struct Fragment {
/// Created at version metadata
#[serde(skip_serializing_if = "Option::is_none")]
pub created_at_version_meta: Option<RowDatasetVersionMeta>,

/// Clustering metadata for this fragment, if it was produced by a clustered compaction.
#[serde(skip_serializing_if = "Option::is_none")]
pub clustering_metadata: Option<ClusteringMetadata>,
}

impl Fragment {
Expand All @@ -315,6 +331,7 @@ impl Fragment {
physical_rows: None,
last_updated_at_version_meta: None,
created_at_version_meta: None,
clustering_metadata: None,
}
}

Expand Down Expand Up @@ -354,6 +371,7 @@ impl Fragment {
row_id_meta: None,
last_updated_at_version_meta: None,
created_at_version_meta: None,
clustering_metadata: None,
}
}

Expand Down Expand Up @@ -480,6 +498,11 @@ impl TryFrom<pb::DataFragment> for Fragment {
.created_at_version_sequence
.map(RowDatasetVersionMeta::try_from)
.transpose()?,
clustering_metadata: p.clustering_metadata.map(|cm| ClusteringMetadata {
columns: cm.columns,
provider: cm.provider,
group: cm.group,
}),
})
}
}
Expand Down Expand Up @@ -521,6 +544,14 @@ impl From<&Fragment> for pb::DataFragment {
physical_rows: f.physical_rows.unwrap_or_default() as u64,
last_updated_at_version_sequence,
created_at_version_sequence,
clustering_metadata: f
.clustering_metadata
.as_ref()
.map(|cm| pb::ClusteringMetadata {
columns: cm.columns.clone(),
provider: cm.provider.clone(),
group: cm.group.clone(),
}),
}
}
}
Expand Down Expand Up @@ -638,4 +669,27 @@ mod tests {
.validate(&base_path)
.expect("validation should allow extra columns without field ids");
}

#[test]
fn test_roundtrip_fragment_with_clustering_metadata() {
let mut fragment = Fragment::new(42);
let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
fragment.add_file_legacy("clustered.lance", &Schema::try_from(&schema).unwrap());
fragment.physical_rows = Some(100);
fragment.clustering_metadata = Some(ClusteringMetadata {
columns: vec!["col_a".to_string(), "col_b".to_string()],
provider: "hilbert".to_string(),
group: "550e8400-e29b-41d4-a716-446655440000".to_string(),
});

let proto = pb::DataFragment::from(&fragment);
let roundtripped = Fragment::try_from(proto).unwrap();
assert_eq!(fragment, roundtripped);

// Also verify None round-trips correctly
fragment.clustering_metadata = None;
let proto = pb::DataFragment::from(&fragment);
let roundtripped = Fragment::try_from(proto).unwrap();
assert_eq!(fragment, roundtripped);
}
}
61 changes: 61 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ use lance_core::datatypes::Schema;
use lance_core::{Error, Result};
use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
use lance_io::utils::read_struct;
use serde::{Deserialize, Serialize};
use snafu::location;

/// Clustering configuration for a dataset.
///
/// Specifies which columns and provider to use when clustering data.
/// Stored on [`Manifest`] (`Manifest::clustering_format`) and converted
/// to/from `pb::manifest::ClusteringFormat` when the manifest is
/// (de)serialized.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ClusteringFormat {
    /// The clustering provider (e.g. "hilbert", "zorder").
    pub provider: String,
    /// The columns by which data should be clustered.
    pub columns: Vec<String>,
}

/// Manifest of a dataset
///
Expand Down Expand Up @@ -98,6 +111,9 @@ pub struct Manifest {
/// is used to tell libraries how to read, write, or manage the table.
pub table_metadata: HashMap<String, String>,

/// The clustering format for this dataset.
pub clustering_format: Option<ClusteringFormat>,

/* external base paths */
pub base_paths: HashMap<u32, BasePath>,
}
Expand Down Expand Up @@ -194,6 +210,7 @@ impl Manifest {
data_storage_format,
config: HashMap::new(),
table_metadata: HashMap::new(),
clustering_format: None,
base_paths,
}
}
Expand Down Expand Up @@ -225,6 +242,7 @@ impl Manifest {
data_storage_format: previous.data_storage_format.clone(),
config: previous.config.clone(),
table_metadata: previous.table_metadata.clone(),
clustering_format: previous.clustering_format.clone(),
base_paths: previous.base_paths.clone(),
}
}
Expand Down Expand Up @@ -288,6 +306,7 @@ impl Manifest {
base_paths
},
table_metadata: self.table_metadata.clone(),
clustering_format: self.clustering_format.clone(),
}
}

Expand Down Expand Up @@ -924,6 +943,10 @@ impl TryFrom<pb::Manifest> for Manifest {
data_storage_format,
config: p.config,
table_metadata: p.table_metadata,
clustering_format: p.clustering_format.map(|cf| ClusteringFormat {
provider: cf.provider,
columns: cf.columns,
}),
base_paths: p
.base_paths
.iter()
Expand Down Expand Up @@ -992,6 +1015,12 @@ impl From<&Manifest> for pb::Manifest {
})
.collect(),
transaction_section: m.transaction_section.map(|i| i as u64),
clustering_format: m.clustering_format.as_ref().map(|cf| {
pb::manifest::ClusteringFormat {
provider: cf.provider.clone(),
columns: cf.columns.clone(),
}
}),
}
}
}
Expand Down Expand Up @@ -1319,6 +1348,7 @@ mod tests {
physical_rows: None,
created_at_version_meta: None,
last_updated_at_version_meta: None,
clustering_metadata: None,
},
Fragment {
id: 1,
Expand All @@ -1331,6 +1361,7 @@ mod tests {
physical_rows: None,
created_at_version_meta: None,
last_updated_at_version_meta: None,
clustering_metadata: None,
},
];

Expand Down Expand Up @@ -1485,4 +1516,34 @@ mod tests {
let stats_map: BTreeMap<String, String> = deletion_summary.into();
assert_eq!(stats_map.len(), 7)
}

#[test]
fn test_roundtrip_manifest_clustering_format() {
    // Minimal manifest over a one-column schema.
    let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
        "a",
        arrow_schema::DataType::Int64,
        false,
    )]);
    let mut manifest = Manifest::new(
        Schema::try_from(&arrow_schema).unwrap(),
        Arc::new(vec![]),
        DataStorageFormat::default(),
        HashMap::new(),
    );

    manifest.clustering_format = Some(ClusteringFormat {
        provider: "hilbert".to_string(),
        columns: vec!["col_a".to_string(), "col_b".to_string()],
    });

    // Some(..) survives the proto round-trip.
    let restored = Manifest::try_from(pb::Manifest::from(&manifest)).unwrap();
    assert_eq!(manifest.clustering_format, restored.clustering_format);

    // And so does None.
    manifest.clustering_format = None;
    let restored = Manifest::try_from(pb::Manifest::from(&manifest)).unwrap();
    assert_eq!(restored.clustering_format, None);
}
}
Loading
Loading