diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs
index 05d71946a16..3fdad5eec25 100644
--- a/java/lance-jni/src/fragment.rs
+++ b/java/lance-jni/src/fragment.rs
@@ -5,7 +5,7 @@ use arrow::array::{RecordBatch, RecordBatchIterator, StructArray};
 use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi_and_data_type};
 use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
 use arrow_schema::DataType;
-use jni::objects::{JIntArray, JValue, JValueGen};
+use jni::objects::{JIntArray, JLongArray, JValue, JValueGen};
 use jni::{
     JNIEnv,
     objects::{JObject, JString},
@@ -444,7 +444,7 @@ const DELETE_FILE_CONSTRUCTOR_SIG: &str =
     "(JJLjava/lang/Long;Lorg/lance/fragment/DeletionFileType;Ljava/lang/Integer;)V";
 const DELETE_FILE_TYPE_CLASS: &str = "org/lance/fragment/DeletionFileType";
 const FRAGMENT_METADATA_CLASS: &str = "org/lance/FragmentMetadata";
-const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;)V";
+const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;[J)V";
 const ROW_ID_META_CLASS: &str = "org/lance/fragment/RowIdMeta";
 const ROW_ID_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V";
 const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResult";
@@ -574,6 +574,18 @@ impl IntoJava for &Fragment {
             Some(m) => m.into_java(env)?,
             None => JObject::null(),
         };
+        let pending_updated_row_offsets = match &self.pending_updated_row_offsets {
+            Some(offsets) => {
+                let java_offsets = env.new_long_array(offsets.len() as i32)?;
+                let java_offsets_slice = offsets
+                    .iter()
+                    .map(|offset| i64::from(*offset))
+                    .collect::<Vec<i64>>();
+                env.set_long_array_region(&java_offsets, 0, &java_offsets_slice)?;
+                JObject::from(java_offsets)
+            }
+            None => JObject::null(),
+        };

         env.new_object(
             FRAGMENT_METADATA_CLASS,
@@ -584,6 +596,7 @@
                 JValueGen::Object(physical_rows),
                 JValueGen::Object(&deletion_file),
                 JValueGen::Object(&row_id_meta),
+                JValueGen::Object(&pending_updated_row_offsets),
             ],
         )
         .map_err(|e| {
@@ -642,6 +655,14 @@ impl FromJObjectWithEnv for JObject<'_> {
         } else {
             Some(row_id_meta.extract_object(env)?)
         };
+        let pending_updated_row_offsets = env
+            .call_method(self, "getPendingUpdatedRowOffsets", "()[J", &[])?
+            .l()?;
+        let pending_updated_row_offsets = if pending_updated_row_offsets.is_null() {
+            None
+        } else {
+            Some(JLongArray::from(pending_updated_row_offsets).extract_object(env)?)
+        };
         Ok(Fragment {
             id,
             files,
@@ -650,6 +671,7 @@ impl FromJObjectWithEnv for JObject<'_> {
             row_id_meta,
             created_at_version_meta: None,
             last_updated_at_version_meta: None,
+            pending_updated_row_offsets,
         })
     }
 }
diff --git a/java/src/main/java/org/lance/FragmentMetadata.java b/java/src/main/java/org/lance/FragmentMetadata.java
index 8bc701b6351..4e326d0965c 100644
--- a/java/src/main/java/org/lance/FragmentMetadata.java
+++ b/java/src/main/java/org/lance/FragmentMetadata.java
@@ -31,6 +31,7 @@ public class FragmentMetadata implements Serializable {
   private final long physicalRows;
   private final DeletionFile deletionFile;
   private final RowIdMeta rowIdMeta;
+  private final long[] pendingUpdatedRowOffsets;

   public FragmentMetadata(
       int id,
@@ -38,11 +39,22 @@ public FragmentMetadata(
       Long physicalRows,
       DeletionFile deletionFile,
       RowIdMeta rowIdMeta) {
+    this(id, files, physicalRows, deletionFile, rowIdMeta, null);
+  }
+
+  public FragmentMetadata(
+      int id,
+      List<DataFile> files,
+      Long physicalRows,
+      DeletionFile deletionFile,
+      RowIdMeta rowIdMeta,
+      long[] pendingUpdatedRowOffsets) {
     this.id = id;
     this.files = files;
     this.physicalRows = physicalRows;
     this.deletionFile = deletionFile;
     this.rowIdMeta = rowIdMeta;
+    this.pendingUpdatedRowOffsets = pendingUpdatedRowOffsets;
   }

   public int getId() {
@@ -80,6 +92,13 @@ public RowIdMeta getRowIdMeta() {
     return rowIdMeta;
   }

+  /**
+   * Internal commit-time state used to preserve per-row rewrite metadata across JNI round-trips.
+   */
+  public long[] getPendingUpdatedRowOffsets() {
+    return pendingUpdatedRowOffsets;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/java/src/test/java/org/lance/operation/UpdateTest.java b/java/src/test/java/org/lance/operation/UpdateTest.java
index bb39a5f4d12..1b5cfe63045 100644
--- a/java/src/test/java/org/lance/operation/UpdateTest.java
+++ b/java/src/test/java/org/lance/operation/UpdateTest.java
@@ -21,9 +21,11 @@ import org.lance.Transaction;
 import org.lance.fragment.FragmentUpdateResult;
 import org.lance.ipc.LanceScanner;
+import org.lance.ipc.ScanOptions;
 import org.lance.operation.Update.UpdateMode;

 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.TimeStampSecTZVector;
 import org.apache.arrow.vector.VarCharVector;
@@ -32,6 +34,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -110,7 +116,9 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
     try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
       TestUtils.UpdateColumnTestDataset testDataset =
           new TestUtils.UpdateColumnTestDataset(allocator, datasetPath);
-      dataset = testDataset.createEmptyDataset();
+      dataset =
+          testDataset.createDatasetWithWriteParams(
+              new org.lance.WriteParams.Builder().withEnableStableRowIds(true).build());
       /* dataset content
        * _rowid | id | name       | timeStamp |
        * 0:     | 0  | "Person 0" | 0         |
@@ -146,13 +154,14 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
        * 3:     | null | null |
        */
       FragmentUpdateResult updateResult = testDataset.updateColumn(targetFragment, updateRowCount);
+      FragmentMetadata serializedUpdatedFragment =
+          serializeAndDeserialize(updateResult.getUpdatedFragment());
       try (Transaction updateTxn =
           new Transaction.Builder()
               .readVersion(dataset.version())
               .operation(
                   Update.builder()
-                      .updatedFragments(
-                          Collections.singletonList(updateResult.getUpdatedFragment()))
+                      .updatedFragments(Collections.singletonList(serializedUpdatedFragment))
                       .fieldsModified(updateResult.getFieldsModified())
                       .build())
               .build()) {
@@ -160,10 +169,17 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
       assertEquals(3, dataset.version());
       assertEquals(3, dataset.latestVersion());
       Fragment fragment = dataset.getFragments().get(0);
-      try (LanceScanner scanner = fragment.newScan(rowCount)) {
+      try (LanceScanner scanner =
+          fragment.newScan(
+              new ScanOptions.Builder()
+                  .batchSize((long) rowCount)
+                  .columns(
+                      Arrays.asList("id", "name", "timeStamp", "_row_last_updated_at_version"))
+                  .build())) {
         List<Integer> actualIds = new ArrayList<>(rowCount);
         List<String> actualNames = new ArrayList<>(rowCount);
         List<Long> actualTimeStamps = new ArrayList<>(rowCount);
+        List<Long> actualLastUpdatedVersions = new ArrayList<>(rowCount);
         try (ArrowReader reader = scanner.scanBatches()) {
           while (reader.loadNextBatch()) {
             VectorSchemaRoot root = reader.getVectorSchemaRoot();
@@ -181,6 +197,14 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
               actualTimeStamps.add(
                   timeStampVector.isNull(i) ? null : timeStampVector.getObject(i));
             }
+            FieldVector lastUpdatedVector =
+                (FieldVector) root.getVector("_row_last_updated_at_version");
+            for (int i = 0; i < lastUpdatedVector.getValueCount(); i++) {
+              Number lastUpdatedValue =
+                  lastUpdatedVector.isNull(i) ? null : (Number) lastUpdatedVector.getObject(i);
+              actualLastUpdatedVersions.add(
+                  lastUpdatedValue == null ? null : lastUpdatedValue.longValue());
+            }
           }
         }
         /* result dataset content
@@ -196,12 +220,27 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
         List<String> expectNames =
             Arrays.asList("Update 0", null, "Update 2", null, "Person 4", null);
         List<Long> expectTimeStamps = Arrays.asList(0L, null, 2L, null, 4L, null);
+        List<Long> expectLastUpdatedVersions = Arrays.asList(3L, 3L, 3L, 3L, 2L, 2L);
         assertEquals(expectIds, actualIds);
         assertEquals(expectNames, actualNames);
         assertEquals(expectTimeStamps, actualTimeStamps);
+        assertEquals(expectLastUpdatedVersions, actualLastUpdatedVersions);
       }
     }
   }
 }
 }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T serializeAndDeserialize(T object) throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(outputStream)) {
+      out.writeObject(object);
+    }
+
+    try (ObjectInputStream in =
+        new ObjectInputStream(new ByteArrayInputStream(outputStream.toByteArray()))) {
+      return (T) in.readObject();
+    }
+  }
 }
diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py
index f8cff450f06..c820737fa50 100644
--- a/python/python/lance/fragment.py
+++ b/python/python/lance/fragment.py
@@ -84,6 +84,9 @@ class FragmentMetadata:
     row_id_meta: Optional[RowIdMeta] = None
     created_at_version_meta: Optional[RowDatasetVersionMeta] = None
     last_updated_at_version_meta: Optional[RowDatasetVersionMeta] = None
+    _pending_updated_row_offsets: Optional[List[int]] = field(
+        default=None, repr=False, compare=False
+    )

     @property
     def num_deletions(self) -> int:
@@ -118,7 +121,9 @@ def to_json(self) -> dict:
                 self.deletion_file.asdict() if self.deletion_file is not None else None
             ),
             row_id_meta=(
-                self.row_id_meta.asdict() if self.row_id_meta is not None else None
+                json.loads(self.row_id_meta.json())
+                if self.row_id_meta is not None
+                else None
             ),
             created_at_version_meta=(
                 json.loads(self.created_at_version_meta.json())
@@ -130,6 +135,7 @@ def to_json(self) -> dict:
                 if self.last_updated_at_version_meta is not None
                 else None
             ),
+            _pending_updated_row_offsets=self._pending_updated_row_offsets,
         )

     @staticmethod
@@ -142,7 +148,7 @@ def from_json(json_data: str) -> FragmentMetadata:

         row_id_meta = json_data.get("row_id_meta")
         if row_id_meta is not None:
-            row_id_meta = RowIdMeta(**row_id_meta)
+            row_id_meta = RowIdMeta.from_json(json.dumps(row_id_meta))

         created_at_version_meta = json_data.get("created_at_version_meta")
         if created_at_version_meta is not None:
@@ -156,6 +162,8 @@ def from_json(json_data: str) -> FragmentMetadata:
                 json.dumps(last_updated_at_version_meta)
             )

+        pending_updated_row_offsets = json_data.get("_pending_updated_row_offsets")
+
         return FragmentMetadata(
             id=json_data["id"],
             files=[DataFile(**f) for f in json_data["files"]],
@@ -164,6 +172,7 @@ def from_json(json_data: str) -> FragmentMetadata:
             row_id_meta=row_id_meta,
             created_at_version_meta=created_at_version_meta,
             last_updated_at_version_meta=last_updated_at_version_meta,
+            _pending_updated_row_offsets=pending_updated_row_offsets,
         )
diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index 3df324a13f7..5c0f4ee10d7 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -831,6 +831,31 @@ def test_take_version_system_columns(tmp_path: Path):
     assert created_at == [dataset.version] * 2


+def test_merge_insert_partial_rewrite_columns_preserves_last_updated_versions(
+    tmp_path: Path,
+):
+    table = pa.table(
+        {
+            "id": [1, 2, 3, 4],
+            "value": [10, 20, 30, 40],
+            "tag": ["a", "b", "c", "d"],
+        }
+    )
+    base_dir = tmp_path / "test_merge_insert_partial_rewrite_columns_versions"
+    dataset = lance.write_dataset(table, base_dir, enable_stable_row_ids=True)
+
+    new_table = pa.table({"id": [2], "value": [200]})
+    dataset.merge_insert("id").when_matched_update_all().execute(new_table)
+
+    result = dataset.to_table(
+        columns=["id", "value", "tag", "_row_last_updated_at_version"]
+    ).to_pydict()
+    assert result["id"] == [1, 2, 3, 4]
+    assert result["value"] == [10, 200, 30, 40]
+    assert result["tag"] == ["a", "b", "c", "d"]
+    assert result["_row_last_updated_at_version"] == [1, 2, 1, 1]
+
+
 @pytest.mark.parametrize("indices", [[], [1, 1], [1, 1, 20, 20, 21], [21, 0, 21, 1, 0]])
 def test_take_duplicate_index(tmp_path: Path, indices: List[int]):
     table = pa.table({"x": range(24)})
diff --git a/python/python/tests/test_fragment.py b/python/python/tests/test_fragment.py
index 40e2a69df22..e776d2067eb 100644
--- a/python/python/tests/test_fragment.py
+++ b/python/python/tests/test_fragment.py
@@ -563,6 +563,204 @@ def test_fragment_update_columns_basic(tmp_path):
     assert result["id"] == [1, 2, 3, 4]  # id column should remain unchanged


+def test_fragment_update_columns_updates_last_updated_version(tmp_path):
+    data = pa.table(
+        {
+            "id": [1, 2, 3, 4],
+            "value": [10, 20, 30, 40],
+        }
+    )
+    dataset_uri = tmp_path / "test_dataset_update_columns_last_updated"
+    dataset = lance.write_dataset(data, dataset_uri, enable_stable_row_ids=True)
+
+    update_data = pa.table(
+        {
+            "_rowid": pa.array([0, 2], type=pa.uint64()),
+            "value": [100, 300],
+        }
+    )
+
+    fragment = dataset.get_fragment(0)
+    updated_fragment, fields_modified = fragment.update_columns(update_data)
+
+    op = LanceOperation.Update(
+        updated_fragments=[updated_fragment],
+        fields_modified=fields_modified,
+    )
+    updated_dataset = lance.LanceDataset.commit(
+        str(dataset_uri), op, read_version=dataset.version
+    )
+
+    result = updated_dataset.to_table(
+        columns=["value", "_row_last_updated_at_version"]
+    ).to_pydict()
+    assert result["value"] == [100, 20, 300, 40]
+    assert result["_row_last_updated_at_version"] == [2, 1, 2, 1]
+
+
+def test_fragment_update_columns_preserves_last_updated_version_across_json(tmp_path):
+    data = pa.table(
+        {
+            "id": [1, 2, 3, 4],
+            "value": [10, 20, 30, 40],
+        }
+    )
+    dataset_uri = tmp_path / "test_dataset_update_columns_last_updated_json"
+    dataset = lance.write_dataset(data, dataset_uri, enable_stable_row_ids=True)
+
+    update_data = pa.table(
+        {
+            "_rowid": pa.array([0, 2], type=pa.uint64()),
+            "value": [100, 300],
+        }
+    )
+
+    fragment = dataset.get_fragment(0)
+    updated_fragment, fields_modified = fragment.update_columns(update_data)
+    updated_fragment = FragmentMetadata.from_json(
+        json.dumps(updated_fragment.to_json())
+    )
+
+    op = LanceOperation.Update(
+        updated_fragments=[updated_fragment],
+        fields_modified=fields_modified,
+    )
+    updated_dataset = lance.LanceDataset.commit(
+        str(dataset_uri), op, read_version=dataset.version
+    )
+
+    result = updated_dataset.to_table(
+        columns=["value", "_row_last_updated_at_version"]
+    ).to_pydict()
+    assert result["value"] == [100, 20, 300, 40]
+    assert result["_row_last_updated_at_version"] == [2, 1, 2, 1]
+
+
+def test_fragment_update_columns_rewrite_columns_without_pending_offsets(tmp_path):
+    data = pa.table(
+        {
+            "id": [1, 2, 3, 4],
+            "value": [10, 20, 30, 40],
+        }
+    )
+    dataset_uri = tmp_path / "test_dataset_update_columns_rewrite_columns_fallback"
+    dataset = lance.write_dataset(data, dataset_uri, enable_stable_row_ids=True)
+
+    update_data = pa.table(
+        {
+            "_rowid": pa.array([0, 2], type=pa.uint64()),
+            "value": [100, 300],
+        }
+    )
+
+    fragment = dataset.get_fragment(0)
+    updated_fragment, fields_modified = fragment.update_columns(update_data)
+    updated_fragment = lance.fragment.FragmentMetadata(
+        id=updated_fragment.id,
+        files=updated_fragment.files,
+        physical_rows=updated_fragment.physical_rows,
+        deletion_file=updated_fragment.deletion_file,
+        row_id_meta=updated_fragment.row_id_meta,
+        created_at_version_meta=updated_fragment.created_at_version_meta,
+        last_updated_at_version_meta=updated_fragment.last_updated_at_version_meta,
+    )
+
+    op = LanceOperation.Update(
+        updated_fragments=[updated_fragment],
+        fields_modified=fields_modified,
+        update_mode="rewrite_columns",
+    )
+    updated_dataset = lance.LanceDataset.commit(
+        str(dataset_uri), op, read_version=dataset.version
+    )
+
+    result = updated_dataset.to_table(
+        columns=["value", "_row_last_updated_at_version"]
+    ).to_pydict()
+    assert result["value"] == [100, 20, 300, 40]
+    assert result["_row_last_updated_at_version"] == [2, 2, 2, 2]
+
+
+def test_fragment_update_columns_does_not_refresh_untouched_fragments(tmp_path):
+    data = pa.table(
+        {
+            "id": list(range(8)),
+            "value": list(range(10, 18)),
+        }
+    )
+    dataset_uri = tmp_path / "test_dataset_update_columns_untouched_fragments"
+    dataset = lance.write_dataset(
+        data,
+        dataset_uri,
+        enable_stable_row_ids=True,
+        max_rows_per_file=4,
+    )
+
+    fragment = dataset.get_fragment(0)
+    updated_fragment, fields_modified = fragment.update_columns(
+        pa.table(
+            {
+                "_rowid": pa.array([0, 2], type=pa.uint64()),
+                "value": [100, 300],
+            }
+        )
+    )
+
+    op = LanceOperation.Update(
+        updated_fragments=[updated_fragment],
+        fields_modified=fields_modified,
+    )
+    updated_dataset = lance.LanceDataset.commit(
+        str(dataset_uri), op, read_version=dataset.version
) + + result = updated_dataset.to_table( + columns=["id", "_row_last_updated_at_version"] + ).to_pydict() + assert result["id"] == list(range(8)) + assert result["_row_last_updated_at_version"] == [2, 1, 2, 1, 1, 1, 1, 1] + + +def test_fragment_update_columns_updates_non_zero_fragment_versions(tmp_path): + data = pa.table( + { + "id": list(range(8)), + "value": list(range(10, 18)), + } + ) + dataset_uri = tmp_path / "test_dataset_update_columns_non_zero_fragment" + dataset = lance.write_dataset( + data, + dataset_uri, + enable_stable_row_ids=True, + max_rows_per_file=4, + ) + + fragment = dataset.get_fragment(1) + updated_fragment, fields_modified = fragment.update_columns( + pa.table( + { + "_rowid": pa.array([4, 6], type=pa.uint64()), + "value": [140, 160], + } + ) + ) + + op = LanceOperation.Update( + updated_fragments=[updated_fragment], + fields_modified=fields_modified, + ) + updated_dataset = lance.LanceDataset.commit( + str(dataset_uri), op, read_version=dataset.version + ) + + result = updated_dataset.to_table( + columns=["value", "_row_last_updated_at_version"] + ).to_pydict() + assert result["value"] == [10, 11, 12, 13, 140, 15, 160, 17] + assert result["_row_last_updated_at_version"] == [1, 1, 1, 1, 2, 1, 2, 1] + + def test_fragment_update_columns_with_custom_join_key(tmp_path): """Test fragment update columns with custom join key.""" # Create initial dataset diff --git a/python/src/fragment.rs b/python/src/fragment.rs index aec4bfac58b..05d00916f60 100644 --- a/python/src/fragment.rs +++ b/python/src/fragment.rs @@ -757,6 +757,9 @@ impl FromPyObject<'_> for PyLance { let created_at_version_meta: Option> = ob.getattr("created_at_version_meta")?.extract()?; let created_at_version_meta = created_at_version_meta.map(|r| r.0.clone()); + let pending_updated_row_offsets = ob + .getattr("_pending_updated_row_offsets")? + .extract::>>()?; Ok(Self(Fragment { id: ob.getattr("id")?.extract()?, @@ -766,6 +769,7 @@ impl FromPyObject<'_> for PyLance { row_id_meta, last_updated_at_version_meta, created_at_version_meta, + pending_updated_row_offsets, })) } } @@ -798,6 +802,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Fragment> { .created_at_version_meta .as_ref() .map(|r| PyRowDatasetVersionMeta(r.clone())); + let pending_updated_row_offsets = self.0.pending_updated_row_offsets.clone(); cls.call1(( self.0.id, @@ -807,6 +812,7 @@ impl<'py> IntoPyObject<'py> for PyLance<&Fragment> { row_id_meta, created_at_version_meta, last_updated_at_version_meta, + pending_updated_row_offsets, )) } } diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs index 01c5b535f8e..1ed601e2c32 100644 --- a/rust/lance-table/src/format/fragment.rs +++ b/rust/lance-table/src/format/fragment.rs @@ -303,6 +303,12 @@ pub struct Fragment { /// Created at version metadata #[serde(skip_serializing_if = "Option::is_none")] pub created_at_version_meta: Option, + + /// Physical row offsets updated by an in-flight column rewrite. + /// + /// This is transient commit-time state. It is not persisted in fragment metadata. 
+    #[serde(skip)]
+    pub pending_updated_row_offsets: Option<Vec<u32>>,
 }

 impl Fragment {
@@ -315,6 +321,7 @@ impl Fragment {
             physical_rows: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            pending_updated_row_offsets: None,
         }
     }

@@ -354,6 +361,7 @@ impl Fragment {
             row_id_meta: None,
             last_updated_at_version_meta: None,
             created_at_version_meta: None,
+            pending_updated_row_offsets: None,
         }
     }

@@ -480,6 +488,7 @@ impl TryFrom for Fragment {
                 .created_at_version_sequence
                 .map(RowDatasetVersionMeta::try_from)
                 .transpose()?,
+            pending_updated_row_offsets: None,
         })
     }
 }
diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs
index 71de80c547f..ca046f0eec1 100644
--- a/rust/lance-table/src/format/manifest.rs
+++ b/rust/lance-table/src/format/manifest.rs
@@ -1319,6 +1319,7 @@ mod tests {
                 physical_rows: None,
                 created_at_version_meta: None,
                 last_updated_at_version_meta: None,
+                pending_updated_row_offsets: None,
             },
             Fragment {
                 id: 1,
@@ -1331,6 +1332,7 @@ mod tests {
                 physical_rows: None,
                 created_at_version_meta: None,
                 last_updated_at_version_meta: None,
+                pending_updated_row_offsets: None,
             },
         ];
diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs
index 5be98a9b23d..7b0b25b7991 100644
--- a/rust/lance/src/dataset/fragment.rs
+++ b/rust/lance/src/dataset/fragment.rs
@@ -23,6 +23,7 @@ use futures::future::try_join_all;
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, join, stream};
 use lance_arrow::{RecordBatchExt, SchemaExt};
 use lance_core::datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions};
+use lance_core::utils::address::RowAddress;
 use lance_core::utils::deletion::DeletionVector;
 use lance_core::utils::tokio::get_num_compute_intensive_cpus;
 use lance_core::{Error, Result, cache::CacheKey, datatypes::Schema};
@@ -1645,7 +1646,12 @@ impl FileFragment {
         // Prepare the read projection: align with the write_schema's columns and append the left_on column.
         let mut read_columns: Vec<String> =
             write_schema.fields.iter().map(|f| f.name.clone()).collect();
-        read_columns.push(left_on.to_string());
+        if !read_columns.iter().any(|column| column == left_on) {
+            read_columns.push(left_on.to_string());
+        }
+        if !read_columns.iter().any(|column| column == ROW_ADDR) {
+            read_columns.push(ROW_ADDR.to_string());
+        }
         let mut updater = self
             .updater(
                 Some(&read_columns),
@@ -1655,14 +1661,25 @@ impl FileFragment {
             .await?;
         // Hash join
         let joiner = Arc::new(HashJoiner::try_new(right_stream, right_on).await?);
+        let mut updated_row_offsets = Vec::new();
         while let Some(batch) = updater.next().await? {
-            let updated_batch = joiner
-                .collect_with_fallback(batch, batch[left_on].clone(), self.dataset())
+            let (updated_batch, matched_rows) = joiner
+                .collect_with_fallback_and_matches(batch, batch[left_on].clone(), self.dataset())
                 .await?;
+            if !matched_rows.is_empty() {
+                let row_addrs: &UInt64Array = as_primitive_array(batch[ROW_ADDR].as_ref());
+                updated_row_offsets.extend(matched_rows.into_iter().map(|row_idx| {
+                    let row_addr = row_addrs.value(row_idx as usize);
+                    RowAddress::from(row_addr).row_offset()
+                }));
+            }
             updater.update(updated_batch).await?;
         }
         let mut updated_fragment = updater.finish().await?;
+        updated_row_offsets.sort_unstable();
+        updated_row_offsets.dedup();
+        updated_fragment.pending_updated_row_offsets = Some(updated_row_offsets);

         // Mark fields in updated data files as obsolete ("tombstone").
+        let updated_fields = updated_fragment.files.last().unwrap().fields.clone();
         for data_file in &mut updated_fragment.files.iter_mut().rev().skip(1) {
@@ -2628,6 +2645,7 @@ mod tests {
     };
     use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
     use lance_core::ROW_ID;
+    use lance_core::ROW_LAST_UPDATED_AT_VERSION;
     use lance_core::utils::tempfile::TempStrDir;
     use lance_datagen::{RowCount, array, gen_batch};
     use lance_file::version::LanceFileVersion;
@@ -2702,6 +2720,7 @@ mod tests {
            max_rows_per_file: 40,
            max_rows_per_group: 10,
            data_storage_version: Some(LanceFileVersion::Stable),
+           enable_stable_row_ids: true,
            ..Default::default()
        };
        let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
@@ -2881,6 +2900,33 @@ mod tests {
            batches1[0].column_by_name("col1").unwrap().as_ref(),
            &Int64Array::from(expected_col1)
        );
+       let mut scanner1 = dataset1.scan();
+       scanner1.project(&[ROW_LAST_UPDATED_AT_VERSION]).unwrap();
+       let version_batches1 = scanner1
+           .try_into_stream()
+           .await
+           .unwrap()
+           .try_collect::<Vec<_>>()
+           .await
+           .unwrap();
+       let expected_last_updated_v1 = dataset1.version().version;
+       assert_eq!(
+           version_batches1[0]
+               .column_by_name(ROW_LAST_UPDATED_AT_VERSION)
+               .unwrap()
+               .as_ref(),
+           &UInt64Array::from(
+               (0..40)
+                   .map(|idx| {
+                       if idx == 0 || idx == 3 {
+                           1
+                       } else {
+                           expected_last_updated_v1
+                       }
+                   })
+                   .collect::<Vec<_>>()
+           )
+       );

        // Test update with user specified keys
        let _ = dataset1
@@ -2962,6 +3008,33 @@ mod tests {
            batches2[0].column_by_name("col2").unwrap().as_ref(),
            &BooleanArray::from(expected_col2)
        );
+       let mut scanner3 = dataset2.scan();
+       scanner3.project(&[ROW_LAST_UPDATED_AT_VERSION]).unwrap();
+       let version_batches2 = scanner3
+           .try_into_stream()
+           .await
+           .unwrap()
+           .try_collect::<Vec<_>>()
+           .await
+           .unwrap();
+       let expected_last_updated_v2 = dataset2.version().version;
+       assert_eq!(
+           version_batches2[0]
+               .column_by_name(ROW_LAST_UPDATED_AT_VERSION)
+               .unwrap()
+               .as_ref(),
+           &UInt64Array::from(
+               (0..40)
+                   .map(|idx| {
+                       if idx == 0 || idx == 3 {
+                           1
+                       } else {
+                           expected_last_updated_v2
+                       }
+                   })
+                   .collect::<Vec<_>>()
+           )
+       );
    }

    #[tokio::test]
diff --git a/rust/lance/src/dataset/hash_joiner.rs b/rust/lance/src/dataset/hash_joiner.rs
index 351f942c1d2..fde8cea278a 100644
--- a/rust/lance/src/dataset/hash_joiner.rs
+++ b/rust/lance/src/dataset/hash_joiner.rs
@@ -211,16 +211,12 @@ impl HashJoiner {
         Ok(())
     }

-    /// Collecting the data using the index column from left table,
-    /// invalid join column values in left table will be filled with origin values in left table
-    ///
-    /// Will run in parallel over columns using all available cores.
-    pub(super) async fn collect_with_fallback(
+    pub(super) async fn collect_with_fallback_and_matches(
         &self,
         left_batch: &RecordBatch,
         index_column: ArrayRef,
         dataset: &Dataset,
-    ) -> Result<RecordBatch> {
+    ) -> Result<(RecordBatch, Vec<u32>)> {
         if index_column.data_type() != &self.index_type {
             return Err(Error::invalid_input(format!(
                 "Index column type mismatch: expected {}, got {}",
@@ -242,6 +238,17 @@ impl HashJoiner {
                     .unwrap_or((left_batch_index, left_rowi))
             })
             .collect::<Vec<_>>();
+        let matched_rows = indices
+            .iter()
+            .enumerate()
+            .filter_map(|(left_rowi, (batch_i, _))| {
+                if *batch_i == left_batch_index {
+                    None
+                } else {
+                    Some(left_rowi as u32)
+                }
+            })
+            .collect::<Vec<_>>();
         let indices = Arc::new(indices);
         // Do this in parallel over the columns
         let columns = futures::stream::iter(0..self.batches[0].num_columns())
@@ -276,7 +283,10 @@ impl HashJoiner {
             .buffered(get_num_compute_intensive_cpus())
             .try_collect::<Vec<_>>()
             .await?;
-        Ok(RecordBatch::try_new(self.batches[0].schema(), columns)?)
+        Ok((
+            RecordBatch::try_new(self.batches[0].schema(), columns)?,
+            matched_rows,
+        ))
     }
 }
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index 7cec534e171..0ede4c89d92 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -1664,6 +1664,7 @@ mod tests {
            physical_rows: Some(0),
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
+           pending_updated_row_offsets: None,
        };
        let single_bin = CandidateBin {
            fragments: vec![fragment.clone()],
diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs
index b79cb283956..1ce8614841f 100644
--- a/rust/lance/src/dataset/schema_evolution.rs
+++ b/rust/lance/src/dataset/schema_evolution.rs
@@ -1045,6 +1045,7 @@ mod test {
                physical_rows: Some(50),
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
+               pending_updated_row_offsets: None,
            }))
        } else {
            Ok(None)
diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs
index 574955b216b..faa8c657cfb 100644
--- a/rust/lance/src/dataset/transaction.rs
+++ b/rust/lance/src/dataset/transaction.rs
@@ -1673,7 +1673,9 @@ impl Transaction {
         let existing_fragments = maybe_existing_fragments?;

         // Apply updates to existing fragments
-        let updated_frags: Vec<Fragment> = existing_fragments
+        let original_fragments_map: std::collections::HashMap<u64, &Fragment> =
+            existing_fragments.iter().map(|f| (f.id, f)).collect();
+        let mut updated_frags: Vec<Fragment> = existing_fragments
             .iter()
             .filter_map(|f| {
                 if removed_fragment_ids.contains(&f.id) {
@@ -1687,13 +1689,14 @@ impl Transaction {
             })
             .collect();

-        // Update version metadata for updated fragments if stable row IDs are enabled
-        // Note: We don't update version metadata for fragments with deletion vectors
-        // because the version sequences are indexed by physical row position, not logical position.
-        // Version metadata for deleted rows will be filtered out during scan using the deletion vector.
-        if next_row_id.is_some() {
-            // Version metadata will be properly set during compaction when deletions are materialized
-        }
+        Self::refresh_rewrite_columns_last_updated_meta(
+            updated_frags.as_mut_slice(),
+            updated_fragments,
+            &original_fragments_map,
+            update_mode.clone(),
+            next_row_id.is_some(),
+            current_manifest.map(|m| m.version).unwrap_or_default(),
+        )?;

         final_fragments.extend(updated_frags);

@@ -2309,6 +2312,71 @@ impl Transaction {

     /// If an operation modifies one or more fields in a fragment then we need to remove
     /// that fragment from any indices that cover one of the modified fields.
+    fn refresh_rewrite_columns_last_updated_meta(
+        updated_frags: &mut [Fragment],
+        updated_fragments: &[Fragment],
+        original_fragments_map: &HashMap<u64, &Fragment>,
+        update_mode: Option<UpdateMode>,
+        stable_row_ids_enabled: bool,
+        prev_version: u64,
+    ) -> Result<()> {
+        let rewrite_columns = update_mode == Some(UpdateMode::RewriteColumns);
+        let updated_fragment_ids = updated_fragments
+            .iter()
+            .map(|f| f.id)
+            .collect::<HashSet<_>>();
+        let has_partial_rewrite_offsets = updated_fragments
+            .iter()
+            .any(|fragment| fragment.pending_updated_row_offsets.is_some());
+        if !stable_row_ids_enabled || (!rewrite_columns && !has_partial_rewrite_offsets) {
+            return Ok(());
+        }
+
+        let new_version = prev_version + 1;
+        for fragment in updated_frags
+            .iter_mut()
+            .filter(|fragment| updated_fragment_ids.contains(&fragment.id))
+        {
+            let preserves_existing_last_updated_meta = original_fragments_map
+                .get(&fragment.id)
+                .is_some_and(|original_fragment| {
+                    fragment.last_updated_at_version_meta
+                        != original_fragment.last_updated_at_version_meta
+                });
+
+            match fragment.pending_updated_row_offsets.take() {
+                Some(updated_offsets)
+                    if !matches!(
+                        fragment.last_updated_at_version_meta,
+                        Some(lance_table::format::RowDatasetVersionMeta::External(_))
+                    ) =>
+                {
+                    let updated_offsets = updated_offsets
+                        .into_iter()
+                        .map(|offset| offset as usize)
+                        .collect::<Vec<_>>();
+                    lance_table::rowids::version::refresh_row_latest_update_meta_for_partial_frag_rewrite_cols(
+                        fragment,
+                        &updated_offsets,
+                        new_version,
+                        prev_version,
+                    )?;
+                }
+                Some(_) => {
+                    fragment.last_updated_at_version_meta =
+                        lance_table::rowids::version::build_version_meta(fragment, new_version);
+                }
+                None if rewrite_columns && !preserves_existing_last_updated_meta => {
+                    fragment.last_updated_at_version_meta =
+                        lance_table::rowids::version::build_version_meta(fragment, new_version);
+                }
+                None => {}
+            }
+        }
+
+        Ok(())
+    }
+
     fn prune_updated_fields_from_indices(
         indices: &mut [IndexMetadata],
         updated_fragments: &[Fragment],
@@ -3788,6 +3856,7 @@ mod tests {
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
+           pending_updated_row_offsets: None,
        }];
        let mut next_row_id = 0;
@@ -3820,6 +3889,7 @@ mod tests {
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
+           pending_updated_row_offsets: None,
        }];
        let mut next_row_id = 100;
@@ -3852,6 +3922,7 @@ mod tests {
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
+           pending_updated_row_offsets: None,
        }];
        let mut next_row_id = 100;
@@ -3887,6 +3958,7 @@ mod tests {
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
+           pending_updated_row_offsets: None,
        }];
        let mut next_row_id = 100;
@@ -3915,6 +3987,7 @@ mod tests {
                deletion_file: None,
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
+               pending_updated_row_offsets: None,
            },
            Fragment {
                id: 2,
@@ -3924,6 +3997,7 @@ mod tests {
                deletion_file: None,
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
+               pending_updated_row_offsets: None,
            },
        ];
        let mut next_row_id = 1000;
@@ -3968,6 +4042,7 @@ mod tests {
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
+           pending_updated_row_offsets: None,
        }];
        let mut next_row_id = 0;
diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs
index ddf1b769425..8fbc991f11c 100644
--- a/rust/lance/src/dataset/write/commit.rs
+++ b/rust/lance/src/dataset/write/commit.rs
@@ -498,6 +498,7 @@ mod tests {
            physical_rows: Some(10),
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
+           pending_updated_row_offsets: None,
        }
    }
diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs
index 7a7b9a5a77a..2f3446b9cbe 100644
--- a/rust/lance/src/io/commit.rs
+++ b/rust/lance/src/io/commit.rs
@@ -1570,6 +1570,7 @@ mod tests {
                physical_rows: None,
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
+               pending_updated_row_offsets: None,
            },
            Fragment {
                id: 1,
@@ -1582,6 +1583,7 @@ mod tests {
                physical_rows: None,
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
+               pending_updated_row_offsets: None,
            },
        ];
@@ -1619,6 +1621,7 @@ mod tests {
                physical_rows: None,
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
+               pending_updated_row_offsets: None,
            },
            Fragment {
                id: 1,
@@ -1631,6 +1634,7 @@ mod tests {
                physical_rows: None,
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
+               pending_updated_row_offsets: None,
            },
        ];
        assert_eq!(manifest.fragments.as_ref(), &expected_fragments);
diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs
index f3b037aa02e..6aa23f9d576 100644
--- a/rust/lance/src/utils/test.rs
+++ b/rust/lance/src/utils/test.rs
@@ -247,6 +247,7 @@ impl TestDatasetGenerator {
            physical_rows: Some(batch.num_rows()),
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
+           pending_updated_row_offsets: None,
        }
    }
 }
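
For reviewers who want a quick feel for the end-to-end behavior, below is a minimal Python sketch assembled from the new test_fragment.py tests in this patch. It uses only the public API those tests exercise; the dataset path is illustrative, and the commented expectations simply restate the assertions from the tests.

# Minimal sketch, assembled from the new test_fragment.py tests above: a partial
# column rewrite records the touched row offsets on the fragment metadata, and the
# commit bumps _row_last_updated_at_version only for those rows (stable row IDs on).
import json

import pyarrow as pa

import lance
from lance import LanceOperation
from lance.fragment import FragmentMetadata

data = pa.table({"id": [1, 2, 3, 4], "value": [10, 20, 30, 40]})
# "/tmp/pending_offsets_demo" is an illustrative path, not part of the patch.
dataset = lance.write_dataset(data, "/tmp/pending_offsets_demo", enable_stable_row_ids=True)

update_data = pa.table({"_rowid": pa.array([0, 2], type=pa.uint64()), "value": [100, 300]})
fragment = dataset.get_fragment(0)
updated_fragment, fields_modified = fragment.update_columns(update_data)

# The pending row offsets survive a JSON round-trip of the fragment metadata
# (_pending_updated_row_offsets is carried through to_json()/from_json()).
updated_fragment = FragmentMetadata.from_json(json.dumps(updated_fragment.to_json()))

op = LanceOperation.Update(
    updated_fragments=[updated_fragment],
    fields_modified=fields_modified,
)
updated_dataset = lance.LanceDataset.commit(
    "/tmp/pending_offsets_demo", op, read_version=dataset.version
)

result = updated_dataset.to_table(
    columns=["value", "_row_last_updated_at_version"]
).to_pydict()
# Per the tests above: values become [100, 20, 300, 40] and the version column
# becomes [2, 1, 2, 1] -- only the rewritten rows move to the new dataset version.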