diff --git a/README.md b/README.md
index c81f0df..c4132a2 100644
--- a/README.md
+++ b/README.md
@@ -29,8 +29,8 @@ specify them as command line arguments.
 Clone the repo and install the dependencies:
 
 ```bash
-git clone https://github.com/yourusername/substack_scraper.git
-cd substack_scraper
+git clone https://github.com/yourusername/Substack2Markdown.git
+cd Substack2Markdown
 #
 # Optionally create a virtual environment
 # python -m venv venv
diff --git a/requirements.txt b/requirements.txt
index c58926a..7cff179 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,25 @@
+attrs==25.4.0
+beautifulsoup4==4.14.3
 bs4==0.0.1
+certifi==2026.1.4
+charset-normalizer==3.4.4
+h11==0.16.0
 html2text==2020.1.16
+idna==3.11
+Markdown==3.6
+outcome==1.3.0.post0
+packaging==26.0
+PySocks==1.7.1
+python-dotenv==1.2.1
 requests==2.31.0
 selenium==4.16.0
+sniffio==1.3.1
+sortedcontainers==2.4.0
+soupsieve==2.8.3
 tqdm==4.66.1
-webdriver_manager==4.0.1
-Markdown==3.6
+trio==0.32.0
+trio-websocket==0.12.2
+typing_extensions==4.15.0
+urllib3==2.6.3
+webdriver-manager==4.0.1
+wsproto==1.3.2
diff --git a/substack_scraper.py b/substack_scraper.py
index 126d260..6601937 100644
--- a/substack_scraper.py
+++ b/substack_scraper.py
@@ -1,11 +1,12 @@
 import argparse
 import json
 import os
+import sys
+import random
 
 from abc import ABC, abstractmethod
 from typing import List, Optional, Tuple
 from time import sleep
-
 import html2text
 import markdown
 import requests
@@ -16,10 +17,14 @@
 from selenium import webdriver
 from selenium.webdriver.common.by import By
-from webdriver_manager.microsoft import EdgeChromiumDriverManager
+from selenium.common.exceptions import SessionNotCreatedException, WebDriverException
+from selenium.webdriver.chrome.service import Service as ChromeService
+from selenium.webdriver.edge.service import Service as EdgeService
+from selenium.webdriver.chrome.options import Options as ChromeOptions
 from selenium.webdriver.edge.options import Options as EdgeOptions
-from selenium.common.exceptions import SessionNotCreatedException
-from selenium.webdriver.chrome.service import Service
+from webdriver_manager.chrome import ChromeDriverManager
+from webdriver_manager.microsoft import EdgeChromiumDriverManager
+
 from urllib.parse import urlparse
 
 from config import EMAIL, PASSWORD
 
@@ -28,42 +33,34 @@
 BASE_MD_DIR: str = "substack_md_files" # Name of the directory we'll save the .md essay files
 BASE_HTML_DIR: str = "substack_html_pages" # Name of the directory we'll save the .html essay files
 HTML_TEMPLATE: str = "author_template.html" # HTML template to use for the author page
-JSON_DATA_DIR: str = "data" 
+JSON_DATA_DIR: str = "data"
 NUM_POSTS_TO_SCRAPE: int = 3 # Set to 0 if you want all posts
 
 
 def extract_main_part(url: str) -> str:
-    parts = urlparse(url).netloc.split('.') # Parse the URL to get the netloc, and split on '.'
-    return parts[1] if parts[0] == 'www' else parts[0] # Return the main part of the domain, while ignoring 'www' if
-    # present
+    parts = urlparse(url).netloc.split('.')
+    return parts[1] if parts[0] == 'www' else parts[0]
 
 
 def generate_html_file(author_name: str) -> None:
-    """
-    Generates a HTML file for the given author.
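To make the writer-name derivation above concrete, here is a small standalone check of `extract_main_part` as it now reads; the URLs are placeholders, not taken from the repo.

```python
# Sketch: how extract_main_part derives the publication name from a URL (example URLs are made up).
from urllib.parse import urlparse

def extract_main_part(url: str) -> str:
    parts = urlparse(url).netloc.split('.')
    return parts[1] if parts[0] == 'www' else parts[0]

print(extract_main_part("https://example.substack.com/"))  # -> "example"
print(extract_main_part("https://www.example.com/"))        # -> "example" ('www' is skipped)
```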
- """ if not os.path.exists(BASE_HTML_DIR): os.makedirs(BASE_HTML_DIR) - # Read JSON data json_path = os.path.join(JSON_DATA_DIR, f'{author_name}.json') with open(json_path, 'r', encoding='utf-8') as file: essays_data = json.load(file) - # Convert JSON data to a JSON string for embedding embedded_json_data = json.dumps(essays_data, ensure_ascii=False, indent=4) with open(HTML_TEMPLATE, 'r', encoding='utf-8') as file: html_template = file.read() - # Insert the JSON string into the script tag in the HTML template - html_with_data = html_template.replace('', author_name).replace( + html_with_data = html_template.replace('', author_name).replace( '', f'' ) html_with_author = html_with_data.replace('author_name', author_name) - # Write the modified HTML to a new file html_output_path = os.path.join(BASE_HTML_DIR, f'{author_name}.html') with open(html_output_path, 'w', encoding='utf-8') as file: file.write(html_with_author) @@ -92,62 +89,42 @@ def __init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str) self.post_urls: List[str] = self.get_all_post_urls() def get_all_post_urls(self) -> List[str]: - """ - Attempts to fetch URLs from sitemap.xml, falling back to feed.xml if necessary. - """ urls = self.fetch_urls_from_sitemap() if not urls: urls = self.fetch_urls_from_feed() return self.filter_urls(urls, self.keywords) def fetch_urls_from_sitemap(self) -> List[str]: - """ - Fetches URLs from sitemap.xml. - """ sitemap_url = f"{self.base_substack_url}sitemap.xml" response = requests.get(sitemap_url) - if not response.ok: print(f'Error fetching sitemap at {sitemap_url}: {response.status_code}') return [] - root = ET.fromstring(response.content) urls = [element.text for element in root.iter('{http://www.sitemaps.org/schemas/sitemap/0.9}loc')] return urls def fetch_urls_from_feed(self) -> List[str]: - """ - Fetches URLs from feed.xml. - """ print('Falling back to feed.xml. This will only contain up to the 22 most recent posts.') feed_url = f"{self.base_substack_url}feed.xml" response = requests.get(feed_url) - if not response.ok: print(f'Error fetching feed at {feed_url}: {response.status_code}') return [] - root = ET.fromstring(response.content) urls = [] for item in root.findall('.//item'): link = item.find('link') if link is not None and link.text: urls.append(link.text) - return urls @staticmethod def filter_urls(urls: List[str], keywords: List[str]) -> List[str]: - """ - This method filters out URLs that contain certain keywords - """ return [url for url in urls if all(keyword not in url for keyword in keywords)] @staticmethod def html_to_md(html_content: str) -> str: - """ - This method converts HTML to Markdown - """ if not isinstance(html_content, str): raise ValueError("html_content must be a string") h = html2text.HTML2Text() @@ -157,45 +134,24 @@ def html_to_md(html_content: str) -> str: @staticmethod def save_to_file(filepath: str, content: str) -> None: - """ - This method saves content to a file. 
Can be used to save HTML or Markdown - """ if not isinstance(filepath, str): raise ValueError("filepath must be a string") - if not isinstance(content, str): raise ValueError("content must be a string") - if os.path.exists(filepath): print(f"File already exists: {filepath}") return - with open(filepath, 'w', encoding='utf-8') as file: file.write(content) @staticmethod def md_to_html(md_content: str) -> str: - """ - This method converts Markdown to HTML - """ return markdown.markdown(md_content, extensions=['extra']) - def save_to_html_file(self, filepath: str, content: str) -> None: - """ - This method saves HTML content to a file with a link to an external CSS file. - """ - if not isinstance(filepath, str): - raise ValueError("filepath must be a string") - - if not isinstance(content, str): - raise ValueError("content must be a string") - - # Calculate the relative path from the HTML file to the CSS file html_dir = os.path.dirname(filepath) css_path = os.path.relpath("./assets/css/essay-styles.css", html_dir) - css_path = css_path.replace("\\", "/") # Ensure forward slashes for web paths - + css_path = css_path.replace("\\", "/") html_content = f""" @@ -212,65 +168,41 @@ def save_to_html_file(self, filepath: str, content: str) -> None: """ - with open(filepath, 'w', encoding='utf-8') as file: file.write(html_content) @staticmethod def get_filename_from_url(url: str, filetype: str = ".md") -> str: - """ - Gets the filename from the URL (the ending) - """ if not isinstance(url, str): raise ValueError("url must be a string") - if not isinstance(filetype, str): raise ValueError("filetype must be a string") - if not filetype.startswith("."): filetype = f".{filetype}" - return url.split("/")[-1] + filetype @staticmethod def combine_metadata_and_content(title: str, subtitle: str, date: str, like_count: str, content) -> str: - """ - Combines the title, subtitle, and content into a single string with Markdown format - """ if not isinstance(title, str): raise ValueError("title must be a string") - if not isinstance(content, str): raise ValueError("content must be a string") - metadata = f"# {title}\n\n" if subtitle: metadata += f"## {subtitle}\n\n" metadata += f"**{date}**\n\n" metadata += f"**Likes:** {like_count}\n\n" - return metadata + content def extract_post_data(self, soup: BeautifulSoup) -> Tuple[str, str, str, str, str]: - """ - Converts a Substack post soup to markdown, returning metadata and content. - Returns (title, subtitle, like_count, date, md_content). 
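The relative CSS path computed in `save_to_html_file` above is the subtle part of that method; a small sketch of just that step, using an illustrative output path:

```python
# Sketch of the relative-CSS-path computation used by save_to_html_file (paths are illustrative).
import os

html_filepath = os.path.join("substack_html_pages", "example-author", "my-post.html")
html_dir = os.path.dirname(html_filepath)
css_path = os.path.relpath("./assets/css/essay-styles.css", html_dir)
css_path = css_path.replace("\\", "/")  # forward slashes for web paths, even on Windows
print(css_path)  # ../../assets/css/essay-styles.css
```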
- """ - # Title (sometimes h2 if video present) title_element = soup.select_one("h1.post-title, h2") title = title_element.text.strip() if title_element else "Untitled" - - # Subtitle subtitle_element = soup.select_one("h3.subtitle") subtitle = subtitle_element.text.strip() if subtitle_element else "" - - # Date — try CSS selector first date = "" date_element = soup.select_one("div.pencraft.pc-reset.color-pub-secondary-text-hGQ02T") if date_element and date_element.text.strip(): date = date_element.text.strip() - - # Fallback: JSON-LD metadata if not date: script_tag = soup.find("script", {"type": "application/ld+json"}) if script_tag and script_tag.string: @@ -282,41 +214,28 @@ def extract_post_data(self, soup: BeautifulSoup) -> Tuple[str, str, str, str, st date = date_obj.strftime("%b %d, %Y") except (json.JSONDecodeError, ValueError, KeyError): pass - if not date: date = "Date not found" - - # Like count like_count_element = soup.select_one("a.post-ufi-button .label") like_count = ( like_count_element.text.strip() if like_count_element and like_count_element.text.strip().isdigit() else "0" ) - - # Post content content_element = soup.select_one("div.available-content") content_html = str(content_element) if content_element else "" md = self.html_to_md(content_html) - - # Combine metadata + content md_content = self.combine_metadata_and_content(title, subtitle, date, like_count, md) - return title, subtitle, like_count, date, md_content - @abstractmethod def get_url_soup(self, url: str) -> str: raise NotImplementedError def save_essays_data_to_json(self, essays_data: list) -> None: - """ - Saves essays data to a JSON file for a specific author. - """ data_dir = os.path.join(JSON_DATA_DIR) if not os.path.exists(data_dir): os.makedirs(data_dir) - json_path = os.path.join(data_dir, f'{self.writer_name}.json') if os.path.exists(json_path): with open(json_path, 'r', encoding='utf-8') as file: @@ -332,6 +251,7 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None: essays_data = [] count = 0 total = num_posts_to_scrape if num_posts_to_scrape != 0 else len(self.post_urls) + for url in tqdm(self.post_urls, total=total): try: md_filename = self.get_filename_from_url(url, filetype=".md") @@ -340,14 +260,19 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None: html_filepath = os.path.join(self.html_save_dir, html_filename) if not os.path.exists(md_filepath): + # CHANGED: Use tqdm.write to print above the progress bar + tqdm.write(f"Downloading: {md_filename}") + + # Sleep happens AFTER we tell the user what we are doing + sleep_time = random.uniform(10, 20) + sleep(sleep_time) + soup = self.get_url_soup(url) if soup is None: - total += 1 continue + title, subtitle, like_count, date, md = self.extract_post_data(soup) self.save_to_file(md_filepath, md) - - # Convert markdown to HTML and save html_content = self.md_to_html(md) self.save_to_html_file(html_filepath, html_content) @@ -360,12 +285,16 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None: "html_link": html_filepath }) else: - print(f"File already exists: {md_filepath}") + # CHANGED: Use tqdm.write for skipping as well to keep the UI consistent + tqdm.write(f"Skipping (Exists): {md_filename}") + except Exception as e: - print(f"Error scraping post: {e}") + tqdm.write(f"Error scraping post: {e}") + count += 1 if num_posts_to_scrape != 0 and count == num_posts_to_scrape: break + self.save_essays_data_to_json(essays_data=essays_data) generate_html_file(author_name=self.writer_name) @@ -375,16 +304,25 @@ def 
__init__(self, base_substack_url: str, md_save_dir: str, html_save_dir: str) super().__init__(base_substack_url, md_save_dir, html_save_dir) def get_url_soup(self, url: str) -> Optional[BeautifulSoup]: - """ - Gets soup from URL using requests - """ try: - page = requests.get(url, headers=None) - soup = BeautifulSoup(page.content, "html.parser") - if soup.find("h2", class_="paywall-title"): - print(f"Skipping premium article: {url}") - return None - return soup + # CHANGED: Added basic retry for requests-based scraper too + max_retries = 3 + for attempt in range(max_retries): + page = requests.get(url, headers=None) + if page.status_code == 429: + print(f"Rate limited (429). Waiting 60s... (Attempt {attempt+1}/{max_retries})") + sleep(60) + continue + elif page.status_code != 200: + print(f"Error fetching page: Status {page.status_code}") + return None + + soup = BeautifulSoup(page.content, "html.parser") + if soup.find("h2", class_="paywall-title"): + print(f"Skipping premium article: {url}") + return None + return soup + return None except Exception as e: raise ValueError(f"Error fetching page: {e}") from e @@ -396,197 +334,185 @@ def __init__( md_save_dir: str, html_save_dir: str, headless: bool = False, - edge_path: str = '', - edge_driver_path: str = '', + browser: str = 'auto', + browser_path: str = '', + driver_path: str = '', user_agent: str = '' ) -> None: super().__init__(base_substack_url, md_save_dir, html_save_dir) - options = EdgeOptions() - if headless: - # modern headless flag (works better with recent Edge/Chromium) - options.add_argument("--headless=new") - if edge_path: - options.binary_location = edge_path - if user_agent: - options.add_argument(f"user-agent={user_agent}") - - if isinstance(options, EdgeOptions): - os.environ.setdefault("SE_DRIVER_MIRROR_URL", "https://msedgedriver.microsoft.com") - elif isinstance(options, ChromeOptions): - os.environ.setdefault("SE_DRIVER_MIRROR_URL", "https://chromedriver.storage.googleapis.com") - - + self.headless = headless + self.user_agent = user_agent + self.browser_path = browser_path + self.driver_path = driver_path self.driver = None + + os.environ.setdefault("SE_DRIVER_MIRROR_URL", "https://chromedriver.storage.googleapis.com") - # 1) Prefer an explicit driver path (manual download) - if edge_driver_path and os.path.exists(edge_driver_path): - service = Service(executable_path=edge_driver_path) - self.driver = webdriver.Edge(service=service, options=options) + if browser.lower() == 'chrome': + self._init_chrome() + elif browser.lower() == 'edge': + self._init_edge() else: - # 2) Try webdriver_manager (needs network/DNS) try: - service = Service(EdgeChromiumDriverManager().install()) - self.driver = webdriver.Edge(service=service, options=options) - except Exception as e: - print("webdriver_manager could not download msedgedriver (network/DNS). Falling back to Selenium Manager.") - # 3) Selenium Manager fallback (still needs network; but avoids webdriver_manager) - try: - # IMPORTANT: ensure no stale driver in PATH (e.g. C:\Windows\msedgedriver.exe v138) - self.driver = webdriver.Edge(options=options) - except SessionNotCreatedException as se: - raise RuntimeError( - "Selenium Manager fallback failed due to driver/browser mismatch.\n" - "Fix by either: (a) removing stale msedgedriver in PATH (e.g. 
C:\\Windows\\msedgedriver.exe) and replace with a fresh one downloaded from https://developer.microsoft.com/en-us/microsoft-edge/tools/webdriver, " - "or (b) pass --edge-driver-path to a manually downloaded driver that matches your Edge version." - ) from se + print("Attempting to initialize Chrome...") + self._init_chrome() + except (SessionNotCreatedException, WebDriverException, Exception) as e: + print(f"Chrome initialization failed: {e}") + print("Falling back to Edge...") + self._init_edge() + + if self.driver is None: + raise RuntimeError("Failed to initialize both Chrome and Edge drivers.") self.login() + def _init_chrome(self): + options = ChromeOptions() + if self.headless: + options.add_argument("--headless=new") + if self.browser_path and "chrome" in self.browser_path.lower(): + options.binary_location = self.browser_path + if self.user_agent: + options.add_argument(f"user-agent={self.user_agent}") + + driver_path = self.driver_path + if not driver_path: + try: + driver_path = ChromeDriverManager().install() + if "THIRD_PARTY_NOTICES" in driver_path: + base_dir = os.path.dirname(driver_path) + candidate = os.path.join(base_dir, "chromedriver") + if os.path.exists(candidate): + driver_path = candidate + except Exception as e: + print(f"Error installing Chromedriver via manager: {e}") + raise + + if driver_path and os.path.exists(driver_path) and os.name == 'posix': + try: + os.chmod(driver_path, 0o755) + except Exception: + pass + + if driver_path: + service = ChromeService(executable_path=driver_path) + self.driver = webdriver.Chrome(service=service, options=options) + else: + self.driver = webdriver.Chrome(options=options) + + def _init_edge(self): + options = EdgeOptions() + if self.headless: + options.add_argument("--headless=new") + if self.browser_path and "msedge" in self.browser_path.lower(): + options.binary_location = self.browser_path + if self.user_agent: + options.add_argument(f"user-agent={self.user_agent}") + + if self.driver_path and os.path.exists(self.driver_path) and "msedge" in self.driver_path.lower(): + service = EdgeService(executable_path=self.driver_path) + self.driver = webdriver.Edge(service=service, options=options) + else: + service = EdgeService(EdgeChromiumDriverManager().install()) + self.driver = webdriver.Edge(service=service, options=options) + def login(self) -> None: - """ - This method logs into Substack using Selenium - """ self.driver.get("https://substack.com/sign-in") sleep(3) - signin_with_password = self.driver.find_element( By.XPATH, "//a[@class='login-option substack-login__login-option']" ) signin_with_password.click() sleep(3) - - # Email and password email = self.driver.find_element(By.NAME, "email") password = self.driver.find_element(By.NAME, "password") email.send_keys(EMAIL) password.send_keys(PASSWORD) - - # Find the submit button and click it. submit = self.driver.find_element(By.XPATH, "//*[@id=\"substack-login\"]/div[2]/div[2]/form/button") submit.click() - sleep(30) # Wait for the page to load - + sleep(30) if self.is_login_failed(): - raise Exception( - "Warning: Login unsuccessful. Please check your email and password, or your account status.\n" - "Use the non-premium scraper for the non-paid posts. \n" - "If running headless, run non-headlessly to see if blocked by Captcha." - ) + raise Exception("Warning: Login unsuccessful.") def is_login_failed(self) -> bool: - """ - Check for the presence of the 'error-container' to indicate a failed login attempt. 
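The Chrome-first, Edge-fallback initialisation above can be exercised on its own. A minimal sketch, assuming webdriver-manager can reach its download servers and that default options are acceptable (real setup also handles custom binary/driver paths and user agents):

```python
# Sketch of the Chrome-first / Edge-fallback driver setup (options trimmed for brevity).
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.edge.service import Service as EdgeService
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager

def make_driver(headless: bool = True):
    try:
        options = ChromeOptions()
        if headless:
            options.add_argument("--headless=new")
        service = ChromeService(ChromeDriverManager().install())
        return webdriver.Chrome(service=service, options=options)
    except Exception as e:
        print(f"Chrome initialization failed ({e}); falling back to Edge.")
        options = EdgeOptions()
        if headless:
            options.add_argument("--headless=new")
        service = EdgeService(EdgeChromiumDriverManager().install())
        return webdriver.Edge(service=service, options=options)

driver = make_driver()
driver.get("https://substack.com/sign-in")
driver.quit()
```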
- """ error_container = self.driver.find_elements(By.ID, 'error-container') return len(error_container) > 0 and error_container[0].is_displayed() - def get_url_soup(self, url: str) -> BeautifulSoup: + def get_url_soup(self, url: str) -> Optional[BeautifulSoup]: """ - Gets soup from URL using logged in selenium driver + Gets soup from URL using logged in selenium driver with RETRY LOGIC for Rate Limits """ - try: - self.driver.get(url) - return BeautifulSoup(self.driver.page_source, "html.parser") - except Exception as e: - raise ValueError(f"Error fetching page: {e}") from e + max_retries = 3 + for attempt in range(max_retries): + try: + self.driver.get(url) + + # CHANGED: Randomized wait for JS to render + sleep(random.uniform(5, 8)) + + page_source = self.driver.page_source + soup = BeautifulSoup(page_source, "html.parser") + + # Check for rate limit indicators in the page content + text_content = soup.get_text().lower() + if "too many requests" in text_content: + wait_time = 60 + (attempt * 30) # Increase wait on subsequent failures + print(f"Rate limited detected (Too Many Requests). Waiting {wait_time}s before retry {attempt + 1}/{max_retries}...") + sleep(wait_time) + continue + + return soup + + except Exception as e: + print(f"Error fetching page (attempt {attempt+1}): {e}") + if attempt == max_retries - 1: + # On final failure, raise the error so it can be caught by the outer loop + raise ValueError(f"Failed to fetch {url} after {max_retries} attempts") from e + sleep(30) + + return None def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Scrape a Substack site.") - parser.add_argument( - "-u", "--url", type=str, help="The base URL of the Substack site to scrape." - ) - parser.add_argument( - "-d", "--directory", type=str, help="The directory to save scraped posts." - ) - parser.add_argument( - "-n", - "--number", - type=int, - default=0, - help="The number of posts to scrape. If 0 or not provided, all posts will be scraped.", - ) - parser.add_argument( - "-p", - "--premium", - action="store_true", - help="Include -p in command to use the Premium Substack Scraper with selenium.", - ) - parser.add_argument( - "--headless", - action="store_true", - help="Include -h in command to run browser in headless mode when using the Premium Substack " - "Scraper.", - ) - parser.add_argument( - "--edge-path", - type=str, - default="", - help='Optional: The path to the Edge browser executable (i.e. "path_to_msedge.exe").', - ) - parser.add_argument( - "--edge-driver-path", - type=str, - default="", - help='Optional: The path to the Edge WebDriver executable (i.e. "path_to_msedgedriver.exe").', - ) - parser.add_argument( - "--user-agent", - type=str, - default="", - help="Optional: Specify a custom user agent for selenium browser automation. 
Useful for " - "passing captcha in headless mode", - ) - parser.add_argument( - "--html-directory", - type=str, - help="The directory to save scraped posts as HTML files.", - ) - + parser.add_argument("-u", "--url", type=str, help="The base URL of the Substack site to scrape.") + parser.add_argument("-d", "--directory", type=str, help="The directory to save scraped posts.") + parser.add_argument("-n", "--number", type=int, default=0, help="The number of posts to scrape.") + parser.add_argument("-p", "--premium", action="store_true", help="Use the Premium Substack Scraper.") + parser.add_argument("--headless", action="store_true", help="Run browser in headless mode.") + parser.add_argument("--browser", type=str, default="auto", choices=["chrome", "edge", "auto"], help="The browser to use.") + parser.add_argument("--browser-path", type=str, default="", help='Optional: Path to browser executable.') + parser.add_argument("--driver-path", type=str, default="", help='Optional: Path to WebDriver executable.') + parser.add_argument("--user-agent", type=str, default="", help="Optional: Specify a custom user agent.") + parser.add_argument("--html-directory", type=str, help="The directory to save scraped posts as HTML files.") return parser.parse_args() def main(): args = parse_args() - - if args.directory is None: - args.directory = BASE_MD_DIR - - if args.html_directory is None: - args.html_directory = BASE_HTML_DIR + if args.directory is None: args.directory = BASE_MD_DIR + if args.html_directory is None: args.html_directory = BASE_HTML_DIR if args.url: if args.premium: scraper = PremiumSubstackScraper( - args.url, - headless=args.headless, - md_save_dir=args.directory, - html_save_dir=args.html_directory + args.url, headless=args.headless, md_save_dir=args.directory, + html_save_dir=args.html_directory, browser=args.browser, + browser_path=args.browser_path, driver_path=args.driver_path, + user_agent=args.user_agent ) else: - scraper = SubstackScraper( - args.url, - md_save_dir=args.directory, - html_save_dir=args.html_directory - ) + scraper = SubstackScraper(args.url, md_save_dir=args.directory, html_save_dir=args.html_directory) scraper.scrape_posts(args.number) - - else: # Use the hardcoded values at the top of the file + else: if USE_PREMIUM: scraper = PremiumSubstackScraper( - base_substack_url=BASE_SUBSTACK_URL, - md_save_dir=args.directory, - html_save_dir=args.html_directory, - edge_path=args.edge_path, - edge_driver_path=args.edge_driver_path + base_substack_url=BASE_SUBSTACK_URL, md_save_dir=args.directory, + html_save_dir=args.html_directory, browser="auto" ) else: - scraper = SubstackScraper( - base_substack_url=BASE_SUBSTACK_URL, - md_save_dir=args.directory, - html_save_dir=args.html_directory - ) + scraper = SubstackScraper(base_substack_url=BASE_SUBSTACK_URL, md_save_dir=args.directory, html_save_dir=args.html_directory) scraper.scrape_posts(num_posts_to_scrape=NUM_POSTS_TO_SCRAPE)
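For reference, the free (non-premium) scraper can also be driven without the CLI. Below is a minimal sketch mirroring what `main()` does; it assumes a `config.py` providing `EMAIL`/`PASSWORD` exists (the module imports it at load time) and uses a placeholder publication URL. The equivalent command line would be something like `python substack_scraper.py -u https://example.substack.com/ -n 3`.

```python
# Sketch: programmatic use of the free scraper, mirroring the CLI path in main().
# Assumes config.py (EMAIL/PASSWORD) is present, since substack_scraper imports it at module level.
from substack_scraper import SubstackScraper

scraper = SubstackScraper(
    "https://example.substack.com/",     # placeholder publication URL (trailing slash expected)
    md_save_dir="substack_md_files",
    html_save_dir="substack_html_pages",
)
scraper.scrape_posts(num_posts_to_scrape=3)  # 0 scrapes every discovered post
```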