12 changes: 11 additions & 1 deletion docs/content/iceberg/rest-catalog.md
@@ -100,6 +100,16 @@ the query results:
200, 20, 2
*/
```

**Schema compatibility and partition evolution:**

There is a fundamental difference between Paimon and Iceberg in the starting fieldId: Paimon starts fieldIds at 0, while Iceberg starts at 1. If an Iceberg table is created directly from a Paimon schema, all fieldIds are shifted by +1, causing field disorder. However, it is possible to update the schema after table creation so that it starts from fieldId 0.

Table creation minimizes issues with fieldId disorder and partition evolution by choosing between two strategies, based on the fieldIds of the partition fields:

- A partition field has fieldId 0: Paimon creates the table with an empty schema first and then updates it to the actual schema. Partition evolution is unavoidable.
- All partition fields have fieldId > 0: Paimon creates the table with an initial dummy schema that places the partition fields at the correct offsets, and then updates it to the actual schema, avoiding partition evolution (see the example below).
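
For a partitioned table, which strategy applies depends only on where the partition columns sit in the schema, since Paimon assigns fieldIds in declaration order starting at 0. The following Flink SQL sketch uses hypothetical table and column names, in the style of the earlier examples on this page:

```sql
-- 'k' gets fieldId 0 and 'pt' gets fieldId 1, so the partition field
-- has a non-zero fieldId and partition evolution can be avoided.
CREATE TABLE t_no_evolution (
    k STRING,
    pt INT,
    v BIGINT
) PARTITIONED BY (pt);

-- Here 'pt' is the first column and gets fieldId 0,
-- so partition evolution is unavoidable.
CREATE TABLE t_with_evolution (
    pt INT,
    k STRING,
    v BIGINT
) PARTITIONED BY (pt);
```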

**Note:**

Paimon first writes Iceberg metadata to a separate directory, as it does for the Hadoop catalog, and then commits the metadata to the Iceberg REST catalog.
@@ -108,5 +118,5 @@ If the two are incompatible, we take the metadata stored in the separate directo
There are several cases when committing to the Iceberg REST catalog:
1. The table does not exist in the Iceberg REST catalog: the table is created in the REST catalog first, and then the metadata is committed.
2. The table exists in the Iceberg REST catalog and is compatible with the base metadata stored in the separate directory: the table is loaded directly and the metadata is committed.
3. The table exists but is not compatible with the base metadata stored in the separate directory: the table is **dropped and recreated**, and then the metadata is committed.

@@ -31,6 +31,8 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
@@ -41,6 +43,8 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -141,7 +145,7 @@ private void commitMetadataImpl(
try {
if (!tableExists()) {
LOG.info("Table {} does not exist, create it.", icebergTableIdentifier);
icebergTable = createTable();
icebergTable = createTable(newMetadata);
updatdeBuilder =
updatesForCorrectBase(
((BaseTable) icebergTable).operations().current(),
@@ -240,7 +244,7 @@ private TableMetadata.Builder updatesForCorrectBase(

private TableMetadata.Builder updatesForIncorrectBase(TableMetadata newMetadata) {
LOG.info("the base metadata is incorrect, we'll recreate the iceberg table.");
icebergTable = recreateTable();
icebergTable = recreateTable(newMetadata);
return updatesForCorrectBase(
((BaseTable) icebergTable).operations().current(), newMetadata, true);
}
@@ -267,15 +271,50 @@ private void createDatabase() {
restCatalog.createNamespace(Namespace.of(icebergDatabaseName));
}

private Table createTable() {
/* Here we create iceberg table with an emptySchema. This is because:
When creating table, fieldId in iceberg will be forced to start from 1, while fieldId in paimon usually start from 0.
If we directly use the schema extracted from paimon to create iceberg table, the fieldId will be in disorder, and this
may cause incorrectness when reading by iceberg reader. So we use an emptySchema here, and add the corresponding
schemas later.
private Table createTable(TableMetadata newMetadata) {
/*
Handles fieldId incompatibility between Paimon (starts at 0) and Iceberg (starts at 1).

Direct schema conversion shifts all fieldIds by +1, causing field disorder. While
schemas can be updated post-creation to start at fieldId 0, creating an empty schema
first triggers partition evolution issues that break some query engines.

Strategy based on partition field position:
- fieldId = 0: Creates empty schema first, partition evolution unavoidable
- fieldId > 0: Creates dummy schema with offset fields and gap filling to preserve the partition spec
*/
Schema emptySchema = new Schema();
return restCatalog.createTable(icebergTableIdentifier, emptySchema);
PartitionSpec spec = newMetadata.spec();
boolean isPartitionedWithZeroFieldId =
spec.fields().stream().anyMatch(f -> f.sourceId() == 0);
if (spec.isUnpartitioned() || isPartitionedWithZeroFieldId) {
if (isPartitionedWithZeroFieldId) {
LOG.info(
"Partition fieldId = 0. The Iceberg REST committer will use partition evolution to support Iceberg compatibility with the Paimon schema. If you want to avoid this, use a non-zero fieldId partition field");
}
Schema emptySchema = new Schema();
return restCatalog.createTable(icebergTableIdentifier, emptySchema);
} else {
LOG.info(
"Partition fieldId > 0. In order to avoid partition evlolution, dummy schema will be created first");

int size =
spec.fields().stream().mapToInt(PartitionField::sourceId).max().orElseThrow();
// prefill the schema with dummy fields
NestedField[] columns = new NestedField[size];
for (int idx = 0; idx < size; idx++) {
int fieldId = idx + 1;
columns[idx] =
NestedField.optional(fieldId, "f" + fieldId, Types.BooleanType.get());
}
// find and set partition fields with offset -1, so they align correctly after table
// creation
for (PartitionField f : spec.fields()) {
columns[f.sourceId() - 1] = newMetadata.schema().findField(f.sourceId());
}

Schema dummySchema = new Schema(columns);
return restCatalog.createTable(icebergTableIdentifier, dummySchema, spec);
}
}

private Table getTable() {
@@ -287,10 +326,10 @@ private void dropTable() {
restCatalog.dropTable(icebergTableIdentifier, false);
}

private Table recreateTable() {
private Table recreateTable(TableMetadata newMetadata) {
try {
dropTable();
return createTable();
return createTable(newMetadata);
} catch (Exception e) {
throw new RuntimeException("Fail to recreate iceberg table.", e);
}
@@ -43,6 +43,8 @@

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
@@ -53,6 +55,8 @@
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogServer;
import org.apache.iceberg.rest.RESTServerExtension;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -139,6 +143,9 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception {
testRecords,
expected,
Record::toString);

PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(new Schema()).build();
runPartitionSpecCompatibilityTest(expectedPartitionSpec);
}

@Test
@@ -206,6 +213,89 @@ public void testPartitionedPrimaryKeyTable() throws Exception {
testRecords,
expected,
Record::toString);

PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(new Schema()).build();
runPartitionSpecCompatibilityTest(expectedPartitionSpec);
}

@Test
public void testPartitionedPrimaryKeyTableWithNonZeroFieldId() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.BIGINT()
},
new String[] {
"k", "pt1", "pt2", "v1", "v2"
}); // partition starts from fieldId 1

BiFunction<Integer, String, BinaryRow> binaryRow =
(pt1, pt2) -> {
BinaryRow b = new BinaryRow(2);
BinaryRowWriter writer = new BinaryRowWriter(b);
writer.writeInt(0, pt1);
writer.writeString(1, BinaryString.fromString(pt2));
writer.complete();
return b;
};

int numRounds = 20;
int numRecords = 500;
ThreadLocalRandom random = ThreadLocalRandom.current();
boolean samePartitionEachRound = random.nextBoolean();

List<List<TestRecord>> testRecords = new ArrayList<>();
List<List<String>> expected = new ArrayList<>();
Map<String, String> expectedMap = new LinkedHashMap<>();
for (int r = 0; r < numRounds; r++) {
List<TestRecord> round = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) + r) % 3;
String pt2 = String.valueOf(random.nextInt(10, 12));
String k = String.valueOf(random.nextInt(0, 100));
int v1 = random.nextInt();
long v2 = random.nextLong();
round.add(
new TestRecord(
binaryRow.apply(pt1, pt2),
GenericRow.of(
BinaryString.fromString(k),
pt1,
BinaryString.fromString(pt2),
v1,
v2)));
expectedMap.put(
String.format("%s, %d, %s", k, pt1, pt2), String.format("%d, %d", v1, v2));
}
testRecords.add(round);
expected.add(
expectedMap.entrySet().stream()
.map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue()))
.sorted()
.collect(Collectors.toList()));
}

runCompatibilityTest(
rowType,
Arrays.asList("pt1", "pt2"),
Arrays.asList("k", "pt1", "pt2"),
testRecords,
expected,
Record::toString);

PartitionSpec expectedPartitionSpec =
PartitionSpec.builderFor(
new Schema(
NestedField.required(1, "pt1", Types.IntegerType.get()),
NestedField.required(2, "pt2", Types.StringType.get())))
.identity("pt1")
.identity("pt2")
.build();
runPartitionSpecCompatibilityTest(expectedPartitionSpec);
}

private void runCompatibilityTest(
@@ -256,6 +346,12 @@ private void runCompatibilityTest(
commit.close();
}

private void runPartitionSpecCompatibilityTest(PartitionSpec expectedSpec) {
Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
PartitionSpec spec = icebergTable.spec();
assertThat(spec).isEqualTo(expectedSpec);
}

@Test
public void testSchemaAndPropertiesChange() throws Exception {
RowType rowType =