Building Custom Distributed Plans#

By default, the distributed planner decides where to place network boundaries: it walks your single-node physical plan and injects NetworkShuffleExec, NetworkCoalesceExec, and NetworkBroadcastExec nodes at the points it thinks make sense.

You don’t have to leave that decision to the planner. NetworkShuffleExec and NetworkCoalesceExec are public, constructible nodes, so you can place network boundaries yourself and build whatever stage topology your workload needs.

How it works#

The distributed planner runs as a query planner that post-processes the physical plan. Its first check is whether the plan already contains network boundaries:

  • If it does, the planner assumes you placed them deliberately. It does not try to distribute the plan itself — it only finalises your boundaries (giving each stage a unique id and eliding any boundary that turns out to connect a single producer task to a single consumer task) and wraps the result in a DistributedExec.

  • If it doesn’t, the planner runs its normal automatic distribution passes.

So building a custom distributed plan is a matter of getting your network boundaries into the physical plan before the distributed planner sees it. The idiomatic way to do that is a DataFusion PhysicalOptimizerRule, which runs while the physical plan is being built:

let state = SessionStateBuilder::new()
    .with_default_features()
    // Your rule injects NetworkShuffleExec / NetworkCoalesceExec into the plan...
    .with_physical_optimizer_rule(Arc::new(MyBoundaryInjectionRule))
    // ...and the distributed planner finalises whatever boundaries it finds.
    .with_distributed_worker_resolver(/* ... */)
    .with_distributed_planner()
    .with_distributed_user_codec(MyLeafCodec)
    .build();

The two boundary nodes#

Both nodes connect a producer stage (below, running on producer_tasks tasks) to a consumer stage (above). The placeholder stage ids they are constructed with are filled in for you during finalisation.

NetworkShuffleExec#

NetworkShuffleExec::try_new(input: Arc<dyn ExecutionPlan>, producer_tasks: usize) -> Result<Self>

A network shuffle — the distributed equivalent of a RepartitionExec. It repartitions data across tasks: each consumer task gathers data from every producer task, hash-partitioned by key. Its input must be a RepartitionExec with Hash partitioning. Use it for shuffle-based aggregations and partitioned joins.

NetworkCoalesceExec#

NetworkCoalesceExec::try_new(
    input: Arc<dyn ExecutionPlan>,
    producer_tasks: usize,
    consumer_tasks: usize,
) -> Result<Self>

A network coalesce — the distributed equivalent of a CoalescePartitionsExec. It concatenates partitions across tasks without repartitioning: each consumer task reads a contiguous group of producer tasks. It is meant to sit right above a partition-collecting node such as CoalescePartitionsExec or SortPreservingMergeExec. Use it to reduce the width of a stage (gather many tasks into fewer).

The caller is responsible for keeping the counts consistent: the consumer_tasks of one boundary must match the producer_tasks of the boundary immediately above it.

Leaf data splitting still happens automatically#

Even when you inject the boundaries yourself, the distributed planner runs the registered TaskEstimator over each stage’s leaves and calls scale_up_leaf_node with the stage’s task count. So a parquet DataSourceExec is wrapped in a DistributedLeafExec (with one per-task file-group variant) by the default file-scan estimator, and a custom leaf is split by whatever TaskEstimator you registered for it — exactly as in the automatic path. You only place the boundaries; the leaves are scaled for you.

This means a custom leaf node still needs its TaskEstimator (and its scale_up_leaf_node / DistributedTaskContext-based dispatch) registered, just as it would for automatic planning — you do not need to hand-build DistributedLeafExec in your boundary-injection rule.

Example: a progressive partial-reduction tree#

A common reason to build the topology yourself is to control how a stage fans in. Instead of gathering N leaf tasks into a single node with one wide coalesce, you can build a reduction tree that shrinks the data at every level:

 Final            (1 task)   — finishes the aggregation
   NetworkCoalesceExec  M → 1
 PartialReduce    (M tasks)  — merges partial states, shrinking the data again
   NetworkCoalesceExec  N → M
 Partial          (N tasks)  — first partial reduce, one task per slice of the input
   DistributedLeafExec

The node that makes this progressive (rather than just a fan-in of concatenations) is AggregateExec with AggregateMode::PartialReduce: it merges partial-aggregate states into fewer partial-aggregate states, so the volume crossing each network hop keeps dropping. A single Final aggregation on the root finishes the job — and because the merge happens on partial states, stateful aggregates like avg(...) stay correct.

There is a complete, runnable example in the examples/ folder:

  • custom_distributed_partial_reduction_tree.rs — a PhysicalOptimizerRule that rewrites a GROUP BY aggregation over a parquet table into the tree above (Partial NetworkCoalesce PartialReduce NetworkCoalesce Final). It only injects the boundaries; the planner’s TaskEstimator splits the parquet leaf across the leaf-stage tasks automatically.