From d274193200dfb3404a36afa1a2e0d940e5519c4d Mon Sep 17 00:00:00 2001 From: "xiyu.zk" Date: Tue, 28 Apr 2026 17:44:20 +0800 Subject: [PATCH] [spark] Refactor BatchWrite subclasses into base logic + per-version wrappers --- .../spark/format/FormatTableBatchWrite.scala | 49 ++++++++ .../paimon/spark/write/PaimonBatchWrite.scala | 112 +++--------------- .../spark/sql/paimon/shims/Spark4Shim.scala | 20 ++++ .../format/FormatTableBatchWriteBase.scala} | 95 ++++----------- .../spark/format/PaimonFormatTable.scala | 107 +---------------- ...Write.scala => PaimonBatchWriteBase.scala} | 40 ++++--- .../paimon/spark/write/PaimonV2Write.scala | 8 +- .../spark/sql/paimon/shims/SparkShim.scala | 24 ++++ .../spark/format/FormatTableBatchWrite.scala | 47 ++++++++ .../paimon/spark/write/PaimonBatchWrite.scala | 51 ++++++++ .../spark/sql/paimon/shims/Spark3Shim.scala | 20 ++++ .../spark/format/FormatTableBatchWrite.scala | 49 ++++++++ .../paimon/spark/write/PaimonBatchWrite.scala | 53 +++++++++ .../spark/sql/paimon/shims/Spark4Shim.scala | 20 ++++ 14 files changed, 403 insertions(+), 292 deletions(-) create mode 100644 paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala rename paimon-spark/{paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala => paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala} (55%) rename paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/{PaimonBatchWrite.scala => PaimonBatchWriteBase.scala} (76%) create mode 100644 paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala create mode 100644 paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala create mode 100644 paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala create mode 100644 paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala new file mode 100644 index 000000000000..c05349c50473 --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.format + +import org.apache.paimon.table.FormatTable + +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +/** + * Spark 4.0-compatible shadow of the `paimon-spark4-common` `FormatTableBatchWrite`. Compiled + * against 4.0.2 so its class file's method table does not carry the `commit(.., WriteSummary)` + * signature added by Spark 4.1's `BatchWrite` default method, avoiding `ClassNotFoundException: + * WriteSummary` lazy-linking on 4.0 task serialization. + */ +class FormatTableBatchWrite( + table: FormatTable, + overwriteDynamic: Option[Boolean], + overwritePartitions: Option[Map[String, String]], + writeSchema: StructType) + extends FormatTableBatchWriteBase(table, overwriteDynamic, overwritePartitions, writeSchema) + with BatchWrite + with Serializable { + + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = + createFormatTableDataWriterFactory() + + override def useCommitCoordinator(): Boolean = false + + override def commit(messages: Array[WriterCommitMessage]): Unit = commitMessages(messages) + + override def abort(messages: Array[WriterCommitMessage]): Unit = abortMessages(messages) +} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala index d546eebf4c1b..56b9d4c4fe6e 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala @@ -18,121 +18,37 @@ package org.apache.paimon.spark.write -import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement} -import org.apache.paimon.spark.catalyst.Compatibility -import org.apache.paimon.spark.commands.SparkDataFileMeta -import org.apache.paimon.spark.metric.SparkMetricRegistry import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan import org.apache.paimon.table.FileStoreTable -import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl} -import org.apache.spark.sql.PaimonSparkSession import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} -import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.StructType -import java.util.Collections - -import scala.collection.JavaConverters._ - -case class PaimonBatchWrite( +/** + * Spark-4.0 shadow wrapper. Source-identical to the `paimon-spark4-common` version but compiled + * against Spark 4.0.2; the maven shade order picks `paimon-spark-4.0/target/classes` ahead of the + * shaded 4-common copy, so the class metadata loaded at runtime does not include the 4.1-only + * `BatchWrite.commit(.., WriteSummary)` signature that triggers `ClassNotFoundException` via + * `ObjectStreamClass.getPrivateMethod` during Spark task serialization. 
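+ *
+ * A minimal sketch of the failure mode, assuming plain JDK serialization (a hypothetical
+ * `paimonBatchWrite` instance stands in for the object Spark ships with each task):
+ *
+ * {{{
+ * import java.io.{ByteArrayOutputStream, ObjectOutputStream}
+ *
+ * val out = new ObjectOutputStream(new ByteArrayOutputStream())
+ * // If this class were compiled against Spark 4.1, the write below fails on a 4.0
+ * // runtime: ObjectStreamClass reflects over the class's declared-method table, which
+ * // references the missing WriteSummary type -> ClassNotFoundException: WriteSummary.
+ * out.writeObject(paimonBatchWrite)
+ * }}}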
+ */ +class PaimonBatchWrite( table: FileStoreTable, writeSchema: StructType, dataSchema: StructType, overwritePartitions: Option[Map[String, String]], copyOnWriteScan: Option[PaimonCopyOnWriteScan]) - extends BatchWrite - with WriteHelper { - - protected val metricRegistry = SparkMetricRegistry() + extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + with BatchWrite + with Serializable { - protected val batchWriteBuilder: BatchWriteBuilder = { - val builder = table.newBatchWriteBuilder() - overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava)) - builder - } - - override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { - (_: Int, _: Long) => - { - PaimonV2DataWriter( - batchWriteBuilder, - writeSchema, - dataSchema, - coreOptions, - table.catalogEnvironment().catalogContext()) - } - } + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = + createPaimonDataWriterFactory(info) override def useCommitCoordinator(): Boolean = false - override def commit(messages: Array[WriterCommitMessage]): Unit = { - logInfo(s"Committing to table ${table.name()}") - val batchTableCommit = batchWriteBuilder.newCommit() - batchTableCommit.withMetricRegistry(metricRegistry) - val addCommitMessage = WriteTaskResult.merge(messages) - val deletedCommitMessage = copyOnWriteScan match { - case Some(scan) => buildDeletedCommitMessage(scan.scannedFiles) - case None => Seq.empty - } - val commitMessages = addCommitMessage ++ deletedCommitMessage - try { - val start = System.currentTimeMillis() - batchTableCommit.commit(commitMessages.asJava) - logInfo(s"Committed in ${System.currentTimeMillis() - start} ms") - } finally { - batchTableCommit.close() - } - postDriverMetrics() - postCommit(commitMessages) - } - - // Spark support v2 write driver metrics since 4.0, see https://github.com/apache/spark/pull/48573 - // To ensure compatibility with 3.x, manually post driver metrics here instead of using Spark's API. - protected def postDriverMetrics(): Unit = { - val spark = PaimonSparkSession.active - // todo: find a more suitable way to get metrics. 
- val commitMetrics = metricRegistry.buildSparkCommitMetrics() - val executionId = spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - val executionMetrics = Compatibility.getExecutionMetrics(spark, executionId.toLong).distinct - val metricUpdates = executionMetrics.flatMap { - m => - commitMetrics.find(x => m.metricType.toLowerCase.contains(x.name.toLowerCase)) match { - case Some(customTaskMetric) => Some((m.accumulatorId, customTaskMetric.value())) - case None => None - } - } - SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext, executionId, metricUpdates) - } + override def commit(messages: Array[WriterCommitMessage]): Unit = commitMessages(messages) override def abort(messages: Array[WriterCommitMessage]): Unit = { // TODO clean uncommitted files } - - private def buildDeletedCommitMessage( - deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = { - logInfo(s"[V2 Write] Building deleted commit message for ${deletedFiles.size} files") - deletedFiles - .groupBy(f => (f.partition, f.bucket)) - .map { - case ((partition, bucket), files) => - val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava - - new CommitMessageImpl( - partition, - bucket, - files.head.totalBuckets, - new DataIncrement( - Collections.emptyList[DataFileMeta], - deletedDataFileMetas, - Collections.emptyList[DataFileMeta]), - new CompactIncrement( - Collections.emptyList[DataFileMeta], - Collections.emptyList[DataFileMeta], - Collections.emptyList[DataFileMeta]) - ) - } - .toSeq - } } diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index 3b581e325fa8..e708fb1bddb8 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.{GenericVariant, Variant} import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow} +import org.apache.paimon.spark.format.FormatTableBatchWrite +import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan +import org.apache.paimon.spark.write.PaimonBatchWrite +import org.apache.paimon.table.{FileStoreTable, FormatTable} import org.apache.paimon.types.{DataType, RowType} import org.apache.hadoop.conf.Configuration @@ -38,6 +42,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.write.BatchWrite import org.apache.spark.sql.execution.SparkFormatTable import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec} import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} @@ -101,6 +106,21 @@ class Spark4Shim extends SparkShim { tableCatalog.createTable(ident, columns, partitions, properties) } + override def createPaimonBatchWrite( + table: FileStoreTable, + writeSchema: StructType, + dataSchema: StructType, + overwritePartitions: Option[Map[String, String]], + 
copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite = + new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + + override def createFormatTableBatchWrite( + table: FormatTable, + overwriteDynamic: Option[Boolean], + overwritePartitions: Option[Map[String, String]], + writeSchema: StructType): BatchWrite = + new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema) + override def createCTERelationRef( cteId: Long, resolved: Boolean, diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala similarity index 55% rename from paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala index 4f55579a68c0..853c7f41467b 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala @@ -18,89 +18,38 @@ package org.apache.paimon.spark.format -import org.apache.paimon.format.csv.CsvOptions -import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, SparkInternalRowWrapper} -import org.apache.paimon.spark.write.{BaseV2WriteBuilder, FormatTableWriteTaskResult, V2DataWrite, WriteTaskResult} +import org.apache.paimon.spark.SparkInternalRowWrapper +import org.apache.paimon.spark.write.{FormatTableWriteTaskResult, V2DataWrite, WriteTaskResult} import org.apache.paimon.table.FormatTable import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage} -import org.apache.paimon.types.RowType import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability, TableCatalog} -import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC} -import org.apache.spark.sql.connector.read.ScanBuilder -import org.apache.spark.sql.connector.write._ -import org.apache.spark.sql.connector.write.streaming.StreamingWrite +import org.apache.spark.sql.connector.write.{DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap - -import java.util -import java.util.Locale import scala.collection.JavaConverters._ -case class PaimonFormatTable(table: FormatTable) - extends BaseTable - with SupportsRead - with SupportsWrite { - - override def capabilities(): util.Set[TableCapability] = { - util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, OVERWRITE_BY_FILTER) - } - - override def properties: util.Map[String, String] = { - val properties = new util.HashMap[String, String](table.options()) - properties.put(TableCatalog.PROP_PROVIDER, table.format.name().toLowerCase(Locale.ROOT)) - if (table.comment.isPresent) { - properties.put(TableCatalog.PROP_COMMENT, table.comment.get) - } - if (FormatTable.Format.CSV == table.format) { - properties.put( - "sep", - properties.getOrDefault( - CsvOptions.FIELD_DELIMITER.key(), - CsvOptions.FIELD_DELIMITER.defaultValue())) - } - properties - } - - override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = 
{ - val scanBuilder = FormatTableScanBuilder(table.copy(caseInsensitiveStringMap)) - scanBuilder.pruneColumns(schema) - scanBuilder - } - - override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { - PaimonFormatTableWriterBuilder(table, info.schema) - } -} - -case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType) - extends BaseV2WriteBuilder(table) { - - override def partitionRowType(): RowType = table.partitionType - - override def build: Write = new Write() { - override def toBatch: BatchWrite = { - FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema) - } - - override def toStreaming: StreamingWrite = { - throw new UnsupportedOperationException("FormatTable doesn't support streaming write") - } - } -} - -private case class FormatTableBatchWrite( +/** + * Business logic for `FormatTable` batch writes, deliberately *not* extending + * `org.apache.spark.sql.connector.write.BatchWrite`. See + * [[org.apache.paimon.spark.write.PaimonBatchWriteBase]] for the full rationale: Spark 4.1 added a + * default method `BatchWrite.commit(.., WriteSummary)` whose `WriteSummary` parameter type is + * unavailable on Spark 4.0, so a class compiled against 4.1 that mixes in `BatchWrite` triggers + * `ClassNotFoundException: WriteSummary` lazy-linking on 4.0 runtimes during Spark task + * serialization. Keeping this base off `BatchWrite` lets common ship the implementation once; + * per-version `paimon-spark{3,4}-common` modules supply a thin wrapper that mixes in `BatchWrite`, + * and `paimon-spark-4.0/src/main` shadows that wrapper at the 4.0.2 compile target. + */ +abstract class FormatTableBatchWriteBase( table: FormatTable, overwriteDynamic: Option[Boolean], overwritePartitions: Option[Map[String, String]], writeSchema: StructType) - extends BatchWrite - with Logging { + extends Logging + with Serializable { - private val batchWriteBuilder = { + protected val batchWriteBuilder: BatchWriteBuilder = { val builder = table.newBatchWriteBuilder() // todo: add test for static overwrite the whole table if (overwriteDynamic.contains(true)) { @@ -111,13 +60,11 @@ private case class FormatTableBatchWrite( builder } - override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { + protected def createFormatTableDataWriterFactory(): DataWriterFactory = { (_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder, writeSchema) } - override def useCommitCoordinator(): Boolean = false - - override def commit(messages: Array[WriterCommitMessage]): Unit = { + protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = { logInfo(s"Committing to FormatTable ${table.name()}") val batchTableCommit = batchWriteBuilder.newCommit() val commitMessages = WriteTaskResult.merge(messages).asJava @@ -132,7 +79,7 @@ private case class FormatTableBatchWrite( } } - override def abort(messages: Array[WriterCommitMessage]): Unit = { + protected def abortMessages(messages: Array[WriterCommitMessage]): Unit = { logInfo(s"Aborting write to FormatTable ${table.name()}") val batchTableCommit = batchWriteBuilder.newCommit() val commitMessages = WriteTaskResult.merge(messages).asJava diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala index 4f55579a68c0..dc7effd39f72 100644 --- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala @@ -19,27 +19,23 @@ package org.apache.paimon.spark.format import org.apache.paimon.format.csv.CsvOptions -import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, SparkInternalRowWrapper} -import org.apache.paimon.spark.write.{BaseV2WriteBuilder, FormatTableWriteTaskResult, V2DataWrite, WriteTaskResult} +import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder} +import org.apache.paimon.spark.write.BaseV2WriteBuilder import org.apache.paimon.table.FormatTable -import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage} import org.apache.paimon.types.RowType -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability, TableCatalog} import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC} import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write._ import org.apache.spark.sql.connector.write.streaming.StreamingWrite +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import java.util import java.util.Locale -import scala.collection.JavaConverters._ - case class PaimonFormatTable(table: FormatTable) extends BaseTable with SupportsRead @@ -83,7 +79,8 @@ case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: Struc override def build: Write = new Write() { override def toBatch: BatchWrite = { - FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema) + SparkShimLoader.shim + .createFormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema) } override def toStreaming: StreamingWrite = { @@ -91,97 +88,3 @@ case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: Struc } } } - -private case class FormatTableBatchWrite( - table: FormatTable, - overwriteDynamic: Option[Boolean], - overwritePartitions: Option[Map[String, String]], - writeSchema: StructType) - extends BatchWrite - with Logging { - - private val batchWriteBuilder = { - val builder = table.newBatchWriteBuilder() - // todo: add test for static overwrite the whole table - if (overwriteDynamic.contains(true)) { - builder.withOverwrite() - } else { - overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava)) - } - builder - } - - override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { - (_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder, writeSchema) - } - - override def useCommitCoordinator(): Boolean = false - - override def commit(messages: Array[WriterCommitMessage]): Unit = { - logInfo(s"Committing to FormatTable ${table.name()}") - val batchTableCommit = batchWriteBuilder.newCommit() - val commitMessages = WriteTaskResult.merge(messages).asJava - try { - val start = System.currentTimeMillis() - batchTableCommit.commit(commitMessages) - logInfo(s"Committed in ${System.currentTimeMillis() - start} ms") - } catch { - case e: Exception => - logError("Failed to commit FormatTable writes", e) - throw e - } - } - - override def abort(messages: Array[WriterCommitMessage]): Unit = { - 
logInfo(s"Aborting write to FormatTable ${table.name()}") - val batchTableCommit = batchWriteBuilder.newCommit() - val commitMessages = WriteTaskResult.merge(messages).asJava - batchTableCommit.abort(commitMessages) - } -} - -private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, writeSchema: StructType) - extends V2DataWrite - with Logging { - - private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = { - val numFields = writeSchema.fields.length - record => { - new SparkInternalRowWrapper(writeSchema, numFields).replace(record) - } - } - - private val write: BatchTableWrite = batchWriteBuilder.newWrite() - - override def write(record: InternalRow): Unit = { - val paimonRow = rowConverter.apply(record) - write.write(paimonRow) - } - - override def commitImpl(): Seq[CommitMessage] = { - write.prepareCommit().asScala.toSeq - } - - def buildWriteTaskResult(commitMessages: Seq[CommitMessage]): FormatTableWriteTaskResult = { - FormatTableWriteTaskResult(commitMessages) - } - - override def commit: FormatTableWriteTaskResult = { - super.commit.asInstanceOf[FormatTableWriteTaskResult] - } - - override def abort(): Unit = { - logInfo("Aborting FormatTable data writer") - close() - } - - override def close(): Unit = { - try { - write.close() - } catch { - case e: Exception => - logError("Error closing FormatTableDataWriter", e) - throw new RuntimeException(e) - } - } -} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala similarity index 76% rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala index 92aeae031343..3600fcabc925 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala @@ -27,7 +27,7 @@ import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl} import org.apache.spark.sql.PaimonSparkSession -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.StructType @@ -36,14 +36,26 @@ import java.util.Collections import scala.collection.JavaConverters._ -case class PaimonBatchWrite( - table: FileStoreTable, - writeSchema: StructType, - dataSchema: StructType, - overwritePartitions: Option[Map[String, String]], - copyOnWriteScan: Option[PaimonCopyOnWriteScan]) - extends BatchWrite - with WriteHelper { +/** + * Business logic for Paimon batch writes, deliberately *not* extending + * `org.apache.spark.sql.connector.write.BatchWrite`. 
Spark 4.1 added a default method + * `BatchWrite.commit(WriterCommitMessage[], WriteSummary)` whose `WriteSummary` parameter type does + * not exist on Spark 4.0; a class compiled against 4.1 that declares `extends BatchWrite` carries + * the inherited `commit(.., WriteSummary)` signature in its method table, which JVM + * `ObjectStreamClass.getPrivateMethod` lazy-links during Spark task serialization, crashing on 4.0 + * with `ClassNotFoundException: WriteSummary`. Keeping this base off `BatchWrite` lets common ship + * the implementation once; per-version `paimon-spark{3,4}-common` modules supply a thin wrapper + * that mixes in `BatchWrite`, and `paimon-spark-4.0/src/main` shadows that wrapper at the 4.0.2 + * compile target. + */ +abstract class PaimonBatchWriteBase( + val table: FileStoreTable, + val writeSchema: StructType, + val dataSchema: StructType, + val overwritePartitions: Option[Map[String, String]], + val copyOnWriteScan: Option[PaimonCopyOnWriteScan]) + extends WriteHelper + with Serializable { protected val metricRegistry = SparkMetricRegistry() @@ -53,7 +65,7 @@ case class PaimonBatchWrite( builder } - override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { + protected def createPaimonDataWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { (_: Int, _: Long) => { PaimonV2DataWriter( @@ -65,9 +77,7 @@ case class PaimonBatchWrite( } } - override def useCommitCoordinator(): Boolean = false - - override def commit(messages: Array[WriterCommitMessage]): Unit = { + protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = { logInfo(s"Committing to table ${table.name()}") val batchTableCommit = batchWriteBuilder.newCommit() batchTableCommit.withMetricRegistry(metricRegistry) @@ -106,10 +116,6 @@ case class PaimonBatchWrite( SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext, executionId, metricUpdates) } - override def abort(messages: Array[WriterCommitMessage]): Unit = { - // TODO clean uncommitted files - } - private def buildDeletedCommitMessage( deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = { logInfo(s"[V2 Write] Building deleted commit message for ${deletedFiles.size} files") diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala index e4676ae4af1f..c7a9a9ff3ab8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.distributions.Distribution import org.apache.spark.sql.connector.expressions.SortOrder import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.write._ +import org.apache.spark.sql.paimon.shims.SparkShimLoader import org.apache.spark.sql.types.StructType import scala.collection.mutable @@ -62,7 +63,12 @@ class PaimonV2Write( } override def toBatch: BatchWrite = { - PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + SparkShimLoader.shim.createPaimonBatchWrite( + table, + writeSchema, + dataSchema, + overwritePartitions, + copyOnWriteScan) } override def supportedCustomMetrics(): Array[CustomMetric] = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index 7a541a451401..31e44e727b65 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.paimon.shims import org.apache.paimon.data.variant.Variant import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow} +import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan +import org.apache.paimon.table.{FileStoreTable, FormatTable} import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.SparkSession @@ -32,6 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.write.BatchWrite import org.apache.spark.sql.types.StructType import java.util.{Map => JMap} @@ -65,6 +68,27 @@ trait SparkShim { partitions: Array[Transform], properties: JMap[String, String]): Table + /** + * Constructs a `BatchWrite` for Paimon's V2 write path. The implementation lives in each + * per-version shim module so the `extends BatchWrite` mixin is compiled against the right Spark + * minor version: Spark 4.1 added a default method `BatchWrite.commit(.., WriteSummary)` whose + * inherited signature triggers `ClassNotFoundException: WriteSummary` lazy-linking on Spark 4.0 + * runtimes when the class is loaded for task serialization. + */ + def createPaimonBatchWrite( + table: FileStoreTable, + writeSchema: StructType, + dataSchema: StructType, + overwritePartitions: Option[Map[String, String]], + copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite + + /** Same `BatchWrite` mixin problem as [[createPaimonBatchWrite]], but for `FormatTable` writes. */ + def createFormatTableBatchWrite( + table: FormatTable, + overwriteDynamic: Option[Boolean], + overwritePartitions: Option[Map[String, String]], + writeSchema: StructType): BatchWrite + def createCTERelationRef( cteId: Long, resolved: Boolean, diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala new file mode 100644 index 000000000000..d13c73737025 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.format + +import org.apache.paimon.table.FormatTable + +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +/** + * Spark-3.x thin wrapper that mixes `BatchWrite` into [[FormatTableBatchWriteBase]]. See the base + * class scaladoc for why the inheritance lives here rather than in `paimon-spark-common`. + */ +class FormatTableBatchWrite( + table: FormatTable, + overwriteDynamic: Option[Boolean], + overwritePartitions: Option[Map[String, String]], + writeSchema: StructType) + extends FormatTableBatchWriteBase(table, overwriteDynamic, overwritePartitions, writeSchema) + with BatchWrite + with Serializable { + + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = + createFormatTableDataWriterFactory() + + override def useCommitCoordinator(): Boolean = false + + override def commit(messages: Array[WriterCommitMessage]): Unit = commitMessages(messages) + + override def abort(messages: Array[WriterCommitMessage]): Unit = abortMessages(messages) +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala new file mode 100644 index 000000000000..fd46fa258561 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.write + +import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +/** + * Spark-3.x thin wrapper that mixes `BatchWrite` into [[PaimonBatchWriteBase]]. See the base class + * scaladoc for why the inheritance lives here rather than in `paimon-spark-common`. 
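+ *
+ * Callers never instantiate this class directly; the common module goes through the shim so
+ * that the wrapper compiled for the running Spark line is the one loaded at runtime. A
+ * condensed sketch of the call site in `PaimonV2Write` (shown here for orientation only):
+ *
+ * {{{
+ * override def toBatch: BatchWrite =
+ *   SparkShimLoader.shim.createPaimonBatchWrite(
+ *     table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan)
+ * }}}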
+ */ +class PaimonBatchWrite( + table: FileStoreTable, + writeSchema: StructType, + dataSchema: StructType, + overwritePartitions: Option[Map[String, String]], + copyOnWriteScan: Option[PaimonCopyOnWriteScan]) + extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + with BatchWrite + with Serializable { + + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = + createPaimonDataWriterFactory(info) + + override def useCommitCoordinator(): Boolean = false + + override def commit(messages: Array[WriterCommitMessage]): Unit = commitMessages(messages) + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + // TODO clean uncommitted files + } +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala index 5f989f525d04..28c7ea1da2c8 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala @@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.Variant import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, Spark3InternalRowWithBlob, SparkArrayData, SparkInternalRow} +import org.apache.paimon.spark.format.FormatTableBatchWrite +import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan +import org.apache.paimon.spark.write.PaimonBatchWrite +import org.apache.paimon.table.{FileStoreTable, FormatTable} import org.apache.paimon.types.{DataType, RowType} import org.apache.hadoop.conf.Configuration @@ -41,6 +45,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.write.BatchWrite import org.apache.spark.sql.execution.SparkFormatTable import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec} import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} @@ -85,6 +90,21 @@ class Spark3Shim extends SparkShim { tableCatalog.createTable(ident, schema, partitions, properties) } + override def createPaimonBatchWrite( + table: FileStoreTable, + writeSchema: StructType, + dataSchema: StructType, + overwritePartitions: Option[Map[String, String]], + copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite = + new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + + override def createFormatTableBatchWrite( + table: FormatTable, + overwriteDynamic: Option[Boolean], + overwritePartitions: Option[Map[String, String]], + writeSchema: StructType): BatchWrite = + new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema) + override def createCTERelationRef( cteId: Long, resolved: Boolean, diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala new file mode 100644 index 000000000000..4fa58c95a753 --- /dev/null 
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.format + +import org.apache.paimon.table.FormatTable + +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +/** + * Spark-4.x thin wrapper that mixes `BatchWrite` into [[FormatTableBatchWriteBase]]. See the base + * class scaladoc for why the inheritance lives here rather than in `paimon-spark-common`. A + * duplicate of this file lives at `paimon-spark-4.0/src/main` so that a 4.0.2 compile target + * produces a class file whose method table does not reference `WriteSummary` (4.1-only). + */ +class FormatTableBatchWrite( + table: FormatTable, + overwriteDynamic: Option[Boolean], + overwritePartitions: Option[Map[String, String]], + writeSchema: StructType) + extends FormatTableBatchWriteBase(table, overwriteDynamic, overwritePartitions, writeSchema) + with BatchWrite + with Serializable { + + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = + createFormatTableDataWriterFactory() + + override def useCommitCoordinator(): Boolean = false + + override def commit(messages: Array[WriterCommitMessage]): Unit = commitMessages(messages) + + override def abort(messages: Array[WriterCommitMessage]): Unit = abortMessages(messages) +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala new file mode 100644 index 000000000000..ca813e1108c0 --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.write + +import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} +import org.apache.spark.sql.types.StructType + +/** + * Spark-4.x thin wrapper that mixes `BatchWrite` into [[PaimonBatchWriteBase]]. See the base class + * scaladoc for why the inheritance lives here rather than in `paimon-spark-common`. A duplicate of + * this file lives at `paimon-spark-4.0/src/main` so that a 4.0.2 compile target produces a class + * file whose method table does not reference `WriteSummary` (4.1-only). + */ +class PaimonBatchWrite( + table: FileStoreTable, + writeSchema: StructType, + dataSchema: StructType, + overwritePartitions: Option[Map[String, String]], + copyOnWriteScan: Option[PaimonCopyOnWriteScan]) + extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + with BatchWrite + with Serializable { + + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = + createPaimonDataWriterFactory(info) + + override def useCommitCoordinator(): Boolean = false + + override def commit(messages: Array[WriterCommitMessage]): Unit = commitMessages(messages) + + override def abort(messages: Array[WriterCommitMessage]): Unit = { + // TODO clean uncommitted files + } +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index 01b6d8d79b15..999edd6b930b 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.{GenericVariant, Variant} import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow} +import org.apache.paimon.spark.format.FormatTableBatchWrite +import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan +import org.apache.paimon.spark.write.PaimonBatchWrite +import org.apache.paimon.table.{FileStoreTable, FormatTable} import org.apache.paimon.types.{DataType, RowType} import org.apache.hadoop.conf.Configuration @@ -38,6 +42,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.write.BatchWrite import org.apache.spark.sql.execution.SparkFormatTable import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitionSpec} import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex @@ -85,6 +90,21 @@ class Spark4Shim extends SparkShim { tableCatalog.createTable(ident, columns, partitions, properties) } + override def createPaimonBatchWrite( + table: FileStoreTable, + writeSchema: StructType, + dataSchema: StructType, + overwritePartitions: Option[Map[String, String]], + copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite = + new 
PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan) + + override def createFormatTableBatchWrite( + table: FormatTable, + overwriteDynamic: Option[Boolean], + overwritePartitions: Option[Map[String, String]], + writeSchema: StructType): BatchWrite = + new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema) + override def createCTERelationRef( cteId: Long, resolved: Boolean,