Three files were deleted in this change; their contents are not shown below.
Spark4Shim:

@@ -32,14 +32,15 @@
 import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith}
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SparkFormatTable
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataTypes, StructType, VariantType}

@@ -101,6 +102,19 @@ class Spark4Shim extends SparkShim {
     tableCatalog.createTable(ident, columns, partitions, properties)
   }

+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      newTable: Table): DataSourceV2Relation =
+    relation.copy(table = newTable)
+
+  override def copyTableSpecLocation(spec: TableSpec, location: Option[String]): TableSpec =
+    spec.copy(location = location)
+
+  override def copyTableSpecProperties(
+      spec: TableSpec,
+      properties: Map[String, String]): TableSpec =
+    spec.copy(properties = properties)
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
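The Spark 4 shim's three overrides are textually identical to the Spark 3 versions further down; the fix is that each module is compiled against its own Spark artifact, so scalac resolves the `copy` call against the field list that version of the case class actually has. A self-contained sketch of the desugaring involved, with a hypothetical `Rel` class standing in for `DataSourceV2Relation`:

```scala
// A minimal sketch of why named-argument `copy` is version-sensitive.
// `Rel` is a hypothetical stand-in for DataSourceV2Relation, not Spark's class.
case class Rel(
    table: String,
    output: Seq[String],
    catalog: Option[String],
    identifier: Option[String],
    options: Map[String, String],
    timeTravelSpec: Option[String] = None) // 6th field, new in the "4.1" shape

object CopyDesugaring extends App {
  val r = Rel("t1", Seq("a"), None, None, Map.empty)
  // scalac compiles this named-argument copy into a positional call that reads
  // every omitted argument through generated `copy$default$N` methods:
  //   r.copy("t2", r.copy$default$2, r.copy$default$3,
  //          r.copy$default$4, r.copy$default$5, r.copy$default$6)
  // If the class on the runtime classpath has only five fields, the
  // `copy$default$6` method does not exist and the JVM throws
  // NoSuchMethodError at the call site.
  val r2 = r.copy(table = "t2")
  println(r2)
}
```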
ScanPlanHelper:

@@ -30,6 +30,7 @@
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.paimon.shims.SparkShimLoader

 trait ScanPlanHelper extends SQLConfHelper {

@@ -51,7 +52,9 @@ trait ScanPlanHelper extends SQLConfHelper {
     relation.table match {
       case sparkTable @ SparkTable(table: InnerTable) =>
         val knownSplitsTable = KnownSplitsTable.create(table, dataSplits.toArray)
-        relation.copy(table = sparkTable.copy(table = knownSplitsTable))
+        SparkShimLoader.shim.copyDataSourceV2Relation(
+          relation,
+          sparkTable.copy(table = knownSplitsTable))
       case _ => throw new RuntimeException()
     }
   }
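Note the design choice at this call site: after the rewrite, the caller module's bytecode references only the shim method's descriptor, which is identical on every supported Spark version, so no `copy$default$N` reference can appear in the caller at all. A minimal sketch of that shape (names hypothetical, not Paimon's API):

```scala
// The caller compiles against this stable interface only; the version-specific
// `copy` call (and its copy$default$N references) lives in the shim module.
trait RelationCopier {
  def withTable(relation: AnyRef, newTable: AnyRef): AnyRef
}

object CallSiteSketch {
  // A plain interface call: nothing here depends on the case-class arity.
  def replaceTable(copier: RelationCopier, relation: AnyRef, table: AnyRef): AnyRef =
    copier.withTable(relation, table)
}
```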
PaimonStrategyHelper:

@@ -22,6 +22,7 @@
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.plans.logical.TableSpec
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.paimon.shims.SparkShimLoader

 trait PaimonStrategyHelper {

@@ -35,7 +36,9 @@ trait PaimonStrategyHelper {
   }

   protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.copy(location = tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+    SparkShimLoader.shim.copyTableSpecLocation(
+      tableSpec,
+      tableSpec.location.map(makeQualifiedDBObjectPath(_)))
   }

 }
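Only the `TableSpec` copy changes here; `makeQualifiedDBObjectPath` itself is untouched. For context, qualifying a location generally means resolving it to a fully qualified URI on its filesystem, roughly like the hypothetical helper below (a sketch, not Paimon's implementation, which, judging by the `WAREHOUSE_PATH` import, presumably also resolves relative paths against the session warehouse path):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifySketch {
  // Hypothetical helper: turn a possibly scheme-less or relative location
  // into a fully qualified URI on the filesystem it belongs to.
  def makeQualified(location: String): String = {
    val path = new Path(location)
    val fs = path.getFileSystem(new Configuration())
    fs.makeQualified(path).toString
  }
}
```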
PaimonCreateTableAsSelectStrategy:

@@ -29,6 +29,7 @@
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, Logical
 import org.apache.spark.sql.connector.catalog.StagingTableCatalog
 import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+import org.apache.spark.sql.paimon.shims.SparkShimLoader

 import scala.collection.JavaConverters._

@@ -61,7 +62,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
         val (tableOptions, writeOptions) = options.partition {
           case (key, _) => allTableOptionKeys.contains(key)
         }
-        val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
+        val newTableSpec = SparkShimLoader.shim
+          .copyTableSpecProperties(tableSpec, tableSpec.properties ++ tableOptions)

         val isPartitionedFormatTable = {
           catalog match {
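This hunk sits inside logic that splits the user-supplied options: keys recognized as table options are folded into the `TableSpec` properties (now via the shim), while the rest remain per-write options. A standalone illustration of that `partition` step, with made-up keys standing in for `allTableOptionKeys`:

```scala
object OptionSplitSketch extends App {
  // Made-up keys for illustration; the real set comes from allTableOptionKeys.
  val allTableOptionKeys = Set("bucket", "primary-key")
  val options = Map(
    "bucket" -> "4",
    "primary-key" -> "id",
    "write.batch-size" -> "1024")

  // Table options are persisted with the table; write options apply per write.
  val (tableOptions, writeOptions) = options.partition {
    case (key, _) => allTableOptionKeys.contains(key)
  }

  println(tableOptions) // Map(bucket -> 4, primary-key -> id)
  println(writeOptions) // Map(write.batch-size -> 1024)
}
```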
SparkShim:

@@ -27,11 +27,12 @@
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, TableSpec, UnresolvedWith}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.StructType

 import java.util.{Map => JMap}

@@ -65,6 +66,27 @@ trait SparkShim {
       partitions: Array[Transform],
       properties: JMap[String, String]): Table

+  /**
+   * Returns a `DataSourceV2Relation` like `relation` but with `table` replaced. Spark 4.1 added
+   * `Option[TimeTravelSpec]` as the 6th field of `DataSourceV2Relation`, so a
+   * `relation.copy(table = ...)` call compiled against 4.1.1 emits a `copy$default$6` reference
+   * that crashes with `NoSuchMethodError` on a Spark 4.0 runtime. Routing through this factory
+   * lets each per-version SparkShim implementation generate the matching `copy` bytecode.
+   */
+  def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      newTable: Table): DataSourceV2Relation
+
+  /**
+   * Returns a `TableSpec` like `spec` but with `location` replaced. Spark 4.1 widened `TableSpec`
+   * from 8 to 9 fields (adding a `Seq[Constraint]`), so a `spec.copy(location = ...)` call
+   * compiled against 4.1.1 emits a `copy$default$9` reference that crashes on a Spark 4.0
+   * runtime.
+   */
+  def copyTableSpecLocation(spec: TableSpec, location: Option[String]): TableSpec
+
+  /** Same arity-mismatch problem as `copyTableSpecLocation`, but for `properties`. */
+  def copyTableSpecProperties(spec: TableSpec, properties: Map[String, String]): TableSpec
+
   def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
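To make the arity problem concrete, here is a self-contained model of the `TableSpec` case. Both classes and all names are hypothetical stand-ins, trimmed to two and three fields instead of Spark's eight and nine:

```scala
// Hypothetical stand-ins for the "4.0" and "4.1" shapes of TableSpec.
case class SpecV0(properties: Map[String, String], location: Option[String])
case class SpecV1(
    properties: Map[String, String],
    location: Option[String],
    constraints: Seq[String] = Nil) // the extra field added in the "4.1" shape

trait SpecShim {
  // Stable signature: callers compile against this, never against copy().
  def withLocation(spec: Product, location: Option[String]): Product
}

// Each implementation lives in a module that sees exactly one class shape,
// so the copy$default$N methods its bytecode references are guaranteed to exist.
object SpecShimV0 extends SpecShim {
  def withLocation(spec: Product, location: Option[String]): Product =
    spec.asInstanceOf[SpecV0].copy(location = location)
}

object SpecShimV1 extends SpecShim {
  def withLocation(spec: Product, location: Option[String]): Product =
    spec.asInstanceOf[SpecV1].copy(location = location)
}
```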
Spark3Shim:

@@ -32,7 +32,7 @@
 import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, TableSpec, UnresolvedWith}
 // NOTE: `MergeRows` / `MergeRows.Keep` were introduced in Spark 3.4. We access them only via
 // reflection inside the `mergeRowsKeep*` method bodies so that loading `Spark3Shim` does not fail
 // on Spark 3.2 / 3.3 runtimes that still ship `paimon-spark3-common` (the module targets 3.5.8 at

@@ -43,6 +43,7 @@
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SparkFormatTable
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType

@@ -85,6 +86,19 @@ class Spark3Shim extends SparkShim {
     tableCatalog.createTable(ident, schema, partitions, properties)
   }

+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      newTable: Table): DataSourceV2Relation =
+    relation.copy(table = newTable)
+
+  override def copyTableSpecLocation(spec: TableSpec, location: Option[String]): TableSpec =
+    spec.copy(location = location)
+
+  override def copyTableSpecProperties(
+      spec: TableSpec,
+      properties: Map[String, String]): TableSpec =
+    spec.copy(properties = properties)
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
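How the right implementation gets picked at runtime is up to `SparkShimLoader`, which is not shown in this diff. A common pattern for this kind of loader is Java's `ServiceLoader`; the sketch below shows that shape and is not necessarily how Paimon wires it up:

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Hypothetical shim interface; each per-version module would register its
// implementation under META-INF/services/<fully.qualified.VersionedShim>.
trait VersionedShim

object ShimLoaderSketch {
  // Picks whichever implementation the current classpath provides.
  lazy val shim: VersionedShim =
    ServiceLoader.load(classOf[VersionedShim]).asScala.headOption.getOrElse {
      throw new IllegalStateException("No shim implementation found on the classpath")
    }
}
```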