Commit d4c738b

Implement split_and_pack, as a building block for sort-shuffling (#249)
Implement the one missing piece for sorting in rapidsmpf. More could be done here, but I think this is sufficient. This is almost identical to `partition_and_pack` (we could also pass in the already-split tables, making it even more of a shared building block with `partition_and_pack`). `split_and_pack` should then be able to replace `partition_and_pack` in the `Shuffle` for a new `SortShuffle`(?); I assumed it is OK to skip a fully empty part there. (More on how sorting can work in the details below.)

~~EDIT: Hmmm, my pre-commit borked some formatting... (different computer, correct result)~~

<details>

The basic steps for sorting are the following:

1. Local sorting.
2. From the local result, extract evenly spaced points `[0, step, ..., step*(N-1)]` (roughly).
   * (Only from the columns actually being sorted.)
3. Continue with these "split candidates" (a sketch of this step follows below):
   * Also attach the `(partition_id, row)`, i.e. the row we split at in global coordinates.
   * Broadcast all split candidates to all parts. (*I am assuming this is OK to do with Dask for now, as it is small.*)
   * Do a local sort of all candidates (from all parts).
   * Use these to find which slice of our local chunk needs to go to which node, i.e. the input for `split_and_pack`.
4. Use the shuffler, but with `split_and_pack(local_sorted_result, split_points)` (found in steps 1 and 3).
5. Do another local sort after gathering.

For stable sorting, care needs to be taken, and I am not sure how the shuffler behaves there. I had implemented this for [legate-df](https://github.com/rapidsai/legate-dataframe/blob/main/cpp/src/sort.cpp), which uses the `libcudf` API, so it should translate pretty well to `pylibcudf` code. (For me, the slow part right now is figuring out the exact graph building, etc.)

</details>

Authors:
  - Sebastian Berg (https://github.com/seberg)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - Richard (Rick) Zamora (https://github.com/rjzamora)

URL: #249
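As a sketch of step 3 above (not code from this PR): given the locally sorted sort column and the globally gathered, sorted split candidates, the `splits` argument for `split_and_pack` can be found with a binary search. The names `local_keys` and `candidates` are illustrative stand-ins, and ties are ignored here (the `(partition_id, row)` tiebreak mentioned above would handle stability):

```python
import numpy as np

def local_split_points(
    local_keys: np.ndarray, candidates: np.ndarray, num_partitions: int
) -> np.ndarray:
    """Sketch of step 3: derive cudf::split()-style split points.

    local_keys: this worker's sort column, already locally sorted (step 1).
    candidates: all workers' split candidates, gathered and sorted (step 3).
    """
    # Pick num_partitions - 1 evenly spaced boundary values from the
    # sorted candidates; each one separates two output partitions.
    step = len(candidates) / num_partitions
    bounds = candidates[(np.arange(1, num_partitions) * step).astype(int)]
    # For each boundary, find the first local row with key >= boundary.
    # These indices are exactly the `splits` input for split_and_pack.
    return np.searchsorted(local_keys, bounds, side="left")

# E.g. local_keys=[1, 3, 3, 7, 9] with boundaries 3 and 8 gives
# splits=[1, 4]: rows [0:1) -> part 0, [1:4) -> part 1, [4:5) -> part 2.
```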
1 parent 94d5257 commit d4c738b

File tree: 6 files changed, +223 −6 lines changed

cpp/include/rapidsmpf/shuffler/partition.hpp (+26)

@@ -85,6 +85,32 @@ partition_and_split(
     rmm::device_async_resource_ref mr
 );
 
+
+/**
+ * @brief Splits rows from the input table into multiple packed (serialized) tables.
+ *
+ * @param table The table to split and pack into partitions.
+ * @param splits The split points, equivalent to cudf::split(), i.e. one less than
+ * the number of result partitions.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param mr Device memory resource used to allocate the returned table's device memory.
+ *
+ * @return A map of partition IDs and their packed tables.
+ *
+ * @throw std::out_of_range if the splits are invalid.
+ *
+ * @see unpack_and_concat
+ * @see cudf::split
+ * @see partition_and_pack
+ */
+[[nodiscard]] std::unordered_map<PartID, PackedData> split_and_pack(
+    cudf::table_view const& table,
+    std::vector<cudf::size_type> const& splits,
+    rmm::cuda_stream_view stream,
+    rmm::device_async_resource_ref mr
+);
+
+
 /**
  * @brief Unpack (deserialize) input tables and concatenate them.
  *
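For intuition on the `splits` argument documented above: N split points define N + 1 contiguous (possibly empty) partitions, exactly as in `cudf::split()`. A plain-Python illustration of the semantics only, with list slicing standing in for the device-side split (not rapidsmpf code):

```python
def split_rows(rows: list, splits: list[int]) -> list[list]:
    # cudf::split()-style semantics: boundaries [0, *splits, len(rows)]
    # define len(splits) + 1 contiguous, possibly empty, partitions.
    bounds = [0, *splits, len(rows)]
    return [rows[a:b] for a, b in zip(bounds, bounds[1:])]

assert split_rows([10, 20, 30, 40, 50], [2, 4]) == [[10, 20], [30, 40], [50]]
assert split_rows([10, 20, 30], [0, 3]) == [[], [10, 20, 30], []]
```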

cpp/src/shuffler/partition.cpp (+39 −5)

@@ -58,6 +58,20 @@ partition_and_split(
     return std::make_pair(std::move(tbl_partitioned), std::move(partition_table));
 }
 
+static std::unordered_map<PartID, PackedData> pack_tables(
+    std::vector<cudf::table_view> const& tables,
+    rmm::cuda_stream_view stream,
+    rmm::device_async_resource_ref mr
+) {
+    std::unordered_map<PartID, PackedData> ret;
+    ret.reserve(tables.size());
+    for (PartID i = 0; static_cast<std::size_t>(i) < tables.size(); ++i) {
+        auto pack = cudf::detail::pack(tables[i], stream, mr);
+        ret.emplace(i, PackedData(std::move(pack.metadata), std::move(pack.gpu_data)));
+    }
+    return ret;
+}
+
 std::unordered_map<PartID, PackedData> partition_and_pack(
     cudf::table_view const& table,
     std::vector<cudf::size_type> const& columns_to_hash,
@@ -71,12 +85,32 @@ std::unordered_map<PartID, PackedData> partition_and_pack(
     auto [tables, owner] = partition_and_split(
         table, columns_to_hash, num_partitions, hash_function, seed, stream, mr
     );
-    std::unordered_map<PartID, PackedData> ret;
-    for (PartID i = 0; static_cast<std::size_t>(i) < tables.size(); ++i) {
-        auto pack = cudf::detail::pack(tables[i], stream, mr);
-        ret.emplace(i, PackedData(std::move(pack.metadata), std::move(pack.gpu_data)));
+    return pack_tables(tables, stream, mr);
+}
+
+std::unordered_map<PartID, PackedData> split_and_pack(
+    cudf::table_view const& table,
+    std::vector<cudf::size_type> const& splits,
+    rmm::cuda_stream_view stream,
+    rmm::device_async_resource_ref mr
+) {
+    RAPIDSMPF_NVTX_FUNC_RANGE();
+    std::vector<cudf::table_view> tables;
+
+    if (table.num_rows() == 0) {
+        // Work around cudf::split() not supporting empty tables.
+        RAPIDSMPF_EXPECTS(
+            std::all_of(splits.begin(), splits.end(), [](auto val) { return val == 0; }),
+            "split point != 0 is invalid for empty table",
+            std::out_of_range
+        );
+        tables = std::vector<cudf::table_view>(
+            static_cast<std::size_t>(splits.size() + 1), table
+        );
+    } else {
+        tables = cudf::split(table, splits, stream);
     }
-    return ret;
+    return pack_tables(tables, stream, mr);
 }
 
 std::unique_ptr<cudf::table> unpack_and_concat(
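The empty-table branch above exists because `cudf::split()` rejects empty tables; `split_and_pack` instead accepts all-zero split points and returns `splits.size() + 1` empty partitions. A Python model of that contract, as a sketch only (not rapidsmpf code; the C++ `std::out_of_range` surfaces as `IndexError` in the Python binding):

```python
def split_empty(splits: list[int]) -> list[list]:
    # Mirrors the C++ workaround: an empty table can only be split at 0,
    # and the result is len(splits) + 1 empty partitions.
    if any(s != 0 for s in splits):
        raise IndexError("split point != 0 is invalid for empty table")
    return [[] for _ in range(len(splits) + 1)]

assert split_empty([0, 0]) == [[], [], []]
```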

cpp/tests/test_shuffler.cpp (+30)

@@ -67,6 +67,36 @@ TEST_P(NumOfPartitions, partition_and_pack) {
     CUDF_TEST_EXPECT_TABLES_EQUIVALENT(sort_table(expect), sort_table(result));
 }
 
+TEST_P(NumOfPartitions, split_and_pack) {
+    int const num_partitions = std::get<0>(GetParam());
+    int const num_rows = std::get<1>(GetParam());
+    std::int64_t const seed = 42;
+    auto stream = cudf::get_default_stream();
+    auto mr = cudf::get_current_device_resource_ref();
+
+    cudf::table expect = random_table_with_index(seed, num_rows, 0, 10);
+
+    std::vector<cudf::size_type> splits;
+    for (int i = 1; i < num_partitions; ++i) {
+        splits.emplace_back(i * num_rows / num_partitions);
+    }
+
+    auto chunks = rapidsmpf::shuffler::split_and_pack(expect, splits, stream, mr);
+
+    // Convert to a vector (restoring the original order).
+    std::vector<rapidsmpf::PackedData> chunks_vector;
+    for (int i = 0; i < num_partitions; ++i) {
+        chunks_vector.emplace_back(std::move(chunks.at(i)));
+    }
+    EXPECT_EQ(chunks_vector.size(), num_partitions);
+
+    auto result =
+        rapidsmpf::shuffler::unpack_and_concat(std::move(chunks_vector), stream, mr);
+
+    // Compare the input table with the result.
+    CUDF_TEST_EXPECT_TABLES_EQUIVALENT(expect, *result);
+}
+
 TEST(MetadataMessage, round_trip) {
     auto metadata = iota_vector<uint8_t>(100);
python/rapidsmpf/rapidsmpf/shuffler.pyi (+6)

@@ -21,6 +21,12 @@ def partition_and_pack(
     stream: Stream,
     device_mr: DeviceMemoryResource,
 ) -> dict[int, PackedData]: ...
+def split_and_pack(
+    table: Table,
+    splits: Iterable[int],
+    stream: Stream,
+    device_mr: DeviceMemoryResource,
+) -> dict[int, PackedData]: ...
 def unpack_and_concat(
     partitions: Iterable[PackedData],
     stream: Stream,

python/rapidsmpf/rapidsmpf/shuffler.pyx (+70)

@@ -38,6 +38,14 @@ cdef extern from "<rapidsmpf/shuffler/partition.hpp>" nogil:
         device_memory_resource *mr,
     ) except +
 
+    cdef unordered_map[uint32_t, cpp_PackedData] cpp_split_and_pack \
+        "rapidsmpf::shuffler::split_and_pack"(
+            const table_view& table,
+            const vector[size_type] &splits,
+            cuda_stream_view stream,
+            device_memory_resource *mr,
+        ) except +
+
 
 cpdef dict partition_and_pack(
     Table table,
@@ -76,6 +84,7 @@ cpdef dict partition_and_pack(
     rapidsmpf.shuffler.unpack_and_concat
     pylibcudf.partitioning.hash_partition
     pylibcudf.contiguous_split.pack
+    rapidsmpf.shuffler.split_and_pack
     """
     cdef vector[size_type] _columns_to_hash = tuple(columns_to_hash)
     cdef unordered_map[uint32_t, cpp_PackedData] _ret
@@ -103,6 +112,67 @@ cpdef dict partition_and_pack(
     return ret
 
 
+cpdef dict split_and_pack(
+    Table table,
+    splits,
+    stream,
+    DeviceMemoryResource device_mr,
+):
+    """
+    Splits rows from the input table into multiple packed (serialized) tables.
+
+    Parameters
+    ----------
+    table
+        The input table to split and pack. An empty table is only valid
+        if all split points are zero.
+    splits
+        The split points, equivalent to cudf::split(), i.e. one less than
+        the number of result partitions.
+    stream
+        The CUDA stream used for memory operations.
+    device_mr
+        Reference to the RMM device memory resource used for device allocations.
+
+    Returns
+    -------
+    A dictionary where the keys are partition IDs and the values are packed tables.
+
+    Raises
+    ------
+    IndexError
+        If the splits are out of range for ``[0, len(table)]``.
+
+    See Also
+    --------
+    rapidsmpf.shuffler.unpack_and_concat
+    pylibcudf.copying.split
+    rapidsmpf.shuffler.partition_and_pack
+    """
+    cdef vector[size_type] _splits = tuple(splits)
+    cdef unordered_map[uint32_t, cpp_PackedData] _ret
+    cdef table_view tbl = table.view()
+    if stream is None:
+        raise ValueError("stream cannot be None")
+    cdef cuda_stream_view _stream = Stream(stream).view()
+
+    with nogil:
+        _ret = cpp_split_and_pack(
+            tbl,
+            _splits,
+            _stream,
+            device_mr.get_mr()
+        )
+    ret = {}
+    cdef unordered_map[uint32_t, cpp_PackedData].iterator it = _ret.begin()
+    while it != _ret.end():
+        ret[deref(it).first] = PackedData.from_librapidsmpf(
+            make_unique[cpp_PackedData](move(deref(it).second))
+        )
+        postincrement(it)
+    return ret
+
+
 cdef extern from "<rapidsmpf/shuffler/partition.hpp>" nogil:
     cdef unique_ptr[cpp_table] cpp_unpack_and_concat \
         "rapidsmpf::shuffler::unpack_and_concat"(

python/rapidsmpf/rapidsmpf/tests/test_shuffler.py (+52 −1)

@@ -13,7 +13,12 @@
 
 from rapidsmpf.buffer.resource import BufferResource
 from rapidsmpf.progress_thread import ProgressThread
-from rapidsmpf.shuffler import Shuffler, partition_and_pack, unpack_and_concat
+from rapidsmpf.shuffler import (
+    Shuffler,
+    partition_and_pack,
+    split_and_pack,
+    unpack_and_concat,
+)
 from rapidsmpf.testing import assert_eq
 from rapidsmpf.utils.cudf import (
     cudf_to_pylibcudf_table,
@@ -50,6 +55,52 @@ def test_partition_and_pack_unpack(
     assert_eq(expect, got, sort_rows="0")
 
 
+@pytest.mark.parametrize(
+    "df",
+    [
+        {"0": [1, 2, 3], "1": [2, 2, 1]},
+        {"0": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]},
+        {"0": [], "1": []},
+    ],
+)
+@pytest.mark.parametrize("num_partitions", [1, 2, 3, 10])
+def test_split_and_pack_unpack(
+    device_mr: rmm.mr.CudaMemoryResource, df: dict[str, list[int]], num_partitions: int
+) -> None:
+    expect = cudf.DataFrame(df)
+    splits = np.linspace(0, len(expect), num_partitions, endpoint=False)[1:].astype(int)
+    partitions = split_and_pack(
+        cudf_to_pylibcudf_table(expect),
+        splits=splits,
+        stream=DEFAULT_STREAM,
+        device_mr=device_mr,
+    )
+    got = pylibcudf_to_cudf_dataframe(
+        unpack_and_concat(
+            tuple(partitions[i] for i in range(num_partitions)),
+            stream=DEFAULT_STREAM,
+            device_mr=device_mr,
+        )
+    )
+
+    assert_eq(expect, got)
+
+
+@pytest.mark.parametrize("df", [{"0": [1, 2, 3], "1": [2, 2, 1]}, {"0": [], "1": []}])
+@pytest.mark.parametrize("num_partitions", [1, 2, 3, 10])
+def test_split_and_pack_unpack_out_of_range(
+    device_mr: rmm.mr.CudaMemoryResource, df: dict[str, list[int]], num_partitions: int
+) -> None:
+    expect = cudf.DataFrame(df)
+    with pytest.raises(IndexError):
+        split_and_pack(
+            cudf_to_pylibcudf_table(expect),
+            splits=[100],
+            stream=DEFAULT_STREAM,
+            device_mr=device_mr,
+        )
+
+
 @pytest.mark.parametrize("wait_on", [False, True])
 @pytest.mark.parametrize("total_num_partitions", [1, 2, 3, 10])
 def test_shuffler_single_nonempty_partition(
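The `np.linspace(...)` expression in `test_split_and_pack_unpack` is just a compact way to compute evenly spaced split points; spelled out (values follow directly from the test parameters):

```python
import numpy as np

# 10 rows into 3 partitions -> split points [3, 6], i.e. row ranges
# [0:3), [3:6), [6:10).
splits = np.linspace(0, 10, 3, endpoint=False)[1:].astype(int)
assert splits.tolist() == [3, 6]

# For the empty-table case every split point collapses to 0, which
# split_and_pack accepts and turns into empty partitions.
splits = np.linspace(0, 0, 10, endpoint=False)[1:].astype(int)
assert splits.tolist() == [0] * 9
```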
