Skip to content

Commit cbbcb6d

Browse files
committed
Formats/CapnProtoRowInputStream: support Nested and Tuple, fix alignment issues
This updates contrib/capnproto to a newer version that fixes problems with unaligned access to message frames. It also adds support for parsing Struct types as Tuple (named or unnamed), and Nested array types. The `struct X { a @0 :UInt64; b @1 :Text }` in Cap'n Proto is equivalent to `x Tuple(a UInt64, b String)` in ClickHouse. Arrays of Struct types such as `y List(X)` are equivalent to `y Nested(a UInt64, b String)`.
1 parent b326b95 commit cbbcb6d

File tree

3 files changed

+59
-14
lines changed

3 files changed

+59
-14
lines changed

contrib/capnproto

Submodule capnproto updated 132 files

dbms/src/Formats/CapnProtoRowInputStream.cpp

+56-12
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,24 @@ Field convertNodeToField(capnp::DynamicValue::Reader value)
6868
auto listValue = value.as<capnp::DynamicList>();
6969
Array res(listValue.size());
7070
for (auto i : kj::indices(listValue))
71-
res[i] = convertNodeToField(listValue[i]);
71+
res[i] = convertNodeToField(listValue[i]);
72+
7273
return res;
7374
}
7475
case capnp::DynamicValue::ENUM:
7576
return UInt64(value.as<capnp::DynamicEnum>().getRaw());
7677
case capnp::DynamicValue::STRUCT:
77-
throw Exception("STRUCT type not supported, read individual fields instead");
78+
{
79+
auto structValue = value.as<capnp::DynamicStruct>();
80+
const auto & fields = structValue.getSchema().getFields();
81+
82+
Field field = Tuple(TupleBackend(fields.size()));
83+
TupleBackend & tuple = get<Tuple &>(field).toUnderType();
84+
for (auto i : kj::indices(fields))
85+
tuple[i] = convertNodeToField(structValue.get(fields[i]));
86+
87+
return field;
88+
}
7889
case capnp::DynamicValue::CAPABILITY:
7990
throw Exception("CAPABILITY type not supported");
8091
case capnp::DynamicValue::ANY_POINTER:
@@ -88,9 +99,8 @@ capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::
8899
KJ_IF_MAYBE(child, node.findFieldByName(field))
89100
return *child;
90101
else
91-
throw Exception("Field " + field + " doesn't exist in schema.");
102+
throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr());
92103
}
93-
94104
void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader)
95105
{
96106
String last;
@@ -110,13 +120,28 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields
110120
// Descend to a nested structure
111121
for (; level < field.tokens.size() - 1; ++level)
112122
{
113-
last = field.tokens[level];
114-
parent = getFieldOrThrow(reader, last);
115-
reader = parent.getType().asStruct();
116-
actions.push_back({Action::PUSH, parent});
123+
auto node = getFieldOrThrow(reader, field.tokens[level]);
124+
if (node.getType().isStruct()) {
125+
// Descend to field structure
126+
last = field.tokens[level];
127+
parent = node;
128+
reader = parent.getType().asStruct();
129+
actions.push_back({Action::PUSH, parent});
130+
} else if (node.getType().isList()) {
131+
break; // Collect list
132+
} else
133+
throw Exception("Field " + field.tokens[level] + "is neither Struct nor List");
117134
}
135+
118136
// Read field from the structure
119-
actions.push_back({Action::READ, getFieldOrThrow(reader, field.tokens[level]), field.pos});
137+
auto node = getFieldOrThrow(reader, field.tokens[level]);
138+
if (node.getType().isList() && actions.size() > 0 && actions.back().field == node) {
139+
// The field list here flattens Nested elements into multiple arrays
140+
// In order to map Nested types in Cap'nProto back, they need to be collected
141+
actions.back().columns.push_back(field.pos);
142+
} else {
143+
actions.push_back({Action::READ, node, {field.pos}});
144+
}
120145
}
121146
}
122147

@@ -176,7 +201,7 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns)
176201
array = heap_array.asPtr();
177202
}
178203

179-
capnp::FlatArrayMessageReader msg(array);
204+
capnp::UnalignedFlatArrayMessageReader msg(array);
180205
std::vector<capnp::DynamicStruct::Reader> stack;
181206
stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));
182207

@@ -186,9 +211,28 @@ bool CapnProtoRowInputStream::read(MutableColumns & columns)
186211
{
187212
case Action::READ:
188213
{
189-
auto & col = columns[action.column];
190214
Field value = convertNodeToField(stack.back().get(action.field));
191-
col->insert(value);
215+
if (action.columns.size() > 1) {
216+
// Nested columns must be flattened into several arrays
217+
// e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...)
218+
const Array & collected = DB::get<const Array &>(value);
219+
size_t size = collected.size();
220+
// The flattened array contains an array of a part of the nested tuple
221+
Array flattened(size);
222+
for (size_t column_index = 0; column_index < action.columns.size(); ++column_index) {
223+
// Populate array with a single tuple elements
224+
for (size_t off = 0; off < size; ++off) {
225+
const TupleBackend & tuple = DB::get<const Tuple &>(collected[off]).toUnderType();
226+
flattened[off] = tuple[column_index];
227+
}
228+
auto & col = columns[action.columns[column_index]];
229+
col->insert(flattened);
230+
}
231+
} else {
232+
auto & col = columns[action.columns[0]];
233+
col->insert(value);
234+
}
235+
192236
break;
193237
}
194238
case Action::POP:

dbms/src/Formats/CapnProtoRowInputStream.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ class CapnProtoRowInputStream : public IRowInputStream
4141
void createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader);
4242

4343
/* Action for state machine for traversing nested structures. */
44+
using BlockPositionList = std::vector<size_t>;
4445
struct Action
4546
{
4647
enum Type { POP, PUSH, READ };
4748
Type type;
4849
capnp::StructSchema::Field field = {};
49-
size_t column = 0;
50+
BlockPositionList columns = {};
5051
};
5152

5253
// Wrapper for classes that could throw in destructor

0 commit comments

Comments
 (0)