Skip to content

Don't fetch parts not changed by mutations #3103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions dbms/src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1866,14 +1866,18 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
return nullptr;
}

MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);

DataPartsLock data_parts_lock(data_parts_mutex);
return getActiveContainingPart(part_info, DataPartState::Committed, data_parts_lock);
}

MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
return getActiveContainingPart(part_info);
}


MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id)
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ class MergeTreeData : public ITableDeclaration

/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock &lock);

/// Returns all parts in specified partition
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/MergeTree/MergeTreeDataPartChecksum.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ struct MinimalisticDataPartChecksums
uint128 hash_of_uncompressed_files {};
uint128 uncompressed_hash_of_compressed_files {};

bool operator==(const MinimalisticDataPartChecksums & other) const
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI. You can also provide tuple method, that will use std::tie or std::forward_as_tuple.

{
return num_compressed_files == other.num_compressed_files
&& num_uncompressed_files == other.num_uncompressed_files
&& hash_of_all_files == other.hash_of_all_files
&& hash_of_uncompressed_files == other.hash_of_uncompressed_files
&& uncompressed_hash_of_compressed_files == other.uncompressed_hash_of_compressed_files;
}

/// Is set only for old formats
std::unique_ptr<MergeTreeDataPartChecksums> full_checksums;

Expand Down
73 changes: 63 additions & 10 deletions dbms/src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2714,7 +2714,9 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)

bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum)
{
if (auto part = data.getPartIfExists(part_name, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
const auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);

if (auto part = data.getPartIfExists(part_info, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
{
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
Expand Down Expand Up @@ -2755,17 +2757,64 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
part_name, part, replaced_parts, nullptr);
};

ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
String interserver_scheme = context.getInterserverScheme();
MergeTreeData::DataPartPtr part_to_clone;
{
/// If the desired part is a result of a part mutation, try to find the source part and compare
/// its checksums to the checksums of the desired part. If they match, we can just clone the local part.

/// If we have the source part, its part_info will contain covered_part_info.
auto covered_part_info = part_info;
covered_part_info.mutation = 0;
auto source_part = data.getActiveContainingPart(covered_part_info);

try
if (source_part)
{
MinimalisticDataPartChecksums source_part_checksums;
source_part_checksums.computeTotalChecksums(source_part->checksums);

String desired_checksums_str = getZooKeeper()->get(replica_path + "/parts/" + part_name + "/checksums");
auto desired_checksums = MinimalisticDataPartChecksums::deserializeFrom(desired_checksums_str);
if (source_part_checksums == desired_checksums)
{
LOG_TRACE(log, "Found local part " << source_part->name << " with the same checksums as " << part_name);
part_to_clone = source_part;
}
}

}

std::function<MergeTreeData::MutableDataPartPtr()> get_part;
if (part_to_clone)
{
get_part = [&, part_to_clone]()
{
return data.cloneAndLoadDataPart(part_to_clone, "tmp_clone_", part_info);
};
}
else
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
String interserver_scheme = context.getInterserverScheme();

part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, user, password, interserver_scheme, to_detached);
get_part = [&, address, timeouts, user, password, interserver_scheme]()
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different: '" + interserver_scheme
+ "' != '" + address.scheme + "', can't fetch part from " + address.host,
ErrorCodes::LOGICAL_ERROR);

return fetcher.fetchPart(
part_name, replica_path,
address.host, address.replication_port,
timeouts, user, password, interserver_scheme, to_detached);
};
}

try
{
part = get_part();

if (!to_detached)
{
Expand Down Expand Up @@ -2810,7 +2859,11 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin

ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);

LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_path << (to_detached ? " (to 'detached' directory)" : ""));
if (part_to_clone)
LOG_DEBUG(log, "Cloned part " << part_name << " from " << part_to_clone->name << (to_detached ? " (to 'detached' directory)" : ""));
else
LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_path << (to_detached ? " (to 'detached' directory)" : ""));

return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
*** Check data after fetch of merged part ***
all_0_2_1 1
all_0_2_1 2
all_0_2_1 3
*** Check data after fetch/clone of mutated part ***
all_0_2_1_3 1
all_0_2_1_3 2
all_0_2_1_3 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
DROP TABLE IF EXISTS test.fetches_r1;
DROP TABLE IF EXISTS test.fetches_r2;

CREATE TABLE test.fetches_r1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/fetches', 'r1') ORDER BY x;
CREATE TABLE test.fetches_r2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/fetches', 'r2') ORDER BY x
SETTINGS prefer_fetch_merged_part_time_threshold=0,
prefer_fetch_merged_part_size_threshold=0;

INSERT INTO test.fetches_r1 VALUES (1);
INSERT INTO test.fetches_r1 VALUES (2);
INSERT INTO test.fetches_r1 VALUES (3);

SYSTEM SYNC REPLICA test.fetches_r2;

DETACH TABLE test.fetches_r2;

SET replication_alter_partitions_sync=0;
OPTIMIZE TABLE test.fetches_r1 PARTITION tuple() FINAL;
SYSTEM SYNC REPLICA test.fetches_r1;

-- After attach replica r2 should fetch the merged part from r1.
ATTACH TABLE test.fetches_r2;
SYSTEM SYNC REPLICA test.fetches_r2;

SELECT '*** Check data after fetch of merged part ***';
SELECT _part, * FROM test.fetches_r2 ORDER BY x;

DETACH TABLE test.fetches_r2;

-- Add mutation that doesn't change data.
ALTER TABLE test.fetches_r1 DELETE WHERE x = 0;
SYSTEM SYNC REPLICA test.fetches_r1;

-- After attach replica r2 should compare checksums for mutated part and clone the local part.
ATTACH TABLE test.fetches_r2;
SYSTEM SYNC REPLICA test.fetches_r2;

SELECT '*** Check data after fetch/clone of mutated part ***';
SELECT _part, * FROM test.fetches_r2 ORDER BY x;

DROP TABLE test.fetches_r1;
DROP TABLE test.fetches_r2;