Skip to content

Commit 94f02ad

Browse files
authored
Add metadata-only inputs. (#5635)
* Assign a stream to CPU inputs with GPU (non-metadata) inputs * Add Metadata input device - this declares that the input is used for metadata (shape, dtype, etc) access only * Don't synchronize metadata inputs in executor. * Don't prolong the lifetime of metadata-only inputs. * Add InputDevice specifier to random number generators shape_like input. --------- Signed-off-by: Michał Zientkiewicz <[email protected]>
1 parent 8904209 commit 94f02ad

15 files changed

+170
-34
lines changed

dali/operators/generic/cast.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ DALI_SCHEMA(Cast)
7676
DALI_SCHEMA(CastLike)
7777
.DocStr("Cast the first tensor to the type of the second tensor.")
7878
.NumInput(2)
79-
.InputDevice(1, InputDevice::Any)
79+
.InputDevice(1, InputDevice::Metadata)
8080
.NumOutput(1)
8181
.AllowSequences()
8282
.SupportVolumetric();

dali/operators/generic/constant_value.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ DALI_SCHEMA(FullLike)
8484
.DocStr(R"code(Returns new data with the same shape and type as the input data, filled with a `fill_value`.)code")
8585
.NumInput(2)
8686
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
87-
.InputDevice(0, InputDevice::Any)
87+
.InputDevice(0, InputDevice::Metadata)
8888
.InputDox(1, "fill_value", "TensorList", R"code(The fill value.)code")
8989
.NumOutput(1);
9090
DALI_REGISTER_OPERATOR(FullLike, FullLike<CPUBackend>, CPU);
@@ -102,7 +102,7 @@ DALI_SCHEMA(ZerosLike)
102102
.DocStr(R"code(Returns new data with the same shape and type as the input array, filled with zeros.)code")
103103
.NumInput(1)
104104
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
105-
.InputDevice(0, InputDevice::Any)
105+
.InputDevice(0, InputDevice::Metadata)
106106
.NumOutput(1)
107107
.AddOptionalTypeArg("dtype", R"code(Overrides the output data type.)code", DALI_INT32);
108108
DALI_REGISTER_OPERATOR(ZerosLike, ZerosLike<CPUBackend>, CPU);
@@ -120,7 +120,7 @@ DALI_SCHEMA(OnesLike)
120120
.DocStr(R"code(Returns new data with the same shape and type as the input array, filled with ones.)code")
121121
.NumInput(1)
122122
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
123-
.InputDevice(0, InputDevice::Any)
123+
.InputDevice(0, InputDevice::Metadata)
124124
.NumOutput(1)
125125
.AddOptionalTypeArg("dtype", R"code(Overrides the output data type.)code", DALI_INT32);
126126
DALI_REGISTER_OPERATOR(OnesLike, OnesLike<CPUBackend>, CPU);

dali/operators/generic/shapes.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace dali {
1919
DALI_SCHEMA(Shapes)
2020
.DocStr(R"code(Returns the shapes of inputs.)code")
2121
.NumInput(1)
22-
.InputDevice(0, InputDevice::Any)
22+
.InputDevice(0, InputDevice::Metadata)
2323
.NumOutput(1)
2424
.AllowSequences()
2525
.SupportVolumetric()

dali/operators/random/beta_distribution_cpu.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ a single value per sample is generated.
3838
.NumInput(0, 1)
3939
.InputDox(0, "shape_like", "TensorList",
4040
"Shape of this input will be used to infer the shape of the output, if provided.")
41+
.InputDevice(0, InputDevice::Metadata)
4142
.NumOutput(1)
4243
.AddOptionalArg("alpha", R"code(The alpha parameter, a positive ``float32`` scalar.)code", 1.0f,
4344
true)

dali/operators/random/choice_cpu.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ that is: :meth:`nvidia.dali.types.DALIDataType`, :meth:`nvidia.dali.types.DALIIm
4242
"Otherwise ``__a`` is treated as 1D array of input samples.")
4343
.InputDox(1, "shape_like", "TensorList",
4444
"Shape of this input will be used to infer the shape of the output, if provided.")
45+
.InputDevice(1, InputDevice::Metadata)
4546
.NumOutput(1)
4647
.AddOptionalArg<std::vector<float>>("p",
4748
"Distribution of the probabilities. "

dali/operators/random/coin_flip_cpu.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ a single value per sample is generated.
3030
.NumInput(0, 1)
3131
.InputDox(0, "shape_like", "TensorList",
3232
"Shape of this input will be used to infer the shape of the output, if provided.")
33+
.InputDevice(0, InputDevice::Metadata)
3334
.NumOutput(1)
3435
.AddOptionalArg<float>("probability",
3536
R"code(Probability of value 1.)code",
@@ -51,6 +52,7 @@ sample is generated.
5152
.NumInput(0, 1)
5253
.InputDox(0, "shape_like", "TensorList",
5354
"Shape of this input will be used to infer the shape of the output, if provided.")
55+
.InputDevice(0, InputDevice::Metadata)
5456
.NumOutput(1)
5557
.AddParent("random__CoinFlip")
5658
.Deprecate("random__CoinFlip"); // Deprecated in 0.30

dali/operators/random/normal_distribution_cpu.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ a single value per sample is generated.
2828
.NumInput(0, 1)
2929
.InputDox(0, "shape_like", "TensorList",
3030
"Shape of this input will be used to infer the shape of the output, if provided.")
31-
.InputDevice(0, InputDevice::Any)
31+
.InputDevice(0, InputDevice::Metadata)
3232
.NumOutput(1)
3333
.AddOptionalArg<float>("mean",
3434
R"code(Mean of the distribution.)code",
@@ -51,7 +51,7 @@ a single value per sample is generated.
5151
.NumInput(0, 1)
5252
.InputDox(0, "shape_like", "TensorList",
5353
"Shape of this input will be used to infer the shape of the output, if provided.")
54-
.InputDevice(0, InputDevice::Any)
54+
.InputDevice(0, InputDevice::Metadata)
5555
.NumOutput(1)
5656
.AddParent("random__Normal")
5757
.Deprecate("random__Normal"); // Deprecated in 0.30

dali/operators/random/uniform_distribution_cpu.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ a single value per sample is generated.
3232
.NumInput(0, 1)
3333
.InputDox(0, "shape_like", "TensorList",
3434
"Shape of this input will be used to infer the shape of the output, if provided.")
35+
.InputDevice(0, InputDevice::Metadata)
3536
.NumOutput(1)
3637
.AddOptionalArg("range",
3738
R"code(Range ``[min, max)`` of a continuous uniform distribution.
@@ -67,6 +68,7 @@ a single value per sample is generated.
6768
.NumInput(0, 1)
6869
.InputDox(0, "shape_like", "TensorList",
6970
"Shape of this input will be used to infer the shape of the output, if provided.")
71+
.InputDevice(0, InputDevice::Metadata)
7072
.NumOutput(1)
7173
.AddOptionalArg("range",
7274
R"code(Range ``[min, max)`` of a continuous uniform distribution.

dali/pipeline/executor/executor2/exec_graph.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,16 @@ struct ExecEdge {
9090
/** The index of the input in OpSpec. It matches the edge's index in consumer->inputs. */
9191
int consumer_input_idx = 0;
9292
StorageDevice device = {};
93+
/** The input passes only the metadata, skipping stream synchronization. */
94+
bool metadata = false;
9395

9496
constexpr bool operator==(const ExecEdge &other) const {
9597
return producer == other.producer &&
9698
consumer == other.consumer &&
9799
producer_output_idx == other.producer_output_idx &&
98100
consumer_input_idx == other.consumer_input_idx &&
99-
device == other.device;
101+
device == other.device &&
102+
metadata == other.metadata;
100103
}
101104

102105
constexpr bool operator!=(const ExecEdge &other) const {

dali/pipeline/executor/executor2/exec_graph_lowering.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,16 @@ void ExecGraph::Lower(const graph::OpGraph &def) {
5050
for (auto &consumer : out->consumers) {
5151
auto *exec_con = def2exec[consumer.op];
5252
assert(exec_con != nullptr);
53-
Link(&exec_node, o, exec_con, consumer.idx)->device = dev;
53+
auto *edge = Link(&exec_node, o, exec_con, consumer.idx);
54+
edge->device = dev;
55+
if (consumer.op) {
56+
auto &consumer_spec = consumer.op->spec;
57+
auto &schema = consumer_spec.GetSchemaOrDefault();
58+
if (edge->consumer_input_idx < schema.MaxNumInput()) { // only regular inputs
59+
if (schema.GetInputDevice(edge->consumer_input_idx) == InputDevice::Metadata)
60+
edge->metadata = true;
61+
}
62+
}
5463
}
5564
exec_node.outputs[o].device = dev;
5665
}

dali/pipeline/executor/executor2/exec_node_task.cc

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -278,16 +278,13 @@ void OpTask::SetWorkspaceInputs() {
278278
auto process_input = [&](int i, auto backend) {
279279
using Backend = decltype(backend);
280280
const auto &inp = TaskInput<Backend>(ti);
281-
// If the output order of the operator is `host` then we don't wait for GPU
282-
// inputs - they can't be accessed directly on host and the operator will
283-
// have to issue some sort of synchronization if and when necessary.
284-
// This optimization is essential to avoid oversynchronization
285-
// when the operator needs to access the metadata only (e.g. getting the shape).
286-
if ((order.is_device() || std::is_same_v<Backend, CPUBackend>) /*see comment above */ &&
287-
inp.event && inp.order != order)
281+
bool is_meta = node_->inputs[i]->metadata;
282+
// metadata-only inputs don't need to be synchronized
283+
if (!is_meta && inp.event && inp.order != order)
288284
events.insert(inp.event);
289285

290-
if (inp.order == order) { // use the input directly
286+
// metadata-only inputs don't need a proper stream
287+
if (inp.order == order || is_meta) { // use the input directly
291288
ws_->SetInput(i, inp.data);
292289
} else { // create another TL and set its order (and layout, while we're at it)
293290
auto tl = std::make_shared<TensorList<Backend>>();
@@ -477,7 +474,7 @@ tasking::SharedTask ExecNodeTask::CreateTask(ExecNode *node, const WorkspacePara
477474
}
478475
}
479476

480-
void ClearWorkspacePayload(Workspace &ws) {
477+
void ClearWorkspacePayload(Workspace &ws, ExecNode &node) {
481478
auto event = ws.has_event() ? ws.event() : nullptr;
482479
for (int i = 0; i < ws.NumInput(); i++) {
483480
// TODO(michalz): Some smarter deletion management
@@ -492,14 +489,16 @@ void ClearWorkspacePayload(Workspace &ws) {
492489
if (ws.InputIsType<CPUBackend>(i)) {
493490
if (auto &pinp = ws.InputPtr<CPUBackend>(i)) {
494491
auto &inp = *pinp;
495-
if (inp.is_pinned() && event && inp.order() != ws.output_order())
492+
if (event &&
493+
!node.inputs[i]->metadata &&
494+
inp.is_pinned() && inp.order() != ws.output_order())
496495
inp.order().wait(event);
497496
ws.SetInput<CPUBackend>(i, nullptr);
498497
}
499498
} else if (ws.InputIsType<GPUBackend>(i)) {
500499
if (auto &pinp = ws.InputPtr<GPUBackend>(i)) {
501500
auto &inp = *pinp;
502-
if (event && inp.order() != ws.output_order())
501+
if (event && !node.inputs[i]->metadata && inp.order() != ws.output_order())
503502
inp.order().wait(event);
504503
ws.SetInput<GPUBackend>(i, nullptr);
505504
}
@@ -525,7 +524,7 @@ void ClearWorkspacePayload(Workspace &ws) {
525524

526525
void ExecNodeTask::ClearWorkspace() {
527526
assert(ws_);
528-
ClearWorkspacePayload(*ws_);
527+
ClearWorkspacePayload(*ws_, *node_);
529528
}
530529

531530
} // namespace exec2

dali/pipeline/executor/executor2/stream_assignment.h

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ template <StreamPolicy policy>
3535
class StreamAssignment;
3636

3737
inline bool NeedsStream(const ExecNode *node) {
38-
if (node->is_pipeline_output) {
39-
for (auto &pipe_out : node->inputs) {
40-
if (pipe_out->device == StorageDevice::GPU)
38+
if (node->is_pipeline_output || node->backend == OpType::CPU) {
39+
for (auto &input : node->inputs) {
40+
if (input->device == StorageDevice::GPU && !input->metadata)
4141
return true;
4242
}
43+
return false;
4344
} else {
44-
return node->backend != OpType::CPU;
45+
return true;
4546
}
46-
return false;
4747
}
4848

4949
inline OpType NodeType(const ExecNode *node) {
@@ -117,6 +117,12 @@ class StreamAssignment<StreamPolicy::PerBackend> {
117117
if (has_gpu_)
118118
return; // we already have both, nothing more can happen
119119
break;
120+
case OpType::CPU:
121+
if (NeedsStream(&node)) { // treat CPU nodes with GPU inputs as GPU
122+
has_gpu_ = true;
123+
if (has_mixed_)
124+
return;
125+
}
120126
default:
121127
break;
122128
}
@@ -128,11 +134,14 @@ class StreamAssignment<StreamPolicy::PerBackend> {
128134
* If the node is a Mixed node, it gets stream index 0.
129135
* If the node is a GPU node it gets stream index 1 if there are any mixed nodes, otherwise
130136
* the only stream is the GPU stream and the returned index is 0.
137+
* CPU nodes get GPU stream if they need one (i.e. they have a GPU input)
131138
*/
132139
std::optional<int> operator[](const ExecNode *node) const {
133140
switch (NodeType(node)) {
134141
case OpType::CPU:
135-
return std::nullopt;
142+
if (!NeedsStream(node))
143+
return std::nullopt;
144+
// fall-through to GPU
136145
case OpType::GPU:
137146
return has_mixed_ ? 1 : 0;
138147
case OpType::MIXED:

dali/pipeline/executor/executor2/stream_assignment_test.cc

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,31 @@ DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<CPUBacke
4848
DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<MixedBackend>, Mixed);
4949
DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<GPUBackend>, GPU);
5050

51+
52+
template <typename Backend>
53+
class StreamAssignmentMetaOp : public Operator<Backend> {
54+
public:
55+
using Operator<Backend>::Operator;
56+
USE_OPERATOR_MEMBERS();
57+
58+
void RunImpl(Workspace &ws) override {}
59+
bool SetupImpl(std::vector<OutputDesc> &output_desc, const Workspace &ws) override {
60+
return false;
61+
}
62+
};
63+
64+
DALI_SCHEMA(StreamAssignmentMetaOp)
65+
.NumInput(0, 999)
66+
.InputDevice(0, 999, InputDevice::Metadata)
67+
.NumOutput(0)
68+
.AdditionalOutputsFn([](const OpSpec &spec) {
69+
return spec.NumOutput();
70+
});
71+
72+
DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<CPUBackend>, CPU);
73+
DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<MixedBackend>, Mixed);
74+
DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<GPUBackend>, GPU);
75+
5176
namespace exec2 {
5277

5378
namespace {
@@ -71,6 +96,27 @@ OpSpec SpecMixed() {
7196
return SpecDev("mixed");
7297
}
7398

99+
100+
OpSpec SpecMetaDev(const std::string &device) {
101+
return OpSpec("StreamAssignmentMetaOp")
102+
.AddArg("device", device)
103+
.AddArg("num_threads", 1)
104+
.AddArg("max_batch_size", 1);
105+
}
106+
107+
OpSpec SpecMetaGPU() {
108+
return SpecMetaDev("gpu");
109+
}
110+
111+
OpSpec SpecMetaCPU() {
112+
return SpecMetaDev("cpu");
113+
}
114+
115+
OpSpec SpecMetaMixed() {
116+
return SpecMetaDev("mixed");
117+
}
118+
119+
74120
auto MakeNodeMap(const ExecGraph &graph) {
75121
std::map<std::string_view, const ExecNode *, std::less<>> map;
76122
for (auto &n : graph.Nodes())
@@ -122,6 +168,38 @@ TEST(Exec2Test, StreamAssignment_Single_CPUMixedGPU) {
122168
EXPECT_EQ(assignment[map["c"]], 0);
123169
}
124170

171+
template <StreamPolicy policy>
172+
void TestGPU2CPUAssignment() {
173+
graph::OpGraph::Builder b;
174+
b.Add("a",
175+
SpecGPU()
176+
.AddOutput("a->b", "gpu")
177+
.AddOutput("a->c", "gpu"));
178+
b.Add("b",
179+
SpecCPU()
180+
.AddInput("a->b", "gpu")
181+
.AddOutput("b->out", "cpu"));
182+
b.Add("c",
183+
SpecMetaCPU()
184+
.AddInput("a->c", "gpu")
185+
.AddOutput("c->out", "cpu"));
186+
b.AddOutput("b->out_cpu");
187+
b.AddOutput("c->out_cpu");
188+
auto g = std::move(b).GetGraph(true);
189+
ExecGraph eg;
190+
eg.Lower(g);
191+
192+
StreamAssignment<policy> assignment(eg);
193+
auto map = MakeNodeMap(eg);
194+
EXPECT_EQ(assignment[map["a"]], 0);
195+
EXPECT_EQ(assignment[map["b"]], 0); // CPU operator with GPU input
196+
EXPECT_EQ(assignment[map["c"]], std::nullopt); // metadata-only
197+
}
198+
199+
TEST(Exec2Test, StreamAssignment_Single_GPU2CPU) {
200+
TestGPU2CPUAssignment<StreamPolicy::Single>();
201+
}
202+
125203

126204
TEST(Exec2Test, StreamAssignment_PerBackend_OnlyCPU) {
127205
graph::OpGraph::Builder b;
@@ -194,6 +272,13 @@ TEST(Exec2Test, StreamAssignment_PerBackend_CPUMixedGPU) {
194272
}
195273

196274

275+
TEST(Exec2Test, StreamAssignment_PerBackend_GPU2CPU) {
276+
TestGPU2CPUAssignment<StreamPolicy::PerBackend>();
277+
}
278+
279+
TEST(Exec2Test, StreamAssignment_OperOperator_GPU2CPU) {
280+
TestGPU2CPUAssignment<StreamPolicy::PerOperator>();
281+
}
197282

198283
TEST(Exec2Test, StreamAssignment_PerOperator_1) {
199284
ExecGraph eg;
@@ -272,7 +357,7 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) {
272357
SpecGPU()
273358
.AddOutput("i->j", "gpu"));
274359
b.Add("j",
275-
SpecCPU()
360+
SpecMetaCPU()
276361
.AddInput("i->j", "gpu")
277362
.AddOutput("j->h", "cpu"));
278363
b.Add("b",
@@ -320,15 +405,15 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) {
320405
StreamAssignment<StreamPolicy::PerOperator> assignment(eg);
321406
auto map = MakeNodeMap(eg);
322407
EXPECT_EQ(assignment[map["a"]], 0);
323-
EXPECT_EQ(assignment[map["b"]], std::nullopt);
408+
EXPECT_EQ(assignment[map["b"]], 0); // CPU operator with a GPU input needs a stream
324409
EXPECT_EQ(assignment[map["c"]], 0);
325410
EXPECT_EQ(assignment[map["d"]], 0);
326411
EXPECT_EQ(assignment[map["e"]], 1);
327412
EXPECT_EQ(assignment[map["f"]], 0);
328413
EXPECT_EQ(assignment[map["g"]], 0);
329414
EXPECT_EQ(assignment[map["h"]], 0);
330415
EXPECT_EQ(assignment[map["i"]], 2);
331-
EXPECT_EQ(assignment[map["j"]], std::nullopt);
416+
EXPECT_EQ(assignment[map["j"]], std::nullopt); // metadata only
332417
EXPECT_EQ(assignment[map["k"]], 3);
333418
}
334419

0 commit comments

Comments (0)