-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscraper.py
More file actions
457 lines (359 loc) · 16.5 KB
/
scraper.py
File metadata and controls
457 lines (359 loc) · 16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
#!/usr/bin/env python3
"""
ProductHunt Scraper - Finds today's launches, their websites, and emails
"""
import csv
import re
from dataclasses import dataclass, asdict, field
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import urlsplit

from playwright.sync_api import sync_playwright, Page, Browser
@dataclass
class Product:
    """A single ProductHunt launch plus any contact details scraped for it.

    Instances are serialized with ``asdict`` and written as one CSV row by
    ``save_to_csv``; the field names double as CSV column names.
    """
    name: str        # product name as shown on the ProductHunt post card
    tagline: str     # short description text scraped from the post card
    ph_url: str      # ProductHunt product page URL
    website: str     # product's own site (from the "Visit website" link); "" if not found
    email: str       # first contact email found on the website; "" if none
    maker_name: str = ""     # display name of the maker (from the product page)
    maker_profile: str = ""  # full ProductHunt profile URL of the maker
    twitter: str = ""        # Twitter/X link from the maker's profile
    linkedin: str = ""       # LinkedIn link from the maker's profile
    github: str = ""         # GitHub link from the maker's profile
    other_social: str = ""   # up to three other social links, joined with ' | '
    source: str = "producthunt"  # data-source tag written to the CSV
    # Scrape date in local time; evaluated per-instance via default_factory.
    date: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d"))
def extract_emails_from_page(page: Page) -> list[str]:
    """Extract email addresses from the currently loaded page.

    Scans the rendered HTML with a regex and also collects ``mailto:`` link
    targets. Regex hits that look like tracker/asset noise (example.com,
    sentry.io, image filenames, ...) are filtered out; mailto addresses are
    trusted as-is.

    Returns:
        A sorted list of lowercased addresses. Sorting makes the result
        deterministic — a plain ``list(set)`` has arbitrary order, so callers
        that take the first entry would get a different email on each run.
    """
    emails = set()
    try:
        content = page.content()
        # Simple pragmatic pattern; not a full RFC 5322 validator.
        email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        for email in re.findall(email_pattern, content):
            email = email.lower()
            # Filter out common non-contact emails and filename look-alikes.
            if not any(x in email for x in ['example.com', 'sentry.io', 'wixpress',
                                            'cloudflare', 'googleapis', 'schema.org', 'w3.org',
                                            'webpack', 'github', '.png', '.jpg', '.svg']):
                emails.add(email)
        # Also check for explicit mailto: links (strip any ?subject=... part).
        for link in page.query_selector_all('a[href^="mailto:"]'):
            href = link.get_attribute('href') or ''
            if href.startswith('mailto:'):
                emails.add(href.replace('mailto:', '').split('?')[0].lower())
    except Exception:
        # Best-effort scraping: a failed page read simply yields no emails.
        pass
    return sorted(emails)
def get_product_website(page: Page, ph_product_url: str) -> str:
    """Open a ProductHunt product page and return the product's own website.

    Looks for the anchor labelled "Visit website"; any ``?ref=...`` query
    string is stripped before the URL is returned.

    Returns:
        The cleaned external URL, or "" when no suitable link is found or
        navigation fails.
    """
    try:
        page.goto(ph_product_url, timeout=30000, wait_until="domcontentloaded")
        page.wait_for_timeout(2000)
        # Scan every anchor; the "Visit website" one carries the real URL.
        for anchor in page.query_selector_all('a'):
            label = (anchor.inner_text() or '').strip().lower()
            if 'visit website' not in label:
                continue
            target = anchor.get_attribute('href')
            # Ignore links that just point back into ProductHunt itself.
            if target and 'producthunt.com' not in target:
                return target.split('?')[0]
    except Exception as e:
        print(f" ⚠️ Error getting website: {e}")
    return ""
def get_maker_info(page: Page, ph_product_url: str) -> dict:
    """Extract maker info from the first comment on product page
    Returns dict with maker_name, maker_profile, twitter, linkedin, github, other_social
    """
    # All keys are always present; values stay "" when nothing is found, so
    # callers can index the dict unconditionally.
    result = {
        'maker_name': '',
        'maker_profile': '',
        'twitter': '',
        'linkedin': '',
        'github': '',
        'other_social': ''
    }
    try:
        # Make sure we're on the product page (a previous step may have
        # already navigated here; skip the reload in that case).
        if page.url != ph_product_url:
            page.goto(ph_product_url, timeout=30000, wait_until="domcontentloaded")
            page.wait_for_timeout(2000)
        # Find the first comment with "Maker" badge
        # The maker badge appears near the commenter's name
        maker_elements = page.query_selector_all('text=Maker')
        if maker_elements:
            # Find the parent container and look for the maker's profile link
            for maker_badge in maker_elements:
                try:
                    # Navigate up to find the comment container
                    parent = maker_badge.evaluate_handle('el => el.closest("div")')
                    if not parent:
                        continue
                    # NOTE(review): `parent` is only used as an existence
                    # check above — the profile-link search below scans the
                    # WHOLE page, not just this comment container. Presumably
                    # the first /@... link on the page is usually the maker,
                    # but that looks fragile; confirm against live pages.
                    # Look for the maker's profile link nearby
                    # The structure is usually: name link followed by Maker badge
                    profile_links = page.query_selector_all('a[href^="/@"], a[href^="https://www.producthunt.com/@"]')
                    for link in profile_links:
                        href = link.get_attribute('href') or ''
                        if '/@' in href:
                            # Get maker name; skip avatar links ("Image...")
                            # and single-character junk.
                            name = link.inner_text().strip()
                            if name and len(name) > 1 and not name.startswith('Image'):
                                result['maker_name'] = name
                                # Get full profile URL (relative hrefs are
                                # resolved against producthunt.com).
                                if href.startswith('/'):
                                    result['maker_profile'] = f"https://www.producthunt.com{href}"
                                else:
                                    result['maker_profile'] = href
                                break
                    if result['maker_profile']:
                        break
                except Exception:
                    # Per-badge failures are non-fatal; try the next badge.
                    continue
        # If we found a maker profile, visit it to get social links
        if result['maker_profile']:
            print(f" → Found maker: {result['maker_name']}")
            print(f" → Visiting maker profile...")
            social_links = get_maker_social_links(page, result['maker_profile'])
            result.update(social_links)
    except Exception as e:
        print(f" ⚠️ Error getting maker info: {e}")
    return result
def get_maker_social_links(page: Page, maker_profile_url: str) -> dict:
    """Visit a maker's ProductHunt profile page and extract social links.

    Returns:
        Dict with keys ``twitter``, ``linkedin``, ``github`` and
        ``other_social`` (always present; "" when not found). ``other_social``
        holds up to three additional links joined with ``' | '``.
    """
    result = {
        'twitter': '',
        'linkedin': '',
        'github': '',
        'other_social': ''
    }
    try:
        page.goto(maker_profile_url, timeout=30000, wait_until="domcontentloaded")
        page.wait_for_timeout(2000)
        other_socials = []
        for link in page.query_selector_all('a[href]'):
            raw = link.get_attribute('href') or ''
            href = raw.lower()
            # Twitter/X: match on the hostname, not a substring — the old
            # check `'x.com/' in href` also matched hosts like netflix.com.
            # (Schemeless hrefs have an empty netloc and are skipped here.)
            host = urlsplit(href).netloc
            if host in ('twitter.com', 'www.twitter.com', 'x.com', 'www.x.com') and not result['twitter']:
                # Skip ProductHunt's own account link.
                if 'producthunt' not in href:
                    result['twitter'] = raw
            # LinkedIn: only personal profiles (/in/), not company pages.
            elif 'linkedin.com/in/' in href and not result['linkedin']:
                result['linkedin'] = raw
            # GitHub: exclude deep links into repos (issues, PRs, files).
            elif 'github.com/' in href and not result['github']:
                if not any(x in href for x in ['/issues', '/pull', '/blob', '/tree']):
                    result['github'] = raw
            # Other social links (Instagram, YouTube, personal blogs, etc.)
            elif any(social in href for social in ['instagram.com/', 'youtube.com/', 'facebook.com/',
                                                   'tiktok.com/', 'medium.com/@', 'dev.to/', 'threads.net/']):
                other_socials.append(raw)
        if other_socials:
            result['other_social'] = ' | '.join(other_socials[:3])  # Limit to 3
        # Log what we found
        found = [k for k, v in result.items() if v]
        if found:
            print(f" → Social links: {', '.join(found)}")
        else:
            print(f" → No social links found")
    except Exception as e:
        print(f" ⚠️ Error getting social links: {e}")
    return result
def get_email_from_website(page: Page, website_url: str) -> str:
    """Visit a product's website and return the first email address found.

    The landing page is tried first; when it yields nothing, a few common
    contact-style paths (/contact, /about, /support) are probed in order.

    Returns:
        The first extracted email, or "" when the URL is empty or no address
        turns up anywhere.
    """
    if not website_url:
        return ""
    collected = []
    try:
        # Landing page first.
        page.goto(website_url, timeout=20000)
        page.wait_for_timeout(2000)
        collected += extract_emails_from_page(page)
        # Fall back to common contact-ish pages only if nothing was found.
        if not collected:
            root = website_url.rstrip('/')
            for suffix in ('/contact', '/about', '/support'):
                try:
                    page.goto(f"{root}{suffix}", timeout=10000)
                    page.wait_for_timeout(1000)
                    collected += extract_emails_from_page(page)
                except Exception:
                    continue  # dead page — try the next candidate
                if collected:
                    break
    except Exception as e:
        print(f" ⚠️ Error visiting website: {e}")
    return collected[0] if collected else ""
def scrape_producthunt(limit: int = None) -> list[Product]:
    """Scrape today's products from ProductHunt homepage
    Args:
        limit: Optional limit on number of products to process (for testing)
    """
    products = []
    with sync_playwright() as p:
        # Visible (non-headless) Chromium with the automation flag disabled —
        # presumably to reduce bot detection; TODO confirm this is required.
        browser = p.chromium.launch(
            headless=False,
            args=['--disable-blink-features=AutomationControlled']
        )
        # Fixed desktop-Chrome user agent for the same reason.
        context = browser.new_context(
            user_agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        )
        page = context.new_page()
        # Step 1: Load ProductHunt homepage
        print(" 📡 Loading ProductHunt homepage...")
        page.goto("https://www.producthunt.com", timeout=60000)
        page.wait_for_timeout(5000)
        # Step 2: Click "See all of today's products" to load all today's products
        print(" 🔘 Clicking 'See all of today's products'...")
        try:
            btn = page.query_selector('button:has-text("See all of today")')
            if btn:
                btn.click()
                # Wait for products to load (takes ~4 seconds)
                page.wait_for_timeout(5000)
                print(" ✅ Loaded all today's products")
            else:
                print(" ⚠️ 'See all' button not found")
        except Exception as e:
            print(f" ⚠️ Could not click 'See all': {e}")
        # Step 3: Get only today's products from the today section; fall back
        # to a whole-page scan when the section marker is missing.
        today_section = page.query_selector('[data-test="homepage-section-today"]')
        if today_section:
            posts = today_section.query_selector_all('[data-test^="post-item-"]')
        else:
            posts = page.query_selector_all('[data-test^="post-item-"]')
        print(f" ℹ️ Found {len(posts)} products launching today")
        # Collect product info first (deduplicate by post_id)
        product_data = []
        seen_ids = set()
        for post in posts:
            try:
                # Any per-post failure (missing attribute, detached node)
                # just skips that post via the except below.
                test_id = post.get_attribute("data-test")
                post_id = test_id.replace("post-item-", "")
                if post_id in seen_ids:
                    continue
                seen_ids.add(post_id)
                # Get product name (looked up page-wide by its post_id-scoped
                # data-test attribute).
                name_el = page.query_selector(f'[data-test="post-name-{post_id}"]')
                name = name_el.inner_text().strip() if name_el else ""
                if not name:
                    continue
                # Clean up numbered names like "333. ProductName"
                if '. ' in name[:6] and name.split('. ')[0].isdigit():
                    name = name.split('. ', 1)[1]
                # Get ProductHunt product page link (relative hrefs are
                # resolved against producthunt.com).
                link = post.query_selector("a[href*='/products/']")
                href = link.get_attribute("href") if link else ""
                ph_url = f"https://www.producthunt.com{href}" if href and href.startswith("/") else href
                # Get tagline: first non-numeric text >10 chars that is not
                # the product name itself.
                tagline = ""
                texts = post.query_selector_all("p, span")
                for t in texts:
                    txt = t.inner_text().strip()
                    if txt and not txt.isdigit() and len(txt) > 10 and txt != name and not txt.startswith(name):
                        tagline = txt
                        break
                product_data.append({
                    'name': name,
                    'tagline': tagline,
                    'ph_url': ph_url
                })
            except Exception:
                continue
        print(f"\n 🔍 Processing {len(product_data)} unique products...")
        # Apply limit if specified
        if limit and limit < len(product_data):
            print(f" ⚠️ Limiting to first {limit} products")
            product_data = product_data[:limit]
        # Step 4: Visit each product page to get website, then get email
        for i, data in enumerate(product_data):
            print(f"\n [{i+1}/{len(product_data)}] {data['name']}")
            # Get actual website from PH product page
            print(f" → Getting website...")
            website = get_product_website(page, data['ph_url'])
            print(f" → Website: {website or 'Not found'}")
            # Get maker info (name, profile, social links)
            print(f" → Getting maker info...")
            maker_info = get_maker_info(page, data['ph_url'])
            # Get email from website (skipped when no website was found)
            email = ""
            if website:
                print(f" → Finding email...")
                email = get_email_from_website(page, website)
                print(f" → Email: {email or 'Not found'}")
            products.append(Product(
                name=data['name'],
                tagline=data['tagline'],
                ph_url=data['ph_url'],
                website=website,
                email=email,
                maker_name=maker_info['maker_name'],
                maker_profile=maker_info['maker_profile'],
                twitter=maker_info['twitter'],
                linkedin=maker_info['linkedin'],
                github=maker_info['github'],
                other_social=maker_info['other_social']
            ))
        browser.close()
    return products
def save_to_csv(products: list[Product], filename: Optional[str] = None):
    """Save products to a CSV file in the sibling ``data`` directory.

    Args:
        products: Scraped launches; each is serialized with ``asdict`` and
            written as one row.
        filename: Output file name. Defaults to ``launches-YYYY-MM-DD.csv``
            using today's local date. (Annotation fixed: the old
            ``filename: str = None`` contradicted its own default.)

    The ``data`` directory is created if missing. When ``products`` is empty
    a warning is printed and no file is written.
    """
    data_dir = Path(__file__).parent / "data"
    data_dir.mkdir(exist_ok=True)
    if not filename:
        filename = f"launches-{datetime.now().strftime('%Y-%m-%d')}.csv"
    filepath = data_dir / filename
    if not products:
        print(" ⚠️ No products to save")
        return
    # Column order is fixed so successive daily files stay comparable.
    fieldnames = ["date", "source", "name", "tagline", "website", "email", "maker_name",
                  "maker_profile", "twitter", "linkedin", "github", "other_social", "ph_url"]
    with open(filepath, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for product in products:
            row = asdict(product)
            # Project onto fieldnames to control column order explicitly.
            writer.writerow({k: row[k] for k in fieldnames})
    print(f" ✅ Saved {len(products)} products to {filepath}")
def main():
    """Command-line entry point.

    Usage: ``python scraper.py [limit]`` where the optional ``limit`` caps how
    many products are processed (handy for testing). Scrapes ProductHunt,
    prints summary stats, and saves results to CSV even when scraping fails
    partway (an empty file warning is printed in that case).
    """
    import sys
    # Optional limit from command line: python scraper.py 10.
    # Validate instead of letting int() raise an ugly traceback.
    limit = None
    if len(sys.argv) > 1:
        try:
            limit = int(sys.argv[1])
        except ValueError:
            print(f"Usage: python scraper.py [limit] — limit must be an integer, got {sys.argv[1]!r}")
            sys.exit(1)
        # A zero/negative limit would slice nonsense downstream; treat as "no limit".
        if limit <= 0:
            limit = None
    print("\n🚀 ProductHunt Scraper")
    print("=" * 50)
    if limit:
        print(f"⚠️ Limiting to first {limit} products")
    print("\n📥 Scraping ProductHunt...")
    try:
        products = scrape_producthunt(limit=limit)
        print(f"\n ✅ Found {len(products)} products")
        # Count stats
        with_email = sum(1 for p in products if p.email)
        with_maker = sum(1 for p in products if p.maker_name)
        with_twitter = sum(1 for p in products if p.twitter)
        with_linkedin = sum(1 for p in products if p.linkedin)
        print(f" 📧 {with_email} products have email addresses")
        print(f" 👤 {with_maker} products have maker info")
        print(f" 🐦 {with_twitter} makers have Twitter/X")
        print(f" 💼 {with_linkedin} makers have LinkedIn")
    except Exception as e:
        # Top-level boundary: report and fall through so partial work is saved.
        print(f" ❌ Error: {e}")
        products = []
    # Save results
    print("\n💾 Saving results...")
    save_to_csv(products)
    print("\n" + "=" * 50)
    print(f"✅ Done! Total: {len(products)} products")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()