Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event);
const std::vector<int64_t>* table_ids_ptr = nullptr;
if (request.__isset.table_ids) {
table_ids_ptr = &request.table_ids;
}
st = manager.set_event(request.job_id, request.event, false, table_ids_ptr);
if (st.ok()) {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
return st;
}

st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "", tablet->table_id());

// Update tablet stats
tablet->fetch_add_approximate_num_rowsets(1);
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ Status CloudDeltaWriter::commit_rowset() {
}

// Handle normal rowset with data
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "",
rowset_builder()->tablet()->table_id());
}

Status CloudDeltaWriter::_commit_empty_rowset() {
Expand All @@ -139,7 +140,8 @@ Status CloudDeltaWriter::_commit_empty_rowset() {
return Status::OK();
}
// write an empty rowset kv to keep the version continuous
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "",
rowset_builder()->tablet()->table_id());
}

Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
Expand Down
367 changes: 251 additions & 116 deletions be/src/cloud/cloud_internal_service.cpp

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string
return st;
}

Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id, int64_t table_id,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
Expand Down Expand Up @@ -1361,7 +1361,7 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
<< ", with timeout: " << timeout_ms << " ms";
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.warm_up_rowset(rs_meta, timeout_ms);
manager.warm_up_rowset(rs_meta, table_id, timeout_ms);
return st;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class CloudMetaMgr {
Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id, int64_t table_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status update_tmp_rowset(const RowsetMeta& rs_meta);
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
st.to_string());
}

st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id,
&existed_rs_meta);
st = _cloud_storage_engine.meta_mgr().commit_rowset(
*rowset_writer->rowset_meta(), _job_id, _new_tablet->table_id(), &existed_rs_meta);
if (!st.ok()) {
if (st.is<ALREADY_EXIST>()) {
LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
Expand Down
223 changes: 175 additions & 48 deletions be/src/cloud/cloud_warm_up_manager.cpp

Large diffs are not rendered by default.

39 changes: 32 additions & 7 deletions be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
#include <bthread/countdown_event.h>

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

#include "cloud/cloud_storage_engine.h"
Expand All @@ -39,6 +41,16 @@ enum class DownloadType {
S3,
};

// Filter for event-driven warmup jobs.
// nullopt = cluster-level (no table filter, warm up all tables)
// has_value = table-level filter (only warm up tables in the set)
using EventDrivenJobFilter = std::optional<std::unordered_set<int64_t>>;

struct JobReplicaInfo {
int64_t job_id;
TReplicaInfo replica;
};

struct JobMeta {
JobMeta() = default;
JobMeta(const TJobMeta& meta);
Expand Down Expand Up @@ -75,7 +87,8 @@ class CloudWarmUpManager {
// Cancel the job
Status clear_job(int64_t job_id);

Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);
Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false,
const std::vector<int64_t>* table_ids = nullptr);

// If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
// and return immediately without waiting for the warm-up to complete.
Expand All @@ -85,7 +98,7 @@ class CloudWarmUpManager {
// @param rs_meta Metadata of the rowset to be warmed up.
// @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
// to complete. Non-positive value means no waiting.
void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);
void warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id, int64_t sync_wait_timeout_ms = -1);

void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

Expand All @@ -98,17 +111,27 @@ class CloudWarmUpManager {
std::unordered_map<int64_t, std::pair<std::string, int32_t>> get_all_balanced_tablets() const;

private:
struct WarmUpRowsetFailure {
int code;
std::string reason;
};

static Status _build_warm_up_rowset_result(const std::vector<WarmUpRowsetFailure>& failures,
size_t replica_count, int64_t tablet_id,
int64_t table_id, const std::string& rowset_id);

void schedule_remove_balanced_tablet(int64_t tablet_id);
static void clean_up_expired_mappings(void* arg);
void handle_jobs();

Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>& replicas,
int64_t sync_wait_timeout_ms, bool skip_existence_check);
Status _do_warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id,
std::vector<JobReplicaInfo>& replicas, int64_t sync_wait_timeout_ms,
bool skip_existence_check);

std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, bool bypass_cache,
bool& cache_hit);
std::vector<JobReplicaInfo> get_replica_info(int64_t tablet_id, int64_t table_id,
bool bypass_cache, bool& cache_hit);

void _warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms);
void _warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id, int64_t sync_wait_timeout_ms);
void _recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
Expand All @@ -133,6 +156,8 @@ class CloudWarmUpManager {
using Cache = std::unordered_map<int64_t, CacheEntry>;
// job_id -> cache
std::unordered_map<int64_t, Cache> _tablet_replica_cache;
// job_id -> table filter (nullopt = cluster-level, no filter)
std::unordered_map<int64_t, EventDrivenJobFilter> _event_driven_filters;
std::unique_ptr<ThreadPool> _thread_pool;
std::unique_ptr<ThreadPoolToken> _thread_pool_token;

Expand Down
82 changes: 82 additions & 0 deletions be/src/cloud/cloud_warmup_metrics.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "cloud/cloud_warmup_metrics.h"

#include <algorithm>

namespace doris {

// Definition of the global tracker instance that cloud_warmup_metrics.h declares `extern`.
WarmUpEdDownstreamProgressTracker g_warmup_ed_downstream_progress_tracker;

// Registers one more pending task for `job_id_str` under the given upstream
// trigger timestamp.
//
// @param job_id_str              Job identifier used as the map key.
// @param upstream_trigger_ts_ms  Upstream trigger timestamp; non-positive values
//                                are ignored and nothing is recorded.
void WarmUpEdDownstreamProgressTracker::record_task_submit(const std::string& job_id_str,
                                                           int64_t upstream_trigger_ts_ms) {
    const bool has_valid_ts = upstream_trigger_ts_ms > 0;
    if (has_valid_ts) {
        std::lock_guard guard(_mtx);
        // operator[] creates the per-job entry, and the per-timestamp counter
        // (value-initialized to 0), the first time each key is seen.
        auto& job_progress = _progress_by_job[job_id_str];
        job_progress.pending_trigger_ts_counts[upstream_trigger_ts_ms] += 1;
    }
}

// Marks one task of `job_id_str` with the given upstream trigger timestamp as
// finished.
//
// The pending counter for that timestamp (if any) is decremented and dropped
// once it reaches zero or below. Independently of whether a pending counter was
// found, the job's latest finished timestamp is raised to
// `upstream_trigger_ts_ms` when that value is larger.
//
// @param job_id_str              Job identifier used as the map key; an entry
//                                is created if the job has not been seen yet.
// @param upstream_trigger_ts_ms  Upstream trigger timestamp; non-positive values
//                                are ignored and nothing is recorded.
void WarmUpEdDownstreamProgressTracker::record_task_done(const std::string& job_id_str,
                                                         int64_t upstream_trigger_ts_ms) {
    if (upstream_trigger_ts_ms <= 0) {
        return;
    }

    std::lock_guard guard(_mtx);
    auto& job_progress = _progress_by_job[job_id_str];
    auto& pending = job_progress.pending_trigger_ts_counts;

    if (auto it = pending.find(upstream_trigger_ts_ms); it != pending.end()) {
        // Remove exhausted counters so they no longer count as pending.
        if (--it->second <= 0) {
            pending.erase(it);
        }
    }

    if (job_progress.last_finished_trigger_ts < upstream_trigger_ts_ms) {
        job_progress.last_finished_trigger_ts = upstream_trigger_ts_ms;
    }
}

// Returns the progress timestamp recorded for `job_id_str`.
//
// @return 0 if the job has no entry; otherwise the smallest trigger timestamp
//         that still has pending tasks, or, when nothing is pending, the
//         largest timestamp recorded as finished.
int64_t WarmUpEdDownstreamProgressTracker::get_progress_ts(const std::string& job_id_str) const {
    std::lock_guard guard(_mtx);

    const auto job_it = _progress_by_job.find(job_id_str);
    if (job_it == _progress_by_job.end()) {
        return 0;
    }

    const auto& pending = job_it->second.pending_trigger_ts_counts;
    // The pending map is ordered by key, so begin() holds the earliest timestamp.
    return pending.empty() ? job_it->second.last_finished_trigger_ts : pending.begin()->first;
}

// Returns a snapshot of every job id that currently has a progress entry.
// The ids come from an unordered map, so their order is unspecified.
std::vector<std::string> WarmUpEdDownstreamProgressTracker::list_job_ids() const {
    std::lock_guard guard(_mtx);

    std::vector<std::string> ids;
    ids.reserve(_progress_by_job.size());
    for (const auto& [job_id, job_progress] : _progress_by_job) {
        ids.push_back(job_id);
    }
    return ids;
}

// Drops the progress entries of all jobs while holding the tracker mutex.
// Named for test use: it returns the tracker to its empty initial state.
void WarmUpEdDownstreamProgressTracker::reset_for_test() {
    std::lock_guard guard(_mtx);
    _progress_by_job.clear();
}

} // namespace doris
76 changes: 76 additions & 0 deletions be/src/cloud/cloud_warmup_metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <bvar/bvar.h>
#include <bvar/multi_dimension.h>

#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "util/bvar_windowed_adder.h"

namespace doris {

// Source BE metrics keyed by job_id (defined in cloud_warm_up_manager.cpp).
// NOTE(review): the names suggest `*_num` counts files and `*_size` sums their
// sizes, split into segment and index files — confirm at the definition site.
extern MBvarWindowedAdder g_warmup_ed_requested_segment_num;
extern MBvarWindowedAdder g_warmup_ed_requested_segment_size;
extern MBvarWindowedAdder g_warmup_ed_requested_index_num;
extern MBvarWindowedAdder g_warmup_ed_requested_index_size;
// NOTE(review): presumably the timestamp of the most recent trigger per job —
// confirm the unit and update site in cloud_warm_up_manager.cpp.
extern bvar::MultiDimension<bvar::Status<int64_t>> g_warmup_ed_last_trigger_ts;

// Target BE metrics keyed by job_id (defined in cloud_internal_service.cpp).
// NOTE(review): `finish_*` and `fail_*` presumably mirror the `requested_*`
// counters above for completed and failed downloads — confirm at the definition site.
extern MBvarWindowedAdder g_warmup_ed_finish_segment_num;
extern MBvarWindowedAdder g_warmup_ed_finish_segment_size;
extern MBvarWindowedAdder g_warmup_ed_finish_index_num;
extern MBvarWindowedAdder g_warmup_ed_finish_index_size;
extern MBvarWindowedAdder g_warmup_ed_fail_segment_num;
extern MBvarWindowedAdder g_warmup_ed_fail_segment_size;
extern MBvarWindowedAdder g_warmup_ed_fail_index_num;
extern MBvarWindowedAdder g_warmup_ed_fail_index_size;
// NOTE(review): presumably the timestamp of the most recent finished download
// per job — confirm the unit and update site in cloud_internal_service.cpp.
extern bvar::MultiDimension<bvar::Status<int64_t>> g_warmup_ed_last_finish_ts;

// Tracks the target BE's event-driven warm-up progress by upstream trigger timestamp.
// If there are unfinished downloads for a job, progress is the earliest pending upstream trigger
// time. If the job has no pending downloads, progress falls back to the latest completed upstream
// trigger time, so FE can report a zero trigger gap once the target side catches up.
// Per-job bookkeeping of pending and finished upstream trigger timestamps.
// Every public method acquires `_mtx` before touching `_progress_by_job`.
class WarmUpEdDownstreamProgressTracker {
public:
// Adds one pending task for `job_id_str` at `upstream_trigger_ts_ms`.
// Calls with a non-positive timestamp are ignored.
void record_task_submit(const std::string& job_id_str, int64_t upstream_trigger_ts_ms);
// Removes one pending task for `job_id_str` at `upstream_trigger_ts_ms` (if one
// is recorded) and raises the job's latest finished timestamp to that value
// when it is larger. Calls with a non-positive timestamp are ignored.
void record_task_done(const std::string& job_id_str, int64_t upstream_trigger_ts_ms);
// Returns the earliest pending trigger timestamp of the job, or the latest
// finished one when nothing is pending. Returns 0 for a job with no entry.
int64_t get_progress_ts(const std::string& job_id_str) const;
// Returns the ids of all jobs that currently have an entry, in unspecified order.
std::vector<std::string> list_job_ids() const;
// Removes the entries of all jobs.
void reset_for_test();

private:
struct JobProgress {
// trigger timestamp -> number of submitted tasks not yet reported done.
// An ordered map, so begin() is the earliest pending timestamp.
std::map<int64_t, int64_t> pending_trigger_ts_counts;
// Largest timestamp passed to record_task_done() so far; 0 until then.
int64_t last_finished_trigger_ts = 0;
};

// `mutable` so the const accessors can lock it.
mutable std::mutex _mtx;
// job id string -> progress state of that job.
std::unordered_map<std::string, JobProgress> _progress_by_job;
};

// Global tracker instance; defined in cloud_warmup_metrics.cpp.
extern WarmUpEdDownstreamProgressTracker g_warmup_ed_downstream_progress_tracker;

} // namespace doris
Loading
Loading