diff --git a/github_ops_manager/configuration/cli.py b/github_ops_manager/configuration/cli.py index c08df51..21cd527 100644 --- a/github_ops_manager/configuration/cli.py +++ b/github_ops_manager/configuration/cli.py @@ -216,8 +216,43 @@ def process_issues_cli( create_prs: Annotated[bool, Option(envvar="CREATE_PRS", help="Create PRs for issues.")] = False, debug: Annotated[bool, Option(envvar="DEBUG", help="Enable debug mode.")] = False, testing_as_code_workflow: Annotated[bool, Option(envvar="TESTING_AS_CODE_WORKFLOW", help="Enable Testing as Code workflow.")] = False, + catalog_repo: Annotated[ + str, + Option( + envvar="CATALOG_REPO", + help="Catalog repository name (owner/repo) for catalog-destined test cases. Used when test cases have catalog_destined=true.", + ), + ] = "Testing-as-Code/tac-catalog", + test_cases_dir: Annotated[ + Path, + Option( + envvar="TEST_CASES_DIR", + help="Directory containing test_cases.yaml files for catalog PR metadata writeback. Used when test cases have catalog_destined=true.", + ), + ] = Path("workspace/test_cases/"), + create_tracking_issues: Annotated[ + bool, + Option( + envvar="CREATE_TRACKING_ISSUES", + help="Create tracking issues in project repo for catalog PRs and parameter learning tasks.", + ), + ] = False, + tracking_issue_labels: Annotated[ + str | None, + Option( + envvar="TRACKING_ISSUE_LABELS", + help="Comma-separated list of labels to apply to tracking issues (e.g., 'parameter-learning,catalog-pr').", + ), + ] = None, ) -> None: - """Processes issues in a GitHub repository.""" + """Processes issues in a GitHub repository. + + Automatically detects catalog-destined test cases (catalog_destined=true) and creates + PRs against the catalog repository with proper directory structure and metadata writeback. + Non-catalog test cases are processed normally against the project repository. + + Optionally creates tracking issues for catalog PRs to track parameter learning tasks. 
+ """ repo: str = ctx.obj["repo"] github_api_url: str = ctx.obj["github_api_url"] github_pat_token: str = ctx.obj["github_pat_token"] @@ -229,6 +264,11 @@ def process_issues_cli( if testing_as_code_workflow is True: typer.echo("Testing as Code workflow is enabled - any Pull Requests created will have an augmented body") + # Parse tracking issue labels from comma-separated string + parsed_labels = None + if tracking_issue_labels: + parsed_labels = [label.strip() for label in tracking_issue_labels.split(",") if label.strip()] + # Run the workflow result = asyncio.run( run_process_issues_workflow( @@ -241,6 +281,10 @@ def process_issues_cli( github_api_url=github_api_url, yaml_path=yaml_path, testing_as_code_workflow=testing_as_code_workflow, + catalog_repo=catalog_repo, + test_cases_dir=test_cases_dir, + create_tracking_issues=create_tracking_issues, + tracking_issue_labels=parsed_labels, ) ) if result.errors: diff --git a/github_ops_manager/github/adapter.py b/github_ops_manager/github/adapter.py index 2cf021c..f450819 100644 --- a/github_ops_manager/github/adapter.py +++ b/github_ops_manager/github/adapter.py @@ -473,13 +473,36 @@ async def list_files_in_pull_request(self, pull_number: int) -> list[Any]: return response.parsed_data async def get_file_content_from_pull_request(self, file_path: str, branch: str) -> str: - """Get the content of a file from a specific branch (typically the PR's head branch).""" + """Get the content of a file from a specific branch (typically the PR's head branch). + + Handles both small files (inline base64 content) and large files (> 1MB, via download_url). 
+ """ response = await self.client.rest.repos.async_get_content( owner=self.owner, repo=self.repo_name, path=file_path, ref=branch, ) + + # For large files (> 1MB), GitHub API returns content as None and provides download_url + if response.parsed_data.content is None or response.parsed_data.content == "": + download_url = getattr(response.parsed_data, "download_url", None) + if download_url: + logger.info( + "File too large for inline content, using download_url", + file_path=file_path, + download_url=download_url, + ) + # Use httpx to download the raw file content + import httpx + + async with httpx.AsyncClient() as client: + download_response = await client.get(download_url) + download_response.raise_for_status() + return download_response.text + else: + raise ValueError(f"File content is empty and no download_url provided for {file_path}") + return base64.b64decode(response.parsed_data.content).decode("utf-8") # Release/Tag Operations diff --git a/github_ops_manager/processing/test_cases_processor.py b/github_ops_manager/processing/test_cases_processor.py new file mode 100644 index 0000000..e123e44 --- /dev/null +++ b/github_ops_manager/processing/test_cases_processor.py @@ -0,0 +1,347 @@ +"""Handles processing of test case YAML files for catalog workflow. + +This module provides utilities for finding, loading, updating, and saving +test case definition files, particularly for writing PR metadata back after +catalog PR creation. 
+""" + +from pathlib import Path +from typing import Any + +import structlog +from githubkit.versions.latest.models import PullRequest +from ruamel.yaml import YAML + +logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__) + +# Initialize YAML handler with format preservation +yaml = YAML() +yaml.preserve_quotes = True +yaml.default_flow_style = False + + +# Mapping from tac-quicksilver normalized OS to catalog directory names +OS_TO_CATALOG_DIR_MAP = { + "iosxe": "IOS-XE", + "ios-xe": "IOS-XE", + "ios_xe": "IOS-XE", + "nxos": "NX-OS", + "nx-os": "NX-OS", + "nx_os": "NX-OS", + "iosxr": "IOS-XR", + "ios-xr": "IOS-XR", + "ios_xr": "IOS-XR", + "ios": "IOS", + "ise": "ISE", + "aci": "ACI", + "sdwan": "SD-WAN", + "sd-wan": "SD-WAN", + "dnac": "DNAC", + "catalyst_center": "DNAC", + "spirent": "Spirent", +} + + +def normalize_os_to_catalog_dir(os_name: str) -> str: + """Convert normalized OS name to catalog directory name. + + Args: + os_name: The OS name in normalized form (e.g., "iosxe", "nxos") + + Returns: + Catalog directory name (e.g., "IOS-XE", "NX-OS") + + Example: + >>> normalize_os_to_catalog_dir("ios_xe") + 'IOS-XE' + >>> normalize_os_to_catalog_dir("nxos") + 'NX-OS' + """ + normalized = OS_TO_CATALOG_DIR_MAP.get(os_name.lower(), os_name.upper()) + logger.debug("Normalized OS name to catalog directory", os_name=os_name, catalog_dir=normalized) + return normalized + + +def extract_os_from_robot_content(robot_content: str) -> str | None: + """Extract OS from robot file Test Tags section. + + Looks for the os: tag in the Test Tags section of a Robot Framework file. + This is more reliable than filename parsing since tags are structured metadata. + + Args: + robot_content: Complete content of robot file + + Returns: + Extracted OS name (e.g., "ios-xe", "nx-os") or None if not found + + Example: + >>> content = ''' + ... Test Tags + ... ... os:ios-xe + ... ... category:foundations + ... 
''' + >>> extract_os_from_robot_content(content) + 'ios-xe' + """ + import re + + # Regex pattern to find os: tag in Test Tags section + # Matches: os:ios-xe, os:nx-os, etc. + pattern = r"(?:^|\s)os:(\S+)" + + match = re.search(pattern, robot_content, re.MULTILINE | re.IGNORECASE) + + if match: + os_value = match.group(1).lower() + logger.debug("Extracted OS from Test Tags", os=os_value) + return os_value + + logger.warning("Could not find os: tag in robot file Test Tags section") + return None + + +def extract_os_from_robot_filename(filename: str) -> str | None: + """Extract OS from robot filename pattern like verify_ios_xe_*.robot. + + This is a fallback method if Test Tags parsing fails. + Prefer extract_os_from_robot_content() for more reliable extraction. + + Args: + filename: Robot filename (e.g., "verify_ios_xe_interfaces.robot") + + Returns: + Extracted OS name or None if pattern doesn't match + + Example: + >>> extract_os_from_robot_filename("verify_ios_xe_interfaces.robot") + 'ios_xe' + >>> extract_os_from_robot_filename("verify_nx_os_vlans.robot") + 'nx_os' + """ + # Remove .robot extension + base_name = filename.replace(".robot", "") + + # Pattern: __.robot + # OS is typically second part (verify_ios_xe_...) + parts = base_name.split("_") + + if len(parts) >= 3: + # Try two-part OS first (ios_xe, nx_os, ios_xr) + potential_os = f"{parts[1]}_{parts[2]}" + if potential_os in OS_TO_CATALOG_DIR_MAP: + logger.debug("Extracted two-part OS from filename", filename=filename, os=potential_os) + return potential_os + + if len(parts) >= 2: + # Try single-part OS (iosxe, nxos, iosxr, ise, aci, etc.) 
+ potential_os = parts[1] + if potential_os in OS_TO_CATALOG_DIR_MAP: + logger.debug("Extracted single-part OS from filename", filename=filename, os=potential_os) + return potential_os + + logger.warning("Could not extract OS from robot filename", filename=filename) + return None + + +def find_test_cases_files(test_cases_dir: Path) -> list[Path]: + """Find all test_cases.yaml files in directory (non-recursive). + + Only searches the immediate directory to avoid picking up backup files + in subdirectories like .backups/ + + Args: + test_cases_dir: Directory to search for test case files + + Returns: + List of paths to test_cases.yaml files + """ + if not test_cases_dir.exists(): + logger.error("Test cases directory does not exist", test_cases_dir=str(test_cases_dir)) + return [] + + # Look for .yaml and .yml files in immediate directory only (non-recursive) + yaml_files = list(test_cases_dir.glob("*.yaml")) + list(test_cases_dir.glob("*.yml")) + + # Filter for files that likely contain test cases + test_case_files = [] + for yaml_file in yaml_files: + if "test_case" in yaml_file.name.lower(): + test_case_files.append(yaml_file) + + logger.info("Found test case files", count=len(test_case_files), test_cases_dir=str(test_cases_dir)) + return test_case_files + + +def load_test_cases_yaml(filepath: Path) -> dict[str, Any] | None: + """Load test cases YAML preserving formatting. 
+ + Args: + filepath: Path to test cases YAML file + + Returns: + Dictionary containing test cases data, or None on error + """ + try: + with open(filepath, encoding="utf-8") as f: + data = yaml.load(f) + + if not isinstance(data, dict): + logger.error("Test cases file is not a dictionary", filepath=str(filepath)) + return None + + logger.debug("Loaded test cases YAML", filepath=str(filepath), has_test_cases="test_cases" in data) + return data + + except Exception as e: + logger.error("Failed to load test cases YAML", filepath=str(filepath), error=str(e)) + return None + + +def save_test_cases_yaml(filepath: Path, data: dict[str, Any]) -> bool: + """Save test cases YAML preserving formatting. + + Args: + filepath: Path to test cases YAML file + data: Dictionary containing test cases data + + Returns: + True if save succeeded, False otherwise + """ + try: + # CRITICAL: Use atomic write to prevent data loss if yaml.dump() fails + # Write to temporary file first, then rename atomically + import os + import tempfile + + temp_fd, temp_path = tempfile.mkstemp(dir=filepath.parent, prefix=f".{filepath.name}.", suffix=".tmp") + try: + with os.fdopen(temp_fd, "w", encoding="utf-8") as f: + yaml.dump(data, f) + + # Atomic rename - if this fails, original file is untouched + os.replace(temp_path, filepath) + + logger.info("Saved test cases YAML", filepath=str(filepath)) + return True + + except Exception as e: + # Clean up temp file if something went wrong + try: + os.unlink(temp_path) + except Exception: + pass + raise e + + except Exception as e: + logger.error("Failed to save test cases YAML", filepath=str(filepath), error=str(e)) + return False + + +def find_test_case_by_filename(test_cases: list[dict[str, Any]], generated_script_path: str) -> tuple[int, dict[str, Any]] | None: + """Find test case by matching generated_script_path field. 
+ + Args: + test_cases: List of test case dictionaries + generated_script_path: Generated script path to match + + Returns: + Tuple of (index, test_case) or None if not found + """ + for idx, test_case in enumerate(test_cases): + if test_case.get("generated_script_path") == generated_script_path: + logger.debug("Found matching test case", index=idx, generated_script_path=generated_script_path) + return (idx, test_case) + + logger.debug("No matching test case found", generated_script_path=generated_script_path) + return None + + +def update_test_case_with_pr_metadata(test_case: dict[str, Any], pr: PullRequest, catalog_repo_url: str) -> dict[str, Any]: + """Add PR metadata fields to test case. + + Args: + test_case: Test case dictionary to update + pr: GitHub PullRequest object + catalog_repo_url: Full URL to catalog repository + + Returns: + Updated test case dictionary + """ + test_case["catalog_pr_git_url"] = catalog_repo_url + test_case["catalog_pr_number"] = pr.number + test_case["catalog_pr_url"] = pr.html_url + test_case["catalog_pr_branch"] = pr.head.ref + + logger.info( + "Updated test case with PR metadata", + catalog_pr_number=pr.number, + catalog_pr_url=pr.html_url, + catalog_pr_branch=pr.head.ref, + ) + + return test_case + + +def update_test_case_with_issue_metadata(test_case: dict[str, Any], issue_number: int, issue_url: str) -> dict[str, Any]: + """Add project issue metadata fields to test case. 
+ + Args: + test_case: Test case dictionary to update + issue_number: GitHub Issue number + issue_url: GitHub Issue URL + + Returns: + Updated test case dictionary + """ + test_case["project_issue_number"] = issue_number + test_case["project_issue_url"] = issue_url + + logger.info( + "Updated test case with project issue metadata", + project_issue_number=issue_number, + project_issue_url=issue_url, + ) + + return test_case + + +def load_catalog_destined_test_cases(test_cases_dir: Path) -> list[dict[str, Any]]: + """Load test cases that are catalog-destined from test_cases.yaml files. + + Args: + test_cases_dir: Directory containing test_cases.yaml files + + Returns: + List of test case dictionaries with catalog_destined=true + """ + catalog_test_cases = [] + test_case_files = find_test_cases_files(test_cases_dir) + + for test_case_file in test_case_files: + data = load_test_cases_yaml(test_case_file) + if not data or "test_cases" not in data: + continue + + test_cases = data["test_cases"] + if not isinstance(test_cases, list): + logger.warning("test_cases field is not a list", filepath=str(test_case_file)) + continue + + # Filter for catalog-destined test cases with generated scripts + for test_case in test_cases: + is_catalog = test_case.get("catalog_destined", False) + has_script = test_case.get("generated_script_path") + + if is_catalog and has_script: + # Add metadata about source file for later writeback + test_case["_source_file"] = str(test_case_file) + catalog_test_cases.append(test_case) + logger.debug( + "Found catalog-destined test case", + title=test_case.get("title"), + script=has_script, + source_file=str(test_case_file), + ) + + logger.info("Loaded catalog-destined test cases", count=len(catalog_test_cases), test_cases_dir=str(test_cases_dir)) + return catalog_test_cases diff --git a/github_ops_manager/synchronize/driver.py b/github_ops_manager/synchronize/driver.py index 6d1ef58..a48528e 100644 --- a/github_ops_manager/synchronize/driver.py +++ 
b/github_ops_manager/synchronize/driver.py @@ -10,8 +10,9 @@ from github_ops_manager.github.adapter import GitHubKitAdapter from github_ops_manager.processing.yaml_processor import YAMLProcessingError, YAMLProcessor from github_ops_manager.synchronize.issues import render_issue_bodies, sync_github_issues -from github_ops_manager.synchronize.pull_requests import sync_github_pull_requests +from github_ops_manager.synchronize.pull_requests import create_catalog_pull_requests, sync_github_pull_requests from github_ops_manager.synchronize.results import AllIssueSynchronizationResults, ProcessIssuesResult +from github_ops_manager.synchronize.tracking_issues import create_tracking_issues_for_catalog_prs logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__) @@ -27,8 +28,16 @@ async def run_process_issues_workflow( yaml_path: Path, raise_on_yaml_error: bool = False, testing_as_code_workflow: bool = False, + catalog_repo: str = "Testing-as-Code/tac-catalog", + test_cases_dir: Path = Path("workspace/test_cases/"), + create_tracking_issues: bool = False, + tracking_issue_labels: list[str] | None = None, ) -> ProcessIssuesResult: - """Run the process-issues workflow: load issues from YAML and return them/errors.""" + """Run the process-issues workflow: load issues from YAML and return them/errors. + + Supports both project and catalog-destined test cases in the same run. + Issues with catalog_destined=true will have PRs created against the catalog repository. + """ processor = YAMLProcessor(raise_on_error=raise_on_yaml_error) try: issues_model = processor.load_issues_model([str(yaml_path)]) @@ -39,7 +48,7 @@ async def run_process_issues_workflow( if issues_model.issue_template: issues_model = await render_issue_bodies(issues_model) - # Set up GitHub adapter. + # Set up GitHub adapter for project repository. 
github_adapter = await GitHubKitAdapter.create( repo=repo, github_auth_type=github_auth_type, @@ -116,6 +125,17 @@ async def run_process_issues_workflow( start_time = time.time() logger.info("Processing pull requests", start_time=start_time) + + # Build catalog repo URL for metadata writeback + # e.g., "https://api.github.com" -> "https://github.com" + # or "https://wwwin-github.cisco.com/api/v3" -> "https://wwwin-github.cisco.com" + if "api.github.com" in github_api_url: + base_url = "https://github.com" + else: + # For GitHub Enterprise, remove /api/v3 suffix and any trailing slashes + base_url = github_api_url.replace("/api/v3", "").replace("/api", "").rstrip("/") + catalog_repo_url = f"{base_url}/{catalog_repo}" + await sync_github_pull_requests( issues_model.issues, refreshed_issues, @@ -124,8 +144,86 @@ async def run_process_issues_workflow( default_branch, yaml_dir, testing_as_code_workflow=testing_as_code_workflow, + # Catalog configuration for catalog-destined issues + catalog_repo=catalog_repo, + catalog_repo_url=catalog_repo_url, + test_cases_dir=test_cases_dir, + # Auth parameters for creating catalog adapter + github_auth_type=github_auth_type, + github_pat_token=github_pat_token, + github_app_id=github_app_id, + github_app_private_key_path=github_app_private_key_path, + github_app_installation_id=github_app_installation_id, + github_api_url=github_api_url, ) end_time = time.time() total_time = end_time - start_time logger.info("Processed pull requests", start_time=start_time, end_time=end_time, duration=round(total_time, 2)) + + # Create standalone catalog PRs (no issues) for catalog-destined test cases + if test_cases_dir.exists(): + logger.info("Processing catalog-destined test cases from test_cases.yaml files", test_cases_dir=str(test_cases_dir)) + + # Create adapter for catalog repository + catalog_adapter = await GitHubKitAdapter.create( + repo=catalog_repo, + github_auth_type=github_auth_type, + github_pat_token=github_pat_token, + 
github_app_id=github_app_id, + github_app_private_key_path=github_app_private_key_path, + github_app_installation_id=github_app_installation_id, + github_api_url=github_api_url, + ) + + # Get catalog repository info + catalog_repo_info = await catalog_adapter.get_repository() + catalog_default_branch = catalog_repo_info.default_branch + + # Base directory for resolving robot file paths + # Robot files are typically in workspace/ directory, which is parent of test_cases_dir + base_directory = test_cases_dir.parent if test_cases_dir.name != "." else test_cases_dir + + logger.info( + "Creating catalog PRs", + catalog_repo=catalog_repo, + catalog_default_branch=catalog_default_branch, + base_directory=str(base_directory), + ) + + start_catalog_time = time.time() + catalog_pr_data = await create_catalog_pull_requests( + test_cases_dir=test_cases_dir, + base_directory=base_directory, + catalog_repo=catalog_repo, + catalog_repo_url=catalog_repo_url, + catalog_default_branch=catalog_default_branch, + github_adapter=catalog_adapter, + ) + end_catalog_time = time.time() + catalog_duration = end_catalog_time - start_catalog_time + logger.info("Completed catalog PR creation", duration=round(catalog_duration, 2)) + + # Create tracking issues in project repo for catalog PRs + if create_tracking_issues and catalog_pr_data: + logger.info( + "Creating tracking issues in project repository", + catalog_pr_count=len(catalog_pr_data), + repo=repo, + ) + + start_tracking_time = time.time() + tracking_issues = await create_tracking_issues_for_catalog_prs( + github_adapter=github_adapter, # Project repo adapter + catalog_pr_data=catalog_pr_data, + catalog_repo=catalog_repo, + labels=tracking_issue_labels, + ) + end_tracking_time = time.time() + tracking_duration = end_tracking_time - start_tracking_time + logger.info( + "Completed tracking issue creation", + duration=round(tracking_duration, 2), + issues_created=len(tracking_issues), + ) + return ProcessIssuesResult(issue_sync_results) 
diff --git a/github_ops_manager/synchronize/pull_requests.py b/github_ops_manager/synchronize/pull_requests.py index 872a9d7..a141038 100644 --- a/github_ops_manager/synchronize/pull_requests.py +++ b/github_ops_manager/synchronize/pull_requests.py @@ -2,12 +2,23 @@ import re from pathlib import Path +from typing import Any import structlog from githubkit.versions.latest.models import Issue, PullRequest from structlog.contextvars import bound_contextvars from github_ops_manager.github.adapter import GitHubKitAdapter +from github_ops_manager.processing.test_cases_processor import ( + extract_os_from_robot_content, + extract_os_from_robot_filename, + find_test_cases_files, + load_catalog_destined_test_cases, + load_test_cases_yaml, + normalize_os_to_catalog_dir, + save_test_cases_yaml, + update_test_case_with_pr_metadata, +) from github_ops_manager.schemas.default_issue import IssueModel, PullRequestModel from github_ops_manager.synchronize.models import SyncDecision from github_ops_manager.synchronize.utils import compare_github_field, compare_label_sets @@ -74,8 +85,19 @@ async def get_pull_request_associated_with_issue(issue: Issue, existing_pull_req return None -async def get_desired_pull_request_file_content(base_directory: Path, desired_issue: IssueModel) -> list[tuple[str, str]]: - """Get the content of the desired pull request files.""" +async def get_desired_pull_request_file_content( + base_directory: Path, desired_issue: IssueModel, catalog_workflow: bool = False +) -> list[tuple[str, str]]: + """Get the content of the desired pull request files. 
+ + Args: + base_directory: Base directory where files are located + desired_issue: Issue model containing pull request information + catalog_workflow: If True, transform .robot file paths to catalog structure + + Returns: + List of tuples (file_path_in_pr, file_content) + """ if desired_issue.pull_request is None: raise ValueError("Desired issue has no pull request associated with it") files: list[tuple[str, str]] = [] @@ -83,7 +105,39 @@ async def get_desired_pull_request_file_content(base_directory: Path, desired_is file_path = base_directory / file logger.info("Checking if file exists", file=file, file_path=str(file_path), base_directory=str(base_directory)) if file_path.exists(): - files.append((file, file_path.read_text(encoding="utf-8"))) + file_content = file_path.read_text(encoding="utf-8") + + # Transform path if catalog workflow and file is a robot file + if catalog_workflow and file.endswith(".robot"): + filename = Path(file).name + + # Try to extract OS from Test Tags in robot file content (preferred) + os_name = extract_os_from_robot_content(file_content) + extraction_method = "test_tags" + + # Fall back to filename parsing if Test Tags parsing fails + if not os_name: + logger.info("Test Tags parsing failed, falling back to filename parsing", filename=filename) + os_name = extract_os_from_robot_filename(filename) + extraction_method = "filename" + + if os_name: + catalog_dir = normalize_os_to_catalog_dir(os_name) + catalog_path = f"catalog/{catalog_dir}/{filename}" + logger.info( + "Transformed robot file path for catalog", + original_path=file, + catalog_path=catalog_path, + os_name=os_name, + catalog_dir=catalog_dir, + extraction_method=extraction_method, + ) + files.append((catalog_path, file_content)) + else: + logger.warning("Could not extract OS from robot file, using original path", filename=filename) + files.append((file, file_content)) + else: + files.append((file, file_content)) + else: + logger.warning("Pull Request file not found", file=file,
issue_title=desired_issue.title) return files @@ -155,9 +209,23 @@ async def decide_github_pull_request_sync_action(desired_issue: IssueModel, exis async def commit_files_to_branch( - desired_issue: IssueModel, existing_issue: Issue, desired_branch_name: str, base_directory: Path, github_adapter: GitHubKitAdapter + desired_issue: IssueModel, + existing_issue: Issue, + desired_branch_name: str, + base_directory: Path, + github_adapter: GitHubKitAdapter, + catalog_workflow: bool = False, ) -> None: - """Commit files to a branch.""" + """Commit files to a branch. + + Args: + desired_issue: Issue model containing pull request information + existing_issue: GitHub Issue object + desired_branch_name: Name of the branch to commit to + base_directory: Base directory where files are located + github_adapter: GitHub adapter for API calls + catalog_workflow: If True, transform robot file paths to catalog structure + """ if desired_issue.pull_request is None: raise ValueError("Desired issue has no pull request associated with it") @@ -166,7 +234,7 @@ async def commit_files_to_branch( logger.info("Preparing files to commit for pull request", issue_title=desired_issue.title, branch=desired_branch_name) for file_path in desired_issue.pull_request.files: try: - files_to_commit = await get_desired_pull_request_file_content(base_directory, desired_issue) + files_to_commit = await get_desired_pull_request_file_content(base_directory, desired_issue, catalog_workflow) except FileNotFoundError as exc: logger.error("File for PR not found or unreadable", file=file_path, error=str(exc)) missing_files.append(file_path) @@ -182,6 +250,77 @@ async def commit_files_to_branch( await github_adapter.commit_files_to_branch(desired_branch_name, files_to_commit, commit_message) +async def write_pr_metadata_to_test_cases( + pr: PullRequest, + catalog_repo_url: str, + test_cases_dir: Path, +) -> None: + """Write PR metadata back to test_cases.yaml files after catalog PR creation. 
+ + Args: + pr: GitHub PullRequest object with created PR information + catalog_repo_url: Full URL to catalog repository + test_cases_dir: Directory containing test_cases.yaml files + """ + logger.info("Writing PR metadata to test cases files", pr_number=pr.number, test_cases_dir=str(test_cases_dir)) + + # Get robot filename from PR files + pr_files = [f.filename for f in pr.changed_files] if hasattr(pr, "changed_files") else [] + robot_files = [f for f in pr_files if f.endswith(".robot")] + + if not robot_files: + logger.warning("No robot files found in PR, cannot write back metadata", pr_number=pr.number) + return + + # For catalog PRs, the filename will be in format: catalog//.robot + # We need to extract just the filename + robot_filename = Path(robot_files[0]).name + + logger.info("Processing robot file for metadata writeback", robot_filename=robot_filename, pr_number=pr.number) + + # Find test_cases.yaml files + test_case_files = find_test_cases_files(test_cases_dir) + + if not test_case_files: + logger.warning("No test case files found in directory", test_cases_dir=str(test_cases_dir)) + return + + # Search through test case files for matching test case + for test_case_file in test_case_files: + data = load_test_cases_yaml(test_case_file) + if not data or "test_cases" not in data: + continue + + test_cases = data["test_cases"] + if not isinstance(test_cases, list): + logger.warning("test_cases field is not a list", filepath=str(test_case_file)) + continue + + # Look for test case with matching generated_script_path + # The generated_script_path might be just the filename or include a directory + for test_case in test_cases: + generated_script_path = test_case.get("generated_script_path") + if generated_script_path and Path(generated_script_path).name == robot_filename: + logger.info( + "Found matching test case, updating with PR metadata", + test_case_file=str(test_case_file), + generated_script_path=generated_script_path, + ) + + # Update test case with PR 
metadata + update_test_case_with_pr_metadata(test_case, pr, catalog_repo_url) + + # Save updated YAML + if save_test_cases_yaml(test_case_file, data): + logger.info("Successfully wrote PR metadata back to test case file", test_case_file=str(test_case_file)) + return + else: + logger.error("Failed to save test case file", test_case_file=str(test_case_file)) + return + + logger.warning("No matching test case found for robot file", robot_filename=robot_filename) + + async def sync_github_pull_request( desired_issue: IssueModel, existing_issue: Issue, @@ -190,6 +329,9 @@ async def sync_github_pull_request( base_directory: Path, existing_pull_request: PullRequest | None = None, testing_as_code_workflow: bool = False, + catalog_workflow: bool = False, + catalog_repo_url: str | None = None, + test_cases_dir: Path | None = None, ) -> None: """Synchronize a specific pull request for an issue.""" with bound_contextvars( @@ -235,7 +377,7 @@ async def sync_github_pull_request( logger.info("Branch already exists, skipping creation", branch=desired_branch_name) # Commit files to branch - await commit_files_to_branch(desired_issue, existing_issue, desired_branch_name, base_directory, github_adapter) + await commit_files_to_branch(desired_issue, existing_issue, desired_branch_name, base_directory, github_adapter, catalog_workflow) logger.info("Creating new PR for issue", branch=desired_branch_name, base_branch=default_branch) new_pr = await github_adapter.create_pull_request( @@ -247,6 +389,11 @@ async def sync_github_pull_request( logger.info("Created new PR", pr_number=new_pr.number, branch=desired_branch_name) await github_adapter.set_labels_on_issue(new_pr.number, pr_labels) logger.info("Set labels on new PR", pr_number=new_pr.number, labels=pr_labels) + + # Write PR metadata back to test_cases.yaml if catalog workflow + if catalog_workflow and catalog_repo_url and test_cases_dir: + logger.info("Catalog workflow enabled, writing PR metadata back to test cases") + await 
write_pr_metadata_to_test_cases(new_pr, catalog_repo_url, test_cases_dir) elif pr_sync_decision == SyncDecision.UPDATE: if existing_pull_request is None: raise ValueError("Existing pull request not found") @@ -257,12 +404,190 @@ async def sync_github_pull_request( body=pr.body, ) await github_adapter.set_labels_on_issue(existing_pull_request.number, pr_labels) - desired_file_data = await get_desired_pull_request_file_content(base_directory, desired_issue) + desired_file_data = await get_desired_pull_request_file_content(base_directory, desired_issue, catalog_workflow) pr_file_sync_decision = await decide_github_pull_request_file_sync_action(desired_file_data, existing_pull_request, github_adapter) if pr_file_sync_decision == SyncDecision.CREATE: # The branch will already exist, so we don't need to create it. # However, we do need to commit the files to the branch. - await commit_files_to_branch(desired_issue, existing_issue, desired_branch_name, base_directory, github_adapter) + await commit_files_to_branch(desired_issue, existing_issue, desired_branch_name, base_directory, github_adapter, catalog_workflow) + + +async def create_catalog_pull_requests( + test_cases_dir: Path, + base_directory: Path, + catalog_repo: str, + catalog_repo_url: str, + catalog_default_branch: str, + github_adapter: GitHubKitAdapter, +) -> list[dict[str, Any]]: + """Create standalone PRs for catalog-destined test cases (no issues). + + Reads test_cases.yaml files directly and creates PRs in catalog repository + without creating issues. This is simpler than the full issue/PR workflow. 
+ + Args: + test_cases_dir: Directory containing test_cases.yaml files + base_directory: Base directory where robot files are located + catalog_repo: Catalog repository name (owner/repo) + catalog_repo_url: Full URL to catalog repository + catalog_default_branch: Default branch in catalog repository + github_adapter: GitHub adapter for catalog repository + + Returns: + List of dicts with keys: pr, test_cases, os_name, branch_name, catalog_path + """ + logger.info("Creating standalone catalog PRs", test_cases_dir=str(test_cases_dir), catalog_repo=catalog_repo) + + # Load catalog-destined test cases from test_cases.yaml files + catalog_test_cases = load_catalog_destined_test_cases(test_cases_dir) + + if not catalog_test_cases: + logger.info("No catalog-destined test cases found") + return [] + + logger.info("Found catalog-destined test cases to process", count=len(catalog_test_cases)) + + # Accumulate created PR data for tracking issue creation + created_pr_data = [] + + for test_case in catalog_test_cases: + title = test_case.get("title", "Untitled Test Case") + script_path = test_case.get("generated_script_path") + source_file = Path(test_case.get("_source_file")) + + if not script_path: + logger.warning("Test case missing generated_script_path", title=title) + continue + + logger.info("Processing catalog test case", title=title, script_path=script_path) + + # Check if PR already exists + existing_pr_number = test_case.get("catalog_pr_number") + existing_pr_url = test_case.get("catalog_pr_url") + + if existing_pr_number and existing_pr_url: + logger.info("PR already exists for test case, skipping", title=title, pr_number=existing_pr_number, pr_url=existing_pr_url) + continue + + # Build file path + robot_file_path = base_directory / script_path + if not robot_file_path.exists(): + logger.error("Robot file not found", file=str(robot_file_path), title=title) + continue + + # Read robot file content + robot_content = robot_file_path.read_text(encoding="utf-8") + + # 
Extract OS from Test Tags
+        os_name = extract_os_from_robot_content(robot_content)
+        if not os_name:
+            logger.info("Test Tags parsing failed, falling back to filename parsing", filename=robot_file_path.name)
+            os_name = extract_os_from_robot_filename(robot_file_path.name)
+
+        if not os_name:
+            logger.error("Could not extract OS from robot file", file=str(robot_file_path), title=title)
+            continue
+
+        # Transform path for catalog
+        catalog_dir = normalize_os_to_catalog_dir(os_name)
+        catalog_path = f"catalog/{catalog_dir}/{robot_file_path.name}"
+
+        logger.info(
+            "Transformed robot file path for catalog",
+            original_path=str(script_path),
+            catalog_path=catalog_path,
+            os_name=os_name,
+            catalog_dir=catalog_dir,
+        )
+
+        # Create branch name following conventional Git naming patterns
+        # feat/<os>/add-<script-stem> (e.g., feat/ios-xe/add-verify-iosxe-error-disable-detection-reason-presence)
+        branch_name = f"feat/{os_name}/add-{robot_file_path.stem}".lower().replace("_", "-")
+
+        # Check if branch exists
+        if await github_adapter.branch_exists(branch_name):
+            logger.info("Branch already exists, skipping", branch=branch_name, title=title)
+            # TODO: Could update existing branch/PR here
+            continue
+
+        # Create branch
+        logger.info("Creating branch for catalog PR", branch=branch_name, base_branch=catalog_default_branch)
+        await github_adapter.create_branch(branch_name, catalog_default_branch)
+
+        # Commit file to branch
+        commit_message = f"feat: add {catalog_dir} test - {title}"
+        files_to_commit = [(catalog_path, robot_content)]
+
+        logger.info("Committing file to branch", branch=branch_name, file=catalog_path)
+        await github_adapter.commit_files_to_branch(branch_name, files_to_commit, commit_message)
+
+        # Create PR
+        pr_title = f"feat: add {catalog_dir} test - {title}"
+        pr_body = f"""Catalog contribution for test automation.
+ +**Test Case:** {title} +**Operating System:** {os_name.upper()} +**Script:** `{catalog_path}` + +This PR adds test automation generated by tac-quicksilver to the catalog for reuse across projects. + +🤖 Automatically generated catalog contribution""" + + logger.info("Creating catalog PR", branch=branch_name, base_branch=catalog_default_branch, title=pr_title) + new_pr = await github_adapter.create_pull_request( + title=pr_title, + head=branch_name, + base=catalog_default_branch, + body=pr_body, + ) + + logger.info("Created catalog PR", pr_number=new_pr.number, pr_url=new_pr.html_url) + + # Store PR data for tracking issue creation + created_pr_data.append( + { + "pr": new_pr, + "test_cases": [test_case], + "os_name": os_name, + "branch_name": branch_name, + "catalog_path": catalog_path, + } + ) + + # Write PR metadata back to test_cases.yaml + logger.info("Writing PR metadata back to test case file", source_file=str(source_file)) + + # Reload the source file + data = load_test_cases_yaml(source_file) + if data and "test_cases" in data: + # Find the test case and update it + test_case_found = False + for tc in data["test_cases"]: + if tc.get("generated_script_path") == script_path: + update_test_case_with_pr_metadata(tc, new_pr, catalog_repo_url) + test_case_found = True + break + + # Save back to file ONLY if we found and updated a test case + if test_case_found: + if save_test_cases_yaml(source_file, data): + logger.info("Successfully wrote PR metadata back to test case file", source_file=str(source_file)) + else: + logger.error("Failed to save test case file", source_file=str(source_file)) + else: + logger.warning( + "Test case not found in source file, skipping save to prevent data loss", + source_file=str(source_file), + script_path=script_path, + ) + + logger.info( + "Completed catalog PR creation", + total_processed=len(catalog_test_cases), + prs_created=len(created_pr_data), + ) + + return created_pr_data async def sync_github_pull_requests( @@ -273,10 
+598,51 @@ async def sync_github_pull_requests( default_branch: str, base_directory: Path, testing_as_code_workflow: bool = False, + catalog_repo: str = "Testing-as-Code/tac-catalog", + catalog_repo_url: str | None = None, + test_cases_dir: Path | None = None, + github_auth_type: str | None = None, + github_pat_token: str | None = None, + github_app_id: int | None = None, + github_app_private_key_path: Path | None = None, + github_app_installation_id: int | None = None, + github_api_url: str = "https://api.github.com", ) -> None: - """Process pull requests for issues that specify a pull_request field.""" - desired_issues_with_prs = [issue for issue in desired_issues if issue.pull_request is not None] + """Process pull requests for issues that specify a pull_request field. + + Supports both project and catalog-destined PRs in the same run. + Issues with catalog_destined=true will have PRs created against the catalog repository. + + Args: + desired_issues: List of desired issues from YAML + existing_issues: List of existing issues from GitHub (project repo) + existing_pull_requests: List of existing pull requests from GitHub + github_adapter: GitHub adapter for project repository + default_branch: Default branch name + base_directory: Base directory where files are located + testing_as_code_workflow: If True, augment PR bodies for Testing as Code + catalog_repo: Catalog repository name (owner/repo) + catalog_repo_url: Full URL to catalog repository for metadata writeback + test_cases_dir: Directory containing test_cases.yaml files for metadata writeback + github_auth_type: GitHub authentication type for creating catalog adapter + github_pat_token: GitHub PAT token for creating catalog adapter + github_app_id: GitHub App ID for creating catalog adapter + github_app_private_key_path: GitHub App private key path for creating catalog adapter + github_app_installation_id: GitHub App installation ID for creating catalog adapter + github_api_url: GitHub API URL for creating 
catalog adapter + """ + # Filter out catalog-destined issues - they are handled separately by create_catalog_pull_requests() + desired_issues_with_prs = [issue for issue in desired_issues if issue.pull_request is not None and not getattr(issue, "catalog_destined", False)] + + logger.info( + "Processing project issues with pull requests (catalog-destined filtered out)", + project_issues_count=len(desired_issues_with_prs), + ) + + # Process each issue (all are project issues since catalog-destined are filtered out) for desired_issue in desired_issues_with_prs: + logger.info("Processing project issue", issue_title=desired_issue.title) + existing_issue = next((issue for issue in existing_issues if issue.title == desired_issue.title), None) if existing_issue is not None: logger.info( @@ -300,4 +666,7 @@ async def sync_github_pull_requests( base_directory, existing_pull_request=existing_pr, testing_as_code_workflow=testing_as_code_workflow, + catalog_workflow=False, # Always False for project issues + catalog_repo_url=None, + test_cases_dir=None, ) diff --git a/github_ops_manager/synchronize/tracking_issues.py b/github_ops_manager/synchronize/tracking_issues.py new file mode 100644 index 0000000..d448a79 --- /dev/null +++ b/github_ops_manager/synchronize/tracking_issues.py @@ -0,0 +1,271 @@ +"""Contains logic for creating tracking issues for catalog PRs and parameter learning tasks.""" + +from pathlib import Path +from typing import Any + +import structlog +from githubkit.versions.latest.models import Issue, PullRequest +from jinja2 import Environment, FileSystemLoader, Template + +from github_ops_manager.github.adapter import GitHubKitAdapter +from github_ops_manager.processing.test_cases_processor import ( + load_test_cases_yaml, + save_test_cases_yaml, + update_test_case_with_issue_metadata, +) + +logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__) + +# Set up Jinja2 environment for loading templates +TEMPLATES_DIR = Path(__file__).parent.parent / 
"templates"
+jinja_env = Environment(loader=FileSystemLoader(str(TEMPLATES_DIR)), autoescape=False)
+
+
+def load_tracking_issue_template() -> Template:
+    """Load the tracking issue template from disk.
+
+    Returns:
+        Jinja2 Template object
+    """
+    return jinja_env.get_template("tracking_issue.j2")
+
+
+def strip_os_tag_from_title(title: str) -> str:
+    """Strip OS tag prefix from test case title.
+
+    Removes leading OS tags like [IOS-XE], [NX-OS], etc. from the title
+    to get the clean test case group name that appears in cxtm.yaml.
+
+    Args:
+        title: Test case title potentially with OS tag prefix
+
+    Returns:
+        Title without OS tag prefix
+
+    Examples:
+        >>> strip_os_tag_from_title("[IOS-XE] Verify Interface Status")
+        'Verify Interface Status'
+        >>> strip_os_tag_from_title("[NX-OS] Check BGP Neighbors")
+        'Check BGP Neighbors'
+        >>> strip_os_tag_from_title("Verify LLDP on all devices")
+        'Verify LLDP on all devices'
+    """
+    import re
+
+    # Pattern matches [ANYTHING] at the start of the string, followed by optional whitespace
+    pattern = r"^\[.*?\]\s*"
+    cleaned_title = re.sub(pattern, "", title)
+    return cleaned_title
+
+
+def compute_project_branch_name(catalog_branch: str) -> str:
+    """Compute suggested project repository branch name from catalog branch.
+
+    Replaces 'feat/' prefix with 'learn/' to indicate parameter learning branch.
+
+    Args:
+        catalog_branch: Catalog repository branch name
+
+    Returns:
+        Suggested project repository branch name
+
+    Examples:
+        >>> compute_project_branch_name("feat/nx-os/add-verify-nxos-module-port-number")
+        'learn/nx-os/add-verify-nxos-module-port-number'
+        >>> compute_project_branch_name("feat/ios-xe/add-verify-iosxe-interface-status")
+        'learn/ios-xe/add-verify-iosxe-interface-status'
+        >>> compute_project_branch_name("feature/test")
+        'learn/test'
+    """
+    # Replace feat/ or feature/ prefix with learn/
+    if catalog_branch.startswith("feat/"):
+        return catalog_branch.replace("feat/", "learn/", 1)
+    elif catalog_branch.startswith("feature/"):
+        return catalog_branch.replace("feature/", "learn/", 1)
+    else:
+        # If no feat/feature prefix, just prepend learn/
+        return f"learn/{catalog_branch}"
+
+
+async def create_tracking_issue_for_catalog_pr(
+    github_adapter: GitHubKitAdapter,
+    catalog_pr: PullRequest,
+    test_cases: list[dict[str, Any]],
+    os_name: str,
+    catalog_repo: str,
+    labels: list[str] | None = None,
+) -> Issue:
+    """Create a tracking issue in project repo for a catalog PR.
+ + Args: + github_adapter: GitHub adapter for project repository + catalog_pr: The catalog PR that was created + test_cases: List containing single test case dict (always one test case per catalog PR) + os_name: Operating system name (e.g., "ios-xe", "nxos") + catalog_repo: Catalog repository name (owner/repo) + labels: Optional list of label names to apply to the issue + + Returns: + Created Issue object + """ + # Build issue title + # Since each catalog PR contains exactly one test case, get the test case title + test_case = test_cases[0] + test_case_title = test_case.get("title", "Untitled Test Case") + + # Strip OS tag from title for CLI commands (e.g., "[IOS-XE] Do Thing" -> "Do Thing") + # This matches the test case group name that will appear in cxtm.yaml + clean_title = strip_os_tag_from_title(test_case_title) + + # Compute suggested project branch name from catalog branch + suggested_branch = compute_project_branch_name(catalog_pr.head.ref) + + title = f"Review Catalog PR and Learn Parameters: {test_case_title}" + + # Extract test requirement data from test case + # Commands list should only contain command strings, not full command objects + commands_list = [] + if "commands" in test_case: + for cmd in test_case["commands"]: + if isinstance(cmd, dict): + commands_list.append(cmd.get("command", "")) + else: + commands_list.append(str(cmd)) + + test_requirement = { + "purpose": test_case.get("purpose", ""), + "commands": commands_list, + "pass_criteria": test_case.get("pass_criteria", ""), + "sample_parameters": test_case.get("jobfile_parameters", ""), + "parameters_to_parsed_data_mapping": test_case.get("jobfile_parameters_mapping", ""), + } + + # Load and render the tracking issue template + template = load_tracking_issue_template() + body = template.render( + catalog_pr_title=catalog_pr.title, + catalog_pr_url=catalog_pr.html_url, + catalog_pr_number=catalog_pr.number, + catalog_branch=catalog_pr.head.ref, + suggested_project_branch=suggested_branch, + 
test_case_title=test_case_title, # Original title with OS tag for display + test_case_title_clean=clean_title, # Clean title for CLI commands + os_name=os_name.upper(), + test_requirement=test_requirement, + ) + + logger.info( + "Creating tracking issue in project repository", + catalog_pr_number=catalog_pr.number, + catalog_pr_url=catalog_pr.html_url, + test_case_title=test_case_title, + os_name=os_name, + ) + + # Create the issue + issue = await github_adapter.create_issue( + title=title, + body=body, + ) + + # Apply labels if provided + if labels: + logger.info("Applying labels to tracking issue", issue_number=issue.number, labels=labels) + await github_adapter.set_labels_on_issue(issue.number, labels) + + logger.info( + "Created tracking issue", + issue_number=issue.number, + issue_url=issue.html_url, + catalog_pr_number=catalog_pr.number, + ) + + # Write issue metadata back to test_cases.yaml + source_file_path = test_case.get("_source_file") + if source_file_path: + source_file = Path(source_file_path) + logger.info("Writing project issue metadata back to test case file", source_file=str(source_file)) + + # Reload the source file + data = load_test_cases_yaml(source_file) + if data and "test_cases" in data: + # Find the test case and update it + # Match by title since that's unique and reliable + test_case_found = False + for tc in data["test_cases"]: + if tc.get("title") == test_case_title: + update_test_case_with_issue_metadata(tc, issue.number, issue.html_url) + test_case_found = True + break + + # Save back to file ONLY if we found and updated a test case + if test_case_found: + if save_test_cases_yaml(source_file, data): + logger.info("Successfully wrote project issue metadata back to test case file", source_file=str(source_file)) + else: + logger.error("Failed to save test case file", source_file=str(source_file)) + else: + logger.warning( + "Test case not found in source file, skipping save to prevent data loss", + source_file=str(source_file), + 
test_case_title=test_case_title, + ) + else: + logger.warning("Could not load test cases from source file", source_file=str(source_file)) + else: + logger.warning("Test case missing _source_file metadata, cannot write back issue metadata", test_case_title=test_case_title) + + return issue + + +async def create_tracking_issues_for_catalog_prs( + github_adapter: GitHubKitAdapter, + catalog_pr_data: list[dict[str, Any]], + catalog_repo: str, + labels: list[str] | None = None, +) -> list[Issue]: + """Create tracking issues for all catalog PRs that were created. + + Args: + github_adapter: GitHub adapter for project repository + catalog_pr_data: List of dicts with keys: pr, test_cases, os_name + catalog_repo: Catalog repository name (owner/repo) + labels: Optional list of label names to apply to issues + + Returns: + List of created Issue objects + """ + if not catalog_pr_data: + logger.info("No catalog PR data provided, skipping tracking issue creation") + return [] + + logger.info("Creating tracking issues for catalog PRs", count=len(catalog_pr_data), catalog_repo=catalog_repo) + + created_issues = [] + + for pr_data in catalog_pr_data: + pr = pr_data["pr"] + test_cases = pr_data["test_cases"] + os_name = pr_data["os_name"] + + try: + issue = await create_tracking_issue_for_catalog_pr( + github_adapter=github_adapter, + catalog_pr=pr, + test_cases=test_cases, + os_name=os_name, + catalog_repo=catalog_repo, + labels=labels, + ) + created_issues.append(issue) + except Exception as e: + logger.error( + "Failed to create tracking issue for catalog PR", + catalog_pr_number=pr.number, + error=str(e), + exc_info=True, + ) + + logger.info("Completed tracking issue creation", created_count=len(created_issues), total_prs=len(catalog_pr_data)) + + return created_issues diff --git a/github_ops_manager/templates/tracking_issue.j2 b/github_ops_manager/templates/tracking_issue.j2 new file mode 100644 index 0000000..49fb3d2 --- /dev/null +++ 
b/github_ops_manager/templates/tracking_issue.j2 @@ -0,0 +1,51 @@ +## Catalog PR: {{ catalog_pr_title }} + +**Test Case**: {{ test_case_title }} +**Catalog PR**: {{ catalog_pr_url }} +**Catalog Branch**: `{{ catalog_branch }}` +**Operating System**: {{ os_name }} + +## Test Requirement + +```yaml +purpose: | +{{ test_requirement.purpose | indent(2, first=True) }} +commands: +{% for cmd in test_requirement.commands -%} + - "{{ cmd }}" +{% endfor -%} +pass_criteria: | +{{ test_requirement.pass_criteria | indent(2, first=True) }} +sample_parameters: | +{{ test_requirement.sample_parameters | indent(2, first=True) }} +parameters_to_parsed_data_mapping: | +{{ test_requirement.parameters_to_parsed_data_mapping | indent(2, first=True) }} +``` + +### Tasks + +- [ ] Create a new branch in this project repository to track your test case parameter additions. Suggested branch name: + ```bash + git checkout -b {{ suggested_project_branch }} + ``` + +- [ ] Review the Catalog PR for any glaringly obvious issues + +- [ ] Execute the following command to learn parameters and validate whether test case parameters are successfully inserted into the cxtm.yaml file. If any errors occur, troubleshoot and resolve them by editing the test automation script in the branch associated with the Catalog PR in the Catalog repository: + ```bash + tac-tools scripts learn "{{ test_case_title_clean }}" + ``` + +- [ ] Execute the following command to run tests and validate whether test case parameters are successfully validated against the testbed. If any errors occur, troubleshoot and verify that they are not due to a legitimate issue with the testbed. 
If the testbed is healthy, edit the test automation script in the branch associated with the Catalog PR in the Catalog repository: + ```bash + tac-tools scripts run "{{ test_case_title_clean }}" + ``` + +- [ ] When the test automation script in the Catalog repository is in a known working condition, assign a Catalog reviewer to review the PR + +- [ ] Submit a PR in this project repository tracking your test case parameters. **Make sure to link the PR to this issue so that this issue is automatically closed when the PR is merged** + +- [ ] Either have the PR reviewed by another engineer (recommended) or merge the PR yourself, depending on your project and team's established processes + +--- +🤖 Automatically generated tracking issue for catalog PR #{{ catalog_pr_number }} diff --git a/pyproject.toml b/pyproject.toml index dbee723..3b16df6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,9 @@ docs = [ "mkdocs-git-revision-date-localized-plugin>=1.2.0", ] +[project.scripts] +github-ops-manager = "github_ops_manager.configuration.cli:typer_app" + [project.urls] "Homepage" = "https://github.com/aitestino/github-ops-manager.git" "Bug Tracker" = "https://github.com/aitestino/github-ops-manager.git/issues"