Skip to content

[feat](cloud) Add unused rowset state for CloudTablet #51573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 48 additions & 22 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms(
bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
"base_compaction_get_delete_bitmap_lock_time_ms");

bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also record size in bytes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");

static constexpr int LOAD_INITIATOR_ID = -1;

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
Expand Down Expand Up @@ -342,17 +345,27 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
// replace existed rowset with `to_add` rowset. This may occur when:
// 1. schema change converts rowsets which have been double written to new tablet
// 2. cumu compaction picks single overlapping input rowset to perform compaction
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
// add existed rowset to unused_rowsets to remove delete bitmap
if (auto find_it = _rs_version_map.find(rs->version());
find_it != _rs_version_map.end()) {

// add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data

std::vector<RowsetSharedPtr> unused_rowsets;
if (auto find_it = _rs_version_map.find(rs->version());
find_it != _rs_version_map.end()) {
if (find_it->second->rowset_id() == rs->rowset_id()) {
LOG(WARNING) << "tablet_id=" << tablet_id()
<< ", rowset_id=" << rs->rowset_id().to_string()
<< ", existed rowset_id="
<< find_it->second->rowset_id().to_string();
DCHECK(find_it->second->rowset_id() != rs->rowset_id())
<< "tablet_id=" << tablet_id()
<< ", rowset_id=" << rs->rowset_id().to_string()
<< ", existed rowset=" << find_it->second->rowset_id().to_string();
_unused_rowsets.emplace(find_it->second->rowset_id(), find_it->second);
<< ", existed rowset_id="
<< find_it->second->rowset_id().to_string();
}
unused_rowsets.push_back(find_it->second);
}
add_unused_rowsets(unused_rowsets);

_tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
_rs_version_map[rs->version()] = rs;
_tablet_meta->add_rowsets_unchecked({rs});
Expand Down Expand Up @@ -460,21 +473,16 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
}
_reconstruct_version_tracker_if_necessary();
}

// if the rowset is not used by any query, we can recycle its cached data early.
recycle_cached_data(expired_rowsets);
if (config::enable_mow_verbose_log) {
LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems we don't need reclycle_cached_data in line 463 no more?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems we don't need reclycle_cached_data in line 463 no more?

Add a comment,we can recycle cached data early, if stale rowset is not used

}

add_unused_rowsets(expired_rowsets);
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write() &&
!deleted_stale_rowsets.empty()) {
// record expired rowsets in unused rowsets
{
std::lock_guard<std::mutex> lock(_gc_mutex);
for (const auto& rowset : expired_rowsets) {
_unused_rowsets.emplace(rowset->rowset_id(), rowset);
}
}

// agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
OlapStopWatch watch;
for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
Expand Down Expand Up @@ -504,19 +512,36 @@ bool CloudTablet::need_remove_unused_rowsets() {
return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
}

void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
std::lock_guard<std::mutex> lock(_gc_mutex);
for (const auto& rowset : rowsets) {
_unused_rowsets[rowset->rowset_id()] = rowset;
g_unused_rowsets_bytes << rowset->total_disk_size();
}
g_unused_rowsets_count << rowsets.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also record size in bytes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also record size in bytes

done

}

void CloudTablet::remove_unused_rowsets() {
int64_t removed_rowsets_num = 0;
int64_t removed_delete_bitmap_num = 0;
OlapStopWatch watch;
std::lock_guard<std::mutex> lock(_gc_mutex);
// 1. remove unused rowsets and delete bitmap
// 1. remove unused rowsets's cache data and delete bitmap
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
// it->second is std::shared_ptr<Rowset>
auto&& rs = it->second;
if (rs.use_count() > 1) {
LOG(WARNING) << "Rowset " << rs->rowset_id() << " has " << rs.use_count()
<< " references. Can not remove delete bitmap.";
LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id() << " has "
<< rs.use_count() << " references, it cannot be removed";
++it;
continue;
}
tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
rs->clear_cache();
it = _unused_rowsets.erase(it);
g_unused_rowsets_count << -1;
g_unused_rowsets_bytes << -rs->total_disk_size();
removed_rowsets_num++;
}

// 2. remove delete bitmap of pre rowsets
Expand All @@ -538,13 +563,14 @@ void CloudTablet::remove_unused_rowsets() {
auto& key_ranges = std::get<1>(*it);
tablet_meta()->delete_bitmap().remove(key_ranges);
it = _unused_delete_bitmap.erase(it);
removed_delete_bitmap_num++;
}

if (!_unused_rowsets.empty() || !_unused_delete_bitmap.empty()) {
LOG(INFO) << "tablet_id=" << tablet_id()
<< ", unused_rowset size=" << _unused_rowsets.size()
<< ", unused_delete_bitmap size=" << _unused_delete_bitmap.size();
}
LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
<< ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
<< ", removed_rowsets_num=" << removed_rowsets_num
<< ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
<< ", cost(us)=" << watch.get_elapse_time_us();
}

void CloudTablet::update_base_size(const Rowset& rs) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ class CloudTablet final : public BaseTablet {
std::map<std::string, int64_t>& pre_rowset_to_versions);

bool need_remove_unused_rowsets();

void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
void remove_unused_rowsets();

private:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql1 --
1 99
2 99
3 99
4 99
5 99

-- !sql2 --
1 99
2 99
3 99
4 99
5 99

-- !sql3 --
1 99
2 99
3 99
4 99
5 99

-- !sql4 --
1 99
2 99
3 99
4 99
5 100

-- !sql5 --
1 99
2 99
3 99
4 99
5 100

Loading