From 41ff218c591f926531da2a5dde982685e7102849 Mon Sep 17 00:00:00 2001
From: Usher Gay
Date: Tue, 12 Aug 2025 21:34:04 -0400
Subject: [PATCH] feat: stream metadata enrichment and duckdb join

---
 requirements.txt                              |   2 +
 .../spotify_api/smart_metadata_enrichment.py  | 604 +++++++++++-------
 2 files changed, 373 insertions(+), 233 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index f5cf684..4b34f6c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,3 +7,5 @@ matplotlib
 seaborn
 streamlit
 altair
+duckdb
+psutil
diff --git a/scripts/spotify_api/smart_metadata_enrichment.py b/scripts/spotify_api/smart_metadata_enrichment.py
index 30a635e..5afdb0c 100644
--- a/scripts/spotify_api/smart_metadata_enrichment.py
+++ b/scripts/spotify_api/smart_metadata_enrichment.py
@@ -1,169 +1,225 @@
+"""
+Spotify API metadata enrichment with streaming writes and on-disk joins.
+
+Dependencies:
+    pip install duckdb psutil
+    (sqlite3 is part of the Python standard library)
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sqlite3
+import time
+from pathlib import Path
+from typing import Dict, Set, Tuple
+
+import duckdb
 import pandas as pd
 import spotipy
+from dotenv import load_dotenv
 from spotipy.oauth2 import SpotifyOAuth
-import time
-import os
-import pickle
 from tqdm import tqdm
-import logging
-from dotenv import load_dotenv
 
-# Load environment variables
+try:  # Optional dependency for memory metrics
+    import psutil  # type: ignore
+except Exception:  # pragma: no cover - psutil is optional
+    psutil = None
+
+
 load_dotenv()
 
-# Set up logging
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s",
+)
 logger = logging.getLogger(__name__)
 
-# Your credentials from environment variables
+
+# ---------------------------------------------------------------------------
+# Configuration & paths
+# ---------------------------------------------------------------------------
+
 CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
 CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
 
 if not CLIENT_ID or not CLIENT_SECRET:
     raise ValueError("Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET in your .env file")
 
-# Configuration
-BATCH_SIZE = 50
-PROGRESS_FILE = "data/enriched/api_metadata_progress.pkl"
-DELAY_BETWEEN_REQUESTS = 0.3
-DELAY_BETWEEN_BATCHES = 5
+SCRIPT_DIR = Path(__file__).resolve().parent
+BASE_DIR = (SCRIPT_DIR / ".." / "..").resolve()
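+# Layout assumption (from the diff path above): this file lives in
+# scripts/spotify_api/, so two parents up is the repository root that holds
+# data/processed and data/enriched.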
-# Set up Spotipy with Authorization Code Flow (user auth)
-scope = "user-library-read"
-redirect_uri = "http://127.0.0.1:8888/callback"
+PROCESSED_DIR = BASE_DIR / "data" / "processed"
+ENRICHED_DIR = BASE_DIR / "data" / "enriched"
+BASE_CSV = PROCESSED_DIR / "cleaned_streaming_history.csv"
+META_CSV = ENRICHED_DIR / "spotify_api_metadata.csv"
+FINAL_CSV = ENRICHED_DIR / "spotify_api_enriched_streaming_history.csv"
+PROGRESS_DB = ENRICHED_DIR / "api_progress.sqlite"
+
+BATCH_SIZE = int(os.getenv("BATCH_SIZE", "50"))
+DELAY_BETWEEN_REQUESTS = float(os.getenv("DELAY_BETWEEN_REQUESTS", "0.3"))
+DELAY_BETWEEN_BATCHES = float(os.getenv("DELAY_BETWEEN_BATCHES", "5"))
+TQDM_DISABLE = os.getenv("TQDM_DISABLE") == "1"
+
+
+# ---------------------------------------------------------------------------
+# Spotipy client
+# ---------------------------------------------------------------------------
+
+scope = "user-library-read"
 auth_manager = SpotifyOAuth(
     client_id=CLIENT_ID,
     client_secret=CLIENT_SECRET,
-    redirect_uri=redirect_uri,
+    redirect_uri="http://127.0.0.1:8888/callback",
     scope=scope,
-    cache_path=".spotify_cache"
+    cache_path=".spotify_cache",
 )
-
 sp = spotipy.Spotify(auth_manager=auth_manager)
 
-def load_progress():
-    """Load previously processed tracks to avoid re-processing"""
-    if os.path.exists(PROGRESS_FILE):
-        with open(PROGRESS_FILE, 'rb') as f:
-            return pickle.load(f)
-    return {}
-
-def save_progress(processed_tracks):
-    """Save progress to resume later if needed"""
-    os.makedirs(os.path.dirname(PROGRESS_FILE), exist_ok=True)
-    with open(PROGRESS_FILE, 'wb') as f:
-        pickle.dump(processed_tracks, f)
-
-def load_existing_enrichment():
-    """Load tracks that already have external metadata to avoid duplicating work"""
-    enriched_tracks = set()
-
-    # Check for Ultimate Spotify enrichment
-    ultimate_file = "data/enriched/ultimate_spotify_enriched_streaming_history.csv"
-    if os.path.exists(ultimate_file):
-        logger.info("Loading existing Ultimate Spotify enrichment...")
+
+# ---------------------------------------------------------------------------
+# SQLite progress store
+# ---------------------------------------------------------------------------
+
+def get_db_connection() -> sqlite3.Connection:
+    """Open the progress database, creating the table if needed."""
+    ENRICHED_DIR.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(PROGRESS_DB)
+    conn.execute(
+        """
+        CREATE TABLE IF NOT EXISTS progress(
+            key TEXT PRIMARY KEY,
+            meta_json TEXT,
+            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+        )
+        """
+    )
+    return conn
+
+
+def load_processed_keys(conn: sqlite3.Connection) -> Set[str]:
+    """Return the set of track keys already processed."""
+    try:
+        rows = conn.execute("SELECT key FROM progress").fetchall()
+        return {r[0] for r in rows}
+    except sqlite3.Error:
+        return set()
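+
+# Illustrative resume flow (same calls main() makes; the key string below is
+# a hypothetical example value):
+#   conn = get_db_connection()
+#   done = load_processed_keys(conn)   # e.g. {"Some Song|||Some Artist", ...}
+#   conn.execute(
+#       "INSERT OR REPLACE INTO progress(key, meta_json) VALUES(?, ?)",
+#       ("Some Song|||Some Artist", json.dumps({"spotify_id": "..."})),
+#   )
+#   conn.commit()  # committed once per batch, so a crash loses at most one batch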
+
+
+# ---------------------------------------------------------------------------
+# Helper functions
+# ---------------------------------------------------------------------------
+
+def load_existing_enrichment() -> Set[str]:
+    """Tracks that already have external metadata (Ultimate Spotify)."""
+    enriched_tracks: Set[str] = set()
+    ultimate_file = ENRICHED_DIR / "ultimate_spotify_enriched_streaming_history.csv"
+    if not ultimate_file.exists():
+        return enriched_tracks
+
+    try:
         df = pd.read_csv(ultimate_file)
-
-        # Find tracks that have metadata (non-null in key metadata columns)
         metadata_cols = ["acousticness", "danceability", "energy", "valence", "genre"]
         has_metadata = df[metadata_cols].notna().any(axis=1)
-
-        enriched_df = df[has_metadata]
-        for _, row in enriched_df.iterrows():
-            track_key = f"{row['master_metadata_track_name']}|||{row['master_metadata_album_artist_name']}"
-            enriched_tracks.add(track_key)
-
-        logger.info(f"Found {len(enriched_tracks)} tracks already enriched with external metadata")
-
+        for row in df.loc[has_metadata, [
+            "master_metadata_track_name",
+            "master_metadata_album_artist_name",
+        ]].itertuples(index=False):
+            enriched_tracks.add(f"{row[0]}|||{row[1]}")
+        logger.info("Found %d tracks already enriched with external metadata", len(enriched_tracks))
+    except Exception as exc:  # pragma: no cover - best effort only
+        logger.warning("Failed loading existing enrichment: %s", exc)
     return enriched_tracks
 
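+# Note: this skip set and the progress DB both key on the raw
+# "track|||artist" string (exact match), while the DuckDB join below matches
+# on lower(trim(...)) keys; the two schemes are independent of each other.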
-def search_track_metadata(track_name, artist_name):
-    """Search for a single track and get its metadata (excluding audio features)"""
+
+def search_track_metadata(track_name: str, artist_name: str) -> Dict[str, object] | None:
+    """Search Spotify for track metadata with basic retry/backoff."""
     query = f"track:{track_name} artist:{artist_name}"
-
-    try:
-        results = sp.search(q=query, type="track", limit=1)
-        items = results["tracks"]["items"]
-
-        if not items:
-            return None
-
-        track_data = items[0]
-        track_id = track_data["id"]
-
-        # Get genres and artist data
-        artist_data = None
+    for attempt in range(3):
         try:
-            artist_id = track_data["artists"][0]["id"]
-            artist_data = sp.artist(artist_id)
-        except Exception as e:
-            logger.warning(f"Failed to get artist data for {artist_name}: {e}")
-
-        # Extract track and album metadata
-        album_data = track_data.get("album", {})
-
-        metadata = {
-            "spotify_id": track_id,
-            "spotify_uri": track_data.get("uri"),
-            "track_popularity": track_data.get("popularity"),
-            "track_duration_ms": track_data.get("duration_ms"),
-            "track_explicit": track_data.get("explicit"),
-            "track_preview_url": track_data.get("preview_url"),
-
-            # Album metadata
-            "album_name": album_data.get("name"),
-            "album_release_date": album_data.get("release_date"),
-            "album_release_date_precision": album_data.get("release_date_precision"),
-            "album_total_tracks": album_data.get("total_tracks"),
-            "album_type": album_data.get("album_type"),
-
-            # Artist metadata
-            "artist_popularity": artist_data.get("popularity") if artist_data else None,
-            "artist_followers": artist_data.get("followers", {}).get("total") if artist_data else None,
-            "api_genres": ", ".join(artist_data.get("genres", [])) if artist_data else None,
-
-            # Note: Audio features skipped due to API 403 errors
-            "audio_features_note": "Skipped due to Spotify API limitations",
-            "enrichment_source": "spotify_api"
-        }
-
-        return metadata
-
-    except Exception as e:
-        logger.warning(f"Error fetching {track_name} by {artist_name}: {e}")
-        return None
-
-def process_batch(tracks_batch, processed_tracks):
-    """Process a batch of tracks"""
-    batch_results = []
-
-    for _, row in tracks_batch.iterrows():
-        track = row["master_metadata_track_name"]
-        artist = row["master_metadata_album_artist_name"]
-
-        # Create a unique key for this track
-        track_key = f"{track}|||{artist}"
-
-        # Skip if already processed
-        if track_key in processed_tracks:
-            metadata = processed_tracks[track_key]
-        else:
-            metadata = search_track_metadata(track, artist)
-            processed_tracks[track_key] = metadata
-            time.sleep(DELAY_BETWEEN_REQUESTS)
-
-        # Add the track info and metadata to results
-        result = {
+            results = sp.search(q=query, type="track", limit=1)
+            items = results["tracks"]["items"]
+            if not items:
+                return None
+            track_data = items[0]
+            track_id = track_data.get("id")
+
+            artist_data = None
+            try:
+                artist_id = track_data["artists"][0]["id"]
+                artist_data = sp.artist(artist_id)
+            except Exception as err:  # pragma: no cover - best effort
+                logger.warning("Failed to get artist data for %s: %s", artist_name, err)
+
+            album_data = track_data.get("album", {})
+
+            return {
+                "spotify_id": track_id,
+                "spotify_uri": track_data.get("uri"),
+                "track_popularity": track_data.get("popularity"),
+                "track_duration_ms": track_data.get("duration_ms"),
+                "track_explicit": track_data.get("explicit"),
+                "track_preview_url": track_data.get("preview_url"),
+                "album_name": album_data.get("name"),
+                "album_release_date": album_data.get("release_date"),
+                "album_release_date_precision": album_data.get("release_date_precision"),
+                "album_total_tracks": album_data.get("total_tracks"),
+                "album_type": album_data.get("album_type"),
+                "artist_popularity": artist_data.get("popularity") if artist_data else None,
+                "artist_followers": artist_data.get("followers", {}).get("total") if artist_data else None,
+                "api_genres": ", ".join(artist_data.get("genres", [])) if artist_data else None,
+                "audio_features_note": "Skipped due to Spotify API limitations",
+                "enrichment_source": "spotify_api",
+            }
+        except Exception as exc:
+            wait = 2 ** attempt
+            logger.warning(
+                "Error fetching %s by %s: %s. Retrying in %s sec",
+                track_name,
+                artist_name,
+                exc,
+                wait,
+            )
+            time.sleep(wait)
+    return None
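+
+# The backoff above waits 2**attempt seconds (1 s, 2 s, 4 s); after three
+# failures the function returns None so the caller records a miss instead of
+# aborting the whole run.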
+
+
+def process_batch(
+    batch_df: pd.DataFrame,
+    processed_keys: Set[str],
+    conn: sqlite3.Connection,
+    meta_path: Path,
+) -> Tuple[int, int, int, int]:
+    """Process a batch of tracks and append results to CSV."""
+
+    rows = []
+    api_hits = 0
+    upserts = 0
+    skipped = 0
+
+    for track, artist in batch_df[[
+        "master_metadata_track_name",
+        "master_metadata_album_artist_name",
+    ]].itertuples(index=False):
+        key = f"{track}|||{artist}"
+        if key in processed_keys:
+            skipped += 1
+            continue
+
+        metadata = search_track_metadata(track, artist)
+        api_hits += 1
+        time.sleep(DELAY_BETWEEN_REQUESTS)  # keep the configured per-request pacing
+
+        row = {
             "master_metadata_track_name": track,
-            "master_metadata_album_artist_name": artist
+            "master_metadata_album_artist_name": artist,
         }
         if metadata:
-            result.update(metadata)
+            row.update(metadata)
         else:
-            # Add None values for failed lookups
-            result.update({
+            row.update({
                 "spotify_id": None,
                 "spotify_uri": None,
                 "track_popularity": None,
@@ -179,119 +235,201 @@ def process_batch(tracks_batch, processed_tracks):
                 "artist_followers": None,
                 "api_genres": None,
                 "audio_features_note": None,
-                "enrichment_source": "spotify_api"
+                "enrichment_source": "spotify_api",
             })
-
-        batch_results.append(result)
-
-    return batch_results
-
-def main():
-    logger.info("Starting smart Spotify API metadata enrichment...")
-    logger.info("This will skip tracks already enriched with external datasets")
-
-    # Test authentication first
+        rows.append(row)
+
+        conn.execute(
+            "INSERT OR REPLACE INTO progress(key, meta_json) VALUES(?, ?)",
+            (key, json.dumps(metadata)),
+        )
+        processed_keys.add(key)
+        upserts += 1
+
+    processed = len(rows)
+    if rows:
+        header = not meta_path.exists()
+        pd.DataFrame(rows).to_csv(meta_path, index=False, mode="a", header=header)
+
+    return processed, skipped, api_hits, upserts
+
+
+# ---------------------------------------------------------------------------
+# DuckDB merge
+# ---------------------------------------------------------------------------
+
+def run_duckdb_merge(
+    base_csv: Path | str = BASE_CSV,
+    meta_csv: Path | str = META_CSV,
+    out_csv: Path | str = FINAL_CSV,
+) -> None:
+    """Perform the on-disk merge of streaming history and API metadata."""
+
+    base_csv = Path(base_csv)
+    meta_csv = Path(meta_csv)
+    out_csv = Path(out_csv)
+
+    if not base_csv.exists() or not meta_csv.exists():
+        logger.warning("Missing inputs for DuckDB merge: base=%s meta=%s", base_csv, meta_csv)
+        return
+
+    con = duckdb.connect(database=":memory:")
+
+    con.execute(
+        f"""
+        CREATE TABLE base AS
+        SELECT *,
+               lower(trim(master_metadata_track_name)) AS track_key,
+               lower(trim(master_metadata_album_artist_name)) AS artist_key
+        FROM read_csv_auto('{base_csv}')
+        """
+    )
+
+    con.execute(
+        f"""
+        CREATE TABLE meta_raw AS
+        SELECT *,
+               lower(trim(master_metadata_track_name)) AS track_key,
+               lower(trim(master_metadata_album_artist_name)) AS artist_key
+        FROM read_csv_auto('{meta_csv}')
+        """
+    )
+
+    dupes = con.execute(
+        """
+        SELECT track_key, artist_key, COUNT(*) c
+        FROM meta_raw
+        GROUP BY 1, 2
+        HAVING c > 1
+        """
+    ).fetchall()
+    if dupes:
+        logger.warning("Dropping %d duplicate metadata keys", len(dupes))
+        con.execute(
+            """
+            CREATE TABLE meta AS
+            SELECT * EXCLUDE(rn) FROM (
+                SELECT *,
+                       row_number() OVER (PARTITION BY track_key, artist_key ORDER BY track_key) rn
+                FROM meta_raw
+            )
+            WHERE rn = 1
+            """
+        )
+    else:
+        con.execute("CREATE TABLE meta AS SELECT * FROM meta_raw")
+
+    out_csv.parent.mkdir(parents=True, exist_ok=True)
+    # Also exclude the metadata-side name columns so the output CSV does not
+    # carry duplicate column names after the join.
+    con.execute(
+        f"""
+        COPY (
+            SELECT b.*,
+                   m.* EXCLUDE(track_key, artist_key,
+                               master_metadata_track_name,
+                               master_metadata_album_artist_name)
+            FROM base b
+            LEFT JOIN meta m USING(track_key, artist_key)
+        ) TO '{out_csv}' (HEADER, DELIMITER ',')
+        """
+    )
+
+    base_rows = con.execute("SELECT COUNT(*) FROM base").fetchone()[0]
+    logger.info("DuckDB merge wrote %s rows to %s", base_rows, out_csv)
+    con.close()
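+
+# Duplicate keys can occur if a run crashes after the CSV append but before
+# the SQLite commit; the row_number() filter above keeps one row per key, and
+# the LEFT JOIN keeps the output row count equal to the base history.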
+
+
+# ---------------------------------------------------------------------------
+# Main enrichment flow
+# ---------------------------------------------------------------------------
+
+def main(merge_only: bool = False) -> None:
+    if merge_only:
+        run_duckdb_merge()
+        return
+
     try:
         user = sp.current_user()
-        logger.info(f"Successfully authenticated as: {user['display_name']} ({user['id']})")
-    except Exception as e:
-        logger.error(f"Authentication failed: {e}")
+        logger.info("Authenticated as %s (%s)", user.get("display_name"), user.get("id"))
+    except Exception as exc:
+        logger.error("Authentication failed: %s", exc)
         return
-
-    # Load cleaned data
-    df = pd.read_csv("data/processed/cleaned_streaming_history.csv")
-    logger.info(f"Loaded {len(df)} total streaming records")
-
-    # Get unique tracks for metadata lookup
+
+    if not BASE_CSV.exists():
+        logger.error("Base streaming history not found: %s", BASE_CSV)
+        return
+
+    conn = get_db_connection()
+    processed_keys = load_processed_keys(conn)
+    logger.info("Loaded %d processed keys from progress DB", len(processed_keys))
+
+    df = pd.read_csv(BASE_CSV)
+    logger.info("Loaded %d total streaming records", len(df))
+
     unique_tracks = (
         df[["master_metadata_track_name", "master_metadata_album_artist_name"]]
         .dropna()
         .drop_duplicates()
         .reset_index(drop=True)
     )
-    logger.info(f"Found {len(unique_tracks)} unique tracks to process")
-
-    # Load tracks that already have external metadata
+    logger.info("Found %d unique tracks", len(unique_tracks))
+
     already_enriched = load_existing_enrichment()
-
-    # Filter out tracks that already have metadata
-    tracks_to_process = []
-    for _, row in unique_tracks.iterrows():
-        track_key = f"{row['master_metadata_track_name']}|||{row['master_metadata_album_artist_name']}"
-        if track_key not in already_enriched:
-            tracks_to_process.append(row)
-
-    tracks_to_process = pd.DataFrame(tracks_to_process)
-    logger.info(f"After filtering already enriched tracks: {len(tracks_to_process)} tracks need API enrichment")
-
-    if len(tracks_to_process) == 0:
-        logger.info("All tracks already have external metadata! No API calls needed.")
+    if already_enriched:
+        unique_tracks["key"] = (
+            unique_tracks["master_metadata_track_name"]
+            + "|||"
+            + unique_tracks["master_metadata_album_artist_name"]
+        )
+        unique_tracks = (
+            unique_tracks[~unique_tracks["key"].isin(already_enriched)]
+            .drop(columns=["key"])
+            .reset_index(drop=True)
+        )
+        logger.info("After filtering existing enrichment: %d tracks", len(unique_tracks))
+
+    if unique_tracks.empty:
+        logger.info("All tracks already have metadata; running merge only")
+        conn.close()
+        run_duckdb_merge()
        return
-
-    # Load previous progress
-    processed_tracks = load_progress()
-    logger.info(f"Loaded {len(processed_tracks)} previously processed API tracks")
-
-    logger.info("NOTE: Skipping audio features due to Spotify API 403 errors")
-    logger.info("Will collect: track info, album info, artist info, and genres")
-
-    # Process in batches
-    all_results = []
-    total_batches = (len(tracks_to_process) + BATCH_SIZE - 1) // BATCH_SIZE
-
-    try:
-        for i in tqdm(range(0, len(tracks_to_process), BATCH_SIZE), desc="Processing batches"):
-            batch_num = i // BATCH_SIZE + 1
-            batch = tracks_to_process.iloc[i:i + BATCH_SIZE]
-
-            logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch)} tracks)")
-
-            batch_results = process_batch(batch, processed_tracks)
-            all_results.extend(batch_results)
-
-            # Save progress after each batch
-            save_progress(processed_tracks)
-
-            # Rate limiting between batches
-            if i + BATCH_SIZE < len(tracks_to_process):
-                logger.info(f"Waiting {DELAY_BETWEEN_BATCHES} seconds before next batch...")
-                time.sleep(DELAY_BETWEEN_BATCHES)
-
-    except KeyboardInterrupt:
-        logger.info("Process interrupted by user. Progress has been saved.")
-    except Exception as e:
-        logger.error(f"Error during processing: {e}")
-        logger.info("Progress has been saved.")
-
-    # Create DataFrame and save results
-    if all_results:
-        results_df = pd.DataFrame(all_results)
-        output_file = "data/enriched/spotify_api_metadata.csv"
-        results_df.to_csv(output_file, index=False)
-        logger.info(f"Saved {len(results_df)} track metadata results to {output_file}")
-
-        # Count successful enrichments
-        successful = results_df['spotify_id'].notna().sum()
-        logger.info(f"Successfully enriched {successful} tracks via API ({successful/len(results_df)*100:.1f}%)")
-
-        # Merge with original streaming data for complete enriched dataset
-        logger.info("Merging API metadata with streaming history...")
-        enriched_data = df.merge(
-            results_df,
-            on=['master_metadata_track_name', 'master_metadata_album_artist_name'],
-            how='left'
+
+    total_batches = (len(unique_tracks) + BATCH_SIZE - 1) // BATCH_SIZE
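+    # Ceiling division, e.g. 1,234 remaining tracks at BATCH_SIZE=50 gives
+    # (1234 + 49) // 50 == 25 batches (numbers illustrative).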
+
+    for start in tqdm(
+        range(0, len(unique_tracks), BATCH_SIZE),
+        disable=TQDM_DISABLE,
+        desc="Processing batches",
+    ):
+        batch_num = start // BATCH_SIZE + 1
+        batch_df = unique_tracks.iloc[start : start + BATCH_SIZE]
+        processed, skipped, api_hits, upserts = process_batch(batch_df, processed_keys, conn, META_CSV)
+        conn.commit()
+        logger.info(
+            "batch=%s/%s processed=%s skipped=%s api_hits=%s sqlite_upserts=%s",
+            batch_num,
+            total_batches,
+            processed,
+            skipped,
+            api_hits,
+            upserts,
         )
-
-        final_output = "data/enriched/spotify_api_enriched_streaming_history.csv"
-        enriched_data.to_csv(final_output, index=False)
-        logger.info(f"Saved complete enriched dataset to {final_output}")
-
-        # Final statistics
-        api_enriched_count = enriched_data['spotify_id'].notna().sum()
-        logger.info(f"Final dataset: {len(enriched_data)} records with {api_enriched_count} API-enriched ({api_enriched_count/len(enriched_data)*100:.1f}%)")
-
-    else:
-        logger.info("No new tracks were processed.")
+
+        if batch_num % 10 == 0 and psutil:
+            try:  # pragma: no cover - best effort only
+                rss = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
+                logger.info("memory_rss_mb=%.1f", rss)
+            except Exception:
+                pass
+
+        if start + BATCH_SIZE < len(unique_tracks):
+            time.sleep(DELAY_BETWEEN_BATCHES)
+
+    conn.close()
+
+    run_duckdb_merge()
+
 
 if __name__ == "__main__":
-    main()
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Spotify API metadata enrichment")
+    parser.add_argument(
+        "--merge-only",
+        action="store_true",
+        help="run only the DuckDB merge step",
+    )
+    args = parser.parse_args()
+    main(merge_only=args.merge_only)
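+
+# Usage sketch (paths assumed from the configuration above):
+#   python scripts/spotify_api/smart_metadata_enrichment.py                # full run + merge
+#   python scripts/spotify_api/smart_metadata_enrichment.py --merge-only   # DuckDB join only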