## Greptile Overview

### Greptile Summary
This PR integrates YOLO-based fruit segmentation with Flink streaming and adds a fruit ripeness alerting system.

**Major Changes:**
- Added a new `FruitSegmentationRunner` using YOLOv8 to detect and segment fruits from camera images, uploading crops to MinIO
- Created a `fruit_ripeness_alert` service that queries weekly ripeness rollups and sends Kafka alerts when thresholds are exceeded
- Added database tables `ripeness_predictions` and `ripeness_weekly_rollups_ts` to track fruit ripeness over time
- Added a new API endpoint `/api/tables/ripeness_weekly_rollups_ts` for querying aggregated ripeness data
- Created a `camera-inference-http` service and `flink-dispatcher-camera` to process camera imagery through the ML pipeline
- Configured MinIO event notifications to trigger Kafka events for different image types (aerial, fruits, leaves, etc.)
- Added `image-linker` Flink job infrastructure for linking related images

**Critical Issues:**
- `repo.py` calls an undefined `parse_ts()` function, causing immediate runtime failure
- `router.py` references an undefined `RipenessWeeklyRollupOut` schema
- Security concern: the torch `weights_only` check is disabled, allowing arbitrary code execution from pickled models
- Duplicate import statements in `app.py`
**Confidence Score: 1/5**
- This PR has critical bugs that will cause runtime failures and cannot be merged safely.
- The score of 1 reflects multiple runtime-breaking bugs: the undefined `parse_ts()` function in `repo.py` will cause a NameError when querying rollups, and the undefined `RipenessWeeklyRollupOut` schema in `router.py` will cause immediate import failures. The DB API endpoints are non-functional, breaking the entire alerting pipeline. Additionally, there is a security vulnerability from disabling torch safety checks.
- Critical attention is required for `services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py` and `services/db_api_service/app/tables/ripeness_weekly_rollups_ts/router.py`; both have blocking runtime errors. A security review is needed for `services/inference_http/adapters/fruit_segmentation_runner.py`.
## Important Files Changed

### File Analysis
| Filename | Score | Overview |
|---|---|---|
| `services/fruit_ripeness_alert/app.py` | 2/5 | New alerting service with duplicate imports; fetches ripeness rollups and sends Kafka alerts when the threshold is exceeded |
| `services/inference_http/adapters/fruit_segmentation_runner.py` | 1/5 | New YOLO-based fruit segmentation runner with a security risk (disabled torch `weights_only` check); detects and crops fruits from images |
| `services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py` | 0/5 | Database repository with a critical bug: calls the undefined `parse_ts` function, causing a NameError at runtime |
| `services/db_api_service/app/tables/ripeness_weekly_rollups_ts/router.py` | 1/5 | FastAPI router with an undefined schema reference (`RipenessWeeklyRollupOut`), causing a runtime error |
| `docker-compose.yml` | 3/5 | Major infrastructure changes: added camera-inference-http, flink-dispatcher-camera, MinIO event notifiers, and image-linker services |
### Sequence Diagram

```mermaid
sequenceDiagram
    participant Camera as Camera/Device
    participant MinIO as MinIO Storage
    participant Kafka as Kafka
    participant Flink as Flink Dispatcher
    participant InferenceHTTP as Inference HTTP (YOLO)
    participant FlinkWriter as Flink Writer
    participant PostgreSQL as PostgreSQL
    participant DBAPI as DB API Service
    participant AlertService as Fruit Ripeness Alert
    participant AlertConsumer as Alert Consumer

    Camera->>MinIO: Upload image to imagery-hot bucket
    MinIO->>Kafka: Publish event to imagery.new.camera topic
    Kafka->>Flink: Consume image event
    Flink->>InferenceHTTP: HTTP POST /infer_json with bucket/key
    InferenceHTTP->>MinIO: Download image from bucket
    InferenceHTTP->>InferenceHTTP: Run YOLO model for fruit segmentation
    InferenceHTTP->>MinIO: Upload cropped fruit segments
    InferenceHTTP->>Flink: Return detection results (count, latency)
    Flink->>FlinkWriter: Forward inference results
    FlinkWriter->>PostgreSQL: Write to ripeness_predictions table
    FlinkWriter->>PostgreSQL: Aggregate to ripeness_weekly_rollups_ts
    Note over AlertService: Scheduled job runs periodically
    AlertService->>DBAPI: GET /api/tables/task_thresholds
    DBAPI->>PostgreSQL: Query thresholds
    PostgreSQL->>DBAPI: Return threshold (e.g., 0.8)
    DBAPI->>AlertService: Return threshold
    AlertService->>DBAPI: GET /api/tables/ripeness_weekly_rollups_ts
    DBAPI->>PostgreSQL: Query rollups for time window
    PostgreSQL->>DBAPI: Return rollup data
    DBAPI->>AlertService: Return rollups with pct_ripe
    AlertService->>AlertService: Check if pct_ripe >= threshold
    AlertService->>Kafka: Publish alert to alerts topic
    Kafka->>AlertConsumer: Forward alert to alertmanager
```
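The alert step at the bottom of the diagram reduces to a threshold comparison over rollup rows. A minimal sketch of that check (illustrative only: the `pct_ripe` field comes from the API described above, but `find_ripeness_alerts` and the exact row shape are hypothetical):

```python
def find_ripeness_alerts(rollups, threshold):
    """Return the rollup rows whose ripe fraction meets or exceeds the threshold.

    rollups: list of dicts with a "pct_ripe" key (0.0-1.0), as returned by
    /api/tables/ripeness_weekly_rollups_ts; threshold comes from task_thresholds.
    """
    return [r for r in rollups if r.get("pct_ripe", 0.0) >= threshold]


# Example: with a 0.8 threshold, only the second rollup triggers an alert
rollups = [
    {"week": "2024-W21", "pct_ripe": 0.42},
    {"week": "2024-W22", "pct_ripe": 0.85},
]
alerts = find_ripeness_alerts(rollups, 0.8)
```

Each returned row would then be published to the Kafka alerts topic by the scheduled job.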
35 files reviewed, 7 comments
`services/fruit_ripeness_alert/app.py`:

```python
from datetime import datetime, timedelta, timezone
from kafka import KafkaProducer
from token_bootstrap import get_service_token
from datetime import datetime, timezone
```

**syntax:** duplicate import statement

Suggested change:

```diff
- from datetime import datetime, timezone
+ import os, json, uuid, requests
+ from datetime import datetime, timedelta, timezone
+ from kafka import KafkaProducer
+ from token_bootstrap import get_service_token
```
`services/fruit_ripeness_alert/app.py`:

```python
    threshold = float(match.get("threshold", 0.8))
    print(f"[INFO] Task '{task_name}' threshold: {threshold*100:.1f}%")
    return threshold
from datetime import datetime, timezone
```

**syntax:** duplicate import statement (already imported on line 3)

Suggested change:

```diff
- from datetime import datetime, timezone
  def get_rollups(window_start, window_end, headers=None):
```
`services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py`:

```python
    if from_ts:
        q += " AND ts >= :from_ts"
        params["from_ts"] = parse_ts(from_ts)
```

**logic:** `parse_ts` is called but not defined anywhere in this file, causing a NameError at runtime

Suggested change:

```diff
- params["from_ts"] = parse_ts(from_ts)
+ params["from_ts"] = from_ts
```
`services/db_api_service/app/tables/ripeness_weekly_rollups_ts/repo.py`:

```python
        params["from_ts"] = parse_ts(from_ts)
    if to_ts:
        q += " AND ts <= :to_ts"
        params["to_ts"] = parse_ts(to_ts)
```

**logic:** `parse_ts` is called but not defined anywhere in this file, causing a NameError at runtime

Suggested change:

```diff
- params["to_ts"] = parse_ts(to_ts)
+ params["to_ts"] = to_ts
```
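The suggested fix passes the timestamp strings straight through to the query layer; another option would be to define the missing helper. A minimal sketch of what such a `parse_ts` could look like (hypothetical; the real repo would need to match its column types and timezone conventions):

```python
from datetime import datetime


def parse_ts(value):
    """Parse an ISO-8601 timestamp string into a datetime.

    A trailing "Z" is normalized to an explicit UTC offset, since
    datetime.fromisoformat does not accept "Z" before Python 3.11.
    datetime objects are passed through unchanged, so callers may
    supply either form.
    """
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(str(value).replace("Z", "+00:00"))
```

Either approach avoids the NameError; defining the helper also validates the query parameters before they reach the database.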
`services/db_api_service/app/tables/ripeness_weekly_rollups_ts/router.py`:

```python
        raise HTTPException(status_code=400, detail=str(e))


@router.get("/{id}", response_model=RipenessWeeklyRollupOut)
```

**syntax:** `RipenessWeeklyRollupOut` is not defined; should be `schemas.RipenessWeeklyRollupRead`

Suggested change:

```diff
- @router.get("/{id}", response_model=RipenessWeeklyRollupOut)
+ @router.get("/{id}", response_model=schemas.RipenessWeeklyRollupRead)
```
`services/inference_http/adapters/fruit_segmentation_runner.py`:

```python
def allow_unrestricted_torch_load():
    _original_load = torch.load
    def patched_load(*args, **kwargs):
        kwargs["weights_only"] = False
        return _original_load(*args, **kwargs)
    torch.load = patched_load


allow_unrestricted_torch_load()
```

**logic:** patching `torch.load` to disable the `weights_only` safety check is a security risk; it allows arbitrary code execution when loading pickled model files
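The risk flagged here is inherent to pickle, which `torch.load` uses for checkpoint deserialization: unpickling attacker-controlled bytes can execute arbitrary code. A stdlib-only demonstration of the mechanism (no torch required; `Malicious` is a contrived example class):

```python
import pickle


class Malicious:
    # __reduce__ tells pickle how to reconstruct an object; a hostile
    # payload can return any callable here, which runs at *load* time.
    def __reduce__(self):
        # eval("40 + 2") stands in for attacker code such as os.system(...)
        return (eval, ("40 + 2",))


payload = pickle.dumps(Malicious())
result = pickle.loads(payload)  # executes eval during deserialization
print(result)  # -> 42
```

This is why recent PyTorch releases default `weights_only` to True, restricting loads to plain tensors and an allowlist of safe types; models from untrusted sources should be loaded with that check intact.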
`services/inference_http/adapters/fruit_segmentation_runner.py`:

```python
        obj.close()
        obj.release_conn()


    # Attempt to run the model with bytes input first
```

No description provided.