From 5b1c90cf16e6869f306e37cec9cb60647ad2d2ab Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 5 Feb 2026 10:45:46 +0800 Subject: [PATCH] [core] Clean up constructor for FileStoreCommitImpl --- .../org/apache/paimon/AbstractFileStore.java | 24 +--- .../paimon/operation/FileStoreCommitImpl.java | 112 +++++++----------- .../operation/commit/StrictModeChecker.java | 16 --- 3 files changed, 47 insertions(+), 105 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 8b30113f02e1..eea68990c774 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -48,7 +48,6 @@ import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.operation.commit.CommitRollback; import org.apache.paimon.operation.commit.ConflictDetection; -import org.apache.paimon.operation.commit.StrictModeChecker; import org.apache.paimon.partition.PartitionExpireStrategy; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -291,12 +290,6 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) { newIndexFileHandler(), snapshotManager, scanner); - StrictModeChecker strictModeChecker = - StrictModeChecker.create( - snapshotManager, - commitUser, - this::newScan, - options.commitStrictModeLastSafeSnapshot().orElse(null)); CommitRollback rollback = null; TableRollback tableRollback = catalogEnvironment.catalogTableRollback(); if (tableRollback != null) { @@ -310,32 +303,17 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) { commitUser, partitionType, options, - options.partitionDefaultName(), pathFactory(), snapshotManager, manifestFileFactory(), manifestListFactory(), indexManifestFileFactory(), - newScan(), - options.bucket(), - options.manifestTargetSize(), - options.manifestFullCompactionThresholdSize(), - options.manifestMergeMinCount(), - partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(), - options.branch(), + this::newScan, newStatsFileHandler(), bucketMode(), - options.scanManifestParallelism(), createCommitPreCallbacks(table), createCommitCallbacks(commitUser, table), - options.commitMaxRetries(), - options.commitTimeout(), - options.commitMinRetryWait(), - options.commitMaxRetryWait(), - options.rowTrackingEnabled(), - options.commitDiscardDuplicateFiles(), conflictDetectFactory, - strictModeChecker, rollback); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 983f541446a0..7cc0266b4357 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -54,7 +54,6 @@ import org.apache.paimon.operation.commit.SuccessCommitResult; import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.operation.metrics.CommitStats; -import org.apache.paimon.options.MemorySize; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.partition.PartitionStatistics; import org.apache.paimon.predicate.Predicate; @@ -91,6 +90,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -135,7 +135,6 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final String commitUser; private final RowType partitionType; private final CoreOptions options; - private final String partitionDefaultName; private final FileStorePathFactory pathFactory; private final SnapshotManager snapshotManager; private final ManifestFile manifestFile; @@ -143,23 +142,12 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final IndexManifestFile indexManifestFile; @Nullable private final CommitRollback rollback; private final CommitScanner scanner; - private final int numBucket; - private final MemorySize manifestTargetSize; - private final MemorySize manifestFullCompactionSize; - private final int manifestMergeMinCount; - private final boolean dynamicPartitionOverwrite; - private final String branchName; - @Nullable private final Integer manifestReadParallelism; private final List commitPreCallbacks; private final List commitCallbacks; private final StatsFileHandler statsFileHandler; private final BucketMode bucketMode; - private final long commitTimeout; private final RetryWaiter retryWaiter; - private final int commitMaxRetries; private final InternalRowPartitionComputer partitionComputer; - private final boolean rowTrackingEnabled; - private final boolean discardDuplicateFiles; @Nullable private final StrictModeChecker strictModeChecker; private final ConflictDetection conflictDetection; private final CommitCleaner commitCleaner; @@ -176,32 +164,17 @@ public FileStoreCommitImpl( String commitUser, RowType partitionType, CoreOptions options, - String partitionDefaultName, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, IndexManifestFile.Factory indexManifestFileFactory, - FileStoreScan scan, - int numBucket, - MemorySize manifestTargetSize, - MemorySize manifestFullCompactionSize, - int manifestMergeMinCount, - boolean dynamicPartitionOverwrite, - String branchName, + Supplier scanSupplier, StatsFileHandler statsFileHandler, BucketMode bucketMode, - @Nullable Integer manifestReadParallelism, List commitPreCallbacks, List commitCallbacks, - int commitMaxRetries, - long commitTimeout, - long commitMinRetryWait, - long commitMaxRetryWait, - boolean rowTrackingEnabled, - boolean discardDuplicateFiles, ConflictDetection.Factory conflictDetectFactory, - @Nullable StrictModeChecker strictModeChecker, @Nullable CommitRollback rollback) { this.snapshotCommit = snapshotCommit; this.fileIO = fileIO; @@ -210,26 +183,17 @@ public FileStoreCommitImpl( this.commitUser = commitUser; this.partitionType = partitionType; this.options = options; - this.partitionDefaultName = partitionDefaultName; this.pathFactory = pathFactory; this.snapshotManager = snapshotManager; this.manifestFile = manifestFileFactory.create(); this.manifestList = manifestListFactory.create(); this.indexManifestFile = indexManifestFileFactory.create(); this.rollback = rollback; - this.scanner = new CommitScanner(scan, indexManifestFile, options); - this.numBucket = numBucket; - this.manifestTargetSize = manifestTargetSize; - this.manifestFullCompactionSize = manifestFullCompactionSize; - this.manifestMergeMinCount = manifestMergeMinCount; - this.dynamicPartitionOverwrite = dynamicPartitionOverwrite; - this.branchName = branchName; - this.manifestReadParallelism = manifestReadParallelism; + this.scanner = new CommitScanner(scanSupplier.get(), indexManifestFile, options); this.commitPreCallbacks = commitPreCallbacks; this.commitCallbacks = commitCallbacks; - this.commitMaxRetries = commitMaxRetries; - this.commitTimeout = commitTimeout; - this.retryWaiter = new RetryWaiter(commitMinRetryWait, commitMaxRetryWait); + this.retryWaiter = + new RetryWaiter(options.commitMinRetryWait(), options.commitMaxRetryWait()); this.partitionComputer = new InternalRowPartitionComputer( options.partitionDefaultName(), @@ -240,9 +204,16 @@ public FileStoreCommitImpl( this.commitMetrics = null; this.statsFileHandler = statsFileHandler; this.bucketMode = bucketMode; - this.rowTrackingEnabled = rowTrackingEnabled; - this.discardDuplicateFiles = discardDuplicateFiles; - this.strictModeChecker = strictModeChecker; + this.strictModeChecker = + options.commitStrictModeLastSafeSnapshot() + .map( + id -> + new StrictModeChecker( + snapshotManager, + commitUser, + scanSupplier.get(), + id)) + .orElse(null); this.conflictDetection = conflictDetectFactory.create(scanner); this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile); } @@ -468,7 +439,7 @@ public int overwritePartition( boolean skipOverwrite = false; // partition filter is built from static or dynamic partition according to properties PartitionPredicate partitionFilter = null; - if (dynamicPartitionOverwrite) { + if (partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite()) { if (changes.appendTableFiles.isEmpty()) { // in dynamic mode, if there is no changes to commit, no data will be deleted skipOverwrite = true; @@ -482,7 +453,8 @@ public int overwritePartition( } else { // partition may be partial partition fields, so here must use predicate way. Predicate partitionPredicate = - createPartitionPredicate(partition, partitionType, partitionDefaultName); + createPartitionPredicate( + partition, partitionType, options.partitionDefaultName()); partitionFilter = PartitionPredicate.fromPredicate(partitionType, partitionPredicate); // sanity check, all changes must be done within the given partition @@ -613,16 +585,19 @@ public void dropPartitions(List> partitions, long commitIden PartitionPredicate partitionFilter; if (fullMode) { List binaryPartitions = - createBinaryPartitions(partitions, partitionType, partitionDefaultName); + createBinaryPartitions( + partitions, partitionType, options.partitionDefaultName()); partitionFilter = PartitionPredicate.fromMultiple(partitionType, binaryPartitions); } else { - // partitions may be partial partition fields, so here must to use predicate way. + // partitions may be partial partition fields, so here must use predicate way. Predicate predicate = partitions.stream() .map( partition -> createPartitionPredicate( - partition, partitionType, partitionDefaultName)) + partition, + partitionType, + options.partitionDefaultName())) .reduce(PredicateBuilder::or) .orElseThrow( () -> new RuntimeException("Failed to get partition filter.")); @@ -688,7 +663,7 @@ public FileIO fileIO() { } private ManifestEntryChanges collectChanges(List commitMessages) { - ManifestEntryChanges changes = new ManifestEntryChanges(numBucket); + ManifestEntryChanges changes = new ManifestEntryChanges(options.bucket()); commitMessages.forEach(changes::collect); LOG.info("Finished collecting changes, including: {}", changes); return changes; @@ -730,12 +705,12 @@ private int tryCommit( retryResult = (RetryCommitResult) result; - if (System.currentTimeMillis() - startMillis > commitTimeout - || retryCount >= commitMaxRetries) { + if (System.currentTimeMillis() - startMillis > options.commitTimeout() + || retryCount >= options.commitMaxRetries()) { String message = String.format( "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.", - commitTimeout, retryCount); + options.commitTimeout(), retryCount); throw new RuntimeException(message, retryResult.exception); } @@ -761,7 +736,11 @@ private int tryOverwritePartition( return tryCommit( latestSnapshot -> scanner.readOverwriteChanges( - numBucket, changes, indexFiles, latestSnapshot, partitionFilter), + options.bucket(), + changes, + indexFiles, + latestSnapshot, + partitionFilter), identifier, watermark, properties, @@ -836,7 +815,8 @@ CommitResult tryCommitOnce( } List baseDataFiles = new ArrayList<>(); - boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND; + boolean discardDuplicate = + options.commitDiscardDuplicateFiles() && commitKind == CommitKind.APPEND; if (latestSnapshot != null && (discardDuplicate || detectConflicts)) { // latestSnapshotId is different from the snapshot id we've checked for conflicts, // so we have to check again @@ -920,14 +900,14 @@ CommitResult tryCommitOnce( ManifestFileMerger.merge( mergeBeforeManifests, manifestFile, - manifestTargetSize.getBytes(), - manifestMergeMinCount, - manifestFullCompactionSize.getBytes(), + options.manifestTargetSize().getBytes(), + options.manifestMergeMinCount(), + options.manifestFullCompactionThresholdSize().getBytes(), partitionType, - manifestReadParallelism); + options.scanManifestParallelism()); baseManifestList = manifestList.write(mergeAfterManifests); - if (rowTrackingEnabled) { + if (options.rowTrackingEnabled()) { RowTrackingAssigned assigned = assignRowTracking(newSnapshotId, firstRowIdStart, deltaFiles); nextRowIdStart = assigned.nextRowIdStart; @@ -1097,12 +1077,12 @@ public void compactManifest() { break; } - if (System.currentTimeMillis() - startMillis > commitTimeout - || retryCount >= commitMaxRetries) { + if (System.currentTimeMillis() - startMillis > options.commitTimeout() + || retryCount >= options.commitMaxRetries()) { throw new RuntimeException( String.format( "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs.", - commitTimeout, retryCount)); + options.commitTimeout(), retryCount)); } retryWaiter.retryWait(retryCount); @@ -1126,11 +1106,11 @@ private boolean compactManifestOnce() { ManifestFileMerger.merge( mergeBeforeManifests, manifestFile, - manifestTargetSize.getBytes(), + options.manifestTargetSize().getBytes(), 1, 1, partitionType, - manifestReadParallelism); + options.scanManifestParallelism()); if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) { // no need to commit this snapshot, because no compact were happened @@ -1173,7 +1153,7 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List de for (PartitionEntry entry : deltaStatistics) { statistics.add(entry.toPartitionStatistics(partitionComputer)); } - return snapshotCommit.commit(newSnapshot, branchName, statistics); + return snapshotCommit.commit(newSnapshot, options.branch(), statistics); } catch (Throwable e) { // exception when performing the atomic rename, // we cannot clean up because we can't determine the success diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java index e2d76c2327c7..b0084b70beec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java @@ -26,10 +26,7 @@ import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.utils.SnapshotManager; -import javax.annotation.Nullable; - import java.util.Iterator; -import java.util.function.Supplier; /** A checker to check strict mode based on last safe snapshot. */ public class StrictModeChecker { @@ -51,19 +48,6 @@ public StrictModeChecker( this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot; } - @Nullable - public static StrictModeChecker create( - SnapshotManager snapshotManager, - String commitUser, - Supplier scanSupplier, - @Nullable Long strictModeLastSafeSnapshot) { - if (strictModeLastSafeSnapshot == null) { - return null; - } - return new StrictModeChecker( - snapshotManager, commitUser, scanSupplier.get(), strictModeLastSafeSnapshot); - } - public void check(long newSnapshotId, CommitKind newCommitKind) { for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) { Snapshot snapshot = snapshotManager.snapshot(id);