diff --git a/docs/content/iceberg/rest-catalog.md b/docs/content/iceberg/rest-catalog.md
index 62dbc3b00765..8acdd216ba24 100644
--- a/docs/content/iceberg/rest-catalog.md
+++ b/docs/content/iceberg/rest-catalog.md
@@ -100,6 +100,16 @@ the query results:
 200, 20, 2
 */
 ```
+
+**Schema compatibility and partition evolution:**
+
+There is a fundamental difference between Paimon and Iceberg regarding the starting fieldId: Paimon starts at fieldId 0, while Iceberg starts at fieldId 1. If an Iceberg table is created directly from a Paimon schema, all fieldIds are shifted by +1, causing field disorder. However, the schema can be updated after table creation so that it starts from fieldId 0.
+
+Table creation minimizes fieldId disorder and partition evolution issues by choosing between two options:
+
+- Partition fieldId = 0: Paimon creates an empty schema first and then updates it to the actual schema. Partition evolution is unavoidable.
+- Partition fieldId > 0: Paimon first creates a dummy schema in which the partition fields are offset correctly, and then updates it to the actual schema, avoiding partition evolution.
+
 **Note:** Paimon will firstly write iceberg metadata in a separate directory like hadoop-catalog, and then commit metadata to iceberg rest catalog.
@@ -108,5 +118,5 @@ If the two are incompatible, we take the metadata stored in the separate directo
 There are some cases when committing to iceberg rest catalog:
 1. table not exists in iceberg rest-catalog. It'll create the table in rest catalog first, and commit metadata.
 2. table exists in iceberg rest-catalog and is compatible with the base metadata stored in the separate directory. It'll directly get the table and commit metadata.
-3. table exists, and isn't compatible with the base metadata stored in the separate directory. It'll **drop the table and recreate the table**, then commit metadata.
+3. table exists, and isn't compatible with the base metadata stored in the separate directory. It'll **drop the table and recreate the table**, then commit metadata. 
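To see the fieldId shift the new doc section describes, here is a minimal, self-contained sketch. This is illustrative code, not part of the PR; the class name, column names, and table location are made up. Iceberg assigns fresh fieldIds starting from 1 when new table metadata is created, so a Paimon-style schema whose ids start at 0 comes back shifted:

```java
import java.util.Collections;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;

public class FieldIdShiftSketch {
    public static void main(String[] args) {
        // A Paimon-style schema: fieldIds start at 0.
        Schema paimonSchema =
                new Schema(
                        NestedField.required(0, "k", Types.StringType.get()),
                        NestedField.optional(1, "v", Types.IntegerType.get()));

        // Table creation assigns fresh ids starting from 1: "k" becomes 1 and
        // "v" becomes 2, i.e. the "+1 shift" that causes field disorder.
        TableMetadata created =
                TableMetadata.newTableMetadata(
                        paimonSchema,
                        PartitionSpec.unpartitioned(),
                        SortOrder.unsorted(),
                        "file:///tmp/t",
                        Collections.emptyMap());
        System.out.println(created.schema()); // field ids are now 1 and 2
    }
}
```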
diff --git a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
index 119ddcd743e1..bb358512fe53 100644
--- a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
+++ b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitter.java
@@ -31,6 +31,8 @@
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
@@ -41,6 +43,8 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,7 +145,7 @@ private void commitMetadataImpl(
         try {
             if (!tableExists()) {
                 LOG.info("Table {} does not exist, create it.", icebergTableIdentifier);
-                icebergTable = createTable();
+                icebergTable = createTable(newMetadata);
                 updatdeBuilder =
                         updatesForCorrectBase(
                                 ((BaseTable) icebergTable).operations().current(),
@@ -240,7 +244,7 @@ private TableMetadata.Builder updatesForCorrectBase(
 
     private TableMetadata.Builder updatesForIncorrectBase(TableMetadata newMetadata) {
         LOG.info("the base metadata is incorrect, we'll recreate the iceberg table.");
-        icebergTable = recreateTable();
+        icebergTable = recreateTable(newMetadata);
         return updatesForCorrectBase(
                 ((BaseTable) icebergTable).operations().current(), newMetadata, true);
     }
@@ -267,15 +271,50 @@ private void createDatabase() {
         restCatalog.createNamespace(Namespace.of(icebergDatabaseName));
     }
 
-    private Table createTable() {
-        /* Here we create iceberg table with an emptySchema. This is because:
-          When creating table, fieldId in iceberg will be forced to start from 1, while fieldId in paimon usually start from 0.
-          If we directly use the schema extracted from paimon to create iceberg table, the fieldId will be in disorder, and this
-          may cause incorrectness when reading by iceberg reader. So we use an emptySchema here, and add the corresponding
-          schemas later.
+    private Table createTable(TableMetadata newMetadata) {
+        /*
+        Handles fieldId incompatibility between Paimon (starts at 0) and Iceberg (starts at 1).
+
+        Direct schema conversion shifts all fieldIds by +1, causing field disorder. While
+        schemas can be updated post-creation to start at fieldId 0, creating an empty schema
+        first triggers partition evolution issues that break some query engines.
+
+        Strategy based on partition field position:
+        - fieldId = 0: Creates empty schema first, partition evolution unavoidable
+        - fieldId > 0: Creates dummy schema with offset fields and gap filling to preserve the partition spec
         */
-        Schema emptySchema = new Schema();
-        return restCatalog.createTable(icebergTableIdentifier, emptySchema);
+        PartitionSpec spec = newMetadata.spec();
+        boolean isPartitionedWithZeroFieldId =
+                spec.fields().stream().anyMatch(f -> f.sourceId() == 0);
+        if (spec.isUnpartitioned() || isPartitionedWithZeroFieldId) {
+            if (isPartitionedWithZeroFieldId) {
+                LOG.info(
+                        "Partition fieldId = 0. The Iceberg REST committer will use partition evolution to support Iceberg compatibility with the Paimon schema. If you want to avoid this, use a non-zero fieldId partition field.");
+            }
+            Schema emptySchema = new Schema();
+            return restCatalog.createTable(icebergTableIdentifier, emptySchema);
+        } else {
+            LOG.info(
+                    "Partition fieldId > 0. In order to avoid partition evolution, a dummy schema will be created first.");
+
+            int size =
+                    spec.fields().stream().mapToInt(PartitionField::sourceId).max().orElseThrow();
+            // prefill the schema with dummy fields
+            NestedField[] columns = new NestedField[size];
+            for (int idx = 0; idx < size; idx++) {
+                int fieldId = idx + 1;
+                columns[idx] =
+                        NestedField.optional(fieldId, "f" + fieldId, Types.BooleanType.get());
+            }
+            // find and set partition fields with offset -1, so they align correctly after table
+            // creation
+            for (PartitionField f : spec.fields()) {
+                columns[f.sourceId() - 1] = newMetadata.schema().findField(f.sourceId());
+            }
+
+            Schema dummySchema = new Schema(columns);
+            return restCatalog.createTable(icebergTableIdentifier, dummySchema, spec);
+        }
     }
 
     private Table getTable() {
@@ -287,10 +326,10 @@ private void dropTable() {
         restCatalog.dropTable(icebergTableIdentifier, false);
     }
 
-    private Table recreateTable() {
+    private Table recreateTable(TableMetadata newMetadata) {
         try {
             dropTable();
-            return createTable();
+            return createTable(newMetadata);
         } catch (Exception e) {
            throw new RuntimeException("Fail to recreate iceberg table.", e);
         }
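To make the dummy-schema branch concrete, a small standalone sketch follows. It is not code from this PR; the column names and ids are hypothetical. For a table partitioned on a single column whose Paimon fieldId is 3, boolean placeholders must fill ids 1 and 2, otherwise Iceberg would reassign the partition column's id at creation time:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;

public class DummySchemaSketch {
    public static void main(String[] args) {
        // Paimon-derived schema: k(0), v1(1), v2(2), pt(3); partitioned by pt.
        Schema actualSchema =
                new Schema(
                        NestedField.required(0, "k", Types.StringType.get()),
                        NestedField.optional(1, "v1", Types.IntegerType.get()),
                        NestedField.optional(2, "v2", Types.LongType.get()),
                        NestedField.required(3, "pt", Types.StringType.get()));

        // The highest partition sourceId decides how many slots to prefill.
        int size = 3;

        // Throwaway boolean fields reserve fieldIds 1..size, then the real
        // partition column replaces the placeholder at slot sourceId - 1.
        NestedField[] columns = new NestedField[size];
        for (int idx = 0; idx < size; idx++) {
            int fieldId = idx + 1;
            columns[idx] = NestedField.optional(fieldId, "f" + fieldId, Types.BooleanType.get());
        }
        columns[3 - 1] = actualSchema.findField(3);

        // Dummy schema: f1 boolean(1), f2 boolean(2), pt string(3). Creating the
        // table with this schema and an identity spec on "pt" pins the partition
        // source id to 3; the follow-up schema update can then swap in k/v1/v2
        // and drop f1/f2 without evolving the spec.
        System.out.println(new Schema(columns));
    }
}
```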
diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
index e52ac6e06b75..7111726f3680 100644
--- a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergRestMetadataCommitterTest.java
@@ -43,6 +43,8 @@
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
@@ -53,6 +55,8 @@
 import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.RESTCatalogServer;
 import org.apache.iceberg.rest.RESTServerExtension;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -139,6 +143,9 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception {
                 testRecords,
                 expected,
                 Record::toString);
+
+        PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(new Schema()).build();
+        runPartitionSpecCompatibilityTest(expectedPartitionSpec);
     }
 
     @Test
@@ -206,6 +213,89 @@ public void testPartitionedPrimaryKeyTable() throws Exception {
                 testRecords,
                 expected,
                 Record::toString);
+
+        PartitionSpec expectedPartitionSpec = PartitionSpec.builderFor(new Schema()).build();
+        runPartitionSpecCompatibilityTest(expectedPartitionSpec);
+    }
+
+    @Test
+    public void testPartitionedPrimaryKeyTableWithNonZeroFieldId() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {
+                            "k", "pt1", "pt2", "v1", "v2"
+                        }); // partition starts from fieldId 1
+
+        BiFunction<Integer, String, BinaryRow> binaryRow =
+                (pt1, pt2) -> {
+                    BinaryRow b = new BinaryRow(2);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeInt(0, pt1);
+                    writer.writeString(1, BinaryString.fromString(pt2));
+                    writer.complete();
+                    return b;
+                };
+
+        int numRounds = 20;
+        int numRecords = 500;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean samePartitionEachRound = random.nextBoolean();
+
+        List<List<TestRecord>> testRecords = new ArrayList<>();
+        List<List<String>> expected = new ArrayList<>();
+        Map<String, String> expectedMap = new LinkedHashMap<>();
+        for (int r = 0; r < numRounds; r++) {
+            List<TestRecord> round = new ArrayList<>();
+            for (int i = 0; i < numRecords; i++) {
+                int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) + r) % 3;
+                String pt2 = String.valueOf(random.nextInt(10, 12));
+                String k = String.valueOf(random.nextInt(0, 100));
+                int v1 = random.nextInt();
+                long v2 = random.nextLong();
+                round.add(
+                        new TestRecord(
+                                binaryRow.apply(pt1, pt2),
+                                GenericRow.of(
+                                        BinaryString.fromString(k),
+                                        pt1,
+                                        BinaryString.fromString(pt2),
+                                        v1,
+                                        v2)));
+                expectedMap.put(
+                        String.format("%s, %d, %s", k, pt1, pt2), String.format("%d, %d", v1, v2));
+            }
+            testRecords.add(round);
+            expected.add(
+                    expectedMap.entrySet().stream()
+                            .map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue()))
+                            .sorted()
+                            .collect(Collectors.toList()));
+        }
+
+        runCompatibilityTest(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("k", "pt1", "pt2"),
+                testRecords,
+                expected,
+                Record::toString);
+
+        PartitionSpec expectedPartitionSpec =
+                PartitionSpec.builderFor(
+                                new Schema(
+                                        NestedField.required(1, "pt1", Types.IntegerType.get()),
+                                        NestedField.required(2, "pt2", Types.StringType.get())))
+                        .identity("pt1")
+                        .identity("pt2")
+                        .build();
+        runPartitionSpecCompatibilityTest(expectedPartitionSpec);
     }
 
     private void runCompatibilityTest(
@@ -256,6 +346,12 @@ private void runCompatibilityTest(
         commit.close();
     }
 
+    private void runPartitionSpecCompatibilityTest(PartitionSpec expectedSpec) {
+        Table icebergTable = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));
+        PartitionSpec spec = icebergTable.spec();
+        assertThat(spec).isEqualTo(expectedSpec);
+    }
+
     @Test
     public void testSchemaAndPropertiesChange() throws Exception {
         RowType rowType =
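Finally, a hedged sketch of how the committed table could be sanity-checked from a plain Iceberg client, in the spirit of runPartitionSpecCompatibilityTest above. The mydb.t identifier mirrors the test setup, and the specId assertion encodes the fieldId > 0 expectation (no partition evolution); it is an assumption about the intended end state, not something this PR asserts for the fieldId = 0 path:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;

public class VerifyCommittedTable {
    /** Assumes a RESTCatalog configured against the same server as the test. */
    static void verify(RESTCatalog restCatalog) {
        Table table = restCatalog.loadTable(TableIdentifier.of("mydb", "t"));

        // Paimon fieldIds should survive: "k" keeps id 0, partition columns keep 1 and 2.
        assert table.schema().findField("k").fieldId() == 0;
        assert table.schema().findField("pt1").fieldId() == 1;
        assert table.schema().findField("pt2").fieldId() == 2;

        // With the dummy-schema path, the spec bound at creation should still be
        // current, i.e. no partition evolution happened.
        assert table.spec().specId() == 0;
        assert table.spec().fields().size() == 2;
    }
}
```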