diff --git a/cloud_pipelines_backend/api_router.py b/cloud_pipelines_backend/api_router.py index 6652637..8822659 100644 --- a/cloud_pipelines_backend/api_router.py +++ b/cloud_pipelines_backend/api_router.py @@ -269,6 +269,97 @@ def stream_container_log( router.get("/api/pipeline_runs/", tags=["pipelineRuns"], **default_config)( inject_session_dependency(list_pipeline_runs_func) ) + router.post( + "/api/pipeline_runs/search/", + tags=["pipelineRuns"], + summary="Search Pipeline Runs", + description="""Search pipeline runs with annotation filters. + +## What is an Annotation? + +An annotation is a **key-value pair** attached to a pipeline run. +For example the following annotations (i.e. key = value): +- `environment` = `production` +- `team` = `backend` +- `priority` = `high` + +## Filter Types + +### KeyFilter - Search by annotation key +| Operator | Description | +|----------|-------------| +| `exists` | Key exists (any value) | +| `equals` | Key exactly matches string | +| `contains` | Key contains substring | +| `in_set` | Key matches one of multiple values | + +### ValueFilter - Search by annotation value (across ALL annotations) +| Operator | Description | +|----------|-------------| +| `equals` | Value exactly matches string | +| `contains` | Value contains substring | +| `in_set` | Value matches one of multiple values | + +### FilterGroup - Combine filters with logic +| Operator | Description | +|----------|-------------| +| `and` | ALL filters must match | +| `or` | ANY filter must match | + +All filters support `negate: true` to invert the condition (e.g., NOT equals). + +--- + +## Examples + +### 1. Key equals a string +Find runs where annotation key equals "environment": +```json +{ + "annotation_filters": { + "filters": [ + {"operator": "equals", "key": "environment"} + ] + } +} +``` + +### 2. 
Key contains substring AND value in set +Find runs where key contains "env" AND value is "prod" or "staging": +```json +{ + "annotation_filters": { + "filters": [ + {"operator": "contains", "key": "env"}, + {"operator": "in_set", "values": ["prod", "staging"]} + ], + "operator": "and" + } +} +``` + +### 3. Complex: (key contains OR value contains) AND value NOT contains +Find runs where (key contains "env" OR any value contains "prod") AND value NOT contains "deprecated": +```json +{ + "annotation_filters": { + "filters": [ + { + "filters": [ + {"operator": "contains", "key": "env"}, + {"operator": "contains", "value": "prod"} + ], + "operator": "or" + }, + {"operator": "contains", "value": "deprecated", "negate": true} + ], + "operator": "and" + } +} +``` +""", + **default_config, + )(inject_session_dependency(pipeline_run_service.search)) router.get("/api/pipeline_runs/{id}", tags=["pipelineRuns"], **default_config)( inject_session_dependency(pipeline_run_service.get) ) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index e8e0624..a279d6f 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -1,10 +1,11 @@ import base64 import dataclasses import datetime +import enum import json import logging import typing -from typing import Any, Optional +from typing import Any, Final, Optional, Union if typing.TYPE_CHECKING: from cloud_pipelines.orchestration.storage_providers import ( @@ -31,6 +32,105 @@ def _get_current_time() -> datetime.datetime: from .
import errors from .errors import ItemNotFoundError +# ==== Annotation Filter Types for Search ==== + + +class GroupOperator(enum.StrEnum): + """Logical operators for combining filters in a group.""" + + AND = "and" + OR = "or" + + +class KeyFilterOperator(enum.StrEnum): + """Operators for filtering by annotation key.""" + + CONTAINS = "contains" # Key contains substring + EQUALS = "equals" # Key equals exact string + EXISTS = "exists" # Key exists (regardless of value) + IN_SET = "in_set" # Key is in a set of values + + +class ValueFilterOperator(enum.StrEnum): + """Operators for filtering by annotation value.""" + + CONTAINS = "contains" # Value contains substring + EQUALS = "equals" # Value equals exact string + IN_SET = "in_set" # Value is in a set of values + + +@dataclasses.dataclass(kw_only=True) +class KeyFilter: + """Filter annotations by key patterns. + + Examples: + - KeyFilter(operator=KeyFilterOperator.EXISTS, key="environment") + → Find runs that have an "environment" annotation + - KeyFilter(operator=KeyFilterOperator.CONTAINS, key="env", negate=True) + → Find runs that do NOT have any key containing "env" + - KeyFilter(operator=KeyFilterOperator.IN_SET, keys=["env", "team"]) + → Find runs that have a key matching "env" or "team" + """ + + operator: KeyFilterOperator + key: str | None = None # For EXISTS, EQUALS, CONTAINS operators + keys: list[str] | None = None # For IN_SET operator + negate: bool = False # If True, negates the operation (NOT EXISTS, NOT IN, etc.) + + +@dataclasses.dataclass(kw_only=True) +class ValueFilter: + """Filter annotations by value patterns across ALL annotation values. 
+ + Examples: + - ValueFilter(operator=ValueFilterOperator.EQUALS, value="production") + → Find runs where ANY annotation has value "production" + - ValueFilter(operator=ValueFilterOperator.CONTAINS, value="error") + → Find runs where ANY annotation value contains "error" + - ValueFilter(operator=ValueFilterOperator.IN_SET, values=["high", "critical"]) + → Find runs where ANY annotation value is "high" or "critical" + """ + + operator: ValueFilterOperator + value: str | None = None # For EQUALS, CONTAINS operators + values: list[str] | None = None # For IN_SET operator + negate: bool = False # If True, negates the operation + + +# Type alias using Union to support forward reference +AnnotationFilterType = Union[KeyFilter, ValueFilter, "FilterGroup"] + + +@dataclasses.dataclass(kw_only=True) +class FilterGroup: + """A group of filters combined with AND/OR logic. + + Examples: + - FilterGroup(operator=GroupOperator.AND, filters=[...]) + → All filters must match + - FilterGroup(operator=GroupOperator.OR, filters=[...]) + → At least one filter must match + - FilterGroup(filters=[...]) + → Defaults to AND logic when operator is not specified + """ + + # Union type for filter types that can appear in a group (recursive) + filters: list[AnnotationFilterType] + # Operator defaults to None, which is treated as AND logic + # TODO: add documentation why this is more user friendly + operator: GroupOperator | None = None + + +@dataclasses.dataclass(kw_only=True) +class SearchFilters: + """Top-level search filters for pipeline runs. + + The annotation_filters field accepts a FilterGroup that can contain + nested groups and individual filters for complex query logic.
+ """ + + annotation_filters: FilterGroup | None = None + # ==== PipelineJobService @dataclasses.dataclass(kw_only=True) @@ -63,15 +163,339 @@ class GetPipelineRunResponse(PipelineRunResponse): class ListPipelineJobsResponse: pipeline_runs: list[PipelineRunResponse] next_page_token: str | None = None + debug_where_clause: str | None = ( + None # Populated when debug_where_clause=True in search() + ) import sqlalchemy as sql from sqlalchemy import orm +# Pagination constants +OFFSET_KEY: Final[str] = "offset" +PAGE_SIZE: Final[int] = 10 + + +def _compile_where_clauses_to_string( + session: orm.Session, + where_clauses: list[sql.ColumnElement[bool]], +) -> str: + """Compile WHERE clauses to a SQL string for debugging. + + Uses the dialect from the session's engine (SQLite, MySQL, etc.) + and inlines literal values for readability. + """ + if not where_clauses: + return "(no where clauses)" + + # Combine all clauses with AND + combined = sql.and_(*where_clauses) if len(where_clauses) > 1 else where_clauses[0] + + # Get dialect from session's engine + dialect = session.bind.dialect if session.bind else None + + try: + compiled = combined.compile( + dialect=dialect, + compile_kwargs={"literal_binds": True}, + ) + return str(compiled) + except Exception as e: + # Fallback if literal_binds fails (e.g., for complex types) + try: + compiled = combined.compile(dialect=dialect) + return f"{compiled} [params: {compiled.params}]" + except Exception: + return f"(failed to compile: {e})" + class PipelineRunsApiService_Sql: PIPELINE_NAME_EXTRA_DATA_KEY = "pipeline_name" + def _query_pipeline_runs( + self, + *, + session: orm.Session, + where_clauses: list, + offset: int, + page_size: int, + ) -> list[bts.PipelineRun]: + """Query pipeline runs with pagination and filtering.""" + return list( + session.scalars( + sql.select(bts.PipelineRun) + .where(*where_clauses) + .order_by(bts.PipelineRun.created_at.desc()) + .offset(offset) + .limit(page_size) + ).all() + ) + + def 
_calculate_next_page_offset( + self, + *, + offset: int, + page_size: int, + ) -> int: + """Calculate the offset for the next page.""" + return offset + page_size + + def _get_next_page_token( + self, + *, + num_results: int, + page_size: int, + next_page_token_dict: dict[str, Any], + ) -> str | None: + """Get the next page token, or None if this is the last page.""" + if num_results < page_size: + return None + return _encode_page_token(next_page_token_dict) + + def _create_pipeline_run_response( + self, + session: orm.Session, + pipeline_run: bts.PipelineRun, + include_pipeline_names: bool = False, + include_execution_stats: bool = False, + ) -> PipelineRunResponse: + """Create a PipelineRunResponse with optional enrichment.""" + response = PipelineRunResponse.from_db(pipeline_run) + if include_pipeline_names: + pipeline_name = None + extra_data = pipeline_run.extra_data or {} + if self.PIPELINE_NAME_EXTRA_DATA_KEY in extra_data: + pipeline_name = extra_data[self.PIPELINE_NAME_EXTRA_DATA_KEY] + else: + execution_node = session.get( + bts.ExecutionNode, pipeline_run.root_execution_id + ) + if execution_node: + task_spec = structures.TaskSpec.from_json_dict( + execution_node.task_spec + ) + component_spec = task_spec.component_ref.spec + if component_spec: + pipeline_name = component_spec.name + response.pipeline_name = pipeline_name + if include_execution_stats: + execution_status_stats = self._calculate_execution_status_stats( + session=session, root_execution_id=pipeline_run.root_execution_id + ) + response.execution_status_stats = { + status.value: count for status, count in execution_status_stats.items() + } + return response + + def _build_list_response( + self, + *, + session: orm.Session, + pipeline_runs: list[bts.PipelineRun], + next_page_token: str | None, + include_pipeline_names: bool = False, + include_execution_stats: bool = False, + ) -> ListPipelineJobsResponse: + """Build the ListPipelineJobsResponse from pipeline runs.""" + return 
ListPipelineJobsResponse( + pipeline_runs=[ + self._create_pipeline_run_response( + session=session, + pipeline_run=pipeline_run, + include_pipeline_names=include_pipeline_names, + include_execution_stats=include_execution_stats, + ) + for pipeline_run in pipeline_runs + ], + next_page_token=next_page_token, + ) + + def _build_key_filter_condition( + self, + *, + filter: KeyFilter, + ) -> sql.ColumnElement[bool]: + """Build a SQLAlchemy condition for a KeyFilter. + + Returns just the WHERE condition (not wrapped in EXISTS). + The condition is applied to the same annotation row when combined + with other filters in a FilterGroup. + + Args: + filter: KeyFilter with operator, key/keys, and optional negate flag. + + Returns: + A SQLAlchemy condition expression for the key filter. + """ + if filter.operator == KeyFilterOperator.EXISTS: + # Key exists (regardless of value) + condition = bts.PipelineRunAnnotation.key != None + elif filter.operator == KeyFilterOperator.EQUALS: + # Key equals exact string + if not filter.key: + raise ValueError("EQUALS operator requires 'key' to be set") + condition = bts.PipelineRunAnnotation.key == filter.key + elif filter.operator == KeyFilterOperator.CONTAINS: + # Key contains substring + if not filter.key: + raise ValueError("CONTAINS operator requires 'key' to be set") + condition = bts.PipelineRunAnnotation.key.contains(filter.key) + elif filter.operator == KeyFilterOperator.IN_SET: + # Key is in a set of values + if not filter.keys: + raise ValueError("IN_SET operator requires 'keys' to be set") + condition = bts.PipelineRunAnnotation.key.in_(filter.keys) + else: + raise ValueError(f"Unknown KeyFilterOperator: {filter.operator}") + + if filter.negate: + condition = ~condition + + return condition + + def _build_value_filter_condition( + self, + *, + filter: ValueFilter, + ) -> sql.ColumnElement[bool]: + """Build a SQLAlchemy condition for a ValueFilter. + + Returns just the WHERE condition (not wrapped in EXISTS). 
+ The condition is applied to the same annotation row when combined + with other filters in a FilterGroup. + + Args: + filter: ValueFilter with operator, value/values, and optional negate flag. + + Returns: + A SQLAlchemy condition expression for the value filter. + """ + if filter.operator == ValueFilterOperator.EQUALS: + # Value equals exact string + if filter.value is None: + raise ValueError("EQUALS operator requires 'value' to be set") + condition = bts.PipelineRunAnnotation.value == filter.value + elif filter.operator == ValueFilterOperator.CONTAINS: + # Value contains substring + if filter.value is None: + raise ValueError("CONTAINS operator requires 'value' to be set") + condition = bts.PipelineRunAnnotation.value.contains(filter.value) + elif filter.operator == ValueFilterOperator.IN_SET: + # Value is in a set of values + if not filter.values: + raise ValueError("IN_SET operator requires 'values' to be set") + condition = bts.PipelineRunAnnotation.value.in_(filter.values) + else: + raise ValueError(f"Unknown ValueFilterOperator: {filter.operator}") + + if filter.negate: + condition = ~condition + + return condition + + def _build_filter_group_clause( + self, + *, + group: FilterGroup, + ) -> sql.ColumnElement[bool]: + """Build a SQLAlchemy clause for a FilterGroup. + + Design Decision - Single EXISTS per group: + All simple filters (KeyFilter, ValueFilter) within the same group + are merged into a SINGLE EXISTS subquery. This ensures that all + conditions are checked against the SAME annotation row. + + Nested FilterGroups get their OWN separate EXISTS subquery, + which is then combined with the parent group using AND/OR. + + This enables "same row" semantics for queries like: + - annotation[key] == value (key and value in same row) + - key contains 'env' AND value equals 'prod' (same row) + + Args: + group: FilterGroup containing filters and an optional operator. + + Returns: + A SQLAlchemy EXISTS clause (or combined clauses for nested groups). 
+ """ + if not group.filters: + # Empty group matches everything + return sql.true() + + # Separate simple filters from nested groups + simple_filters: list[KeyFilter | ValueFilter] = [] + nested_groups: list[FilterGroup] = [] + + for f in group.filters: + if isinstance(f, FilterGroup): + nested_groups.append(f) + elif isinstance(f, (KeyFilter, ValueFilter)): + simple_filters.append(f) + else: + raise ValueError(f"Unknown filter type: {type(f)}") + + clauses: list[sql.ColumnElement[bool]] = [] + + # Build SINGLE EXISTS for all simple filters in this group + if simple_filters: + # Base subquery for annotation table + subquery = sql.select(bts.PipelineRunAnnotation).where( + bts.PipelineRunAnnotation.pipeline_run_id == bts.PipelineRun.id + ) + + # Build conditions for each simple filter + conditions: list[sql.ColumnElement[bool]] = [] + for f in simple_filters: + if isinstance(f, KeyFilter): + conditions.append(self._build_key_filter_condition(filter=f)) + elif isinstance(f, ValueFilter): + conditions.append(self._build_value_filter_condition(filter=f)) + + # Combine conditions with group operator (AND/OR) + if group.operator == GroupOperator.OR: + combined_condition = sql.or_(*conditions) + else: + # Default to AND if operator is not specified + combined_condition = sql.and_(*conditions) + + # Add combined condition to subquery and wrap in EXISTS + subquery = subquery.where(combined_condition) + clauses.append(subquery.exists()) + + # Build SEPARATE EXISTS for each nested group (recursive) + for nested in nested_groups: + clauses.append(self._build_filter_group_clause(group=nested)) + + # Combine all clauses (simple filters EXISTS + nested group EXISTS) + if len(clauses) == 1: + return clauses[0] + + if group.operator == GroupOperator.OR: + return sql.or_(*clauses) + else: + # Default to AND if operator is not specified + return sql.and_(*clauses) + + def _build_search_where_clauses( + self, + *, + filters: SearchFilters | None, + ) -> list[sql.ColumnElement[bool]]: 
+ """Build where clauses from SearchFilters.""" + where_clauses: list[sql.ColumnElement[bool]] = [] + + if filters is None: + return where_clauses + + # Handle annotation filters + if filters.annotation_filters is not None: + annotation_clause = self._build_filter_group_clause( + group=filters.annotation_filters + ) + where_clauses.append(annotation_clause) + + return where_clauses + def create( self, session: orm.Session, @@ -156,22 +580,125 @@ def terminate( execution_node.extra_data["desired_state"] = "TERMINATED" session.commit() + def search( + self, + *, + session: orm.Session, + filters: SearchFilters | None = None, + page_token: str | None = None, + include_pipeline_names: bool = False, + include_execution_stats: bool = False, + debug_where_clause: bool = False, + ) -> ListPipelineJobsResponse: + """Search pipeline runs with advanced filtering capabilities. + + This is an enhanced version of `list()` that supports filtering by annotations. + + **Parameters:** + - **filters**: Search filters including annotation filters + - **page_token**: Pagination token from a previous search call + - **include_pipeline_names**: Whether to include pipeline names in responses + - **include_execution_stats**: Whether to include execution statistics + - **debug_where_clause**: If True, includes the compiled SQL WHERE clause in the response + + **Returns:** `ListPipelineJobsResponse` with matching pipeline runs. 
+ + **Example 1:** Find runs that have an 'environment' annotation key + + ```python + search(session=session, filters=SearchFilters( + annotation_filters=FilterGroup( + filters=[ + KeyFilter( + operator=KeyFilterOperator.EQUALS, + key="environment" + ) + ] + ) + )) + ``` + + **Example 2:** Find runs where ANY annotation value equals 'production' + + ```python + search(session=session, filters=SearchFilters( + annotation_filters=FilterGroup( + filters=[ + ValueFilter( + operator=ValueFilterOperator.EQUALS, + value="production" + ) + ] + ) + )) + ``` + + **Example 3:** Find runs where ANY annotation value contains 'error' + + ```python + search(session=session, filters=SearchFilters( + annotation_filters=FilterGroup( + filters=[ + ValueFilter( + operator=ValueFilterOperator.CONTAINS, + value="error" + ) + ] + ) + )) + ``` + """ + page_token_dict = _decode_page_token(page_token) + offset = page_token_dict.get(OFFSET_KEY, 0) + + where_clauses = self._build_search_where_clauses(filters=filters) + + pipeline_runs = self._query_pipeline_runs( + session=session, + where_clauses=where_clauses, + offset=offset, + page_size=PAGE_SIZE, + ) + + next_page_offset = self._calculate_next_page_offset( + offset=offset, page_size=PAGE_SIZE + ) + next_page_token_dict = {OFFSET_KEY: next_page_offset} + next_page_token = self._get_next_page_token( + num_results=len(pipeline_runs), + page_size=PAGE_SIZE, + next_page_token_dict=next_page_token_dict, + ) + + response = self._build_list_response( + session=session, + pipeline_runs=pipeline_runs, + next_page_token=next_page_token, + include_pipeline_names=include_pipeline_names, + include_execution_stats=include_execution_stats, + ) + + if debug_where_clause: + response.debug_where_clause = _compile_where_clauses_to_string( + session=session, + where_clauses=where_clauses, + ) + + return response + # Note: This method must be last to not shadow the "list" type def list( self, *, session: orm.Session, page_token: str | None = None, - # 
page_size: int = 10, filter: str | None = None, current_user: str | None = None, include_pipeline_names: bool = False, include_execution_stats: bool = False, ) -> ListPipelineJobsResponse: page_token_dict = _decode_page_token(page_token) - OFFSET_KEY = "offset" offset = page_token_dict.get(OFFSET_KEY, 0) - page_size = 10 FILTER_KEY = "filter" if page_token: @@ -200,58 +727,30 @@ def list( where_clauses.append(bts.PipelineRun.created_by == None) else: raise NotImplementedError(f"Unsupported filter {filter}.") - pipeline_runs = list( - session.scalars( - sql.select(bts.PipelineRun) - .where(*where_clauses) - .order_by(bts.PipelineRun.created_at.desc()) - .offset(offset) - .limit(page_size) - ).all() + + pipeline_runs = self._query_pipeline_runs( + session=session, + where_clauses=where_clauses, + offset=offset, + page_size=PAGE_SIZE, + ) + + next_page_offset = self._calculate_next_page_offset( + offset=offset, page_size=PAGE_SIZE ) - next_page_offset = offset + page_size next_page_token_dict = {OFFSET_KEY: next_page_offset, FILTER_KEY: filter} - next_page_token = _encode_page_token(next_page_token_dict) - if len(pipeline_runs) < page_size: - next_page_token = None - - def create_pipeline_run_response( - pipeline_run: bts.PipelineRun, - ) -> PipelineRunResponse: - response = PipelineRunResponse.from_db(pipeline_run) - if include_pipeline_names: - pipeline_name = None - extra_data = pipeline_run.extra_data or {} - if self.PIPELINE_NAME_EXTRA_DATA_KEY in extra_data: - pipeline_name = extra_data[self.PIPELINE_NAME_EXTRA_DATA_KEY] - else: - execution_node = session.get( - bts.ExecutionNode, pipeline_run.root_execution_id - ) - if execution_node: - task_spec = structures.TaskSpec.from_json_dict( - execution_node.task_spec - ) - component_spec = task_spec.component_ref.spec - if component_spec: - pipeline_name = component_spec.name - response.pipeline_name = pipeline_name - if include_execution_stats: - execution_status_stats = self._calculate_execution_status_stats( - 
session=session, root_execution_id=pipeline_run.root_execution_id - ) - response.execution_status_stats = { - status.value: count - for status, count in execution_status_stats.items() - } - return response + next_page_token = self._get_next_page_token( + num_results=len(pipeline_runs), + page_size=PAGE_SIZE, + next_page_token_dict=next_page_token_dict, + ) - return ListPipelineJobsResponse( - pipeline_runs=[ - create_pipeline_run_response(pipeline_run) - for pipeline_run in pipeline_runs - ], + return self._build_list_response( + session=session, + pipeline_runs=pipeline_runs, next_page_token=next_page_token, + include_pipeline_names=include_pipeline_names, + include_execution_stats=include_execution_stats, ) def _calculate_execution_status_stats( diff --git a/cloud_pipelines_backend/backend_types_sql.py b/cloud_pipelines_backend/backend_types_sql.py index af16b3c..99e52e4 100644 --- a/cloud_pipelines_backend/backend_types_sql.py +++ b/cloud_pipelines_backend/backend_types_sql.py @@ -2,7 +2,7 @@ import datetime import enum import typing -from typing import Any +from typing import Any, Final import sqlalchemy as sql from sqlalchemy import orm @@ -466,6 +466,14 @@ class ContainerExecution(_TableBase): ) +PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME: Final[str] = ( + "ix_pipeline_run_annotation_key_value" +) +PIPELINE_RUN_ANNOTATION_VALUE_INDEX_NAME: Final[str] = ( + "ix_pipeline_run_annotation_value" +) + + class PipelineRunAnnotation(_TableBase): __tablename__ = "pipeline_run_annotation" pipeline_run_id: orm.Mapped[IdType] = orm.mapped_column( @@ -476,3 +484,19 @@ class PipelineRunAnnotation(_TableBase): pipeline_run: orm.Mapped[PipelineRun] = orm.relationship(repr=False, init=False) key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True) value: orm.Mapped[str | None] = orm.mapped_column(default=None) + + __table_args__ = ( + # Index for searching pipeline runs by annotation key/value + # Enables efficient queries like "find runs where 
key='environment' and value='production'" + sql.Index( + PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME, + "key", + "value", + ), + # Index for searching pipeline runs by annotation value only (across all keys) + # Enables efficient queries like "find runs where any annotation value='production'" + sql.Index( + PIPELINE_RUN_ANNOTATION_VALUE_INDEX_NAME, + "value", + ), + ) diff --git a/cloud_pipelines_backend/database_ops.py b/cloud_pipelines_backend/database_ops.py index 3d94ed1..a84a1f3 100644 --- a/cloud_pipelines_backend/database_ops.py +++ b/cloud_pipelines_backend/database_ops.py @@ -77,3 +77,14 @@ def migrate_db(db_engine: sqlalchemy.Engine): for index in bts.ExecutionNode.__table__.indexes: if index.name == "ix_execution_node_container_execution_cache_key": index.create(db_engine, checkfirst=True) + + # TODO: I believe we should create an index for the Annotation key + # column and migrate existing data? + # + # Migration for annotation filtering feature + for index in bts.PipelineRunAnnotation.__table__.indexes: + if index.name in [ + bts.PIPELINE_RUN_ANNOTATION_KEY_VALUE_INDEX_NAME, + bts.PIPELINE_RUN_ANNOTATION_VALUE_INDEX_NAME, + ]: + index.create(db_engine, checkfirst=True) diff --git a/pyproject.toml b/pyproject.toml index 5f68f78..9363e1b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ [dependency-groups] dev = [ + "black>=26.1.0", "pytest>=8.4.2", ] huggingface = [ @@ -23,3 +24,20 @@ huggingface = [ [tool.setuptools.packages.find] include = ["cloud_pipelines_backend*"] namespaces = true + +[tool.black] +# Usage: uv run black . 
+# +# TODO: Line length per Google Python Style Guide: +# https://google.github.io/styleguide/pyguide.html#32-line-length +# line-length = 80 +target-version = ["py310"] +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.venv + | build + | dist +)/ +''' diff --git a/tests/test_pipeline_run_search.py b/tests/test_pipeline_run_search.py new file mode 100644 index 0000000..df7e907 --- /dev/null +++ b/tests/test_pipeline_run_search.py @@ -0,0 +1,570 @@ +"""Tests for the pipeline run search functionality.""" + +from sqlalchemy import orm +import pytest + +from cloud_pipelines_backend import api_server_sql +from cloud_pipelines_backend import database_ops + + +def _initialize_db_and_get_session_factory(): + """Initialize an in-memory SQLite database and return a session factory.""" + db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://") + return lambda: orm.Session(bind=db_engine) + + +class TestPipelineRunSearch: + """Tests for PipelineRunsApiService_Sql.search()""" + + def test_search_with_no_filters(self): + """Test search with filters=None returns all pipeline runs.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + response = service.search( + session=session, + filters=None, + debug_where_clause=True, + ) + + assert response.debug_where_clause == "(no where clauses)" + assert response.pipeline_runs == [] + + def test_search_with_key_exists_filter(self): + """Test search with KeyFilter EXISTS operator.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EXISTS + ) + ], + ) + ) + + response = 
service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + 'AND pipeline_run_annotation."key" IS NOT NULL)' + ) + assert response.debug_where_clause == expected + + def test_search_with_key_exists_no_group_operator_filter(self): + """Test search with KeyFilter EXISTS operator without specifying group operator. + + When operator is not specified in FilterGroup, it should default to AND logic. + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + # Note: operator is not specified, should default to AND + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EXISTS + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + # Should produce the same result as test_search_with_key_exists_filter + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + 'AND pipeline_run_annotation."key" IS NOT NULL)' + ) + assert response.debug_where_clause == expected + + def test_search_with_key_contains_filter(self): + """Test search with KeyFilter CONTAINS operator.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + 
annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.CONTAINS, + key="env", + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND (pipeline_run_annotation.\"key\" LIKE '%' || 'env' || '%'))" + ) + assert response.debug_where_clause == expected + + def test_search_with_key_equals_filter(self): + """Test search with KeyFilter EQUALS operator.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EQUALS, + key="environment", + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" = 'environment')" + ) + assert response.debug_where_clause == expected + + def test_search_with_key_equals_negate_filter(self): + """Test search with KeyFilter EQUALS operator with negate=True. + + With the merged approach, negate=True negates the condition WITHIN + the EXISTS (key != 'environment'), not the entire EXISTS. 
+ + This finds runs where some annotation has a key that is NOT 'environment'. + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EQUALS, + key="environment", + negate=True, + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + # negate=True produces: key != 'environment' (within the EXISTS) + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.\"key\" != 'environment')" + ) + assert response.debug_where_clause == expected + + def test_search_with_key_in_set_filter(self): + """Test search with KeyFilter IN_SET operator.""" + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.IN_SET, + keys=["environment", "team"], + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND 
pipeline_run_annotation.\"key\" IN ('environment', 'team'))" + ) + assert response.debug_where_clause == expected + + def test_search_with_value_contains_filter(self): + """Test search with ValueFilter CONTAINS operator. + + Searches across ALL annotation values for substring match. + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.CONTAINS, + value="prod", + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND (pipeline_run_annotation.value LIKE '%' || 'prod' || '%'))" + ) + assert response.debug_where_clause == expected + + def test_search_with_value_equals_filter(self): + """Test search with ValueFilter EQUALS operator. + + Searches across ALL annotation values for exact match. 
+ """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.EQUALS, + value="production", + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.value = 'production')" + ) + assert response.debug_where_clause == expected + + def test_search_with_value_equals_negate_filter(self): + """Test search with ValueFilter EQUALS operator with negate=True. + + With the merged approach, negate=True negates the condition WITHIN + the EXISTS (value != 'production'), not the entire EXISTS. + + This finds runs where some annotation has a value that is NOT 'production'. 
+ """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.EQUALS, + value="production", + negate=True, + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + # negate=True produces: value != 'production' (within the EXISTS) + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.value != 'production')" + ) + assert response.debug_where_clause == expected + + def test_search_with_value_in_set_filter(self): + """Test search with ValueFilter IN_SET operator. + + Searches across ALL annotation values for set membership. 
+ """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.IN_SET, + values=["backend", "frontend"], + ) + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + expected = ( + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + "AND pipeline_run_annotation.value IN ('backend', 'frontend'))" + ) + assert response.debug_where_clause == expected + + def test_search_with_key_in_set_and_value_contains_negate(self): + """Test search with KeyFilter IN_SET and ValueFilter CONTAINS (negate=True). + + This tests the "same row" semantics: find runs where some annotation + has a key in ['environment', 'team'] AND that same annotation's value + does NOT contain 'error'. 
+ + Structure: + Group (AND): + ├── KeyFilter(IN_SET, keys=["environment", "team"]) + └── ValueFilter(CONTAINS, value="error", negate=True) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.IN_SET, + keys=["environment", "team"], + ), + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.CONTAINS, + value="error", + negate=True, + ), + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + # Expected SQL structure: + # + # Single EXISTS with both conditions (same row semantics): + # ├── KeyFilter(IN_SET, keys=["environment", "team"]) + # │ → key IN ('environment', 'team') + # └── ValueFilter(CONTAINS, value="error", negate=True) + # → value NOT LIKE '%error%' + # + expected = ( + # ===== Single EXISTS (same row) ===== + "EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id " + # | + # |-- Condition 1: KeyFilter(IN_SET, keys=["environment", "team"]) + "AND pipeline_run_annotation.\"key\" IN ('environment', 'team') " + # | + # |-- AND (combining conditions in same row) + # | + # |-- Condition 2: ValueFilter(CONTAINS, value="error", negate=True) + "AND (pipeline_run_annotation.value NOT LIKE '%' || 'error' || '%'))" + # ===== End Single EXISTS ===== + ) + assert response.debug_where_clause == expected + + def test_search_with_complex_nested_filters(self): + """Test search with complex nested filter groups. 
+ + With the merged approach: + - Each nested FilterGroup produces ONE EXISTS with merged conditions + - The root group combines the nested groups' EXISTS with OR + + Structure: + Root Group (OR): + ├── Group 1 (OR) → ONE EXISTS with OR-ed conditions + │ ├── KeyFilter(CONTAINS, key="env") + │ └── ValueFilter(EQUALS, value="admin", negate=True) + └── Group 2 (AND) → ONE EXISTS with AND-ed conditions + ├── KeyFilter(EXISTS, key="status", negate=True) + └── ValueFilter(IN_SET, values=["high", "critical"]) + """ + session_factory = _initialize_db_and_get_session_factory() + service = api_server_sql.PipelineRunsApiService_Sql() + + with session_factory() as session: + filters = api_server_sql.SearchFilters( + annotation_filters=api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.OR, + filters=[ + # Group 1 (OR) + api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.OR, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.CONTAINS, + key="env", + ), + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.EQUALS, + value="admin", + negate=True, + ), + ], + ), + # Group 2 (AND) + api_server_sql.FilterGroup( + operator=api_server_sql.GroupOperator.AND, + filters=[ + api_server_sql.KeyFilter( + operator=api_server_sql.KeyFilterOperator.EXISTS, + key="status", + negate=True, + ), + api_server_sql.ValueFilter( + operator=api_server_sql.ValueFilterOperator.IN_SET, + values=["high", "critical"], + ), + ], + ), + ], + ) + ) + + response = service.search( + session=session, + filters=filters, + debug_where_clause=True, + ) + + # Expected SQL structure (merged approach): + # + # Root Group (OR): + # ├── Group 1 EXISTS (OR): + # │ └── (key LIKE '%env%') OR (value != 'admin') + # └── Group 2 EXISTS (AND): + # └── (key IS NULL) AND (value IN ('high', 'critical')) + # + expected = ( + # ===== Root Group (OR) ===== + # | + # |-- Group 1: ONE EXISTS with OR-ed conditions ---------------- + "(EXISTS (SELECT 
pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id AND " + # | | + # | |-- Combined OR condition (wrapped in parentheses) + # | |-- Condition 1: KeyFilter(CONTAINS, key="env") + "((pipeline_run_annotation.\"key\" LIKE '%' || 'env' || '%') " + # | | + # | OR (within same EXISTS) + "OR " + # | | + # | |-- Condition 2: ValueFilter(EQUALS, value="admin", negate=True) + "pipeline_run_annotation.value != 'admin'))) " + # | + # OR (Root level - between Group 1 and Group 2) + "OR " + # | + # |-- Group 2: ONE EXISTS with AND-ed conditions --------------- + "(EXISTS (SELECT pipeline_run_annotation.pipeline_run_id, " + 'pipeline_run_annotation."key", pipeline_run_annotation.value \n' + "FROM pipeline_run_annotation, pipeline_run \n" + "WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id AND " + # | | + # | |-- Condition 1: KeyFilter(EXISTS, negate=True) + "pipeline_run_annotation.\"key\" IS NULL " + # | | + # | AND (within same EXISTS) + "AND " + # | | + # | |-- Condition 2: ValueFilter(IN_SET, values=["high", "critical"]) + "pipeline_run_annotation.value IN ('high', 'critical')))" + # ===== End Root Group ===== + ) + assert response.debug_where_clause == expected + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])