Commit 10c024b
Implement split_and_pack, as a building block for sort-shuffling
Implementing a distributed sort has the following components (more complex schemes may exist, but I think this should be OK):

    LocalSort -> SelectEquidistant/SplitPointValues -> BroadcastToAll

(Split points are selected based on the number of partitions.) The BroadcastToAll result then contains:

1. The original data by which we sorted (only the selected split-point rows are needed).
2. Two additional columns with (partition_id, local_row_id) to establish a global order.

The BroadcastToAll result can then be sorted again and used to figure out how to move each split around. For that step, one needs something like `split_and_pack()` to replace the `partition_and_pack()` currently used for hash partitioning.

With the above, one should be able to implement a ShuffleForSort(LocalSort, BroadcastToAll); the final result is then another local sort after shuffling. (The needed broadcast is small, so I assume there is no need to do it with rapidsmpf initially.)

This approach guarantees that the result partitions are at most a factor of two less balanced than the input (I can look up the reference). The sketch below illustrates the split-point selection step.

Signed-off-by: Sebastian Berg <[email protected]>
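As a rough illustration of the SelectEquidistant/SplitPointValues step described above, here is a minimal Python sketch (not part of this commit; all names are hypothetical, and plain lists stand in for locally sorted cuDF partitions):

def select_equidistant_split_candidates(local_sorted, num_partitions, partition_id):
    # Pick one candidate per internal split point (num_partitions - 1 in
    # total), spaced equidistantly through the locally sorted rows. Each
    # candidate is tagged with (partition_id, local_row_id) so ties can be
    # broken and a global order established after the broadcast.
    n = len(local_sorted)
    candidates = []
    for k in range(1, num_partitions):
        row = min(k * n // num_partitions, n - 1)
        candidates.append((local_sorted[row], partition_id, row))
    return candidates

# Example: 7 locally sorted rows, 3 result partitions -> 2 candidates.
print(select_equidistant_split_candidates([1, 3, 4, 8, 9, 12, 15], 3, 0))
# [(4, 0, 2), (9, 0, 4)]

Each rank would broadcast its candidates to all ranks; sorting the union of the candidates then yields the global split values that the shuffle step routes by.
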
1 parent 0317f38 commit 10c024b

14 files changed, +214 -20 lines

cpp/include/rapidsmpf/shuffler/partition.hpp (+26)
@@ -85,6 +85,32 @@ partition_and_split(
     rmm::device_async_resource_ref mr
 );
 
+
+/**
+ * @brief Splits rows from the input table into multiple packed (serialized) tables.
+ *
+ * @param table The table to split and pack into partitions.
+ * @param splits The split points, equivalent to cudf::split(), i.e. one less than
+ * the number of result partitions.
+ * @param stream CUDA stream used for device memory operations and kernel launches.
+ * @param mr Device memory resource used to allocate the returned table's device memory.
+ *
+ * @return A map of partition IDs and their packed tables.
+ *
+ * @throw std::invalid_argument if the input table is empty.
+ *
+ * @see unpack_and_concat
+ * @see cudf::split
+ * @see partition_and_pack
+ */
+[[nodiscard]] std::unordered_map<PartID, PackedData> split_and_pack(
+    cudf::table_view const& table,
+    std::vector<cudf::size_type> const& splits,
+    rmm::cuda_stream_view stream,
+    rmm::device_async_resource_ref mr
+);
+
+
 /**
  * @brief Unpack (deserialize) input tables and concatenate them.
  *

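To make the cudf::split()-style `splits` convention in the new doc comment concrete, here is a tiny illustrative snippet (Python for brevity; the values are made up): a table with 10 rows and splits = [3, 7] yields len(splits) + 1 = 3 partitions covering the half-open row ranges below.

num_rows = 10
splits = [3, 7]
bounds = [0, *splits, num_rows]
# Consecutive bounds delimit the partitions' row ranges.
partitions = list(zip(bounds[:-1], bounds[1:]))
assert partitions == [(0, 3), (3, 7), (7, 10)]
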
cpp/src/shuffler/partition.cpp (+21)
@@ -79,6 +79,27 @@ std::unordered_map<PartID, PackedData> partition_and_pack(
     return ret;
 }
 
+std::unordered_map<PartID, PackedData> split_and_pack(
+    cudf::table_view const& table,
+    std::vector<cudf::size_type> const& splits,
+    rmm::cuda_stream_view stream,
+    rmm::device_async_resource_ref mr
+) {
+    RAPIDSMPF_NVTX_FUNC_RANGE();
+    // Can't split empty tables (0 is out of bounds), so raise.
+    RAPIDSMPF_EXPECTS(
+        table.num_rows() > 0, "the input table cannot be empty", std::invalid_argument
+    );
+
+    auto tables = cudf::split(table, splits, stream);
+    std::unordered_map<PartID, PackedData> ret;
+    for (PartID i = 0; static_cast<std::size_t>(i) < tables.size(); ++i) {
+        auto pack = cudf::detail::pack(tables[i], stream, mr);
+        ret.emplace(i, PackedData(std::move(pack.metadata), std::move(pack.gpu_data)));
+    }
+    return ret;
+}
+
 std::unique_ptr<cudf::table> unpack_and_concat(
     std::vector<PackedData>&& partitions,
     rmm::cuda_stream_view stream,

cpp/tests/test_shuffler.cpp (+30)
@@ -67,6 +67,36 @@ TEST_P(NumOfPartitions, partition_and_pack) {
     CUDF_TEST_EXPECT_TABLES_EQUIVALENT(sort_table(expect), sort_table(result));
 }
 
+TEST_P(NumOfPartitions, split_and_pack) {
+    int const num_partitions = std::get<0>(GetParam());
+    int const num_rows = std::get<1>(GetParam());
+    std::int64_t const seed = 42;
+    auto stream = cudf::get_default_stream();
+    auto mr = cudf::get_current_device_resource_ref();
+
+    cudf::table expect = random_table_with_index(seed, num_rows, 0, 10);
+
+    std::vector<cudf::size_type> splits;
+    for (int i = 1; i < num_partitions; ++i) {
+        splits.emplace_back(i * num_rows / num_partitions);
+    }
+
+    auto chunks = rapidsmpf::shuffler::split_and_pack(expect, splits, stream, mr);
+
+    // Convert to a vector (restoring the original order).
+    std::vector<rapidsmpf::PackedData> chunks_vector;
+    for (int i = 0; i < num_partitions; ++i) {
+        chunks_vector.emplace_back(std::move(chunks.at(i)));
+    }
+    EXPECT_EQ(chunks_vector.size(), num_partitions);
+
+    auto result =
+        rapidsmpf::shuffler::unpack_and_concat(std::move(chunks_vector), stream, mr);
+
+    // Compare the input table with the result.
+    CUDF_TEST_EXPECT_TABLES_EQUIVALENT(expect, *result);
+}
+
 TEST(MetadataMessage, round_trip) {
     auto metadata = iota_vector<uint8_t>(100);
 
python/rapidsmpf/rapidsmpf/buffer/packed_data.pyx (-1)
@@ -3,7 +3,6 @@
 
 from libcpp.memory cimport unique_ptr
 from libcpp.utility cimport move
-
 from rapidsmpf.buffer.packed_data cimport cpp_PackedData
 
 
python/rapidsmpf/rapidsmpf/buffer/resource.pxd (+3, -4)
@@ -8,15 +8,14 @@ from libcpp cimport bool
 from libcpp.memory cimport shared_ptr
 from libcpp.optional cimport optional
 from libcpp.unordered_map cimport unordered_map
+from rapidsmpf.buffer.buffer cimport MemoryType
+from rapidsmpf.buffer.spill_manager cimport SpillManager, cpp_SpillManager
+from rapidsmpf.utils.time cimport cpp_Duration
 from rmm.librmm.memory_resource cimport (device_memory_resource,
                                          statistics_resource_adaptor)
 from rmm.pylibrmm.memory_resource cimport (DeviceMemoryResource,
                                            StatisticsResourceAdaptor)
 
-from rapidsmpf.buffer.buffer cimport MemoryType
-from rapidsmpf.buffer.spill_manager cimport SpillManager, cpp_SpillManager
-from rapidsmpf.utils.time cimport cpp_Duration
-
 
 cdef extern from "<functional>" nogil:
     cdef cppclass cpp_MemoryAvailable "std::function<std::int64_t()>":

python/rapidsmpf/rapidsmpf/buffer/spill_manager.pyx (-1)
@@ -3,7 +3,6 @@
 
 from cython.operator cimport dereference as deref
 from libc.stddef cimport size_t
-
 from rapidsmpf.buffer.resource cimport BufferResource
 from rapidsmpf.exception_handling cimport (CppExcept,
                                            throw_py_as_cpp_exception,

python/rapidsmpf/rapidsmpf/communicator/mpi.pyx (-1)
@@ -4,7 +4,6 @@
 from libcpp.memory cimport make_shared
 from mpi4py cimport libmpi
 from mpi4py.MPI cimport Intracomm
-
 from rapidsmpf.communicator.communicator cimport Communicator
 
 
python/rapidsmpf/rapidsmpf/communicator/ucxx.pyx (+1, -2)
@@ -10,10 +10,9 @@ from libcpp.optional cimport nullopt, nullopt_t
 from libcpp.pair cimport pair
 from libcpp.string cimport string
 from libcpp.utility cimport move
-from ucxx._lib.libucxx cimport Address, UCXAddress, UCXWorker, Worker
-
 from rapidsmpf.communicator.communicator cimport *
 from rapidsmpf.communicator.ucxx cimport *
+from ucxx._lib.libucxx cimport Address, UCXAddress, UCXWorker, Worker
 
 
 cdef extern from "<variant>" namespace "std" nogil:

python/rapidsmpf/rapidsmpf/progress_thread.pxd (-1)
@@ -7,7 +7,6 @@ from libc.stdint cimport uint64_t, uintptr_t
 from libcpp.functional cimport function
 from libcpp.memory cimport shared_ptr, unique_ptr
 from libcpp.utility cimport move
-
 from rapidsmpf.communicator.communicator cimport cpp_Logger
 from rapidsmpf.statistics cimport cpp_Statistics
 
python/rapidsmpf/rapidsmpf/progress_thread.pyx (-1)
@@ -4,7 +4,6 @@
 
 from cython.operator cimport dereference as deref
 from libcpp.memory cimport make_shared
-
 from rapidsmpf.communicator.communicator cimport Communicator
 from rapidsmpf.statistics cimport Statistics
 
python/rapidsmpf/rapidsmpf/shuffler.pxd (+3, -4)
@@ -8,15 +8,14 @@ from libcpp.string cimport string
 from libcpp.unordered_map cimport unordered_map
 from libcpp.vector cimport vector
 from pylibcudf.table cimport Table
-from rmm.librmm.cuda_stream_view cimport cuda_stream_view
-from rmm.pylibrmm.memory_resource cimport DeviceMemoryResource
-from rmm.pylibrmm.stream cimport Stream
-
 from rapidsmpf.buffer.packed_data cimport cpp_PackedData
 from rapidsmpf.buffer.resource cimport BufferResource, cpp_BufferResource
 from rapidsmpf.communicator.communicator cimport Communicator, cpp_Communicator
 from rapidsmpf.progress_thread cimport cpp_ProgressThread
 from rapidsmpf.statistics cimport cpp_Statistics
+from rmm.librmm.cuda_stream_view cimport cuda_stream_view
+from rmm.pylibrmm.memory_resource cimport DeviceMemoryResource
+from rmm.pylibrmm.stream cimport Stream
 
 
 cpdef dict partition_and_pack(

python/rapidsmpf/rapidsmpf/shuffler.pyi (+6)
@@ -21,6 +21,12 @@ def partition_and_pack(
     stream: Stream,
     device_mr: DeviceMemoryResource,
 ) -> dict[int, PackedData]: ...
+def split_and_pack(
+    table: Table,
+    splits: Iterable[int],
+    stream: Stream,
+    device_mr: DeviceMemoryResource,
+) -> dict[int, PackedData]: ...
 def unpack_and_concat(
     partitions: Iterable[PackedData],
     stream: Stream,

python/rapidsmpf/rapidsmpf/shuffler.pyx (+73, -4)
@@ -13,15 +13,14 @@ from pylibcudf.libcudf.table.table cimport table as cpp_table
 from pylibcudf.libcudf.table.table_view cimport table_view
 from pylibcudf.libcudf.types cimport size_type
 from pylibcudf.table cimport Table
+from rapidsmpf.buffer.packed_data cimport PackedData, cpp_PackedData
+from rapidsmpf.progress_thread cimport ProgressThread
+from rapidsmpf.statistics cimport Statistics
 from rmm.librmm.cuda_stream_view cimport cuda_stream_view
 from rmm.librmm.memory_resource cimport device_memory_resource
 from rmm.pylibrmm.memory_resource cimport DeviceMemoryResource
 from rmm.pylibrmm.stream cimport Stream
 
-from rapidsmpf.buffer.packed_data cimport PackedData, cpp_PackedData
-from rapidsmpf.progress_thread cimport ProgressThread
-from rapidsmpf.statistics cimport Statistics
-
 
 cdef extern from "<rapidsmpf/shuffler/partition.hpp>" nogil:
     int cpp_HASH_MURMUR3"cudf::hash_id::HASH_MURMUR3"

@@ -38,6 +37,14 @@ cdef extern from "<rapidsmpf/shuffler/partition.hpp>" nogil:
         device_memory_resource *mr,
     ) except +
 
+    cdef unordered_map[uint32_t, cpp_PackedData] cpp_split_and_pack \
+        "rapidsmpf::shuffler::split_and_pack"(
+            const table_view& table,
+            const vector[size_type] &splits,
+            cuda_stream_view stream,
+            device_memory_resource *mr,
+    ) except +
+
 
 cpdef dict partition_and_pack(
     Table table,

@@ -76,6 +83,7 @@ cpdef dict partition_and_pack(
     rapidsmpf.shuffler.unpack_and_concat
     pylibcudf.partitioning.hash_partition
     pylibcudf.contiguous_split.pack
+    rapidsmpf.shuffler.split_and_pack
     """
     cdef vector[size_type] _columns_to_hash = tuple(columns_to_hash)
     cdef unordered_map[uint32_t, cpp_PackedData] _ret

@@ -103,6 +111,67 @@ cpdef dict partition_and_pack(
     return ret
 
 
+cpdef dict split_and_pack(
+    Table table,
+    splits,
+    stream,
+    DeviceMemoryResource device_mr,
+):
+    """
+    Splits rows from the input table into multiple packed (serialized) tables.
+
+    Parameters
+    ----------
+    table
+        The input table to split and pack. The table cannot be empty (the
+        split points would not be valid).
+    splits
+        The split points, equivalent to cudf::split(), i.e. one less than
+        the number of result partitions.
+    stream
+        The CUDA stream used for memory operations.
+    device_mr
+        Reference to the RMM device memory resource used for device allocations.
+
+    Returns
+    -------
+    A dictionary where the keys are partition IDs and the values are packed tables.
+
+    Raises
+    ------
+    ValueError
+        If the input table is empty.
+
+    See Also
+    --------
+    rapidsmpf.shuffler.unpack_and_concat
+    pylibcudf.copying.split
+    rapidsmpf.shuffler.partition_and_pack
+    """
+    cdef vector[size_type] _splits = tuple(splits)
+    cdef unordered_map[uint32_t, cpp_PackedData] _ret
+    cdef table_view tbl = table.view()
+    if stream is None:
+        raise ValueError("stream cannot be None")
+    cdef cuda_stream_view _stream = Stream(stream).view()
+
+    with nogil:
+        _ret = cpp_split_and_pack(
+            tbl,
+            _splits,
+            _stream,
+            device_mr.get_mr()
+        )
+    ret = {}
+    cdef unordered_map[uint32_t, cpp_PackedData].iterator it = _ret.begin()
+    while it != _ret.end():
+        ret[deref(it).first] = PackedData.from_librapidsmpf(
+            make_unique[cpp_PackedData](move(deref(it).second))
+        )
+        postincrement(it)
+    return ret
+
+
 cdef extern from "<rapidsmpf/shuffler/partition.hpp>" nogil:
     cdef unique_ptr[cpp_table] cpp_unpack_and_concat \
         "rapidsmpf::shuffler::unpack_and_concat"(

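For reference, here is a usage sketch of the new Python binding (not part of this commit), mirroring the round trip exercised by the tests below; it assumes a pylibcudf Table plus a valid stream and device memory resource are already at hand:

import numpy as np

from rapidsmpf.shuffler import split_and_pack, unpack_and_concat

def split_roundtrip(table, num_partitions, stream, device_mr):
    # Equidistant row offsets; one fewer split point than partitions.
    splits = np.linspace(
        0, table.num_rows(), num_partitions, endpoint=False
    )[1:].astype(int)
    chunks = split_and_pack(
        table, splits=splits, stream=stream, device_mr=device_mr
    )
    # Unpacking partitions 0..num_partitions-1 in ID order restores the
    # original row order.
    return unpack_and_concat(
        tuple(chunks[i] for i in range(num_partitions)),
        stream=stream,
        device_mr=device_mr,
    )
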
python/rapidsmpf/rapidsmpf/tests/test_shuffler.py (+51, -1)
@@ -13,7 +13,12 @@
 
 from rapidsmpf.buffer.resource import BufferResource
 from rapidsmpf.progress_thread import ProgressThread
-from rapidsmpf.shuffler import Shuffler, partition_and_pack, unpack_and_concat
+from rapidsmpf.shuffler import (
+    Shuffler,
+    partition_and_pack,
+    split_and_pack,
+    unpack_and_concat,
+)
 from rapidsmpf.testing import assert_eq
 from rapidsmpf.utils.cudf import (
     cudf_to_pylibcudf_table,

@@ -50,6 +55,51 @@ def test_partition_and_pack_unpack(
     assert_eq(expect, got, sort_rows="0")
 
 
+@pytest.mark.parametrize(
+    "df",
+    [
+        {"0": [1, 2, 3], "1": [2, 2, 1]},
+        {"0": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], "1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]},
+    ],
+)
+@pytest.mark.parametrize("num_partitions", [1, 2, 3, 10])
+def test_split_and_pack_unpack(
+    device_mr: rmm.mr.CudaMemoryResource, df: dict[str, list[int]], num_partitions: int
+) -> None:
+    expect = cudf.DataFrame(df)
+    splits = np.linspace(0, len(expect), num_partitions, endpoint=False)[1:].astype(int)
+    partitions = split_and_pack(
+        cudf_to_pylibcudf_table(expect),
+        splits=splits,
+        stream=DEFAULT_STREAM,
+        device_mr=device_mr,
+    )
+    got = pylibcudf_to_cudf_dataframe(
+        unpack_and_concat(
+            tuple(partitions[i] for i in range(num_partitions)),
+            stream=DEFAULT_STREAM,
+            device_mr=device_mr,
+        )
+    )
+
+    assert_eq(expect, got)
+
+
+@pytest.mark.parametrize("num_partitions", [1, 2, 3, 10])
+def test_split_and_pack_unpack_empty_table(
+    device_mr: rmm.mr.CudaMemoryResource, num_partitions: int
+) -> None:
+    expect = cudf.DataFrame({"0": [], "1": []})
+    splits = np.linspace(0, len(expect), num_partitions, endpoint=False)[1:].astype(int)
+    with pytest.raises(ValueError, match=".*the input table cannot be empty"):
+        split_and_pack(
+            cudf_to_pylibcudf_table(expect),
+            splits=splits,
+            stream=DEFAULT_STREAM,
+            device_mr=device_mr,
+        )
+
+
 @pytest.mark.parametrize("wait_on", [False, True])
 @pytest.mark.parametrize("total_num_partitions", [1, 2, 3, 10])
 def test_shuffler_single_nonempty_partition(
