feat: optimize speed with concurrency improvements #51
Conversation
Pull request overview
This PR introduces concurrency to reduce end-to-end pipeline latency by overlapping job search and LLM-based evaluation, while adding a global limiter to avoid Gemini rate-limit spikes.
Changes:
- Parallelize `search_all_queries()` execution and add `on_jobs_found` for incremental downstream processing.
- Add a global Gemini concurrency semaphore and increase evaluation parallelism (plus adjust token budget).
- Update the Streamlit pipeline to overlap search and evaluation; adjust a unit test expectation for parallel dispatch.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `stellenscout/search_agent.py` | Adds concurrent query execution and `on_jobs_found` callback for streaming newly deduped jobs. |
| `stellenscout/app.py` | Overlaps searching and evaluation; adds threading constructs and higher evaluation concurrency. |
| `stellenscout/llm.py` | Introduces a module-level semaphore to limit concurrent Gemini calls. |
| `stellenscout/evaluator_agent.py` | Adjusts evaluation defaults (token budget + worker count). |
| `tests/test_search_agent.py` | Updates early-stop test expectations to account for parallel query dispatch. |
| `ROADMAP.md` | Marks "Flip repo to public" as completed. |
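The module-level semaphore in `stellenscout/llm.py` is not shown in this review, but the idea can be sketched as follows (the limit of 4 and all names here are illustrative, not the PR's actual values):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical module-level limiter; the real module picks its own limit.
_GEMINI_SEMAPHORE = threading.BoundedSemaphore(4)

def call_model(prompt: str) -> str:
    # Every call site shares the same semaphore, so at most 4 requests are
    # in flight regardless of which executor submitted them.
    with _GEMINI_SEMAPHORE:
        return f"response to {prompt}"  # stand-in for the real API call

# Even with 10 worker threads, the semaphore caps concurrent "API" calls at 4.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(call_model, [f"q{i}" for i in range(10)]))
```

Because the semaphore lives at module level, raising worker counts elsewhere (search or evaluation pools) cannot push the Gemini call rate past the global cap.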
stellenscout/app.py
Outdated
```python
eval_futures: dict[Future, JobListing] = {}
eval_lock = threading.Lock()

def _on_jobs_found(new_unique_jobs: list) -> None:
```
The type hint for `_on_jobs_found` is currently `new_unique_jobs: list`, which loses the `JobListing` type information used elsewhere in this flow. Consider typing it as `list[JobListing]` (and the `on_jobs_found` callback accordingly) to keep mypy/type checkers useful here.
```diff
-def _on_jobs_found(new_unique_jobs: list) -> None:
+def _on_jobs_found(new_unique_jobs: list[JobListing]) -> None:
```
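A minimal sketch of the typed-callback idea; the `JobListing` stand-in and `OnJobsFound` alias below are illustrative, not the project's actual definitions:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class JobListing:  # simplified stand-in for the project's model
    title: str
    company_name: str

# A named alias keeps the callback's signature checkable end to end.
OnJobsFound = Callable[[list[JobListing]], None]

def run_search(on_jobs_found: OnJobsFound) -> None:
    # The producer can only pass list[JobListing], so mypy flags mismatches.
    on_jobs_found([JobListing("Engineer", "Acme")])

found: list[JobListing] = []

def _on_jobs_found(new_unique_jobs: list[JobListing]) -> None:
    found.extend(new_unique_jobs)

run_search(_on_jobs_found)
```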
stellenscout/search_agent.py
Outdated
```python
if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
    break
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(_search_one, sq): idx for idx, sq in enumerate(prepared_queries)}
```
`idx` in `futures = {executor.submit(...): idx ...}` is never used. If you don't need query indices, drop the value and use a set/list of futures; if you do need them (e.g., to report which query finished), use the stored `idx` when calling `on_progress`/logging.
```diff
-futures = {executor.submit(_search_one, sq): idx for idx, sq in enumerate(prepared_queries)}
+futures = [executor.submit(_search_one, sq) for sq in prepared_queries]
```
```python
early_stop.set()
if batch_new and on_jobs_found is not None:
    on_jobs_found(batch_new)
```
The `min_unique_jobs` early-stop no longer reliably stops SerpAPI work: all futures are submitted up front and the `as_completed(...)` loop waits for every future, so even after `early_stop.set()` the function will typically block until the remaining searches finish (and may still incur extra API calls). Consider breaking out of the loop once the threshold is reached and cancelling pending futures (or only submitting new queries while `early_stop` is not set) so the original cost-saving behavior is preserved.
```python
# If we've reached the desired number of unique jobs, stop early and
# cancel any futures that have not yet started running.
if early_stop.is_set():
    for f in futures:
        if f is not future and not f.done():
            f.cancel()
    break
```
```python
with lock:
    for job in jobs:
        key = f"{job.title}|{job.company_name}"
        if key not in all_jobs:
            all_jobs[key] = job
            batch_new.append(job)
    completed += 1
    if on_progress is not None:
        on_progress(completed, len(queries), len(all_jobs))
    if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
        early_stop.set()
```
`on_progress(...)` is invoked while holding `lock`. If the callback is slow (e.g., UI updates), this unnecessarily blocks other completions from merging/deduping results. Capture the values you need under the lock, then call `on_progress` outside the critical section.
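The capture-then-call shape looks roughly like this (a simplified sketch with stand-in data, not the PR's exact code):

```python
import threading

lock = threading.Lock()
all_jobs: dict[str, str] = {}
progress_calls: list[tuple[int, int]] = []

def on_progress(completed: int, unique: int) -> None:
    # Imagine a slow UI update here; it must not hold up other workers.
    progress_calls.append((completed, unique))

def merge_batch(jobs: dict[str, str], completed: int) -> None:
    # Capture the numbers while the lock protects shared state...
    with lock:
        all_jobs.update(jobs)
        progress_args = (completed, len(all_jobs))
    # ...then invoke the possibly-slow callback outside the critical section.
    on_progress(*progress_args)

merge_batch({"a|Acme": "job"}, 1)
merge_batch({"b|Beta": "job"}, 2)
```

The callback still sees a consistent snapshot (the tuple was built under the lock), but other threads can merge their batches while it runs.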
stellenscout/app.py
Outdated
```diff
@@ -764,54 +825,42 @@ def _search_progress(qi: int, total: int, unique: int) -> None:
             jobs_per_query=jobs_per_query,
             location=location,
             on_progress=_search_progress,
+            on_jobs_found=_on_jobs_found,
         )
-        cache.save_jobs(jobs, location)
-        search_progress.empty()
-        status.update(label=f"✅ Found {len(jobs)} unique jobs", state="complete")
-
-    if not jobs:
-        st.warning("No jobs found. Try adjusting your location or uploading a different CV.")
-        return
-
-    # ---- Step 3: Evaluate jobs -------------------------------------------
-    new_jobs, cached_evals = cache.get_unevaluated_jobs(jobs, profile)
-
-    if not new_jobs:
-        with st.status("✅ All evaluations loaded (cached)", state="complete"):
-            pass
-        all_evals = cached_evals
-    else:
-        if client is None:
-            client = create_client()
-
-        all_evals = dict(cached_evals)
-        progress_bar = st.progress(0, text="⭐ Rating each job for you...")
-        results_slot = st.empty()
-
-        with ThreadPoolExecutor(max_workers=10) as executor:
-            futures = {executor.submit(evaluate_job, client, profile, job): job for job in new_jobs}
-            for i, future in enumerate(as_completed(futures), 1):
-                job = futures[future]
+        search_status.update(label=f"✅ Found {len(jobs)} unique jobs", state="complete")
+        search_progress.empty()
+
+    if not jobs:
+        eval_executor.shutdown(wait=False)
+        st.warning("No jobs found. Try adjusting your location or uploading a different CV.")
+        return
```
`eval_executor = ThreadPoolExecutor(...)` is created outside a context manager and only shut down on the success path. If an exception occurs between creation and the later `shutdown(...)`, threads can leak across reruns. Wrap the executor lifecycle in a `try`/`finally` (or a `with` block) to guarantee shutdown (optionally with `cancel_futures=True` on early returns).
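A sketch of the guaranteed-shutdown shape; the `run_pipeline` wrapper and its `fail` flag are illustrative, not code from the PR:

```python
from concurrent.futures import ThreadPoolExecutor

def run_pipeline(fail: bool = False) -> str:
    eval_executor = ThreadPoolExecutor(max_workers=4)
    try:
        if fail:
            # Simulates an exception between executor creation and shutdown.
            raise RuntimeError("simulated rerun/exception")
        future = eval_executor.submit(lambda: "evaluated")
        return future.result()
    finally:
        # Runs on every exit path (return, exception, early return),
        # so worker threads never leak across Streamlit reruns.
        eval_executor.shutdown(wait=True, cancel_futures=True)

result = run_pipeline()
```

`cancel_futures=True` (Python 3.9+) drops queued-but-unstarted tasks, while `wait=True` joins anything already running before the function exits.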
Force-pushed from `8d2bdaa` to `f570175`.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
stellenscout/evaluator_agent.py:111
`evaluate_job()` can raise a Pydantic `ValidationError` on `JobEvaluation(**data)` (e.g., score out of the 0-100 range or wrong field types). That exception is not caught, which can fail the whole parallel evaluation run. Consider catching `ValidationError` (and possibly `TypeError`) and returning the same safe fallback used for parse/API errors.
```python
try:
    content = call_gemini(client, prompt, temperature=0.2, max_tokens=4096)
except (ServerError, ClientError):
    return JobEvaluation(score=50, reasoning="Could not evaluate (API error after retries)", missing_skills=[])
try:
    data = parse_json(content)
except ValueError:
    return JobEvaluation(score=50, reasoning="Could not evaluate (failed to parse response)", missing_skills=[])
if not isinstance(data, dict):
    return JobEvaluation(score=50, reasoning="Could not evaluate (unexpected response format)", missing_skills=[])
return JobEvaluation(**data)
```
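The fallback pattern illustrated with a stdlib stand-in; the real model is Pydantic, where the exception to catch would be `pydantic.ValidationError` rather than the `ValueError` raised here:

```python
from dataclasses import dataclass, field

@dataclass
class JobEvaluation:  # simplified stand-in for the Pydantic model
    score: int
    reasoning: str
    missing_skills: list = field(default_factory=list)

    def __post_init__(self) -> None:
        if not 0 <= self.score <= 100:
            raise ValueError(f"score out of range: {self.score}")

def build_evaluation(data: dict) -> JobEvaluation:
    # Catching both covers out-of-range values (ValueError here,
    # ValidationError with Pydantic) and wrong/missing fields (TypeError).
    try:
        return JobEvaluation(**data)
    except (ValueError, TypeError):
        # Same safe fallback the function already uses for parse/API errors.
        return JobEvaluation(score=50, reasoning="Could not evaluate (invalid model data)", missing_skills=[])

ok = build_evaluation({"score": 80, "reasoning": "good fit"})
bad = build_evaluation({"score": 500, "reasoning": "out of range"})
```

With this wrapper, one malformed model response degrades a single job's score instead of aborting the whole parallel evaluation run.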
stellenscout/app.py
Outdated
```python
    cache.save_evaluations(profile, all_evals)
finally:
    eval_executor.shutdown(wait=False, cancel_futures=True)
```
`eval_executor.shutdown(wait=False, cancel_futures=True)` can leave already-running evaluation tasks executing in the background if an exception occurs or the function returns early, potentially continuing Gemini calls after the UI flow is done. Since the normal path already waits on `as_completed(...)`, using `wait=True` in the `finally` (or otherwise ensuring in-flight futures are joined/cancelled deterministically) will avoid orphaned threads and unexpected ongoing API usage.
```diff
-eval_executor.shutdown(wait=False, cancel_futures=True)
+eval_executor.shutdown(wait=True, cancel_futures=True)
```
```diff
@@ -557,31 +564,54 @@ def search_all_queries(
     serpapi_location: str | None = None if remote_search else location or None

     all_jobs: dict[str, JobListing] = {}  # Use title+company as key for dedup
+    lock = threading.Lock()
+    completed = 0
+    early_stop = threading.Event()

-    for qi, query in enumerate(queries, 1):
-        # If the query doesn't already mention a location, append one
+    # Prepare all search queries upfront (localisation, location append)
+    prepared_queries: list[str] = []
+    for query in queries:
         query_lower = query.lower()
         has_location = any(kw in query_lower for kw in _location_words)
         search_query = query if has_location else f"{query} {local_location}"

         # Translate any English city/country names in the query itself
         search_query = _localise_query(search_query)
+        prepared_queries.append(search_query)

-        jobs = search_jobs(
+    def _search_one(search_query: str) -> list[JobListing]:
+        if early_stop.is_set():
+            return []
+        return search_jobs(
             search_query,
             num_results=jobs_per_query,
             gl=gl,
             location=serpapi_location,
         )
-        for job in jobs:
-            key = f"{job.title}|{job.company_name}"
-            if key not in all_jobs:
-                all_jobs[key] = job
-
-        if on_progress is not None:
-            on_progress(qi, len(queries), len(all_jobs))
-
-        if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
-            break
+
+    with ThreadPoolExecutor(max_workers=5) as executor:
+        futures = [executor.submit(_search_one, sq) for sq in prepared_queries]
+        for future in as_completed(futures):
+            jobs = future.result()
+            batch_new: list[JobListing] = []
+            with lock:
+                for job in jobs:
+                    key = f"{job.title}|{job.company_name}"
+                    if key not in all_jobs:
+                        all_jobs[key] = job
+                        batch_new.append(job)
+                completed += 1
+                progress_args = (completed, len(queries), len(all_jobs))
+                if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
+                    early_stop.set()
+            # Callbacks outside the lock to avoid blocking other threads
+            if on_progress is not None:
+                on_progress(*progress_args)
```
The `on_progress` docstring says the callback receives `(query_index, total_queries, unique_jobs_count)`, but the implementation passes `(completed, len(queries), len(all_jobs))`, where `completed` is the completion count (and completion order), not the original query index. Either track the original index per future (e.g., submit `(qi, search_query)` and map future -> `qi`) or update the docstring/type contract to match the new semantics.
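If the original index semantics are kept, the future-to-index mapping described above looks like this (a sketch with stand-in queries and a trivial task in place of the real search):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

queries = ["python jobs", "data engineer", "ml engineer"]
reported: list[tuple[int, str]] = []

def search_one(q: str) -> str:
    return f"results for {q}"

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to the 1-based index of the query that produced it,
    # so progress reports can name the query even out of completion order.
    future_to_idx = {executor.submit(search_one, q): qi for qi, q in enumerate(queries, 1)}
    for future in as_completed(future_to_idx):
        qi = future_to_idx[future]
        reported.append((qi, future.result()))
```

Completion order is nondeterministic, but each result still carries its original query index, which is exactly what the documented `(query_index, ...)` contract promises.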
stellenscout/search_agent.py
Outdated
```python
if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
    break
with ThreadPoolExecutor(max_workers=5) as executor:
```
`max_workers=5` is a hard-coded concurrency limit with no explanation or configurability. To make this tunable and easier to reason about (and to support different environments/quotas), consider adding a `max_workers` parameter (defaulting to a sensible value) or deriving it from `len(queries)` with an upper bound.
```diff
-with ThreadPoolExecutor(max_workers=5) as executor:
+# Limit concurrency to at most 5 workers, and never exceed the number of queries.
+max_workers = min(5, max(1, len(prepared_queries)))
+with ThreadPoolExecutor(max_workers=max_workers) as executor:
```
Force-pushed from `f570175` to `1acce73`.
No description provided.