
Implement stats queueing using rabbitMQ #79

Open
Zingzy wants to merge 3 commits into main from decouple-analytics-using-rabbitmq

Conversation


@Zingzy Zingzy commented Jun 17, 2025

  • Introduced a new analytics worker to process click data asynchronously using RabbitMQ and MongoDB.
  • Added dependencies: aio-pika, loguru, pika, and aiormq for asynchronous messaging and logging.
  • Updated requirements.txt and pyproject.toml to include new packages and their versions.
  • Refactored the redirector to send click data to the analytics worker for processing.
  • Enhanced MongoDB interaction with asynchronous methods for improved performance.
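The "send click data to the analytics worker" step could look roughly like the sketch below. The queue name `stats_queue`, the payload fields, and the helper names are illustrative assumptions, not taken from the diff:

```python
import json
from datetime import datetime, timezone


def build_click_event(short_code, user_agent, country=None):
    """Serialize a click event for the stats queue (field names are
    illustrative, not taken from the PR)."""
    event = {
        "short_code": short_code,
        "user_agent": user_agent,
        "country": country,
        "ts": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(event).encode()


def send_to_queue(body, queue="stats_queue"):
    """Publish one message to RabbitMQ; the import is deferred so the
    sketch stays importable without pika installed."""
    import pika

    connection = pika.BlockingConnection(pika.URLParameters("amqp://localhost/"))
    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)  # survive broker restarts
    channel.basic_publish(
        exchange="",
        routing_key=queue,
        body=body,
        properties=pika.BasicProperties(delivery_mode=2),  # persistent message
    )
    connection.close()
```

Opening a fresh `BlockingConnection` per publish is the simplest form; the review below raises the cost of doing that on every request.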

Summary by Sourcery

Introduce an asynchronous analytics pipeline by offloading click processing to a RabbitMQ/MongoDB worker, refactoring the redirector to enqueue click events, and updating project dependencies accordingly.

New Features:

  • Add analytics worker modules and runner script to consume RabbitMQ messages and update MongoDB asynchronously.
  • Introduce a new health-check endpoint ("/ok") in the redirector blueprint.

Enhancements:

  • Refactor redirector to publish click data to a stats queue instead of performing inline aggregation.
  • Implement asynchronous MongoDB client for improved performance in analytics processing.

Build:

  • Update requirements.txt and pyproject.toml to include aio-pika, aiormq, pika, loguru and adjust package versions.

Chores:

  • Add start_worker.py script to bootstrap the analytics worker process.


sourcery-ai bot commented Jun 17, 2025

Reviewer's Guide

The PR introduces a RabbitMQ-based async analytics pipeline: refactors the redirector to enqueue click events, adds a standalone stats worker with async MongoDB updates, and updates dependencies for messaging and structured logging.

Sequence diagram for asynchronous click analytics processing

sequenceDiagram
    participant User as actor User
    participant Redirector as Redirector Service
    participant RabbitMQ as RabbitMQ Queue
    participant Worker as Analytics Worker
    participant MongoDB as MongoDB

    User->>Redirector: HTTP GET /<short_code>
    Redirector->>RabbitMQ: send_to_queue(click event)
    Note right of RabbitMQ: Event is queued asynchronously
    Worker-->>RabbitMQ: Consume click event
    Worker->>MongoDB: Update analytics stats (async)
    MongoDB-->>Worker: Ack update
    Worker-->>RabbitMQ: Ack message
    Redirector-->>User: HTTP Redirect

Class diagram for new analytics worker and publisher modules

classDiagram
    class stats_publisher {
        +send_to_queue(data)
    }
    class stats_worker {
        +StatsWorker()
    }
    class stats_handler {
        +handle_click_event(data)
    }
    class async_mongo {
        +init_async_db(uri, db_name)
        +get_async_db()
    }

    stats_worker --> stats_handler : uses
    stats_worker --> async_mongo : initializes
    stats_handler --> async_mongo : gets DB
    redirector ..> stats_publisher : uses

File-Level Changes

Add asynchronous analytics pipeline with RabbitMQ and MongoDB
  • Created stats_handler to process click events and update MongoDB asynchronously
  • Implemented stats_worker to consume messages via aio-pika and dispatch to handler
  • Built async_mongo module for async DB initialization and retrieval
  • Added start_worker script to launch the analytics worker process
  • Provided stats_publisher to enqueue click events to the stats_queue
Files: workers/stats_handler.py, workers/stats_worker.py, workers/async_mongo.py, start_worker.py, workers/stats_publisher.py
Refactor redirector to publish click events instead of inline analytics
  • Removed inline update logic and replaced with send_to_queue calls
  • Streamlined user-agent, bot detection, and emoji alias validation
  • Simplified click handling by building a message payload
  • Added a /ok health-check endpoint
Files: blueprints/redirector.py
Update project dependencies for async messaging and logging
  • Added aio-pika, aiormq, pika, loguru, pamqp to requirements.txt and pyproject.toml
  • Adjusted versions of existing packages (e.g., requests)
  • Ensured new libraries are declared in both requirements.txt and pyproject.toml
Files: requirements.txt, pyproject.toml

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.


@sourcery-ai sourcery-ai bot left a comment


Hey @Zingzy - I've reviewed your changes - here's some feedback:

  • Avoid using pika.BlockingConnection inside the Flask redirector—switch to async aio-pika or maintain a persistent publisher connection to prevent blocking requests.
  • Centralize RabbitMQ connection parameters (URL, queue name) in configuration instead of hardcoding them across modules.
  • Implement graceful shutdown in StatsWorker to ensure in-flight messages are properly handled on termination.
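The first point ("maintain a persistent publisher connection") might be sketched as below. The class name is hypothetical, and the connection factory is injected so the sketch stays broker-agnostic:

```python
class StatsPublisher:
    """Keeps one channel open across publishes instead of reconnecting
    per request (sketch of the reviewer's suggestion; names are
    hypothetical, not from the PR)."""

    def __init__(self, connect, queue="stats_queue"):
        # `connect` is a zero-arg factory returning a pika-style
        # connection, injected so the sketch can run without a broker.
        self._connect = connect
        self._queue = queue
        self._connection = None
        self._channel = None

    def _ensure_channel(self):
        # Reconnect only when there is no connection or it has dropped.
        if self._connection is None or self._connection.is_closed:
            self._connection = self._connect()
            self._channel = self._connection.channel()
            self._channel.queue_declare(queue=self._queue, durable=True)
        return self._channel

    def publish(self, body):
        self._ensure_channel().basic_publish(
            exchange="", routing_key=self._queue, body=body
        )
```

With pika this would be constructed as `StatsPublisher(lambda: pika.BlockingConnection(pika.URLParameters(url)))`. Note that a single `BlockingConnection` is not thread-safe, so a real multi-threaded Flask deployment would still need per-worker connections or a lock.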
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Avoid using pika.BlockingConnection inside the Flask redirector—switch to async aio-pika or maintain a persistent publisher connection to prevent blocking requests.
- Centralize RabbitMQ connection parameters (URL, queue name) in configuration instead of hardcoding them across modules.
- Implement graceful shutdown in StatsWorker to ensure in-flight messages are properly handled on termination.

## Individual Comments

### Comment 1
<location> `blueprints/redirector.py:244` </location>
<code_context>
     )
+
+
+@url_redirector.route("/ok", methods=["GET"])
+def simple_redirect():
+    return "Ok", 200
</code_context>

<issue_to_address>
A new /ok endpoint was added.

If this is for health checks, evaluate if protection or rate limiting is needed to prevent misuse or excessive requests.

Suggested implementation:

```python
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Assuming 'app' is your Flask app instance and Limiter is initialized elsewhere,
# otherwise, you may need to initialize it here.

@url_redirector.route("/ok", methods=["GET"])
@limiter.limit("10 per minute")  # Adjust the rate as appropriate for your use case
def simple_redirect():
    return "Ok", 200

```

- Ensure that `limiter` is initialized and available in this module. If not, you may need to import it from where it's set up, or initialize it here with your Flask app.
- If you determine that authentication is also required, add the appropriate decorator (e.g., `@login_required`) above the route.
- Adjust the rate limit string ("10 per minute") as needed for your application's requirements.
</issue_to_address>

### Comment 2
<location> `workers/async_mongo.py:2` </location>
<code_context>
+import os
+from pymongo import AsyncMongoClient
+from dotenv import load_dotenv
+
</code_context>

<issue_to_address>
AsyncMongoClient is used, but this is not part of the official PyMongo API.

AsyncMongoClient is not available in PyMongo; use Motor's MotorClient for async operations, or ensure your custom implementation is reliable.
</issue_to_address>
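For reference, a Motor-based variant of the `async_mongo` idea might look like the sketch below. (Newer PyMongo releases, 4.9 and later, do ship an `AsyncMongoClient`, so whether the original import works may depend on the pinned version.) The env var name, database name, and module-level singleton here are assumptions:

```python
import os

_client = None  # module-level singleton, mirroring the async_mongo idea


def init_async_db(uri=None):
    """Create the Motor client (sketch; the MONGODB_URI env var name is
    an assumption). Import is deferred so the module loads without motor."""
    global _client
    from motor.motor_asyncio import AsyncIOMotorClient

    _client = AsyncIOMotorClient(uri or os.environ["MONGODB_URI"])
    return _client


def get_async_db(name="analytics"):
    """Return a database handle; fails loudly if init was skipped."""
    if _client is None:
        raise RuntimeError("call init_async_db() before get_async_db()")
    return _client[name]
```

Motor's `AsyncIOMotorClient` exposes the same collection/update API as PyMongo but returns awaitables, so the worker's handler code would be largely unchanged.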

### Comment 3
<location> `workers/stats_worker.py:31` </location>
<code_context>
+    logger.info("[*] Async MongoDB initialized.")
+
+    # Connect to RabbitMQ
+    connection = await aio_pika.connect_robust(RABBITMQ_URL)
+    channel = await connection.channel()
+    await channel.set_qos(prefetch_count=1)
</code_context>

<issue_to_address>
RabbitMQ connection uses hardcoded localhost URL.

Consider making the RabbitMQ URL configurable through environment variables or configuration files to improve deployment flexibility.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
RABBITMQ_URL = "amqp://localhost/"
=======
import os

RABBITMQ_URL = os.getenv("RABBITMQ_URL", "amqp://localhost/")
>>>>>>> REPLACE

</suggested_fix>


Comment thread workers/stats_worker.py
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | <level>{message}</level>",
)

RABBITMQ_URL = "amqp://localhost/"

suggestion: RabbitMQ connection uses hardcoded localhost URL.

Consider making the RabbitMQ URL configurable through environment variables or configuration files to improve deployment flexibility.

Suggested change:

```diff
-RABBITMQ_URL = "amqp://localhost/"
+import os
+RABBITMQ_URL = os.getenv("RABBITMQ_URL", "amqp://localhost/")
```

Comment thread blueprints/redirector.py
```diff
 @url_redirector.route("/<short_code>", methods=["GET"])
 @limiter.exempt
-def redirect_url(short_code):
+def redirect_url(short_code: str):
```

issue (code-quality): We've found these issues:


Explanation

The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.
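As a toy illustration of the guard-clause advice (this is not the project's actual `redirect_url`; the lookup dict stands in for the database):

```python
URLS = {"abc123": "https://example.com"}  # stand-in for the database


def redirect_url(short_code):
    """Guard clauses return early, keeping the happy path un-nested."""
    # Guard clause 1: reject malformed codes immediately.
    if not short_code.isalnum():
        return ("Invalid short code", 400)
    # Guard clause 2: unknown code, no further work needed.
    url = URLS.get(short_code)
    if url is None:
        return ("Not found", 404)
    # Happy path sits flat at the end of the function.
    return ("Redirect to " + url, 302)
```

Each early return removes one level of nesting that an `if/else` chain would otherwise add, which is what drives the cognitive-complexity score down.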

Comment thread blueprints/redirector.py
Comment on lines +196 to +197
```python
if request.method == "HEAD":
    pass
```

The route decorator only specifies GET methods (@url_redirector.route("/<short_code>", methods=["GET"])), but there's a conditional check for HEAD requests that does nothing. Either:

  1. Update the route decorator to include HEAD requests:

    @url_redirector.route("/<short_code>", methods=["GET", "HEAD"])
  2. Or remove the conditional check since HEAD requests won't reach this handler with the current configuration.

This would ensure the route handling is consistent with the declared HTTP methods.

Suggested change:

```diff
-if request.method == "HEAD":
-    pass
+# HEAD requests are handled automatically by Flask for GET routes;
+# no special handling needed.
```

Spotted by Diamond


@Zingzy Zingzy changed the title Add analytics worker and update dependencies Integrate stats queueing using rabbitMQ Jun 17, 2025
@Zingzy Zingzy changed the title Integrate stats queueing using rabbitMQ Implement stats queueing using rabbitMQ Jun 17, 2025
@Zingzy Zingzy moved this to 🏗️ In Progress in spoo.me Development Roadmap Jun 17, 2025
@Zingzy Zingzy moved this from 🏗️ In Progress to 🔮Future Plans in spoo.me Development Roadmap Sep 2, 2025

Labels

backend (Changes related to Backend/API), database, optimization

Projects

Status: 🔮Future Plans

Development

Successfully merging this pull request may close these issues.

1 participant