From 75bd574fa35679dfe54b4a31b0420b37c859b51d Mon Sep 17 00:00:00 2001 From: Suhaib Mujahid Date: Sat, 21 Feb 2026 15:43:10 -0500 Subject: [PATCH 1/2] Add duplicate processing avoidance with optimistic concurrency --- ...b_rename_retrying_to_retry_pending_and_.py | 69 +++++++++++ .../versions/f396ea3231a9_drop_task_id.py | 69 +++++++++++ .../app/database/connection.py | 1 - .../reviewhelper-api/app/database/models.py | 2 - services/reviewhelper-api/app/enums.py | 2 +- .../reviewhelper-api/app/review_processor.py | 11 +- .../reviewhelper-api/app/routers/feedback.py | 2 + .../reviewhelper-api/app/routers/internal.py | 108 ++++++++++++------ .../reviewhelper-api/app/routers/request.py | 6 +- services/reviewhelper-api/app/tasks.py | 11 +- 10 files changed, 223 insertions(+), 58 deletions(-) create mode 100644 services/reviewhelper-api/alembic/versions/714c920ab85b_rename_retrying_to_retry_pending_and_.py create mode 100644 services/reviewhelper-api/alembic/versions/f396ea3231a9_drop_task_id.py diff --git a/services/reviewhelper-api/alembic/versions/714c920ab85b_rename_retrying_to_retry_pending_and_.py b/services/reviewhelper-api/alembic/versions/714c920ab85b_rename_retrying_to_retry_pending_and_.py new file mode 100644 index 0000000000..ee0b232327 --- /dev/null +++ b/services/reviewhelper-api/alembic/versions/714c920ab85b_rename_retrying_to_retry_pending_and_.py @@ -0,0 +1,69 @@ +"""Rename RETRYING to RETRY_PENDING and remove COMPLETED status. + +Revision ID: 714c920ab85b +Revises: f396ea3231a9 +Create Date: 2026-02-21 15:20:31.544304 + +""" + +from typing import Sequence, Union + +from alembic import op +from alembic_postgresql_enum import TableReference + +# revision identifiers, used by Alembic. +revision: str = "714c920ab85b" +down_revision: Union[str, Sequence[str], None] = "f396ea3231a9" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema="public", + enum_name="review_status_enum", + new_values=[ + "PENDING", + "PROCESSING", + "RETRY_PENDING", + "PUBLISHED", + "FAILED", + ], + affected_columns=[ + TableReference( + table_schema="public", + table_name="review_requests", + column_name="status", + ) + ], + enum_values_to_rename=[("RETRYING", "RETRY_PENDING")], + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema="public", + enum_name="review_status_enum", + new_values=[ + "PENDING", + "PROCESSING", + "COMPLETED", + "PUBLISHED", + "FAILED", + "RETRYING", + ], + affected_columns=[ + TableReference( + table_schema="public", + table_name="review_requests", + column_name="status", + ) + ], + enum_values_to_rename=[("RETRY_PENDING", "RETRYING")], + ) + # ### end Alembic commands ### diff --git a/services/reviewhelper-api/alembic/versions/f396ea3231a9_drop_task_id.py b/services/reviewhelper-api/alembic/versions/f396ea3231a9_drop_task_id.py new file mode 100644 index 0000000000..623b09d0c4 --- /dev/null +++ b/services/reviewhelper-api/alembic/versions/f396ea3231a9_drop_task_id.py @@ -0,0 +1,69 @@ +"""Drop task_id. + +Revision ID: f396ea3231a9 +Revises: 30a5eeb5f400 +Create Date: 2026-02-21 07:27:49.712489 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from alembic_postgresql_enum import TableReference + +# revision identifiers, used by Alembic. +revision: str = "f396ea3231a9" +down_revision: Union[str, Sequence[str], None] = "30a5eeb5f400" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema="public", + enum_name="review_status_enum", + new_values=[ + "PENDING", + "PROCESSING", + "COMPLETED", + "PUBLISHED", + "FAILED", + "RETRYING", + ], + affected_columns=[ + TableReference( + table_schema="public", + table_name="review_requests", + column_name="status", + ) + ], + enum_values_to_rename=[], + ) + op.drop_column("review_requests", "task_id") + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "review_requests", + sa.Column("task_id", sa.VARCHAR(), autoincrement=False, nullable=True), + ) + op.sync_enum_values( + enum_schema="public", + enum_name="review_status_enum", + new_values=["PENDING", "PROCESSING", "COMPLETED", "PUBLISHED", "FAILED"], + affected_columns=[ + TableReference( + table_schema="public", + table_name="review_requests", + column_name="status", + ) + ], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### diff --git a/services/reviewhelper-api/app/database/connection.py b/services/reviewhelper-api/app/database/connection.py index dd2caa93a4..9a3ca52168 100644 --- a/services/reviewhelper-api/app/database/connection.py +++ b/services/reviewhelper-api/app/database/connection.py @@ -60,7 +60,6 @@ async def get_db() -> AsyncGenerator[AsyncSession, None]: async with async_session_maker() as session: try: yield session - await session.commit() except Exception: await session.rollback() raise diff --git a/services/reviewhelper-api/app/database/models.py b/services/reviewhelper-api/app/database/models.py index 68ad175355..702ba3c07b 100644 --- a/services/reviewhelper-api/app/database/models.py +++ b/services/reviewhelper-api/app/database/models.py @@ -73,8 +73,6 @@ class ReviewRequest(Base): error: Mapped[str | None] = mapped_column(Text) summary: Mapped[str | None] = mapped_column(Text) - task_id: Mapped[str | None] - # Relationships comments: Mapped[list["GeneratedComment"]] = relationship( "GeneratedComment", diff --git a/services/reviewhelper-api/app/enums.py b/services/reviewhelper-api/app/enums.py index c2b891c3de..d392dccdb5 100644 --- a/services/reviewhelper-api/app/enums.py +++ b/services/reviewhelper-api/app/enums.py @@ -9,7 +9,7 @@ class Platform(str, Enum): class ReviewStatus(str, Enum): PENDING = "pending" PROCESSING = "processing" - COMPLETED = "completed" + RETRY_PENDING = "retry_pending" PUBLISHED = "published" FAILED = "failed" diff --git a/services/reviewhelper-api/app/review_processor.py b/services/reviewhelper-api/app/review_processor.py index d389e8b9c8..b9beac3129 100644 --- a/services/reviewhelper-api/app/review_processor.py +++ b/services/reviewhelper-api/app/review_processor.py @@ -23,14 +23,16 @@ def get_code_review_tool(): return CodeReviewTool.create() -async def process_review(review_request: ReviewRequest) -> list[GeneratedComment]: +async def process_review( + review_request: ReviewRequest, +) -> tuple[list[GeneratedComment], str, dict]: """Process a review request and generate comments. Args: review_request: The review request to process. Returns: - The generated comments from the review processing. + A tuple of (generated comments, patch summary, review details). """ logger.info( "Processing review request %s for platform %s", @@ -62,10 +64,7 @@ async def process_review(review_request: ReviewRequest) -> list[GeneratedComment for comment in result.review_comments ] - review_request.summary = result.patch_summary - review_request.details = result.details - - return generated_comments + return generated_comments, result.patch_summary, result.details def submit_review_to_platform( diff --git a/services/reviewhelper-api/app/routers/feedback.py b/services/reviewhelper-api/app/routers/feedback.py index 726d0c4f5f..9200ffd22c 100644 --- a/services/reviewhelper-api/app/routers/feedback.py +++ b/services/reviewhelper-api/app/routers/feedback.py @@ -85,6 +85,8 @@ async def submit_feedback( await db.execute(stmt) + await db.commit() + return JSONResponse( { "message": ( diff --git a/services/reviewhelper-api/app/routers/internal.py b/services/reviewhelper-api/app/routers/internal.py index 3cf170c7ab..232135d506 100644 --- a/services/reviewhelper-api/app/routers/internal.py +++ b/services/reviewhelper-api/app/routers/internal.py @@ -1,8 +1,9 @@ import logging +from datetime import UTC, datetime, timedelta from typing import Annotated -from fastapi import APIRouter, Depends, Header, Response, status -from sqlalchemy import select +from fastapi import APIRouter, Depends, Response, status +from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.auth import verify_internal_api_key @@ -20,11 +21,37 @@ dependencies=[Depends(verify_internal_api_key)], ) +RECOVERY_BACKOFF_DELAY = timedelta(minutes=30) + + +async def claim_review_request(db: AsyncSession, review_request: ReviewRequest) -> bool: + """Claim a review request for processing. + + Uses optimistic concurrency: succeeds only if the row's status and + updated_at haven't changed since we last read them. + """ + stmt = ( + update(ReviewRequest) + .where( + ReviewRequest.id == review_request.id, + ReviewRequest.status == review_request.status, + ReviewRequest.updated_at == review_request.updated_at, + ) + .values(status=ReviewStatus.PROCESSING) + ) + result = await db.execute(stmt) + await db.commit() + + if result.rowcount == 0: + return False + + await db.refresh(review_request) + return True + @router.get("/process/{review_request_id}") async def process_review_request( review_request_id: int, - x_cloudtasks_taskname: Annotated[str, Header()], db: Annotated[AsyncSession, Depends(get_db)], ): """Process a review request (called by Cloud Tasks). @@ -40,26 +67,6 @@ async def process_review_request( logger.error("Review request %s not found", review_request_id) return Response(status_code=status.HTTP_204_NO_CONTENT) - if not review_request.task_id: - # We need to tolerate the possibility of delay or failures before - # committing the transaction that saves the task ID. - logger.info( - "Assigning task ID %s to review request %s", - x_cloudtasks_taskname, - review_request_id, - ) - review_request.task_id = x_cloudtasks_taskname - - # Ensure the request is being processed by the correct task - if review_request.task_id != x_cloudtasks_taskname: - logger.error( - "Mismatched task ID for review request %s: expected %s, got %s", - review_request_id, - review_request.task_id, - x_cloudtasks_taskname, - ) - return Response(status_code=status.HTTP_204_NO_CONTENT) - if review_request.status.is_final: logger.error( "Review request %s is already in final state: %s", @@ -68,30 +75,56 @@ async def process_review_request( ) return Response(status_code=status.HTTP_204_NO_CONTENT) - if review_request.status == ReviewStatus.COMPLETED: - # If we already have the generated comments from a previous processing - # attempt, we can directly submit the review without re-processing it. - generated_comments = await review_request.awaitable_attrs.comments - else: - review_request.status = ReviewStatus.PROCESSING - await db.commit() + # If another worker is actively processing, return 509 so Cloud Tasks retries. + # If it's been stuck in PROCESSING for too long, the worker likely crashed — + # fall through and try to reclaim it. + if review_request.status == ReviewStatus.PROCESSING: + age = datetime.now(UTC) - review_request.updated_at + if age < RECOVERY_BACKOFF_DELAY: + logger.warning( + "Review request %s is in PROCESSING state and was updated %s ago", + review_request_id, + age, + ) + return Response(status_code=status.HTTP_509_SERVICE_UNAVAILABLE) + + if not await claim_review_request(db, review_request): + logger.warning( + "Review request %s was modified concurrently, likely claimed by another worker", + review_request_id, + ) + return Response(status_code=status.HTTP_509_SERVICE_UNAVAILABLE) + if review_request.summary: + logger.info( + "Review request %s already has a summary, skipping AI processing", + review_request_id, + ) + comments = await review_request.awaitable_attrs.comments + else: try: - generated_comments = await process_review(review_request) + comments, patch_summary, details = await process_review(review_request) except LargeDiffError: - review_request.status = ReviewStatus.FAILED review_request.error = ( "The diff size exceeds the current processing limits." ) + review_request.status = ReviewStatus.FAILED + await db.commit() + # We return 204 here to avoid triggering retries, since this is a + # permanent failure that won't be resolved by retrying. return Response(status_code=status.HTTP_204_NO_CONTENT) - - db.add_all(generated_comments) - - review_request.status = ReviewStatus.COMPLETED + except Exception: + review_request.status = ReviewStatus.RETRY_PENDING + await db.commit() + raise + + db.add_all(comments) + review_request.summary = patch_summary + review_request.details = details await db.commit() for generated_comment, inline_comment_id in submit_review_to_platform( - review_request, generated_comments + review_request, comments ): # We need to commit after each comment is submitted to ensure that the # platform_comment_id is saved to the database, which allows the @@ -100,3 +133,4 @@ async def process_review_request( await db.commit() review_request.status = ReviewStatus.PUBLISHED + await db.commit() diff --git a/services/reviewhelper-api/app/routers/request.py b/services/reviewhelper-api/app/routers/request.py index 33674fa629..de29c9085a 100644 --- a/services/reviewhelper-api/app/routers/request.py +++ b/services/reviewhelper-api/app/routers/request.py @@ -76,7 +76,7 @@ async def create_or_get_review_request( await db.commit() # Queue task for processing - review_request.task_id = await create_review_task(review_request.id) + await create_review_task(review_request.id) return JSONResponse( { @@ -95,8 +95,8 @@ def _build_response_message(review_request: ReviewRequest) -> str: if review_request.status == ReviewStatus.PROCESSING: return f"The review for Diff {review_request.diff_id} is currently being processed. Review Helper will comment once it's done." - if review_request.status == ReviewStatus.COMPLETED: - return f"The review for Diff {review_request.diff_id} has been completed successfully. Review Helper is ready to post its review." + if review_request.status == ReviewStatus.RETRY_PENDING: + return f"The review for Diff {review_request.diff_id} encountered an issue and will be scheduled for retry." if review_request.status == ReviewStatus.PUBLISHED: return f"Review Helper already posted its review for Diff {review_request.diff_id}." diff --git a/services/reviewhelper-api/app/tasks.py b/services/reviewhelper-api/app/tasks.py index 6db665b9c7..19b691da2a 100644 --- a/services/reviewhelper-api/app/tasks.py +++ b/services/reviewhelper-api/app/tasks.py @@ -7,6 +7,7 @@ HttpRequest, Task, ) +from google.protobuf.duration_pb2 import Duration from app.config import settings @@ -18,14 +19,11 @@ def _get_tasks_client(): return CloudTasksAsyncClient() -async def create_review_task(review_request_id: int) -> str: +async def create_review_task(review_request_id: int): """Create a Cloud Task to process a review request. Args: review_request_id: The ID of the review request to process. - - Returns: - The name of the created task. """ client = _get_tasks_client() @@ -45,13 +43,10 @@ async def create_review_task(review_request_id: int) -> str: "Authorization": f"Bearer {settings.internal_api_key}", }, ), + dispatch_deadline=Duration(seconds=30 * 60), ) response = await client.create_task(parent=parent, task=task) logger.info( "Created task %s for review request %s", response.name, review_request_id ) - - task_id = response.name.split("/")[-1] - - return task_id From faa666f54c4d1ab97f8b72d8023e9b5e7c3be530 Mon Sep 17 00:00:00 2001 From: Suhaib Mujahid Date: Sat, 21 Feb 2026 20:22:28 -0500 Subject: [PATCH 2/2] Use 409 Conflict for concurrent claim errors --- services/reviewhelper-api/app/routers/internal.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/reviewhelper-api/app/routers/internal.py b/services/reviewhelper-api/app/routers/internal.py index 232135d506..e132ca9c93 100644 --- a/services/reviewhelper-api/app/routers/internal.py +++ b/services/reviewhelper-api/app/routers/internal.py @@ -86,14 +86,14 @@ async def process_review_request( review_request_id, age, ) - return Response(status_code=status.HTTP_509_SERVICE_UNAVAILABLE) + return Response(status_code=status.HTTP_409_CONFLICT) if not await claim_review_request(db, review_request): logger.warning( "Review request %s was modified concurrently, likely claimed by another worker", review_request_id, ) - return Response(status_code=status.HTTP_509_SERVICE_UNAVAILABLE) + return Response(status_code=status.HTTP_409_CONFLICT) if review_request.summary: logger.info(