Skip to content

Commit ed23fb8

Browse files
committed
Remove default_cuda_stream_priority from native code and deprecate it in Python.
Signed-off-by: Michał Zientkiewicz <[email protected]>
1 parent 51bd5ea commit ed23fb8

14 files changed, with 35 additions and 193 deletions.

dali/pipeline/executor/async_pipelined_executor.h

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
1+
// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -35,10 +35,9 @@ class DLL_PUBLIC AsyncPipelinedExecutor : public PipelinedExecutor {
3535
DLL_PUBLIC inline AsyncPipelinedExecutor(int batch_size, int num_thread, int device_id,
3636
size_t bytes_per_sample_hint, bool set_affinity = false,
3737
int max_num_stream = -1,
38-
int default_cuda_stream_priority = 0,
3938
QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
4039
: PipelinedExecutor(batch_size, num_thread, device_id, bytes_per_sample_hint, set_affinity,
41-
max_num_stream, default_cuda_stream_priority, prefetch_queue_depth),
40+
max_num_stream, prefetch_queue_depth),
4241
cpu_thread_(device_id, set_affinity, "CPU executor"),
4342
mixed_thread_(device_id, set_affinity, "Mixed executor"),
4443
gpu_thread_(device_id, set_affinity, "GPU executor") {}

dali/pipeline/executor/async_separated_pipelined_executor.h

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -34,11 +34,10 @@ class DLL_PUBLIC AsyncSeparatedPipelinedExecutor : public SeparatedPipelinedExec
3434
public:
3535
DLL_PUBLIC inline AsyncSeparatedPipelinedExecutor(
3636
int batch_size, int num_thread, int device_id, size_t bytes_per_sample_hint,
37-
bool set_affinity = false, int max_num_stream = -1, int default_cuda_stream_priority = 0,
37+
bool set_affinity = false, int max_num_stream = -1,
3838
QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
3939
: SeparatedPipelinedExecutor(batch_size, num_thread, device_id, bytes_per_sample_hint,
40-
set_affinity, max_num_stream, default_cuda_stream_priority,
41-
prefetch_queue_depth),
40+
set_affinity, max_num_stream, prefetch_queue_depth),
4241
cpu_thread_(device_id, set_affinity, "CPU executor"),
4342
mixed_thread_(device_id, set_affinity, "Mixed executor"),
4443
gpu_thread_(device_id, set_affinity, "GPU executor") {}

dali/pipeline/executor/executor_factory.cc

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@ namespace {
2929
auto MakeExec2Config(int batch_size, int num_thread, int device_id,
3030
size_t bytes_per_sample_hint, bool set_affinity,
3131
int max_num_stream,
32-
int default_cuda_stream_priority,
3332
QueueSizes prefetch_queue_depth) {
3433
exec2::Executor2::Config cfg{};
3534
cfg.async_output = false;
@@ -92,12 +91,11 @@ std::unique_ptr<ExecutorBase> GetExecutor(bool pipelined, bool separated, bool a
9291
int batch_size, int num_thread, int device_id,
9392
size_t bytes_per_sample_hint, bool set_affinity,
9493
int max_num_stream,
95-
int default_cuda_stream_priority,
9694
QueueSizes prefetch_queue_depth) {
9795
return GetExecutorImpl(
9896
pipelined, separated, async, dynamic,
9997
batch_size, num_thread, device_id, bytes_per_sample_hint, set_affinity,
100-
max_num_stream, default_cuda_stream_priority, prefetch_queue_depth);
98+
max_num_stream, prefetch_queue_depth);
10199
}
102100

103101
} // namespace dali

dali/pipeline/executor/executor_factory.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@ std::unique_ptr<ExecutorBase> GetExecutor(bool pipelined, bool separated, bool a
2727
int batch_size, int num_thread, int device_id,
2828
size_t bytes_per_sample_hint, bool set_affinity = false,
2929
int max_num_stream = -1,
30-
int default_cuda_stream_priority = 0,
3130
QueueSizes prefetch_queue_depth = QueueSizes{2, 2});
3231

3332
} // namespace dali

dali/pipeline/executor/executor_impl.h

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,6 @@
4545
#include "dali/pipeline/operator/name_utils.h"
4646
#include "dali/pipeline/util/batch_utils.h"
4747
#include "dali/pipeline/util/event_pool.h"
48-
#include "dali/pipeline/util/stream_pool.h"
4948
#include "dali/pipeline/util/thread_pool.h"
5049
#include "dali/pipeline/workspace/iteration_data.h"
5150
#include "dali/pipeline/workspace/workspace_data_factory.h"
@@ -75,7 +74,7 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy {
7574
public:
7675
DLL_PUBLIC inline Executor(int max_batch_size, int num_thread, int device_id,
7776
size_t bytes_per_sample_hint, bool set_affinity = false,
78-
int max_num_stream = -1, int default_cuda_stream_priority = 0,
77+
int max_num_stream = -1,
7978
QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
8079
: max_batch_size_(max_batch_size),
8180
device_id_(device_id),

dali/pipeline/executor/pipelined_executor.h

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -41,11 +41,10 @@ class DLL_PUBLIC PipelinedExecutorImpl : public Executor<WorkspacePolicy, QueueP
4141
DLL_PUBLIC inline PipelinedExecutorImpl(int batch_size, int num_thread, int device_id,
4242
size_t bytes_per_sample_hint, bool set_affinity = false,
4343
int max_num_stream = -1,
44-
int default_cuda_stream_priority = 0,
4544
QueueSizes prefetch_queue_depth = {2, 2})
4645
: Executor<WorkspacePolicy, QueuePolicy>(batch_size, num_thread, device_id,
4746
bytes_per_sample_hint, set_affinity, max_num_stream,
48-
default_cuda_stream_priority, prefetch_queue_depth) {
47+
prefetch_queue_depth) {
4948
}
5049

5150
DLL_PUBLIC ~PipelinedExecutorImpl() override = default;

dali/pipeline/operator/op_schema.cc

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,7 @@ OpSchema::OpSchema(const std::string &name) : name_(name) {
6767
AddInternalArg("max_batch_size", "Max batch size", -1);
6868
AddInternalArg("device", "Device on which the Op is run", std::string("cpu"));
6969
AddInternalArg("inplace", "Whether Op can be run in place", false);
70-
AddInternalArg("default_cuda_stream_priority", "Default cuda stream priority", 0);
70+
AddInternalArg("default_cuda_stream_priority", "Default cuda stream priority", 0); // deprecated
7171
AddInternalArg("checkpointing", "Setting to `true` enables checkpointing", false);
7272

7373
AddOptionalArg("seed", R"code(Random seed.

dali/pipeline/operator/operator.h

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -75,8 +75,7 @@ class DLL_PUBLIC OperatorBase {
7575
inline explicit OperatorBase(const OpSpec &spec)
7676
: spec_(spec),
7777
num_threads_(spec.GetArgument<int>("num_threads")),
78-
max_batch_size_(spec.GetArgument<int>("max_batch_size")),
79-
default_cuda_stream_priority_(spec.GetArgument<int>("default_cuda_stream_priority")) {
78+
max_batch_size_(spec.GetArgument<int>("max_batch_size")) {
8079
DALI_ENFORCE(num_threads_ > 0, "Invalid value for argument num_threads.");
8180
DALI_ENFORCE(max_batch_size_ > 0, "Invalid value for argument max_batch_size.");
8281
}
@@ -239,16 +238,14 @@ class DLL_PUBLIC OperatorBase {
239238
const OpSpec spec_;
240239
int num_threads_;
241240
int max_batch_size_;
242-
int default_cuda_stream_priority_;
243241

244242
std::unordered_map<std::string, std::any> diagnostics_;
245243
};
246244

247245
#define USE_OPERATOR_MEMBERS() \
248246
using OperatorBase::spec_; \
249247
using OperatorBase::num_threads_; \
250-
using OperatorBase::max_batch_size_; \
251-
using OperatorBase::default_cuda_stream_priority_
248+
using OperatorBase::max_batch_size_;
252249

253250
/**
254251
* @brief Class defining an operator using specific backend.

dali/pipeline/pipeline.cc

Lines changed: 5 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -97,19 +97,19 @@ void InitializeMemoryResources() {
9797
Pipeline::Pipeline(int max_batch_size, int num_threads, int device_id, int64_t seed,
9898
bool pipelined_execution, int prefetch_queue_depth,
9999
bool async_execution, bool dynamic_execution, size_t bytes_per_sample_hint,
100-
bool set_affinity, int max_num_stream, int default_cuda_stream_priority) {
100+
bool set_affinity, int max_num_stream) {
101101
InitializeMemoryResources();
102102
Init(max_batch_size, num_threads, device_id, seed, pipelined_execution, separated_execution_,
103103
async_execution, dynamic_execution, bytes_per_sample_hint, set_affinity, max_num_stream,
104-
default_cuda_stream_priority, QueueSizes{prefetch_queue_depth});
104+
QueueSizes{prefetch_queue_depth});
105105
}
106106

107107
Pipeline::Pipeline(const string &serialized_pipe,
108108
int batch_size, int num_threads, int device_id,
109109
bool pipelined_execution, int prefetch_queue_depth,
110110
bool async_execution, bool dynamic_execution,
111111
size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream,
112-
int default_cuda_stream_priority, int64_t seed) {
112+
int64_t seed) {
113113
InitializeMemoryResources();
114114
dali_proto::PipelineDef def;
115115
DALI_ENFORCE(DeserializePipeline(serialized_pipe, def), "Error parsing serialized pipeline.");
@@ -142,7 +142,6 @@ Pipeline::Pipeline(const string &serialized_pipe,
142142
bytes_per_sample_hint,
143143
set_affinity,
144144
max_num_stream,
145-
default_cuda_stream_priority,
146145
QueueSizes{prefetch_queue_depth});
147146

148147
// from serialized pipeline, construct new pipeline
@@ -181,7 +180,7 @@ void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t
181180
bool pipelined_execution, bool separated_execution,
182181
bool async_execution, bool dynamic_execution,
183182
size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream,
184-
int default_cuda_stream_priority, QueueSizes prefetch_queue_depth) {
183+
QueueSizes prefetch_queue_depth) {
185184
DALI_ENFORCE(device_id == CPU_ONLY_DEVICE_ID || cuInitChecked(),
186185
"You are trying to create a GPU DALI pipeline, while CUDA is not available. "
187186
"Please install CUDA or set `device_id = None` in Pipeline constructor. "
@@ -200,7 +199,6 @@ void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t
200199
this->bytes_per_sample_hint_ = bytes_per_sample_hint;
201200
this->set_affinity_ = set_affinity;
202201
this->max_num_stream_ = max_num_stream;
203-
this->default_cuda_stream_priority_ = default_cuda_stream_priority;
204202
this->prefetch_queue_depth_ = prefetch_queue_depth;
205203
this->separated_execution_ = (prefetch_queue_depth.cpu_size != prefetch_queue_depth.gpu_size);
206204
DALI_ENFORCE(max_batch_size_ > 0, "Max batch size must be greater than 0");
@@ -215,14 +213,6 @@ void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t
215213
std::min(lowest_cuda_stream_priority, highest_cuda_stream_priority);
216214
const auto max_priority_value =
217215
std::max(lowest_cuda_stream_priority, highest_cuda_stream_priority);
218-
DALI_ENFORCE(
219-
default_cuda_stream_priority >= min_priority_value &&
220-
default_cuda_stream_priority <= max_priority_value,
221-
"Provided default cuda stream priority `" + std::to_string(default_cuda_stream_priority) +
222-
"` is outside the priority range [" + std::to_string(min_priority_value) + ", " +
223-
std::to_string(max_priority_value) + "], with lowest priority being `" +
224-
std::to_string(lowest_cuda_stream_priority) + "` and highest priority being `" +
225-
std::to_string(highest_cuda_stream_priority) + "`");
226216

227217
seed_.resize(MAX_SEEDS);
228218
current_seed_ = 0;
@@ -473,7 +463,7 @@ void Pipeline::Build(std::vector<PipelineOutputDesc> output_descs) {
473463
executor_ =
474464
GetExecutor(pipelined_execution_, separated_execution_, async_execution_, dynamic_execution_,
475465
max_batch_size_, num_threads_, device_id_, bytes_per_sample_hint_, set_affinity_,
476-
max_num_stream_, default_cuda_stream_priority_, prefetch_queue_depth_);
466+
max_num_stream_, prefetch_queue_depth_);
477467
executor_->EnableMemoryStats(enable_memory_stats_);
478468
executor_->EnableCheckpointing(checkpointing_);
479469
executor_->Init();

dali/pipeline/pipeline.h

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -85,21 +85,19 @@ class DLL_PUBLIC Pipeline {
8585
* configured in the thread pool. Defaults to 'false'.
8686
* @param max_num_stream set an upper limit on the number of cudaStreams
8787
* that can be allocated by the pipeline.
88-
* @param default_cuda_stream_priority CUDA stream priority used by DALI.
89-
* See `cudaStreamCreateWithPriority` in CUDA documentation
9088
*/
9189
DLL_PUBLIC Pipeline(int max_batch_size, int num_threads, int device_id, int64_t seed = -1,
9290
bool pipelined_execution = true, int prefetch_queue_depth = 2,
9391
bool async_execution = true, bool dynamic_execution = false,
9492
size_t bytes_per_sample_hint = 0, bool set_affinity = false,
95-
int max_num_stream = -1, int default_cuda_stream_priority = 0);
93+
int max_num_stream = -1);
9694

9795
DLL_PUBLIC Pipeline(const string &serialized_pipe,
9896
int max_batch_size = -1, int num_threads = -1, int device_id = -1,
9997
bool pipelined_execution = true, int prefetch_queue_depth = 2,
10098
bool async_execution = true, bool dynamic_execution = false,
10199
size_t bytes_per_sample_hint = 0, bool set_affinity = false,
102-
int max_num_stream = -1, int default_cuda_stream_priority = 0,
100+
int max_num_stream = -1,
103101
int64_t seed = -1);
104102

105103
virtual DLL_PUBLIC ~Pipeline();
@@ -587,7 +585,7 @@ class DLL_PUBLIC Pipeline {
587585
void Init(int batch_size, int num_threads, int device_id, int64_t seed, bool pipelined_execution,
588586
bool separated_execution, bool async_execution, bool dynamic_execution,
589587
size_t bytes_per_sample_hint,
590-
bool set_affinity, int max_num_stream, int default_cuda_stream_priority,
588+
bool set_affinity, int max_num_stream,
591589
QueueSizes prefetch_queue_depth = QueueSizes{2});
592590

593591
struct EdgeMeta {
@@ -716,7 +714,6 @@ class DLL_PUBLIC Pipeline {
716714
size_t bytes_per_sample_hint_ = 0;
717715
int set_affinity_ = 0;
718716
int max_num_stream_ = 0;
719-
int default_cuda_stream_priority_ = 0;
720717
int next_logical_id_ = 0;
721718
int next_internal_logical_id_ = -1;
722719
QueueSizes prefetch_queue_depth_{};

dali/pipeline/util/stream_pool.h

Lines changed: 0 additions & 93 deletions
This file was deleted.

dali/python/backend_impl.cc

Lines changed: 6 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -2024,13 +2024,12 @@ PYBIND11_MODULE(backend_impl, m) {
20242024
bool pipelined_execution = true, int prefetch_queue_depth = 2,
20252025
bool async_execution = true, bool dynamic_execution = false,
20262026
size_t bytes_per_sample_hint = 0,
2027-
bool set_affinity = false, int max_num_stream = -1,
2028-
int default_cuda_stream_priority = 0) {
2027+
bool set_affinity = false, int max_num_stream = -1) {
20292028
return std::make_unique<PyPipeline>(
20302029
batch_size, num_threads, device_id, seed,
20312030
pipelined_execution, prefetch_queue_depth, async_execution, dynamic_execution,
20322031
bytes_per_sample_hint, set_affinity,
2033-
max_num_stream, default_cuda_stream_priority);
2032+
max_num_stream);
20342033
}),
20352034
"batch_size"_a,
20362035
"num_threads"_a,
@@ -2042,23 +2041,21 @@ PYBIND11_MODULE(backend_impl, m) {
20422041
"exec_dynamic"_a = false,
20432042
"bytes_per_sample_hint"_a = 0,
20442043
"set_affinity"_a = false,
2045-
"max_num_stream"_a = -1,
2046-
"default_cuda_stream_priority"_a = 0
2044+
"max_num_stream"_a = -1
20472045
)
20482046
// initialize from serialized pipeline
20492047
.def(py::init(
20502048
[](string serialized_pipe,
20512049
int batch_size = -1, int num_threads = -1, int device_id = -1,
20522050
bool pipelined_execution = true, int prefetch_queue_depth = 2,
20532051
bool async_execution = true, bool dynamic_execution = false,
2054-
size_t bytes_per_sample_hint = 0, bool set_affinity = false, int max_num_stream = -1,
2055-
int default_cuda_stream_priority = 0) {
2052+
size_t bytes_per_sample_hint = 0, bool set_affinity = false, int max_num_stream = -1) {
20562053
return std::make_unique<PyPipeline>(
20572054
serialized_pipe,
20582055
batch_size, num_threads, device_id, pipelined_execution,
20592056
prefetch_queue_depth, async_execution, dynamic_execution,
20602057
bytes_per_sample_hint, set_affinity,
2061-
max_num_stream, default_cuda_stream_priority);
2058+
max_num_stream);
20622059
}),
20632060
"serialized_pipe"_a,
20642061
"batch_size"_a = -1,
@@ -2070,8 +2067,7 @@ PYBIND11_MODULE(backend_impl, m) {
20702067
"exec_dynamic"_a = true,
20712068
"bytes_per_sample_hint"_a = 0,
20722069
"set_affinity"_a = false,
2073-
"max_num_stream"_a = -1,
2074-
"default_cuda_stream_priority"_a = 0
2070+
"max_num_stream"_a = -1
20752071
)
20762072
.def("AddOperator",
20772073
static_cast<int (Pipeline::*)(const OpSpec &, const std::string &)>

0 commit comments

Comments
 (0)