Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1002,18 +1002,6 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest&
tablet->set_binlog_config(new_binlog_config);
need_to_save = true;
}
if (tablet_meta_info.__isset.enable_single_replica_compaction) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
}
tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
tablet_meta_info.enable_single_replica_compaction);
need_to_save = true;
}
if (tablet_meta_info.__isset.disable_auto_compaction) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction(
Expand Down
4 changes: 0 additions & 4 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,6 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, const TabletSchemaPB
out->mutable_index()->CopyFrom(in.index());
out->set_version_col_idx(in.version_col_idx());
out->set_store_row_column(in.store_row_column());
out->set_enable_single_replica_compaction(in.enable_single_replica_compaction());
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_uids()->CopyFrom(in.cluster_key_uids());
out->set_is_dynamic_schema(in.is_dynamic_schema());
Expand Down Expand Up @@ -510,7 +509,6 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in)
out->mutable_index()->Swap(in.mutable_index());
out->set_version_col_idx(in.version_col_idx());
out->set_store_row_column(in.store_row_column());
out->set_enable_single_replica_compaction(in.enable_single_replica_compaction());
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_uids()->Swap(in.mutable_cluster_key_uids());
out->set_is_dynamic_schema(in.is_dynamic_schema());
Expand Down Expand Up @@ -573,7 +571,6 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, const TabletSchemaCloudPB
out->mutable_index()->CopyFrom(in.index());
out->set_version_col_idx(in.version_col_idx());
out->set_store_row_column(in.store_row_column());
out->set_enable_single_replica_compaction(in.enable_single_replica_compaction());
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_uids()->CopyFrom(in.cluster_key_uids());
out->set_is_dynamic_schema(in.is_dynamic_schema());
Expand Down Expand Up @@ -624,7 +621,6 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in)
out->mutable_index()->Swap(in.mutable_index());
out->set_version_col_idx(in.version_col_idx());
out->set_store_row_column(in.store_row_column());
out->set_enable_single_replica_compaction(in.enable_single_replica_compaction());
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_uids()->Swap(in.mutable_cluster_key_uids());
out->set_is_dynamic_schema(in.is_dynamic_schema());
Expand Down
4 changes: 0 additions & 4 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,6 @@ DEFINE_mInt32(ordered_data_compaction_min_segment_size, "10485760");
// This config can be set to limit thread number in compaction thread pool.
DEFINE_mInt32(max_base_compaction_threads, "4");
DEFINE_mInt32(max_cumu_compaction_threads, "-1");
DEFINE_mInt32(max_single_replica_compaction_threads, "-1");

// Binlog Compaction
DEFINE_mInt64(binlog_compaction_wait_timesec_after_visible, "600");
Expand Down Expand Up @@ -530,9 +529,6 @@ DEFINE_mInt64(total_permits_for_compaction_score, "1000000");
// sleep interval in ms after generated compaction tasks
DEFINE_mInt32(generate_compaction_tasks_interval_ms, "100");

// sleep interval in second after update replica infos
DEFINE_mInt32(update_replica_infos_interval_seconds, "60");

// Compaction task number per disk.
// Must be greater than 2, because Base compaction and Cumulative compaction have at least one thread each.
DEFINE_mInt32(compaction_task_num_per_disk, "4");
Expand Down
3 changes: 0 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,6 @@ DECLARE_mInt32(ordered_data_compaction_min_segment_size);
// This config can be set to limit thread number in compaction thread pool.
DECLARE_mInt32(max_base_compaction_threads);
DECLARE_mInt32(max_cumu_compaction_threads);
DECLARE_mInt32(max_single_replica_compaction_threads);

// Binlog Compaction
DECLARE_mInt64(binlog_compaction_wait_timesec_after_visible);
Expand Down Expand Up @@ -596,8 +595,6 @@ DECLARE_mInt64(total_permits_for_compaction_score);

// sleep interval in ms after generated compaction tasks
DECLARE_mInt32(generate_compaction_tasks_interval_ms);
// sleep interval in second after update replica infos
DECLARE_mInt32(update_replica_infos_interval_seconds);

// Compaction task number per disk.
// Must be greater than 2, because Base compaction and Cumulative compaction have at least one thread each.
Expand Down
6 changes: 0 additions & 6 deletions be/src/common/metrics/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ DEFINE_ENGINE_COUNTER_METRIC(finish_task_requests_total, finish_task, total);
DEFINE_ENGINE_COUNTER_METRIC(finish_task_requests_failed, finish_task, failed);
DEFINE_ENGINE_COUNTER_METRIC(base_compaction_request_total, base_compaction, total);
DEFINE_ENGINE_COUNTER_METRIC(base_compaction_request_failed, base_compaction, failed);
DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_total, single_compaction, total);
DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_failed, single_compaction, failed);
DEFINE_ENGINE_COUNTER_METRIC(single_compaction_request_cancelled, single_compaction, cancelled);
DEFINE_ENGINE_COUNTER_METRIC(cumulative_compaction_request_total, cumulative_compaction, total);
DEFINE_ENGINE_COUNTER_METRIC(cumulative_compaction_request_failed, cumulative_compaction, failed);
DEFINE_ENGINE_COUNTER_METRIC(publish_task_request_total, publish, total);
Expand Down Expand Up @@ -322,9 +319,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_request_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, single_compaction_request_cancelled);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_request_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_failed_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, alter_inverted_index_requests_total);
Expand Down
3 changes: 0 additions & 3 deletions be/src/common/metrics/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ class DorisMetrics {
IntCounter* base_compaction_request_failed = nullptr;
IntCounter* cumulative_compaction_request_total = nullptr;
IntCounter* cumulative_compaction_request_failed = nullptr;
IntCounter* single_compaction_request_total = nullptr;
IntCounter* single_compaction_request_failed = nullptr;
IntCounter* single_compaction_request_cancelled = nullptr;

IntCounter* local_compaction_read_rows_total = nullptr;
IntCounter* local_compaction_read_bytes_total = nullptr;
Expand Down
28 changes: 3 additions & 25 deletions be/src/service/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "storage/compaction/cumulative_compaction.h"
#include "storage/compaction/cumulative_compaction_policy.h"
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
#include "storage/compaction/single_replica_compaction.h"
#include "storage/compaction_task_tracker.h"
#include "storage/olap_define.h"
#include "storage/storage_engine.h"
Expand Down Expand Up @@ -141,15 +140,6 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
return Status::NotSupported("The compaction type '{}' is not supported", compaction_type);
}

// "remote" = "true" means tablet should do single replica compaction to fetch rowset from peer
bool fetch_from_remote = false;
std::string param_remote = req->param(PARAM_COMPACTION_REMOTE);
if (param_remote == "true") {
fetch_from_remote = true;
} else if (!param_remote.empty() && param_remote != "false") {
return Status::NotSupported("The remote = '{}' is not supported", param_remote);
}

// "force" = "true" means skip permit limiter when submitting full compaction to thread pool
bool force = false;
std::string param_force = req->param(PARAM_COMPACTION_FORCE);
Comment thread
csun5285 marked this conversation as resolved.
Expand All @@ -174,9 +164,6 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
}

if (fetch_from_remote && !tablet->should_fetch_from_peer()) {
return Status::NotSupported("tablet should do compaction locally");
}
DBUG_EXECUTE_IF("CompactionAction._handle_run_compaction.submit_cumu_task", {
RETURN_IF_ERROR(_engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, false));
Expand All @@ -194,8 +181,8 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
force, true, 1));
} else {
// 3. execute base/cumulative/binlog compaction task in a detached thread
std::packaged_task<Status()> task([this, tablet, compaction_type, fetch_from_remote]() {
return _execute_compaction_callback(tablet, compaction_type, fetch_from_remote);
std::packaged_task<Status()> task([this, tablet, compaction_type]() {
return _execute_compaction_callback(tablet, compaction_type);
});
std::future<Status> future_obj = task.get_future();
std::thread(std::move(task)).detach();
Expand Down Expand Up @@ -319,7 +306,6 @@ Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st

Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
const std::string& compaction_type,
bool fetch_from_remote,
int8_t prefer_compaction_level) {
MonotonicStopWatch timer;
timer.start();
Expand Down Expand Up @@ -378,15 +364,7 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
}
}
} else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
if (fetch_from_remote) {
SingleReplicaCompaction single_compaction(_engine, tablet,
CompactionType::CUMULATIVE_COMPACTION);
res = do_compact(single_compaction, CompactionProfileType::CUMULATIVE);
if (!res) {
LOG(WARNING) << "failed to do single compaction. res=" << res
<< ", table=" << tablet->tablet_id();
}
} else {
{
CumulativeCompaction cumulative_compaction(_engine, tablet);
res = do_compact(cumulative_compaction, CompactionProfileType::CUMULATIVE);
if (!res) {
Expand Down
2 changes: 0 additions & 2 deletions be/src/service/http/action/compaction_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const std::string PARAM_COMPACTION_BASE = "base";
const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
const std::string PARAM_COMPACTION_FULL = "full";
const std::string PARAM_COMPACTION_BINLOG = "binlog";
const std::string PARAM_COMPACTION_REMOTE = "remote";
const std::string PARAM_COMPACTION_FORCE = "force";

/// This action is used for viewing the compaction status.
Expand All @@ -64,7 +63,6 @@ class CompactionAction : public HttpHandlerWithAuth {

/// thread callback function for the tablet to do base/cumulative/binlog compaction
Status _execute_compaction_callback(TabletSharedPtr tablet, const std::string& compaction_type,
bool fetch_from_remote,
int8_t prefer_compaction_level = -1);

/// fetch compaction running status
Expand Down
Loading
Loading