Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event);
const std::vector<int64_t>* table_ids_ptr = nullptr;
if (request.__isset.table_ids) {
table_ids_ptr = &request.table_ids;
}
st = manager.set_event(request.job_id, request.event, false, table_ids_ptr);
if (st.ok()) {
break;
}
Expand Down
136 changes: 131 additions & 5 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@
#include <bthread/countdown_event.h>

#include <algorithm>
#include <chrono>
#include <limits>
#include <list>
#include <memory>
#include <optional>
#include <thread>
#include <unordered_map>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/cloud_warmup_metrics.h"
#include "cloud/config.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/async_io.h"
#include "util/bvar_windowed_adder.h"
#include "util/debug_points.h"

namespace doris {
Expand Down Expand Up @@ -410,10 +417,103 @@ bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_num(
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num(
"file_cache_warm_up_rowset_wait_for_compaction_timeout_num");

// Per-job windowed metrics for target BE
// bvar::Window enforces MAX_SECONDS_LIMIT = 3600, so the longest window is 1h.
// Window lengths below are expressed in seconds (300 s, 1800 s, 3600 s).
static constexpr int WINDOW_5M = 300;
static constexpr int WINDOW_30M = 1800;
static constexpr int WINDOW_1H = 3600;

// Every adder below is keyed by a single "job_id" label and is constructed with
// the same three windows. NOTE(review): the meaning of the trailing `false`
// argument is defined by MBvarWindowedAdder's constructor, which is not visible
// here - confirm against util/bvar_windowed_adder.h.
// NOTE(review): "ed" presumably stands for "event-driven", matching the
// g_file_cache_event_driven_warm_up_* counters updated alongside these - confirm.

// Finished segment downloads: count and size.
MBvarWindowedAdder g_warmup_ed_finish_segment_num("warmup_ed_finish_segment_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_finish_segment_size("warmup_ed_finish_segment_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
// Finished index downloads: count and size.
MBvarWindowedAdder g_warmup_ed_finish_index_num("warmup_ed_finish_index_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_finish_index_size("warmup_ed_finish_index_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
// Failed segment downloads: count and size.
MBvarWindowedAdder g_warmup_ed_fail_segment_num("warmup_ed_fail_segment_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_fail_segment_size("warmup_ed_fail_segment_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
// Failed index downloads: count and size.
MBvarWindowedAdder g_warmup_ed_fail_index_num("warmup_ed_fail_index_num", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
MBvarWindowedAdder g_warmup_ed_fail_index_size("warmup_ed_fail_index_size", {"job_id"},
{WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
// Per-job status value holding the time of the most recent recorded finish;
// written by update_warmup_ed_last_finish_ts() as milliseconds since the epoch.
bvar::MultiDimension<bvar::Status<int64_t>> g_warmup_ed_last_finish_ts({"job_id"});

// Stamps the per-job "last finish" status with the current system-clock time,
// expressed in milliseconds since the clock's epoch.
//
// @param job_id_str  label value used to select the job's entry in
//                    g_warmup_ed_last_finish_ts.
//
// Does nothing when get_stats() yields no entry for the label.
void update_warmup_ed_last_finish_ts(const std::string& job_id_str) {
    const std::list<std::string> labels {job_id_str};
    auto* ts_status = g_warmup_ed_last_finish_ts.get_stats(labels);
    if (ts_status == nullptr) {
        return;
    }

    // The clock is read only after the entry has been resolved, so a missing
    // entry costs no time query.
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    const auto now_ms =
            std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    ts_status->set_value(now_ms);
}

// Records one finished segment for the given job: puts 1 into the per-job
// finished-segment count, puts `segment_size` into the per-job finished-segment
// size, then refreshes the job's last-finish timestamp.
//
// @param job_id_str    value of the "job_id" label the metrics are keyed by.
// @param segment_size  size attributed to the segment (unit as supplied by the
//                      caller; presumably bytes - TODO confirm).
void record_warmup_ed_finish_segment(const std::string& job_id_str, int64_t segment_size) {
g_warmup_ed_finish_segment_num.put({job_id_str}, 1);
g_warmup_ed_finish_segment_size.put({job_id_str}, segment_size);
update_warmup_ed_last_finish_ts(job_id_str);
}

// Records one finished index file for the given job: puts 1 into the per-job
// finished-index count, puts `idx_size` into the per-job finished-index size,
// then refreshes the job's last-finish timestamp.
//
// @param job_id_str  value of the "job_id" label the metrics are keyed by.
// @param idx_size    size attributed to the index file (callers in this file
//                    may pass 0 when the size is unknown).
void record_warmup_ed_finish_index(const std::string& job_id_str, int64_t idx_size) {
g_warmup_ed_finish_index_num.put({job_id_str}, 1);
g_warmup_ed_finish_index_size.put({job_id_str}, idx_size);
update_warmup_ed_last_finish_ts(job_id_str);
}

// Records one failed segment for the given job: puts 1 into the per-job
// failed-segment count and `segment_size` into the per-job failed-segment size.
// Unlike the finish variants, this does not touch the last-finish timestamp.
//
// @param job_id_str    value of the "job_id" label the metrics are keyed by.
// @param segment_size  size attributed to the segment that failed.
void record_warmup_ed_fail_segment(const std::string& job_id_str, int64_t segment_size) {
g_warmup_ed_fail_segment_num.put({job_id_str}, 1);
g_warmup_ed_fail_segment_size.put({job_id_str}, segment_size);
}

// Records one failed index file for the given job: puts 1 into the per-job
// failed-index count and `idx_size` into the per-job failed-index size.
// Unlike the finish variants, this does not touch the last-finish timestamp.
//
// @param job_id_str  value of the "job_id" label the metrics are keyed by.
// @param idx_size    size attributed to the index file that failed.
void record_warmup_ed_fail_index(const std::string& job_id_str, int64_t idx_size) {
g_warmup_ed_fail_index_num.put({job_id_str}, 1);
g_warmup_ed_fail_index_size.put({job_id_str}, idx_size);
}

// Accounts for a rowset that was not downloaded by reporting all of its files
// as finished under `job_id_str`.
//
// For every segment of `rs_meta` one finished segment is recorded with the
// segment's file size. When the tablet schema reports an inverted or ANN
// index, finished index records are added as well:
//   * storage format V1: one record per index listed in the schema, using the
//     size found in the segment's index file info, or 0 when that entry is
//     missing or carries the -1 "invalid" marker;
//   * any other format (V2): a single record per segment, using the index
//     size from the file info, or 0 when it is not set.
//
// @param rs_meta     metadata of the skipped rowset.
// @param job_id_str  value of the "job_id" label the metrics are keyed by.
void record_warmup_ed_skipped_rowset_as_finished(RowsetMeta& rs_meta,
                                                 const std::string& job_id_str) {
    auto tablet_schema = rs_meta.tablet_schema();
    const bool account_indexes =
            tablet_schema->has_inverted_index() || tablet_schema->has_ann_index();
    const auto storage_format = tablet_schema->get_inverted_index_storage_format();
    const bool is_format_v1 = (storage_format == InvertedIndexStorageFormatPB::V1);

    for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); ++segment_id) {
        record_warmup_ed_finish_segment(job_id_str, rs_meta.segment_file_size(segment_id));

        if (account_indexes) {
            auto&& file_info = rs_meta.inverted_index_file_info(segment_id);

            if (is_format_v1) {
                // Collect the valid per-index sizes first; indexes without a
                // usable size are reported below with size 0.
                std::unordered_map<int64_t, int64_t> size_by_index_id;
                for (const auto& entry : file_info.index_info()) {
                    if (entry.index_file_size() == -1) {
                        VLOG_DEBUG << "Invalid index_file_size for segment_id " << segment_id
                                   << ", index_id " << entry.index_id();
                    } else {
                        size_by_index_id[entry.index_id()] = entry.index_file_size();
                    }
                }
                for (const auto& index : tablet_schema->inverted_indexes()) {
                    const auto found = size_by_index_id.find(index->index_id());
                    int64_t recorded_size = 0;
                    if (found != size_by_index_id.end()) {
                        recorded_size = found->second;
                    }
                    record_warmup_ed_finish_index(job_id_str, recorded_size);
                }
            } else { // InvertedIndexStorageFormatPB::V2
                int64_t recorded_size = 0;
                if (!file_info.has_index_size()) {
                    VLOG_DEBUG << "index_size is not set for segment " << segment_id;
                } else {
                    recorded_size = file_info.index_size();
                }
                record_warmup_ed_finish_index(job_id_str, recorded_size);
            }
        }
    }
}

void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId& rowset_id,
int64_t segment_id, std::shared_ptr<CloudTablet> tablet,
std::shared_ptr<bthread::CountdownEvent> wait, Version version,
int64_t segment_size, int64_t request_ts, int64_t handle_ts) {
int64_t segment_size, int64_t request_ts, int64_t handle_ts,
std::string job_id_str, int64_t upstream_trigger_ts_ms) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
Expand All @@ -431,6 +531,7 @@ void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId&
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_segment_num << 1;
g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
record_warmup_ed_finish_segment(job_id_str, segment_size);
int64_t now_ts = current_unix_time_us();
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
auto rowset_latency_us = warm_up_rowset_cross_host_latency_us(request_ts, now_ts);
Expand All @@ -454,6 +555,7 @@ void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId&
} else {
g_file_cache_event_driven_warm_up_failed_segment_num << 1;
g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
record_warmup_ed_fail_segment(job_id_str, segment_size);
LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id.to_string() << ", error: " << st;
}
Expand All @@ -463,6 +565,7 @@ void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId&
VLOG_DEBUG << "warmup rowset " << version.to_string() << "(" << rowset_id.to_string()
<< ") completed";
}
g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, upstream_trigger_ts_ms);
if (wait) {
wait->signal();
}
Expand All @@ -473,7 +576,8 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
std::shared_ptr<CloudTablet> tablet,
std::shared_ptr<bthread::CountdownEvent> wait,
Version version, uint64_t idx_size, int64_t request_ts,
int64_t handle_ts) {
int64_t handle_ts, std::string job_id_str,
int64_t upstream_trigger_ts_ms) {
DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
auto sleep_time = dp->param<int>("sleep", 3);
LOG_INFO(
Expand All @@ -485,6 +589,7 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
if (st.ok()) {
g_file_cache_event_driven_warm_up_finished_index_num << 1;
g_file_cache_event_driven_warm_up_finished_index_size << idx_size;
record_warmup_ed_finish_index(job_id_str, static_cast<int64_t>(idx_size));
int64_t now_ts = current_unix_time_us();
g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
auto rowset_latency_us = warm_up_rowset_cross_host_latency_us(request_ts, now_ts);
Expand All @@ -508,6 +613,7 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
} else {
g_file_cache_event_driven_warm_up_failed_index_num << 1;
g_file_cache_event_driven_warm_up_failed_index_size << idx_size;
record_warmup_ed_fail_index(job_id_str, static_cast<int64_t>(idx_size));
LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
<< " rowset_id: " << rowset_id << ", error: " << st;
}
Expand All @@ -517,6 +623,7 @@ void handle_inverted_index_download_done(Status st, int64_t tablet_id, const Row
VLOG_DEBUG << "warmup rowset " << version.to_string() << "(" << rowset_id.to_string()
<< ") completed";
}
g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, upstream_trigger_ts_ms);
if (wait) {
wait->signal();
}
Expand All @@ -537,6 +644,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
}

// Extract job_id from request (0 if not set, for backward compatibility)
std::string job_id_str = std::to_string(request->has_job_id() ? request->job_id() : 0);
int64_t upstream_trigger_ts_ms =
request->has_upstream_trigger_ts_ms() ? request->upstream_trigger_ts_ms() : 0;

for (auto& rs_meta_pb : request->rowset_metas()) {
RowsetMeta rs_meta;
rs_meta.init_from_pb(rs_meta_pb);
Expand Down Expand Up @@ -584,8 +696,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpTriggerSource::EVENT_DRIVEN)) {
LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string()
<< ", skip it";
g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str,
upstream_trigger_ts_ms);
record_warmup_ed_skipped_rowset_as_finished(rs_meta, job_id_str);
continue;
}
if (rs_meta.num_segments() == 0) {
g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str,
upstream_trigger_ts_ms);
}

for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
if (!config::file_cache_enable_only_warm_up_idx) {
Expand All @@ -607,20 +726,24 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.download_done = [=, version = rs_meta.version()](Status st) {
handle_segment_download_done(st, tablet_id, rowset_id, segment_id,
tablet, wait, version, segment_size,
request_ts, handle_ts);
request_ts, handle_ts, job_id_str,
upstream_trigger_ts_ms);
}};

g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
if (wait) {
wait->add_count();
}
g_warmup_ed_downstream_progress_tracker.record_task_submit(job_id_str,
upstream_trigger_ts_ms);

_engine.file_cache_block_downloader().submit_download_task(download_meta);
}

// Use rs_meta.fs() to support packed files for inverted index download.
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
auto download_inverted_index = [&, tablet, job_id_str](std::string index_path,
uint64_t idx_size) {
io::DownloadFileMeta download_meta {
.path = io::Path(index_path),
.file_size = static_cast<int64_t>(idx_size),
Expand All @@ -632,7 +755,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.download_done = [=, version = rs_meta.version()](Status st) {
handle_inverted_index_download_done(
st, tablet_id, rowset_id, segment_id, index_path, tablet, wait,
version, idx_size, request_ts, handle_ts);
version, idx_size, request_ts, handle_ts, job_id_str,
upstream_trigger_ts_ms);
}};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
Expand All @@ -641,6 +765,8 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
if (wait) {
wait->add_count();
}
g_warmup_ed_downstream_progress_tracker.record_task_submit(job_id_str,
upstream_trigger_ts_ms);
_engine.file_cache_block_downloader().submit_download_task(download_meta);
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1525,7 +1525,7 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
<< ", with timeout: " << timeout_ms << " ms";
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.warm_up_rowset(rs_meta, timeout_ms);
manager.warm_up_rowset(rs_meta, table_id, timeout_ms);
return st;
}

Expand Down
Loading
Loading