Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/fosslight_source/_scan_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def __init__(self, value: str) -> None:
self.oss_version = ""

self.checksum = get_checksum_sha1(value)
self.kb_origin_url = "" # URL from OSS KB (_get_origin_url_from_md5_hash)
self.kb_evidence = "" # Evidence from KB API (exact_match or code snippet)

def __del__(self) -> None:
    # No-op finalizer: a scan item holds no external resources (files,
    # sockets) that need releasing when the object is garbage-collected.
    pass
Expand Down Expand Up @@ -104,6 +106,7 @@ def _get_hash(self, path_to_scan: str = "") -> tuple:
return md5_hex, wfp

def _get_origin_url_from_md5_hash(self, md5_hash: str, wfp: str = "") -> str:
"""Return origin_url from KB API."""
try:
payload = {"file_hash": md5_hash}
if wfp and wfp.strip():
Expand All @@ -115,7 +118,6 @@ def _get_origin_url_from_md5_hash(self, md5_hash: str, wfp: str = "") -> str:
with urllib.request.urlopen(request, timeout=10) as response:
data = json.loads(response.read().decode())
if isinstance(data, dict):
# Only extract output if return_code is 0 (success)
return_code = data.get('return_code', -1)
if return_code == 0:
output = data.get('output', '')
Expand Down Expand Up @@ -183,6 +185,8 @@ def set_oss_item(self, path_to_scan: str = "", run_kb: bool = False) -> None:
if md5_hash:
origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp)
if origin_url:
self.kb_origin_url = origin_url
self.kb_evidence = "exact_match"
extracted_name, extracted_version, repo_url = self._extract_oss_info_from_url(origin_url)
if extracted_name:
self.oss_name = extracted_name
Expand Down
140 changes: 100 additions & 40 deletions src/fosslight_source/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import os
import platform
import time
import warnings
import logging
import urllib.request
Expand Down Expand Up @@ -40,7 +41,9 @@
MERGED_HEADER = {SRC_SHEET_NAME: ['ID', 'Source Path', 'OSS Name',
'OSS Version', 'License', 'Download Location',
'Homepage', 'Copyright Text', 'Exclude', 'Comment', 'license_reference']}
SCANNER_TYPE = ['kb', 'scancode', 'scanoss', 'all']
KB_REFERENCE_HEADER = ['ID', 'Source Path', 'KB Origin URL', 'Evidence']
ALL_MODE = 'all'
SCANNER_TYPE = ['kb', 'scancode', 'scanoss', ALL_MODE]


logger = logging.getLogger(constant.LOGGER_NAME)
Expand Down Expand Up @@ -72,7 +75,7 @@ def main() -> None:
parser.add_argument('-o', '--output', nargs=1, type=str, required=False, default="")
parser.add_argument('-m', '--matched', action='store_true', required=False)
parser.add_argument('-f', '--formats', nargs='*', type=str, required=False)
parser.add_argument('-s', '--scanner', nargs=1, type=str, required=False, default='all')
parser.add_argument('-s', '--scanner', nargs=1, type=str, required=False, default=ALL_MODE)
parser.add_argument('-t', '--timeout', type=int, required=False, default=120)
parser.add_argument('-c', '--cores', type=int, required=False, default=-1)
parser.add_argument('-e', '--exclude', nargs='*', required=False, default=[])
Expand Down Expand Up @@ -137,7 +140,8 @@ def create_report_file(
output_path: str = "", output_files: list = [],
output_extensions: list = [], correct_mode: bool = True,
correct_filepath: str = "", path_to_scan: str = "", path_to_exclude: list = [],
formats: list = [], api_limit_exceed: bool = False, files_count: int = 0, final_output_path: str = ""
formats: list = [], api_limit_exceed: bool = False, files_count: int = 0, final_output_path: str = "",
run_kb_msg: str = ""
) -> 'ScannerItem':
"""
Create report files for given scanned result.
Expand Down Expand Up @@ -206,14 +210,11 @@ def create_report_file(
if api_limit_exceed:
scan_item.set_cover_comment("SCANOSS skipped (API limits)")

run_kb = True if selected_scanner in ['kb'] else False
if run_kb:
scan_item.set_cover_comment("KB Enabled" if check_kb_server_reachable() else "KB Unreachable")
if run_kb_msg:
scan_item.set_cover_comment(run_kb_msg)
display_mode = selected_scanner
if selected_scanner == "kb":
display_mode += ", scancode"
elif selected_scanner == "all":
display_mode = "scancode, scanoss"
if selected_scanner == ALL_MODE:
display_mode = ", ".join([s for s in SCANNER_TYPE if s != ALL_MODE])
scan_item.set_cover_comment(f"Mode : {display_mode}")

if merged_result:
Expand All @@ -230,11 +231,17 @@ def create_report_file(
sheet_list["scancode_reference"] = get_license_list_to_print(license_list)
elif selected_scanner == 'scanoss':
sheet_list["scanoss_reference"] = get_scanoss_extra_info(scanoss_result)
elif selected_scanner == 'kb':
kb_ref = get_kb_reference_to_print(merged_result)
sheet_list["kb_reference"] = kb_ref
else:
sheet_list["scancode_reference"] = get_license_list_to_print(license_list)
sheet_list["scanoss_reference"] = get_scanoss_extra_info(scanoss_result)
if sheet_list:
scan_item.external_sheets = sheet_list
kb_ref = get_kb_reference_to_print(merged_result)
sheet_list["kb_reference"] = kb_ref

if sheet_list:
scan_item.external_sheets = sheet_list

if correct_mode:
success, msg_correct, correct_item = correct_with_yaml(correct_filepath, path_to_scan, scan_item)
Expand Down Expand Up @@ -262,25 +269,56 @@ def create_report_file(


def check_kb_server_reachable() -> bool:
    """
    Check whether the OSS KB server answers its health endpoint.

    Makes up to 3 attempts with a 10s request timeout, sleeping 1 second
    between attempts. Any HTTP response — including an HTTP error status —
    means the server is up, so it counts as reachable; only connection
    failures/timeouts (and unexpected errors) trigger a retry.

    :return: True if the KB server is reachable, False after all attempts fail.
    """
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            request = urllib.request.Request(f"{KB_URL}health", method='GET')
            with urllib.request.urlopen(request, timeout=10) as response:
                logger.debug(f"KB server is reachable. Response status: {response.status}")
                return True
        except urllib.error.HTTPError:
            # The server responded with an HTTP error code; it is still up.
            logger.debug("KB server responded (HTTP error), considered reachable")
            return True
        except urllib.error.URLError as e:
            logger.debug(f"KB server is unreachable (timeout or connection error): {e}")
        except Exception as e:
            logger.debug(f"Unexpected error checking KB server: {e}")
        # Back off before the next attempt; no sleep after the final one.
        if attempt < max_attempts - 1:
            time.sleep(1)
    return False


def get_kb_reference_to_print(merged_result: list) -> list:
    """
    Build the kb_reference sheet rows from the merged scan result.

    Only items carrying a KB origin URL (set by _get_origin_url_from_md5_hash)
    are included, sorted by source path.

    :param merged_result: list of SourceItem (merged scan result).
    :return: list of rows; the first row is a copy of KB_REFERENCE_HEADER,
             the rest are [source_path, kb_origin_url, kb_evidence].
    """
    # NOTE(review): KB_REFERENCE_HEADER has 4 columns (incl. 'ID') while data
    # rows have 3 — presumably the ID column is filled by the report writer;
    # confirm against the sheet-writing code.
    matched = [item for item in merged_result if getattr(item, 'kb_origin_url', None)]
    # Copy the header row so callers cannot mutate the shared module constant.
    data = [list(KB_REFERENCE_HEADER)]
    for item in sorted(matched, key=lambda x: x.source_name_or_path):
        data.append([
            item.source_name_or_path,
            item.kb_origin_url,
            str(getattr(item, 'kb_evidence', '') or '')
        ])
    return data


def merge_results(
scancode_result: list = [], scanoss_result: list = [], spdx_downloads: dict = {},
path_to_scan: str = "", run_kb: bool = False, manifest_licenses: dict = {}
path_to_scan: str = "", run_kb: bool = False, manifest_licenses: dict = {},
excluded_files: set = None
) -> list:

"""
Expand All @@ -290,8 +328,11 @@ def merge_results(
:param spdx_downloads: dictionary of spdx parsed results.
:param path_to_scan: path to the scanned directory for constructing absolute file paths.
:param run_kb: if True, load kb result.
:param excluded_files: set of relative paths to exclude from KB-only file discovery.
:return merged_result: list of merged result in SourceItem.
"""
if excluded_files is None:
excluded_files = set()

scancode_result.extend([item for item in scanoss_result if item not in scancode_result])

Expand Down Expand Up @@ -319,16 +360,27 @@ def merge_results(
new_result_item.licenses = licenses
new_result_item.is_manifest_file = True
scancode_result.append(new_result_item)
if run_kb and not check_kb_server_reachable():
run_kb = False
if run_kb:
logger.info("KB server is reachable. Loading data from OSS KB.")
else:
logger.info("Skipping KB lookup.")

for item in scancode_result:
item.set_oss_item(path_to_scan, run_kb)

# Add OSSItem for files in path_to_scan that are not in scancode_result
# when KB returns an origin URL for their MD5 hash (skip excluded_files)
if run_kb:
abs_path_to_scan = os.path.abspath(path_to_scan)
scancode_paths = {item.source_name_or_path for item in scancode_result}
for root, _dirs, files in os.walk(path_to_scan):
for file in files:
file_path = os.path.join(root, file)
rel_path = os.path.relpath(file_path, abs_path_to_scan).replace("\\", "/")
if rel_path in scancode_paths or rel_path in excluded_files:
continue
extra_item = SourceItem(rel_path)
extra_item.set_oss_item(path_to_scan, run_kb)
if extra_item.download_location:
scancode_result.append(extra_item)
scancode_paths.add(rel_path)

return scancode_result


Expand All @@ -338,7 +390,7 @@ def run_scanners(
called_by_cli: bool = True, print_matched_text: bool = False,
formats: list = [], time_out: int = 120,
correct_mode: bool = True, correct_filepath: str = "",
selected_scanner: str = 'all', path_to_exclude: list = [],
selected_scanner: str = ALL_MODE, path_to_exclude: list = [],
all_exclude_mode: tuple = ()
) -> Tuple[bool, str, 'ScannerItem', list, list]:
"""
Expand Down Expand Up @@ -397,29 +449,37 @@ def run_scanners(
logger.debug(f"Skipped paths: {excluded_path_with_default_exclusion}")

if not selected_scanner:
selected_scanner = 'all'
if selected_scanner in ['scancode', 'all', 'kb']:
selected_scanner = ALL_MODE
if selected_scanner in ['scancode', ALL_MODE]:
success, result_log[RESULT_KEY], scancode_result, license_list = run_scan(path_to_scan, output_file_name,
write_json_file, num_cores, True,
print_matched_text, formats, called_by_cli,
time_out, correct_mode, correct_filepath,
excluded_path_with_default_exclusion,
excluded_files)
excluded_files = set(excluded_files) if excluded_files else set()
if selected_scanner in ['scanoss', 'all']:
if selected_scanner in ['scanoss', ALL_MODE]:
scanoss_result, api_limit_exceed = run_scanoss_py(path_to_scan, output_path, formats, True, num_cores,
excluded_path_with_default_exclusion, excluded_files,
write_json_file)

run_kb_msg = ""
if selected_scanner in SCANNER_TYPE:
run_kb = True if selected_scanner in ['kb'] else False
run_kb = True if selected_scanner in ['kb', ALL_MODE] else False
if run_kb:
if not check_kb_server_reachable():
run_kb = False
run_kb_msg = "KB Unreachable"
else:
run_kb_msg = "KB Enabled"

spdx_downloads, manifest_licenses = metadata_collector(path_to_scan, excluded_files)
merged_result = merge_results(scancode_result, scanoss_result, spdx_downloads,
path_to_scan, run_kb, manifest_licenses)
path_to_scan, run_kb, manifest_licenses, excluded_files)
scan_item = create_report_file(start_time, merged_result, license_list, scanoss_result, selected_scanner,
print_matched_text, output_path, output_files, output_extensions, correct_mode,
correct_filepath, path_to_scan, excluded_path_without_dot, formats,
api_limit_exceed, cnt_file_except_skipped, final_output_path)
api_limit_exceed, cnt_file_except_skipped, final_output_path, run_kb_msg)
else:
print_help_msg_source_scanner()
result_log[RESULT_KEY] = "Unsupported scanner"
Expand Down
Loading