Add metadata-only inputs. #5635

Merged: 5 commits, Sep 18, 2024
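
In summary: operator schemas can now declare an input as InputDevice::Metadata, meaning the operator inspects only the input's shape, type and layout and never reads the actual data. The dynamic executor lowers this into a new ExecEdge::metadata flag, skips event synchronization for such edges, and no longer forces a CUDA stream onto CPU operators whose GPU inputs are all metadata-only. Below is a minimal sketch of what a metadata-only consumer could look like; the operator itself is hypothetical and not part of this PR, but the schema and workspace calls mirror the patterns visible in the diffs:

#include "dali/pipeline/operator/operator.h"  // assumed DALI build context

namespace dali {

// Hypothetical operator: consumes a GPU input as metadata only.
// SetupImpl reads the shape and type, never the data itself, so the
// executor may skip stream synchronization on this edge entirely.
class ShapeInfoExample : public Operator<CPUBackend> {
 public:
  using Operator<CPUBackend>::Operator;

  bool SetupImpl(std::vector<OutputDesc> &outs, const Workspace &ws) override {
    outs.resize(1);
    outs[0].shape = ws.Input<GPUBackend>(0).shape();  // metadata access only
    outs[0].type = DALI_INT64;
    return true;
  }

  void RunImpl(Workspace &ws) override {
    // fill the output on the host; the GPU payload is never dereferenced
  }
};

DALI_SCHEMA(ShapeInfoExample)
  .DocStr("Illustrative sketch of a metadata-only consumer.")
  .NumInput(1)
  .InputDevice(0, InputDevice::Metadata)  // declares: the data is never read
  .NumOutput(1);

}  // namespace dali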
2 changes: 1 addition & 1 deletion dali/operators/generic/cast.cc
@@ -76,7 +76,7 @@ DALI_SCHEMA(Cast)
DALI_SCHEMA(CastLike)
.DocStr("Cast the first tensor to the type of the second tensor.")
.NumInput(2)
- .InputDevice(1, InputDevice::Any)
+ .InputDevice(1, InputDevice::Metadata)
.NumOutput(1)
.AllowSequences()
.SupportVolumetric();
6 changes: 3 additions & 3 deletions dali/operators/generic/constant_value.cc
@@ -84,7 +84,7 @@ DALI_SCHEMA(FullLike)
.DocStr(R"code(Returns new data with the same shape and type as the input data, filled with a `fill_value`.)code")
.NumInput(2)
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.InputDox(1, "fill_value", "TensorList", R"code(The fill value.)code")
.NumOutput(1);
DALI_REGISTER_OPERATOR(FullLike, FullLike<CPUBackend>, CPU);
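
Note that only `data_like` (input 0) of FullLike is marked metadata-only; `fill_value` (input 1) remains a regular input, presumably because its contents are actually copied into the output and therefore must be synchronized like any other data input.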
@@ -102,7 +102,7 @@ DALI_SCHEMA(ZerosLike)
.DocStr(R"code(Returns new data with the same shape and type as the input array, filled with zeros.)code")
.NumInput(1)
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalTypeArg("dtype", R"code(Overrides the output data type.)code", DALI_INT32);
DALI_REGISTER_OPERATOR(ZerosLike, ZerosLike<CPUBackend>, CPU);
@@ -120,7 +120,7 @@ DALI_SCHEMA(OnesLike)
.DocStr(R"code(Returns new data with the same shape and type as the input array, filled with ones.)code")
.NumInput(1)
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalTypeArg("dtype", R"code(Overrides the output data type.)code", DALI_INT32);
DALI_REGISTER_OPERATOR(OnesLike, OnesLike<CPUBackend>, CPU);
2 changes: 1 addition & 1 deletion dali/operators/generic/shapes.cc
@@ -19,7 +19,7 @@ namespace dali {
DALI_SCHEMA(Shapes)
.DocStr(R"code(Returns the shapes of inputs.)code")
.NumInput(1)
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AllowSequences()
.SupportVolumetric()
1 change: 1 addition & 0 deletions dali/operators/random/beta_distribution_cpu.cc
@@ -38,6 +38,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg("alpha", R"code(The alpha parameter, a positive ``float32`` scalar.)code", 1.0f,
true)
1 change: 1 addition & 0 deletions dali/operators/random/choice_cpu.cc
@@ -42,6 +42,7 @@ that is: :meth:`nvidia.dali.types.DALIDataType`, :meth:`nvidia.dali.types.DALIIm
"Otherwise ``__a`` is treated as 1D array of input samples.")
.InputDox(1, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(1, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg<std::vector<float>>("p",
"Distribution of the probabilities. "
2 changes: 2 additions & 0 deletions dali/operators/random/coin_flip_cpu.cc
@@ -30,6 +30,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg<float>("probability",
R"code(Probability of value 1.)code",
@@ -51,6 +52,7 @@ sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddParent("random__CoinFlip")
.Deprecate("random__CoinFlip"); // Deprecated in 0.30
4 changes: 2 additions & 2 deletions dali/operators/random/normal_distribution_cpu.cc
@@ -28,7 +28,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg<float>("mean",
R"code(Mean of the distribution.)code",
@@ -51,7 +51,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddParent("random__Normal")
.Deprecate("random__Normal"); // Deprecated in 0.30
2 changes: 2 additions & 0 deletions dali/operators/random/uniform_distribution_cpu.cc
@@ -32,6 +32,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg("range",
R"code(Range ``[min, max)`` of a continuous uniform distribution.
@@ -67,6 +68,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg("range",
R"code(Range ``[min, max)`` of a continuous uniform distribution.
5 changes: 4 additions & 1 deletion dali/pipeline/executor/executor2/exec_graph.h
@@ -90,13 +90,16 @@ struct ExecEdge {
/** The index of the input in OpSpec. It matches the edge's index in consumer->inputs. */
int consumer_input_idx = 0;
StorageDevice device = {};
+ /** The input passes only the metadata, skipping stream synchronization. */
+ bool metadata = false;

constexpr bool operator==(const ExecEdge &other) const {
return producer == other.producer &&
consumer == other.consumer &&
producer_output_idx == other.producer_output_idx &&
consumer_input_idx == other.consumer_input_idx &&
-        device == other.device;
+        device == other.device &&
+        metadata == other.metadata;
}

constexpr bool operator!=(const ExecEdge &other) const {
11 changes: 10 additions & 1 deletion dali/pipeline/executor/executor2/exec_graph_lowering.cc
@@ -50,7 +50,16 @@ void ExecGraph::Lower(const graph::OpGraph &def) {
for (auto &consumer : out->consumers) {
auto *exec_con = def2exec[consumer.op];
assert(exec_con != nullptr);
- Link(&exec_node, o, exec_con, consumer.idx)->device = dev;
+ auto *edge = Link(&exec_node, o, exec_con, consumer.idx);
+ edge->device = dev;
+ if (consumer.op) {
+   auto &consumer_spec = consumer.op->spec;
+   auto &schema = consumer_spec.GetSchemaOrDefault();
+   if (edge->consumer_input_idx < schema.MaxNumInput()) {  // only regular inputs
+     if (schema.GetInputDevice(edge->consumer_input_idx) == InputDevice::Metadata)
+       edge->metadata = true;
+   }
+ }
}
exec_node.outputs[o].device = dev;
}
23 changes: 11 additions & 12 deletions dali/pipeline/executor/executor2/exec_node_task.cc
@@ -278,16 +278,13 @@ void OpTask::SetWorkspaceInputs() {
auto process_input = [&](int i, auto backend) {
using Backend = decltype(backend);
const auto &inp = TaskInput<Backend>(ti);
- // If the output order of the operator is `host` then we don't wait for GPU
- // inputs - they can't be accessed directly on host and the operator will
- // have to issue some sort of synchronization if and when necessary.
- // This optimization is essential to avoid oversynchronization
- // when the operator needs to access the metadata only (e.g. getting the shape).
[Review comment by the PR author on lines -281 to -285]
Here we get rid of the dangerous special treatment of GPU inputs to host-ordered operators. The new metadata flag takes care of the optimization now.
- if ((order.is_device() || std::is_same_v<Backend, CPUBackend>) /*see comment above */ &&
-     inp.event && inp.order != order)
+ bool is_meta = node_->inputs[i]->metadata;
+ // metadata-only inputs don't need to be synchronized
+ if (!is_meta && inp.event && inp.order != order)
events.insert(inp.event);

- if (inp.order == order) {  // use the input directly
+ // metadata-only inputs don't need a proper stream
+ if (inp.order == order || is_meta) {  // use the input directly
ws_->SetInput(i, inp.data);
} else { // create another TL and set its order (and layout, while we're at it)
auto tl = std::make_shared<TensorList<Backend>>();
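
The author's comment above summarizes the behavioral change; condensed into a single predicate, it reads roughly as follows (an illustrative sketch with stand-in parameters, not the actual DALI API):

// Illustrative condensation of the two synchronization rules in this hunk.
// Old rule: GPU inputs to a host-ordered operator were never awaited, even
// when the operator dereferenced the data.
bool NeedsWaitOld(bool order_is_device, bool input_is_cpu,
                  bool has_event, bool same_order) {
  return (order_is_device || input_is_cpu) && has_event && !same_order;
}

// New rule: the wait is skipped only for inputs the schema declares as
// InputDevice::Metadata; every real data access is synchronized.
bool NeedsWaitNew(bool is_metadata_input, bool has_event, bool same_order) {
  return !is_metadata_input && has_event && !same_order;
}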
@@ -477,7 +474,7 @@ tasking::SharedTask ExecNodeTask::CreateTask(ExecNode *node, const WorkspacePara
}
}

- void ClearWorkspacePayload(Workspace &ws) {
+ void ClearWorkspacePayload(Workspace &ws, ExecNode &node) {
auto event = ws.has_event() ? ws.event() : nullptr;
for (int i = 0; i < ws.NumInput(); i++) {
// TODO(michalz): Some smarter deletion management
@@ -492,14 +489,16 @@ void ClearWorkspacePayload(Workspace &ws) {
if (ws.InputIsType<CPUBackend>(i)) {
if (auto &pinp = ws.InputPtr<CPUBackend>(i)) {
auto &inp = *pinp;
- if (inp.is_pinned() && event && inp.order() != ws.output_order())
+ if (event &&
+     !node.inputs[i]->metadata &&
+     inp.is_pinned() && inp.order() != ws.output_order())
inp.order().wait(event);
ws.SetInput<CPUBackend>(i, nullptr);
}
} else if (ws.InputIsType<GPUBackend>(i)) {
if (auto &pinp = ws.InputPtr<GPUBackend>(i)) {
auto &inp = *pinp;
- if (event && inp.order() != ws.output_order())
+ if (event && !node.inputs[i]->metadata && inp.order() != ws.output_order())
inp.order().wait(event);
ws.SetInput<GPUBackend>(i, nullptr);
}
@@ -525,7 +524,7 @@ void ClearWorkspacePayload(Workspace &ws) {

void ExecNodeTask::ClearWorkspace() {
assert(ws_);
- ClearWorkspacePayload(*ws_);
+ ClearWorkspacePayload(*ws_, *node_);
}

} // namespace exec2
21 changes: 15 additions & 6 deletions dali/pipeline/executor/executor2/stream_assignment.h
@@ -35,15 +35,15 @@ template <StreamPolicy policy>
class StreamAssignment;

inline bool NeedsStream(const ExecNode *node) {
- if (node->is_pipeline_output) {
-   for (auto &pipe_out : node->inputs) {
-     if (pipe_out->device == StorageDevice::GPU)
+ if (node->is_pipeline_output || node->backend == OpType::CPU) {
+   for (auto &input : node->inputs) {
+     if (input->device == StorageDevice::GPU && !input->metadata)
return true;
}
- return false;
} else {
- return node->backend != OpType::CPU;
+ return true;
}
+ return false;
}
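
Applied to the GPU-to-CPU test graph added at the bottom of this PR, this predicate gives a stream to a CPU consumer of real GPU data and none to a metadata-only consumer. A self-contained sketch of that logic (simplified stand-in types; the real code walks ExecNode and ExecEdge as above):

#include <vector>

// Simplified stand-ins for ExecNode/ExecEdge, for illustration only.
struct EdgeSketch { bool gpu_input; bool metadata; };

bool NeedsStreamSketch(bool is_cpu_op, const std::vector<EdgeSketch> &inputs) {
  if (!is_cpu_op)
    return true;  // GPU and mixed operators always need a stream
  for (const auto &e : inputs)
    if (e.gpu_input && !e.metadata)
      return true;  // CPU operator that really consumes GPU data
  return false;
}

// Mirrors the new test below:
//   NeedsStreamSketch(true, {{true, false}}) -> true   (node "b")
//   NeedsStreamSketch(true, {{true, true}})  -> false  (node "c")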

inline OpType NodeType(const ExecNode *node) {
@@ -117,6 +117,12 @@ class StreamAssignment<StreamPolicy::PerBackend> {
if (has_gpu_)
return; // we already have both, nothing more can happen
break;
+ case OpType::CPU:
+   if (NeedsStream(&node)) {  // treat CPU nodes with GPU inputs as GPU
+     has_gpu_ = true;
+     if (has_mixed_)
+       return;
+   }
default:
break;
}
@@ -128,11 +134,14 @@
* If the node is a Mixed node, it gets stream index 0.
* If the node is a GPU node it gets stream index 1 if there are any mixed nodes, otherwise
* the only stream is the GPU stream and the returned index is 0.
+ * CPU nodes get GPU stream if they need one (i.e. they have a GPU input)
*/
std::optional<int> operator[](const ExecNode *node) const {
switch (NodeType(node)) {
case OpType::CPU:
- return std::nullopt;
+ if (!NeedsStream(node))
+   return std::nullopt;
+ // fall-through to GPU
case OpType::GPU:
return has_mixed_ ? 1 : 0;
case OpType::MIXED:
91 changes: 88 additions & 3 deletions dali/pipeline/executor/executor2/stream_assignment_test.cc
@@ -48,6 +48,31 @@ DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<CPUBacke
DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<MixedBackend>, Mixed);
DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<GPUBackend>, GPU);


+ template <typename Backend>
+ class StreamAssignmentMetaOp : public Operator<Backend> {
+  public:
+   using Operator<Backend>::Operator;
+   USE_OPERATOR_MEMBERS();
+
+   void RunImpl(Workspace &ws) override {}
+   bool SetupImpl(std::vector<OutputDesc> &output_desc, const Workspace &ws) override {
+     return false;
+   }
+ };
+
+ DALI_SCHEMA(StreamAssignmentMetaOp)
+   .NumInput(0, 999)
+   .InputDevice(0, 999, InputDevice::Metadata)
+   .NumOutput(0)
+   .AdditionalOutputsFn([](const OpSpec &spec) {
+     return spec.NumOutput();
+   });
+
+ DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<CPUBackend>, CPU);
+ DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<MixedBackend>, Mixed);
+ DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<GPUBackend>, GPU);

namespace exec2 {

namespace {
@@ -71,6 +96,27 @@ OpSpec SpecMixed() {
return SpecDev("mixed");
}


+ OpSpec SpecMetaDev(const std::string &device) {
+   return OpSpec("StreamAssignmentMetaOp")
+     .AddArg("device", device)
+     .AddArg("num_threads", 1)
+     .AddArg("max_batch_size", 1);
+ }
+
+ OpSpec SpecMetaGPU() {
+   return SpecMetaDev("gpu");
+ }
+
+ OpSpec SpecMetaCPU() {
+   return SpecMetaDev("cpu");
+ }
+
+ OpSpec SpecMetaMixed() {
+   return SpecMetaDev("mixed");
+ }


auto MakeNodeMap(const ExecGraph &graph) {
std::map<std::string_view, const ExecNode *, std::less<>> map;
for (auto &n : graph.Nodes())
@@ -122,6 +168,38 @@ TEST(Exec2Test, StreamAssignment_Single_CPUMixedGPU) {
EXPECT_EQ(assignment[map["c"]], 0);
}

+ template <StreamPolicy policy>
+ void TestGPU2CPUAssignment() {
+   graph::OpGraph::Builder b;
+   b.Add("a",
+         SpecGPU()
+           .AddOutput("a->b", "gpu")
+           .AddOutput("a->c", "gpu"));
+   b.Add("b",
+         SpecCPU()
+           .AddInput("a->b", "gpu")
+           .AddOutput("b->out", "cpu"));
+   b.Add("c",
+         SpecMetaCPU()
+           .AddInput("a->c", "gpu")
+           .AddOutput("c->out", "cpu"));
+   b.AddOutput("b->out_cpu");
+   b.AddOutput("c->out_cpu");
+   auto g = std::move(b).GetGraph(true);
+   ExecGraph eg;
+   eg.Lower(g);
+
+   StreamAssignment<policy> assignment(eg);
+   auto map = MakeNodeMap(eg);
+   EXPECT_EQ(assignment[map["a"]], 0);
+   EXPECT_EQ(assignment[map["b"]], 0);             // CPU operator with GPU input
+   EXPECT_EQ(assignment[map["c"]], std::nullopt);  // metadata-only
+ }
+
+ TEST(Exec2Test, StreamAssignment_Single_GPU2CPU) {
+   TestGPU2CPUAssignment<StreamPolicy::Single>();
+ }


TEST(Exec2Test, StreamAssignment_PerBackend_OnlyCPU) {
graph::OpGraph::Builder b;
@@ -194,6 +272,13 @@ TEST(Exec2Test, StreamAssignment_PerBackend_CPUMixedGPU) {
}


+ TEST(Exec2Test, StreamAssignment_PerBackend_GPU2CPU) {
+   TestGPU2CPUAssignment<StreamPolicy::PerBackend>();
+ }
+
+ TEST(Exec2Test, StreamAssignment_OperOperator_GPU2CPU) {
+   TestGPU2CPUAssignment<StreamPolicy::PerOperator>();
+ }

TEST(Exec2Test, StreamAssignment_PerOperator_1) {
ExecGraph eg;
@@ -272,7 +357,7 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) {
SpecGPU()
.AddOutput("i->j", "gpu"));
b.Add("j",
-         SpecCPU()
+         SpecMetaCPU()
.AddInput("i->j", "gpu")
.AddOutput("j->h", "cpu"));
b.Add("b",
@@ -320,15 +405,15 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) {
StreamAssignment<StreamPolicy::PerOperator> assignment(eg);
auto map = MakeNodeMap(eg);
EXPECT_EQ(assignment[map["a"]], 0);
EXPECT_EQ(assignment[map["b"]], std::nullopt);
EXPECT_EQ(assignment[map["b"]], 0); // CPU operator with a GPU input needs a stream
EXPECT_EQ(assignment[map["c"]], 0);
EXPECT_EQ(assignment[map["d"]], 0);
EXPECT_EQ(assignment[map["e"]], 1);
EXPECT_EQ(assignment[map["f"]], 0);
EXPECT_EQ(assignment[map["g"]], 0);
EXPECT_EQ(assignment[map["h"]], 0);
EXPECT_EQ(assignment[map["i"]], 2);
EXPECT_EQ(assignment[map["j"]], std::nullopt);
EXPECT_EQ(assignment[map["j"]], std::nullopt); // metadata only
EXPECT_EQ(assignment[map["k"]], 3);
}
