
Commit c18bc39

Implement split_and_pack, as a building block for sort-shuffling
Implementing a distributed sort has the following components (more complex
schemes possibly exist, but I think this should be OK):

    LocalSort -> SelectEquidistant/SplitPointValues -> BroadcastToAll

(Split points are selected according to the number of partitions.) The
BroadcastToAll result will then contain:

1. The original data (the columns by which we sorted; no need for all rows).
2. Two additional columns with (partition_id, local_row_id) to establish a
   global order.

The `BroadcastToAll` result can then be sorted again and used to figure out
how to move each split around. For that step one needs something like
`split_and_pack()` to replace the `partition_and_pack()` currently used for
hash partitioning.

So with the above one should be able to implement a
ShuffleForSort(LocalSort, BroadcastToAll); the final sort result would then be
another local sort after shuffling. (The needed broadcast is small, so I
assume there is no need to do it with rapidsmpf initially.)

This approach guarantees that the result partitions are at most a factor of
two less balanced than the input (I can look up the reference).

Signed-off-by: Sebastian Berg <[email protected]>
1 parent 0317f38 commit c18bc39
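
As a rough illustration of the SelectEquidistant step described in the commit
message, here is a minimal NumPy sketch. It is not part of this commit, and
the helper name and single-key-column assumption are mine:

    import numpy as np

    def select_equidistant(sorted_keys: np.ndarray, num_partitions: int) -> np.ndarray:
        # Pick num_partitions - 1 equidistant sample values from a locally
        # sorted key column; these are the candidate split values each rank
        # would contribute to the BroadcastToAll step.
        n = len(sorted_keys)
        positions = np.linspace(0, n, num_partitions, endpoint=False)[1:].astype(int)
        return sorted_keys[positions]

    local = np.sort(np.array([7, 1, 4, 9, 2, 5, 8, 3]))  # LocalSort
    print(select_equidistant(local, 4))  # -> [3 5 8]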

File tree

6 files changed: +204 -1 lines changed

cpp/include/rapidsmpf/shuffler/partition.hpp

+26
@@ -85,6 +85,32 @@ partition_and_split(
     rmm::device_async_resource_ref mr
 );
 
+
+/**
+ * @brief Splits rows from the input table into multiple packed (serialized) tables.
+ *
+ * @param table The table to split and pack into partitions.
+ * @param splits The split points, equivalent to cudf::split(), i.e. one less than
+ * the number of result partitions.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param mr Device memory resource used to allocate the returned table's device memory.
+ *
+ * @return A map of partition IDs and their packed tables.
+ *
+ * @throw std::invalid_argument if the input table is empty.
+ *
+ * @see unpack_and_concat
+ * @see cudf::split
+ * @see partition_and_pack
+ */
+[[nodiscard]] std::unordered_map<PartID, PackedData> split_and_pack(
+    cudf::table_view const& table,
+    std::vector<cudf::size_type> const& splits,
+    rmm::cuda_stream_view stream,
+    rmm::device_async_resource_ref mr
+);
+
+
 /**
  * @brief Unpack (deserialize) input tables and concatenate them.
  *
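
The splits argument follows cudf::split() semantics. A tiny plain-Python
sketch (mine, not part of the commit) of how split points map to partition
row ranges:

    splits = [2, 5]                  # one less than the number of partitions
    num_rows = 7
    bounds = [0, *splits, num_rows]
    ranges = [(bounds[i], bounds[i + 1]) for i in range(len(bounds) - 1)]
    assert ranges == [(0, 2), (2, 5), (5, 7)]  # partition i gets rows [start, end)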

cpp/src/shuffler/partition.cpp

+21
@@ -79,6 +79,27 @@ std::unordered_map<PartID, PackedData> partition_and_pack(
     return ret;
 }
 
+std::unordered_map<PartID, PackedData> split_and_pack(
+    cudf::table_view const& table,
+    std::vector<cudf::size_type> const& splits,
+    rmm::cuda_stream_view stream,
+    rmm::device_async_resource_ref mr
+) {
+    RAPIDSMPF_NVTX_FUNC_RANGE();
+    // Can't split empty tables (0 is out of bounds), so raise.
+    RAPIDSMPF_EXPECTS(
+        table.num_rows() > 0, "the input table cannot be empty", std::invalid_argument
+    );
+
+    auto tables = cudf::split(table, splits, stream);
+    std::unordered_map<PartID, PackedData> ret;
+    for (PartID i = 0; static_cast<std::size_t>(i) < tables.size(); ++i) {
+        auto pack = cudf::detail::pack(tables[i], stream, mr);
+        ret.emplace(i, PackedData(std::move(pack.metadata), std::move(pack.gpu_data)));
+    }
+    return ret;
+}
+
 std::unique_ptr<cudf::table> unpack_and_concat(
     std::vector<PackedData>&& partitions,
     rmm::cuda_stream_view stream,
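
For readers coming from the Python side, the implementation above is
essentially the following pylibcudf sketch (my analogue, not part of the
commit; it assumes pylibcudf.copying.split and pylibcudf.contiguous_split.pack,
which mirror the libcudf calls used here):

    import pylibcudf as plc

    def split_and_pack_py(table: plc.Table, splits: list[int]) -> dict:
        # Mirror the C++ guard: splitting a 0-row table would put every
        # split point out of bounds.
        if table.num_rows() == 0:
            raise ValueError("the input table cannot be empty")
        # Split into contiguous row ranges, then pack (serialize) each
        # range, keyed by its partition ID.
        return {
            i: plc.contiguous_split.pack(t)
            for i, t in enumerate(plc.copying.split(table, splits))
        }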

cpp/tests/test_shuffler.cpp

+30
@@ -67,6 +67,36 @@ TEST_P(NumOfPartitions, partition_and_pack) {
     CUDF_TEST_EXPECT_TABLES_EQUIVALENT(sort_table(expect), sort_table(result));
 }
 
+TEST_P(NumOfPartitions, split_and_pack) {
+    int const num_partitions = std::get<0>(GetParam());
+    int const num_rows = std::get<1>(GetParam());
+    std::int64_t const seed = 42;
+    auto stream = cudf::get_default_stream();
+    auto mr = cudf::get_current_device_resource_ref();
+
+    cudf::table expect = random_table_with_index(seed, num_rows, 0, 10);
+
+    std::vector<cudf::size_type> splits;
+    for (int i = 1; i < num_partitions; ++i) {
+        splits.emplace_back(i * num_rows / num_partitions);
+    }
+
+    auto chunks = rapidsmpf::shuffler::split_and_pack(expect, splits, stream, mr);
+
+    // Convert to a vector (restoring the original order).
+    std::vector<rapidsmpf::PackedData> chunks_vector;
+    for (int i = 0; i < num_partitions; ++i) {
+        chunks_vector.emplace_back(std::move(chunks.at(i)));
+    }
+    EXPECT_EQ(chunks_vector.size(), num_partitions);
+
+    auto result =
+        rapidsmpf::shuffler::unpack_and_concat(std::move(chunks_vector), stream, mr);
+
+    // Compare the input table with the result.
+    CUDF_TEST_EXPECT_TABLES_EQUIVALENT(expect, *result);
+}
+
 TEST(MetadataMessage, round_trip) {
     auto metadata = iota_vector<uint8_t>(100);

python/rapidsmpf/rapidsmpf/shuffler.pyi

+6
@@ -21,6 +21,12 @@ def partition_and_pack(
     stream: Stream,
     device_mr: DeviceMemoryResource,
 ) -> dict[int, PackedData]: ...
+def split_and_pack(
+    table: Table,
+    splits: Iterable[int],
+    stream: Stream,
+    device_mr: DeviceMemoryResource,
+) -> dict[int, PackedData]: ...
 def unpack_and_concat(
     partitions: Iterable[PackedData],
     stream: Stream,

python/rapidsmpf/rapidsmpf/shuffler.pyx

+70
@@ -38,6 +38,14 @@ cdef extern from "<rapidsmpf/shuffler/partition.hpp>" nogil:
         device_memory_resource *mr,
     ) except +
 
+    cdef unordered_map[uint32_t, cpp_PackedData] cpp_split_and_pack \
+        "rapidsmpf::shuffler::split_and_pack"(
+            const table_view& table,
+            const vector[size_type]& splits,
+            cuda_stream_view stream,
+            device_memory_resource *mr,
+        ) except +
+
 
 cpdef dict partition_and_pack(
     Table table,
@@ -76,6 +84,7 @@ cpdef dict partition_and_pack(
     rapidsmpf.shuffler.unpack_and_concat
     pylibcudf.partitioning.hash_partition
     pylibcudf.contiguous_split.pack
+    rapidsmpf.shuffler.split_and_pack
     """
     cdef vector[size_type] _columns_to_hash = tuple(columns_to_hash)
     cdef unordered_map[uint32_t, cpp_PackedData] _ret
@@ -103,6 +112,67 @@ cpdef dict partition_and_pack(
     return ret
 
 
+cpdef dict split_and_pack(
+    Table table,
+    splits,
+    stream,
+    DeviceMemoryResource device_mr,
+):
+    """
+    Splits rows from the input table into multiple packed (serialized) tables.
+
+    Parameters
+    ----------
+    table
+        The input table to split and pack. The table cannot be empty (the
+        split points would not be valid).
+    splits
+        The split points, equivalent to cudf::split(), i.e. one less than
+        the number of result partitions.
+    stream
+        The CUDA stream used for memory operations.
+    device_mr
+        Reference to the RMM device memory resource used for device allocations.
+
+    Returns
+    -------
+    A dictionary where the keys are partition IDs and the values are packed tables.
+
+    Raises
+    ------
+    ValueError
+        If the input table is empty.
+
+    See Also
+    --------
+    rapidsmpf.shuffler.unpack_and_concat
+    rapidsmpf.shuffler.partition_and_pack
+    pylibcudf.copying.split
+    """
+    cdef vector[size_type] _splits = tuple(splits)
+    cdef unordered_map[uint32_t, cpp_PackedData] _ret
+    cdef table_view tbl = table.view()
+    if stream is None:
+        raise ValueError("stream cannot be None")
+    cdef cuda_stream_view _stream = Stream(stream).view()
+
+    with nogil:
+        _ret = cpp_split_and_pack(
+            tbl,
+            _splits,
+            _stream,
+            device_mr.get_mr()
+        )
+    ret = {}
+    cdef unordered_map[uint32_t, cpp_PackedData].iterator it = _ret.begin()
+    while it != _ret.end():
+        ret[deref(it).first] = PackedData.from_librapidsmpf(
+            make_unique[cpp_PackedData](move(deref(it).second))
+        )
+        postincrement(it)
+    return ret
+
+
 cdef extern from "<rapidsmpf/shuffler/partition.hpp>" nogil:
     cdef unique_ptr[cpp_table] cpp_unpack_and_concat \
         "rapidsmpf::shuffler::unpack_and_concat"(

python/rapidsmpf/rapidsmpf/tests/test_shuffler.py

+51-1
@@ -13,7 +13,12 @@
 
 from rapidsmpf.buffer.resource import BufferResource
 from rapidsmpf.progress_thread import ProgressThread
-from rapidsmpf.shuffler import Shuffler, partition_and_pack, unpack_and_concat
+from rapidsmpf.shuffler import (
+    Shuffler,
+    partition_and_pack,
+    split_and_pack,
+    unpack_and_concat,
+)
 from rapidsmpf.testing import assert_eq
 from rapidsmpf.utils.cudf import (
     cudf_to_pylibcudf_table,
@@ -50,6 +55,51 @@ def test_partition_and_pack_unpack(
     assert_eq(expect, got, sort_rows="0")
 
 
+@pytest.mark.parametrize(
+    "df",
+    [
+        {"0": [1, 2, 3], "1": [2, 2, 1]},
+        {"0": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]},
+    ],
+)
+@pytest.mark.parametrize("num_partitions", [1, 2, 3, 10])
+def test_split_and_pack_unpack(
+    device_mr: rmm.mr.CudaMemoryResource, df: dict[str, list[int]], num_partitions: int
+) -> None:
+    expect = cudf.DataFrame(df)
+    splits = np.linspace(0, len(expect), num_partitions, endpoint=False)[1:].astype(int)
+    partitions = split_and_pack(
+        cudf_to_pylibcudf_table(expect),
+        splits=splits,
+        stream=DEFAULT_STREAM,
+        device_mr=device_mr,
+    )
+    got = pylibcudf_to_cudf_dataframe(
+        unpack_and_concat(
+            tuple(partitions[i] for i in range(num_partitions)),
+            stream=DEFAULT_STREAM,
+            device_mr=device_mr,
+        )
+    )
+
+    assert_eq(expect, got)
+
+
+@pytest.mark.parametrize("num_partitions", [1, 2, 3, 10])
+def test_split_and_pack_unpack_empty_table(
+    device_mr: rmm.mr.CudaMemoryResource, num_partitions: int
+) -> None:
+    expect = cudf.DataFrame({"0": [], "1": []})
+    splits = np.linspace(0, len(expect), num_partitions, endpoint=False)[1:].astype(int)
+    with pytest.raises(ValueError, match=".*the input table cannot be empty"):
+        split_and_pack(
+            cudf_to_pylibcudf_table(expect),
+            splits=splits,
+            stream=DEFAULT_STREAM,
+            device_mr=device_mr,
+        )
+
+
 @pytest.mark.parametrize("wait_on", [False, True])
 @pytest.mark.parametrize("total_num_partitions", [1, 2, 3, 10])
 def test_shuffler_single_nonempty_partition(
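
For reference, the linspace expression in these tests computes the
num_partitions - 1 split points; a small standalone check (mine, not part of
the commit):

    import numpy as np

    # 10 rows split into 4 partitions: linspace gives [0.0, 2.5, 5.0, 7.5];
    # dropping the leading 0 and truncating to int yields the split points.
    splits = np.linspace(0, 10, 4, endpoint=False)[1:].astype(int)
    assert splits.tolist() == [2, 5, 7]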
