diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala deleted file mode 100644 index 832291e37952..000000000000 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.spark.util - -import org.apache.paimon.spark.SparkTable -import org.apache.paimon.spark.schema.PaimonMetadataColumn.{PATH_AND_INDEX_META_COLUMNS, ROW_TRACKING_META_COLUMNS} -import org.apache.paimon.table.{InnerTable, KnownSplitsTable} -import org.apache.paimon.table.source.Split - -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.catalyst.SQLConfHelper -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.functions.col - -trait ScanPlanHelper extends SQLConfHelper { - - /** Create a new scan plan from a relation with the given data splits, condition(optional). */ - def createNewScanPlan( - dataSplits: Seq[Split], - relation: DataSourceV2Relation, - condition: Option[Expression]): LogicalPlan = { - val newRelation = createNewScanPlan(dataSplits, relation) - condition match { - case Some(c) if c != TrueLiteral => Filter(c, newRelation) - case _ => newRelation - } - } - - def createNewScanPlan( - dataSplits: Seq[Split], - relation: DataSourceV2Relation): DataSourceV2Relation = { - relation.table match { - case sparkTable @ SparkTable(table: InnerTable) => - val knownSplitsTable = KnownSplitsTable.create(table, dataSplits.toArray) - relation.copy(table = sparkTable.copy(table = knownSplitsTable)) - case _ => throw new RuntimeException() - } - } - - def selectWithDvMeta(data: DataFrame): DataFrame = { - selectWithAdditionalCols(data, PATH_AND_INDEX_META_COLUMNS) - } - - def selectWithRowTracking(data: DataFrame): DataFrame = { - selectWithAdditionalCols(data, ROW_TRACKING_META_COLUMNS) - } - - private def selectWithAdditionalCols(data: DataFrame, additionalCols: Seq[String]): DataFrame = { - val dataColNames = data.schema.names - val mergedColNames = dataColNames ++ additionalCols.filterNot(dataColNames.contains) - data.select(mergedColNames.map(col): _*) - } -} - -/** This wrapper is only used in java code, e.g. Procedure. 
*/ -object ScanPlanHelper extends ScanPlanHelper { - def createNewScanPlan(dataSplits: Array[Split], relation: DataSourceV2Relation): LogicalPlan = { - ScanPlanHelper.createNewScanPlan(dataSplits.toSeq, relation) - } -} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala deleted file mode 100644 index 9fb3a7b54a25..000000000000 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogUtils -import org.apache.spark.sql.catalyst.plans.logical.TableSpec -import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH - -trait PaimonStrategyHelper { - - def spark: SparkSession - - protected def makeQualifiedDBObjectPath(location: String): String = { - CatalogUtils.makeQualifiedDBObjectPath( - spark.sharedState.conf.get(WAREHOUSE_PATH), - location, - spark.sharedState.hadoopConf) - } - - protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = { - tableSpec.copy(location = tableSpec.location.map(makeQualifiedDBObjectPath(_))) - } - -} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala deleted file mode 100644 index 61e25b7c16a9..000000000000 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.shim - -import org.apache.paimon.CoreOptions -import org.apache.paimon.iceberg.IcebergOptions -import org.apache.paimon.spark.SparkCatalog -import org.apache.paimon.spark.catalog.FormatTableCatalog - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec} -import org.apache.spark.sql.connector.catalog.StagingTableCatalog -import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy} -import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec - -import scala.collection.JavaConverters._ - -case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) - extends SparkStrategy - with PaimonStrategyHelper { - - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableAsSelect( - ResolvedIdentifier(catalog: SparkCatalog, ident), - parts, - query, - tableSpec: TableSpec, - options, - ifNotExists, - true) => - catalog match { - case _: StagingTableCatalog => - throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.") - case _ => - val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq - - // Include Iceberg compatibility options in table properties (fix for DataFrame writer options) - val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq - - val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys - - val (tableOptions, writeOptions) = options.partition { - case (key, _) => allTableOptionKeys.contains(key) - } - val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions) - - val isPartitionedFormatTable = { - catalog match { - case catalog: FormatTableCatalog => - catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty - case _ => false - } - } - - if (isPartitionedFormatTable) { - throw new UnsupportedOperationException( - "Using CTAS with partitioned format table is not supported yet.") - } - - CreateTableAsSelectExec( - catalog.asTableCatalog, - ident, - parts, - query, - qualifyLocInTableSpec(newTableSpec), - writeOptions, - ifNotExists) :: Nil - } - case _ => Nil - } -} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index 3b581e325fa8..dd3e89bc3f62 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, UnresolvedWith} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith} import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep import 
org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData @@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.SparkFormatTable import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructType, VariantType} @@ -101,6 +102,19 @@ class Spark4Shim extends SparkShim { tableCatalog.createTable(ident, columns, partitions, properties) } + override def copyDataSourceV2Relation( + relation: DataSourceV2Relation, + newTable: Table): DataSourceV2Relation = + relation.copy(table = newTable) + + override def copyTableSpecLocation(spec: TableSpec, location: Option[String]): TableSpec = + spec.copy(location = location) + + override def copyTableSpecProperties( + spec: TableSpec, + properties: Map[String, String]): TableSpec = + spec.copy(properties = properties) + override def createCTERelationRef( cteId: Long, resolved: Boolean, diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala index 832291e37952..dfe0b88ef197 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.col +import org.apache.spark.sql.paimon.shims.SparkShimLoader trait ScanPlanHelper extends SQLConfHelper { @@ -51,7 +52,9 @@ trait ScanPlanHelper extends SQLConfHelper { relation.table match { case sparkTable @ SparkTable(table: InnerTable) => val knownSplitsTable = KnownSplitsTable.create(table, dataSplits.toArray) - relation.copy(table = sparkTable.copy(table = knownSplitsTable)) + SparkShimLoader.shim.copyDataSourceV2Relation( + relation, + sparkTable.copy(table = knownSplitsTable)) case _ => throw new RuntimeException() } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala index 9fb3a7b54a25..82fa8bb08b65 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonStrategyHelper.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.plans.logical.TableSpec import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH +import org.apache.spark.sql.paimon.shims.SparkShimLoader trait PaimonStrategyHelper { @@ -35,7 +36,9 @@ trait PaimonStrategyHelper { } protected def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = { - tableSpec.copy(location = 
tableSpec.location.map(makeQualifiedDBObjectPath(_))) + SparkShimLoader.shim.copyTableSpecLocation( + tableSpec, + tableSpec.location.map(makeQualifiedDBObjectPath(_))) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala index 61e25b7c16a9..b707b773cc13 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, Logical import org.apache.spark.sql.connector.catalog.StagingTableCatalog import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec +import org.apache.spark.sql.paimon.shims.SparkShimLoader import scala.collection.JavaConverters._ @@ -61,7 +62,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) val (tableOptions, writeOptions) = options.partition { case (key, _) => allTableOptionKeys.contains(key) } - val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions) + val newTableSpec = SparkShimLoader.shim + .copyTableSpecProperties(tableSpec, tableSpec.properties ++ tableOptions) val isPartitionedFormatTable = { catalog match { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index 7a541a451401..7417d46ddb68 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -27,11 +27,12 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith} +import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, TableSpec, UnresolvedWith} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.StructType import java.util.{Map => JMap} @@ -65,6 +66,27 @@ trait SparkShim { partitions: Array[Transform], properties: JMap[String, String]): Table + /** + * Returns a `DataSourceV2Relation` like `relation` but with `table` replaced. Spark 4.1 added + * `Option[TimeTravelSpec]` as the 6th field of `DataSourceV2Relation`, so a `relation.copy(table + * = ...)` call compiled against 4.1.1 emits a `copy$default$6` reference that crashes with + * `NoSuchMethodError` on Spark 4.0 runtime. 
Routing through this factory lets each per-version
+   * SparkShim implementation generate the matching copy bytecode.
+   */
+  def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      newTable: Table): DataSourceV2Relation
+
+  /**
+   * Returns a `TableSpec` like `spec` but with `location` replaced. Spark 4.1 widened `TableSpec`
+   * from 8 to 9 fields (added `Seq[Constraint]`), so a `spec.copy(location = ...)` call compiled
+   * against 4.1.1 emits a `copy$default$9` reference that crashes on Spark 4.0 runtime.
+   */
+  def copyTableSpecLocation(spec: TableSpec, location: Option[String]): TableSpec
+
+  /** Same arity-mismatch problem as `copyTableSpecLocation`, but for `properties`. */
+  def copyTableSpecProperties(spec: TableSpec, properties: Map[String, String]): TableSpec
+
   def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
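To make the doc comments above concrete, here is a toy, self-contained sketch of the failure mode (illustrative types only, not Spark's actual classes): scalac compiles a named-argument `copy` into a positional call that fills the unspecified parameters with `copy$default$N` accessor calls, so the caller's bytecode hard-codes the case class's arity at compile time.

```scala
// Toy model of the incompatibility, assuming nothing beyond standard Scala.
object CopyArityDemo {
  // Pretend this is the Spark 4.0 shape of a plan node (3 fields).
  final case class Relation(table: String, output: Seq[String], options: Map[String, String])

  def main(args: Array[String]): Unit = {
    val rel = Relation("t", Seq("a"), Map.empty)
    // scalac desugars the next line to:
    //   rel.copy("t2", rel.copy$default$2, rel.copy$default$3)
    // If a 4th field were appended in a later release, the same source
    // recompiled against it would also emit rel.copy$default$4 -- a method
    // missing from the older class file, hence NoSuchMethodError when that
    // newer bytecode runs against the older jar.
    println(rel.copy(table = "t2"))
  }
}
```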
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 5f989f525d04..f8e06c320fc2 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, SubstituteUnreso
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, TableSpec, UnresolvedWith}
// NOTE: `MergeRows` / `MergeRows.Keep` were introduced in Spark 3.4. We access them only via
// reflection inside the `mergeRowsKeep*` method bodies, so that loading `Spark3Shim` does not fail
// on Spark 3.2 / 3.3 runtimes that still run `paimon-spark3-common` (the module targets 3.5.8 at
// compile time).
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SparkFormatTable
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -85,6 +86,19 @@ class Spark3Shim extends SparkShim {
     tableCatalog.createTable(ident, schema, partitions, properties)
   }
 
+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      newTable: Table): DataSourceV2Relation =
+    relation.copy(table = newTable)
+
+  override def copyTableSpecLocation(spec: TableSpec, location: Option[String]): TableSpec =
+    spec.copy(location = location)
+
+  override def copyTableSpecProperties(
+      spec: TableSpec,
+      properties: Map[String, String]): TableSpec =
+    spec.copy(properties = properties)
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
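The NOTE in the hunk above mentions reflective access; below is a minimal sketch of that pattern with a hypothetical helper name and a simplified signature (the real logic lives in the `mergeRowsKeep*` bodies of `Spark3Shim`, and the binary class name is an assumption). The point is that the `Class.forName` call only runs when the method is invoked, so merely loading the shim class never references the 3.4+-only type.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Hypothetical sketch: build a MergeRows.Keep instruction reflectively so the
// class reference is resolved lazily, not at Spark3Shim classload time.
object MergeRowsKeepReflect {
  def keep(condition: Expression, output: Seq[Expression]): AnyRef = {
    // Scala nests MergeRows.Keep as MergeRows$Keep in bytecode (assumed name).
    val cls = Class.forName("org.apache.spark.sql.catalyst.plans.logical.MergeRows$Keep")
    cls.getConstructors.head
      .newInstance(condition, output)
      .asInstanceOf[AnyRef]
  }
}
```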
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 01b6d8d79b15..6441c60021d7 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.CTESubstitution
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, TableSpec, UnresolvedWith}
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Copy, Insert, Keep, Update}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table,
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SparkFormatTable
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
 import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
 import org.apache.spark.sql.internal.SQLConf
@@ -85,6 +86,19 @@ class Spark4Shim extends SparkShim {
     tableCatalog.createTable(ident, columns, partitions, properties)
   }
 
+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      newTable: Table): DataSourceV2Relation =
+    relation.copy(table = newTable)
+
+  override def copyTableSpecLocation(spec: TableSpec, location: Option[String]): TableSpec =
+    spec.copy(location = location)
+
+  override def copyTableSpecProperties(
+      spec: TableSpec,
+      properties: Map[String, String]): TableSpec =
+    spec.copy(properties = properties)
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
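For reference, a minimal sketch of the calling convention from version-agnostic code, using only the APIs introduced in this diff: the shim jar selected at runtime supplies `copy` bytecode generated against its own Spark version, so the common module never emits `copy$default$N` references itself.

```scala
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.paimon.shims.SparkShimLoader

object ShimCopyUsage {
  // Version-agnostic caller: the arity of DataSourceV2Relation.copy is a
  // detail of the per-version shim, so this compiles once and runs on
  // Spark 3.x, 4.0 and 4.1 alike.
  def replaceTable(relation: DataSourceV2Relation, newTable: Table): DataSourceV2Relation =
    SparkShimLoader.shim.copyDataSourceV2Relation(relation, newTable)
}
```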