diff --git a/github_ops_manager/configuration/cli.py b/github_ops_manager/configuration/cli.py index ad2b213..c08df51 100644 --- a/github_ops_manager/configuration/cli.py +++ b/github_ops_manager/configuration/cli.py @@ -17,8 +17,10 @@ from github_ops_manager.processing.yaml_processor import YAMLProcessor from github_ops_manager.schemas.default_issue import IssueModel, IssuesYAMLModel, PullRequestModel from github_ops_manager.synchronize.driver import run_process_issues_workflow +from github_ops_manager.utils.constants import DEFAULT_MAX_ISSUE_BODY_LENGTH from github_ops_manager.utils.tac import find_issue_with_title from github_ops_manager.utils.templates import construct_jinja2_template_from_file, render_template_with_model +from github_ops_manager.utils.truncation import truncate_test_case_outputs from github_ops_manager.utils.yaml import dump_yaml_to_file, load_test_case_definitions_from_directory, load_yaml_file load_dotenv() @@ -77,6 +79,10 @@ def tac_sync_issues_cli( # can be constructed. template = construct_jinja2_template_from_file(Path(__file__).parent.parent / "templates" / "tac_issues_body.j2") + # Get max body length from environment variable or use default + max_body_length = int(os.getenv("GITHUB_MAX_ISSUE_BODY_LENGTH", str(DEFAULT_MAX_ISSUE_BODY_LENGTH))) + typer.echo(f"Using max issue body length: {max_body_length}") + # Iterate through the test case definitions and ensure matching issues # exist in the YAML file. for test_case_definition in testing_as_code_test_case_definitions_model.test_cases: @@ -100,10 +106,13 @@ def tac_sync_issues_cli( # script will have a control label called "script-already-created". # Additionally, a field named "generated_script_path" will be # populated with the path to the created test automation script. + # + # Truncate outputs if they would exceed the max body length + truncated_test_case = truncate_test_case_outputs(test_case_definition, max_body_length) new_issue = IssueModel( title=test_case_definition.title, body=render_template_with_model( - model=test_case_definition, + model=truncated_test_case, template=template, ), labels=test_case_definition.labels, @@ -116,15 +125,18 @@ def tac_sync_issues_cli( f"path of '{test_case_definition.generated_script_path}' - " "creating a Pull Request" ) + script_path = test_automation_scripts_directory / test_case_definition.generated_script_path new_issue.pull_request = PullRequestModel( title=f"GenAI, Review: {test_case_definition.title}", - files=[test_case_definition.generated_script_path], + files=[str(script_path)], ) desired_issues_yaml_model.issues.append(new_issue) else: # Update the existing issue based upon the test case definition. + # Truncate outputs if they would exceed the max body length + truncated_test_case = truncate_test_case_outputs(test_case_definition, max_body_length) existing_issue.body = render_template_with_model( - model=test_case_definition, + model=truncated_test_case, template=template, ) existing_issue.labels = test_case_definition.labels diff --git a/github_ops_manager/synchronize/issues.py b/github_ops_manager/synchronize/issues.py index 74e3ad6..2019f29 100644 --- a/github_ops_manager/synchronize/issues.py +++ b/github_ops_manager/synchronize/issues.py @@ -1,5 +1,6 @@ """Contains synchronization logic for GitHub issues.""" +import os import time import jinja2 @@ -11,7 +12,9 @@ from github_ops_manager.synchronize.models import SyncDecision from github_ops_manager.synchronize.results import AllIssueSynchronizationResults, IssueSynchronizationResult from github_ops_manager.synchronize.utils import compare_github_field, compare_label_sets +from github_ops_manager.utils.constants import DEFAULT_MAX_ISSUE_BODY_LENGTH from github_ops_manager.utils.templates import construct_jinja2_template_from_file +from github_ops_manager.utils.truncation import truncate_data_dict_outputs logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__) @@ -140,8 +143,15 @@ async def render_issue_bodies(issues_yaml_model: IssuesYAMLModel) -> IssuesYAMLM logger.error("Encountered a syntax error with the provided issue template", issue_template=issues_yaml_model.issue_template, error=str(exc)) raise + # Get max body length from environment variable or use default + max_body_length = int(os.getenv("GITHUB_MAX_ISSUE_BODY_LENGTH", str(DEFAULT_MAX_ISSUE_BODY_LENGTH))) + for issue in issues_yaml_model.issues: if issue.data is not None: + # Truncate command outputs if present to fit within GitHub's body limit + if "commands" in issue.data: + issue.data = truncate_data_dict_outputs(issue.data, max_body_length) + # Render with all issue fields available render_context = issue.model_dump() try: diff --git a/github_ops_manager/utils/constants.py b/github_ops_manager/utils/constants.py index abbd14b..eab48f7 100644 --- a/github_ops_manager/utils/constants.py +++ b/github_ops_manager/utils/constants.py @@ -23,3 +23,24 @@ DEFAULT_RELEASE_NOTES_HEADER = "# Release Notes\n\nThis document tracks the new features, enhancements, and bug fixes for each release." """Default header expected in release notes file.""" + +# Issue Body Truncation Constants +# ------------------------------- + +DEFAULT_MAX_ISSUE_BODY_LENGTH = 60000 +"""Default maximum length for issue bodies (leaves margin for GitHub's 65,536 limit).""" + +GITHUB_MAX_ISSUE_BODY_LENGTH = 65536 +"""GitHub's actual maximum issue body length.""" + +TRUNCATION_SUFFIX = "\n... [truncated - {remaining} characters removed]" +"""Suffix template appended to truncated content. Use .format(remaining=N) to fill in count.""" + +TEMPLATE_OVERHEAD_PER_COMMAND = 500 +"""Estimated character overhead per command in the TAC issue template (markdown, code fences).""" + +BASE_TEMPLATE_OVERHEAD = 2000 +"""Estimated base overhead for non-command template content (headers, pass criteria, etc).""" + +MIN_OUTPUT_LENGTH = 500 +"""Minimum characters to preserve in truncated output fields for readability.""" diff --git a/github_ops_manager/utils/truncation.py b/github_ops_manager/utils/truncation.py new file mode 100644 index 0000000..e754c21 --- /dev/null +++ b/github_ops_manager/utils/truncation.py @@ -0,0 +1,312 @@ +"""Utilities for truncating issue body content to fit within GitHub's character limits.""" + +from __future__ import annotations + +from copy import deepcopy +from typing import Any + +import structlog + +from github_ops_manager.schemas.tac import TestingAsCodeCommand, TestingAsCodeTestCaseDefinition +from github_ops_manager.utils.constants import ( + BASE_TEMPLATE_OVERHEAD, + DEFAULT_MAX_ISSUE_BODY_LENGTH, + MIN_OUTPUT_LENGTH, + TEMPLATE_OVERHEAD_PER_COMMAND, + TRUNCATION_SUFFIX, +) + +logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__) + + +def truncate_string_at_end( + content: str, + max_length: int, + truncation_suffix: str = TRUNCATION_SUFFIX, +) -> tuple[str, bool]: + """Truncate a string at the end if it exceeds max_length. + + Args: + content: The string to potentially truncate. + max_length: Maximum allowed length for the result (including truncation suffix). + truncation_suffix: Template for truncation indicator with {remaining} placeholder. + + Returns: + Tuple of (truncated_content, was_truncated). + If truncation occurs, the result includes the truncation suffix. + """ + if not content or len(content) <= max_length: + return content, False + + # Calculate how much space we need for the suffix + # Use a reasonable estimate for the {remaining} placeholder + remaining_chars = len(content) - max_length + suffix = truncation_suffix.format(remaining=remaining_chars) + + # Truncate content to leave room for the suffix + truncate_at = max_length - len(suffix) + if truncate_at <= 0: + # Edge case: max_length is smaller than the suffix itself + return content[:max_length], True + + truncated = content[:truncate_at] + suffix + return truncated, True + + +def estimate_template_overhead(test_case: TestingAsCodeTestCaseDefinition) -> int: + """Estimate the non-output character overhead from the Jinja2 template. + + This estimates how many characters the template will use for fixed content + like headers, markdown formatting, pass criteria, and per-command overhead. + + Args: + test_case: The test case definition to estimate overhead for. + + Returns: + Estimated character count for template overhead. + """ + overhead = BASE_TEMPLATE_OVERHEAD + + # Add per-command overhead (markdown code fences, headers, etc.) + overhead += len(test_case.commands) * TEMPLATE_OVERHEAD_PER_COMMAND + + # Add variable content that isn't truncated + if test_case.purpose: + overhead += len(test_case.purpose) + if test_case.pass_criteria: + overhead += len(test_case.pass_criteria) + if test_case.jobfile_parameters: + overhead += len(test_case.jobfile_parameters) + if test_case.jobfile_parameters_mapping: + overhead += len(test_case.jobfile_parameters_mapping) + + return overhead + + +def calculate_field_sizes(commands: list[TestingAsCodeCommand]) -> list[dict[str, int]]: + """Calculate the current size of output fields in each command. + + Args: + commands: List of commands to analyze. + + Returns: + List of dicts with 'command_output' and 'parsed_output' sizes per command. + """ + sizes = [] + for cmd in commands: + sizes.append( + { + "command_output": len(cmd.command_output or ""), + "parsed_output": len(cmd.parsed_output or ""), + } + ) + return sizes + + +def distribute_budget_proportionally( + field_sizes: list[dict[str, int]], + available_budget: int, +) -> list[dict[str, int]]: + """Distribute character budget proportionally across output fields. + + Larger fields get a proportionally larger share of the budget. + Each field is guaranteed at least MIN_OUTPUT_LENGTH chars if its original + size was at least that large. + + Args: + field_sizes: List of dicts with current field sizes per command. + available_budget: Total character budget for all output fields. + + Returns: + List of dicts with 'command_output' and 'parsed_output' budgets per command. + """ + # Calculate total current size + total_size = sum(sizes["command_output"] + sizes["parsed_output"] for sizes in field_sizes) + + if total_size == 0 or total_size <= available_budget: + # No truncation needed, return original sizes as budgets + return field_sizes + + # Distribute proportionally + budgets = [] + for sizes in field_sizes: + cmd_budget = {} + for field in ["command_output", "parsed_output"]: + field_size = sizes[field] + if field_size == 0: + cmd_budget[field] = 0 + else: + # Proportional allocation + proportion = field_size / total_size + allocated = int(available_budget * proportion) + # Ensure minimum readable content + cmd_budget[field] = max(allocated, min(field_size, MIN_OUTPUT_LENGTH)) + budgets.append(cmd_budget) + + return budgets + + +def truncate_test_case_outputs( + test_case: TestingAsCodeTestCaseDefinition, + max_body_length: int = DEFAULT_MAX_ISSUE_BODY_LENGTH, +) -> TestingAsCodeTestCaseDefinition: + """Truncate command outputs in a test case definition to fit within the body limit. + + This is the main entry point for truncation. It returns a new TestingAsCodeTestCaseDefinition + with potentially truncated command_output and parsed_output fields. + + Args: + test_case: The original test case definition. + max_body_length: Maximum target issue body length. + + Returns: + A new TestingAsCodeTestCaseDefinition with truncated outputs (if needed). + The original test_case is not modified. + """ + # Calculate available budget for outputs + overhead = estimate_template_overhead(test_case) + available_budget = max_body_length - overhead + + if available_budget <= 0: + logger.warning( + "Template overhead exceeds max body length", + test_case_title=test_case.title, + overhead=overhead, + max_body_length=max_body_length, + ) + available_budget = MIN_OUTPUT_LENGTH * len(test_case.commands) * 2 + + # Calculate current field sizes + field_sizes = calculate_field_sizes(test_case.commands) + + # Check if truncation is needed + total_current_size = sum(sizes["command_output"] + sizes["parsed_output"] for sizes in field_sizes) + if total_current_size <= available_budget: + logger.debug( + "No truncation needed for test case", + test_case_title=test_case.title, + total_output_size=total_current_size, + available_budget=available_budget, + ) + return test_case + + # Distribute budget and truncate + budgets = distribute_budget_proportionally(field_sizes, available_budget) + + # Create a deep copy to avoid mutating the original + truncated_test_case = deepcopy(test_case) + + for idx, (cmd, budget) in enumerate(zip(truncated_test_case.commands, budgets, strict=True)): + # Truncate command_output + if cmd.command_output and len(cmd.command_output) > budget["command_output"]: + original_len = len(cmd.command_output) + cmd.command_output, was_truncated = truncate_string_at_end(cmd.command_output, budget["command_output"]) + if was_truncated: + logger.info( + "Truncated command_output", + test_case_title=test_case.title, + command_index=idx, + command=cmd.command, + original_length=original_len, + truncated_length=len(cmd.command_output), + characters_removed=original_len - len(cmd.command_output), + ) + + # Truncate parsed_output + if cmd.parsed_output and len(cmd.parsed_output) > budget["parsed_output"]: + original_len = len(cmd.parsed_output) + cmd.parsed_output, was_truncated = truncate_string_at_end(cmd.parsed_output, budget["parsed_output"]) + if was_truncated: + logger.info( + "Truncated parsed_output", + test_case_title=test_case.title, + command_index=idx, + command=cmd.command, + original_length=original_len, + truncated_length=len(cmd.parsed_output), + characters_removed=original_len - len(cmd.parsed_output), + ) + + return truncated_test_case + + +def truncate_data_dict_outputs( + data: dict[str, Any], + max_body_length: int = DEFAULT_MAX_ISSUE_BODY_LENGTH, +) -> dict[str, Any]: + """Truncate command outputs within a generic data dictionary. + + This is for use with IssueModel.data which may contain a 'commands' list + with command_output and parsed_output fields as plain dicts. + + Args: + data: The data dictionary potentially containing 'commands'. + max_body_length: Maximum target issue body length. + + Returns: + A new data dictionary with truncated outputs (if needed). + The original data is not modified. + """ + if "commands" not in data: + return data + + commands = data.get("commands", []) + if not commands: + return data + + # Calculate available budget (rough estimate without full test case context) + overhead = BASE_TEMPLATE_OVERHEAD + len(commands) * TEMPLATE_OVERHEAD_PER_COMMAND + + # Add other fields that contribute to body length + for field in ["purpose", "pass_criteria", "jobfile_parameters", "jobfile_parameters_mapping"]: + if field in data and data[field]: + overhead += len(str(data[field])) + + available_budget = max_body_length - overhead + if available_budget <= 0: + available_budget = MIN_OUTPUT_LENGTH * len(commands) * 2 + + # Calculate current sizes + field_sizes = [] + for cmd in commands: + field_sizes.append( + { + "command_output": len(cmd.get("command_output") or ""), + "parsed_output": len(cmd.get("parsed_output") or ""), + } + ) + + # Check if truncation is needed + total_current_size = sum(sizes["command_output"] + sizes["parsed_output"] for sizes in field_sizes) + if total_current_size <= available_budget: + return data + + # Distribute budget + budgets = distribute_budget_proportionally(field_sizes, available_budget) + + # Deep copy and truncate + truncated_data = deepcopy(data) + for idx, (cmd, budget) in enumerate(zip(truncated_data["commands"], budgets, strict=True)): + # Truncate command_output + cmd_output = cmd.get("command_output") + if cmd_output and len(cmd_output) > budget["command_output"]: + cmd["command_output"], _ = truncate_string_at_end(cmd_output, budget["command_output"]) + logger.info( + "Truncated command_output in data dict", + command_index=idx, + original_length=len(cmd_output), + truncated_length=len(cmd["command_output"]), + ) + + # Truncate parsed_output + parsed_output = cmd.get("parsed_output") + if parsed_output and len(parsed_output) > budget["parsed_output"]: + cmd["parsed_output"], _ = truncate_string_at_end(parsed_output, budget["parsed_output"]) + logger.info( + "Truncated parsed_output in data dict", + command_index=idx, + original_length=len(parsed_output), + truncated_length=len(cmd["parsed_output"]), + ) + + return truncated_data diff --git a/tests/unit/test_utils_truncation.py b/tests/unit/test_utils_truncation.py new file mode 100644 index 0000000..a8b9477 --- /dev/null +++ b/tests/unit/test_utils_truncation.py @@ -0,0 +1,330 @@ +"""Unit tests for the truncation utility module.""" + +from github_ops_manager.schemas.tac import TestingAsCodeCommand, TestingAsCodeTestCaseDefinition +from github_ops_manager.utils.constants import MIN_OUTPUT_LENGTH +from github_ops_manager.utils.truncation import ( + calculate_field_sizes, + distribute_budget_proportionally, + estimate_template_overhead, + truncate_data_dict_outputs, + truncate_string_at_end, + truncate_test_case_outputs, +) + + +class TestTruncateStringAtEnd: + """Tests for truncate_string_at_end function.""" + + def test_no_truncation_when_under_limit(self) -> None: + """Content under the limit should not be truncated.""" + content = "Hello, world!" + result, was_truncated = truncate_string_at_end(content, max_length=100) + assert result == content + assert was_truncated is False + + def test_no_truncation_when_at_limit(self) -> None: + """Content exactly at the limit should not be truncated.""" + content = "x" * 50 + result, was_truncated = truncate_string_at_end(content, max_length=50) + assert result == content + assert was_truncated is False + + def test_truncates_long_content(self) -> None: + """Content over the limit should be truncated with indicator.""" + content = "a" * 1000 + result, was_truncated = truncate_string_at_end(content, max_length=100) + assert was_truncated is True + assert len(result) <= 100 + assert "truncated" in result + assert "characters removed" in result + + def test_empty_string_not_truncated(self) -> None: + """Empty string should pass through unchanged.""" + result, was_truncated = truncate_string_at_end("", max_length=100) + assert result == "" + assert was_truncated is False + + def test_none_string_not_truncated(self) -> None: + """None value should pass through unchanged.""" + result, was_truncated = truncate_string_at_end(None, max_length=100) # type: ignore[arg-type] + assert result is None + assert was_truncated is False + + def test_truncation_suffix_includes_char_count(self) -> None: + """Truncation suffix should include the number of characters removed.""" + content = "a" * 1000 + result, was_truncated = truncate_string_at_end(content, max_length=100) + assert was_truncated is True + # The suffix should mention how many chars were removed + # Check that a number is in the result + import re + + match = re.search(r"\d+", result.split("truncated")[1]) + assert match is not None + chars_removed = int(match.group()) + assert chars_removed > 0 + + +class TestEstimateTemplateOverhead: + """Tests for estimate_template_overhead function.""" + + def test_basic_overhead_calculation(self) -> None: + """Basic test case should have minimal overhead.""" + test_case = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Simple purpose", + labels=["test"], + commands=[ + TestingAsCodeCommand(command="show version"), + ], + ) + overhead = estimate_template_overhead(test_case) + # Should include base overhead + 1 command overhead + purpose length + assert overhead > 0 + assert overhead >= len("Simple purpose") + + def test_multiple_commands_increase_overhead(self) -> None: + """More commands should increase overhead.""" + test_case_1_cmd = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Purpose", + labels=["test"], + commands=[TestingAsCodeCommand(command="cmd1")], + ) + test_case_3_cmd = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Purpose", + labels=["test"], + commands=[ + TestingAsCodeCommand(command="cmd1"), + TestingAsCodeCommand(command="cmd2"), + TestingAsCodeCommand(command="cmd3"), + ], + ) + overhead_1 = estimate_template_overhead(test_case_1_cmd) + overhead_3 = estimate_template_overhead(test_case_3_cmd) + assert overhead_3 > overhead_1 + + def test_includes_optional_fields(self) -> None: + """Optional fields should contribute to overhead.""" + test_case_minimal = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Purpose", + labels=["test"], + commands=[TestingAsCodeCommand(command="cmd1")], + ) + test_case_full = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Purpose", + labels=["test"], + commands=[TestingAsCodeCommand(command="cmd1")], + pass_criteria="* Check X\n* Check Y", + jobfile_parameters="key: value\nkey2: value2", + jobfile_parameters_mapping="mapping info", + ) + overhead_minimal = estimate_template_overhead(test_case_minimal) + overhead_full = estimate_template_overhead(test_case_full) + assert overhead_full > overhead_minimal + + +class TestCalculateFieldSizes: + """Tests for calculate_field_sizes function.""" + + def test_empty_outputs(self) -> None: + """Commands with no outputs should have zero sizes.""" + commands = [TestingAsCodeCommand(command="show version")] + sizes = calculate_field_sizes(commands) + assert len(sizes) == 1 + assert sizes[0]["command_output"] == 0 + assert sizes[0]["parsed_output"] == 0 + + def test_populated_outputs(self) -> None: + """Commands with outputs should have correct sizes.""" + commands = [ + TestingAsCodeCommand( + command="show version", + command_output="output" * 100, + parsed_output="parsed" * 50, + ), + ] + sizes = calculate_field_sizes(commands) + assert sizes[0]["command_output"] == 600 # "output" * 100 = 600 chars + assert sizes[0]["parsed_output"] == 300 # "parsed" * 50 = 300 chars + + def test_multiple_commands(self) -> None: + """Should calculate sizes for all commands.""" + commands = [ + TestingAsCodeCommand(command="cmd1", command_output="a" * 100), + TestingAsCodeCommand(command="cmd2", parsed_output="b" * 200), + ] + sizes = calculate_field_sizes(commands) + assert len(sizes) == 2 + assert sizes[0]["command_output"] == 100 + assert sizes[0]["parsed_output"] == 0 + assert sizes[1]["command_output"] == 0 + assert sizes[1]["parsed_output"] == 200 + + +class TestDistributeBudgetProportionally: + """Tests for distribute_budget_proportionally function.""" + + def test_no_truncation_when_under_budget(self) -> None: + """Should return original sizes if under budget.""" + field_sizes = [{"command_output": 100, "parsed_output": 100}] + budgets = distribute_budget_proportionally(field_sizes, available_budget=500) + assert budgets == field_sizes + + def test_proportional_distribution(self) -> None: + """Larger fields should get larger budget share.""" + field_sizes = [ + {"command_output": 8000, "parsed_output": 2000}, # 80% vs 20% + ] + budgets = distribute_budget_proportionally(field_sizes, available_budget=5000) + # 8000 is 80% of 10000, so should get ~80% of 5000 = 4000 + # 2000 is 20% of 10000, so should get ~20% of 5000 = 1000 + assert budgets[0]["command_output"] > budgets[0]["parsed_output"] + assert budgets[0]["command_output"] >= 3500 # Roughly 80% of 5000 + + def test_minimum_budget_enforced(self) -> None: + """Each field should get at least MIN_OUTPUT_LENGTH if original was larger.""" + field_sizes = [ + {"command_output": 10000, "parsed_output": 10000}, + ] + # Very small budget + budgets = distribute_budget_proportionally(field_sizes, available_budget=200) + # Even with tiny budget, should get minimum + assert budgets[0]["command_output"] >= MIN_OUTPUT_LENGTH + assert budgets[0]["parsed_output"] >= MIN_OUTPUT_LENGTH + + def test_empty_fields_get_zero_budget(self) -> None: + """Empty fields should get zero budget.""" + field_sizes = [{"command_output": 0, "parsed_output": 1000}] + budgets = distribute_budget_proportionally(field_sizes, available_budget=500) + assert budgets[0]["command_output"] == 0 + + +class TestTruncateTestCaseOutputs: + """Tests for truncate_test_case_outputs function.""" + + def test_no_truncation_when_under_limit(self) -> None: + """Test case under limit should not be modified.""" + test_case = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Purpose", + labels=["test"], + commands=[ + TestingAsCodeCommand( + command="show version", + command_output="short output", + parsed_output="short parsed", + ), + ], + ) + result = truncate_test_case_outputs(test_case, max_body_length=60000) + assert result.commands[0].command_output == "short output" + assert result.commands[0].parsed_output == "short parsed" + + def test_truncates_large_outputs(self) -> None: + """Large outputs should be truncated.""" + large_output = "x" * 50000 + test_case = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Purpose", + labels=["test"], + commands=[ + TestingAsCodeCommand( + command="show version", + command_output=large_output, + parsed_output=large_output, + ), + ], + ) + result = truncate_test_case_outputs(test_case, max_body_length=30000) + assert len(result.commands[0].command_output or "") < len(large_output) + assert len(result.commands[0].parsed_output or "") < len(large_output) + assert "truncated" in (result.commands[0].command_output or "") + assert "truncated" in (result.commands[0].parsed_output or "") + + def test_does_not_mutate_original(self) -> None: + """Original test case should not be modified.""" + large_output = "x" * 50000 + test_case = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Purpose", + labels=["test"], + commands=[ + TestingAsCodeCommand( + command="show version", + command_output=large_output, + ), + ], + ) + original_length = len(test_case.commands[0].command_output or "") + _ = truncate_test_case_outputs(test_case, max_body_length=10000) + # Original should be unchanged + assert len(test_case.commands[0].command_output or "") == original_length + + def test_multiple_commands_distributed(self) -> None: + """Budget should be distributed across multiple commands.""" + test_case = TestingAsCodeTestCaseDefinition( + title="Test", + purpose="Purpose", + labels=["test"], + commands=[ + TestingAsCodeCommand(command="cmd1", command_output="a" * 30000), + TestingAsCodeCommand(command="cmd2", command_output="b" * 30000), + ], + ) + result = truncate_test_case_outputs(test_case, max_body_length=30000) + # Both should be truncated, each getting roughly half the budget + len1 = len(result.commands[0].command_output or "") + len2 = len(result.commands[1].command_output or "") + assert len1 < 30000 + assert len2 < 30000 + # They should be roughly equal since original sizes were equal + assert abs(len1 - len2) < 1000 + + +class TestTruncateDataDictOutputs: + """Tests for truncate_data_dict_outputs function.""" + + def test_no_commands_returns_unchanged(self) -> None: + """Data without commands should pass through unchanged.""" + data = {"key": "value", "other": 123} + result = truncate_data_dict_outputs(data, max_body_length=60000) + assert result == data + + def test_truncates_command_outputs(self) -> None: + """Large outputs in data dict should be truncated.""" + large_output = "x" * 50000 + data = { + "purpose": "Test", + "commands": [ + {"command": "show version", "command_output": large_output}, + ], + } + result = truncate_data_dict_outputs(data, max_body_length=30000) + assert len(result["commands"][0]["command_output"]) < len(large_output) + assert "truncated" in result["commands"][0]["command_output"] + + def test_does_not_mutate_original_dict(self) -> None: + """Original data dict should not be modified.""" + large_output = "x" * 50000 + data = { + "commands": [ + {"command": "show version", "command_output": large_output}, + ], + } + original_length = len(data["commands"][0]["command_output"]) + _ = truncate_data_dict_outputs(data, max_body_length=10000) + assert len(data["commands"][0]["command_output"]) == original_length + + def test_handles_missing_output_fields(self) -> None: + """Commands without output fields should not cause errors.""" + data = { + "commands": [ + {"command": "show version"}, # No output fields + ], + } + result = truncate_data_dict_outputs(data, max_body_length=60000) + assert result["commands"][0] == {"command": "show version"}