-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path app.py
More file actions
308 lines (262 loc) · 16 KB
/
app.py
File metadata and controls
308 lines (262 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import requests
import os
import re
import urllib.parse # Import urllib.parse for URL joining
import argparse
import json # Import json for potential JSONDecodeError handling
def get_all_challenge_ids(session_cookie, domain, verbosity, csrf_token=None):
    """Fetch all visible challenge IDs from a CTFd platform.

    Args:
        session_cookie: Value of the CTFd ``session`` cookie.
        domain: Hostname of the CTFd instance (e.g. ``dh.securinets.tn``).
        verbosity: 0 = quiet, 1 = normal, 2 = verbose.
        csrf_token: Optional CSRF token, sent as the ``csrf-token`` header.

    Returns:
        Sorted list of challenge IDs (hidden challenges excluded), or an
        empty list on any request or parse error.
    """
    url = f"https://{domain}/api/v1/challenges"
    # Browser-like headers so the CTFd API treats the request like a normal client.
    headers = {
        "Cookie": f"session={session_cookie}",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
        "Accept": "application/json",
        "Accept-Language": "en-US,en;q=0.9,ar;q=0.8",
        "Content-Type": "application/json",
        "Priority": "u=1, i",
        "Referer": f"https://{domain}/challenges",
        "Sec-Ch-UA": '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
        "Sec-Ch-UA-Mobile": "?0",
        "Sec-Ch-UA-Platform": '"Windows"',
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin"
    }
    # Add CSRF token if provided
    if csrf_token:
        headers["csrf-token"] = csrf_token
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        if verbosity >= 2:
            print(f"Verbose: Response status code for challenge list: {response.status_code}")
        try:
            data = response.json()
        except json.JSONDecodeError:
            if verbosity >= 1:
                print("Error: Could not decode JSON response for challenge list.")
            if verbosity >= 2:
                print(f"Response content type: {response.headers.get('content-type', 'unknown')}")
                print(f"Raw response content (first 1000 chars):\n{response.text[:1000]}")
            return []
        if data.get('success') and 'data' in data:
            # 'hidden' challenges are listed but not accessible; skip them.
            # BUGFIX: sort once instead of sorting for the print and again for the return.
            challenge_ids = sorted(
                challenge['id']
                for challenge in data['data']
                if challenge.get('type') != 'hidden'
            )
            if verbosity >= 1:
                print(f"Found {len(challenge_ids)} available challenges: {challenge_ids}")
            return challenge_ids
        if verbosity >= 1:
            print("Error: API response indicates failure or missing data.")
        return []
    except requests.exceptions.HTTPError as e:
        print(f"HTTP Error fetching challenge list: {e}")
        return []
    except requests.exceptions.RequestException as e:
        print(f"Request Error fetching challenge list: {e}")
        return []
def get_challenge_data(challenge_id, session_cookie, domain, verbosity, csrf_token=None):
    """Fetch one challenge's details from ``/api/v1/challenges/<id>``.

    Args:
        challenge_id: Numeric challenge ID to fetch.
        session_cookie: Value of the CTFd ``session`` cookie.
        domain: Hostname of the CTFd instance.
        verbosity: 0 = quiet, 1 = normal, 2 = verbose.
        csrf_token: Optional CSRF token, sent as the ``csrf-token`` header.

    Returns:
        The decoded JSON response dict, or None on 404, request, or parse errors.
    """
    url = f"https://{domain}/api/v1/challenges/{challenge_id}"
    # Browser-like headers so the CTFd API treats the request like a normal client.
    headers = {
        "Cookie": f"session={session_cookie}",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
        "Accept": "application/json",
        "Accept-Language": "en-US,en;q=0.9,ar;q=0.8",
        "Content-Type": "application/json",
        "Priority": "u=1, i",
        "Referer": f"https://{domain}/challenges",
        "Sec-Ch-UA": '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"',
        "Sec-Ch-UA-Mobile": "?0",
        "Sec-Ch-UA-Platform": '"Windows"',
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin"
    }
    # Add CSRF token if provided
    if csrf_token:
        headers["csrf-token"] = csrf_token
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        if verbosity >= 2:
            print(f"Verbose: Response status code for challenge {challenge_id}: {response.status_code}")
        try:
            return response.json()
        except json.JSONDecodeError:
            if verbosity >= 1:
                print(f"Error: Could not decode JSON response for challenge {challenge_id}.")
            if verbosity >= 2:
                print(f"Response content type: {response.headers.get('content-type', 'unknown')}")
                print(f"Response status: {response.status_code}")
                print(f"Raw response content (first 1000 chars):\n{response.text[:1000]}")
            return None
    except requests.exceptions.HTTPError as e:
        # BUGFIX: read the status from the exception's own response object rather
        # than the enclosing local variable (robust, and guarded against None).
        if e.response is not None and e.response.status_code == 404:
            if verbosity >= 1:
                print(f"Challenge {challenge_id} not found.")
        else:
            print(f"HTTP Error for challenge {challenge_id}: {e}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Request Error for challenge {challenge_id}: {e}")
        return None
def create_markdown_file(challenge_data, output_dir, domain, session_cookie, verbosity):
    """Write a readme.md for one challenge under <output_dir>/<category>/<name>_<id>/.

    Attached files hosted on *domain* are downloaded into the challenge
    folder and linked locally; files on other hosts are linked externally.

    Args:
        challenge_data: Parsed /api/v1/challenges/<id> response (expects a 'data' key).
        output_dir: Root output directory (already includes the CTF name).
        domain: Hostname of the CTFd instance; relative file URLs resolve against it.
        session_cookie: Session cookie value, sent when downloading files.
        verbosity: 0 = quiet, 1 = normal, 2 = verbose.
    """
    details = challenge_data['data']
    category_name = details['category']
    challenge_name = details['name']
    challenge_id = details['id']
    # Sanitize names for use as folder names (runs of non-word chars -> '_').
    safe_category_name = re.sub(r'[^a-zA-Z0-9_]+', '_', category_name)
    safe_challenge_name = re.sub(r'[^a-zA-Z0-9_]+', '_', challenge_name)
    challenge_folder_name = f"{safe_challenge_name}_{challenge_id}"
    category_dir = os.path.join(output_dir, safe_category_name)
    challenge_dir = os.path.join(category_dir, challenge_folder_name)
    os.makedirs(challenge_dir, exist_ok=True)  # create category/challenge folders
    file_path = os.path.join(challenge_dir, "readme.md")

    markdown_file_content = f"# {challenge_name}\n\n"
    markdown_file_content += f"**ID:** {challenge_id}\n"
    markdown_file_content += f"**Value:** {details['value']} points\n"
    markdown_file_content += f"**Category:** {category_name}\n"
    markdown_file_content += "**Tags:** "
    if details['tags']:
        markdown_file_content += ", ".join(details['tags']) + "\n"
    else:
        markdown_file_content += "None\n"
    markdown_file_content += "\n"
    # Strip HTML tags so the description renders cleanly as Markdown.
    description_md = re.sub(r'<[^>]*>', '', details['description'])
    markdown_file_content += f"## Description\n{description_md}\n\n"
    # Add connection info if available
    if details.get('connection_info'):
        markdown_file_content += f"## Connection\n```\n{details['connection_info']}\n```\n\n"

    if details['files']:
        markdown_file_content += "## Files\n"
        local_files_links = []  # local file links for the readme
        for file_url_path in details['files']:  # API returns relative URL paths
            full_file_url = urllib.parse.urljoin(f"https://{domain}", file_url_path)
            parsed_url = urllib.parse.urlparse(full_file_url)
            # BUGFIX: derive the bare filename from the parsed URL path; the old
            # manual slicing mixed indices computed on the full URL with slices of
            # the relative path in the external-link branch. urlparse also drops
            # any query string (e.g. ?token=...) for us.
            filename = os.path.basename(parsed_url.path)
            if parsed_url.netloc == domain:  # file hosted on the CTF platform itself
                download_path = os.path.join(challenge_dir, filename)
                if verbosity >= 1:
                    print(f"Downloading file: {full_file_url} to {download_path}")
                if verbosity >= 2:
                    print(f"Verbose: Downloading from URL: {full_file_url}")
                try:
                    # Stream the download; include the session cookie for auth.
                    with requests.get(full_file_url, stream=True, timeout=30,
                                      headers={"Cookie": f"session={session_cookie}"}) as response:
                        response.raise_for_status()
                        with open(download_path, 'wb') as file_handle:
                            for chunk in response.iter_content(chunk_size=8192):
                                file_handle.write(chunk)
                    # BUGFIX: interpolate the real filename (was a literal placeholder).
                    local_files_links.append(f"- [{filename}](./{filename})")
                except requests.exceptions.RequestException as e:
                    print(f"Download error for {full_file_url}: {e}")
                    # Fall back to linking the CTF platform URL directly.
                    markdown_file_content += (
                        f"- [Download {filename} from CTF platform]({full_file_url})"
                        " (Download failed, check console output)\n"
                    )
                    continue  # skip to next file
            else:
                # File hosted elsewhere: just create an external link.
                markdown_file_content += f"- [{filename}]({full_file_url})\n"
        if local_files_links:  # add local links if any downloads succeeded
            markdown_file_content += "\n**Local Files:**\n" + "\n".join(local_files_links) + "\n"
        else:
            markdown_file_content += "\nNo files downloaded.\n"
    else:
        markdown_file_content += "## Files\nNo files provided.\n"

    markdown_file_content += f"\n---\n*Extracted from [{domain} CTF](https://{domain})*\n"
    with open(file_path, "w", encoding="utf-8") as md_file:
        md_file.write(markdown_file_content)
    if verbosity >= 1:
        print(f"Markdown file created for challenge {challenge_id}: {file_path}")
    if verbosity >= 2:
        print(f"Verbose: Markdown content written to: {file_path}")
# add argument parser
parser = argparse.ArgumentParser(description="Download challenges from a CTFd platform and create Markdown files.") # More descriptive description
parser.add_argument("-S", "--session_cookie", required=True, help="Session cookie from the browser. Required.") # Make session_cookie required and add help
parser.add_argument("-D", "--domain", required=True, help="Domain of the CTFd platform (example: dh.securinets.tn)") # Add default and help for domain
parser.add_argument("-O", "--output", default="output", help="Parent output directory for CTF challenge folders (default: output)") # Changed default output to "output" and updated help
parser.add_argument("--start_id", type=int, help="Starting challenge ID (enables manual mode instead of auto-fetch)") # Make optional
parser.add_argument("--stop_id", type=int, help="Stopping challenge ID (only used with --start_id in manual mode)") # Add stop_id argument
parser.add_argument("--no-download", action="store_true", help="Disable file downloading") # Add --no-download flag
parser.add_argument("--max-failures", type=int, default=10, help="Maximum consecutive failures before stopping (default: 10, only used in manual mode)")
parser.add_argument("--csrf-token", help="CSRF token for API requests (optional)")
parser.add_argument("-v", "--verbosity", type=int, default=1, choices=[0, 1, 2], help="Verbosity level (0: quiet, 1: normal, 2: verbose, default: 1)") # Add verbosity argument
args = parser.parse_args()
if __name__ == "__main__":
session_cookie = args.session_cookie
domain = args.domain
output_parent_directory = args.output
start_challenge_id = args.start_id
stop_challenge_id = args.stop_id
enable_download = not args.no_download
max_consecutive_failures = args.max_failures
csrf_token = args.csrf_token
verbosity_level = args.verbosity
ctf_output_dir = os.path.join(output_parent_directory, domain.replace(".","_"))
if not os.path.exists(ctf_output_dir):
os.makedirs(ctf_output_dir)
# Determine mode: auto-fetch (default) or manual (when start_id is specified)
if start_challenge_id is not None:
# Manual mode: iterate through ID range
if verbosity_level >= 1:
range_info = f"from {start_challenge_id}"
if stop_challenge_id:
range_info += f" to {stop_challenge_id}"
else:
range_info += " onwards"
print(f"Manual mode: fetching challenges {range_info}...")
challenge_id = start_challenge_id
consecutive_failures = 0
while True:
if stop_challenge_id and challenge_id > stop_challenge_id:
if verbosity_level >= 1:
print(f"Stopping challenge download at challenge ID {stop_challenge_id} as requested.")
break
if verbosity_level >= 1:
print(f"Fetching challenge {challenge_id} from {domain}...")
challenge_json = get_challenge_data(challenge_id, session_cookie, domain, verbosity_level, csrf_token)
if challenge_json and challenge_json.get('success'):
create_markdown_file(challenge_json, ctf_output_dir, domain, session_cookie, verbosity_level)
consecutive_failures = 0 # Reset failure counter on success
else:
consecutive_failures += 1
if verbosity_level >= 1:
print(f"Challenge {challenge_id} not found or error encountered on {domain}. Skipping.")
# Stop if too many consecutive failures
if consecutive_failures >= max_consecutive_failures:
if verbosity_level >= 1:
print(f"Stopping after {max_consecutive_failures} consecutive failures. No more challenges found.")
break
challenge_id += 1 # Always increment to continue to next challenge
else:
# Auto-fetch mode (default): get all challenge IDs from API
if verbosity_level >= 1:
print(f"Auto-fetching challenge IDs from {domain}...")
challenge_ids = get_all_challenge_ids(session_cookie, domain, verbosity_level, csrf_token)
if not challenge_ids:
print("Error: Could not fetch challenge IDs. Please check your session cookie and domain.")
exit(1)
# Process each challenge
for challenge_id in challenge_ids:
if verbosity_level >= 1:
print(f"Processing challenge {challenge_id}...")
challenge_json = get_challenge_data(challenge_id, session_cookie, domain, verbosity_level, csrf_token)
if challenge_json and challenge_json.get('success'):
create_markdown_file(challenge_json, ctf_output_dir, domain, session_cookie, verbosity_level)
else:
if verbosity_level >= 1:
print(f"Skipping challenge {challenge_id} due to error.")
if verbosity_level >= 1:
print(f"\nChallenge Markdown files saved in '{ctf_output_dir}' directory.")