feat: AQE DPP for native Parquet scans with broadcast reuse #4112
Draft
mbutrovich wants to merge 25 commits into apache:main from
## Which issue does this PR close?
Partially addresses #3510. Closes #4045 (V1 Parquet AQE DPP). Related PRs: #4011 (non-AQE DPP), #4053 (scalar subquery pushdown + `CometReuseSubquery`), #4037 (non-AQE DPP edge case tests), #4033 (AQE DPP for Iceberg, draft).
## Rationale for this change

Under AQE (the default), Spark creates `SubqueryAdaptiveBroadcastExec` (SAB) for DPP. Spark's `PlanAdaptiveDynamicPruningFilters` converts these by finding a `BroadcastHashJoinExec` in the plan. After Comet replaces it with `CometBroadcastHashJoinExec`, Spark's rule can't find a match. With `onlyInBroadcast=true`, it replaces DPP with `Literal.TrueLiteral`, disabling partition pruning. With the previous `isAqeDynamicPruningFilter` rejection, the scan fell back to Spark entirely, losing native acceleration for all DPP queries under AQE.
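The mismatch can be illustrated with a toy sketch; the case classes below are simplified stand-ins, not the real Spark/Comet operators:

```scala
// Toy stand-ins for the real operators (not Spark's actual classes).
sealed trait ToySparkPlan
case class ToyBroadcastHashJoin(buildSide: String) extends ToySparkPlan
case class ToyCometBroadcastHashJoin(buildSide: String) extends ToySparkPlan

// Spark's PlanAdaptiveDynamicPruningFilters pattern-matches on the exact
// Spark join type; anything else falls through to the "disable DPP" branch.
def planDpp(join: ToySparkPlan): String = join match {
  case ToyBroadcastHashJoin(_) => "SubqueryBroadcastExec" // DPP with broadcast reuse
  case _                       => "Literal.TrueLiteral"   // onlyInBroadcast=true: pruning disabled
}

// Before Comet rewrites the join, DPP is planned; after, pruning is silently lost.
val before = planDpp(ToyBroadcastHashJoin("left"))
val after  = planDpp(ToyCometBroadcastHashJoin("left"))
```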
## What changes are included in this PR?

### Two-phase SAB conversion

Spark's `PlanAdaptiveDynamicPruningFilters` runs before custom `queryStageOptimizerRules` and converts SABs to `TrueLiteral`. We work around this in two phases:

1. Wrap SABs in `CometSubqueryAdaptiveBroadcastExec` so Spark's pattern match doesn't recognize them. Only SABs in `CometNativeScanExec` nodes are wrapped (non-Comet scans like CSV keep the original SAB for Spark to handle).
2. Convert the wrapped SABs per case:
   - `exchangeReuseEnabled` + matching broadcast join: `CometSubqueryBroadcastExec` wired to a `BroadcastQueryStageExec` for broadcast reuse
   - `onlyInBroadcast=true`: `Literal.TrueLiteral` (DPP disabled)
   - `onlyInBroadcast=false`: aggregate `SubqueryExec` (DPP via separate execution, matching Spark's lines 68-79)
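The two phases can be sketched with simplified stand-in types (the real rule operates on Spark/Comet physical operators; everything below is illustrative only):

```scala
// Toy stand-ins for the operators named above.
sealed trait ToyPlan
case class ToySab(onlyInBroadcast: Boolean) extends ToyPlan // SubqueryAdaptiveBroadcastExec
case class ToyCometSab(inner: ToySab) extends ToyPlan       // CometSubqueryAdaptiveBroadcastExec
case object ToyCometSubqueryBroadcast extends ToyPlan       // case 1: broadcast reuse
case object ToyTrueLiteral extends ToyPlan                  // case 2: DPP disabled
case object ToyAggregateSubquery extends ToyPlan            // case 3: separate execution

// Phase 1: hide the SAB from Spark's rule by wrapping it.
def wrap(sab: ToySab): ToyPlan = ToyCometSab(sab)

// Phase 2: the custom rule unwraps and converts per case.
def convert(p: ToyPlan, exchangeReuseEnabled: Boolean, broadcastFound: Boolean): ToyPlan =
  p match {
    case ToyCometSab(sab) =>
      if (exchangeReuseEnabled && broadcastFound) ToyCometSubqueryBroadcast
      else if (sab.onlyInBroadcast) ToyTrueLiteral
      else ToyAggregateSubquery
    case other => other
  }
```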
### Cross-stage and cross-plan broadcast search

Spark's `PlanAdaptiveDynamicPruningFilters` is constructed with `rootPlan = this` (the current `AdaptiveSparkPlanExec`), so each ASPE (the main query and each scalar subquery) gets its own rule instance pointing to itself. Custom `queryStageOptimizerRules` registered via `injectQueryStageOptimizerRule` are shared across all ASPEs and don't get a per-ASPE `rootPlan` reference. We approximate Spark's behavior with two searches:

- The local stage plan (the `plan` arg to `apply()`): covers same-stage joins (the common case) and scalar subqueries where scan and join are under one exchange
- The shared root plan (via `AdaptiveExecutionContext`): covers cross-stage joins in the main query where a shuffle separates the scan from the broadcast join

When the broadcast is not yet materialized as a `BroadcastQueryStageExec` (the cross-stage case), we follow Spark's pattern (lines 44-64): construct a new broadcast exchange wrapping `adaptivePlan.executedPlan`, wrap it in a new `AdaptiveSparkPlanExec`, and let AQE's `stageCache` canonicalization ensure the broadcast runs once.
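The two-search order can be sketched over a toy tree type (a stand-in for the real plan tree; names are illustrative):

```scala
// Toy plan node; the real search walks Spark physical plan trees.
case class ToyNode(name: String, children: Seq[ToyNode] = Nil)

// Depth-first search for the first node satisfying a predicate.
def findFirst(root: ToyNode)(p: ToyNode => Boolean): Option[ToyNode] =
  if (p(root)) Some(root)
  else root.children.view.flatMap(c => findFirst(c)(p)).headOption

// Search the local stage plan first (same-stage case), then fall back to
// the shared root plan (cross-stage case).
def findBroadcast(stagePlan: ToyNode, rootPlan: ToyNode): Option[ToyNode] = {
  val isBroadcastStage = (n: ToyNode) => n.name == "BroadcastQueryStageExec"
  findFirst(stagePlan)(isBroadcastStage).orElse(findFirst(rootPlan)(isBroadcastStage))
}
```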
### Subquery deduplication via shared cache

Spark's `ReuseAdaptiveSubquery` uses the shared `AdaptiveExecutionContext.subqueryCache` for cross-plan deduplication. Our rule runs after `ReuseAdaptiveSubquery` (which can't see our subqueries because they don't exist yet), and `CometReuseSubquery` uses a per-invocation local cache. We register DPP subqueries in the shared `subqueryCache` directly, matching `ReuseAdaptiveSubquery`'s behavior for cross-plan reuse (e.g., a main query and a scalar subquery with identical DPP).
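A minimal sketch of the shared-cache registration, assuming the cache is keyed by the canonicalized subquery plan (the types and key shape here are stand-ins, not the real `AdaptiveExecutionContext` API):

```scala
import scala.collection.concurrent.TrieMap

// Stand-in for a planned DPP subquery, keyed by its canonicalized form.
final case class ToySubquery(canonicalKey: String)

// Stand-in for the shared AdaptiveExecutionContext.subqueryCache.
val subqueryCache = TrieMap.empty[String, ToySubquery]

// Register-or-reuse: identical DPP subqueries planned in different ASPEs
// resolve to one shared instance, mirroring ReuseAdaptiveSubquery.
def registerDppSubquery(s: ToySubquery): ToySubquery =
  subqueryCache.getOrElseUpdate(s.canonicalKey, s)
```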
### Dual-filter resolution

`CometNativeScanExec.partitionFilters` and `CometScanExec.partitionFilters` contain separate `InSubqueryExec` instances. `CometExecRule` only wraps the outer filters (the inner `CometScanExec` is `@transient`, not in the expression tree). `CometPlanAdaptiveDynamicPruningFilters` converts both, matching `CometSubqueryAdaptiveBroadcastExec` (wrapped, outer) and `SubqueryAdaptiveBroadcastExec` (unwrapped, inner).
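The need to match both the wrapped (outer) and raw (inner) forms can be sketched with toy types (simplified stand-ins for the operators above):

```scala
// Toy stand-ins: the raw SAB seen on the inner CometScanExec, and the
// wrapped form seen on the outer CometNativeScanExec.
sealed trait ToyFilter
case class RawSab(id: Int) extends ToyFilter           // SubqueryAdaptiveBroadcastExec (inner)
case class WrappedSab(inner: RawSab) extends ToyFilter // CometSubqueryAdaptiveBroadcastExec (outer)
case class Converted(id: Int) extends ToyFilter

// The rule must match both shapes; matching only the wrapped form would
// leave the inner, transient copy unconverted.
def convertBoth(f: ToyFilter): ToyFilter = f match {
  case WrappedSab(RawSab(id)) => Converted(id)
  case RawSab(id)             => Converted(id)
  case other                  => other
}
```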
### Spark 3.4 fallback

`injectQueryStageOptimizerRule` is unavailable on 3.4. SAB wrapping is gated on `isSpark35Plus`. On 3.4, scans with AQE DPP fall back to Spark so that Spark's rule handles DPP natively (with partition pruning).
### Broadcast fallback cases

- A join that is still Spark's `BroadcastHashJoinExec` is matched directly, creating `SubqueryBroadcastExec` (Spark's type) via the `createSubqueryBroadcastExec` shim.
- When no broadcast can be reused, the rule falls back to `Literal.TrueLiteral` or an aggregate `SubqueryExec` depending on `onlyInBroadcast`.
- When the broadcast is found via the shared context (`AdaptiveExecutionContext`), `BroadcastQueryStageExec.plan` may be a `ReusedExchangeExec`. The rule unwraps it to verify the underlying exchange type.
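The unwrap check from the last bullet can be sketched as follows (toy types, not Spark's `ReusedExchangeExec`/`BroadcastExchangeLike`):

```scala
// Toy stand-ins: a broadcast exchange, possibly hidden behind a reuse node.
sealed trait ToyExchange
case class ToyBroadcastExchange(mode: String) extends ToyExchange
case class ToyReusedExchange(child: ToyExchange) extends ToyExchange

// Look through reuse wrappers before verifying the exchange type, since a
// stage found via the shared context may expose only the reused form.
def underlyingBroadcast(e: ToyExchange): Option[ToyBroadcastExchange] = e match {
  case b: ToyBroadcastExchange => Some(b)
  case ToyReusedExchange(child) => underlyingBroadcast(child)
}
```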
## How are these changes tested?

14 new AQE DPP tests in `CometExecSuite` cover the combination matrix, asserting per scenario on:

- `CometSubqueryBroadcastExec` with an `AdaptiveSparkPlanExec` child containing `ReusedExchangeExec` (broadcast reuse via `stageCache`), no unconverted SABs, 3.4 fallback
- `CometSubqueryBroadcastExec` (Spark handles DPP)
- `CometSubqueryBroadcastExec`, no unconverted SABs
- `AdaptiveSparkPlanExec` children, no unconverted SABs
- `CometSubqueryBroadcastExec` present, no unconverted SABs
- `ReusedExchangeExec`
- `REUSE_BROADCAST_ONLY=false`, `EXCHANGE_REUSE_ENABLED=false`: aggregate `SubqueryExec` path (case 3)
- `subqueryCache`

All tests have version-specific assertions (3.5+ native path vs. 3.4 fallback). Existing DPP tests (`CometDppFallbackRepro3949Suite`, `CometShuffleFallbackStickinessSuite`) are updated to disable native scan to preserve the `stageContainsDPPScan` stickiness code path. Existing non-AQE DPP tests are renamed to the consistent `"[non-AQE|AQE] DPP: <scenario>"` format.