Implement split_and_shuffle, as a building block for sort-shuffling #249


Merged: 7 commits merged into rapidsai:branch-25.06 on May 7, 2025

Conversation

@seberg (Contributor) commented May 6, 2025

Implement the one missing piece for sorting in rapidsmpf. More could be done here, but I think this is sufficient.
This is almost identical to partition_and_pack (we could also pass in the already-split tables, making it even more clearly a component of partition_and_pack, maybe).

split_and_pack should then be able to replace partition_and_pack in the Shuffle for a new SortShuffle(?); I assumed it is OK to skip a fully empty part there.
(More on how sorting can work in the details below.)

EDIT: Hmmm, my pre-commit borked some formatting... (different computer; the result is correct)

The basic steps for sorting are the following (a rough sketch in code follows the list):

  1. Local sorting.
  2. From the local result, extract roughly evenly spaced points [0, step, ... step*(N-1)].
    • (Only from the columns actually being sorted.)
  3. Continue with these "split candidates":
    • Also attach (partition_id, row), i.e. the row we split at in global coordinates.
    • Broadcast all split candidates to all parts. (I am assuming this is OK to do with Dask for now, as it is small.)
    • Do a local sort of all candidates (from all parts).
    • Use these to find which parts of our local chunk need to go to which node, i.e. the input for split_and_pack.
  4. Use the shuffler, but with split_and_pack(local_sorted_result, split_points) (found in steps 1 and 3).
  5. Do another local sort after gathering.
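
Since the steps are easiest to see end to end, here is a rough single-process sketch with plain numpy stand-ins. Only split_and_pack corresponds to this PR; the function itself and all other names are hypothetical glue, and it assumes non-empty chunks and a single sort column:

```python
import numpy as np

def sort_shuffle_sketch(local_chunks, n_parts):
    # Step 1: local sorting.
    local_sorted = [np.sort(chunk) for chunk in local_chunks]

    # Step 2/3: extract roughly evenly spaced split candidates per chunk and
    # attach (partition_id, row) so equal keys still have a global order.
    candidates = []
    for part_id, chunk in enumerate(local_sorted):
        step = max(len(chunk) // n_parts, 1)
        for row in range(0, len(chunk), step):
            candidates.append((chunk[row], part_id, row))

    # Step 3: "broadcast" all candidates, sort them, and pick n_parts - 1
    # global split values (the boundaries between output partitions).
    candidates.sort()
    step = max(len(candidates) // n_parts, 1)
    split_values = [candidates[min(i * step, len(candidates) - 1)][0]
                    for i in range(1, n_parts)]

    # Step 4: per chunk, turn the split values into row offsets; these
    # offsets are exactly the split_points that split_and_pack would consume.
    packed = []
    for chunk in local_sorted:
        offsets = np.searchsorted(chunk, split_values)
        packed.append(np.split(chunk, offsets))  # stand-in for split_and_pack

    # Step 5: "shuffle" (gather slice i from every chunk), then sort locally.
    return [np.sort(np.concatenate([p[i] for p in packed]))
            for i in range(n_parts)]

# e.g. sort_shuffle_sketch([np.array([3, 1, 2]), np.array([6, 4, 5])], 2)
```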

For stable sorting, care needs to be taken, and I am not sure how the shuffler behaves there. I had implemented this for legate-df, which uses the libcudf API, so it should translate pretty well to pylibcudf code.
(For me the slow part right now is figuring out the exact graph building, etc.)

@seberg requested review from a team as code owners on May 6, 2025 13:45
Implementing a distributed sort has the following components
(more complex schemes possibly exist, but I think this should be OK):

LocalSort -> SelectEquidistant/SplitPointValues -> BroadcastToAll
(Split points are selected by the number of partitions.)

The BroadcastToAll result will then contain:
1. The original data (by which we sorted; no need for all rows)
2. Two additional columns with (partition_id, local_row_id) to establish
   a global order.

The `BroadcastToAll` result can then be sorted again and used to
figure out how to move each split around.

And for that step one needs something like `split_and_pack()` to
replace the `partition_and_pack()` currently used for hash partitioning.

So with the above one should be able to implement a
ShuffleForSort(LocalSort, BroadcastToAll).
The final sort result would then be another local sort after shuffling.

(The needed broadcast is small, so I assume there is no need to do it
with rapidsmpf initially.)

The above approach guarantees that the result partitions are at most
a factor of two less balanced than the input (I can look up the
reference).

Signed-off-by: Sebastian Berg <[email protected]>
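
As an illustration of the SelectEquidistant/SplitPointValues step described in the commit message, a minimal numpy sketch (names and record layout are assumptions, not part of this commit):

```python
import numpy as np

def select_split_candidates(sorted_keys, partition_id, n_parts):
    # Pick roughly equidistant rows of the locally sorted key column and
    # attach (partition_id, local_row_id) so ties still have a global order.
    step = max(len(sorted_keys) // n_parts, 1)
    rows = np.arange(0, len(sorted_keys), step)
    return np.rec.fromarrays(
        [sorted_keys[rows], np.full(len(rows), partition_id), rows],
        names=["key", "partition_id", "local_row_id"],
    )
```

BroadcastToAll would then concatenate these records from every partition and sort them by (key, partition_id, local_row_id).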
@seberg force-pushed the split-shuffle-for-sort branch from 10c024b to c18bc39 on May 6, 2025 14:00
Signed-off-by: Sebastian Berg <[email protected]>
@madsbk added the improvement (Improves an existing functionality) and non-breaking (Introduces a non-breaking change) labels on May 6, 2025
@wence- (Contributor) left a comment

Two minor suggestions

Comment on lines 90 to 92:

```cpp
RAPIDSMPF_EXPECTS(
    table.num_rows() > 0, "the input table cannot be empty", std::invalid_argument
);
```
@wence-: This should be allowed by returning an appropriate empty result.

@seberg (author): Yeah... that is clearly better; it would be nice if cudf::split just did it. Fixed (a bit maximalistically).

Comment on lines 95 to 99:

```cpp
std::unordered_map<PartID, PackedData> ret;
for (PartID i = 0; static_cast<std::size_t>(i) < tables.size(); ++i) {
    auto pack = cudf::detail::pack(tables[i], stream, mr);
    ret.emplace(i, PackedData(std::move(pack.metadata), std::move(pack.gpu_data)));
}
```
@wence-: Is it worth factoring out this code, which is identical to lines 74-78?

@seberg (author): Wasn't sure, but why not; added. Kept it local/private for now, though.

Signed-off-by: Sebastian Berg <[email protected]>
@rjzamora (Member) left a comment
Thanks @seberg!

I think I'll put together a PR to update DaskIntegration.insert_partition to accept an additional argument (perhaps sort_boundaries?).

I'm thinking the insert_partition logic could use plc.search to calculate the row offsets (i.e. splits) and pass them into split_and_pack. If sort_boundaries is None, we simply fall back to hash partitioning.
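
Roughly what that could look like, with np.searchsorted standing in for the plc.search lookup; the argument names, the split_and_pack call shape, and the fallback are all assumptions:

```python
import numpy as np

def insert_partition_sketch(sorted_keys, sort_boundaries, split_and_pack):
    if sort_boundaries is None:
        # No boundaries given: fall back to hash partitioning.
        raise NotImplementedError("hash partitioning fallback goes here")
    # Row offsets (splits) at which the locally sorted table is cut; with
    # pylibcudf this would be a lower-bound search on the sort key(s).
    offsets = np.searchsorted(sorted_keys, sort_boundaries, side="left")
    return split_and_pack(sorted_keys, offsets.tolist())
```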

@nirandaperera (Contributor) left a comment

Just a small nitpick

@seberg (author) commented May 7, 2025

> I think I'll put together a PR to update DaskIntegration.insert_partition to accept an additional argument (perhaps sort_boundaries?).

Yeah, integrating it directly is maybe nicest. sort_boundaries, or maybe sort_splits?

EDIT: Ah, now I see that this is needed even if I create an RMPFIntegration class... (A class can avoid it, but we would want to remove the argument eventually; I would prefer that solution, i.e. tag it onto an RMPFIntegration instance. unpack_and_concat should maybe also get an unpack_and_merge counterpart, although that can be a second sort initially.)

> I'm thinking the insert_partition logic could use plc.search to calculate the row offsets

Yeah, exactly, you can use lower/upper bound searches. Unfortunately, you need to adjust for exact equality. Here I did that via both upper and lower bounds and then some post-processing.
Maybe there is a trick to do it more simply, but it is nice to use plc searching directly to deal with multi-column sorting.
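
The equality adjustment is easiest to see on a single key column. A pure-numpy sketch of the idea (not the legate-df or rapidsmpf code):

```python
import numpy as np

def split_offset(keys, my_part_id, candidate):
    # Row offset in this locally sorted chunk for one global split
    # candidate (key, partition_id, row). Lower and upper bound disagree
    # when the key occurs multiple times locally; the candidate's global
    # (partition_id, row) decides how those duplicates are divided.
    cand_key, cand_part, cand_row = candidate
    lo = np.searchsorted(keys, cand_key, side="left")   # ~ plc lower_bound
    hi = np.searchsorted(keys, cand_key, side="right")  # ~ plc upper_bound
    if cand_part > my_part_id:
        return hi  # all local duplicates sort before the candidate globally
    if cand_part < my_part_id:
        return lo  # none of the local duplicates sort before it
    return min(max(cand_row, lo), hi)  # the candidate row is in this chunk
```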

PS: I think I may need to ask for permission to be able to merge.

@seberg (author) commented May 7, 2025

/merge

1 similar comment

@TomAugspurger (Contributor): /merge

@rapids-bot (bot) merged commit d4c738b into rapidsai:branch-25.06 on May 7, 2025
21 checks passed
@seberg deleted the split-shuffle-for-sort branch on May 7, 2025 14:41
rapids-bot (bot) pushed a commit that referenced this pull request on May 8, 2025
Follow up to #249

- Adds minimal changes to support sorting with a `DaskIntegration` protocol.
- Does **not** bother to implement/demonstrate anything beyond an ascending sort by a single column.
- Adds `sort_boundaries` and `options` arguments to `DaskIntegration.insert_partition` and adds an `options` argument to `DaskIntegration.extract_partition`
    - The `sort_boundaries` must be a separate positional argument so that it can be generated dynamically as the output of a separate task. All other sorting options can be passed through with `options` (e.g. ascending vs descending and null precedence).

This technically breaks existing cudf-polars code, so we will want to update the `DaskIntegration` protocol defined in cudf ASAP once this is finished/merged.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Sebastian Berg (https://github.com/seberg)
  - Tom Augspurger (https://github.com/TomAugspurger)

URL: #256
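
For orientation, the shape of that protocol change reads roughly like the sketch below; only the sort_boundaries and options arguments come from the PR text, and the remaining parameters are invented placeholders:

```python
from typing import Any, Optional, Protocol

class DaskIntegration(Protocol):
    def insert_partition(
        self, df: Any, partition_count: int, shuffler: Any,
        sort_boundaries: Optional[Any],  # separate positional argument, so it
                                         # can be the output of another task
        options: Any,                    # e.g. ascending flags, null precedence
    ) -> None: ...

    def extract_partition(
        self, partition_id: int, shuffler: Any, options: Any,
    ) -> Any: ...
```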