Spawn a Worker#
The Worker is a gRPC server implementing the Arrow Flight protocol for distributed query execution. Worker nodes
run these endpoints to receive execution plans, execute them, and stream results back.
Overview#
The Worker is the core execution component in Distributed DataFusion. It:
Receives serialized execution plans via Arrow Flight’s do_get method
Deserializes plans using protobuf and user-provided codecs
Executes plans using the local DataFusion runtime
Streams results back as Arrow record batches through the gRPC Arrow Flight interface
Launching the Arrow Flight server#
The default Worker implementation satisfies most basic use cases:
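use datafusion_distributed::Worker;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tonic::transport::Server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Worker with the default session configuration.
    let endpoint = Worker::default();
    // Expose the worker as an Arrow Flight gRPC service on localhost:8000.
    Server::builder()
        .add_service(endpoint.into_flight_server())
        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
        .await?;
    Ok(())
}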
However, most DataFusion deployments include custom UDFs, execution nodes, or configuration options.
You’ll need to tell the Worker how to build your DataFusion sessions:
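use datafusion::error::DataFusionError;
use datafusion::execution::session_state::SessionState;
// WorkerQueryContext is assumed to be exported at the crate root, like Worker.
use datafusion_distributed::{Worker, WorkerQueryContext};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tonic::transport::Server;

// Builds the SessionState used for each query, registering custom UDFs
// on top of the pre-populated builder.
async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
    Ok(ctx
        .builder
        .with_scalar_functions(vec![your_custom_udf()])
        .build())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = Worker::from_session_builder(build_state);
    Server::builder()
        .add_service(endpoint.into_flight_server())
        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000))
        .await?;
    Ok(())
}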
WorkerSessionBuilder#
The WorkerSessionBuilder is a closure or type that implements:
#[async_trait]
pub trait WorkerSessionBuilder {
async fn build_session_state(
&self,
ctx: WorkerQueryContext,
) -> Result<SessionState, DataFusionError>;
}
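One common way to let both a plain function and a dedicated type satisfy a trait like this is a blanket implementation over function types. The sketch below shows that pattern in a simplified, synchronous form using only the standard library. Every name in it (`QueryContext`, `State`, `SessionBuilder`, `WithUdfs`, `build_with`) is a hypothetical stand-in, and whether the library wires up closures in exactly this way is an assumption.

```rust
// Simplified, synchronous analogue of the builder pattern.
// All names are hypothetical stand-ins, not the datafusion_distributed API.

/// Stand-in for the context handed to the builder.
pub struct QueryContext {
    pub target_partitions: usize,
}

/// Stand-in for the session state the builder produces.
#[derive(Debug, PartialEq)]
pub struct State {
    pub target_partitions: usize,
    pub udfs: Vec<String>,
}

/// Stand-in for the builder trait.
pub trait SessionBuilder {
    fn build_session_state(&self, ctx: QueryContext) -> Result<State, String>;
}

/// Blanket impl: any function or closure with a matching signature
/// is automatically a `SessionBuilder`.
impl<F> SessionBuilder for F
where
    F: Fn(QueryContext) -> Result<State, String>,
{
    fn build_session_state(&self, ctx: QueryContext) -> Result<State, String> {
        self(ctx)
    }
}

/// A struct can implement the trait directly, which helps when the
/// builder needs to carry its own configuration.
pub struct WithUdfs {
    pub udfs: Vec<String>,
}

impl SessionBuilder for WithUdfs {
    fn build_session_state(&self, ctx: QueryContext) -> Result<State, String> {
        Ok(State {
            target_partitions: ctx.target_partitions,
            udfs: self.udfs.clone(),
        })
    }
}

/// Accepts either form of builder.
pub fn build_with(builder: &impl SessionBuilder, ctx: QueryContext) -> Result<State, String> {
    builder.build_session_state(ctx)
}
```

A closure suits one-off customization, while a struct is the better fit when the builder holds state such as a list of UDFs or shared configuration.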
The builder receives a WorkerQueryContext containing:
SessionStateBuilder: A pre-populated session state builder into which you can inject your custom UDFs, execution nodes, and configuration options
headers: HTTP headers from the incoming request (useful for passing metadata like authentication tokens or configuration)
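To illustrate how request headers might drive per-query setup, here is a minimal sketch that pulls a bearer token and a numeric setting out of a header map. It uses a plain `HashMap` as a stand-in for the real header type, and both the `x-target-partitions` header name and the helper functions are hypothetical, chosen only for this example.

```rust
use std::collections::HashMap;

/// Extracts a bearer token from the `authorization` header, if present.
/// `headers` is a plain map standing in for the request headers.
pub fn bearer_token(headers: &HashMap<String, String>) -> Option<&str> {
    headers
        .get("authorization")
        .and_then(|v| v.strip_prefix("Bearer "))
        .map(str::trim)
        .filter(|t| !t.is_empty())
}

/// Reads a numeric setting from a hypothetical `x-target-partitions` header,
/// falling back to `default` when the header is missing or not a number.
pub fn target_partitions(headers: &HashMap<String, String>, default: usize) -> usize {
    headers
        .get("x-target-partitions")
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}
```

Inside a session builder, values extracted this way could decide which functions to register or which configuration options to apply before calling `build()`.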
Serving the Endpoint#
Convert the endpoint to a gRPC service and serve it:
use tonic::transport::Server;
use datafusion_distributed::Worker;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let endpoint = Worker::default();
    // Serve the Arrow Flight service on localhost:8080 until the server stops.
    Server::builder()
        .add_service(endpoint.into_flight_server())
        .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080))
        .await?;
    Ok(())
}
The into_flight_server() method builds a FlightServiceServer ready to be added as a Tonic service.
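The examples above bind to the loopback address, which only accepts connections from the same machine. When workers run on separate hosts you will likely want the listen address to be configurable. The following is a minimal sketch using only the standard library; the `listen_addr` helper is hypothetical, and where the raw string comes from (an environment variable, a CLI flag) is left to the caller.

```rust
use std::net::{AddrParseError, IpAddr, Ipv4Addr, SocketAddr};

/// Returns the address a worker should listen on.
/// `raw` is an optional "ip:port" string supplied by the caller.
/// When it is absent, fall back to localhost on `default_port`.
pub fn listen_addr(raw: Option<&str>, default_port: u16) -> Result<SocketAddr, AddrParseError> {
    match raw {
        Some(s) => s.parse(),
        None => Ok(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::LOCALHOST),
            default_port,
        )),
    }
}
```

The resulting `SocketAddr` can be passed to `.serve(...)` in place of the hardcoded value, for example with `"0.0.0.0:8080"` to listen on all IPv4 interfaces.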