feat: optimize speed with concurrency improvements #51

Merged

TheTrueAI merged 1 commit into main from speed-it-up on Feb 26, 2026

Conversation

@TheTrueAI
Owner

No description provided.

Copilot AI review requested due to automatic review settings February 26, 2026 00:32

Copilot AI left a comment


Pull request overview

This PR introduces concurrency to reduce end-to-end pipeline latency by overlapping job search and LLM-based evaluation, while adding a global limiter to avoid Gemini rate-limit spikes.

Changes:

  • Parallelize search_all_queries() execution and add on_jobs_found for incremental downstream processing.
  • Add a global Gemini concurrency semaphore and increase evaluation parallelism (plus adjust token budget).
  • Update Streamlit pipeline to overlap search + evaluation; adjust a unit test expectation for parallel dispatch.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

Summary per file:

| File | Description |
| --- | --- |
| stellenscout/search_agent.py | Adds concurrent query execution and an on_jobs_found callback for streaming newly deduped jobs. |
| stellenscout/app.py | Overlaps searching and evaluation; adds threading constructs and higher evaluation concurrency. |
| stellenscout/llm.py | Introduces a module-level semaphore to limit concurrent Gemini calls. |
| stellenscout/evaluator_agent.py | Adjusts evaluation defaults (token budget and worker count). |
| tests/test_search_agent.py | Updates early-stop test expectations to account for parallel query dispatch. |
| ROADMAP.md | Marks “Flip repo to public” as completed. |

eval_futures: dict[Future, JobListing] = {}
eval_lock = threading.Lock()

def _on_jobs_found(new_unique_jobs: list) -> None:

Copilot AI Feb 26, 2026


Type hint for _on_jobs_found is currently new_unique_jobs: list, which loses the JobListing type information used elsewhere in this flow. Consider typing it as list[JobListing] (and the on_jobs_found callback accordingly) to keep mypy/type checkers useful here.

Suggested change:
-def _on_jobs_found(new_unique_jobs: list) -> None:
+def _on_jobs_found(new_unique_jobs: list[JobListing]) -> None:

if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
    break
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(_search_one, sq): idx for idx, sq in enumerate(prepared_queries)}

Copilot AI Feb 26, 2026


The idx value in futures = {executor.submit(...): idx ...} is never used. If you don't need query indices, drop the value and use a set/list of futures; if you do need them (e.g., to report which query finished), use the stored idx when calling on_progress or logging.

Suggested change:
-futures = {executor.submit(_search_one, sq): idx for idx, sq in enumerate(prepared_queries)}
+futures = [executor.submit(_search_one, sq) for sq in prepared_queries]

early_stop.set()
if batch_new and on_jobs_found is not None:
    on_jobs_found(batch_new)


Copilot AI Feb 26, 2026


The min_unique_jobs early-stop no longer reliably stops SerpAPI work: all futures are submitted up front, and the as_completed(...) loop waits for every future, so even after early_stop.set() the function will typically block until the remaining searches finish (and may still incur extra API calls). Consider breaking out of the loop once the threshold is reached and cancelling pending futures, or only submitting new queries while early_stop is not set, so the original cost-saving behavior is preserved.

Suggested change (addition):
+# If we've reached the desired number of unique jobs, stop early and
+# cancel any futures that have not yet started running.
+if early_stop.is_set():
+    for f in futures:
+        if f is not future and not f.done():
+            f.cancel()
+    break
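As a standalone sketch of this submit-then-cancel pattern (with made-up task callables, not the actual search code): futures that have not started yet can be cancelled once the threshold is hit, while already-running ones simply finish.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_until_threshold(tasks, threshold):
    """Run callables concurrently, stop collecting once `threshold` results
    arrive, and cancel any futures that have not started running yet."""
    results = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(t) for t in tasks]
        for future in as_completed(futures):
            results.append(future.result())
            if len(results) >= threshold:
                # Queued-but-not-started futures are dropped; running ones
                # are allowed to complete when the executor shuts down.
                for f in futures:
                    if not f.done():
                        f.cancel()
                break
    return results
```

With a small worker pool and many tasks, most of the queue is still pending when the threshold is reached, so the cancellations actually save work.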

Comment on lines 595 to 605
with lock:
    for job in jobs:
        key = f"{job.title}|{job.company_name}"
        if key not in all_jobs:
            all_jobs[key] = job
            batch_new.append(job)
    completed += 1
    if on_progress is not None:
        on_progress(completed, len(queries), len(all_jobs))
    if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
        early_stop.set()

Copilot AI Feb 26, 2026


on_progress(...) is invoked while holding lock. If the callback is slow (e.g., UI updates), this unnecessarily blocks other completions from merging/deduping results. Capture the values you need under the lock, then call on_progress outside the critical section.
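The capture-under-lock, call-outside pattern this comment recommends can be sketched like so (the state dict and function name are illustrative, not the project's actual code):

```python
import threading

lock = threading.Lock()
state = {"completed": 0, "unique": 0}


def merge_and_report(new_unique, total_queries, on_progress=None):
    """Merge results under the lock, but invoke the callback after releasing it."""
    with lock:
        state["completed"] += 1
        state["unique"] += new_unique
        # Snapshot the values while we still hold the lock...
        progress_args = (state["completed"], total_queries, state["unique"])
    # ...then call the (possibly slow) callback without blocking other threads.
    if on_progress is not None:
        on_progress(*progress_args)
    return progress_args
```

The snapshot tuple guarantees the callback sees a consistent view of the counters even though the lock has already been released when it runs.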

Comment on lines 799 to 837
@@ -764,54 +825,42 @@ def _search_progress(qi: int, total: int, unique: int) -> None:
jobs_per_query=jobs_per_query,
location=location,
on_progress=_search_progress,
on_jobs_found=_on_jobs_found,
)
cache.save_jobs(jobs, location)
search_progress.empty()
status.update(label=f"✅ Found {len(jobs)} unique jobs", state="complete")

if not jobs:
st.warning("No jobs found. Try adjusting your location or uploading a different CV.")
return

# ---- Step 3: Evaluate jobs -------------------------------------------
new_jobs, cached_evals = cache.get_unevaluated_jobs(jobs, profile)

if not new_jobs:
with st.status("✅ All evaluations loaded (cached)", state="complete"):
pass
all_evals = cached_evals
else:
if client is None:
client = create_client()

all_evals = dict(cached_evals)
progress_bar = st.progress(0, text="⭐ Rating each job for you...")
results_slot = st.empty()

with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(evaluate_job, client, profile, job): job for job in new_jobs}
for i, future in enumerate(as_completed(futures), 1):
job = futures[future]
search_status.update(label=f"✅ Found {len(jobs)} unique jobs", state="complete")
search_progress.empty()

if not jobs:
eval_executor.shutdown(wait=False)
st.warning("No jobs found. Try adjusting your location or uploading a different CV.")
return

Copilot AI Feb 26, 2026


eval_executor = ThreadPoolExecutor(...) is created outside a context manager and only shut down on the success path. If an exception occurs between creation and the later shutdown(...), threads can leak across Streamlit reruns. Wrap the executor lifecycle in a try/finally (or a with block) to guarantee shutdown, optionally passing cancel_futures=True on early returns.
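A minimal sketch of the try/finally lifecycle being suggested, assuming a simplified pipeline shape (the function and parameter names here are hypothetical, not the app's actual API):

```python
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(jobs, evaluate):
    """Guarantee the executor is shut down even on early return or exception."""
    eval_executor = ThreadPoolExecutor(max_workers=10)
    try:
        if not jobs:
            return []  # early return still triggers the finally block
        futures = [eval_executor.submit(evaluate, job) for job in jobs]
        return [f.result() for f in futures]
    finally:
        # cancel_futures=True (Python 3.9+) drops queued work; wait=True
        # joins in-flight threads before the function actually returns.
        eval_executor.shutdown(wait=True, cancel_futures=True)
```

An equivalent `with ThreadPoolExecutor(...) as eval_executor:` block gives the same guarantee with less ceremony, at the cost of always waiting for submitted work on exit.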


Copilot AI left a comment


Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

stellenscout/evaluator_agent.py:111

  • evaluate_job() can raise a Pydantic ValidationError on JobEvaluation(**data) (e.g., score out of 0-100 range or wrong field types). That exception is not caught, which can fail the whole parallel evaluation run. Consider catching ValidationError (and possibly TypeError) and returning the same safe fallback used for parse/API errors.
    try:
        content = call_gemini(client, prompt, temperature=0.2, max_tokens=4096)
    except (ServerError, ClientError):
        return JobEvaluation(score=50, reasoning="Could not evaluate (API error after retries)", missing_skills=[])

    try:
        data = parse_json(content)
    except ValueError:
        return JobEvaluation(score=50, reasoning="Could not evaluate (failed to parse response)", missing_skills=[])

    if not isinstance(data, dict):
        return JobEvaluation(score=50, reasoning="Could not evaluate (unexpected response format)", missing_skills=[])
    return JobEvaluation(**data)
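The fallback being suggested can be sketched with a simplified model (this is an illustrative stand-in, not the project's actual JobEvaluation class or evaluate_job flow):

```python
from pydantic import BaseModel, Field, ValidationError


class JobEvaluation(BaseModel):
    score: int = Field(ge=0, le=100)
    reasoning: str
    missing_skills: list[str] = []


def build_evaluation(data: dict) -> JobEvaluation:
    """Fall back to a neutral evaluation when parsed data fails validation,
    instead of letting a ValidationError abort the whole parallel run."""
    try:
        return JobEvaluation(**data)
    except (ValidationError, TypeError):
        return JobEvaluation(
            score=50,
            reasoning="Could not evaluate (invalid response data)",
            missing_skills=[],
        )
```

This mirrors the safe fallbacks already used for API and parse errors, so one malformed LLM response degrades to a neutral score rather than failing the batch.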


    cache.save_evaluations(profile, all_evals)
finally:
    eval_executor.shutdown(wait=False, cancel_futures=True)

Copilot AI Feb 26, 2026


eval_executor.shutdown(wait=False, cancel_futures=True) can leave already-running evaluation tasks executing in the background if an exception occurs or the function returns early, potentially continuing Gemini calls after the UI flow is done. Since the normal path already waits on as_completed(...), using wait=True in finally (or otherwise ensuring in-flight futures are joined/cancelled deterministically) will avoid orphaned threads and unexpected ongoing API usage.

Suggested change:
-eval_executor.shutdown(wait=False, cancel_futures=True)
+eval_executor.shutdown(wait=True, cancel_futures=True)

Comment on lines 534 to 607
@@ -557,31 +564,54 @@ def search_all_queries(
serpapi_location: str | None = None if remote_search else location or None

all_jobs: dict[str, JobListing] = {} # Use title+company as key for dedup
lock = threading.Lock()
completed = 0
early_stop = threading.Event()

for qi, query in enumerate(queries, 1):
# If the query doesn't already mention a location, append one
# Prepare all search queries upfront (localisation, location append)
prepared_queries: list[str] = []
for query in queries:
query_lower = query.lower()
has_location = any(kw in query_lower for kw in _location_words)
search_query = query if has_location else f"{query} {local_location}"

# Translate any English city/country names in the query itself
search_query = _localise_query(search_query)
prepared_queries.append(search_query)

jobs = search_jobs(
def _search_one(search_query: str) -> list[JobListing]:
if early_stop.is_set():
return []
return search_jobs(
search_query,
num_results=jobs_per_query,
gl=gl,
location=serpapi_location,
)
for job in jobs:
key = f"{job.title}|{job.company_name}"
if key not in all_jobs:
all_jobs[key] = job

if on_progress is not None:
on_progress(qi, len(queries), len(all_jobs))

if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
break
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(_search_one, sq) for sq in prepared_queries]
    for future in as_completed(futures):
        jobs = future.result()
        batch_new: list[JobListing] = []
        with lock:
            for job in jobs:
                key = f"{job.title}|{job.company_name}"
                if key not in all_jobs:
                    all_jobs[key] = job
                    batch_new.append(job)
            completed += 1
            progress_args = (completed, len(queries), len(all_jobs))
            if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
                early_stop.set()
        # Callbacks outside the lock to avoid blocking other threads
        if on_progress is not None:
            on_progress(*progress_args)

Copilot AI Feb 26, 2026


The on_progress docstring says the callback receives (query_index, total_queries, unique_jobs_count), but the implementation passes (completed, len(queries), len(all_jobs)) where completed is completion count (and completion order), not the original query index. Either track/query the original index per future (e.g., submit (qi, search_query) and map future -> qi) or update the docstring/type contract to match the new semantics.
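One way to keep the original query-index semantics, sketched with hypothetical task callables: map each future back to the index it was submitted under, so completion order no longer determines what gets reported.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_with_indices(tasks):
    """Map each future back to its original submission index (1-based), so a
    progress callback can report which task finished regardless of the order
    in which futures complete."""
    results = {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        future_to_idx = {executor.submit(t): qi for qi, t in enumerate(tasks, 1)}
        for future in as_completed(future_to_idx):
            qi = future_to_idx[future]
            results[qi] = future.result()
    return results
```

This is the same future-to-value dict shape the PR already uses for evaluation futures, applied to query indices instead of JobListing objects.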


if min_unique_jobs and len(all_jobs) >= min_unique_jobs:
    break
with ThreadPoolExecutor(max_workers=5) as executor:

Copilot AI Feb 26, 2026


max_workers=5 is a hard-coded concurrency limit with no explanation or configurability. To make this tunable and easier to reason about (and to support different environments / quotas), consider adding a max_workers parameter (defaulting to a sensible value) or deriving it from len(queries) with an upper bound.

Suggested change:
-with ThreadPoolExecutor(max_workers=5) as executor:
+# Limit concurrency to at most 5 workers, and never exceed the number of queries.
+max_workers = min(5, max(1, len(prepared_queries)))
+with ThreadPoolExecutor(max_workers=max_workers) as executor:

TheTrueAI merged commit 16aece1 into main on Feb 26, 2026
1 check passed
TheTrueAI deleted the speed-it-up branch on February 26, 2026 at 01:05