Skip to content

Added low_cardinality_allow_in_native_format setting. #3879

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 7 additions & 26 deletions dbms/programs/server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,19 +370,7 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
}

/// Send block to the client - table structure.
Block block = state.io.out->getHeader();

/// Support insert from old clients without low cardinality type.
if (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
{
for (auto & col : block)
{
col.type = recursiveRemoveLowCardinality(col.type);
col.column = recursiveRemoveLowCardinality(col.column);
}
}

sendData(block);
sendData(state.io.out->getHeader());

readData(global_settings);
state.io.out->writeSuffix();
Expand All @@ -399,16 +387,6 @@ void TCPHandler::processOrdinaryQuery()
{
Block header = state.io.in->getHeader();

/// Send data to old clients without low cardinality type.
if (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
{
for (auto & column : header)
{
column.column = recursiveRemoveLowCardinality(column.column);
column.type = recursiveRemoveLowCardinality(column.type);
}
}

if (header)
sendData(header);
}
Expand Down Expand Up @@ -782,7 +760,8 @@ void TCPHandler::initBlockInput()
state.block_in = std::make_shared<NativeBlockInputStream>(
*state.maybe_compressed_in,
header,
client_revision);
client_revision,
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
}
}

Expand All @@ -803,7 +782,8 @@ void TCPHandler::initBlockOutput(const Block & block)
state.block_out = std::make_shared<NativeBlockOutputStream>(
*state.maybe_compressed_out,
client_revision,
block.cloneEmpty());
block.cloneEmpty(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably also change stream headers based on the setting value.

!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
}
}

Expand All @@ -815,7 +795,8 @@ void TCPHandler::initLogsBlockOutput(const Block & block)
state.logs_block_out = std::make_shared<NativeBlockOutputStream>(
*out,
client_revision,
block.cloneEmpty());
block.cloneEmpty(),
!connection_context.getSettingsRef().low_cardinality_allow_in_native_format);
}
}

Expand Down
7 changes: 4 additions & 3 deletions dbms/src/DataStreams/NativeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
{
}

NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool convert_types_to_low_cardinality_)
: istr(istr_), header(header_), server_revision(server_revision_), convert_types_to_low_cardinality(convert_types_to_low_cardinality_)
{
}

Expand Down Expand Up @@ -154,7 +154,8 @@ Block NativeBlockInputStream::readImpl()
column.column = std::move(read_column);

/// Support insert from old clients without low cardinality type.
if (header && server_revision && server_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
bool revision_without_low_cardinality = server_revision && server_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE;
if (header && (convert_types_to_low_cardinality || revision_without_low_cardinality))
{
column.column = recursiveLowCardinalityConversion(column.column, column.type, header.getByPosition(i).type);
column.type = header.getByPosition(i).type;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/NativeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class NativeBlockInputStream : public IProfilingBlockInputStream

/// For cases when data structure (header) is known in advance.
/// NOTE We may use header for data validation and/or type conversions. It is not implemented.
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_, bool convert_types_to_low_cardinality_ = false);

/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
Expand All @@ -91,6 +91,8 @@ class NativeBlockInputStream : public IProfilingBlockInputStream
IndexForNativeFormat::Blocks::const_iterator index_block_end;
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;

bool convert_types_to_low_cardinality = false;

/// If an index is specified, then `istr` must be CompressedReadBufferFromFile. Unused otherwise.
CompressedReadBufferFromFile * istr_concrete = nullptr;

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/NativeBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ namespace ErrorCodes


NativeBlockOutputStream::NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_,
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_,
WriteBuffer * index_ostr_, size_t initial_size_of_file_)
: ostr(ostr_), client_revision(client_revision_), header(header_),
index_ostr(index_ostr_), initial_size_of_file(initial_size_of_file_)
index_ostr(index_ostr_), initial_size_of_file(initial_size_of_file_), remove_low_cardinality(remove_low_cardinality_)
{
if (index_ostr)
{
Expand Down Expand Up @@ -104,7 +104,7 @@ void NativeBlockOutputStream::write(const Block & block)
ColumnWithTypeAndName column = block.safeGetByPosition(i);

/// Send data to old clients without low cardinality type.
if (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE)
if (remove_low_cardinality || (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE))
{
column.column = recursiveRemoveLowCardinality(column.column);
column.type = recursiveRemoveLowCardinality(column.type);
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/NativeBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class NativeBlockOutputStream : public IBlockOutputStream
/** If non-zero client_revision is specified, additional block information can be written.
*/
NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_,
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_ = false,
WriteBuffer * index_ostr_ = nullptr, size_t initial_size_of_file_ = 0);

Block getHeader() const override { return header; }
Expand All @@ -42,6 +42,8 @@ class NativeBlockOutputStream : public IBlockOutputStream
size_t initial_size_of_file; /// The initial size of the data file, if `append` done. Used for the index.
/// If you need to write index, then `ostr` must be a CompressedWriteBuffer.
CompressedWriteBuffer * ostr_concrete = nullptr;

bool remove_low_cardinality;
};

}
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ struct Settings
M(SettingBool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.") \
M(SettingBool, enable_debug_queries, false, "Enables debug queries such as AST.") \
M(SettingBool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.") \

M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.") \

#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageStripeLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class StripeLogBlockOutputStream final : public IBlockOutputStream
data_out(data_out_compressed, CompressionSettings(CompressionMethod::LZ4), storage.max_compress_block_size),
index_out_compressed(storage.full_path() + "index.mrk", INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
index_out(index_out_compressed),
block_out(data_out, 0, storage.getSampleBlock(), &index_out, Poco::File(storage.full_path() + "data.bin").getSize())
block_out(data_out, 0, storage.getSampleBlock(), false, &index_out, Poco::File(storage.full_path() + "data.bin").getSize())
{
}

Expand Down