Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -998,18 +998,6 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest&
tablet->set_binlog_config(new_binlog_config);
need_to_save = true;
}
if (tablet_meta_info.__isset.enable_single_replica_compaction) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
}
tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
need_to_save = true;
}
if (tablet_meta_info.__isset.disable_auto_compaction) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction(
Expand Down
4 changes: 0 additions & 4 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, const TabletSchemaPB
out->mutable_index()->CopyFrom(in.index());
out->set_version_col_idx(in.version_col_idx());
out->set_store_row_column(in.store_row_column());
out->set_enable_single_replica_compaction(in.enable_single_replica_compaction());
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_uids()->CopyFrom(in.cluster_key_uids());
out->set_is_dynamic_schema(in.is_dynamic_schema());
Expand Down Expand Up @@ -457,7 +456,6 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in)
out->mutable_index()->Swap(in.mutable_index());
out->set_version_col_idx(in.version_col_idx());
out->set_store_row_column(in.store_row_column());
out->set_enable_single_replica_compaction(in.enable_single_replica_compaction());
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_uids()->Swap(in.mutable_cluster_key_uids());
out->set_is_dynamic_schema(in.is_dynamic_schema());
Expand Down Expand Up @@ -514,7 +512,6 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, const TabletSchemaCloudPB
out->mutable_index()->CopyFrom(in.index());
out->set_version_col_idx(in.version_col_idx());
out->set_store_row_column(in.store_row_column());
out->set_enable_single_replica_compaction(in.enable_single_replica_compaction());
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_uids()->CopyFrom(in.cluster_key_uids());
out->set_is_dynamic_schema(in.is_dynamic_schema());
Expand Down Expand Up @@ -559,7 +556,6 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in)
out->mutable_index()->Swap(in.mutable_index());
out->set_version_col_idx(in.version_col_idx());
out->set_store_row_column(in.store_row_column());
out->set_enable_single_replica_compaction(in.enable_single_replica_compaction());
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_uids()->Swap(in.mutable_cluster_key_uids());
out->set_is_dynamic_schema(in.is_dynamic_schema());
Expand Down
4 changes: 0 additions & 4 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,6 @@ DEFINE_mInt32(ordered_data_compaction_min_segment_size, "10485760");
// This config can be set to limit thread number in compaction thread pool.
DEFINE_mInt32(max_base_compaction_threads, "4");
DEFINE_mInt32(max_cumu_compaction_threads, "-1");
DEFINE_mInt32(max_single_replica_compaction_threads, "-1");

DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");
Expand Down Expand Up @@ -515,9 +514,6 @@ DEFINE_mInt64(total_permits_for_compaction_score, "1000000");
// sleep interval in ms after generated compaction tasks
DEFINE_mInt32(generate_compaction_tasks_interval_ms, "100");

// sleep interval in second after update replica infos
DEFINE_mInt32(update_replica_infos_interval_seconds, "60");

// Compaction task number per disk.
// Must be greater than 2, because Base compaction and Cumulative compaction have at least one thread each.
DEFINE_mInt32(compaction_task_num_per_disk, "4");
Expand Down
3 changes: 0 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,6 @@ DECLARE_mInt32(ordered_data_compaction_min_segment_size);
// This config can be set to limit thread number in compaction thread pool.
DECLARE_mInt32(max_base_compaction_threads);
DECLARE_mInt32(max_cumu_compaction_threads);
DECLARE_mInt32(max_single_replica_compaction_threads);

DECLARE_Bool(enable_base_compaction_idle_sched);
DECLARE_mInt64(base_compaction_min_rowset_num);
Expand Down Expand Up @@ -584,8 +583,6 @@ DECLARE_mInt64(total_permits_for_compaction_score);

// sleep interval in ms after generated compaction tasks
DECLARE_mInt32(generate_compaction_tasks_interval_ms);
// sleep interval in second after update replica infos
DECLARE_mInt32(update_replica_infos_interval_seconds);

// Compaction task number per disk.
// Must be greater than 2, because Base compaction and Cumulative compaction have at least one thread each.
Expand Down
6 changes: 0 additions & 6 deletions be/src/common/metrics/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ DEFINE_ENGINE_COUNTER_METRIC(finish_task_requests_total, finish_task, total);
DEFINE_ENGINE_COUNTER_METRIC(finish_task_requests_failed, finish_task, failed);
DEFINE_ENGINE_COUNTER_METRIC(base_compaction_request_total, base_compaction, total);
DEFINE_ENGINE_COUNTER_METRIC(base_compaction_request_failed, base_compaction, failed);
DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_total, single_compaction, total);
DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_failed, single_compaction, failed);
DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_cancelled, single_compaction, cancelled);
DEFINE_ENGINE_COUNTER_METRIC(cumulative_compaction_request_total, cumulative_compaction, total);
DEFINE_ENGINE_COUNTER_METRIC(cumulative_compaction_request_failed, cumulative_compaction, failed);
DEFINE_ENGINE_COUNTER_METRIC(publish_task_request_total, publish, total);
Expand Down Expand Up @@ -309,9 +306,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_request_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_cancelled);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_request_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_failed_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, alter_inverted_index_requests_total);
Expand Down
3 changes: 0 additions & 3 deletions be/src/common/metrics/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ class DorisMetrics {
IntCounter* base_compaction_request_failed = nullptr;
IntCounter* cumulative_compaction_request_total = nullptr;
IntCounter* cumulative_compaction_request_failed = nullptr;
IntCounter* single_compaction_request_total = nullptr;
IntCounter* single_compaction_request_failed = nullptr;
IntCounter* single_compaction_request_cancelled = nullptr;

IntCounter* local_compaction_read_rows_total = nullptr;
IntCounter* local_compaction_read_bytes_total = nullptr;
Expand Down
30 changes: 4 additions & 26 deletions be/src/service/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
#include "storage/compaction/cumulative_compaction.h"
#include "storage/compaction/cumulative_compaction_policy.h"
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
#include "storage/compaction/single_replica_compaction.h"
#include "storage/compaction_task_tracker.h"
#include "storage/olap_define.h"
#include "storage/storage_engine.h"
Expand Down Expand Up @@ -140,15 +139,6 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
return Status::NotSupported("The compaction type '{}' is not supported", compaction_type);
}

// "remote" = "true" means tablet should do single replica compaction to fetch rowset from peer
bool fetch_from_remote = false;
std::string param_remote = req->param(PARAM_COMPACTION_REMOTE);
if (param_remote == "true") {
fetch_from_remote = true;
} else if (!param_remote.empty() && param_remote != "false") {
return Status::NotSupported("The remote = '{}' is not supported", param_remote);
}

// "force" = "true" means skip permit limiter when submitting full compaction to thread pool
bool force = false;
std::string param_force = req->param(PARAM_COMPACTION_FORCE);
Expand All @@ -173,9 +163,6 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}

if (fetch_from_remote && !tablet->should_fetch_from_peer()) {
return Status::NotSupported("tablet should do compaction locally");
}
DBUG_EXECUTE_IF("CompactionAction._handle_run_compaction.submit_cumu_task", {
RETURN_IF_ERROR(_engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, false));
Expand All @@ -193,8 +180,8 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
force, true, 1));
} else {
// 3. execute base/cumulative compaction task in a detached thread
std::packaged_task<Status()> task([this, tablet, compaction_type, fetch_from_remote]() {
return _execute_compaction_callback(tablet, compaction_type, fetch_from_remote);
std::packaged_task<Status()> task([this, tablet, compaction_type]() {
return _execute_compaction_callback(tablet, compaction_type);
});
std::future<Status> future_obj = task.get_future();
std::thread(std::move(task)).detach();
Expand Down Expand Up @@ -303,8 +290,7 @@ Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st
}

Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
const std::string& compaction_type,
bool fetch_from_remote) {
const std::string& compaction_type) {
MonotonicStopWatch timer;
timer.start();

Expand Down Expand Up @@ -362,15 +348,7 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
}
}
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
if (fetch_from_remote) {
SingleReplicaCompaction single_compaction(_engine, tablet,
CompactionType::CUMULATIVE_COMPACTION);
res = do_compact(single_compaction, CompactionProfileType::CUMULATIVE);
if (!res) {
LOG(WARNING) << "failed to do single compaction. res=" << res
<< ", table=" << tablet->tablet_id();
}
} else {
{
CumulativeCompaction cumulative_compaction(_engine, tablet);
res = do_compact(cumulative_compaction, CompactionProfileType::CUMULATIVE);
if (!res) {
Expand Down
4 changes: 1 addition & 3 deletions be/src/service/http/action/compaction_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const std::string PARAM_COMPACTION_TYPE = "compact_type";
const std::string PARAM_COMPACTION_BASE = "base";
const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
const std::string PARAM_COMPACTION_FULL = "full";
const std::string PARAM_COMPACTION_REMOTE = "remote";
const std::string PARAM_COMPACTION_FORCE = "force";

/// This action is used for viewing the compaction status.
Expand All @@ -62,8 +61,7 @@ class CompactionAction : public HttpHandlerWithAuth {
Status _handle_run_compaction(HttpRequest* req, std::string* json_result);

/// thread callback function for the tablet to do base/cumulative compaction
Status _execute_compaction_callback(TabletSharedPtr tablet, const std::string& compaction_type,
bool fetch_from_remote);
Status _execute_compaction_callback(TabletSharedPtr tablet, const std::string& compaction_type);

/// fetch compaction running status
Status _handle_run_status_compaction(HttpRequest* req, std::string* json_result);
Expand Down
Loading
Loading