Basic sorting support with Dask #256


Merged · 20 commits into rapidsai:branch-25.06 on May 8, 2025

Conversation

@rjzamora (Member) commented May 7, 2025

Follow up to #249

  • Adds minimal changes to support sorting with a DaskIntegration protocol.
  • Does not bother to implement/demonstrate anything beyond an ascending sort by a single column.
  • Adds sort_boundaries and options arguments to DaskIntegration.insert_partition and adds an options argument to DaskIntegration.extract_partition
    • The sort_boundaries must be a separate positional argument so that it can be generated dynamically as the output of a separate task. All other sorting options can be passed through with options (e.g. ascending vs descending and null precedence).

This technically breaks existing cudf-polars code, so we will want to update the DaskIntegration protocol defined in cudf asap after this is finished/merged.

@rjzamora rjzamora self-assigned this May 7, 2025
@rjzamora rjzamora requested a review from a team as a code owner May 7, 2025 22:31
@rjzamora rjzamora added breaking Introduces a breaking change feature request New feature or request labels May 7, 2025
sort_boundaries
Output partition boundaries for sorting.
options
Optional key-work arguments.
Contributor
Suggested change
Optional key-work arguments.
Optional key-word arguments.

Contributor

And we use "Additional options." in a few places below. Let's pick one description and copy it through.

Member Author

Ah yeah, missed this one - Let's do the simple "Additional options." for now.

Contributor

Would it make sense to make it DaskIntegration(on=, sort_boundaries=)? Or would that obfuscate/impede the way we build dask graphs here?

More of a question, as it would side-step the immediate need for breaking things (not that it matters) and might avoid the options catch-all.

Contributor

I think it would be good to pass in the partition id (pid) of df here for sorting. That is needed to find the right splits if you want to balance the result partition sizes in degenerate cases (such as all values being equal).
And I believe we don't have another way to pass it in.

Member Author

@seberg - I updated/generalized the protocol a bit. I didn't include the input partition id as a required argument, but we can add that now that we are changing things. Can you explain how having the input partition id would help you handle degenerate values?

@seberg (Contributor) May 8, 2025

Basically, the idea is that the split_boundary values know which partition ID they came from (and ideally their local row).

For example, we split (1, 1, 1, 1), distributed as pid0=(1, 1) and pid1=(1, 1).
If you have the pid and row, then the split boundary will be (value, pid=1, row=0).

With that pid information, you can figure out here that pid=0 should send its data to 0 (split after the boundary) and pid=1 should send it all to 1 (split before the boundary here).

Without the additional information, there is no choice but for both pids to send all data to 0.
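A minimal pure-Python sketch of this idea (the function name and the tuple encoding are illustrative, not rapidsmpf APIs): tagging the boundary with its source pid and row gives every partition a total order to split on, even when all values are equal.

```python
def split_point(values, pid, boundary):
    """Count rows that sort strictly below the (value, pid, row) boundary key."""
    b_val, b_pid, b_row = boundary
    return sum(
        1 for row, v in enumerate(values) if (v, pid, row) < (b_val, b_pid, b_row)
    )

# All values equal: pid0=(1, 1), pid1=(1, 1); the boundary came from pid=1, row=0.
boundary = (1, 1, 0)
split_point([1, 1], pid=0, boundary=boundary)  # → 2: pid0 sends everything to output 0
split_point([1, 1], pid=1, boundary=boundary)  # → 0: pid1 sends everything to output 1
```

Comparing only values, both partitions would split at 0 and send all rows to output 0, which is the imbalance described above.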

Member Author

Okay, I see. This case definitely isn't a high priority yet (dask-dataframe still doesn't attempt to handle this at all), but it's a good enough reason to include partition_id as a required argument to insert_partition now that we are updating the protocol anyway.

@seberg (Contributor) left a comment

Looks good to me, although my Dask eye isn't keen. Two small comments that may or may not be relevant.

)
else:
    df = df.sort_values(on)
splits = df[on[0]].searchsorted(sort_boundaries, side="right")
@seberg (Contributor) May 8, 2025

N.B. (I assume you are aware, and at most worth a code comment): Good for an example but it only works if values in sort_boundaries are unique in df. Otherwise you need to adjust for where the boundary value came from. Thus the longer function I shared.

EDIT: Sorry, this is not as bad as I first recalled. As it is only needed to avoid large imbalances in the result partition sizes.
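As a concrete illustration of both the snippet and the caveat, here is a tiny pandas sketch with made-up values:

```python
import pandas as pd

on = ["a"]
sort_boundaries = [1, 2]  # upper boundary values for output partitions 0 and 1

df = pd.DataFrame({"a": [3, 1, 2, 1]}).sort_values(on)  # "a" -> [1, 1, 2, 3]
splits = df[on[0]].searchsorted(sort_boundaries, side="right")
# splits -> [2, 3]: rows [0:2] go to partition 0, [2:3] to 1, [3:] to 2.
# With side="right", every row equal to a duplicated boundary value lands in
# the earlier partition, which is the imbalance described above.
```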

Output partition boundaries for sorting. If None,
hashing will be used to calculate output partitions.
options
Additional options.
"""
Contributor

I can imagine that we might eventually want more all-to-all-like patterns. Would it make more sense to change this interface such that insert_partition just takes the list[PackedData] and the shuffler and we provide separate functions for hash and sort-based partitioning (and the user can bring their own).

So something like:

def insert_partition(
    shuffler: Shuffler,
    chunks: Sequence[PackedData], # Or whatever it is
) -> None:

And we provide two builtin functions

def hash_partition(df, partition_count, *, on) -> list[PackedData]:
    ...
def sort_partition(df, partition_count, *, by) -> list[PackedData]:
    ...

Member Author

I don't think that helps us generalize at all. We already have Shuffler.insert_chunks, which is essentially the insert_partition function you are proposing. The purpose of DaskIntegration.insert_partition is to avoid the need for various Dask shuffling applications to write their own task graph.

We want insert_partition/extract_partition to include the minimal necessary arguments to construct a "general" shuffling task graph. Since we are revising things, this may be:

    @staticmethod
    def insert_partition(
        df: DataFrameT,  # Partition to insert
        partition_count: int,   # Output partition count
        shuffler: Shuffler,   # Shuffler object
        options: dict[str, Any] | None,   # Arbitrary keyword arguments
        *other: Any,   # "Other" task-output data (e.g. sorting boundaries/quantiles)
    ) -> None:

    @staticmethod
    def extract_partition(
        partition_id: int,  # Partition ID to extract
        shuffler: Shuffler,   # Shuffler object
        options: dict[str, Any] | None,   # Arbitrary keyword arguments
    ) -> DataFrameT:

I think the options argument can be used to control most variations of a shuffle, and the *other positional argument could be used to pass in information that must be calculated dynamically at execution time.
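A toy round trip exercising that proposed signature (FakeShuffler and the list-of-dicts "partition" are stand-ins for illustration only; rapidsmpf's real Shuffler and PackedData operate on packed tables):

```python
from typing import Any

class FakeShuffler:
    """Stand-in for rapidsmpf's Shuffler, just to exercise the signature."""
    def __init__(self) -> None:
        self.chunks: dict[int, list] = {}

    def insert_chunks(self, chunks: dict[int, Any]) -> None:
        for pid, chunk in chunks.items():
            self.chunks.setdefault(pid, []).extend(chunk)

def insert_partition(df, partition_count, shuffler, options, *other):
    # Hash-partition rows on the column named in the options catch-all.
    on = (options or {}).get("on", "key")
    chunks: dict[int, list] = {}
    for row in df:
        chunks.setdefault(hash(row[on]) % partition_count, []).append(row)
    shuffler.insert_chunks(chunks)

shuffler = FakeShuffler()
insert_partition([{"key": 0}, {"key": 1}, {"key": 2}], 2, shuffler, {"on": "key"})
```

A sort-based variant would instead consume *other (the dynamically computed boundaries) to pick each row's output partition, with everything else unchanged.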

Contributor

Ah, ok, carry on then

@TomAugspurger (Contributor) left a comment

I think this should be good. I think our quickstart example will need to be updated for the new keyword.

@wence-'s comment in https://github.com/rapidsai/rapidsmpf/pull/256/files#r2079496979 is worth resolving one way or another. Aside from churn, I think it can be handled later, but I don't have a strong opinion on it.

@rjzamora (Member Author) commented May 8, 2025

Thanks @TomAugspurger

@wence-'s comment in https://github.com/rapidsai/rapidsmpf/pull/256/files#r2079496979 is worth resolving one way or another. Aside from churn, I think it can be handled later, but I don't have a strong opinion on it.

Could you summarize what still needs to be resolved, and maybe we can file an issue? As far as I can tell, the suggestions from that comment already exist in rapidsmpf (via DaskIntegration.insert_partition, split_and_pack, and partition_and_pack) - That said, I'm probably missing something.

@TomAugspurger (Contributor)

Yep that makes sense to me. Let's go ahead and merge this and open a followup issue if @wence- has anything to add.

@rjzamora (Member Author) commented May 8, 2025

/merge

@rapids-bot rapids-bot bot merged commit 6877cb6 into rapidsai:branch-25.06 May 8, 2025
23 checks passed
rapids-bot bot pushed a commit to rapidsai/cudf that referenced this pull request May 8, 2025
Teeing up this "fix" for the proposed change in rapidsai/rapidsmpf#256

Once that PR is merged, we will want to get this in asap to keep `rapidsmpf` shuffling from breaking. We can update `Sort` in a follow-up PR.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Tom Augspurger (https://github.com/TomAugspurger)

URL: #18720
@rjzamora rjzamora deleted the dask-sort branch May 8, 2025 22:01
Labels
breaking Introduces a breaking change feature request New feature or request
4 participants