-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtenable_scan_retrieval.py
More file actions
389 lines (319 loc) · 14.9 KB
/
tenable_scan_retrieval.py
File metadata and controls
389 lines (319 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
"""
Tenable.io Scan Retrieval Script
This script connects to Tenable.io (cloud vulnerability management platform) via API
to retrieve and download vulnerability scan results. It allows users to:
- Authenticate with Tenable.io using API keys
- View all available scans
- Select scans interactively (single, multiple, or all)
- Export selected scans in .nessus format
- Organize downloads in a date-based directory structure (Year/Month-Name/Week-XX)
The script handles the full export workflow:
1. Initiates scan export requests
2. Monitors export status until ready
3. Downloads completed exports with progress tracking
4. Saves files with sanitized names in organized directories
"""
import requests
import datetime
import time
import logging
import os
import re
import sys
# Setup logging to track script execution and errors
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def get_api_credentials():
"""Prompt user for Tenable.io API credentials at runtime."""
print("=" * 60)
print("Tenable.io API Credentials")
print("=" * 60)
print("Enter your Tenable.io API credentials.")
print("You can find these in Tenable.io under Settings > My Account > API Keys")
print()
access_key = input("Enter Access Key: ").strip()
if not access_key:
logging.error("Access Key is required. Exiting.")
sys.exit(1)
secret_key = input("Enter Secret Key: ").strip()
if not secret_key:
logging.error("Secret Key is required. Exiting.")
sys.exit(1)
print()
return access_key, secret_key
# Get current date info
current_date = datetime.datetime.now()
year = current_date.strftime("%Y") # Four-digit year (e.g., "2025")
month_num = current_date.strftime("%m") # Two-digit month (e.g., "03")
month_name = current_date.strftime("%B") # Full month name (e.g., "March")
week_number = current_date.isocalendar().week # ISO week number (correct calendar week)
# Construct dynamic directory structure
OUTPUT_DIR = os.path.join(year, f"{month_num}-{month_name}", f"Week-{week_number:02d}")
# Ensure all directories exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Print output directory path for verification
print(f"Output directory set to: {OUTPUT_DIR}")
def sanitize_filename(filename):
"""
Sanitizes filenames by removing invalid characters that could cause OS issues.
Replaces characters that are not allowed in Windows/Linux filenames with underscores.
This prevents file system errors when saving downloaded scan files.
Args:
filename: The original filename that may contain invalid characters
Returns:
A sanitized filename safe for use in file systems
"""
return re.sub(r'[\/:*?"<>|]', '_', filename) # Replace special characters with "_"
def get_all_scans(headers):
"""
Fetches all available vulnerability scans from Tenable.io.
Connects to the Tenable.io API to retrieve a list of all scans accessible
with the provided API credentials. This includes all scan types (scheduled,
on-demand, etc.) that the authenticated user has access to.
Args:
headers: HTTP headers dict with API authentication keys
Returns:
A list of scan dictionaries containing scan metadata (id, name, etc.)
Returns an empty list if the API call fails
"""
scans_url = "https://cloud.tenable.com/scans"
try:
response = requests.get(scans_url, headers=headers)
response.raise_for_status() # Raise an exception for bad status codes
scans = response.json().get("scans", []) # Extract scans array from JSON response
logging.info(f"Found {len(scans)} scans.")
return scans
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching scans: {e}")
return []
def initiate_export(scan_id, headers):
"""
Initiates an export request for a specific scan in .nessus format.
Tenable.io exports are asynchronous - this function starts the export process
and returns a file_id that can be used to check status and download the file
once the export is complete.
Args:
scan_id: The unique identifier of the scan to export
headers: HTTP headers dict with API authentication keys
Returns:
The file_id string if export was initiated successfully, None otherwise
"""
export_url = f"https://cloud.tenable.com/scans/{scan_id}/export"
try:
# Request export in .nessus format (XML format used by Nessus/Tenable)
response = requests.post(export_url, headers=headers, json={"format": "nessus"})
response.raise_for_status()
file_id = response.json().get("file") # Extract file_id from response
if file_id:
logging.info(f"Export initiated for scan {scan_id}. File ID: {file_id}")
return file_id
else:
logging.error(f"Failed to retrieve file ID for scan {scan_id}.")
except requests.exceptions.RequestException as e:
logging.error(f"Error initiating export for scan {scan_id}: {e}")
return None
def wait_for_export(scan_id, file_id, headers, max_retries=30, wait_time=5):
"""
Polls the export status until the scan export is ready for download.
Since Tenable.io exports are asynchronous, this function periodically checks
the export status until it becomes "ready". This prevents attempting to download
a file that hasn't finished exporting yet.
Args:
scan_id: The unique identifier of the scan being exported
file_id: The file identifier returned from initiate_export()
headers: HTTP headers dict with API authentication keys
max_retries: Maximum number of status check attempts (default: 30)
wait_time: Seconds to wait between status checks (default: 5)
Returns:
True if export is ready, False if timeout is reached
"""
status_url = f"https://cloud.tenable.com/scans/{scan_id}/export/{file_id}/status"
for attempt in range(max_retries):
try:
response = requests.get(status_url, headers=headers)
response.raise_for_status()
status = response.json().get("status")
if status == "ready":
logging.info(f"Export ready for scan {scan_id}. File ID: {file_id}")
return True
logging.info(f"Waiting for export {scan_id}... (Attempt {attempt + 1}/{max_retries})")
except requests.exceptions.RequestException as e:
logging.error(f"Error checking export status for scan {scan_id}: {e}")
time.sleep(wait_time) # Wait before next status check
logging.error(f"Export did not complete for scan {scan_id}. Skipping.")
return False
def download_scan(scan_id, scan_name, file_id, headers):
"""
Downloads the exported .nessus scan file with progress tracking.
Downloads the completed export file in chunks to handle large files efficiently.
Displays real-time progress including percentage complete and download speed.
Files are saved to the date-organized output directory.
Args:
scan_id: The unique identifier of the scan
scan_name: The name of the scan (used for filename)
file_id: The file identifier from the export process
headers: HTTP headers dict with API authentication keys
Returns:
The path to the downloaded file if successful, None otherwise
"""
download_url = f"https://cloud.tenable.com/scans/{scan_id}/export/{file_id}/download"
safe_scan_name = sanitize_filename(scan_name) # Ensure valid filename
output_file = os.path.join(OUTPUT_DIR, f"{safe_scan_name}.nessus")
try:
start_time = time.time() # Start timing
response = requests.get(download_url, headers=headers, stream=True)
response.raise_for_status()
total_size = int(response.headers.get("Content-Length", 0))
downloaded_size = 0
chunk_size = 1024 * 1024 # 1MB chunks
last_update_time = start_time
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# Calculate percentage
percent_done = (downloaded_size / total_size) * 100 if total_size else 0
# Calculate speed
current_time = time.time()
elapsed_chunk_time = current_time - last_update_time
last_update_time = current_time
if elapsed_chunk_time > 0:
speed_mb_s = (len(chunk) / (1024 * 1024)) / elapsed_chunk_time
else:
speed_mb_s = 0
# Update progress in the same line (stdout instead of logging)
sys.stdout.write(f"\rDownloading {output_file}: {percent_done:.2f}% - Speed: {speed_mb_s:.2f} MB/s")
sys.stdout.flush()
end_time = time.time() # End timing
elapsed_time = end_time - start_time # Total time taken
file_size_mb = downloaded_size / (1024 * 1024)
avg_speed = file_size_mb / elapsed_time if elapsed_time > 0 else 0
logging.info(f"\nDownload complete: {output_file} | Size: {file_size_mb:.2f} MB | Avg Speed: {avg_speed:.2f} MB/s")
return output_file
except requests.exceptions.RequestException as e:
logging.error(f"Error downloading scan {scan_id}: {e}")
return None
def select_scans_interactive(scans):
"""
Displays available scans and provides interactive selection interface.
Shows all available scans in a numbered list and allows the user to:
- Select a single scan by entering its number
- Select multiple scans by entering comma-separated numbers (e.g., 1,3,5)
- Select all scans by entering "all"
Includes validation and confirmation prompts to prevent accidental selections.
Args:
scans: List of scan dictionaries from get_all_scans()
Returns:
List of selected scan dictionaries, or empty list if none selected
"""
if not scans:
logging.info("No scans available to select.")
return []
print("\n" + "=" * 60)
print("Available Scans")
print("=" * 60)
# Display all scans with numbers
for idx, scan in enumerate(scans, 1):
scan_id = scan.get("id")
scan_name = scan.get("name", "Unknown Scan")
print(f"{idx:3d}. {scan_name} (ID: {scan_id})")
print("=" * 60)
print("\nSelection Options:")
print(" - Enter a single number (e.g., 1) to select one scan")
print(" - Enter multiple numbers separated by commas (e.g., 1,3,5) to select multiple scans")
print(" - Enter 'all' to select all scans")
print()
while True:
selection = input("Enter your selection: ").strip().lower()
if not selection:
print("Please enter a selection.\n")
continue
# Handle "all" selection
if selection == "all":
confirmation = input(f"Select all {len(scans)} scans? (Y/N): ").strip().lower()
if confirmation == 'y':
return scans
else:
print("Selection cancelled. Please try again.\n")
continue
# Handle numeric selections
try:
# Split by comma and strip whitespace
selected_numbers = [num.strip() for num in selection.split(',')]
selected_indices = []
for num_str in selected_numbers:
num = int(num_str)
if 1 <= num <= len(scans):
selected_indices.append(num - 1) # Convert to 0-based index
else:
print(f"Invalid number: {num}. Please enter numbers between 1 and {len(scans)}.\n")
break
else:
# All numbers were valid
if not selected_indices:
print("No valid selections made. Please try again.\n")
continue
# Remove duplicates while preserving order
selected_indices = list(dict.fromkeys(selected_indices))
selected_scans = [scans[idx] for idx in selected_indices]
# Display selected scans for confirmation
print("\nSelected scans:")
for idx in selected_indices:
scan = scans[idx]
scan_name = scan.get("name", "Unknown Scan")
print(f" - {scan_name}")
confirmation = input(f"\nProceed with {len(selected_scans)} scan(s)? (Y/N): ").strip().lower()
if confirmation == 'y':
return selected_scans
else:
print("Selection cancelled. Please try again.\n")
continue
except ValueError:
print("Invalid input. Please enter numbers separated by commas, or 'all'.\n")
continue
def main():
"""
Main execution function that orchestrates the scan retrieval workflow.
Workflow:
1. Fetches all available scans from Tenable.io
2. Displays scans and allows user to select which ones to download
3. For each selected scan:
- Initiates export request
- Waits for export to complete
- Downloads the exported .nessus file
"""
# Get API credentials at runtime (inside main to avoid prompting on import)
access_key, secret_key = get_api_credentials()
headers = {
"X-ApiKeys": f"accessKey={access_key}; secretKey={secret_key};",
"Accept": "application/json",
"Content-Type": "application/json"
}
print("\n" + "=" * 60)
print("Fetching Scans from Tenable.io")
print("=" * 60)
scans = get_all_scans(headers)
if not scans:
logging.info("No scans found. Exiting.")
return
# Interactive scan selection - user chooses which scans to download
selected_scans = select_scans_interactive(scans)
if not selected_scans:
logging.info("No scans selected. Exiting.")
return
print(f"\nProcessing {len(selected_scans)} selected scan(s)...\n")
# Process each selected scan through the export and download workflow
for scan in selected_scans:
scan_id = scan.get("id")
scan_name = scan.get("name", "Unknown Scan")
logging.info(f"Processing scan: {scan_name} (ID: {scan_id})")
# Step 1: Initiate export request
file_id = initiate_export(scan_id, headers)
if not file_id:
continue # Skip to next scan if export initiation failed
# Step 2: Wait for export to complete
if wait_for_export(scan_id, file_id, headers):
# Step 3: Download the completed export
download_scan(scan_id, scan_name, file_id, headers)
if __name__ == "__main__":
main()