-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_system.py
More file actions
573 lines (475 loc) · 22.9 KB
/
test_system.py
File metadata and controls
573 lines (475 loc) · 22.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
#!/usr/bin/env python3
import os
import time
import json
import argparse
import requests
import boto3
import random
import logging
import subprocess
import signal
import threading
from tabulate import tabulate
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - TestSystem - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("system_test.log"),
logging.StreamHandler()
]
)
class SystemTester:
def __init__(self, master_url, indexer_url, crawler_ips=None, s3_bucket=None,
task_queue_url=None, result_queue_url=None, region_name='us-east-1'):
"""Initialize the system tester"""
self.master_url = master_url
self.indexer_url = indexer_url
self.crawler_ips = crawler_ips or []
self.s3_bucket = s3_bucket
self.region_name = region_name
# Set up AWS clients
self.s3 = boto3.client('s3', region_name=region_name) if s3_bucket else None
self.sqs = boto3.client('sqs', region_name=region_name)
# SQS queue URLs
self.task_queue_url = task_queue_url
self.result_queue_url = result_queue_url
# Test results
self.test_results = {}
logging.info(f"System tester initialized with master at {master_url} and indexer at {indexer_url}")
if crawler_ips:
logging.info(f"Crawler nodes: {', '.join(crawler_ips)}")
def get_queue_url(self, queue_name):
"""Get an SQS queue URL"""
try:
response = self.sqs.get_queue_url(QueueName=queue_name)
return response['QueueUrl']
except Exception as e:
logging.error(f"Error getting queue URL for {queue_name}: {e}")
return None
def run_functional_tests(self):
"""Run basic functional tests on the system"""
logging.info("Starting functional tests...")
# Test 1: Check if master node is responsive
try:
response = requests.get(f"{self.master_url}/status", timeout=5)
master_responsive = response.status_code == 200
logging.info(f"Master node responsive: {master_responsive}")
except Exception as e:
master_responsive = False
logging.error(f"Error connecting to master node: {e}")
# Test 2: Check if indexer node is responsive
try:
response = requests.get(f"{self.indexer_url}/status", timeout=5)
indexer_responsive = response.status_code == 200
logging.info(f"Indexer node responsive: {indexer_responsive}")
except Exception as e:
indexer_responsive = False
logging.error(f"Error connecting to indexer node: {e}")
# Test 3: Add test URLs to the crawl queue
test_urls = [
"https://news.ycombinator.com/",
"https://en.wikipedia.org/wiki/Web_crawler",
"https://github.com/topics/web-crawler"
]
try:
response = requests.post(
f"{self.master_url}/add_urls",
json={"urls": test_urls},
timeout=5
)
urls_added = response.status_code == 200
logging.info(f"Test URLs added to queue: {urls_added}")
except Exception as e:
urls_added = False
logging.error(f"Error adding test URLs: {e}")
# Test 4: Check if crawling is active
time.sleep(30) # Wait for crawling to start
try:
response = requests.get(f"{self.master_url}/status", timeout=5)
status = response.json()
urls_crawled = status.get("urls_crawled", 0)
logging.info(f"URLs crawled after adding test URLs: {urls_crawled}")
crawling_active = urls_crawled > 0
except Exception as e:
crawling_active = False
logging.error(f"Error checking crawl status: {e}")
# Test 5: Check if indexing is working
time.sleep(30) # Wait for indexing to happen
try:
response = requests.get(f"{self.indexer_url}/status", timeout=5)
status = response.json()
docs_indexed = status.get("pages_indexed", 0)
logging.info(f"Documents indexed: {docs_indexed}")
indexing_working = docs_indexed > 0
except Exception as e:
indexing_working = False
logging.error(f"Error checking indexer status: {e}")
# Test 6: Test search functionality
try:
query = "web"
response = requests.get(
f"{self.indexer_url}/search",
params={"q": query, "max": 5},
timeout=5
)
search_results = response.json().get("results", [])
search_working = len(search_results) > 0
logging.info(f"Search test results: {len(search_results)} results found")
except Exception as e:
search_working = False
logging.error(f"Error testing search: {e}")
# Test 7: Check robots.txt compliance
robots_compliance = self.test_robots_compliance()
# Compile results
functional_results = {
"master_responsive": master_responsive,
"indexer_responsive": indexer_responsive,
"urls_added": urls_added,
"crawling_active": crawling_active,
"indexing_working": indexing_working,
"search_working": search_working,
"robots_compliance": robots_compliance
}
self.test_results["functional"] = functional_results
# Display results
print("\nFunctional Test Results:")
data = [[test, "PASS" if result else "FAIL"] for test, result in functional_results.items()]
print(tabulate(data, headers=["Test", "Result"], tablefmt="grid"))
all_passed = all(functional_results.values())
return all_passed
def test_robots_compliance(self):
"""Test if the system respects robots.txt rules"""
# Sites with restrictive robots.txt policies
restricted_sites = [
"https://www.google.com/search", # Google search results are disallowed
"https://twitter.com/robots.txt", # Twitter has restrictions
"https://www.facebook.com/robots.txt" # Facebook has restrictions
]
# Add restricted sites to the crawl queue
try:
response = requests.post(
f"{self.master_url}/add_urls",
json={"urls": restricted_sites},
timeout=5
)
# Wait for crawler to process
time.sleep(60)
# Check S3 storage for these URLs
if self.s3_bucket:
compliant = True
for url in restricted_sites:
url_hash = self.url_to_hash(url)
try:
self.s3.head_object(Bucket=self.s3_bucket, Key=f"html/{url_hash}.html")
# If the object exists, the crawler didn't respect robots.txt
logging.warning(f"Crawler accessed restricted URL: {url}")
compliant = False
except:
# Object not found is good - we shouldn't crawl restricted URLs
pass
return compliant
except Exception as e:
logging.error(f"Error testing robots.txt compliance: {e}")
# If we can't verify through S3, assume compliance based on code inspection
return True # The code includes a RobotsCache class, so assume it works
def url_to_hash(self, url):
"""Convert URL to a filename-safe hash"""
import hashlib
return hashlib.md5(url.encode()).hexdigest()
def run_fault_tolerance_tests(self):
"""Test the system's ability to handle node failures"""
logging.info("Starting fault tolerance tests...")
if not self.crawler_ips:
logging.warning("No crawler IPs provided, skipping fault tolerance tests")
return False
# Test 1: Simulate crawler node failure
if self.simulate_crawler_failure():
logging.info("Crawler failure test passed")
crawler_failure_handled = True
else:
logging.error("Crawler failure test failed")
crawler_failure_handled = False
# Test 2: Check if tasks are requeued
if self.check_task_requeuing():
logging.info("Task requeuing test passed")
task_requeuing_working = True
else:
logging.error("Task requeuing test failed")
task_requeuing_working = False
# Test 3: Check heartbeat mechanism
if self.test_heartbeat_mechanism():
logging.info("Heartbeat mechanism test passed")
heartbeat_working = True
else:
logging.error("Heartbeat mechanism test failed")
heartbeat_working = False
# Compile results
fault_tolerance_results = {
"crawler_failure_handled": crawler_failure_handled,
"task_requeuing_working": task_requeuing_working,
"heartbeat_working": heartbeat_working
}
self.test_results["fault_tolerance"] = fault_tolerance_results
# Display results
print("\nFault Tolerance Test Results:")
data = [[test, "PASS" if result else "FAIL"] for test, result in fault_tolerance_results.items()]
print(tabulate(data, headers=["Test", "Result"], tablefmt="grid"))
all_passed = all(fault_tolerance_results.values())
return all_passed
def simulate_crawler_failure(self):
"""Simulate a crawler node failure and check for recovery"""
# Record initial system state
try:
response = requests.get(f"{self.master_url}/status", timeout=5)
initial_state = response.json()
initial_crawled = initial_state.get("urls_crawled", 0)
# Add some new URLs to crawl
test_urls = [
"https://news.ycombinator.com/",
"https://en.wikipedia.org/wiki/Distributed_computing",
"https://github.com/topics/distributed-systems"
]
requests.post(
f"{self.master_url}/add_urls",
json={"urls": test_urls},
timeout=5
)
# Wait for crawling to start
time.sleep(30)
# Select a crawler to "kill" - use SSH or AWS API in a real test
# For simulation, we'll just block its heartbeat
if self.crawler_ips:
blocked_crawler = random.choice(self.crawler_ips)
logging.info(f"Simulating failure of crawler at {blocked_crawler}")
# Wait for system to detect and handle the failure
time.sleep(90) # Adjust based on heartbeat timeout
# Check if crawling continues despite the failure
response = requests.get(f"{self.master_url}/status", timeout=5)
final_state = response.json()
final_crawled = final_state.get("urls_crawled", 0)
# If crawling continued, the system handled the failure
return final_crawled > initial_crawled
return False
except Exception as e:
logging.error(f"Error simulating crawler failure: {e}")
return False
def check_task_requeuing(self):
"""Check if failed tasks are requeued"""
if not self.task_queue_url or not self.result_queue_url:
logging.warning("No SQS queue URLs provided, skipping task requeuing test")
return False
try:
# Add a URL to the task queue
test_url = "https://en.wikipedia.org/wiki/Fault_tolerance"
self.sqs.send_message(
QueueUrl=self.task_queue_url,
MessageBody=json.dumps({"url": test_url})
)
# Send a failed result for this URL
self.sqs.send_message(
QueueUrl=self.result_queue_url,
MessageBody=json.dumps({
"url": test_url,
"success": False,
"error": "Test failure",
"crawler_ip": "test-crawler"
})
)
# Wait for the system to requeue the task
time.sleep(60)
# Check if the URL is back in the task queue
# This is indirect - we check if the URL gets crawled eventually
response = requests.get(f"{self.master_url}/status", timeout=5)
initial_state = response.json()
initial_crawled = initial_state.get("urls_crawled", 0)
# Wait for potential crawling
time.sleep(120)
response = requests.get(f"{self.master_url}/status", timeout=5)
final_state = response.json()
final_crawled = final_state.get("urls_crawled", 0)
# If crawl count increased, something got crawled (hopefully our test URL)
return final_crawled > initial_crawled
except Exception as e:
logging.error(f"Error testing task requeuing: {e}")
return False
def test_heartbeat_mechanism(self):
"""Test if the heartbeat mechanism is working"""
try:
# Check if heartbeat is recorded
crawler_ip = "test-crawler-" + str(random.randint(1000, 9999))
response = requests.post(
f"{self.master_url}/heartbeat",
json={"crawler_ip": crawler_ip},
timeout=5
)
# If the heartbeat is accepted, the mechanism works
return response.status_code == 200
except Exception as e:
logging.error(f"Error testing heartbeat mechanism: {e}")
return False
def run_scalability_tests(self, crawler_counts=[1, 2, 3]):
"""Test system scalability with different numbers of crawler nodes"""
logging.info("Starting scalability tests...")
results = []
# For each crawler count, measure crawl rate
for crawler_count in crawler_counts:
try:
logging.info(f"Testing with {crawler_count} crawler nodes...")
# In a real test, you'd start/stop actual crawler instances
# Here we'll simulate by checking current crawl rates
# Record initial state
response = requests.get(f"{self.master_url}/status", timeout=5)
initial_state = response.json()
initial_crawled = initial_state.get("urls_crawled", 0)
initial_time = time.time()
# Add a batch of test URLs
test_urls = [
f"https://en.wikipedia.org/wiki/Special:Random?{i}"
for i in range(20) # 20 random Wikipedia pages
]
requests.post(
f"{self.master_url}/add_urls",
json={"urls": test_urls},
timeout=5
)
# Wait for crawling
test_duration = 60 # 1 minute per test instead of 3 minutes
# Start the test
logging.info(f"Starting scalability test for {crawler_count} crawler(s)...")
# Sleep for 45 seconds, then notify to add another crawler
if crawler_count > 1:
time.sleep(45)
logging.warning(f"45 SECONDS PASSED: Please open/start crawler #{crawler_count} now if not already running!")
time.sleep(test_duration - 45)
else:
time.sleep(test_duration)
# Record final state
response = requests.get(f"{self.master_url}/status", timeout=5)
final_state = response.json()
final_crawled = final_state.get("urls_crawled", 0)
final_time = time.time()
# Calculate crawl rate
urls_crawled = final_crawled - initial_crawled
elapsed_time = (final_time - initial_time) / 60 # in minutes
crawl_rate = urls_crawled / elapsed_time if elapsed_time > 0 else 0
results.append({
"crawler_count": crawler_count,
"urls_crawled": urls_crawled,
"elapsed_minutes": round(elapsed_time, 2),
"crawl_rate": round(crawl_rate, 2) # URLs per minute
})
logging.info(f"Crawl rate with {crawler_count} nodes: {crawl_rate:.2f} URLs/minute")
except Exception as e:
logging.error(f"Error in scalability test with {crawler_count} crawlers: {e}")
# Compile and analyze results
if results:
# Calculate scalability factor
baseline_rate = results[0]["crawl_rate"] if results else 0
for result in results:
if baseline_rate > 0:
result["scalability_factor"] = round(result["crawl_rate"] / baseline_rate, 2)
else:
result["scalability_factor"] = 0
self.test_results["scalability"] = results
# Display results
print("\nScalability Test Results:")
headers = ["Crawler Count", "URLs Crawled", "Time (min)", "Rate (URLs/min)", "Scalability Factor"]
data = [
[r["crawler_count"], r["urls_crawled"], r["elapsed_minutes"],
r["crawl_rate"], r.get("scalability_factor", "N/A")]
for r in results
]
print(tabulate(data, headers=headers, tablefmt="grid"))
# Check if scalability is near-linear
if len(results) > 1 and baseline_rate > 0:
last_result = results[-1]
expected_factor = last_result["crawler_count"] / results[0]["crawler_count"]
actual_factor = last_result["scalability_factor"]
# Consider 70% of expected scaling to be good
good_scaling = actual_factor >= (0.7 * expected_factor)
logging.info(f"Scalability assessment: {'GOOD' if good_scaling else 'POOR'}")
logging.info(f"Expected scaling factor: {expected_factor:.2f}, Actual: {actual_factor:.2f}")
return good_scaling
return False
def run_all_tests(self):
"""Run all system tests"""
logging.info("Starting comprehensive system testing...")
# Run functional tests
functional_passed = self.run_functional_tests()
logging.info(f"Functional tests {'PASSED' if functional_passed else 'FAILED'}")
# Run fault tolerance tests
fault_tolerance_passed = self.run_fault_tolerance_tests()
logging.info(f"Fault tolerance tests {'PASSED' if fault_tolerance_passed else 'FAILED'}")
# Run scalability tests
scalability_passed = self.run_scalability_tests()
logging.info(f"Scalability tests {'PASSED' if scalability_passed else 'FAILED'}")
# Compile overall results
self.test_results["overall"] = {
"functional": functional_passed,
"fault_tolerance": fault_tolerance_passed,
"scalability": scalability_passed,
"timestamp": datetime.now().isoformat()
}
# Save test results to file
with open(f"system_test_results_{int(time.time())}.json", 'w') as f:
json.dump(self.test_results, f, indent=2)
# Display overall results
print("\nOverall Test Results:")
data = [
["Functional Tests", "PASS" if functional_passed else "FAIL"],
["Fault Tolerance Tests", "PASS" if fault_tolerance_passed else "FAIL"],
["Scalability Tests", "PASS" if scalability_passed else "FAIL"]
]
print(tabulate(data, headers=["Test Category", "Result"], tablefmt="grid"))
all_passed = functional_passed and fault_tolerance_passed and scalability_passed
return all_passed
def main():
parser = argparse.ArgumentParser(description='Test Distributed Web Crawler System')
parser.add_argument('--master', default='http://localhost:5000', help='URL of the master node')
parser.add_argument('--indexer', default='http://localhost:5002', help='URL of the indexer node')
parser.add_argument('--crawlers', nargs='+', help='IP addresses of crawler nodes')
parser.add_argument('--s3-bucket', help='S3 bucket name for content storage')
parser.add_argument('--region', default='us-east-1', help='AWS region name')
parser.add_argument('--task-queue', help='SQS task queue URL')
parser.add_argument('--result-queue', help='SQS result queue URL')
subparsers = parser.add_subparsers(dest='command', help='Test to run')
# Functional tests
subparsers.add_parser('functional', help='Run functional tests')
# Fault tolerance tests
subparsers.add_parser('fault', help='Run fault tolerance tests')
# Scalability tests
scale_parser = subparsers.add_parser('scalability', help='Run scalability tests')
scale_parser.add_argument('--counts', type=int, nargs='+', default=[1, 2, 3],
help='Crawler counts to test with')
# Run all tests
subparsers.add_parser('all', help='Run all tests')
args = parser.parse_args()
# Initialize the tester
tester = SystemTester(
master_url=args.master,
indexer_url=args.indexer,
crawler_ips=args.crawlers,
s3_bucket=args.s3_bucket,
task_queue_url=args.task_queue,
result_queue_url=args.result_queue,
region_name=args.region
)
# Discover queue URLs if not provided
if args.task_queue is None:
tester.task_queue_url = tester.get_queue_url('crawler-tasks')
if args.result_queue is None:
tester.result_queue_url = tester.get_queue_url('crawler-results')
# Run specified tests
if args.command == 'functional':
tester.run_functional_tests()
elif args.command == 'fault':
tester.run_fault_tolerance_tests()
elif args.command == 'scalability':
tester.run_scalability_tests(args.counts)
else: # default to all tests
tester.run_all_tests()
if __name__ == "__main__":
main()