diff --git a/jobs/release_scraping/release_scraping/main.py b/jobs/release_scraping/release_scraping/main.py index c1846f08..152ea048 100644 --- a/jobs/release_scraping/release_scraping/main.py +++ b/jobs/release_scraping/release_scraping/main.py @@ -1,6 +1,8 @@ +import html import json import re import time +import xml.etree.ElementTree as ET import feedparser import requests @@ -8,6 +10,7 @@ from bs4 import BeautifulSoup from datetime import datetime from google.cloud import storage +from urllib.parse import urlparse from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service @@ -23,8 +26,42 @@ MIN_RELEASE_DATE = "2020-01-01" +# Firefox user-facing release notes +FIREFOX_PRODUCT_DETAILS_URL = "https://product-details.mozilla.org/1.0/firefox.json" +FIREFOX_USER_NOTES_URL = "https://www.firefox.com/en-US/firefox/{version}/releasenotes/" + +# User-facing blog RSS feeds +GCS_BLOGS_PREFIX = "MARKET_RESEARCH/BLOGS" + +BLOG_FEEDS = { + "Chrome": "https://blog.google/products-and-platforms/products/chrome/rss/", + "Edge": "https://blogs.windows.com/msedgedev/feed/", + "Brave": "https://brave.com/blog/index.xml", + "Opera": "https://blogs.opera.com/desktop/feed/", + "Vivaldi": "https://vivaldi.com/feed/", +} + +# Job postings +GCS_JOBS_PREFIX = "MARKET_RESEARCH/JOBS" + +GREENHOUSE_BOARDS = { + "Mozilla": "mozilla", + "Brave": "brave", +} +GREENHOUSE_API_URL = ( + "https://boards-api.greenhouse.io/v1/boards/{board}/jobs?content=true" +) + +OPERA_SITEMAP_URL = "https://jobs.opera.com/sitemap.xml" + TIMEOUT_IN_SECONDS = 20 REQUEST_DELAY_SECONDS = 2 +REQUEST_HEADERS = { + "User-Agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko)" + ) +} DRIVER_TYP = "Chromium" BINARY_LOC = "/usr/bin/chromium" DRIVER_PATH = "/usr/bin/chromedriver" @@ -96,6 +133,84 @@ def gcs_path_for(browser_name, version, release_date): ) +def 
def parse_blog_feed(feed_url):
    """Parse an RSS/Atom blog feed and return its usable post entries.

    Args:
        feed_url: URL of the RSS/Atom feed, passed straight to
            ``feedparser.parse``.

    Returns:
        A list of dicts with keys ``title``, ``release_date`` (``YYYY-MM-DD``)
        and ``url``, in the order provided by the feed (typically
        newest-first). Entries missing a link or a date are skipped.

    Raises:
        RuntimeError: If feedparser flagged the feed as broken (``bozo``) and
            no entries could be recovered from it.
    """
    feed = feedparser.parse(feed_url)

    # feedparser reports fetch/parse problems through ``bozo`` instead of
    # raising, so an unreachable feed would otherwise be indistinguishable
    # from a feed with no posts. Raise only when nothing was recovered; a
    # feed that is merely malformed but still yields entries is processed
    # exactly as before.
    if not feed.entries and getattr(feed, "bozo", False):
        reason = getattr(feed, "bozo_exception", None) or "unknown error"
        raise RuntimeError(f"Failed to parse feed {feed_url}: {reason}")

    posts = []
    for entry in feed.entries:
        title = getattr(entry, "title", "") or ""
        link = getattr(entry, "link", "") or ""
        # Prefer the publication time; fall back to the last-updated time.
        date_parsed = getattr(entry, "published_parsed", None) or getattr(
            entry, "updated_parsed", None
        )
        if not link or not date_parsed:
            continue
        # Only the year, month and day components are kept.
        publish_date = datetime(*date_parsed[:3]).strftime("%Y-%m-%d")
        posts.append({"title": title, "release_date": publish_date, "url": link})
    return posts
def _upload_json_record(bucket, gcs_path, record):
    """Upload ``record`` as indented JSON to ``gcs_path``; return True on success.

    Upload errors are logged and swallowed so that one failed object does not
    abort the remaining releases/posts or the sources scraped afterwards.
    """
    try:
        blob = bucket.blob(gcs_path)
        blob.upload_from_string(
            json.dumps(record, indent=2), content_type="application/json"
        )
    except Exception as e:
        print(f"Failed to upload gs://{GCS_BUCKET_NAME}/{gcs_path}: {e}")
        return False
    print(f"Uploaded to gs://{GCS_BUCKET_NAME}/{gcs_path}")
    return True


def scrape_and_upload_user_releases(scraped_date, bucket, existing_paths):
    """Scrape Firefox user-facing release notes and upload new ones to GCS.

    Args:
        scraped_date: Scrape date string stored verbatim in each record.
        bucket: GCS bucket object used to create and upload blobs.
        existing_paths: Collection of object names already in GCS; releases
            whose target path is in it are skipped.

    Failures to list, scrape or upload are printed and never raised, so the
    caller can continue with the other sources.
    """
    print("--- Scraping Firefox user-facing release notes ---")
    try:
        ff_releases = fetch_firefox_user_releases()
    except Exception as e:
        print(f"Failed to fetch Firefox product details: {e}")
        return

    # ISO dates compare correctly as plain strings.
    ff_releases = [r for r in ff_releases if r["release_date"] >= MIN_RELEASE_DATE]
    print(f"Found {len(ff_releases)} Firefox user releases since {MIN_RELEASE_DATE}")

    for release in ff_releases:
        version = release["version"]
        release_date = release["release_date"]
        url = FIREFOX_USER_NOTES_URL.format(version=version)
        gcs_path = gcs_user_release_path_for("Firefox", version, release_date)

        if gcs_path in existing_paths:
            print(f"Skipping Firefox {version} user release — already in GCS")
            continue

        print(f"Scraping Firefox {version} user release ({release_date}): {url}")
        try:
            raw_text = scrape_page_text(url)
        except Exception as e:
            print(f"Failed to scrape Firefox {version} user release: {e}")
            continue

        record = {
            "browser": "Firefox",
            "version": version,
            "release_date": release_date,
            "scraped_date": scraped_date,
            "source_url": url,
            "source_type": "user_release_notes",
            "features": [],
            "raw_text": raw_text,
        }

        _upload_json_record(bucket, gcs_path, record)
        # The page was requested whether or not the upload worked, so keep
        # the polite delay before the next request.
        time.sleep(REQUEST_DELAY_SECONDS)


def scrape_and_upload_blog_posts(scraped_date, bucket, existing_paths):
    """Scrape browser blog RSS feeds and upload new posts to GCS.

    Args:
        scraped_date: Scrape date string stored verbatim in each record.
        bucket: GCS bucket object used to create and upload blobs.
        existing_paths: Collection of object names already in GCS; posts
            whose target path is in it are skipped.

    A failing feed, page or upload is printed and skipped; processing then
    continues with the next post or browser.
    """
    print("--- Scraping browser blog posts ---")
    for browser_name, feed_url in BLOG_FEEDS.items():
        print(f"Fetching {browser_name} blog feed")
        try:
            posts = parse_blog_feed(feed_url)
        except Exception as e:
            print(f"Failed to fetch {browser_name} blog feed: {e}")
            continue

        for post in posts:
            publish_date = post["release_date"]
            url = post["url"]
            title = post["title"]

            # ISO dates compare correctly as plain strings.
            if publish_date < MIN_RELEASE_DATE:
                continue

            gcs_path = gcs_blog_path_for(browser_name, publish_date, url)

            if gcs_path in existing_paths:
                print(f"Skipping {browser_name} post ({publish_date}) — already in GCS")
                continue

            print(f"Scraping {browser_name} blog post: {title}")
            try:
                raw_text = scrape_page_text(url)
            except Exception as e:
                print(f"Failed to scrape {browser_name} post {url}: {e}")
                continue

            record = {
                "browser": browser_name,
                "version": None,
                "release_date": publish_date,
                "scraped_date": scraped_date,
                "source_url": url,
                "source_type": "blog_post",
                "title": title,
                "features": [],
                "raw_text": raw_text,
            }

            _upload_json_record(bucket, gcs_path, record)
            # The page was requested whether or not the upload worked, so
            # keep the polite delay before the next request.
            time.sleep(REQUEST_DELAY_SECONDS)
def greenhouse_job_to_record(company, job, scraped_date):
    """Transform a Greenhouse API job dict into our unified record schema.

    Args:
        company: Display name stored in the record's ``company`` field.
        job: One job dict from the Greenhouse board API response.
        scraped_date: Scrape date string stored verbatim in the record.

    Returns:
        A dict with the keys ``company``, ``source``, ``scraped_date``,
        ``job_id``, ``title``, ``department``, ``location``, ``offices``,
        ``url``, ``first_published``, ``updated_at``, ``description_html``
        and ``description_text``.

    Raises:
        KeyError: If ``job`` has no ``id``.
    """
    # ``dict.get(key, default)`` only falls back when the key is absent, not
    # when the API sends an explicit null, hence the ``or`` fallbacks below.
    # Greenhouse returns content as HTML-entity-encoded text; unescape first.
    content_html = html.unescape(job.get("content") or "")
    description_text = BeautifulSoup(content_html, "html.parser").get_text(
        separator="\n", strip=True
    )

    departments = [
        d["name"] for d in (job.get("departments") or []) if d.get("name")
    ]
    offices = [o["name"] for o in (job.get("offices") or []) if o.get("name")]
    location = (job.get("location") or {}).get("name", "")

    return {
        "company": company,
        "source": "greenhouse",
        "scraped_date": scraped_date,
        "job_id": str(job["id"]),
        "title": job.get("title", ""),
        # Only the first department is kept; None when the job lists none.
        "department": departments[0] if departments else None,
        "location": location,
        "offices": offices if offices else None,
        "url": job.get("absolute_url", ""),
        "first_published": job.get("first_published", ""),
        "updated_at": job.get("updated_at", ""),
        "description_html": content_html,
        "description_text": description_text,
    }
def _find_job_posting(soup):
    """Return the first schema.org JobPosting object in the page's JSON-LD, or None."""
    for script in soup.find_all("script", type="application/ld+json"):
        try:
            payload = json.loads(script.string)
        except (json.JSONDecodeError, TypeError):
            # Empty or malformed <script> block; try the next one.
            continue

        # A JSON-LD block may be a single object, a list of objects, or an
        # object that wraps its nodes in "@graph".
        nodes = payload if isinstance(payload, list) else [payload]
        for node in list(nodes):
            if isinstance(node, dict) and isinstance(node.get("@graph"), list):
                nodes = nodes + node["@graph"]

        for node in nodes:
            if not isinstance(node, dict):
                continue
            node_types = node.get("@type")
            if not isinstance(node_types, list):
                node_types = [node_types]
            if "JobPosting" in node_types:
                return node
    return None


def _location_from_job_posting(posting):
    """Format a JobPosting's jobLocation as "City, Region; ..." or return None."""
    places = posting.get("jobLocation") or []
    if isinstance(places, dict):
        places = [places]
    elif not isinstance(places, list):
        places = []

    parts = []
    for place in places:
        address = place.get("address") if isinstance(place, dict) else None
        if not isinstance(address, dict):
            # The address may be missing or given as free text; skip it.
            continue
        city = address.get("addressLocality") or ""
        area = address.get("addressRegion") or address.get("addressCountry") or ""
        if isinstance(area, dict):
            # The country may be given as an object rather than plain text.
            area = area.get("name") or ""
        if city and area:
            parts.append(f"{city}, {area}")
        elif city or area:
            parts.append(city or area)
    return "; ".join(parts) if parts else None


def scrape_opera_job(url):
    """Scrape a single Opera job page and return a record dict.

    Extracts title, description, department, location and first-published
    date from the page HTML. Elements whose class or id mentions cookie,
    consent or gdpr are removed before any content is read.

    Args:
        url: Absolute URL of the job posting page.

    Returns:
        A dict with keys ``title``, ``department``, ``location``,
        ``first_published``, ``description_html`` and ``description_text``.
        ``department``, ``location`` and ``first_published`` are None when
        the page does not provide them.

    Raises:
        requests.RequestException: If the page cannot be fetched or returns
            an HTTP error status.
    """
    response = requests.get(url, headers=REQUEST_HEADERS, timeout=TIMEOUT_IN_SECONDS)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")

    # Remove cookie consent dialogs before extracting content
    consent_pattern = re.compile(r"cookie|consent|gdpr", re.I)
    for attr_name in ("class", "id"):
        for el in soup.find_all(
            ["div", "dialog", "section"], attrs={attr_name: consent_pattern}
        ):
            el.decompose()

    # First non-empty <h1> that is not part of a consent banner.
    title = ""
    for h1 in soup.find_all("h1"):
        text = h1.get_text(strip=True)
        if text and "cookie" not in text.lower() and "consent" not in text.lower():
            title = text
            break

    # Prefer the most specific container available.
    content_el = soup.find("article")
    if not content_el:
        content_el = soup.find("main")
    if not content_el:
        content_el = soup.find("body")

    description_text = (
        content_el.get_text(separator="\n", strip=True) if content_el else ""
    )
    description_html = str(content_el) if content_el else ""

    # Extract metadata from JSON-LD structured data (schema.org/JobPosting)
    department = None
    location = None
    first_published = None
    posting = _find_job_posting(soup)
    if posting is not None:
        date_posted = posting.get("datePosted")
        if isinstance(date_posted, str) and date_posted:
            # Keep only the YYYY-MM-DD prefix of the timestamp.
            first_published = date_posted[:10]
        location = _location_from_job_posting(posting)

    # Definition-list entries supply the department, and the location when
    # JSON-LD did not provide one.
    for dt in soup.find_all("dt"):
        label = dt.get_text(strip=True).lower()
        dd = dt.find_next_sibling("dd")
        if not dd:
            continue
        value = dd.get_text(strip=True)
        if "department" in label:
            department = value
        elif "location" in label and not location:
            location = value

    return {
        "title": title,
        "department": department,
        "location": location,
        "first_published": first_published,
        "description_html": description_html,
        "description_text": description_text,
    }
opera_job_id_from_url(url) + try: + job_data = scrape_opera_job(url) + except Exception as e: + print(f" Failed to scrape {url}: {e}") + continue + + record = { + "company": "Opera", + "source": "teamtailor", + "scraped_date": scraped_date, + "job_id": job_id, + "url": url, + "offices": None, + "updated_at": None, + **job_data, + } + + gcs_path = gcs_job_path_for("Opera", scraped_date, job_id) + try: + blob = bucket.blob(gcs_path) + blob.upload_from_string( + json.dumps(record, indent=2, ensure_ascii=False), + content_type="application/json", + ) + print(f" {job_data['title']} -> gs://{GCS_BUCKET_NAME}/{gcs_path}") + except Exception as e: + print(f" Failed to upload {job_data.get('title', url)}: {e}") + + time.sleep(REQUEST_DELAY_SECONDS) + if __name__ == "__main__": main() diff --git a/jobs/release_scraping/tests/test_main.py b/jobs/release_scraping/tests/test_main.py index 97d493dd..85479efe 100644 --- a/jobs/release_scraping/tests/test_main.py +++ b/jobs/release_scraping/tests/test_main.py @@ -7,7 +7,12 @@ import pytest from release_scraping.main import ( + BLOG_FEEDS, + gcs_blog_path_for, gcs_path_for, + gcs_user_release_path_for, + fetch_firefox_user_releases, + parse_blog_feed, parse_feed, scrape_page_text, ) @@ -162,7 +167,13 @@ def fake_scrape(url, driver=None, use_js=False): with patch("release_scraping.main.parse_feed", return_value=fake_releases), patch( "release_scraping.main.storage.Client", return_value=mock_client - ), patch("release_scraping.main.scrape_page_text", side_effect=fake_scrape): + ), patch("release_scraping.main.scrape_page_text", side_effect=fake_scrape), patch( + "release_scraping.main.fetch_firefox_user_releases", return_value=[] + ), patch( + "release_scraping.main.parse_blog_feed", return_value=[] + ), patch( + "release_scraping.main.scrape_and_upload_jobs" + ): import sys sys.argv = ["main.py", "--date", "2026-03-13"] @@ -175,6 +186,209 @@ def fake_scrape(url, driver=None, use_js=False): ] +# 
def test_gcs_blog_path_for():
    """gcs_blog_path_for builds BLOGS paths from browser, date and URL slug."""
    assert gcs_blog_path_for(
        "Chrome", "2026-03-10", "https://blog.google/chrome/new-tab-redesign/"
    ) == ("MARKET_RESEARCH/BLOGS/Chrome/post_20260310_new-tab-redesign.json")
    # Spaces in browser name become underscores
    assert gcs_blog_path_for(
        "Opera Desktop", "2026-04-01", "https://example.com/some-post/"
    ) == ("MARKET_RESEARCH/BLOGS/Opera_Desktop/post_20260401_some-post.json")
    # Slug is capped at 40 characters. Pin the exact path rather than only an
    # upper bound, so a slug that is dropped or mangled fails the test too.
    long_url = "https://example.com/" + "a" * 60 + "/"
    result = gcs_blog_path_for("Brave", "2026-01-01", long_url)
    assert result == (
        "MARKET_RESEARCH/BLOGS/Brave/post_20260101_" + "a" * 40 + ".json"
    )
"2026-02-10"} + + +def test_parse_blog_feed(): + fake_feed = MagicMock() + fake_feed.entries = [ + MagicMock( + title="What's new in Chrome 146", + link="https://blog.google/chrome-146", + published_parsed=(2026, 3, 10, 0, 0, 0, 0, 0, 0), + updated_parsed=None, + ), + MagicMock( + title="No link post", + link="", + published_parsed=(2026, 3, 1, 0, 0, 0, 0, 0, 0), + updated_parsed=None, + ), + MagicMock( + title="No date post", + link="https://blog.google/no-date", + published_parsed=None, + updated_parsed=None, + ), + ] + + with patch("release_scraping.main.feedparser.parse", return_value=fake_feed): + results = parse_blog_feed("https://example.com/feed") + + assert len(results) == 1 # no-link and no-date entries skipped + assert results[0] == { + "title": "What's new in Chrome 146", + "release_date": "2026-03-10", + "url": "https://blog.google/chrome-146", + } + + +def test_main_firefox_user_release_skips_existing(): + """main() skips Firefox user releases that are already in GCS.""" + from release_scraping.main import main + + fake_ff_releases = [{"version": "149.0", "release_date": "2026-03-24"}] + + mock_bucket = MagicMock() + mock_client = MagicMock() + mock_client.bucket.return_value = mock_bucket + + existing_path = ( + "MARKET_RESEARCH/STRUCTURED/Firefox/user_release_149_0_20260324.json" + ) + existing_blob = MagicMock() + existing_blob.name = existing_path + mock_client.list_blobs.return_value = [existing_blob] + + with patch("release_scraping.main.parse_feed", return_value=[]), patch( + "release_scraping.main.fetch_firefox_user_releases", + return_value=fake_ff_releases, + ), patch("release_scraping.main.parse_blog_feed", return_value=[]), patch( + "release_scraping.main.storage.Client", return_value=mock_client + ), patch( + "release_scraping.main.scrape_and_upload_jobs" + ): + import sys + + sys.argv = ["main.py", "--date", "2026-04-17"] + main() + + mock_bucket.blob.assert_not_called() + + +def 
test_main_blog_posts_skips_existing_and_continues_on_failure(): + """main() skips existing blog posts and continues after a scrape failure.""" + from release_scraping.main import main + + chrome_post = { + "title": "Existing Chrome post", + "release_date": "2026-03-10", + "url": "https://blog.google/chrome/existing-post/", + } + edge_post = { + "title": "Failing Edge post", + "release_date": "2026-03-17", + "url": "https://blogs.windows.com/msedgedev/2026/03/17/failing-post/", + } + brave_post = { + "title": "Successful Brave post", + "release_date": "2026-03-20", + "url": "https://brave.com/blog/brave-post/", + } + + mock_bucket = MagicMock() + mock_blob = MagicMock() + mock_bucket.blob.return_value = mock_blob + mock_client = MagicMock() + mock_client.bucket.return_value = mock_bucket + + chrome_path = gcs_blog_path_for( + "Chrome", chrome_post["release_date"], chrome_post["url"] + ) + existing_blob = MagicMock() + existing_blob.name = chrome_path + mock_client.list_blobs.return_value = [existing_blob] + + def fake_blog_feed(feed_url): + if "chrome" in feed_url: + return [chrome_post] + if "msedgedev" in feed_url: + return [edge_post] + if "brave" in feed_url: + return [brave_post] + return [] + + def fake_scrape(url, driver=None, use_js=False): + if "failing" in url: + raise Exception("timeout") + return "blog post text" + + with patch("release_scraping.main.parse_feed", return_value=[]), patch( + "release_scraping.main.fetch_firefox_user_releases", return_value=[] + ), patch( + "release_scraping.main.parse_blog_feed", side_effect=fake_blog_feed + ), patch( + "release_scraping.main.scrape_page_text", side_effect=fake_scrape + ), patch( + "release_scraping.main.storage.Client", return_value=mock_client + ), patch( + "release_scraping.main.scrape_and_upload_jobs" + ): + import sys + + sys.argv = ["main.py", "--date", "2026-04-17"] + main() + + # Only Brave should have been uploaded (Chrome skipped, Edge failed) + uploaded_paths = [call.args[0] for call in 
@pytest.mark.integration
def test_scrape_user_content_to_file():
    """Scrape the latest entry from each user-facing source and write to a local JSON file.

    Output: tests/integration_output/scrape_user_content_{date}.json
    """
    # Reuse the production URL template so this test cannot drift from it.
    from release_scraping.main import FIREFOX_USER_NOTES_URL

    scraped_date = datetime.now(timezone.utc).strftime("%Y%m%d")
    results = []

    def scrape_into(record, label):
        """Fill raw_text/error on ``record``, print its status and collect it."""
        try:
            record["raw_text"] = scrape_page_text(record["source_url"])
        except Exception as e:
            record["error"] = str(e)
        char_count = len(record["raw_text"]) if record["raw_text"] else 0
        status = (
            f"{char_count:,} chars"
            if record["raw_text"]
            else f"FAILED ({record['error']})"
        )
        print(f" {label}: {status}")
        results.append(record)

    # Latest Firefox user-facing release
    ff_releases = fetch_firefox_user_releases()
    if ff_releases:
        latest = ff_releases[0]
        scrape_into(
            {
                "browser": "Firefox",
                "version": latest["version"],
                "release_date": latest["release_date"],
                "scraped_date": scraped_date,
                "source_url": FIREFOX_USER_NOTES_URL.format(
                    version=latest["version"]
                ),
                "source_type": "user_release_notes",
                "raw_text": None,
                "error": None,
            },
            f"Firefox {latest['version']} (user)",
        )

    # Latest post from each blog feed
    for browser_name, feed_url in BLOG_FEEDS.items():
        posts = parse_blog_feed(feed_url)
        if not posts:
            print(f" {browser_name}: no posts in feed")
            continue
        post = posts[0]
        scrape_into(
            {
                "browser": browser_name,
                "version": None,
                "release_date": post["release_date"],
                "scraped_date": scraped_date,
                "source_url": post["url"],
                "source_type": "blog_post",
                "title": post["title"],
                "raw_text": None,
                "error": None,
            },
            f"{browser_name} blog ({post['release_date']})",
        )

    output_dir = os.path.join(os.path.dirname(__file__), "integration_output")
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"scrape_user_content_{scraped_date}.json")

    # Explicit encoding keeps the output independent of the platform locale.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)

    print(f"\nOutput written to: {output_path}")
    assert os.path.exists(output_path)
    assert len(results) > 0