Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
IPRPSDetector,
)
from detectors.tft import (
TFtAccumulativeTimeDetector,
TFtAvgTimeDetector,
TFtErrorRequestDetector,
TFtRPSDetector,
)
Expand All @@ -33,9 +33,6 @@
__copyright__ = "Copyright (C) 2023-2025 Tempesta Technologies, Inc."
__license__ = "GPL2"

import asyncio
import signal


shutdown_task = None

Expand Down Expand Up @@ -126,7 +123,7 @@ async def main(app_config):
intersection_percent=app_config.detector_tft_rps_intersection_percent,
block_users_per_iteration=app_config.detector_tft_rps_block_users_per_iteration,
),
TFtAccumulativeTimeDetector.name(): TFtAccumulativeTimeDetector(
TFtAvgTimeDetector.name(): TFtAvgTimeDetector(
access_log=clickhouse_client,
default_threshold=app_config.detector_tft_time_default_threshold,
intersection_percent=app_config.detector_tft_time_intersection_percent,
Expand Down
35 changes: 26 additions & 9 deletions detectors/tft.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,40 @@ def get_request(self, start_at, finish_at):
)


class TFtAccumulativeTimeDetector(TFtRPSDetector):
class TFtAvgTimeDetector(TFtRPSDetector):

@staticmethod
def name() -> str:
return "tft_time"
return "tft_avg_time"

def get_request(self, start_at, finish_at):
return self.shared_filter(
f"""
, top_accumulative_response_time as (
SELECT
tft,
groupUniqArray(tfh) tfh,
groupUniqArray(address) address,
sum(response_time) value
FROM prepared_users
GROUP by tft
),
top_rps as (
SELECT
tft,
count() value
FROM prepared_users
GROUP by tft
)
SELECT
array(tft) tft,
groupUniqArray(tfh) tfh,
groupUniqArray(address) address,
sum(response_time) value
FROM prepared_users
GROUP by tft
HAVING
array(tart.tft) tft,
tart.tfh,
tart.address,
tart.value/tr.value value
FROM top_accumulative_response_time tart
JOIN top_rps tr
ON tart.tft = tr.tft
WHERE
value >= {self.threshold}
LIMIT {self.block_limit_per_check}
""",
Expand Down
55 changes: 44 additions & 11 deletions tests/test_detector_tft_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest

from detectors.tft import (
TFtAccumulativeTimeDetector,
TFtAvgTimeDetector,
TFtErrorRequestDetector,
TFtRPSDetector,
)
Expand All @@ -28,6 +28,15 @@ async def data(access_log):
"""
)

@pytest.fixture
async def time_data(access_log):
await access_log.conn.query(
"""
insert into access_log values
(cast('1751535007' as DateTime64(3, 'UTC')), '127.0.0.4', 0, 1, 200, 0, 60, 'default', '/', '/', 'UserAgent4', 13, 23, 0)
"""
)


async def test_rps(access_log):
detector = TFtRPSDetector(
Expand Down Expand Up @@ -152,10 +161,16 @@ async def test_errors_forbidden_statuses(access_log):
assert users_after == []


async def test_time(access_log):
detector = TFtAccumulativeTimeDetector(
async def test_time(access_log, time_data):
"""
UserAgent2/3/4 with TFT 13 generates
1. most requests, TFT COUNT = 4
2. top accum response time = TFT SUM(TIME) = 90
3. TFT 13 is in TOP of COUNT and SUM(TIME). AVG TIME = SUM(TIME)/COUNT = 90/4 = 22.5
"""
detector = TFtAvgTimeDetector(
access_log=access_log,
default_threshold=Decimal("15"),
default_threshold=Decimal("20"),
intersection_percent=Decimal("10"),
)
users_before, users_after = await detector.find_users(
Expand All @@ -170,11 +185,18 @@ async def test_time(access_log):
}


async def test_time_with_user_agents(access_log):
async def test_time_with_user_agents(access_log, time_data):
"""
UserAgent2/4 with TFT 13 generates
1. most requests, TFT COUNT = 3
2. top accum response time = TFT SUM(TIME) = 80
3. TFT 13 is in TOP of COUNT and SUM(TIME). AVG TIME = SUM(TIME)/COUNT = 80/3 = 26
"""

await access_log.user_agents_table_insert([["UserAgent"], ["UserAgent3"]])
detector = TFtAccumulativeTimeDetector(
detector = TFtAvgTimeDetector(
access_log=access_log,
default_threshold=Decimal("15"),
default_threshold=Decimal("20"),
intersection_percent=Decimal("10"),
)
users_before, users_after = await detector.find_users(
Expand All @@ -189,19 +211,30 @@ async def test_time_with_user_agents(access_log):
}


async def test_time_with_persistent_users(access_log):
async def test_time_with_persistent_users(access_log, time_data):
"""
UserAgent4 with TFT 13 generates
1. most requests, TFT COUNT = 2
2. top accum response time = TFT SUM(TIME) = 70
3. TFT 13 is in TOP of COUNT and SUM(TIME). AVG TIME = SUM(TIME)/COUNT = 70/2 = 35
"""

await access_log.persistent_users_table_insert(
[
["127.0.0.3"],
]
)
detector = TFtAccumulativeTimeDetector(
detector = TFtAvgTimeDetector(
access_log=access_log,
default_threshold=Decimal("15"),
default_threshold=Decimal("20"),
intersection_percent=Decimal("10"),
)
users_before, users_after = await detector.find_users(
current_time=1751535010, interval=5
)
assert users_before == []
assert users_after == []
assert len(users_after) == 1
assert users_after[0].tft == ['d']
assert set(users_after[0].ip) == {
IPv4Address("127.0.0.4"),
}