Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow::array::{RecordBatch, RecordBatchIterator, StructArray};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi_and_data_type};
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow_schema::DataType;
use jni::objects::{JIntArray, JValue, JValueGen};
use jni::objects::{JIntArray, JLongArray, JValue, JValueGen};
use jni::{
JNIEnv,
objects::{JObject, JString},
Expand Down Expand Up @@ -444,7 +444,7 @@ const DELETE_FILE_CONSTRUCTOR_SIG: &str =
"(JJLjava/lang/Long;Lorg/lance/fragment/DeletionFileType;Ljava/lang/Integer;)V";
const DELETE_FILE_TYPE_CLASS: &str = "org/lance/fragment/DeletionFileType";
const FRAGMENT_METADATA_CLASS: &str = "org/lance/FragmentMetadata";
const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;)V";
const FRAGMENT_METADATA_CONSTRUCTOR_SIG: &str = "(ILjava/util/List;Ljava/lang/Long;Lorg/lance/fragment/DeletionFile;Lorg/lance/fragment/RowIdMeta;[J)V";
const ROW_ID_META_CLASS: &str = "org/lance/fragment/RowIdMeta";
const ROW_ID_META_CONSTRUCTOR_SIG: &str = "(Ljava/lang/String;)V";
const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResult";
Expand Down Expand Up @@ -574,6 +574,18 @@ impl IntoJava for &Fragment {
Some(m) => m.into_java(env)?,
None => JObject::null(),
};
let pending_updated_row_offsets = match &self.pending_updated_row_offsets {
Some(offsets) => {
let java_offsets = env.new_long_array(offsets.len() as i32)?;
let java_offsets_slice = offsets
.iter()
.map(|offset| i64::from(*offset))
.collect::<Vec<_>>();
env.set_long_array_region(&java_offsets, 0, &java_offsets_slice)?;
JObject::from(java_offsets)
}
None => JObject::null(),
};

env.new_object(
FRAGMENT_METADATA_CLASS,
Expand All @@ -584,6 +596,7 @@ impl IntoJava for &Fragment {
JValueGen::Object(physical_rows),
JValueGen::Object(&deletion_file),
JValueGen::Object(&row_id_meta),
JValueGen::Object(&pending_updated_row_offsets),
],
)
.map_err(|e| {
Expand Down Expand Up @@ -642,6 +655,14 @@ impl FromJObjectWithEnv<Fragment> for JObject<'_> {
} else {
Some(row_id_meta.extract_object(env)?)
};
let pending_updated_row_offsets = env
.call_method(self, "getPendingUpdatedRowOffsets", "()[J", &[])?
.l()?;
let pending_updated_row_offsets = if pending_updated_row_offsets.is_null() {
None
} else {
Some(JLongArray::from(pending_updated_row_offsets).extract_object(env)?)
};
Ok(Fragment {
id,
files,
Expand All @@ -650,6 +671,7 @@ impl FromJObjectWithEnv<Fragment> for JObject<'_> {
row_id_meta,
created_at_version_meta: None,
last_updated_at_version_meta: None,
pending_updated_row_offsets,
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions java/src/main/java/org/lance/FragmentMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,30 @@ public class FragmentMetadata implements Serializable {
private final long physicalRows;
private final DeletionFile deletionFile;
private final RowIdMeta rowIdMeta;
private final long[] pendingUpdatedRowOffsets;

/**
 * Creates fragment metadata with no pending updated row offsets.
 *
 * <p>Convenience overload that delegates to the full constructor, passing {@code null} for
 * {@code pendingUpdatedRowOffsets} (the pre-existing call signature, kept for backward
 * compatibility).
 *
 * @param id fragment id
 * @param files data files backing this fragment
 * @param physicalRows physical row count (boxed; see note in the full constructor)
 * @param deletionFile deletion file for this fragment, or {@code null} if none
 * @param rowIdMeta row id metadata, or {@code null} if none
 */
public FragmentMetadata(
    int id,
    List<DataFile> files,
    Long physicalRows,
    DeletionFile deletionFile,
    RowIdMeta rowIdMeta) {
  this(id, files, physicalRows, deletionFile, rowIdMeta, null);
}

/**
 * Creates fragment metadata.
 *
 * @param id fragment id
 * @param files data files backing this fragment
 * @param physicalRows physical row count (boxed {@code Long}; see unboxing note below)
 * @param deletionFile deletion file for this fragment, or {@code null} if none
 * @param rowIdMeta row id metadata, or {@code null} if none
 * @param pendingUpdatedRowOffsets internal commit-time per-row rewrite offsets carried across
 *     JNI round-trips; may be {@code null}
 */
public FragmentMetadata(
    int id,
    List<DataFile> files,
    Long physicalRows,
    DeletionFile deletionFile,
    RowIdMeta rowIdMeta,
    long[] pendingUpdatedRowOffsets) {
  this.id = id;
  this.files = files;
  // NOTE(review): the field is declared as primitive `long`, so a null `physicalRows`
  // throws NullPointerException on auto-unboxing here — confirm callers never pass null,
  // or widen the field / add an explicit null check.
  this.physicalRows = physicalRows;
  this.deletionFile = deletionFile;
  this.rowIdMeta = rowIdMeta;
  // Stored by reference, without a defensive copy (see getter note).
  this.pendingUpdatedRowOffsets = pendingUpdatedRowOffsets;
}

public int getId() {
Expand Down Expand Up @@ -80,6 +92,13 @@ public RowIdMeta getRowIdMeta() {
return rowIdMeta;
}

/**
 * Internal commit-time state used to preserve per-row rewrite metadata across JNI round-trips.
 *
 * <p>NOTE(review): this returns the internal array itself, not a copy — callers can mutate the
 * fragment metadata through it. That may be intentional for the JNI round-trip; confirm, or
 * return {@code pendingUpdatedRowOffsets.clone()} if external mutation must be prevented.
 *
 * @return the pending updated row offsets, or {@code null} if none were recorded
 */
public long[] getPendingUpdatedRowOffsets() {
  return pendingUpdatedRowOffsets;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
47 changes: 43 additions & 4 deletions java/src/test/java/org/lance/operation/UpdateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.lance.Transaction;
import org.lance.fragment.FragmentUpdateResult;
import org.lance.ipc.LanceScanner;
import org.lance.ipc.ScanOptions;
import org.lance.operation.Update.UpdateMode;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.VarCharVector;
Expand All @@ -32,6 +34,10 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -110,7 +116,9 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.UpdateColumnTestDataset testDataset =
new TestUtils.UpdateColumnTestDataset(allocator, datasetPath);
dataset = testDataset.createEmptyDataset();
dataset =
testDataset.createDatasetWithWriteParams(
new org.lance.WriteParams.Builder().withEnableStableRowIds(true).build());
/* dataset content
* _rowid | id | name | timeStamp |
* 0: | 0 | "Person 0" | 0 |
Expand Down Expand Up @@ -146,24 +154,32 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
* 3: | null | null |
*/
FragmentUpdateResult updateResult = testDataset.updateColumn(targetFragment, updateRowCount);
FragmentMetadata serializedUpdatedFragment =
serializeAndDeserialize(updateResult.getUpdatedFragment());
try (Transaction updateTxn =
new Transaction.Builder()
.readVersion(dataset.version())
.operation(
Update.builder()
.updatedFragments(
Collections.singletonList(updateResult.getUpdatedFragment()))
.updatedFragments(Collections.singletonList(serializedUpdatedFragment))
.fieldsModified(updateResult.getFieldsModified())
.build())
.build()) {
try (Dataset dataset = new CommitBuilder(this.dataset).execute(updateTxn)) {
assertEquals(3, dataset.version());
assertEquals(3, dataset.latestVersion());
Fragment fragment = dataset.getFragments().get(0);
try (LanceScanner scanner = fragment.newScan(rowCount)) {
try (LanceScanner scanner =
fragment.newScan(
new ScanOptions.Builder()
.batchSize((long) rowCount)
.columns(
Arrays.asList("id", "name", "timeStamp", "_row_last_updated_at_version"))
.build())) {
List<Integer> actualIds = new ArrayList<>(rowCount);
List<String> actualNames = new ArrayList<>(rowCount);
List<Long> actualTimeStamps = new ArrayList<>(rowCount);
List<Long> actualLastUpdatedVersions = new ArrayList<>(rowCount);
try (ArrowReader reader = scanner.scanBatches()) {
while (reader.loadNextBatch()) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
Expand All @@ -181,6 +197,14 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
actualTimeStamps.add(
timeStampVector.isNull(i) ? null : timeStampVector.getObject(i));
}
FieldVector lastUpdatedVector =
(FieldVector) root.getVector("_row_last_updated_at_version");
for (int i = 0; i < lastUpdatedVector.getValueCount(); i++) {
Number lastUpdatedValue =
lastUpdatedVector.isNull(i) ? null : (Number) lastUpdatedVector.getObject(i);
actualLastUpdatedVersions.add(
lastUpdatedValue == null ? null : lastUpdatedValue.longValue());
}
}
}
/* result dataset content
Expand All @@ -196,12 +220,27 @@ void testUpdateColumns(@TempDir Path tempDir) throws Exception {
List<String> expectNames =
Arrays.asList("Update 0", null, "Update 2", null, "Person 4", null);
List<Long> expectTimeStamps = Arrays.asList(0L, null, 2L, null, 4L, null);
List<Long> expectLastUpdatedVersions = Arrays.asList(3L, 3L, 3L, 3L, 2L, 2L);
assertEquals(expectIds, actualIds);
assertEquals(expectNames, actualNames);
assertEquals(expectTimeStamps, actualTimeStamps);
assertEquals(expectLastUpdatedVersions, actualLastUpdatedVersions);
}
}
}
}
}

/**
 * Round-trips the given object through Java serialization and returns the reconstructed copy.
 *
 * <p>Used to verify that fragment metadata survives serialize/deserialize intact (as it would
 * when shipped between processes).
 *
 * @param object the serializable object to copy
 * @return a deep copy produced by deserializing the serialized bytes
 * @throws Exception if serialization or deserialization fails
 */
@SuppressWarnings("unchecked")
private static <T> T serializeAndDeserialize(T object) throws Exception {
  final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  try (ObjectOutputStream writer = new ObjectOutputStream(buffer)) {
    writer.writeObject(object);
  }

  final byte[] bytes = buffer.toByteArray();
  try (ObjectInputStream reader = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
    return (T) reader.readObject();
  }
}
}
13 changes: 11 additions & 2 deletions python/python/lance/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class FragmentMetadata:
row_id_meta: Optional[RowIdMeta] = None
created_at_version_meta: Optional[RowDatasetVersionMeta] = None
last_updated_at_version_meta: Optional[RowDatasetVersionMeta] = None
_pending_updated_row_offsets: Optional[List[int]] = field(
default=None, repr=False, compare=False
)

@property
def num_deletions(self) -> int:
Expand Down Expand Up @@ -118,7 +121,9 @@ def to_json(self) -> dict:
self.deletion_file.asdict() if self.deletion_file is not None else None
),
row_id_meta=(
self.row_id_meta.asdict() if self.row_id_meta is not None else None
json.loads(self.row_id_meta.json())
if self.row_id_meta is not None
else None
),
created_at_version_meta=(
json.loads(self.created_at_version_meta.json())
Expand All @@ -130,6 +135,7 @@ def to_json(self) -> dict:
if self.last_updated_at_version_meta is not None
else None
),
_pending_updated_row_offsets=self._pending_updated_row_offsets,
)

@staticmethod
Expand All @@ -142,7 +148,7 @@ def from_json(json_data: str) -> FragmentMetadata:

row_id_meta = json_data.get("row_id_meta")
if row_id_meta is not None:
row_id_meta = RowIdMeta(**row_id_meta)
row_id_meta = RowIdMeta.from_json(json.dumps(row_id_meta))

created_at_version_meta = json_data.get("created_at_version_meta")
if created_at_version_meta is not None:
Expand All @@ -156,6 +162,8 @@ def from_json(json_data: str) -> FragmentMetadata:
json.dumps(last_updated_at_version_meta)
)

pending_updated_row_offsets = json_data.get("_pending_updated_row_offsets")

return FragmentMetadata(
id=json_data["id"],
files=[DataFile(**f) for f in json_data["files"]],
Expand All @@ -164,6 +172,7 @@ def from_json(json_data: str) -> FragmentMetadata:
row_id_meta=row_id_meta,
created_at_version_meta=created_at_version_meta,
last_updated_at_version_meta=last_updated_at_version_meta,
_pending_updated_row_offsets=pending_updated_row_offsets,
)


Expand Down
25 changes: 25 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,31 @@ def test_take_version_system_columns(tmp_path: Path):
assert created_at == [dataset.version] * 2


def test_merge_insert_partial_rewrite_columns_preserves_last_updated_versions(
    tmp_path: Path,
):
    """Partial-column merge_insert must bump _row_last_updated_at_version only
    for the matched rows, leaving untouched rows at their original version."""
    source = pa.table(
        {
            "id": [1, 2, 3, 4],
            "value": [10, 20, 30, 40],
            "tag": ["a", "b", "c", "d"],
        }
    )
    base_dir = tmp_path / "test_merge_insert_partial_rewrite_columns_versions"
    dataset = lance.write_dataset(source, base_dir, enable_stable_row_ids=True)

    # Rewrite only `value` for id == 2; `tag` is intentionally absent.
    updates = pa.table({"id": [2], "value": [200]})
    dataset.merge_insert("id").when_matched_update_all().execute(updates)

    rows = dataset.to_table(
        columns=["id", "value", "tag", "_row_last_updated_at_version"]
    ).to_pydict()
    assert rows["id"] == [1, 2, 3, 4]
    assert rows["value"] == [10, 200, 30, 40]
    assert rows["tag"] == ["a", "b", "c", "d"]
    # Only the merged row (id == 2) should report the new version (2).
    assert rows["_row_last_updated_at_version"] == [1, 2, 1, 1]


@pytest.mark.parametrize("indices", [[], [1, 1], [1, 1, 20, 20, 21], [21, 0, 21, 1, 0]])
def test_take_duplicate_index(tmp_path: Path, indices: List[int]):
table = pa.table({"x": range(24)})
Expand Down
Loading
Loading