@@ -18,20 +18,27 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.commands.MergeIntoPaimonTable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}

/** A post-hoc resolution rule for MergeInto. */
case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {

override def resolveNotMatchedBySourceActions(
merge: MergeIntoTable,
targetOutput: Seq[AttributeReference],
dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
/**
* Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
* Returns a new MergeIntoTable with aligned matchedActions and notMatchedActions.
*/
override def alignMergeIntoTable(
m: MergeIntoTable,
targetOutput: Seq[Attribute]): MergeIntoTable = {
m.copy(
matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput))
)
}

override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
Seq.empty
}
}
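
Editor's note: for context, "aligning" a merge action means expanding its assignment list so that every column of the target table is assigned exactly once, in the target schema's order; columns the user did not mention keep their current value (for UPDATE) or are filled with an explicit default such as NULL (for INSERT). The sketch below illustrates only that idea — the case classes and the defaulting rule are hypothetical stand-ins, not Spark's MergeAction/Assignment API.

// Editor's illustrative sketch; names and defaulting rules are hypothetical.
object AlignmentSketch {
  final case class Assignment(column: String, value: String)

  // Expand `specified` so every target column is assigned exactly once, in target order.
  // `fallback` supplies the value for columns the user did not mention, e.g. the column's
  // current value for an UPDATE action or "NULL" for an INSERT action.
  def align(
      targetColumns: Seq[String],
      specified: Seq[Assignment],
      fallback: String => String): Seq[Assignment] = {
    val byName = specified.map(a => a.column.toLowerCase -> a.value).toMap
    targetColumns.map(c => Assignment(c, byName.getOrElse(c.toLowerCase, fallback(c))))
  }

  def main(args: Array[String]): Unit = {
    val target = Seq("a", "b", "c")
    val userAssignments = Seq(Assignment("c", "'c1'"))
    // UPDATE: unmentioned columns keep their current value.
    println(align(target, userAssignments, col => s"target.$col"))
    // INSERT: unmentioned columns are filled with NULL.
    println(align(target, userAssignments, _ => "NULL"))
  }
}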
@@ -18,20 +18,28 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.commands.MergeIntoPaimonTable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}

/** A post-hoc resolution rule for MergeInto. */
case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {

override def resolveNotMatchedBySourceActions(
merge: MergeIntoTable,
targetOutput: Seq[AttributeReference],
dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
/**
* Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
* Returns a new MergeIntoTable with aligned matchedActions and notMatchedActions.
*/
override def alignMergeIntoTable(
m: MergeIntoTable,
targetOutput: Seq[Attribute]): MergeIntoTable = {
m.copy(
matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput))
)
}

override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
Seq.empty
}

}
@@ -61,3 +61,43 @@ class MergeIntoAppendNonBucketedTableTest
super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
}
}

class V2MergeIntoPrimaryKeyBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoPrimaryKeyTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonPrimaryKeyBucketedTableTest {
override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
}
}

class V2MergeIntoPrimaryKeyNonBucketTableTest
extends MergeIntoTableTestBase
with MergeIntoPrimaryKeyTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonPrimaryKeyNonBucketTableTest {
override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
}
}

class V2MergeIntoAppendBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendBucketedTableTest {
override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
}
}

class V2MergeIntoAppendNonBucketedTableTest
extends MergeIntoTableTestBase
with MergeIntoAppendTableTest
with MergeIntoNotMatchedBySourceTest
with PaimonAppendNonBucketTableTest {
override protected def sparkConf: SparkConf = {
super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
}
}
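
Editor's note: the new test classes only flip spark.paimon.write.use-v2-write to "true"; everything else comes from the shared MERGE INTO test traits. As a rough usage sketch (the table names are made up and the Paimon catalog/warehouse configuration is assumed to be in place), a session created with that flag exercises the DataSourceV2 merge path:

// Editor's illustrative sketch; catalog and warehouse settings are assumed to be configured elsewhere.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object V2MergeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .set("spark.paimon.write.use-v2-write", "true") // route writes through the V2 path
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Hypothetical tables; in the real tests they are created by the shared test base.
    spark.sql(
      """MERGE INTO target t
        |USING source s
        |ON t.a = s.a
        |WHEN MATCHED THEN UPDATE SET *
        |WHEN NOT MATCHED THEN INSERT *
        |""".stripMargin)
    spark.stop()
  }
}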
@@ -89,21 +89,6 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
}
}

/**
* Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
* Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
* notMatchedBySourceActions.
*/
protected def alignMergeIntoTable(
m: MergeIntoTable,
targetOutput: Seq[Attribute]): MergeIntoTable = {
m.copy(
matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput)),
notMatchedBySourceActions = m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
)
}

private def recursiveAlignUpdates(
targetAttrs: Seq[NamedExpression],
updates: Seq[AttrUpdate],
@@ -19,17 +19,28 @@
package org.apache.paimon.spark.catalyst.analysis

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, MergeIntoTable}

/** A post-hoc resolution rule for MergeInto. */
case class PaimonMergeInto(spark: SparkSession) extends PaimonMergeIntoBase {

override def resolveNotMatchedBySourceActions(
merge: MergeIntoTable,
targetOutput: Seq[AttributeReference],
dataEvolutionEnabled: Boolean): Seq[MergeAction] = {
merge.notMatchedBySourceActions.map(
checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
/**
* Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
* Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
* notMatchedBySourceActions.
*/
override def alignMergeIntoTable(
m: MergeIntoTable,
targetOutput: Seq[Attribute]): MergeIntoTable = {
m.copy(
matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput)),
notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput)),
notMatchedBySourceActions = m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput))
)
}

override def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction] = {
merge.notMatchedBySourceActions
}
}
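
Editor's note: this copy of the rule keeps notMatchedBySourceActions, i.e. it targets Spark versions whose MERGE INTO supports the WHEN NOT MATCHED BY SOURCE clause, which acts on target rows that have no match in the source. A hedged SQL sketch (table and column names are made up; runnable in spark-shell where `spark` is the active session and the Paimon catalog is configured):

// Editor's illustrative sketch; `target` and `source` are hypothetical tables.
spark.sql(
  """MERGE INTO target t
    |USING source s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET t.v = s.v
    |WHEN NOT MATCHED THEN INSERT (id, v) VALUES (s.id, s.v)
    |WHEN NOT MATCHED BY SOURCE THEN DELETE
    |""".stripMargin)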
@@ -23,7 +23,7 @@ import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.{MergeIntoPaimonDataEvolutionTable, MergeIntoPaimonTable}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

@@ -62,64 +62,34 @@ trait PaimonMergeIntoBase
updateActions,
primaryKeys)
}
val alignedMatchedActions =
merge.matchedActions.map(
checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
val alignedNotMatchedActions =
merge.notMatchedActions.map(
checkAndAlignActionAssignment(_, targetOutput, dataEvolutionEnabled))
val alignedNotMatchedBySourceActions =
resolveNotMatchedBySourceActions(merge, targetOutput, dataEvolutionEnabled)

if (dataEvolutionEnabled) {
MergeIntoPaimonDataEvolutionTable(
v2Table,
merge.targetTable,
merge.sourceTable,
merge.mergeCondition,
alignedMatchedActions,
alignedNotMatchedActions,
alignedNotMatchedBySourceActions
)
} else {
MergeIntoPaimonTable(
v2Table,
merge.targetTable,
merge.sourceTable,
merge.mergeCondition,
alignedMatchedActions,
alignedNotMatchedActions,
alignedNotMatchedBySourceActions
)
}
}
}

def resolveNotMatchedBySourceActions(
merge: MergeIntoTable,
targetOutput: Seq[AttributeReference],
dataEvolutionEnabled: Boolean): Seq[MergeAction]

protected def checkAndAlignActionAssignment(
action: MergeAction,
targetOutput: Seq[AttributeReference],
dataEvolutionEnabled: Boolean): MergeAction = {
action match {
case d @ DeleteAction(_) => d
case u @ UpdateAction(_, assignments) =>
u.copy(assignments = alignAssignments(targetOutput, assignments))
val alignedMergeIntoTable = alignMergeIntoTable(merge, targetOutput)

case i @ InsertAction(_, assignments) =>
i.copy(assignments = alignAssignments(targetOutput, assignments, isInsert = true))

case _: UpdateStarAction =>
throw new RuntimeException(s"UpdateStarAction should not be here.")

case _: InsertStarAction =>
throw new RuntimeException(s"InsertStarAction should not be here.")

case _ =>
throw new RuntimeException(s"Can't recognize this action: $action")
if (!shouldFallbackToV1MergeInto(alignedMergeIntoTable)) {
alignedMergeIntoTable
} else {
if (dataEvolutionEnabled) {
MergeIntoPaimonDataEvolutionTable(
v2Table,
merge.targetTable,
merge.sourceTable,
merge.mergeCondition,
alignedMergeIntoTable.matchedActions,
alignedMergeIntoTable.notMatchedActions,
resolveNotMatchedBySourceActions(alignedMergeIntoTable)
)
} else {
MergeIntoPaimonTable(
v2Table,
merge.targetTable,
merge.sourceTable,
merge.mergeCondition,
alignedMergeIntoTable.matchedActions,
alignedMergeIntoTable.notMatchedActions,
resolveNotMatchedBySourceActions(alignedMergeIntoTable)
)
}
}
}
}

@@ -156,4 +126,8 @@ trait PaimonMergeIntoBase
throw new RuntimeException("Can't update the primary key column in update clause.")
}
}

def resolveNotMatchedBySourceActions(merge: MergeIntoTable): Seq[MergeAction]

def alignMergeIntoTable(m: MergeIntoTable, targetOutput: Seq[Attribute]): MergeIntoTable
}
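
Editor's note: the base rule still validates UPDATE clauses against the table's primary keys (the "Can't update the primary key column in update clause" error above). A hedged example of a statement that would be rejected, assuming `pk_table` is a primary-key table keyed on `id` and `spark` is an active session:

// Editor's illustrative sketch; `pk_table` and `updates` are hypothetical tables, `id` is the primary key.
spark.sql(
  """MERGE INTO pk_table t
    |USING updates s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET t.id = s.new_id -- rewriting the primary key is rejected
    |""".stripMargin)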
@@ -24,7 +24,7 @@ import org.apache.paimon.table.{FileStoreTable, Table}

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, MergeIntoTable, UpdateTable}

trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {

@@ -102,4 +102,13 @@ trait RowLevelHelper extends SQLConfHelper with AssignmentAlignmentHelper {
!updateTable.rewritable ||
!updateTable.aligned
}

/** Returns true if this MERGE INTO cannot use the DataSourceV2 path and must fall back to the V1 command. */
protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = {
val relation = PaimonRelation.getPaimonRelation(m.targetTable)
val table = relation.table.asInstanceOf[SparkTable]
shouldFallbackToV1(table) ||
!m.rewritable ||
!m.aligned
}
}
@@ -42,6 +42,9 @@ import scala.collection.mutable
/**
* Note: The [[pushedPartitionFilters]] and [[pushedDataFilters]] are intentionally set to empty
* because file-level filtering is handled through Spark's runtime V2 filtering mechanism.
*
* When Spark's runtime filter is not applied (e.g., when NOT MATCHED BY SOURCE is present in
* MergeInto), this scan will read all data from the table.
*/
case class PaimonCopyOnWriteScan(
table: FileStoreTable,
@@ -51,10 +54,29 @@
extends BaseScan
with SupportsRuntimeV2Filtering {

override def inputSplits: Array[Split] = dataSplits.asInstanceOf[Array[Split]]
// Track whether filter() has been called
@volatile private var filterApplied: Boolean = false

private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()

override def inputSplits: Array[Split] = {
loadSplits()
dataSplits.asInstanceOf[Array[Split]]
}

var dataSplits: Array[DataSplit] = Array()

private def loadSplits(): Unit = {
val snapshotReader = table.newSnapshotReader()
if (table.coreOptions().manifestDeleteFileDropStats()) {
snapshotReader.dropStats()
}
if (filterApplied) {
snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
}
dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
}

def scannedFiles: Seq[SparkDataFileMeta] = {
dataSplits
.flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, dataSplit.totalBuckets()))
@@ -66,9 +88,9 @@ case class PaimonCopyOnWriteScan(
}

override def filter(predicates: Array[SparkPredicate]): Unit = {
val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
val runtimefilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
for (filter <- runtimefilters) {
filterApplied = true
val runtimeFilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
for (filter <- runtimeFilters) {
filter match {
case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
for (value <- in.values) {
Expand All @@ -78,14 +100,6 @@ case class PaimonCopyOnWriteScan(
case _ => logWarning("Unsupported runtime filter")
}
}

val snapshotReader = table.newSnapshotReader()
if (table.coreOptions().manifestDeleteFileDropStats()) {
snapshotReader.dropStats()
}

snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
}

}
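
Editor's note: for context, the copy-on-write scan relies on Spark handing it a runtime IN predicate over the file-path metadata column; only the files named there are re-read and rewritten, and when no such predicate arrives (e.g. NOT MATCHED BY SOURCE) the scan falls back to reading every file. The sketch below shows just that control flow with simplified stand-in types, not the real Paimon Split or Spark predicate classes:

// Editor's illustrative sketch; DataFile and InFilePaths are simplified stand-ins.
object RuntimeFileFilterSketch {
  final case class DataFile(path: String)
  final case class InFilePaths(values: Set[String]) // stands in for In(FILE_PATH_COLUMN, ...)

  final class CopyOnWriteScanLike(allFiles: Seq[DataFile]) {
    @volatile private var filterApplied = false
    private val matchedNames = scala.collection.mutable.Set[String]()

    // Called at runtime with the paths of files that actually contain matching rows.
    def filter(predicate: InFilePaths): Unit = {
      filterApplied = true
      predicate.values.foreach(p => matchedNames += p.split('/').last)
    }

    // Restrict the input files only if a runtime filter arrived; otherwise scan everything.
    def inputFiles: Seq[DataFile] =
      if (filterApplied) allFiles.filter(f => matchedNames.contains(f.path.split('/').last))
      else allFiles
  }

  def main(args: Array[String]): Unit = {
    val scan = new CopyOnWriteScanLike(
      Seq(DataFile("/warehouse/t/f1.parquet"), DataFile("/warehouse/t/f2.parquet")))
    scan.filter(InFilePaths(Set("/warehouse/t/f1.parquet")))
    println(scan.inputFiles) // only f1.parquet remains
  }
}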
@@ -528,7 +528,7 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
createTable("target", "a INT, b INT, c STRING", Seq("a"))
spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")

val error = intercept[RuntimeException] {
val error = intercept[Exception] {
spark.sql(s"""
|MERGE INTO target
|USING source
Expand All @@ -539,7 +539,11 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
|THEN INSERT (a, b, c) values (a, b, c)
|""".stripMargin)
}.getMessage
assert(error.contains("match more than one source rows"))
// V1 path: "match more than one source rows"
// V2 path: "MERGE_CARDINALITY_VIOLATION"
assert(
error.contains("match more than one source rows") ||
error.contains("MERGE_CARDINALITY_VIOLATION"))
}
}
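
Editor's note: the relaxed assertion covers both engines' wording for the same failure, a single target row matching more than one source row. A hedged sketch of data that triggers it (table names are made up; runnable in spark-shell with a Paimon catalog configured, where `spark` is the active session):

// Editor's illustrative sketch; the duplicate source key 1 is what triggers the error.
spark.sql("CREATE TABLE t (a INT, b INT) USING paimon")
spark.sql("CREATE TABLE s (a INT, b INT) USING paimon")
spark.sql("INSERT INTO t VALUES (1, 10)")
spark.sql("INSERT INTO s VALUES (1, 100), (1, 200)") // two source rows match the single target row a = 1
spark.sql(
  """MERGE INTO t
    |USING s
    |ON t.a = s.a
    |WHEN MATCHED THEN UPDATE SET t.b = s.b
    |""".stripMargin) // fails with "match more than one source rows" (V1) or MERGE_CARDINALITY_VIOLATION (V2)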
