diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index b1af5a2d88f5..95ae7471daf8 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -18,20 +18,27 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}
 
 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
 
-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions and notMatchedActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
     Seq.empty
   }
 }
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index b1af5a2d88f5..a92d13cc0b35 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -18,20 +18,28 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}
 
 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
 
-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions and notMatchedActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
     Seq.empty
   }
+
 }
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index c83ee5493867..c8ae09a26be4 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -61,3 +61,43 @@ class MergeIntoAppendNonBucketedTableTest
     super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
   }
 }
+
+class V2MergeIntoPrimaryKeyBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoPrimaryKeyTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonPrimaryKeyBucketedTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoPrimaryKeyNonBucketTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoPrimaryKeyTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonPrimaryKeyNonBucketTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoAppendBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonAppendBucketedTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
+class V2MergeIntoAppendNonBucketedTableTest
+  extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
+  with MergeIntoNotMatchedBySourceTest
+  with PaimonAppendNonBucketTableTest {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
index 13ce64b86b16..2404f1f49f10 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
@@ -89,21 +89,6 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
     }
   }
 
-  /**
-   * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
-   * Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
-   * notMatchedBySourceActions.
-   */
-  protected def alignMergeIntoTable(
-      m: MergeIntoTable,
-      targetOutput: Seq[Attribute]): MergeIntoTable = {
-    m.copy(
-      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
-      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput)),
-      notMatchedBySourceActions = m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
-    )
-  }
-
   private def recursiveAlignUpdates(
       targetAttrs: Seq[NamedExpression],
       updates: Seq[AttrUpdate],
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
index d6023c2f6920..45916be76136 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeInto.scala
@@ -19,17 +19,28 @@
 package org.apache.paimon.spark.catalyst.analysis
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}
 
 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {
 
-  override def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
-    merge.notMatchedBySourceActions.map(
-      checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
+  /**
+   * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+   * Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
+   * notMatchedBySourceActions.
+   */
+  override def alignMergeIntoTable(
+      m: MergeIntoTable,
+      targetOutput: Seq[Attribute]): MergeIntoTable = {
+    m.copy(
+      matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput)),
+      notMatchedBySourceActions = m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
+    )
+  }
+
+  override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
+    merge.notMatchedBySourceActions
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index f9a186e70b9f..84560543a2d9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -23,7 +23,7 @@
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable}
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -62,64 +62,34 @@ trait PaimonMergeIntoBase
             updateActions,
             primaryKeys)
         }
 
-        val alignedMatchedActions =
-          merge.matchedActions.map(
-            checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
-        val alignedNotMatchedActions =
-          merge.notMatchedActions.map(
-            checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
-        val alignedNotMatchedBySourceActions =
-          resolveNotMatchedBySourceActions(merge, targetOutput, dataEvolutionEnabled)
-
-        if (dataEvolutionEnabled) {
-          MergeIntoPaimonDataEvolutionTable(
-            v2Table,
-            merge.targetTable,
-            merge.sourceTable,
-            merge.mergeCondition,
-            alignedMatchedActions,
-            alignedNotMatchedActions,
-            alignedNotMatchedBySourceActions
-          )
-        } else {
-          MergeIntoPaimonTable(
-            v2Table,
-            merge.targetTable,
-            merge.sourceTable,
-            merge.mergeCondition,
-            alignedMatchedActions,
-            alignedNotMatchedActions,
-            alignedNotMatchedBySourceActions
-          )
-        }
-    }
-  }
-
-  def resolveNotMatchedBySourceActions(
-      merge: MergeIntoTable,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): Seq[MergeAction]
-
-  protected def checkAndAlignActionAssignment(
-      action: MergeAction,
-      targetOutput: Seq[AttributeReference],
-      dataEvolutionEnabled: Boolean): MergeAction = {
-    action match {
-      case d @ DeleteAction(_) => d
-
-      case u @ UpdateAction(_, assignments) =>
-        u.copy(assignments = alignAssignments(targetOutput, assignments))
+        val alignedMergeIntoTable = alignMergeIntoTable(merge, targetOutput)
 
-      case i @ InsertAction(_, assignments) =>
-        i.copy(assignments = alignAssignments(targetOutput, assignments, isInsert = true))
-
-      case _: UpdateStarAction =>
-        throw new RuntimeException(s"UpdateStarAction should not be here.")
-
-      case _: InsertStarAction =>
-        throw new RuntimeException(s"InsertStarAction should not be here.")
-
-      case _ =>
-        throw new RuntimeException(s"Can't recognize this action: $action")
+        if (!shouldFallbackToV1MergeInto(alignedMergeIntoTable)) {
+          alignedMergeIntoTable
+        } else {
+          if (dataEvolutionEnabled) {
+            MergeIntoPaimonDataEvolutionTable(
+              v2Table,
+              merge.targetTable,
+              merge.sourceTable,
+              merge.mergeCondition,
+              alignedMergeIntoTable.matchedActions,
+              alignedMergeIntoTable.notMatchedActions,
+              resolveNotMatchedBySourceActions(alignedMergeIntoTable)
+            )
+          } else {
+            MergeIntoPaimonTable(
+              v2Table,
+              merge.targetTable,
+              merge.sourceTable,
+              merge.mergeCondition,
+              alignedMergeIntoTable.matchedActions,
+              alignedMergeIntoTable.notMatchedActions,
+              resolveNotMatchedBySourceActions(alignedMergeIntoTable)
+            )
+          }
+        }
     }
   }
@@ -156,4 +126,8 @@ trait PaimonMergeIntoBase
       throw new RuntimeException("Can't update the primary key column in update clause.")
     }
   }
+
+  def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction]
+
+  def alignMergeIntoTable(m: MergeIntoTable, targetOutput: Seq[Attribute]): MergeIntoTable
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index 8ec139b60715..b3b09292270f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -24,7 +24,7 @@
 import org.apache.paimon.table.{FileStoreTable, Table}
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, MergeIntoTable, UpdateTable}
 
 trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
@@ -102,4 +102,13 @@ trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
 
     !updateTable.rewritable || !updateTable.aligned
   }
+
+  /** Returns true if this MERGE INTO cannot use the DataSourceV2 path and must fall back to V1. */
+  protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = {
+    val relation = PaimonRelation.getPaimonRelation(m.targetTable)
+    val table = relation.table.asInstanceOf[SparkTable]
+    shouldFallbackToV1(table) ||
+    !m.rewritable ||
+    !m.aligned
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
index 11bd72e360aa..9b44094c23c7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
@@ -42,6 +42,9 @@ import scala.collection.mutable
 /**
  * Note: The [[pushedPartitionFilters]] and [[pushedDataFilters]] are intentionally set to empty
  * because file-level filtering is handled through Spark's runtime V2 filtering mechanism.
+ *
+ * When Spark's runtime filter is not applied (e.g., when NOT MATCHED BY SOURCE is present in a
+ * MERGE INTO), this scan will read all data from the table.
  */
 case class PaimonCopyOnWriteScan(
     table: FileStoreTable,
@@ -51,10 +54,29 @@ case class PaimonCopyOnWriteScan(
     extends BaseScan
     with SupportsRuntimeV2Filtering {
 
-  override def inputSplits: Array[Split] = dataSplits.asInstanceOf[Array[Split]]
+  // Track whether filter() has been called
+  @volatile private var filterApplied: Boolean = false
+
+  private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
+
+  override def inputSplits: Array[Split] = {
+    loadSplits()
+    dataSplits.asInstanceOf[Array[Split]]
+  }
 
   var dataSplits: Array[DataSplit] = Array()
 
+  private def loadSplits(): Unit = {
+    val snapshotReader = table.newSnapshotReader()
+    if (table.coreOptions().manifestDeleteFileDropStats()) {
+      snapshotReader.dropStats()
+    }
+    if (filterApplied) {
+      snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
+    }
+    dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
+  }
+
   def scannedFiles: Seq[SparkDataFileMeta] = {
     dataSplits
       .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, dataSplit.totalBuckets()))
@@ -66,9 +88,9 @@ case class PaimonCopyOnWriteScan(
   }
 
   override def filter(predicates: Array[SparkPredicate]): Unit = {
-    val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
-    val runtimefilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
-    for (filter <- runtimefilters) {
+    filterApplied = true
+    val runtimeFilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
+    for (filter <- runtimeFilters) {
       filter match {
         case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
           for (value <- in.values) {
@@ -78,14 +100,6 @@ case class PaimonCopyOnWriteScan(
         case _ => logWarning("Unsupported runtime filter")
       }
     }
-
-    val snapshotReader = table.newSnapshotReader()
-    if (table.coreOptions().manifestDeleteFileDropStats()) {
-      snapshotReader.dropStats()
-    }
-
-    snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
-    dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
   }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index ee4799f56cab..28e1b63fd2d8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -528,7 +528,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
     createTable("target", "a INT, b INT, c STRING", Seq("a"))
     spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
 
-    val error = intercept[RuntimeException] {
+    val error = intercept[Exception] {
       spark.sql(s"""
                    |MERGE INTO target
                    |USING source
@@ -539,7 +539,11 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
                    |THEN INSERT (a, b, c) values (a, b, c)
                    |""".stripMargin)
     }.getMessage
-    assert(error.contains("match more than one source rows"))
+    // V1 path: "match more than one source rows"
+    // V2 path: "MERGE_CARDINALITY_VIOLATION"
+    assert(
+      error.contains("match more than one source rows") ||
+        error.contains("MERGE_CARDINALITY_VIOLATION"))
   }
 
 }
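
---

For reviewers who want to exercise the new V2 MERGE INTO path end to end, below is a minimal, hypothetical smoke-test sketch. It is not part of this PR: the object name, warehouse path, and table contents are made up for illustration, while the config keys (`spark.paimon.write.use-v2-write`, the Paimon catalog and extension classes) and the target-table shape (`a INT, b INT, c STRING`, primary key `a`) come from the diff and the existing test base. It assumes Spark 3.4+, since the 3.2/3.3 overrides above resolve NOT MATCHED BY SOURCE actions to `Seq.empty`.

```scala
// Hypothetical local repro for the V2 MERGE INTO path -- not part of this PR.
import org.apache.spark.sql.SparkSession

object V2MergeIntoSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      // Standard Paimon catalog setup; the warehouse path is illustrative.
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
      .config(
        "spark.sql.extensions",
        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      // The flag the new V2* test suites set; "false" keeps the V1 commands.
      .config("spark.paimon.write.use-v2-write", "true")
      .getOrCreate()

    // Same shape as the tables used in MergeIntoTableTestBase.
    spark.sql(
      "CREATE TABLE paimon.default.target (a INT, b INT, c STRING) " +
        "TBLPROPERTIES ('primary-key' = 'a')")
    spark.sql("CREATE TABLE paimon.default.source (a INT, b INT, c STRING)")
    spark.sql("INSERT INTO paimon.default.target VALUES (1, 10, 'c1'), (2, 20, 'c2')")
    spark.sql("INSERT INTO paimon.default.source VALUES (1, 100, 'c11'), (3, 300, 'c33')")

    // With NOT MATCHED BY SOURCE, Spark plans no runtime file filter, so
    // PaimonCopyOnWriteScan reads the whole table (see the scan's doc note).
    spark.sql(
      """MERGE INTO paimon.default.target t
        |USING paimon.default.source s
        |ON t.a = s.a
        |WHEN MATCHED THEN UPDATE SET *
        |WHEN NOT MATCHED THEN INSERT *
        |WHEN NOT MATCHED BY SOURCE THEN DELETE
        |""".stripMargin)

    spark.table("paimon.default.target").show()
    spark.stop()
  }
}
```

Flipping the flag to "false" should route the same statement through `MergeIntoPaimonTable`, which is how the paired V1/V2 suites in this PR compare the two paths.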