diff --git a/data-pipeline/config.py b/data-pipeline/config.py index f781ed6..2707f44 100644 --- a/data-pipeline/config.py +++ b/data-pipeline/config.py @@ -14,9 +14,33 @@ _db_name = os.environ.get("DB_NAME", "vigisus") _db_user = os.environ.get("DB_USER", "vigisus") _db_pass = os.environ.get("DB_PASS", "vigisus123") -DB_URL = os.environ.get( - "DB_URL", - f"postgresql://{_db_user}:{_db_pass}@{_db_host}:{_db_port}/{_db_name}", + + +def _normalize_db_url(db_url: str) -> str: + """Normaliza DB_URL para um formato compatível com SQLAlchemy. + + Aceita também URLs no estilo JDBC (usadas no backend Java), ex.: + jdbc:postgresql://localhost:5432/vigisus + + e converte para: + postgresql://localhost:5432/vigisus + """ + + if not db_url: + return db_url + + # Aceita o formato JDBC do Postgres (muito comum em projetos fullstack Java) + if db_url.startswith("jdbc:postgresql://"): + return db_url.replace("jdbc:", "", 1) + + return db_url + + +DB_URL = _normalize_db_url( + os.environ.get( + "DB_URL", + f"postgresql://{_db_user}:{_db_pass}@{_db_host}:{_db_port}/{_db_name}", + ) ) # FTP DATASUS diff --git a/data-pipeline/ingest_populacao.py b/data-pipeline/ingest_populacao.py index 4a30670..59c2fe9 100644 --- a/data-pipeline/ingest_populacao.py +++ b/data-pipeline/ingest_populacao.py @@ -9,6 +9,8 @@ import os import time +from urllib.parse import urlencode + from sqlalchemy import create_engine, text from clients.requests_config import session @@ -19,13 +21,61 @@ IBGE_AGREGADO = "6579" IBGE_VARIAVEL = "9324" -IBGE_PERIODO = "2023" +IBGE_PERIODO = os.environ.get("POP_IBGE_PERIODO", "2023") REQUEST_SLEEP_SECONDS = float(os.environ.get("POP_REQUEST_SLEEP_SECONDS", "0.2")) PROGRESS_EVERY = int(os.environ.get("POP_PROGRESS_EVERY", "25")) HEARTBEAT_SECONDS = int(os.environ.get("POP_HEARTBEAT_SECONDS", "15")) UPDATE_BATCH_SIZE = max(1, int(os.environ.get("POP_UPDATE_BATCH_SIZE", "100"))) +# Tenta reduzir drasticamente o tempo de execução usando requisições em lote. +# A API de agregados do IBGE aceita múltiplas localidades no parâmetro "localidades". +# Ex.: localidades=N6[3550308,3304557] +IBGE_BATCH_SIZE = max(1, int(os.environ.get("POP_IBGE_BATCH_SIZE", "100"))) + +# Cache simples do melhor período disponível para evitar 5500 chamadas em anos vazios. +_CACHED_IBGE_PERIODO_OK: str | None = None + + +def _discover_periodo_disponivel(preferido: str) -> str: + """Descobre um período (ano) com dados no agregado de estimativa populacional. + + Em alguns anos (ex.: 2022/2023) o IBGE pode retornar lista vazia para esse agregado. + Esse método testa o ano preferido e recua até encontrar dados. + """ + + global _CACHED_IBGE_PERIODO_OK # noqa: PLW0603 + if _CACHED_IBGE_PERIODO_OK: + return _CACHED_IBGE_PERIODO_OK + + try: + ano_pref = int(preferido) + except Exception: # noqa: BLE001 + ano_pref = 2023 + + # Município grande e estável para probe + probe = "3550308" # São Paulo + + for ano in range(ano_pref, max(2000, ano_pref - 10), -1): + url = ( + f"{IBGE_BASE_URL}/v3/agregados/{IBGE_AGREGADO}" + f"/periodos/{ano}/variaveis/{IBGE_VARIAVEL}?localidades=N6[{probe}]" + ) + try: + r = session.get(url, timeout=20) + r.raise_for_status() + data = r.json() + if isinstance(data, list) and len(data) > 0: + _CACHED_IBGE_PERIODO_OK = str(ano) + logger.info("IBGE: usando periodo %s (periodo %s estava vazio)", _CACHED_IBGE_PERIODO_OK, preferido) + return _CACHED_IBGE_PERIODO_OK + except Exception: # noqa: BLE001 + continue + + _CACHED_IBGE_PERIODO_OK = str(ano_pref) + logger.warning("IBGE: nao foi possivel detectar periodo com dados; usando %s", _CACHED_IBGE_PERIODO_OK) + return _CACHED_IBGE_PERIODO_OK + POPULACAO_FALLBACK_UF = { "AC": 15000, "AL": 30000, @@ -58,9 +108,10 @@ def fetch_populacao_municipio(co_ibge: str, max_retries: int = 3) -> int | None: + periodo = _discover_periodo_disponivel(IBGE_PERIODO) url = ( f"{IBGE_BASE_URL}/v3/agregados/{IBGE_AGREGADO}" - f"/periodos/{IBGE_PERIODO}/variaveis/{IBGE_VARIAVEL}" + f"/periodos/{periodo}/variaveis/{IBGE_VARIAVEL}" f"?localidades=N6[{co_ibge}]" ) for tentativa in range(1, max_retries + 1): @@ -68,7 +119,7 @@ def fetch_populacao_municipio(co_ibge: str, max_retries: int = 3) -> int | None: response = session.get(url, timeout=10) response.raise_for_status() data = response.json() - valor = data[0]["resultados"][0]["series"][0]["serie"][IBGE_PERIODO] + valor = data[0]["resultados"][0]["series"][0]["serie"][periodo] return int(valor) except Exception as exc: # noqa: BLE001 logger.debug("Tentativa %d/%d falhou para %s: %s", tentativa, max_retries, co_ibge, exc) @@ -77,6 +128,69 @@ def fetch_populacao_municipio(co_ibge: str, max_retries: int = 3) -> int | None: return None +def fetch_populacao_lote(codigos_ibge: list[str], max_retries: int = 3) -> dict[str, int]: + """Busca população para um lote de municípios em uma única chamada. + + A API do IBGE permite múltiplas localidades no parâmetro "localidades". + + Retorna um dict {co_ibge: populacao} apenas para códigos encontrados. + """ + + if not codigos_ibge: + return {} + + periodo = _discover_periodo_disponivel(IBGE_PERIODO) + base = ( + f"{IBGE_BASE_URL}/v3/agregados/{IBGE_AGREGADO}" + f"/periodos/{periodo}/variaveis/{IBGE_VARIAVEL}" + ) + params = {"localidades": f"N6[{','.join(codigos_ibge)}]"} + url = f"{base}?{urlencode(params)}" + + for tentativa in range(1, max_retries + 1): + try: + response = session.get(url, timeout=30) + response.raise_for_status() + data = response.json() + if not data: + return {} + + out: dict[str, int] = {} + # Formato mais comum: data[0]['resultados'][0]['series'] com localidade.id. + resultados = data[0].get("resultados", []) if isinstance(data[0], dict) else [] + for resultado in resultados: + for serie in resultado.get("series", []) or []: + localidade = serie.get("localidade", {}) or {} + co = str(localidade.get("id", "")) + valor = (serie.get("serie", {}) or {}).get(str(periodo)) + if co and valor and valor != "-": + try: + out[co] = int(valor) + except Exception: # noqa: BLE001 + continue + + # Fallback: alguns retornos usam 'localidades' dentro de resultados + if not out: + for resultado in resultados: + for localidade in resultado.get("localidades", []) or []: + co = str(localidade.get("id", "")) + for serie in (localidade.get("series", []) or []): + valor = (serie.get("serie", {}) or {}).get(str(periodo)) + if co and valor and valor != "-": + try: + out[co] = int(valor) + except Exception: # noqa: BLE001 + continue + + return out + except Exception as exc: # noqa: BLE001 + logger.debug("Tentativa %d/%d falhou para lote (%d codigos): %s", tentativa, max_retries, len(codigos_ibge), exc) + if tentativa < max_retries: + time.sleep(tentativa * 2) + + return {} + + def _flush_updates(engine, updates: list[dict]) -> None: if not updates: return @@ -117,48 +231,57 @@ def run() -> None: last_progress_log_at = 0.0 pending_updates: list[dict] = [] - for i, (co_ibge, sg_uf) in enumerate(municipios, 1): - pop = fetch_populacao_municipio(co_ibge) - if pop is None: - pop = POPULACAO_FALLBACK_UF.get(sg_uf, 20000) - fallbacks += 1 - else: - atualizados += 1 - - pending_updates.append({"pop": pop, "co": co_ibge}) - if len(pending_updates) >= UPDATE_BATCH_SIZE: - _flush_updates(engine, pending_updates) + # Processa em lotes para reduzir o tempo total. + # Mantém fallback por UF quando o IBGE não retorna valor. + for batch_start in range(0, total, IBGE_BATCH_SIZE): + batch = municipios[batch_start : batch_start + IBGE_BATCH_SIZE] + codigos = [co for (co, _uf) in batch] + by_co = fetch_populacao_lote(codigos) - if REQUEST_SLEEP_SECONDS > 0: - time.sleep(REQUEST_SLEEP_SECONDS) + for local_idx, (co_ibge, sg_uf) in enumerate(batch, 1): + i = batch_start + local_idx + pop = by_co.get(str(co_ibge)) + if pop is None: + pop = POPULACAO_FALLBACK_UF.get(sg_uf, 20000) + fallbacks += 1 + else: + atualizados += 1 - now = time.time() - should_log = ( - i == 1 - or i % PROGRESS_EVERY == 0 - or (now - last_progress_log_at) >= HEARTBEAT_SECONDS - or i == total - ) + pending_updates.append({"pop": pop, "co": co_ibge}) + if len(pending_updates) >= UPDATE_BATCH_SIZE: + _flush_updates(engine, pending_updates) - if should_log: - elapsed = max(now - started_at, 0.001) - rate = i / elapsed - eta_seconds = int((total - i) / rate) if rate > 0 else 0 - eta_min, eta_sec = divmod(eta_seconds, 60) - eta_hour, eta_min = divmod(eta_min, 60) - logger.info( - "Populacao IBGE: %d/%d (%.1f%%) | ibge=%d fallback=%d | vel=%.2f mun/s | ETA=%02d:%02d:%02d", - i, - total, - (i / total) * 100, - atualizados, - fallbacks, - rate, - eta_hour, - eta_min, - eta_sec, + now = time.time() + should_log = ( + i == 1 + or i % PROGRESS_EVERY == 0 + or (now - last_progress_log_at) >= HEARTBEAT_SECONDS + or i == total ) - last_progress_log_at = now + + if should_log: + elapsed = max(now - started_at, 0.001) + rate = i / elapsed + eta_seconds = int((total - i) / rate) if rate > 0 else 0 + eta_min, eta_sec = divmod(eta_seconds, 60) + eta_hour, eta_min = divmod(eta_min, 60) + logger.info( + "Populacao IBGE: %d/%d (%.1f%%) | ibge=%d fallback=%d | vel=%.2f mun/s | ETA=%02d:%02d:%02d", + i, + total, + (i / total) * 100, + atualizados, + fallbacks, + rate, + eta_hour, + eta_min, + eta_sec, + ) + last_progress_log_at = now + + # Pequena pausa entre lotes (se configurada) para reduzir chance de rate-limit. + if REQUEST_SLEEP_SECONDS > 0: + time.sleep(REQUEST_SLEEP_SECONDS) _flush_updates(engine, pending_updates)