From a5929dc4f55ff995bc4186864c73557a30a77830 Mon Sep 17 00:00:00 2001 From: Ashutosh0x Date: Mon, 15 Jun 2026 00:41:38 +0530 Subject: [PATCH] fix: validate module paths in YAML config to prevent arbitrary code execution via importlib --- src/google/adk/agents/config_agent_utils.py | 79 +++++++++++++++ .../agents/test_config_module_validation.py | 99 +++++++++++++++++++ 2 files changed, 178 insertions(+) create mode 100644 tests/unittests/agents/test_config_module_validation.py diff --git a/src/google/adk/agents/config_agent_utils.py b/src/google/adk/agents/config_agent_utils.py index 82aaa6e452..ee768b384e 100644 --- a/src/google/adk/agents/config_agent_utils.py +++ b/src/google/adk/agents/config_agent_utils.py @@ -15,6 +15,8 @@ from __future__ import annotations import importlib +import logging +import re import inspect import os from typing import Any @@ -28,6 +30,80 @@ from .agent_config import AgentConfig from .base_agent import BaseAgent from .base_agent_config import BaseAgentConfig + +logger = logging.getLogger("google_adk." + __name__) + +# Modules that must never be loaded via YAML agent configuration. +# Importing these can lead to arbitrary code execution, file system +# access, or process spawning. +_BLOCKED_MODULE_PREFIXES: tuple[str, ...] = ( + "os", + "sys", + "subprocess", + "shutil", + "socket", + "http", + "ctypes", + "multiprocessing", + "signal", + "importlib", + "pickle", + "shelve", + "marshal", + "code", + "codeop", + "compile", + "compileall", + "runpy", + "builtins", + "io", + "tempfile", + "glob", + "pathlib", + "webbrowser", + "antigravity", +) + +_VALID_MODULE_PATH_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_.]*$") + + +def _validate_module_path(module_path: str) -> None: + """Validates that a module path is safe to import. + + Rejects module paths that reference dangerous standard library modules + or contain suspicious patterns. + + Args: + module_path: Dotted Python module path (e.g., 'my_package.my_module'). + + Raises: + ValueError: If the module path is blocked or invalid. + """ + if not module_path: + raise ValueError("Module path must not be empty.") + + if not _VALID_MODULE_PATH_RE.match(module_path): + raise ValueError( + f"Module path {module_path!r} contains invalid characters." + ) + + # Check for dunder/private module segments. + segments = module_path.split(".") + for segment in segments: + if segment.startswith("__") and segment.endswith("__"): + raise ValueError( + f"Module path {module_path!r} contains dunder segment" + f" {segment!r}." + ) + + # Block dangerous top-level modules. + top_level = segments[0] + if top_level in _BLOCKED_MODULE_PREFIXES: + raise ValueError( + f"Module path {module_path!r} references blocked module" + f" {top_level!r}. Importing arbitrary standard library modules" + " via YAML configuration is not permitted." + ) from .common_configs import AgentRefConfig from .common_configs import CodeConfig @@ -109,6 +185,7 @@ def _load_config_from_path(config_path: str) -> AgentConfig: def resolve_fully_qualified_name(name: str) -> Any: try: module_path, obj_name = name.rsplit(".", 1) + _validate_module_path(module_path) module = importlib.import_module(module_path) return getattr(module, obj_name) except Exception as e: @@ -161,6 +238,7 @@ def _resolve_agent_code_reference(code: str) -> Any: raise ValueError(f"Invalid code reference: {code}") module_path, obj_name = code.rsplit(".", 1) + _validate_module_path(module_path) module = importlib.import_module(module_path) obj = getattr(module, obj_name) @@ -190,6 +268,7 @@ def resolve_code_reference(code_config: CodeConfig) -> Any: raise ValueError("Invalid CodeConfig.") module_path, obj_name = code_config.name.rsplit(".", 1) + _validate_module_path(module_path) module = importlib.import_module(module_path) return getattr(module, obj_name) diff --git a/tests/unittests/agents/test_config_module_validation.py b/tests/unittests/agents/test_config_module_validation.py new file mode 100644 index 0000000000..da82f31c2a --- /dev/null +++ b/tests/unittests/agents/test_config_module_validation.py @@ -0,0 +1,99 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for module path validation in YAML agent config resolution.""" + +import pytest + +from google.adk.agents.config_agent_utils import _validate_module_path + + +class TestValidateModulePath: + """Tests for _validate_module_path blocklist enforcement.""" + + def test_safe_module_passes(self): + """User-defined modules should pass validation.""" + _validate_module_path("my_app.agents.my_agent") + _validate_module_path("google.adk.agents") + _validate_module_path("my_package") + + def test_os_module_blocked(self): + """os module should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("os") + + def test_os_path_blocked(self): + """os.path should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("os.path") + + def test_subprocess_blocked(self): + """subprocess module should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("subprocess") + + def test_sys_blocked(self): + """sys module should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("sys") + + def test_shutil_blocked(self): + """shutil module should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("shutil") + + def test_pickle_blocked(self): + """pickle module should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("pickle") + + def test_importlib_blocked(self): + """importlib module should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("importlib") + + def test_builtins_blocked(self): + """builtins module should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("builtins") + + def test_socket_blocked(self): + """socket module should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("socket") + + def test_empty_path_blocked(self): + """Empty module path should be rejected.""" + with pytest.raises(ValueError, match="must not be empty"): + _validate_module_path("") + + def test_invalid_characters_blocked(self): + """Module paths with special characters should be rejected.""" + with pytest.raises(ValueError, match="invalid characters"): + _validate_module_path("os;import sys") + + def test_dunder_segment_blocked(self): + """Module paths with __dunder__ segments should be rejected.""" + with pytest.raises(ValueError, match="dunder segment"): + _validate_module_path("my_app.__builtins__.evil") + + def test_google_adk_passes(self): + """google.adk modules should pass (not blocked).""" + _validate_module_path("google.adk.tools.my_tool") + _validate_module_path("google.adk.agents.llm_agent") + + def test_multiprocessing_blocked(self): + """multiprocessing should be blocked.""" + with pytest.raises(ValueError, match="blocked module"): + _validate_module_path("multiprocessing.pool")