Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event);
const std::vector<int64_t>* table_ids_ptr = nullptr;
if (request.__isset.table_ids) {
table_ids_ptr = &request.table_ids;
}
st = manager.set_event(request.job_id, request.event, false, table_ids_ptr);
if (st.ok()) {
break;
}
Expand Down
139 changes: 133 additions & 6 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@
#include <bthread/countdown_event.h>

#include <algorithm>
#include <chrono>
#include <limits>
#include <list>
#include <memory>
#include <optional>
#include <thread>
#include <unordered_map>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/cloud_warmup_metrics.h"
#include "cloud/config.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/async_io.h"
#include "util/bvar_windowed_adder.h"
#include "util/debug_points.h"

namespace doris {
Expand Down Expand Up @@ -407,10 +414,103 @@ bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_num(
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num(
"file_cache_warm_up_rowset_wait_for_compaction_timeout_num");

// Per-job windowed metrics for target BE
// bvar::Window enforces MAX_SECONDS_LIMIT = 3600, so the longest window is 1h.
static constexpr int WINDOW_5M = 300;
static constexpr int WINDOW_30M = 1800;
static constexpr int WINDOW_1H = 3600;

MBvarWindowedAdder g_warmup_ed_finish_segment_num("warmup_ed_finish_segment_num", {"job_id"},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any memory issues if there are many jobs.
how does bvar implement "windows", does it recored every smaples of the adder every second?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does "ed" mean?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the bvar implementation again.

bvar::Window does not record every update written to the Adder. For bvar::Adder, the underlying sampler samples the cumulative adder value roughly once per second, and the window value is calculated from the difference between the latest sampled cumulative value and the oldest sampled cumulative value in the requested window.

The 5m/30m/1h windows created for the same Adder also share the same underlying sampler. The sampler queue is sized by the largest window, so here it keeps about 3600 + 1 samples, not 300 + 1800 + 3600 samples and not one sample per warm-up event.

Rough estimate:

  • One Sample<int64_t> stores data and time_us, so it is about 16 bytes.
  • The largest window is 1h, so one sampler queue is about (3600 + 1) * 16 ~= 56KB.
  • Source-side stats have 4 windowed adders, about 4 * 56KB ~= 224KB/job for sampler queues.
  • Target-side stats have 8 windowed adders, about 8 * 56KB ~= 448KB/job for sampler queues.
  • If the same BE process observes both sides, the sampler queue storage is roughly (4 + 8) * 56KB ~= 672KB/job, plus small object/map/string overhead.

So this is proportional to the number of job_id dimensions seen by a BE process, not proportional to the number of rowsets/segments/events. The overall memory usage should be small for the expected number of warm-up jobs. This state is also BE-process-local memory only; it is not persisted and will be released after BE restart.

{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_finish_segment_size("warmup_ed_finish_segment_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_finish_index_num("warmup_ed_finish_index_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_finish_index_size("warmup_ed_finish_index_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_fail_segment_num("warmup_ed_fail_segment_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_fail_segment_size("warmup_ed_fail_segment_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_fail_index_num("warmup_ed_fail_index_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_fail_index_size("warmup_ed_fail_index_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
bvar::MultiDimension<bvar::Status<int64_t>> g_warmup_ed_last_finish_ts({"job_id"});

void update_warmup_ed_last_finish_ts(const std::string& job_id_str) {
auto* finish_ts = g_warmup_ed_last_finish_ts.get_stats(std::list<std::string> {job_id_str});
if (finish_ts) {
finish_ts->set_value(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
}
}

void record_warmup_ed_finish_segment(const std::string& job_id_str, int64_t segment_size) {
g_warmup_ed_finish_segment_num.put({job_id_str}, 1);
g_warmup_ed_finish_segment_size.put({job_id_str}, segment_size);
update_warmup_ed_last_finish_ts(job_id_str);
}

void record_warmup_ed_finish_index(const std::string& job_id_str, int64_t idx_size) {
g_warmup_ed_finish_index_num.put({job_id_str}, 1);
g_warmup_ed_finish_index_size.put({job_id_str}, idx_size);
update_warmup_ed_last_finish_ts(job_id_str);
}

void record_warmup_ed_fail_segment(const std::string& job_id_str, int64_t segment_size) {
g_warmup_ed_fail_segment_num.put({job_id_str}, 1);
g_warmup_ed_fail_segment_size.put({job_id_str}, segment_size);
}

void record_warmup_ed_fail_index(const std::string& job_id_str, int64_t idx_size) {
g_warmup_ed_fail_index_num.put({job_id_str}, 1);
g_warmup_ed_fail_index_size.put({job_id_str}, idx_size);
}

void record_warmup_ed_skipped_rowset_as_finished(RowsetMeta& rs_meta,
const std::string& job_id_str) {
auto schema_ptr = rs_meta.tablet_schema();
bool has_inverted_index = schema_ptr->has_inverted_index() || schema_ptr->has_ann_index();
auto idx_version = schema_ptr->get_inverted_index_storage_format();
for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
record_warmup_ed_finish_segment(job_id_str, rs_meta.segment_file_size(segment_id));

if (!has_inverted_index) {
continue;
}
auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id);
if (idx_version == InvertedIndexStorageFormatPB::V1) {
std::unordered_map<int64_t, int64_t> index_size_map;
for (const auto& info : inverted_index_info.index_info()) {
if (info.index_file_size() != -1) {
index_size_map[info.index_id()] = info.index_file_size();
} else {
VLOG_DEBUG << "Invalid index_file_size for segment_id " << segment_id
<< ", index_id " << info.index_id();
}
}
for (const auto& index : schema_ptr->inverted_indexes()) {
record_warmup_ed_finish_index(job_id_str, index_size_map[index->index_id()]);
}
} else { // InvertedIndexStorageFormatPB::V2
int64_t idx_size = 0;
if (inverted_index_info.has_index_size()) {
idx_size = inverted_index_info.index_size();
} else {
VLOG_DEBUG << "index_size is not set for segment " << segment_id;
}
record_warmup_ed_finish_index(job_id_str, idx_size);
}
}
}

void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId& rowset_id,
int64_t segment_id, std::shared_ptr<CloudTablet> tablet,
std::shared_ptr<bthread::CountdownEvent> wait, Version version,
int64_t segment_size, int64_t request_ts, int64_t handle_ts) {
int64_t segment_size, int64_t request_ts, int64_t handle_ts,
std::string job_id_str, int64_t upstream_trigger_ts_ms) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
Expand All @@ -428,6 +528,7 @@ void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId&
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
record_warmup_ed_finish_segment(job_id_str, segment_size);
int64_t now_ts = current_unix_time_us();
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
auto rowset_latency_us = warm_up_rowset_cross_host_latency_us(request_ts, now_ts);
Expand All @@ -451,6 +552,7 @@ void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId&
} else {
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
record_warmup_ed_fail_segment(job_id_str, segment_size);
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
Expand All @@ -460,6 +562,7 @@ void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId&
VLOG_DEBUG << "warmup rowset " << version.to_string() << "(" << rowset_id.to_string()
<< ") completed";
}
g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, upstream_trigger_ts_ms);
if (wait) {
wait->signal();
}
Expand All @@ -470,7 +573,8 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
std::shared_ptr<CloudTablet> tablet,
std::shared_ptr<bthread::CountdownEvent> wait,
Version version, uint64_t idx_size, int64_t request_ts,
int64_t handle_ts) {
int64_t handle_ts, std::string job_id_str,
int64_t upstream_trigger_ts_ms) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO(
Expand All @@ -482,6 +586,7 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_index_num << 1;
g_file_cache_event_driven_warm_up_finished_index_size << idx_size;
record_warmup_ed_finish_index(job_id_str, static_cast<int64_t>(idx_size));
int64_t now_ts = current_unix_time_us();
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
auto rowset_latency_us = warm_up_rowset_cross_host_latency_us(request_ts, now_ts);
Expand All @@ -505,6 +610,7 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
} else {
g_file_cache_event_driven_warm_up_failed_index_num << 1;
g_file_cache_event_driven_warm_up_failed_index_size << idx_size;
record_warmup_ed_fail_index(job_id_str, static_cast<int64_t>(idx_size));
LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
}
Expand All @@ -514,6 +620,7 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
VLOG_DEBUG << "warmup rowset " << version.to_string() << "(" << rowset_id.to_string()
<< ") completed";
}
g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, upstream_trigger_ts_ms);
if (wait) {
wait->signal();
}
Expand All @@ -534,6 +641,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
}

// Extract job_id from request (0 if not set, for backward compatibility)
std::string job_id_str = std::to_string(request->has_job_id() ? request->job_id() : 0);
int64_t upstream_trigger_ts_ms =
request->has_upstream_trigger_ts_ms() ? request->upstream_trigger_ts_ms() : 0;

for (auto& rs_meta_pb : request->rowset_metas()) {
RowsetMeta rs_meta;
rs_meta.init_from_pb(rs_meta_pb);
Expand Down Expand Up @@ -581,8 +693,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpTriggerSource::EVENT_DRIVEN)) {
LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string()
<< ", skip it";
g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str,
upstream_trigger_ts_ms);
record_warmup_ed_skipped_rowset_as_finished(rs_meta, job_id_str);
continue;
}
if (rs_meta.num_segments() == 0) {
g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str,
upstream_trigger_ts_ms);
}

for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
if (!config::file_cache_enable_only_warm_up_idx) {
Expand All @@ -605,7 +724,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
[=, version = rs_meta.version()](Status st) {
handle_segment_download_done(
st, tablet_id, rowset_id, segment_id, tablet, wait,
version, segment_size, request_ts, handle_ts);
version, segment_size, request_ts, handle_ts,
job_id_str, upstream_trigger_ts_ms);
},
.tablet_id = tablet_id};

Expand All @@ -614,12 +734,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
if (wait) {
wait->add_count();
}
g_warmup_ed_downstream_progress_tracker.record_task_submit(job_id_str,
upstream_trigger_ts_ms);

_engine.file_cache_block_downloader().submit_download_task(download_meta);
}

// Use rs_meta.fs() to support packed files for inverted index download.
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
auto download_inverted_index = [&, tablet, job_id_str](std::string index_path,
uint64_t idx_size) {
io::DownloadFileMeta download_meta {
.path = io::Path(index_path),
.file_size = static_cast<int64_t>(idx_size),
Expand All @@ -632,16 +755,20 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
[=, version = rs_meta.version()](Status st) {
handle_inverted_index_download_done(
st, tablet_id, rowset_id, segment_id, index_path,
tablet, wait, version, idx_size, request_ts, handle_ts);
tablet, wait, version, idx_size, request_ts, handle_ts,
job_id_str, upstream_trigger_ts_ms);
},
.tablet_id = tablet_id};
.tablet_id = tablet_id,
};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
tablet->update_rowset_warmup_state_inverted_idx_num(
WarmUpTriggerSource::EVENT_DRIVEN, rowset_id, 1);
if (wait) {
wait->add_count();
}
g_warmup_ed_downstream_progress_tracker.record_task_submit(job_id_str,
upstream_trigger_ts_ms);
_engine.file_cache_block_downloader().submit_download_task(download_meta);
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,7 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
<< ", with timeout: " << timeout_ms << " ms";
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.warm_up_rowset(rs_meta, timeout_ms);
manager.warm_up_rowset(rs_meta, table_id, timeout_ms);
return st;
}

Expand Down
Loading
Loading