Skip to content

Commit b10d71e

Browse files
authored
Merge pull request #110 from Enmk/memory_optimization_on_send_and_receive_lz4_blocks
Reduced memory overhead of preparing LZ4-compressed data for server.
2 parents 5cfda7f + 83c91db commit b10d71e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+562
-494
lines changed

clickhouse/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
SET ( clickhouse-cpp-lib-src
2-
base/coded.cpp
32
base/compressed.cpp
43
base/input.cpp
54
base/output.cpp
65
base/platform.cpp
76
base/socket.cpp
7+
base/wire_format.cpp
88

99
columns/array.cpp
1010
columns/date.cpp

clickhouse/base/coded.cpp

Lines changed: 0 additions & 100 deletions
This file was deleted.

clickhouse/base/coded.h

Lines changed: 0 additions & 65 deletions
This file was deleted.

clickhouse/base/compressed.cpp

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
#include "compressed.h"
22
#include "wire_format.h"
3+
#include "output.h"
34

45
#include <cityhash/city.h>
56
#include <lz4/lz4.h>
67
#include <stdexcept>
78
#include <system_error>
89

9-
#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB
10+
// File-local constants used by the compressed block reader/writer below.
namespace {
11+
// Size in bytes of the per-block header that precedes the LZ4 payload:
// 1 byte compression method + 4 bytes compressed size + 4 bytes original size.
constexpr size_t HEADER_SIZE = 9;
12+
// see DB::CompressionMethodByte::LZ4 from src/Compression/CompressionInfo.h of ClickHouse project
13+
constexpr uint8_t COMPRESSION_METHOD = 0x82;
14+
// Documentation says that compression is faster when output buffer is larger than LZ4_compressBound estimation.
15+
constexpr size_t EXTRA_COMPRESS_BUFFER_SIZE = 4096;
16+
// NOTE(review): presumably the upper bound on an accepted compressed block size;
// its use is not visible in this excerpt — confirm against Decompress().
constexpr size_t DBMS_MAX_COMPRESSED_SIZE = 0x40000000ULL; // 1GB
17+
}
1018

1119
namespace clickhouse {
1220

13-
CompressedInput::CompressedInput(CodedInputStream* input)
21+
// Stores the source stream pointer in input_; Decompress() reads the
// compressed blocks from it.
CompressedInput::CompressedInput(InputStream* input)
1422
: input_(input)
1523
{
1624
}
@@ -22,7 +30,7 @@ CompressedInput::~CompressedInput() {
2230
#else
2331
if (!std::uncaught_exceptions()) {
2432
#endif
25-
throw std::runtime_error("some data was not readed");
33+
throw std::runtime_error("some data was not read");
2634
}
2735
}
2836
}
@@ -50,9 +58,8 @@ bool CompressedInput::Decompress() {
5058
return false;
5159
}
5260

53-
if (method != 0x82) {
54-
throw std::runtime_error("unsupported compression method " +
55-
std::to_string(int(method)));
61+
if (method != COMPRESSION_METHOD) {
62+
throw std::runtime_error("unsupported compression method " + std::to_string(int(method)));
5663
} else {
5764
if (!WireFormat::ReadFixed(input_, &compressed)) {
5865
return false;
@@ -75,7 +82,7 @@ bool CompressedInput::Decompress() {
7582
out.Write(&original, sizeof(original));
7683
}
7784

78-
if (!WireFormat::ReadBytes(input_, tmp.data() + 9, compressed - 9)) {
85+
if (!WireFormat::ReadBytes(input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
7986
return false;
8087
} else {
8188
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
@@ -85,7 +92,7 @@ bool CompressedInput::Decompress() {
8592

8693
data_ = Buffer(original);
8794

88-
if (LZ4_decompress_safe((const char*)tmp.data() + 9, (char*)data_.data(), compressed - 9, original) < 0) {
95+
if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), compressed - HEADER_SIZE, original) < 0) {
8996
throw std::runtime_error("can't decompress data");
9097
} else {
9198
mem_.Reset(data_.data(), original);
@@ -95,4 +102,73 @@ bool CompressedInput::Decompress() {
95102
return true;
96103
}
97104

105+
106+
CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size)
107+
: destination_(destination)
108+
, max_compressed_chunk_size_(max_compressed_chunk_size)
109+
{
110+
PreallocateCompressBuffer(max_compressed_chunk_size);
111+
}
112+
113+
// Flushes the stream on destruction by calling Flush().
// NOTE(review): if Flush() throws, the exception escapes a destructor —
// confirm that this is intended.
CompressedOutput::~CompressedOutput() {
114+
Flush();
115+
}
116+
117+
size_t CompressedOutput::DoWrite(const void* data, size_t len) {
118+
const size_t original_len = len;
119+
// what if len > max_compressed_chunk_size_ ?
120+
const size_t max_chunk_size = max_compressed_chunk_size_ > 0 ? max_compressed_chunk_size_ : len;
121+
if (max_chunk_size > max_compressed_chunk_size_) {
122+
PreallocateCompressBuffer(len);
123+
}
124+
125+
while (len > 0) {
126+
auto to_compress = std::min(len, max_chunk_size);
127+
Compress(data, to_compress);
128+
129+
len -= to_compress;
130+
data = reinterpret_cast<const char*>(data) + to_compress;
131+
}
132+
133+
return original_len - len;
134+
}
135+
136+
// Forwards the flush request to the wrapped destination stream.
// Nothing is buffered at this level: Compress() writes every chunk to
// destination_ as soon as it has been compressed.
void CompressedOutput::DoFlush() {
137+
destination_->Flush();
138+
}
139+
140+
void CompressedOutput::Compress(const void * data, size_t len) {
141+
const auto compressed_size = LZ4_compress_default(
142+
(const char*)data,
143+
(char*)compressed_buffer_.data() + HEADER_SIZE,
144+
len,
145+
compressed_buffer_.size() - HEADER_SIZE);
146+
if (compressed_size <= 0)
147+
throw std::runtime_error("Failed to compress chunk of " + std::to_string(len) + " bytes, "
148+
"LZ4 error: " + std::to_string(compressed_size));
149+
150+
{
151+
auto header = compressed_buffer_.data();
152+
WriteUnaligned(header, COMPRESSION_METHOD);
153+
// Compressed data size with header
154+
WriteUnaligned(header + 1, static_cast<uint32_t>(compressed_size + HEADER_SIZE));
155+
// Original data size
156+
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
157+
}
158+
159+
WireFormat::WriteFixed(destination_, CityHash128(
160+
(const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
161+
WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
162+
163+
destination_->Flush();
164+
}
165+
166+
void CompressedOutput::PreallocateCompressBuffer(size_t input_size) {
167+
const auto estimated_compressed_buffer_size = LZ4_compressBound(input_size);
168+
if (estimated_compressed_buffer_size <= 0)
169+
throw std::runtime_error("Failed to estimate compressed buffer size, LZ4 error: " + std::to_string(estimated_compressed_buffer_size));
170+
171+
compressed_buffer_.resize(estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE);
172+
}
173+
98174
}

clickhouse/base/compressed.h

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
#pragma once
22

3-
#include "coded.h"
3+
#include "input.h"
4+
#include "output.h"
5+
#include "buffer.h"
46

57
namespace clickhouse {
68

79
class CompressedInput : public ZeroCopyInput {
810
public:
9-
CompressedInput(CodedInputStream* input);
11+
CompressedInput(InputStream* input);
1012
~CompressedInput();
1113

1214
protected:
@@ -15,10 +17,29 @@ class CompressedInput : public ZeroCopyInput {
1517
bool Decompress();
1618

1719
private:
18-
CodedInputStream* const input_;
20+
InputStream* const input_;
1921

2022
Buffer data_;
2123
ArrayInput mem_;
2224
};
2325

26+
class CompressedOutput : public OutputStream {
27+
public:
28+
CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size = 0);
29+
~CompressedOutput();
30+
31+
protected:
32+
size_t DoWrite(const void* data, size_t len) override;
33+
void DoFlush() override;
34+
35+
private:
36+
void Compress(const void * data, size_t len);
37+
void PreallocateCompressBuffer(size_t input_size);
38+
39+
private:
40+
OutputStream * destination_;
41+
const size_t max_compressed_chunk_size_;
42+
Buffer compressed_buffer_;
43+
};
44+
2445
}

clickhouse/base/input.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,21 @@
55

66
namespace clickhouse {
77

8+
bool ZeroCopyInput::Skip(size_t bytes) {
9+
while (bytes > 0) {
10+
const void* ptr;
11+
size_t len = Next(&ptr, bytes);
12+
13+
if (len == 0) {
14+
return false;
15+
}
16+
17+
bytes -= len;
18+
}
19+
20+
return true;
21+
}
22+
823
size_t ZeroCopyInput::DoRead(void* buf, size_t len) {
924
const void* ptr;
1025
size_t result = DoNext(&ptr, len);

0 commit comments

Comments
 (0)