Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Rename RETRYING to RETRY_PENDING and remove COMPLETED status.

Revision ID: 714c920ab85b
Revises: f396ea3231a9
Create Date: 2026-02-21 15:20:31.544304

"""

from typing import Sequence, Union

from alembic import op
from alembic_postgresql_enum import TableReference

# revision identifiers, used by Alembic.
revision: str = "714c920ab85b"
down_revision: Union[str, Sequence[str], None] = "f396ea3231a9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema="public",
enum_name="review_status_enum",
new_values=[
"PENDING",
"PROCESSING",
"RETRY_PENDING",
"PUBLISHED",
"FAILED",
],
affected_columns=[
TableReference(
table_schema="public",
table_name="review_requests",
column_name="status",
)
],
enum_values_to_rename=[("RETRYING", "RETRY_PENDING")],
)
# ### end Alembic commands ###


def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema="public",
enum_name="review_status_enum",
new_values=[
"PENDING",
"PROCESSING",
"COMPLETED",
"PUBLISHED",
"FAILED",
"RETRYING",
],
affected_columns=[
TableReference(
table_schema="public",
table_name="review_requests",
column_name="status",
)
],
enum_values_to_rename=[("RETRY_PENDING", "RETRYING")],
)
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Drop task_id.

Revision ID: f396ea3231a9
Revises: 30a5eeb5f400
Create Date: 2026-02-21 07:27:49.712489

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from alembic_postgresql_enum import TableReference

# revision identifiers, used by Alembic.
revision: str = "f396ea3231a9"
down_revision: Union[str, Sequence[str], None] = "30a5eeb5f400"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema="public",
enum_name="review_status_enum",
new_values=[
"PENDING",
"PROCESSING",
"COMPLETED",
"PUBLISHED",
"FAILED",
"RETRYING",
],
affected_columns=[
TableReference(
table_schema="public",
table_name="review_requests",
column_name="status",
)
],
enum_values_to_rename=[],
)
op.drop_column("review_requests", "task_id")
# ### end Alembic commands ###


def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"review_requests",
sa.Column("task_id", sa.VARCHAR(), autoincrement=False, nullable=True),
)
op.sync_enum_values(
enum_schema="public",
enum_name="review_status_enum",
new_values=["PENDING", "PROCESSING", "COMPLETED", "PUBLISHED", "FAILED"],
affected_columns=[
TableReference(
table_schema="public",
table_name="review_requests",
column_name="status",
)
],
enum_values_to_rename=[],
)
# ### end Alembic commands ###
1 change: 0 additions & 1 deletion services/reviewhelper-api/app/database/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
2 changes: 0 additions & 2 deletions services/reviewhelper-api/app/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ class ReviewRequest(Base):
error: Mapped[str | None] = mapped_column(Text)
summary: Mapped[str | None] = mapped_column(Text)

task_id: Mapped[str | None]

# Relationships
comments: Mapped[list["GeneratedComment"]] = relationship(
"GeneratedComment",
Expand Down
2 changes: 1 addition & 1 deletion services/reviewhelper-api/app/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Platform(str, Enum):
class ReviewStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
RETRY_PENDING = "retry_pending"
PUBLISHED = "published"
FAILED = "failed"

Expand Down
11 changes: 5 additions & 6 deletions services/reviewhelper-api/app/review_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ def get_code_review_tool():
return CodeReviewTool.create()


async def process_review(review_request: ReviewRequest) -> list[GeneratedComment]:
async def process_review(
review_request: ReviewRequest,
) -> tuple[list[GeneratedComment], str, dict]:
"""Process a review request and generate comments.

Args:
review_request: The review request to process.

Returns:
The generated comments from the review processing.
A tuple of (generated comments, patch summary, review details).
"""
logger.info(
"Processing review request %s for platform %s",
Expand Down Expand Up @@ -62,10 +64,7 @@ async def process_review(review_request: ReviewRequest) -> list[GeneratedComment
for comment in result.review_comments
]

review_request.summary = result.patch_summary
review_request.details = result.details

return generated_comments
return generated_comments, result.patch_summary, result.details


def submit_review_to_platform(
Expand Down
2 changes: 2 additions & 0 deletions services/reviewhelper-api/app/routers/feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ async def submit_feedback(

await db.execute(stmt)

await db.commit()

return JSONResponse(
{
"message": (
Expand Down
108 changes: 71 additions & 37 deletions services/reviewhelper-api/app/routers/internal.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
from datetime import UTC, datetime, timedelta
from typing import Annotated

from fastapi import APIRouter, Depends, Header, Response, status
from sqlalchemy import select
from fastapi import APIRouter, Depends, Response, status
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.auth import verify_internal_api_key
Expand All @@ -20,11 +21,37 @@
dependencies=[Depends(verify_internal_api_key)],
)

RECOVERY_BACKOFF_DELAY = timedelta(minutes=30)


async def claim_review_request(db: AsyncSession, review_request: ReviewRequest) -> bool:
"""Claim a review request for processing.

Uses optimistic concurrency: succeeds only if the row's status and
updated_at haven't changed since we last read them.
"""
stmt = (
update(ReviewRequest)
.where(
ReviewRequest.id == review_request.id,
ReviewRequest.status == review_request.status,
ReviewRequest.updated_at == review_request.updated_at,
)
.values(status=ReviewStatus.PROCESSING)
)
result = await db.execute(stmt)
await db.commit()

if result.rowcount == 0:
return False

await db.refresh(review_request)
return True


@router.get("/process/{review_request_id}")
async def process_review_request(
review_request_id: int,
x_cloudtasks_taskname: Annotated[str, Header()],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Process a review request (called by Cloud Tasks).
Expand All @@ -40,26 +67,6 @@ async def process_review_request(
logger.error("Review request %s not found", review_request_id)
return Response(status_code=status.HTTP_204_NO_CONTENT)

if not review_request.task_id:
# We need to tolerate the possibility of delay or failures before
# committing the transaction that saves the task ID.
logger.info(
"Assigning task ID %s to review request %s",
x_cloudtasks_taskname,
review_request_id,
)
review_request.task_id = x_cloudtasks_taskname

# Ensure the request is being processed by the correct task
if review_request.task_id != x_cloudtasks_taskname:
logger.error(
"Mismatched task ID for review request %s: expected %s, got %s",
review_request_id,
review_request.task_id,
x_cloudtasks_taskname,
)
return Response(status_code=status.HTTP_204_NO_CONTENT)

if review_request.status.is_final:
logger.error(
"Review request %s is already in final state: %s",
Expand All @@ -68,30 +75,56 @@ async def process_review_request(
)
return Response(status_code=status.HTTP_204_NO_CONTENT)

if review_request.status == ReviewStatus.COMPLETED:
# If we already have the generated comments from a previous processing
# attempt, we can directly submit the review without re-processing it.
generated_comments = await review_request.awaitable_attrs.comments
else:
review_request.status = ReviewStatus.PROCESSING
await db.commit()
# If another worker is actively processing, return 509 so Cloud Tasks retries.
# If it's been stuck in PROCESSING for too long, the worker likely crashed —
# fall through and try to reclaim it.
if review_request.status == ReviewStatus.PROCESSING:
age = datetime.now(UTC) - review_request.updated_at
if age < RECOVERY_BACKOFF_DELAY:
logger.warning(
"Review request %s is in PROCESSING state and was updated %s ago",
review_request_id,
age,
)
return Response(status_code=status.HTTP_409_CONFLICT)

if not await claim_review_request(db, review_request):
logger.warning(
"Review request %s was modified concurrently, likely claimed by another worker",
review_request_id,
)
return Response(status_code=status.HTTP_409_CONFLICT)

if review_request.summary:
logger.info(
"Review request %s already has a summary, skipping AI processing",
review_request_id,
)
comments = await review_request.awaitable_attrs.comments
else:
try:
generated_comments = await process_review(review_request)
comments, patch_summary, details = await process_review(review_request)
except LargeDiffError:
review_request.status = ReviewStatus.FAILED
review_request.error = (
"The diff size exceeds the current processing limits."
)
review_request.status = ReviewStatus.FAILED
await db.commit()
# We return 204 here to avoid triggering retries, since this is a
# permanent failure that won't be resolved by retrying.
return Response(status_code=status.HTTP_204_NO_CONTENT)

db.add_all(generated_comments)

review_request.status = ReviewStatus.COMPLETED
except Exception:
review_request.status = ReviewStatus.RETRY_PENDING
await db.commit()
raise

db.add_all(comments)
review_request.summary = patch_summary
review_request.details = details
await db.commit()

for generated_comment, inline_comment_id in submit_review_to_platform(
review_request, generated_comments
review_request, comments
):
# We need to commit after each comment is submitted to ensure that the
# platform_comment_id is saved to the database, which allows the
Expand All @@ -100,3 +133,4 @@ async def process_review_request(
await db.commit()

review_request.status = ReviewStatus.PUBLISHED
await db.commit()
6 changes: 3 additions & 3 deletions services/reviewhelper-api/app/routers/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def create_or_get_review_request(
await db.commit()

# Queue task for processing
review_request.task_id = await create_review_task(review_request.id)
await create_review_task(review_request.id)

return JSONResponse(
{
Expand All @@ -95,8 +95,8 @@ def _build_response_message(review_request: ReviewRequest) -> str:
if review_request.status == ReviewStatus.PROCESSING:
return f"The review for Diff {review_request.diff_id} is currently being processed. Review Helper will comment once it's done."

if review_request.status == ReviewStatus.COMPLETED:
return f"The review for Diff {review_request.diff_id} has been completed successfully. Review Helper is ready to post its review."
if review_request.status == ReviewStatus.RETRY_PENDING:
return f"The review for Diff {review_request.diff_id} encountered an issue and will be scheduled for retry."

if review_request.status == ReviewStatus.PUBLISHED:
return f"Review Helper already posted its review for Diff {review_request.diff_id}."
Expand Down
Loading