
feat(java): support distributed splits planning#6328

Open
summaryzb wants to merge 1 commit into lance-format:main from summaryzb:split_plan

Conversation

@summaryzb

Summary

This PR adds distributed split planning for Lance's filtered read execution, enabling a plan/execute separation pattern where a coordinator node plans a scan and worker nodes execute per-fragment portions of it. The implementation spans the Rust core (scanner, filtered read exec, protobuf serialization) and Java bindings, with a new FilteredRead Java class that exposes the full workflow.

Problem

Lance's FilteredReadExec supports serializing an entire scan plan to protobuf for remote execution, but there was no mechanism to split a multi-fragment plan into per-fragment tasks that could be distributed to individual workers. Distributed engines like Spark need to: (1) plan a scan on the coordinator, (2) split the plan into independent per-fragment tasks, (3) serialize each task and ship it to a worker, and (4) execute each task independently. The existing code could serialize and deserialize a full plan, but lacked the splitting, metadata extraction, and end-to-end orchestration API needed for this workflow.
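The four-step coordinator/worker flow can be sketched as follows. This is an illustrative simulation, not the Lance API: `ScanPlan`, `FragmentTask`, and the byte encoding are all hypothetical stand-ins for the real plan protos.

```rust
// Hypothetical sketch of the coordinator/worker flow: (1) plan, (2) split
// into per-fragment tasks, (3) serialize each task, (4) execute on a worker.
// Types and encodings here are toy stand-ins, not Lance's protobuf messages.

/// One per-fragment unit of work, shipped to a worker as bytes.
#[derive(Debug, Clone, PartialEq)]
pub struct FragmentTask {
    pub fragment_id: u64,
    pub row_count: u64,
}

/// A whole-scan plan produced on the coordinator.
pub struct ScanPlan {
    pub tasks: Vec<FragmentTask>,
}

impl FragmentTask {
    /// Stand-in for protobuf serialization: 16 little-endian bytes.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = self.fragment_id.to_le_bytes().to_vec();
        buf.extend_from_slice(&self.row_count.to_le_bytes());
        buf
    }

    /// Worker-side decode of a shipped task.
    pub fn from_bytes(buf: &[u8]) -> Self {
        let fragment_id = u64::from_le_bytes(buf[0..8].try_into().unwrap());
        let row_count = u64::from_le_bytes(buf[8..16].try_into().unwrap());
        Self { fragment_id, row_count }
    }
}

/// Run the full loop: serialize each task, "ship" it, decode and "execute"
/// it (here, execution just reports the task's row count), then aggregate.
pub fn run_distributed(plan: ScanPlan) -> u64 {
    plan.tasks
        .iter()
        .map(|t| t.to_bytes())                 // (3) serialize each task
        .map(|b| FragmentTask::from_bytes(&b)) // worker decodes...
        .map(|t| t.row_count)                  // (4) ...and executes it
        .sum()                                 // coordinator aggregates
}
```

In the real workflow the "ship" step crosses the network and execution returns record batches, but the shape of the loop is the same.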

Approach

The implementation follows a three-layer design:

Rust core -- Scanner entry point (scanner.rs): A new Scanner::plan_filtered_read() method constructs a FilteredReadExec from the current scanner settings (filter, projection, fragments, batch size, etc.), triggers internal planning via ensure_plan_initialized() to compute the RowAddrTreeMap (which fragments/rows to read), and serializes the result to protobuf bytes. This mirrors the logic in create_plan / new_filtered_read but exposes the result as an opaque serializable blob rather than executing it.
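The "opaque serializable blob" idea can be illustrated with a toy scanner. `ToyScanner` and its text encoding are hypothetical; the real `plan_filtered_read()` builds a `FilteredReadExec` and encodes protobuf.

```rust
// Illustrative sketch, not the Lance API: a scanner resolves its current
// settings into a plan and hands back opaque bytes instead of executing.

#[derive(Debug, Clone)]
pub struct ToyScanner {
    pub fragment_ids: Vec<u32>,
    pub filter: Option<String>,
    pub batch_size: u32,
}

impl ToyScanner {
    /// Analogue of `plan_filtered_read()`: callers treat the returned bytes
    /// as an opaque blob to ship elsewhere, never inspecting them directly.
    pub fn plan_filtered_read(&self) -> Vec<u8> {
        // A real implementation would trigger planning and encode protobuf;
        // a trivial text encoding stands in for that here.
        let frags: Vec<String> =
            self.fragment_ids.iter().map(|f| f.to_string()).collect();
        format!(
            "fragments={};filter={};batch_size={}",
            frags.join(","),
            self.filter.as_deref().unwrap_or(""),
            self.batch_size
        )
        .into_bytes()
    }
}
```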

Rust core -- Proto splitting and execution (filtered_read_proto.rs): Four new public functions:

  • split_plan_proto() decodes a full FilteredReadExecProto, iterates over the RowAddrTreeMap to extract per-fragment entries, and re-serializes each as a standalone proto with the same table identifier and options but a single-fragment plan. The global scan_range_after_filter is intentionally dropped from per-fragment protos since it can only be applied after aggregating results across all workers.
  • extract_plan_metadata() provides a lightweight summary (fragment IDs and row counts per fragment, where -1 means "full fragment") without needing a dataset handle, enabling coordinators to estimate task sizes for load balancing.
  • split_and_inspect_plan_proto() combines both operations in a single decode pass to avoid redundant deserialization.
  • execute_filtered_read_from_bytes() is the worker-side counterpart: it decodes a proto, reconstructs a FilteredReadExec (optionally reusing an existing dataset handle), and returns a SendableRecordBatchStream.

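The single-pass split-and-inspect idea can be sketched like this. `FullPlan` and `SplitResult` are illustrative types, not Lance's protos; the point is that one iteration yields both the per-fragment plans and the metadata, and that the global post-filter range is dropped from each split.

```rust
// Sketch (hypothetical types): one pass over a decoded plan produces both
// per-fragment plans and lightweight metadata, avoiding a second decode.

#[derive(Debug, Clone, PartialEq)]
pub struct FullPlan {
    /// (fragment_id, row_count); -1 means "full fragment".
    pub fragments: Vec<(u64, i64)>,
    /// A global limit that only makes sense after merging all workers.
    pub scan_range_after_filter: Option<(u64, u64)>,
}

pub struct SplitResult {
    pub per_fragment_plans: Vec<FullPlan>,
    pub fragment_ids: Vec<u64>,
    pub rows_per_fragment: Vec<i64>,
}

pub fn split_and_inspect(plan: &FullPlan) -> SplitResult {
    let mut per_fragment_plans = Vec::new();
    let mut fragment_ids = Vec::new();
    let mut rows_per_fragment = Vec::new();
    for &(id, rows) in &plan.fragments {
        per_fragment_plans.push(FullPlan {
            fragments: vec![(id, rows)],
            // Intentionally dropped: it can only be applied after
            // aggregating results across all workers.
            scan_range_after_filter: None,
        });
        fragment_ids.push(id);
        rows_per_fragment.push(rows);
    }
    SplitResult { per_fragment_plans, fragment_ids, rows_per_fragment }
}
```

A coordinator can use `rows_per_fragment` to size tasks for load balancing without ever opening the dataset.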
Rust core -- FilteredReadExec (filtered_read.rs): A new ensure_plan_initialized() method triggers internal plan computation without converting to the external FilteredReadPlan format, caching the result for subsequent serialization.
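The compute-once-and-cache pattern behind `ensure_plan_initialized()` can be sketched with `std::cell::OnceCell`. `ToyExec` is hypothetical, and the real method is part of an async execution node caching a `RowAddrTreeMap`, not plan bytes.

```rust
use std::cell::OnceCell;

// Sketch of "trigger planning once, cache the result for serialization".
// ToyExec is an illustrative stand-in for FilteredReadExec.
pub struct ToyExec {
    fragment_rows: Vec<u64>,
    /// Cached plan bytes; populated on first call, reused afterwards.
    planned: OnceCell<Vec<u8>>,
}

impl ToyExec {
    pub fn new(fragment_rows: Vec<u64>) -> Self {
        Self { fragment_rows, planned: OnceCell::new() }
    }

    /// Runs the (stand-in) planning work on the first call; later calls
    /// return the cached bytes without recomputing.
    pub fn ensure_plan_initialized(&self) -> &[u8] {
        self.planned.get_or_init(|| {
            self.fragment_rows
                .iter()
                .flat_map(|r| r.to_le_bytes())
                .collect()
        })
    }
}
```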

Java bindings: A new FilteredRead class implements Serializable and provides the full distributed workflow API:

  • planFilteredRead(scanner) calls through JNI to Scanner::plan_filtered_read() and split_and_inspect_plan_proto(), returning a FilteredRead object containing the full proto, per-fragment split protos, fragment IDs, and rows-per-fragment metadata.
  • getTasks() returns the per-fragment task protos for distribution.
  • executeFilteredRead(dataset, taskProto, allocator) executes a single task on a worker node.
  • Split protos and metadata arrays are marked transient -- they are not included in Java serialization; the receiver reconstructs them by calling split_and_inspect_plan_proto on the deserialized full proto.

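The transient-field pattern from the last bullet can be modeled in Rust: only the full plan blob crosses the wire, and the receiver rebuilds the derived split blobs from it. `ToyFilteredRead` and its one-byte-per-fragment encoding are hypothetical.

```rust
// Sketch of the "transient splits" pattern: derived state is excluded from
// serialization and reconstructed from the full plan on the receiving side.

#[derive(Debug, PartialEq)]
pub struct ToyFilteredRead {
    /// The full plan blob; this is all that gets serialized.
    pub full_plan: Vec<u8>,
    /// Derived per-fragment blobs ("transient" in the Java class).
    pub splits: Vec<Vec<u8>>,
}

impl ToyFilteredRead {
    /// Re-derive the split blobs from the full blob; here the full blob is
    /// simply one byte per fragment.
    fn rebuild_splits(full_plan: &[u8]) -> Vec<Vec<u8>> {
        full_plan.iter().map(|&b| vec![b]).collect()
    }

    pub fn new(full_plan: Vec<u8>) -> Self {
        let splits = Self::rebuild_splits(&full_plan);
        Self { full_plan, splits }
    }

    /// What serialization writes: only the full plan, never the splits.
    pub fn serialize(&self) -> Vec<u8> {
        self.full_plan.clone()
    }

    /// The receiver reconstructs the transient state from the full plan.
    pub fn deserialize(bytes: Vec<u8>) -> Self {
        Self::new(bytes)
    }
}
```

Keeping only one authoritative blob on the wire avoids shipping redundant bytes and guarantees the splits can never disagree with the full plan.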
Per-fragment filter deduplication from the original plan is preserved: split_plan_proto looks up each fragment's filter expression ID in the shared filter_expressions array and copies only the relevant entry into the per-fragment proto.
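The index-into-a-shared-table lookup can be sketched as follows; `Plan` and `split_fragment` are illustrative, not the proto definitions.

```rust
// Sketch of filter deduplication across a split (hypothetical types):
// fragments reference a shared filter table by index, and a single-fragment
// split copies only the one entry its fragment uses.

#[derive(Debug, Clone, PartialEq)]
pub struct Plan {
    /// Shared, deduplicated filter expressions.
    pub filter_expressions: Vec<String>,
    /// (fragment_id, index into filter_expressions).
    pub fragments: Vec<(u64, usize)>,
}

/// Build a single-fragment plan carrying only the filter that fragment uses.
pub fn split_fragment(plan: &Plan, pos: usize) -> Plan {
    let (fragment_id, filter_idx) = plan.fragments[pos];
    Plan {
        filter_expressions: vec![plan.filter_expressions[filter_idx].clone()],
        // Re-point the fragment at index 0 of its own one-entry table.
        fragments: vec![(fragment_id, 0)],
    }
}
```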

Changes

Rust -- rust/lance/src/dataset/scanner.rs:

  • Added Scanner::plan_filtered_read() method (gated on substrait feature) that builds a FilteredReadExec, triggers planning, and serializes to protobuf bytes.

Rust -- rust/lance/src/io/exec/filtered_read.rs:

  • Added FilteredReadExec::ensure_plan_initialized() public method to trigger and cache plan computation without external conversion.

Rust -- rust/lance/src/io/exec/filtered_read_proto.rs:

  • Added FilteredReadPlanMetadata struct and extract_plan_metadata() function for lightweight plan inspection.
  • Added execute_filtered_read_from_bytes() for worker-side deserialization and execution.
  • Added split_plan_proto() for per-fragment plan splitting.
  • Added SplitPlanResult struct and split_and_inspect_plan_proto() for combined split + metadata extraction.

Proto -- protos/filtered_read.proto:

  • Added a clarifying comment to FilteredReadPlanProto.row_addr_tree_map field.

Java JNI -- java/lance-jni/src/blocking_scanner.rs:

  • Added nativeCreatePlan and nativeExecuteFilteredRead JNI entry points.

Java -- java/src/main/java/org/lance/ipc/FilteredRead.java:

  • New FilteredRead class implementing Serializable with full distributed workflow API.

Java -- java/src/test/java/org/lance/FilteredReadTest.java:

  • New end-to-end test class for the distributed filtered read workflow.

Test Coverage

  • Rust -- test_extract_plan_metadata: Verifies fragment IDs and row counts are correctly extracted from a serialized plan (2 fragments, 50 rows each).
  • Rust -- test_split_plan_proto: Splits a 2-fragment plan and verifies each split executes independently, with total rows matching direct execution.
  • Rust -- test_split_plan_proto_with_filter: Same as above but with a filter expression (x > 10), verifying per-fragment filters are correctly preserved in splits.
  • Rust -- test_plan_serialize_execute_roundtrip: End-to-end: build exec with filter, plan, serialize to bytes, deserialize via execute_filtered_read_from_bytes, and verify identical output.
  • Java -- testBasicPlanAndExecute: Plans a 2-fragment dataset, splits into tasks, executes each, and verifies total row count (50 rows).
  • Java -- testPlanMetadata: Verifies fragment count, fragment IDs, and rows-per-fragment arrays for a 3-fragment dataset.
  • Java -- testDistributedSplitAndExecute: Simulates coordinator/worker pattern with 3 fragments, compares distributed execution total with direct scan total (60 rows).
  • Java -- testPlanWithFilter: Plans with id > 10 filter on a 40-row fragment, verifies 29 rows returned after distributed execution.
  • Java -- testSerializableRoundtrip: Serializes a task proto via Java ObjectOutputStream, deserializes, executes both, and verifies identical row counts.


Labels

enhancement (New feature or request), java
