feat: AQE DPP for native Parquet scans with broadcast reuse#4112

Draft
mbutrovich wants to merge 25 commits into apache:main from mbutrovich:aqe_dpp_parquet

Conversation


@mbutrovich mbutrovich commented Apr 27, 2026

Which issue does this PR close?

Partially addresses #3510. Closes #4045 (V1 Parquet AQE DPP). Related PRs: #4011 (non-AQE DPP), #4053 (scalar subquery pushdown + CometReuseSubquery), #4037 (non-AQE DPP edge case tests), #4033 (AQE DPP for Iceberg, draft).

Rationale for this change

Under AQE (the default), Spark creates SubqueryAdaptiveBroadcastExec (SAB) nodes for DPP. Spark's PlanAdaptiveDynamicPruningFilters rule converts these by locating a matching BroadcastHashJoinExec in the plan. Once Comet replaces that join with CometBroadcastHashJoinExec, Spark's rule no longer finds a match: with onlyInBroadcast=true it replaces the DPP filter with Literal.TrueLiteral, disabling partition pruning. With the previous isAqeDynamicPruningFilter rejection, the scan fell back to Spark entirely, losing native acceleration for all DPP queries under AQE.

What changes are included in this PR?

Two-phase SAB conversion

Spark's PlanAdaptiveDynamicPruningFilters runs before custom queryStageOptimizerRules and converts SABs to TrueLiteral. We work around this in two phases:

  1. CometExecRule (queryStagePreparationRules, before Spark's rule): Wraps SABs in CometSubqueryAdaptiveBroadcastExec so Spark's pattern match does not recognize them. Only SABs in CometNativeScanExec nodes are wrapped; non-Comet scans such as CSV keep the original SAB for Spark to handle.
  2. CometPlanAdaptiveDynamicPruningFilters (queryStageOptimizerRule, after Spark's rule): Converts the wrapped SABs, following the same decision tree as Spark's rule:
    • exchangeReuseEnabled + matching broadcast join: CometSubqueryBroadcastExec wired to BroadcastQueryStageExec for broadcast reuse
    • No reusable broadcast + onlyInBroadcast=true: Literal.TrueLiteral (DPP disabled)
    • No reusable broadcast + onlyInBroadcast=false: aggregate SubqueryExec (DPP via separate execution, matching Spark's lines 68-79)
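The three-way decision tree above can be sketched as a small Scala function. All of the types and names below are illustrative stand-ins for this sketch, not the actual Spark or Comet plan nodes:

```scala
// Toy model of the SAB conversion outcomes; the plan-node types are
// simplified stand-ins, not the real Spark/Comet classes.
sealed trait PruningResult
case class CometSubqueryBroadcast(stage: String) extends PruningResult
case object TrueLiteral extends PruningResult
case class AggregateSubquery(plan: String) extends PruningResult

object DppDecision {
  // Mirrors the three outcomes: broadcast reuse when a matching
  // broadcast exists and exchange reuse is on, DPP disabled when only
  // a broadcast would pay off, or a separately-executed aggregate
  // subquery otherwise.
  def convert(
      reusableBroadcast: Option[String],
      exchangeReuseEnabled: Boolean,
      onlyInBroadcast: Boolean): PruningResult = {
    reusableBroadcast match {
      case Some(stage) if exchangeReuseEnabled =>
        CometSubqueryBroadcast(stage)
      case _ if onlyInBroadcast =>
        TrueLiteral // no reusable broadcast: disable DPP entirely
      case _ =>
        AggregateSubquery("aggregate over build side") // run DPP separately
    }
  }
}
```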

Cross-stage and cross-plan broadcast search

Spark's PlanAdaptiveDynamicPruningFilters is constructed with rootPlan = this (the current AdaptiveSparkPlanExec), so each ASPE (main query and each scalar subquery) gets its own rule instance pointing to itself. Custom queryStageOptimizerRules registered via injectQueryStageOptimizerRule are shared across all ASPEs and don't get a per-ASPE rootPlan reference.

We approximate Spark's behavior with two searches:

  1. stagePlan (the plan arg to apply()): covers same-stage joins (the common case) and scalar subqueries where scan and join are under one exchange
  2. context.qe.executedPlan (the main query's ASPE via the shared AdaptiveExecutionContext): covers cross-stage joins in the main query where a shuffle separates the scan from the broadcast join

When the broadcast is not yet materialized as a BroadcastQueryStageExec (cross-stage case), we follow Spark's pattern (lines 44-64): construct a new broadcast exchange wrapping adaptivePlan.executedPlan, wrap it in a new AdaptiveSparkPlanExec, and let AQE's stageCache canonicalization ensure the broadcast runs once.
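The two-search order can be sketched with a toy labeled tree in place of real SparkPlans (every name here is hypothetical): search the current stage's plan first, then fall back to the main query's executed plan.

```scala
// Toy plan node: a label plus children, standing in for a SparkPlan tree.
case class Node(label: String, children: List[Node] = Nil)

object BroadcastSearch {
  // Depth-first search for the first node with the given label.
  private def find(root: Node, label: String): Option[Node] =
    if (root.label == label) Some(root)
    else root.children.iterator.map(find(_, label)).collectFirst {
      case Some(n) => n
    }

  // Search order mirrors the PR: same-stage joins first (the common
  // case), then cross-stage joins reachable only from the main query's
  // executed plan.
  def locate(stagePlan: Node, executedPlan: Node, label: String): Option[Node] =
    find(stagePlan, label).orElse(find(executedPlan, label))
}
```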

Subquery deduplication via shared cache

Spark's ReuseAdaptiveSubquery uses the shared AdaptiveExecutionContext.subqueryCache for cross-plan deduplication. Our rule runs after ReuseAdaptiveSubquery (which can't see our subqueries because they don't exist yet), and CometReuseSubquery uses a per-invocation local cache. We register DPP subqueries in the shared subqueryCache directly, matching ReuseAdaptiveSubquery's behavior for cross-plan reuse (e.g., main query and scalar subquery with identical DPP).
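The cross-plan deduplication amounts to a get-or-register on a shared concurrent map. A minimal sketch, with a string key standing in for the canonicalized subquery plan (the real cache lives on AdaptiveExecutionContext):

```scala
import scala.collection.concurrent.TrieMap

// Sketch of cross-plan subquery deduplication via a shared cache, in
// the spirit of AdaptiveExecutionContext.subqueryCache: the first plan
// to register a canonicalized subquery wins; later registrations with
// the same key reuse the existing entry.
object SubqueryCacheDemo {
  private val subqueryCache = TrieMap.empty[String, String]

  // `canonicalized` stands in for the canonicalized subquery plan key;
  // the returned value is whichever subquery was registered first.
  def register(canonicalized: String, subquery: String): String =
    subqueryCache.getOrElseUpdate(canonicalized, subquery)
}
```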

Dual-filter resolution

CometNativeScanExec.partitionFilters and CometScanExec.partitionFilters contain separate InSubqueryExec instances. CometExecRule only wraps the outer filters (the inner CometScanExec is held in a @transient field and is not part of the expression tree). CometPlanAdaptiveDynamicPruningFilters therefore converts both, matching CometSubqueryAdaptiveBroadcastExec (wrapped, outer) and SubqueryAdaptiveBroadcastExec (unwrapped, inner).
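The dual matching can be sketched as a pattern match that accepts both the wrapped and unwrapped shapes; the types below are toy stand-ins, not the real classes:

```scala
// Toy stand-ins for the two shapes of the adaptive broadcast subquery.
sealed trait Sub
case class SAB(id: Int) extends Sub         // unwrapped, inner filter
case class CometSAB(inner: SAB) extends Sub // wrapped, outer filter

object DualFilter {
  // Both shapes resolve to the same underlying subquery, so the rule
  // matches either form and converts it the same way.
  def resolve(s: Sub): Int = s match {
    case CometSAB(SAB(id)) => id
    case SAB(id)           => id
  }
}
```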

Spark 3.4 fallback

injectQueryStageOptimizerRule is unavailable on 3.4. SAB wrapping is gated on isSpark35Plus. On 3.4, scans with AQE DPP fall back to Spark so that Spark's rule handles DPP natively (with partition pruning).
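The version gate reduces to a major.minor comparison; a simplified sketch (the real check lives in Comet's shim layer, and this parsing is illustrative):

```scala
// Simplified sketch of gating SAB wrapping on the Spark version: on
// 3.4 the scan falls back so Spark's own rule performs DPP.
object VersionGate {
  def isSpark35Plus(version: String): Boolean = {
    val Array(major, minor) = version.split("\\.").take(2).map(_.toInt)
    major > 3 || (major == 3 && minor >= 5)
  }

  // Wrap SABs only when the queryStageOptimizerRule injection point
  // needed by phase 2 is available (Spark 3.5+).
  def shouldWrapSab(version: String): Boolean = isSpark35Plus(version)
}
```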

Broadcast fallback cases

  • Spark BHJ (Comet BHJ disabled): The rule finds BroadcastHashJoinExec and creates SubqueryBroadcastExec (Spark's type) via createSubqueryBroadcastExec shim.
  • SMJ (no broadcast): No matching broadcast join. Falls back to Literal.TrueLiteral or aggregate SubqueryExec depending on onlyInBroadcast.
  • ReusedExchangeExec: When AQE reuses exchanges across the main plan and scalar subquery plans (shared AdaptiveExecutionContext), BroadcastQueryStageExec.plan may be ReusedExchangeExec. The rule unwraps it to verify the underlying exchange type.
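Unwrapping the reused exchange before checking the underlying exchange type is a small recursive descent; sketched here with stand-in types, not the real SparkPlans:

```scala
// Toy stand-ins for a broadcast exchange and its reuse wrapper.
sealed trait Exchange
case class BroadcastExchange(mode: String) extends Exchange
case class ReusedExchange(child: Exchange) extends Exchange

object Unwrap {
  // Peel off reuse wrappers until the underlying exchange is exposed,
  // so its type (e.g. broadcast mode) can be verified.
  @annotation.tailrec
  def underlying(e: Exchange): Exchange = e match {
    case ReusedExchange(child) => underlying(child)
    case other                 => other
  }
}
```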

How are these changes tested?

14 new AQE DPP tests in CometExecSuite covering the combination matrix:

| # | Scenario | Verifies |
|---|----------|----------|
| 1 | BHJ golden path | Native scan, CometSubqueryBroadcastExec, AdaptiveSparkPlanExec child with ReusedExchangeExec (broadcast reuse via stageCache), no unconverted SABs, 3.4 fallback |
| 2 | Spark BHJ (Comet BHJ disabled) | No CometSubqueryBroadcastExec (Spark handles DPP) |
| 3 | SMJ (no broadcast) | Native scan, no CometSubqueryBroadcastExec, no unconverted SABs |
| 4 | Two separate broadcast joins | buildKeys disambiguation, native scan, no unconverted SABs |
| 5 | Empty broadcast result | Empty result, no errors |
| 6 | Dual filter resolution | Both outer and inner filters resolved (correct results prove this) |
| 7 | Broadcast exchange reuse | AdaptiveSparkPlanExec children, no unconverted SABs |
| 8 | Non-atomic type (struct/array) | Correct results with complex join keys |
| 9 | Non-atomic type + CometSubqueryBroadcast | CometSubqueryBroadcastExec present, no unconverted SABs |
| 10 | Scalar subquery + DPP reuse | Cross-plan broadcast reuse via ReusedExchangeExec |
| 11 | BHJ with exchange reuse disabled | REUSE_BROADCAST_ONLY=false, EXCHANGE_REUSE_ENABLED=false: aggregate SubqueryExec path (case 3) |
| 12 | Avoid reordering broadcast join keys | Cross-stage broadcast search via rootPlan, shuffle between scan and join |
| 13 | Uncorrelated scalar subquery with broadcast reuse | Cross-plan subquery deduplication via shared subqueryCache |
| 14 | Join with ordering requirement project count | ReusedExchangeExec prevents double-counting of project nodes |

All tests have version-specific assertions (3.5+ native path vs 3.4 fallback). Existing DPP tests (CometDppFallbackRepro3949Suite, CometShuffleFallbackStickinessSuite) updated to disable native scan to preserve the stageContainsDPPScan stickiness code path.

Existing non-AQE DPP tests renamed to consistent "[non-AQE|AQE] DPP: <scenario>" format.
