diff --git a/tests/api/test_bulk_proxy_settings.py b/tests/api/test_bulk_proxy_settings.py new file mode 100644 index 00000000..7f47f1a4 --- /dev/null +++ b/tests/api/test_bulk_proxy_settings.py @@ -0,0 +1,206 @@ +"""Tests for bulk proxy settings changes in this PR. + +Key changes tested: +- app/operation/user.py: bulk_modify_proxy_settings now returns 400 when method is None +- app/db/crud/bulk.py: flow update logic removed (only method is supported) +- Endpoint: POST /api/users/bulk/proxy_settings +""" +from fastapi import status + +from tests.api import client +from tests.api.helpers import ( + create_core, + create_group, + create_user, + delete_core, + delete_group, + delete_user, + unique_name, +) + + +def _setup_groups(access_token: str, count: int = 1): + core = create_core(access_token) + groups = [create_group(access_token, name=unique_name(f"bps_group_{idx}")) for idx in range(count)] + return core, groups + + +def _cleanup(access_token: str, core: dict, groups: list, users: list): + for user in users: + delete_user(access_token, user["username"]) + for group in groups: + delete_group(access_token, group["id"]) + delete_core(access_token, core["id"]) + + +def _auth(access_token: str) -> dict: + return {"Authorization": f"Bearer {access_token}"} + + +# --------------------------------------------------------------------------- +# Validation: method is now required +# --------------------------------------------------------------------------- + + +def test_bulk_proxy_settings_empty_body_returns_400(access_token): + """No proxy settings provided at all should result in 400.""" + response = client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={}, + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_bulk_proxy_settings_null_method_returns_400(access_token): + """Explicitly null method should result in 400.""" + response = client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={"method": None}, + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_bulk_proxy_settings_error_message_is_informative(access_token): + """The 400 error response body should describe the problem.""" + response = client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={}, + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + body = response.json() + detail = body.get("detail", "") + assert "proxy" in detail.lower() or "settings" in detail.lower() + + +# --------------------------------------------------------------------------- +# Flow field removed — sending only flow now returns 400 (method missing) +# or 422 (unknown field) +# --------------------------------------------------------------------------- + + +def test_bulk_proxy_settings_flow_only_not_accepted(access_token): + """Sending only a flow value (no method) must not succeed. + + After this PR, flow is removed from BulkUsersProxy. The endpoint should + return 400 (no supported proxy settings provided) or 422 (validation error). + """ + response = client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={"flow": "xtls-rprx-vision"}, + ) + assert response.status_code in ( + status.HTTP_400_BAD_REQUEST, + status.HTTP_422_UNPROCESSABLE_ENTITY, + ), f"Expected 400 or 422 when only flow is provided, got {response.status_code}" + + +# --------------------------------------------------------------------------- +# Successful operations +# --------------------------------------------------------------------------- + + +def test_bulk_proxy_settings_valid_method_succeeds(access_token): + """A valid method should be accepted and return 200.""" + core, groups = _setup_groups(access_token, 1) + users = [ + create_user( + access_token, + group_ids=[groups[0]["id"]], + payload={"username": unique_name("bps_ok1")}, + ), + create_user( + access_token, + group_ids=[groups[0]["id"]], + payload={"username": unique_name("bps_ok2")}, + ), + ] + try: + response = client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={"method": "aes-256-gcm"}, + ) + assert response.status_code == status.HTTP_200_OK + finally: + _cleanup(access_token, core, groups, users) + + +def test_bulk_proxy_settings_applies_method_to_user(access_token): + """The chosen method should be persisted on the user's proxy_settings.""" + core, groups = _setup_groups(access_token, 1) + user = create_user( + access_token, + group_ids=[groups[0]["id"]], + payload={"username": unique_name("bps_apply")}, + ) + try: + client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={"method": "aes-128-gcm", "users": [user["username"]]}, + ) + get_resp = client.get( + f"/api/user/{user['username']}", + headers=_auth(access_token), + ) + assert get_resp.status_code == status.HTTP_200_OK + updated = get_resp.json() + assert updated["proxy_settings"]["shadowsocks"]["method"] == "aes-128-gcm" + finally: + _cleanup(access_token, core, groups, [user]) + + +def test_bulk_proxy_settings_all_valid_methods_accepted(access_token): + """All four ShadowsocksMethods values must be individually accepted.""" + valid_methods = [ + "aes-128-gcm", + "aes-256-gcm", + "chacha20-ietf-poly1305", + "xchacha20-poly1305", + ] + for method in valid_methods: + response = client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={"method": method}, + ) + assert response.status_code == status.HTTP_200_OK, ( + f"Method '{method}' should be accepted but got {response.status_code}" + ) + + +def test_bulk_proxy_settings_dry_run(access_token): + """dry_run=True should return affected count without persisting changes.""" + core, groups = _setup_groups(access_token, 1) + user = create_user( + access_token, + group_ids=[groups[0]["id"]], + payload={"username": unique_name("bps_dry")}, + ) + try: + response = client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={"method": "aes-256-gcm", "dry_run": True}, + ) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert "affected_users" in data + assert data.get("dry_run") is True + assert data["affected_users"] >= 1 + finally: + _cleanup(access_token, core, groups, [user]) + + +def test_bulk_proxy_settings_invalid_method_returns_422(access_token): + """An invalid cipher string must return 422 Unprocessable Entity.""" + response = client.post( + "/api/users/bulk/proxy_settings", + headers=_auth(access_token), + json={"method": "totally-invalid-cipher"}, + ) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY \ No newline at end of file diff --git a/tests/test_core_protocols.py b/tests/test_core_protocols.py new file mode 100644 index 00000000..0dfe8c12 --- /dev/null +++ b/tests/test_core_protocols.py @@ -0,0 +1,264 @@ +"""Tests for the protocols property added to WireGuardConfig and XRayConfig, +and the _protocols_from_inbounds_by_tag helper function. + +Covered changes: +- app/core/abstract_core.py: protocols abstract property +- app/core/wireguard.py: protocols property returning frozenset({ProxyProtocol.wireguard}) +- app/core/xray.py: _protocols_from_inbounds_by_tag + XRayConfig.protocols property +""" +import pytest + +from app.models.protocol import ProxyProtocol +from app.core.xray import XRayConfig, _protocols_from_inbounds_by_tag +from app.core.wireguard import WireGuardConfig + + +# --------------------------------------------------------------------------- +# _protocols_from_inbounds_by_tag +# --------------------------------------------------------------------------- + +class TestProtocolsFromInboundsByTag: + """Unit tests for the pure helper function.""" + + def test_empty_dict_returns_empty_frozenset(self): + result = _protocols_from_inbounds_by_tag({}) + assert result == frozenset() + + def test_single_known_protocol(self): + inbounds = {"tag1": {"protocol": "vmess"}} + result = _protocols_from_inbounds_by_tag(inbounds) + assert result == frozenset({ProxyProtocol.vmess}) + + def test_multiple_different_protocols(self): + inbounds = { + "a": {"protocol": "vmess"}, + "b": {"protocol": "vless"}, + "c": {"protocol": "trojan"}, + } + result = _protocols_from_inbounds_by_tag(inbounds) + assert result == frozenset({ProxyProtocol.vmess, ProxyProtocol.vless, ProxyProtocol.trojan}) + + def test_all_six_protocols(self): + inbounds = { + "a": {"protocol": "vmess"}, + "b": {"protocol": "vless"}, + "c": {"protocol": "trojan"}, + "d": {"protocol": "shadowsocks"}, + "e": {"protocol": "wireguard"}, + "f": {"protocol": "hysteria"}, + } + result = _protocols_from_inbounds_by_tag(inbounds) + assert result == frozenset(ProxyProtocol) + + def test_unknown_protocol_is_skipped(self): + inbounds = { + "a": {"protocol": "unknown_proto"}, + "b": {"protocol": "vmess"}, + } + result = _protocols_from_inbounds_by_tag(inbounds) + assert result == frozenset({ProxyProtocol.vmess}) + + def test_all_unknown_protocols_returns_empty(self): + inbounds = { + "a": {"protocol": "http"}, + "b": {"protocol": "dns"}, + } + result = _protocols_from_inbounds_by_tag(inbounds) + assert result == frozenset() + + def test_duplicate_protocols_deduplicated(self): + inbounds = { + "tag1": {"protocol": "vless"}, + "tag2": {"protocol": "vless"}, + "tag3": {"protocol": "vless"}, + } + result = _protocols_from_inbounds_by_tag(inbounds) + assert result == frozenset({ProxyProtocol.vless}) + assert len(result) == 1 + + def test_returns_frozenset(self): + result = _protocols_from_inbounds_by_tag({"a": {"protocol": "trojan"}}) + assert isinstance(result, frozenset) + + def test_mixed_known_and_unknown(self): + inbounds = { + "a": {"protocol": "shadowsocks"}, + "b": {"protocol": "not-real"}, + "c": {"protocol": "hysteria"}, + } + result = _protocols_from_inbounds_by_tag(inbounds) + assert ProxyProtocol.shadowsocks in result + assert ProxyProtocol.hysteria in result + assert len(result) == 2 + + +# --------------------------------------------------------------------------- +# WireGuardConfig.protocols +# --------------------------------------------------------------------------- + +class TestWireGuardConfigProtocols: + """WireGuardConfig.protocols must always return frozenset({ProxyProtocol.wireguard}).""" + + def _make_wg_config(self) -> WireGuardConfig: + """Create a WireGuardConfig with skip_validation to avoid needing real keys.""" + return WireGuardConfig( + config={ + "interface_name": "wg0", + "private_key": "fake_key", + "listen_port": 51820, + "address": ["10.0.0.1/24"], + }, + skip_validation=True, + ) + + def test_protocols_returns_frozenset(self): + wg = self._make_wg_config() + assert isinstance(wg.protocols, frozenset) + + def test_protocols_contains_wireguard(self): + wg = self._make_wg_config() + assert ProxyProtocol.wireguard in wg.protocols + + def test_protocols_does_not_contain_other_protocols(self): + wg = self._make_wg_config() + for protocol in ProxyProtocol: + if protocol != ProxyProtocol.wireguard: + assert protocol not in wg.protocols + + def test_protocols_length_is_one(self): + wg = self._make_wg_config() + assert len(wg.protocols) == 1 + + def test_protocols_is_correct_singleton(self): + wg = self._make_wg_config() + assert wg.protocols == frozenset({ProxyProtocol.wireguard}) + + def test_from_json_protocols_is_wireguard(self): + """from_json path should also produce the correct protocols.""" + data = { + "config": {}, + "inbounds": ["wg0"], + "inbounds_by_tag": {"wg0": {"protocol": "wireguard"}}, + } + wg = WireGuardConfig.from_json(data) + # WireGuardConfig.protocols is a fixed class-level constant, not derived from inbounds + assert wg.protocols == frozenset({ProxyProtocol.wireguard}) + + +# --------------------------------------------------------------------------- +# XRayConfig.protocols +# --------------------------------------------------------------------------- + +class TestXRayConfigProtocols: + """XRayConfig.protocols derived from inbounds during _resolve_inbounds.""" + + def _make_xray_config(self, inbounds: list) -> XRayConfig: + """Create XRayConfig with specified inbounds.""" + cfg = XRayConfig( + config={"inbounds": inbounds}, + ) + return cfg + + def _make_xray_config_skip_validation(self, inbounds_by_tag: dict) -> XRayConfig: + """Create XRayConfig using from_json to set inbounds_by_tag directly.""" + data = { + "config": {}, + "inbounds": list(inbounds_by_tag.keys()), + "inbounds_by_tag": inbounds_by_tag, + } + return XRayConfig.from_json(data) + + def test_empty_config_protocols_is_empty(self): + cfg = XRayConfig(config={}, skip_validation=True) + assert cfg.protocols == frozenset() + + def test_from_json_no_inbounds_empty_protocols(self): + cfg = self._make_xray_config_skip_validation({}) + assert cfg.protocols == frozenset() + + def test_from_json_single_vless_inbound(self): + cfg = self._make_xray_config_skip_validation( + {"tag1": {"protocol": "vless"}} + ) + assert cfg.protocols == frozenset({ProxyProtocol.vless}) + + def test_from_json_multiple_protocols(self): + cfg = self._make_xray_config_skip_validation( + { + "tag1": {"protocol": "vmess"}, + "tag2": {"protocol": "trojan"}, + "tag3": {"protocol": "shadowsocks"}, + } + ) + assert cfg.protocols == frozenset({ + ProxyProtocol.vmess, + ProxyProtocol.trojan, + ProxyProtocol.shadowsocks, + }) + + def test_from_json_unknown_protocol_excluded(self): + cfg = self._make_xray_config_skip_validation( + { + "tag1": {"protocol": "vmess"}, + "tag_bad": {"protocol": "unknown-proto"}, + } + ) + assert cfg.protocols == frozenset({ProxyProtocol.vmess}) + + def test_protocols_returns_frozenset(self): + cfg = self._make_xray_config_skip_validation({"t": {"protocol": "hysteria"}}) + assert isinstance(cfg.protocols, frozenset) + + def test_protocols_is_immutable(self): + cfg = self._make_xray_config_skip_validation({"t": {"protocol": "trojan"}}) + # frozenset is immutable – confirm it raises AttributeError on add + with pytest.raises(AttributeError): + cfg.protocols.add(ProxyProtocol.vmess) + + def test_protocols_population_via_resolve_inbounds(self): + """Test that protocols is populated during full config resolution (not skip_validation).""" + # Minimal valid xray config with a vless inbound + config = { + "inbounds": [ + { + "tag": "test-vless", + "listen": "0.0.0.0", + "port": 1080, + "protocol": "vless", + "settings": {"clients": [], "decryption": "none"}, + "streamSettings": {"network": "tcp"}, + } + ] + } + cfg = XRayConfig(config=config) + assert ProxyProtocol.vless in cfg.protocols + + +# --------------------------------------------------------------------------- +# AbstractCore.protocols contract +# --------------------------------------------------------------------------- + +class TestAbstractCoreProtocolsContract: + """Verify that AbstractCore defines protocols as abstract.""" + + def test_abstract_core_has_protocols_abstract_property(self): + from app.core.abstract_core import AbstractCore + import inspect + + # protocols must be listed as an abstract method + abstract_methods = getattr(AbstractCore, "__abstractmethods__", set()) + assert "protocols" in abstract_methods, ( + "AbstractCore.protocols must be declared as an abstract property" + ) + + def test_concrete_classes_implement_protocols(self): + """Both WireGuardConfig and XRayConfig should implement the protocols property.""" + from app.core.abstract_core import AbstractCore + + for cls in (WireGuardConfig, XRayConfig): + assert hasattr(cls, "protocols"), f"{cls.__name__} should have protocols property" + # It should NOT be in abstractmethods (i.e., it must be implemented) + abstract_methods = getattr(cls, "__abstractmethods__", set()) + assert "protocols" not in abstract_methods, ( + f"{cls.__name__}.protocols should be implemented, not abstract" + ) \ No newline at end of file diff --git a/tests/test_models_host.py b/tests/test_models_host.py new file mode 100644 index 00000000..55971716 --- /dev/null +++ b/tests/test_models_host.py @@ -0,0 +1,94 @@ +"""Tests for app/models/host.py — uplink_chunk_size type/pattern change. + +The PR changed XHttpSettings.uplink_chunk_size from: + int | None +to: + str | int | None with pattern r"^\d{1,16}(-\d{1,16})?$" + +and added "uplink_chunk_size" to the _empty_str_to_none validator list, +so empty string is coerced to None before pattern validation. +""" +import pytest +from pydantic import ValidationError + +from app.models.host import XHttpSettings + + +def _make_xhttp(**kwargs) -> XHttpSettings: + """Helper: create XHttpSettings with only the given kwargs.""" + return XHttpSettings(**kwargs) + + +class TestXHttpSettingsUplinkChunkSizeTypes: + """uplink_chunk_size now accepts str, int, or None.""" + + def test_none_is_accepted(self): + s = _make_xhttp(uplink_chunk_size=None) + assert s.uplink_chunk_size is None + + def test_integer_is_accepted(self): + s = _make_xhttp(uplink_chunk_size=1024) + assert s.uplink_chunk_size == 1024 + + def test_numeric_string_is_accepted(self): + s = _make_xhttp(uplink_chunk_size="512") + assert s.uplink_chunk_size == "512" + + def test_range_string_is_accepted(self): + # Pattern: ^\d{1,16}(-\d{1,16})?$ + s = _make_xhttp(uplink_chunk_size="256-1024") + assert s.uplink_chunk_size == "256-1024" + + +class TestXHttpSettingsUplinkChunkSizeEmptyString: + """Empty string must be coerced to None (validator added in this PR).""" + + def test_empty_string_becomes_none(self): + s = _make_xhttp(uplink_chunk_size="") + assert s.uplink_chunk_size is None + + +class TestXHttpSettingsUplinkChunkSizePatternValidation: + """Strings must match ^\d{1,16}(-\d{1,16})?$.""" + + def test_single_number_string(self): + s = _make_xhttp(uplink_chunk_size="100") + assert s.uplink_chunk_size == "100" + + def test_range_string(self): + s = _make_xhttp(uplink_chunk_size="100-200") + assert s.uplink_chunk_size == "100-200" + + def test_large_number_at_boundary(self): + # 16 digits is the max allowed + s = _make_xhttp(uplink_chunk_size="1234567890123456") + assert s.uplink_chunk_size == "1234567890123456" + + def test_invalid_string_with_letters_raises(self): + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="abc") + + def test_invalid_string_double_dash_raises(self): + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="100--200") + + def test_invalid_string_leading_dash_raises(self): + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="-100") + + def test_too_many_digits_raises(self): + # 17 digits should fail the 1-16 digit constraint + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="12345678901234567") + + +class TestXHttpSettingsOtherFieldsUnchanged: + """Smoke test: other fields on XHttpSettings are unaffected.""" + + def test_default_construction(self): + s = XHttpSettings() + assert s.uplink_chunk_size is None + + def test_sc_max_each_post_bytes_still_accepts_range_string(self): + s = _make_xhttp(sc_max_each_post_bytes="100-200") + assert s.sc_max_each_post_bytes == "100-200" \ No newline at end of file diff --git a/tests/test_models_protocol.py b/tests/test_models_protocol.py new file mode 100644 index 00000000..0a9ba63f --- /dev/null +++ b/tests/test_models_protocol.py @@ -0,0 +1,96 @@ +"""Tests for app/models/protocol.py — new ProxyProtocol enum.""" +import pytest + +from app.models.protocol import ProxyProtocol + + +class TestProxyProtocolValues: + """Verify every enum member exists with the correct integer value.""" + + def test_vmess_value(self): + assert ProxyProtocol.vmess == 1 + + def test_vless_value(self): + assert ProxyProtocol.vless == 2 + + def test_trojan_value(self): + assert ProxyProtocol.trojan == 3 + + def test_shadowsocks_value(self): + assert ProxyProtocol.shadowsocks == 4 + + def test_wireguard_value(self): + assert ProxyProtocol.wireguard == 5 + + def test_hysteria_value(self): + assert ProxyProtocol.hysteria == 6 + + def test_member_count(self): + assert len(ProxyProtocol) == 6 + + def test_is_int_enum(self): + assert isinstance(ProxyProtocol.vmess, int) + + +class TestProxyProtocolFromValue: + """Tests for the ProxyProtocol.from_value classmethod.""" + + def test_from_value_vmess(self): + assert ProxyProtocol.from_value("vmess") is ProxyProtocol.vmess + + def test_from_value_vless(self): + assert ProxyProtocol.from_value("vless") is ProxyProtocol.vless + + def test_from_value_trojan(self): + assert ProxyProtocol.from_value("trojan") is ProxyProtocol.trojan + + def test_from_value_shadowsocks(self): + assert ProxyProtocol.from_value("shadowsocks") is ProxyProtocol.shadowsocks + + def test_from_value_wireguard(self): + assert ProxyProtocol.from_value("wireguard") is ProxyProtocol.wireguard + + def test_from_value_hysteria(self): + assert ProxyProtocol.from_value("hysteria") is ProxyProtocol.hysteria + + def test_from_value_unknown_returns_none(self): + assert ProxyProtocol.from_value("unknown_protocol") is None + + def test_from_value_empty_string_returns_none(self): + assert ProxyProtocol.from_value("") is None + + def test_from_value_case_sensitive_upper_returns_none(self): + # Names are stored lowercase; uppercase should not match + assert ProxyProtocol.from_value("VMESS") is None + + def test_from_value_partial_name_returns_none(self): + assert ProxyProtocol.from_value("vmes") is None + + def test_from_value_integer_string_returns_none(self): + # The lookup is by name, not by numeric value string + assert ProxyProtocol.from_value("1") is None + + def test_all_names_resolvable(self): + """Every enum member name should be resolvable via from_value.""" + for member in ProxyProtocol: + assert ProxyProtocol.from_value(member.name) is member + + +class TestProxyProtocolFrozenset: + """Verify that protocol values can be used in frozensets as expected.""" + + def test_frozenset_membership(self): + protocols = frozenset({ProxyProtocol.vmess, ProxyProtocol.vless}) + assert ProxyProtocol.vmess in protocols + assert ProxyProtocol.trojan not in protocols + + def test_frozenset_of_all_protocols(self): + all_protocols = frozenset(ProxyProtocol) + assert len(all_protocols) == 6 + for member in ProxyProtocol: + assert member in all_protocols + + def test_singleton_frozenset_wireguard(self): + wg = frozenset((ProxyProtocol.wireguard,)) + assert ProxyProtocol.wireguard in wg + assert ProxyProtocol.vmess not in wg \ No newline at end of file diff --git a/tests/test_models_proxy.py b/tests/test_models_proxy.py new file mode 100644 index 00000000..a2cc8440 --- /dev/null +++ b/tests/test_models_proxy.py @@ -0,0 +1,101 @@ +"""Tests for app/models/proxy.py — changes in this PR. + +Specifically: +- XTLSFlows removed (flow no longer exists in VlessSettings) +- VlessSettings no longer has a `flow` field +- ProxyTable no longer carries flow information through VlessSettings +""" +import pytest +from pydantic import ValidationError + +from app.models.proxy import ( + ProxyTable, + ShadowsocksMethods, + VlessSettings, + VMessSettings, +) + + +class TestVlessSettingsNoFlow: + """VlessSettings should no longer contain a flow field.""" + + def test_vless_settings_has_no_flow_attribute(self): + settings = VlessSettings() + assert not hasattr(settings, "flow"), "flow field should not exist on VlessSettings" + + def test_vless_settings_only_has_id(self): + settings = VlessSettings() + fields = set(settings.model_fields_set | set(settings.model_dump().keys())) + assert "id" in fields + assert "flow" not in fields + + def test_vless_settings_ignores_flow_in_input(self): + # Extra fields should be ignored (pydantic default) or raise; either way, no flow attr + try: + settings = VlessSettings(flow="xtls-rprx-vision") + except ValidationError: + # Pydantic strict mode raises – acceptable + return + assert not hasattr(settings, "flow") + + def test_vless_settings_id_is_uuid(self): + import uuid + settings = VlessSettings() + assert isinstance(settings.id, uuid.UUID) + + def test_vless_settings_dict_has_no_flow(self): + settings = VlessSettings() + d = settings.model_dump() + assert "flow" not in d + + +class TestProxyTableNoFlow: + """ProxyTable.vless should not carry any flow field.""" + + def test_proxy_table_vless_no_flow(self): + table = ProxyTable() + assert not hasattr(table.vless, "flow") + + def test_proxy_table_dict_no_flow(self): + table = ProxyTable() + d = table.dict() + assert "flow" not in d.get("vless", {}) + + def test_proxy_table_shadowsocks_default_method(self): + table = ProxyTable() + assert table.shadowsocks.method == ShadowsocksMethods.CHACHA20_POLY1305 + + def test_proxy_table_all_protocols_present(self): + table = ProxyTable() + d = table.dict() + for protocol in ("vmess", "vless", "trojan", "shadowsocks", "wireguard", "hysteria"): + assert protocol in d, f"Protocol '{protocol}' missing from ProxyTable.dict()" + + +class TestXTLSFlowsRemoved: + """XTLSFlows enum should no longer be importable from app.models.proxy.""" + + def test_xtls_flows_not_in_proxy_module(self): + import app.models.proxy as proxy_module + assert not hasattr(proxy_module, "XTLSFlows"), ( + "XTLSFlows was removed from app.models.proxy in this PR and should not be importable" + ) + + +class TestShadowsocksMethodsIntact: + """ShadowsocksMethods enum should still have all four methods.""" + + def test_aes_128_gcm(self): + assert ShadowsocksMethods.AES_128_GCM == "aes-128-gcm" + + def test_aes_256_gcm(self): + assert ShadowsocksMethods.AES_256_GCM == "aes-256-gcm" + + def test_chacha20_poly1305(self): + assert ShadowsocksMethods.CHACHA20_POLY1305 == "chacha20-ietf-poly1305" + + def test_xchacha20_poly1305(self): + assert ShadowsocksMethods.XCHACHA20_POLY1305 == "xchacha20-poly1305" + + def test_member_count(self): + assert len(ShadowsocksMethods) == 4 \ No newline at end of file diff --git a/tests/test_models_settings.py b/tests/test_models_settings.py new file mode 100644 index 00000000..228625ac --- /dev/null +++ b/tests/test_models_settings.py @@ -0,0 +1,49 @@ +"""Tests for app/models/settings.py — General model no longer has default_flow. + +The PR removed: + default_flow: XTLSFlows = Field(default=XTLSFlows.NONE) +from the General model. +""" +import pytest + +from app.models.settings import General +from app.models.proxy import ShadowsocksMethods + + +class TestGeneralModelNoDefaultFlow: + """General should no longer have a default_flow field.""" + + def test_general_has_no_default_flow_attribute(self): + g = General() + assert not hasattr(g, "default_flow"), ( + "General model must not have a default_flow field after this PR" + ) + + def test_general_fields_do_not_include_flow(self): + g = General() + fields = g.model_dump() + assert "default_flow" not in fields + + def test_general_only_has_default_method(self): + g = General() + fields = g.model_dump() + assert set(fields.keys()) == {"default_method"} + + def test_general_default_method_is_chacha20(self): + g = General() + assert g.default_method == ShadowsocksMethods.CHACHA20_POLY1305 + + def test_general_default_method_can_be_set(self): + g = General(default_method=ShadowsocksMethods.AES_256_GCM) + assert g.default_method == ShadowsocksMethods.AES_256_GCM + + def test_general_accepts_all_shadowsocks_methods(self): + for method in ShadowsocksMethods: + g = General(default_method=method) + assert g.default_method == method + + def test_xtls_flows_not_importable_from_settings(self): + import app.models.settings as settings_module + assert not hasattr(settings_module, "XTLSFlows"), ( + "XTLSFlows should not be importable from app.models.settings after this PR" + ) \ No newline at end of file diff --git a/tests/test_models_status_emojis.py b/tests/test_models_status_emojis.py new file mode 100644 index 00000000..ac6b0b5a --- /dev/null +++ b/tests/test_models_status_emojis.py @@ -0,0 +1,43 @@ +"""Tests for app/models/status_emojis.py — new STATUS_EMOJIS constant.""" +import pytest + +from app.models.status_emojis import STATUS_EMOJIS + + +class TestStatusEmojis: + """Verify STATUS_EMOJIS has the expected structure and values.""" + + EXPECTED_KEYS = {"active", "expired", "limited", "disabled", "on_hold"} + + def test_is_dict(self): + assert isinstance(STATUS_EMOJIS, dict) + + def test_contains_all_expected_keys(self): + assert set(STATUS_EMOJIS.keys()) == self.EXPECTED_KEYS + + def test_active_emoji(self): + assert STATUS_EMOJIS["active"] == "✅" + + def test_expired_emoji(self): + assert STATUS_EMOJIS["expired"] == "⌛️" + + def test_limited_emoji(self): + assert STATUS_EMOJIS["limited"] == "🪫" + + def test_disabled_emoji(self): + assert STATUS_EMOJIS["disabled"] == "❌" + + def test_on_hold_emoji(self): + assert STATUS_EMOJIS["on_hold"] == "🔌" + + def test_all_values_are_non_empty_strings(self): + for key, value in STATUS_EMOJIS.items(): + assert isinstance(value, str), f"Value for '{key}' is not a string" + assert value, f"Value for '{key}' is empty" + + def test_no_extra_keys(self): + assert len(STATUS_EMOJIS) == len(self.EXPECTED_KEYS) + + def test_key_not_present_returns_keyerror(self): + with pytest.raises(KeyError): + _ = STATUS_EMOJIS["unknown_status"] \ No newline at end of file diff --git a/tests/test_models_subscription.py b/tests/test_models_subscription.py new file mode 100644 index 00000000..494e84c6 --- /dev/null +++ b/tests/test_models_subscription.py @@ -0,0 +1,88 @@ +"""Tests for app/models/subscription.py — XHTTPTransportConfig.uplink_chunk_size change. + +The PR changed uplink_chunk_size from: + int | None +to: + str | int | None with pattern r"^\d{1,16}(?:-\d{1,16})?$" +""" +import pytest +from pydantic import ValidationError + +from app.models.subscription import XHTTPTransportConfig + + +def _make_xhttp(**kwargs) -> XHTTPTransportConfig: + return XHTTPTransportConfig(**kwargs) + + +class TestXHTTPTransportConfigUplinkChunkSizeNoneAndInt: + def test_none_is_accepted(self): + cfg = _make_xhttp(uplink_chunk_size=None) + assert cfg.uplink_chunk_size is None + + def test_integer_is_accepted(self): + cfg = _make_xhttp(uplink_chunk_size=2048) + assert cfg.uplink_chunk_size == 2048 + + def test_default_is_none(self): + cfg = XHTTPTransportConfig() + assert cfg.uplink_chunk_size is None + + +class TestXHTTPTransportConfigUplinkChunkSizeString: + def test_numeric_string_accepted(self): + cfg = _make_xhttp(uplink_chunk_size="1024") + assert cfg.uplink_chunk_size == "1024" + + def test_range_string_accepted(self): + cfg = _make_xhttp(uplink_chunk_size="512-2048") + assert cfg.uplink_chunk_size == "512-2048" + + def test_single_digit_accepted(self): + cfg = _make_xhttp(uplink_chunk_size="1") + assert cfg.uplink_chunk_size == "1" + + def test_max_boundary_16_digits(self): + cfg = _make_xhttp(uplink_chunk_size="9999999999999999") + assert cfg.uplink_chunk_size == "9999999999999999" + + def test_range_max_boundary(self): + cfg = _make_xhttp(uplink_chunk_size="1-9999999999999999") + assert cfg.uplink_chunk_size == "1-9999999999999999" + + +class TestXHTTPTransportConfigUplinkChunkSizeInvalidStrings: + def test_alphabetic_string_raises(self): + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="big") + + def test_leading_dash_raises(self): + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="-512") + + def test_double_dash_raises(self): + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="512--1024") + + def test_too_many_digits_raises(self): + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="12345678901234567") # 17 digits + + def test_trailing_dash_raises(self): + with pytest.raises(ValidationError): + _make_xhttp(uplink_chunk_size="512-") + + +class TestXHTTPTransportConfigUplinkChunkSizeSerialization: + """Verify serialization alias is applied correctly.""" + + def test_serialized_alias_is_uplinkChunkSize(self): + cfg = _make_xhttp(uplink_chunk_size="256") + serialized = cfg.model_dump(by_alias=True, exclude_none=True) + assert "uplinkChunkSize" in serialized + assert serialized["uplinkChunkSize"] == "256" + + def test_none_excluded_from_serialization(self): + cfg = _make_xhttp(uplink_chunk_size=None) + serialized = cfg.model_dump(by_alias=True, exclude_none=True) + assert "uplinkChunkSize" not in serialized \ No newline at end of file diff --git a/tests/test_models_user.py b/tests/test_models_user.py new file mode 100644 index 00000000..c5eb7a2e --- /dev/null +++ b/tests/test_models_user.py @@ -0,0 +1,60 @@ +"""Tests for app/models/user.py — BulkUsersProxy no longer has flow field. + +The PR removed: + flow: XTLSFlows | None = Field(default=None) +from BulkUsersProxy. +""" +import pytest +from pydantic import ValidationError + +from app.models.user import BulkUsersProxy +from app.models.proxy import ShadowsocksMethods + + +class TestBulkUsersProxyNoFlow: + """BulkUsersProxy should not have a flow field.""" + + def test_no_flow_attribute(self): + obj = BulkUsersProxy() + assert not hasattr(obj, "flow"), "BulkUsersProxy must not have a flow field after this PR" + + def test_fields_do_not_include_flow(self): + obj = BulkUsersProxy() + fields = obj.model_dump() + assert "flow" not in fields + + def test_method_field_exists_and_defaults_none(self): + obj = BulkUsersProxy() + assert obj.method is None + + def test_method_can_be_set(self): + obj = BulkUsersProxy(method=ShadowsocksMethods.AES_256_GCM) + assert obj.method == ShadowsocksMethods.AES_256_GCM + + def test_all_shadowsocks_methods_accepted(self): + for method in ShadowsocksMethods: + obj = BulkUsersProxy(method=method) + assert obj.method == method + + def test_xtls_flows_not_importable_from_user_module(self): + import app.models.user as user_module + assert not hasattr(user_module, "XTLSFlows"), ( + "XTLSFlows should not be present in app.models.user after this PR" + ) + + def test_bulk_users_proxy_inherits_filter_fields(self): + # BulkUsersProxy inherits from BulkUserFilter which has group_ids, admins, users, etc. + obj = BulkUsersProxy(group_ids=[1, 2]) + assert obj.group_ids == [1, 2] + + +class TestBulkUsersProxyMethodSerialization: + def test_method_serializes_correctly(self): + obj = BulkUsersProxy(method=ShadowsocksMethods.CHACHA20_POLY1305) + d = obj.model_dump() + assert d["method"] == "chacha20-ietf-poly1305" + + def test_none_method_serializes_as_none(self): + obj = BulkUsersProxy(method=None) + d = obj.model_dump() + assert d["method"] is None \ No newline at end of file diff --git a/tests/test_models_user_template.py b/tests/test_models_user_template.py new file mode 100644 index 00000000..186f8433 --- /dev/null +++ b/tests/test_models_user_template.py @@ -0,0 +1,67 @@ +"""Tests for app/models/user_template.py — ExtraSettings no longer has flow field. + +The PR removed: + flow: XTLSFlows | None = Field(XTLSFlows.NONE) +from ExtraSettings. +""" +import pytest +from pydantic import ValidationError + +from app.models.user_template import ExtraSettings +from app.models.proxy import ShadowsocksMethods + + +class TestExtraSettingsNoFlow: + """ExtraSettings should not have a flow field.""" + + def test_no_flow_attribute(self): + es = ExtraSettings() + assert not hasattr(es, "flow"), "ExtraSettings must not have a flow field after this PR" + + def test_fields_do_not_include_flow(self): + es = ExtraSettings() + fields = es.model_dump() + assert "flow" not in fields + + def test_only_method_field_exists(self): + es = ExtraSettings() + fields = set(es.model_dump().keys()) + assert fields == {"method"} + + def test_default_method_is_chacha20(self): + es = ExtraSettings() + assert es.method == ShadowsocksMethods.CHACHA20_POLY1305 + + def test_method_can_be_none(self): + es = ExtraSettings(method=None) + assert es.method is None + + def test_all_shadowsocks_methods_accepted(self): + for method in ShadowsocksMethods: + es = ExtraSettings(method=method) + assert es.method == method + + def test_xtls_flows_not_importable_from_user_template(self): + import app.models.user_template as ut_module + assert not hasattr(ut_module, "XTLSFlows"), ( + "XTLSFlows should not be importable from app.models.user_template after this PR" + ) + + +class TestExtraSettingsDict: + """ExtraSettings.dict() should not include flow.""" + + def test_dict_no_flow(self): + es = ExtraSettings() + d = es.dict() + assert "flow" not in d + + def test_dict_contains_method(self): + es = ExtraSettings(method=ShadowsocksMethods.AES_128_GCM) + d = es.dict() + assert d["method"] == "aes-128-gcm" + + def test_dict_method_none(self): + es = ExtraSettings(method=None) + d = es.dict() + assert d["method"] is None \ No newline at end of file diff --git a/tests/test_models_validators.py b/tests/test_models_validators.py new file mode 100644 index 00000000..4ba8af79 --- /dev/null +++ b/tests/test_models_validators.py @@ -0,0 +1,89 @@ +"""Tests for app/models/validators.py — ProxyValidator change. + +The PR changed: + if value is None: + return value +to: + if not value: + return None + +This means that empty strings now also produce None instead of being passed +through to the regex check, which previously would have raised a ValueError. +""" +import pytest + +from app.models.validators import ProxyValidator + + +class TestProxyValidatorEmptyString: + """Empty string should now return None (same as None input).""" + + def test_none_returns_none(self): + assert ProxyValidator.validate_proxy_url(None) is None + + def test_empty_string_returns_none(self): + # Key change: empty string must now return None instead of raising + result = ProxyValidator.validate_proxy_url("") + assert result is None + + def test_empty_string_does_not_raise(self): + # Previously empty string would hit the regex and raise ValueError + try: + result = ProxyValidator.validate_proxy_url("") + except ValueError: + pytest.fail("validate_proxy_url('') should not raise ValueError after the PR change") + + +class TestProxyValidatorValidUrls: + """Valid proxy URLs should pass through unchanged.""" + + def test_http_host_port(self): + url = "http://example.com:8080" + assert ProxyValidator.validate_proxy_url(url) == url + + def test_https_host_port(self): + url = "https://proxy.example.com:443" + assert ProxyValidator.validate_proxy_url(url) == url + + def test_socks4_host_port(self): + url = "socks4://10.0.0.1:1080" + assert ProxyValidator.validate_proxy_url(url) == url + + def test_socks5_host_port(self): + url = "socks5://10.0.0.1:1080" + assert ProxyValidator.validate_proxy_url(url) == url + + def test_http_with_auth(self): + url = "http://user:pass@host.example.com:3128" + assert ProxyValidator.validate_proxy_url(url) == url + + def test_socks5_with_auth(self): + url = "socks5://alice:secret@proxy.local:1080" + assert ProxyValidator.validate_proxy_url(url) == url + + +class TestProxyValidatorInvalidUrls: + """Invalid proxy URL strings should raise ValueError.""" + + def test_missing_scheme_raises(self): + with pytest.raises(ValueError, match="proxy_url must be a valid proxy address"): + ProxyValidator.validate_proxy_url("host.example.com:8080") + + def test_unsupported_scheme_raises(self): + with pytest.raises(ValueError, match="proxy_url must be a valid proxy address"): + ProxyValidator.validate_proxy_url("ftp://host.example.com:21") + + def test_missing_port_raises(self): + with pytest.raises(ValueError, match="proxy_url must be a valid proxy address"): + ProxyValidator.validate_proxy_url("http://host.example.com") + + def test_whitespace_only_returns_none(self): + # Falsy value check: whitespace string is truthy, but let's verify behaviour + # A whitespace-only string is truthy in Python, so it should reach the regex + # and raise ValueError (it won't match the scheme pattern) + with pytest.raises(ValueError): + ProxyValidator.validate_proxy_url(" ") + + def test_arbitrary_string_raises(self): + with pytest.raises(ValueError, match="proxy_url must be a valid proxy address"): + ProxyValidator.validate_proxy_url("not-a-proxy-url") \ No newline at end of file diff --git a/tests/test_node_user.py b/tests/test_node_user.py new file mode 100644 index 00000000..e4d75e16 --- /dev/null +++ b/tests/test_node_user.py @@ -0,0 +1,238 @@ +"""Tests for app/node/user.py — _serialize_user_for_node with allowed_protocols filtering. + +The PR refactored _serialize_user_for_node to conditionally include proxy +parameters based on the allowed_protocols frozenset. Previously it always +passed all proxy kwargs (filtering only by what create_proxy accepted via +_CREATE_PROXY_PARAMS). Now it explicitly guards each protocol block. + +PasarGuardNodeBridge is mocked because it is not available in the test env. +""" +import sys +from types import ModuleType +from unittest.mock import MagicMock, patch, call + +import pytest + +from app.models.protocol import ProxyProtocol + + +# --------------------------------------------------------------------------- +# Bootstrap: create a minimal mock of the PasarGuardNodeBridge package so +# the module under test can be imported without the real library. +# --------------------------------------------------------------------------- + +def _make_bridge_mock(): + bridge_pkg = ModuleType("PasarGuardNodeBridge") + bridge_pkg.create_proxy = MagicMock(return_value=MagicMock(name="proxy_obj")) + bridge_pkg.create_user = MagicMock(return_value=MagicMock(name="user_obj")) + + common_pkg = ModuleType("PasarGuardNodeBridge.common") + service_pb2_pkg = ModuleType("PasarGuardNodeBridge.common.service_pb2") + service_pb2_pkg.User = MagicMock(name="ProtoUser") + + bridge_pkg.common = common_pkg + common_pkg.service_pb2 = service_pb2_pkg + + sys.modules.setdefault("PasarGuardNodeBridge", bridge_pkg) + sys.modules.setdefault("PasarGuardNodeBridge.common", common_pkg) + sys.modules.setdefault("PasarGuardNodeBridge.common.service_pb2", service_pb2_pkg) + + return bridge_pkg + + +_BRIDGE_MOCK = _make_bridge_mock() + + +# Now safe to import +from app.node.user import _serialize_user_for_node, _ALL_PROXY_PROTOCOLS # noqa: E402 + + +# --------------------------------------------------------------------------- +# Sample user settings covering all six protocols +# --------------------------------------------------------------------------- + +FULL_USER_SETTINGS = { + "vmess": {"id": "aaaabbbb-cccc-dddd-eeee-ffffaaaabbbb"}, + "vless": {"id": "11112222-3333-4444-5555-666677778888"}, + "trojan": {"password": "trojan-pass"}, + "shadowsocks": {"password": "ss-pass", "method": "chacha20-ietf-poly1305"}, + "wireguard": {"public_key": "wg-pub-key", "peer_ips": ["10.0.0.2/32"]}, + "hysteria": {"auth": "hysteria-auth"}, +} + + +def _call_serialize(user_settings: dict, allowed_protocols=None): + """Helper that resets mocks, calls _serialize_user_for_node, and returns (create_proxy_kwargs, create_user_args).""" + _BRIDGE_MOCK.create_proxy.reset_mock() + _BRIDGE_MOCK.create_user.reset_mock() + _BRIDGE_MOCK.create_proxy.return_value = MagicMock(name="proxy_obj") + + _serialize_user_for_node( + id=1, + username="testuser", + user_settings=user_settings, + inbounds=["tag1"], + allowed_protocols=allowed_protocols, + ) + + assert _BRIDGE_MOCK.create_proxy.call_count == 1 + kwargs_passed = _BRIDGE_MOCK.create_proxy.call_args.kwargs + return kwargs_passed + + +class TestSerializeUserForNodeAllProtocolsDefault: + """When allowed_protocols is None, all six protocols should be included.""" + + def test_none_allowed_protocols_includes_vmess(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, allowed_protocols=None) + assert "vmess_id" in kwargs + + def test_none_allowed_protocols_includes_vless(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, allowed_protocols=None) + assert "vless_id" in kwargs + + def test_none_allowed_protocols_includes_trojan(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, allowed_protocols=None) + assert "trojan_password" in kwargs + + def test_none_allowed_protocols_includes_shadowsocks(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, allowed_protocols=None) + assert "shadowsocks_password" in kwargs + assert "shadowsocks_method" in kwargs + + def test_none_allowed_protocols_includes_wireguard(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, allowed_protocols=None) + assert "wireguard_public_key" in kwargs + assert "wireguard_peer_ips" in kwargs + + def test_none_allowed_protocols_includes_hysteria(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, allowed_protocols=None) + assert "hysteria_auth" in kwargs + + def test_empty_frozenset_equivalent_to_none(self): + """_ALL_PROXY_PROTOCOLS covers all six protocols.""" + assert _ALL_PROXY_PROTOCOLS == frozenset(ProxyProtocol) + + +class TestSerializeUserForNodeFilteredProtocols: + """When a restricted frozenset is passed, only those protocols appear in kwargs.""" + + def test_only_vmess_allowed(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.vmess})) + assert "vmess_id" in kwargs + assert "vless_id" not in kwargs + assert "trojan_password" not in kwargs + assert "shadowsocks_password" not in kwargs + assert "wireguard_public_key" not in kwargs + assert "hysteria_auth" not in kwargs + + def test_only_vless_allowed(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.vless})) + assert "vless_id" in kwargs + assert "vmess_id" not in kwargs + + def test_only_trojan_allowed(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.trojan})) + assert "trojan_password" in kwargs + assert "vmess_id" not in kwargs + + def test_only_shadowsocks_allowed(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.shadowsocks})) + assert "shadowsocks_password" in kwargs + assert "shadowsocks_method" in kwargs + assert "vmess_id" not in kwargs + assert "vless_id" not in kwargs + + def test_only_wireguard_allowed(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.wireguard})) + assert "wireguard_public_key" in kwargs + assert "wireguard_peer_ips" in kwargs + assert "vmess_id" not in kwargs + + def test_only_hysteria_allowed(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.hysteria})) + assert "hysteria_auth" in kwargs + assert "vmess_id" not in kwargs + + def test_wireguard_only_protocols_frozenset(self): + """Simulates a WireGuard core node with only wireguard protocol.""" + wg_only = frozenset({ProxyProtocol.wireguard}) + kwargs = _call_serialize(FULL_USER_SETTINGS, wg_only) + assert set(kwargs.keys()) == {"wireguard_public_key", "wireguard_peer_ips"} + + def test_xray_subset_vmess_vless_trojan(self): + xray_protocols = frozenset({ProxyProtocol.vmess, ProxyProtocol.vless, ProxyProtocol.trojan}) + kwargs = _call_serialize(FULL_USER_SETTINGS, xray_protocols) + assert "vmess_id" in kwargs + assert "vless_id" in kwargs + assert "trojan_password" in kwargs + assert "shadowsocks_password" not in kwargs + assert "wireguard_public_key" not in kwargs + assert "hysteria_auth" not in kwargs + + def test_empty_allowed_protocols_passes_no_kwargs(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset()) + assert kwargs == {} + + +class TestSerializeUserForNodeValueMapping: + """Verify that the correct values are extracted from user_settings.""" + + def test_vmess_id_value(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.vmess})) + assert kwargs["vmess_id"] == "aaaabbbb-cccc-dddd-eeee-ffffaaaabbbb" + + def test_vless_id_value(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.vless})) + assert kwargs["vless_id"] == "11112222-3333-4444-5555-666677778888" + + def test_trojan_password_value(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.trojan})) + assert kwargs["trojan_password"] == "trojan-pass" + + def test_shadowsocks_values(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.shadowsocks})) + assert kwargs["shadowsocks_password"] == "ss-pass" + assert kwargs["shadowsocks_method"] == "chacha20-ietf-poly1305" + + def test_wireguard_values(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.wireguard})) + assert kwargs["wireguard_public_key"] == "wg-pub-key" + assert kwargs["wireguard_peer_ips"] == ["10.0.0.2/32"] + + def test_hysteria_auth_value(self): + kwargs = _call_serialize(FULL_USER_SETTINGS, frozenset({ProxyProtocol.hysteria})) + assert kwargs["hysteria_auth"] == "hysteria-auth" + + def test_wireguard_peer_ips_defaults_to_empty_list_when_missing(self): + settings = {"wireguard": {"public_key": "some-key"}} # no peer_ips + kwargs = _call_serialize(settings, frozenset({ProxyProtocol.wireguard})) + assert kwargs["wireguard_peer_ips"] == [] + + def test_missing_protocol_settings_returns_none(self): + # Settings dict missing vmess key entirely + settings = {} + kwargs = _call_serialize(settings, frozenset({ProxyProtocol.vmess})) + assert kwargs["vmess_id"] is None + + +class TestSerializeUserForNodeCreateUserCall: + """Verify create_user is called with the expected arguments.""" + + def test_create_user_called_once(self): + _BRIDGE_MOCK.create_user.reset_mock() + _BRIDGE_MOCK.create_proxy.reset_mock() + _serialize_user_for_node(1, "bob", FULL_USER_SETTINGS, ["tag1"], None) + assert _BRIDGE_MOCK.create_user.call_count == 1 + + def test_create_user_receives_id_dot_username(self): + _BRIDGE_MOCK.create_user.reset_mock() + _serialize_user_for_node(42, "alice", FULL_USER_SETTINGS, ["tag1"], None) + positional_args = _BRIDGE_MOCK.create_user.call_args.args + assert positional_args[0] == "42.alice" + + def test_create_user_receives_inbounds(self): + _BRIDGE_MOCK.create_user.reset_mock() + _serialize_user_for_node(1, "bob", FULL_USER_SETTINGS, ["tag_a", "tag_b"], None) + positional_args = _BRIDGE_MOCK.create_user.call_args.args + assert positional_args[2] == ["tag_a", "tag_b"]