diff --git a/src/fosslight_source/_scan_item.py b/src/fosslight_source/_scan_item.py index 4a94ba1..e6b8dc7 100644 --- a/src/fosslight_source/_scan_item.py +++ b/src/fosslight_source/_scan_item.py @@ -44,6 +44,8 @@ def __init__(self, value: str) -> None: self.oss_version = "" self.checksum = get_checksum_sha1(value) + self.kb_origin_url = "" # URL from OSS KB (_get_origin_url_from_md5_hash) + self.kb_evidence = "" # Evidence from KB API (exact_match or code snippet) def __del__(self) -> None: pass @@ -104,6 +106,7 @@ def _get_hash(self, path_to_scan: str = "") -> tuple: return md5_hex, wfp def _get_origin_url_from_md5_hash(self, md5_hash: str, wfp: str = "") -> str: + """Return origin_url from KB API.""" try: payload = {"file_hash": md5_hash} if wfp and wfp.strip(): @@ -115,7 +118,6 @@ def _get_origin_url_from_md5_hash(self, md5_hash: str, wfp: str = "") -> str: with urllib.request.urlopen(request, timeout=10) as response: data = json.loads(response.read().decode()) if isinstance(data, dict): - # Only extract output if return_code is 0 (success) return_code = data.get('return_code', -1) if return_code == 0: output = data.get('output', '') @@ -183,6 +185,8 @@ def set_oss_item(self, path_to_scan: str = "", run_kb: bool = False) -> None: if md5_hash: origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp) if origin_url: + self.kb_origin_url = origin_url + self.kb_evidence = "exact_match" extracted_name, extracted_version, repo_url = self._extract_oss_info_from_url(origin_url) if extracted_name: self.oss_name = extracted_name diff --git a/src/fosslight_source/cli.py b/src/fosslight_source/cli.py index d38caa9..61b1232 100755 --- a/src/fosslight_source/cli.py +++ b/src/fosslight_source/cli.py @@ -6,6 +6,7 @@ import sys import os import platform +import time import warnings import logging import urllib.request @@ -40,7 +41,9 @@ MERGED_HEADER = {SRC_SHEET_NAME: ['ID', 'Source Path', 'OSS Name', 'OSS Version', 'License', 'Download Location', 'Homepage', 'Copyright Text', 'Exclude', 'Comment', 'license_reference']} -SCANNER_TYPE = ['kb', 'scancode', 'scanoss', 'all'] +KB_REFERENCE_HEADER = ['ID', 'Source Path', 'KB Origin URL', 'Evidence'] +ALL_MODE = 'all' +SCANNER_TYPE = ['kb', 'scancode', 'scanoss', ALL_MODE] logger = logging.getLogger(constant.LOGGER_NAME) @@ -72,7 +75,7 @@ def main() -> None: parser.add_argument('-o', '--output', nargs=1, type=str, required=False, default="") parser.add_argument('-m', '--matched', action='store_true', required=False) parser.add_argument('-f', '--formats', nargs='*', type=str, required=False) - parser.add_argument('-s', '--scanner', nargs=1, type=str, required=False, default='all') + parser.add_argument('-s', '--scanner', nargs=1, type=str, required=False, default=ALL_MODE) parser.add_argument('-t', '--timeout', type=int, required=False, default=120) parser.add_argument('-c', '--cores', type=int, required=False, default=-1) parser.add_argument('-e', '--exclude', nargs='*', required=False, default=[]) @@ -137,7 +140,8 @@ def create_report_file( output_path: str = "", output_files: list = [], output_extensions: list = [], correct_mode: bool = True, correct_filepath: str = "", path_to_scan: str = "", path_to_exclude: list = [], - formats: list = [], api_limit_exceed: bool = False, files_count: int = 0, final_output_path: str = "" + formats: list = [], api_limit_exceed: bool = False, files_count: int = 0, final_output_path: str = "", + run_kb_msg: str = "" ) -> 'ScannerItem': """ Create report files for given scanned result. @@ -206,14 +210,11 @@ def create_report_file( if api_limit_exceed: scan_item.set_cover_comment("SCANOSS skipped (API limits)") - run_kb = True if selected_scanner in ['kb'] else False - if run_kb: - scan_item.set_cover_comment("KB Enabled" if check_kb_server_reachable() else "KB Unreachable") + if run_kb_msg: + scan_item.set_cover_comment(run_kb_msg) display_mode = selected_scanner - if selected_scanner == "kb": - display_mode += ", scancode" - elif selected_scanner == "all": - display_mode = "scancode, scanoss" + if selected_scanner == ALL_MODE: + display_mode = ", ".join([s for s in SCANNER_TYPE if s != ALL_MODE]) scan_item.set_cover_comment(f"Mode : {display_mode}") if merged_result: @@ -230,11 +231,17 @@ def create_report_file( sheet_list["scancode_reference"] = get_license_list_to_print(license_list) elif selected_scanner == 'scanoss': sheet_list["scanoss_reference"] = get_scanoss_extra_info(scanoss_result) + elif selected_scanner == 'kb': + kb_ref = get_kb_reference_to_print(merged_result) + sheet_list["kb_reference"] = kb_ref else: sheet_list["scancode_reference"] = get_license_list_to_print(license_list) sheet_list["scanoss_reference"] = get_scanoss_extra_info(scanoss_result) - if sheet_list: - scan_item.external_sheets = sheet_list + kb_ref = get_kb_reference_to_print(merged_result) + sheet_list["kb_reference"] = kb_ref + + if sheet_list: + scan_item.external_sheets = sheet_list if correct_mode: success, msg_correct, correct_item = correct_with_yaml(correct_filepath, path_to_scan, scan_item) @@ -262,25 +269,56 @@ def create_report_file( def check_kb_server_reachable() -> bool: - try: - request = urllib.request.Request(f"{KB_URL}health", method='GET') - with urllib.request.urlopen(request, timeout=10) as response: - logger.debug(f"KB server is reachable. Response status: {response.status}") - return response.status != 404 - except urllib.error.HTTPError as e: - logger.debug(f"KB server returned HTTP error: {e.code}") - return e.code != 404 - except urllib.error.URLError as e: - logger.debug(f"KB server is unreachable (timeout or connection error): {e}") - return False - except Exception as e: - logger.debug(f"Unexpected error checking KB server: {e}") - return False + for attempt in range(3): + try: + request = urllib.request.Request(f"{KB_URL}health", method='GET') + with urllib.request.urlopen(request, timeout=10) as response: + logger.debug(f"KB server is reachable. Response status: {response.status}") + return True + except urllib.error.HTTPError: + logger.debug("KB server responded (HTTP error), considered reachable") + return True + except urllib.error.URLError as e: + logger.debug(f"KB server is unreachable (timeout or connection error): {e}") + if attempt < 2: + time.sleep(1) + else: + return False + except Exception as e: + logger.debug(f"Unexpected error checking KB server: {e}") + if attempt < 2: + time.sleep(1) + else: + return False + return False + + +def get_kb_reference_to_print(merged_result: list) -> list: + """ + Build kb_reference sheet rows: file path and URL from _get_origin_url_from_md5_hash. + :param merged_result: list of SourceItem (merged scan result). + :return: list of rows, first row is header, rest are [source_path, kb_origin_url]. + """ + rows = [item for item in merged_result if getattr(item, 'kb_origin_url', None)] + if not rows: + return [KB_REFERENCE_HEADER] + rows.sort(key=lambda x: x.source_name_or_path) + data = [ + [ + item.source_name_or_path, + item.kb_origin_url, + str(getattr(item, 'kb_evidence', '') or '') + ] + for item in rows + ] + data.insert(0, KB_REFERENCE_HEADER) + return data def merge_results( scancode_result: list = [], scanoss_result: list = [], spdx_downloads: dict = {}, - path_to_scan: str = "", run_kb: bool = False, manifest_licenses: dict = {} + path_to_scan: str = "", run_kb: bool = False, manifest_licenses: dict = {}, + excluded_files: set = None ) -> list: """ @@ -290,8 +328,11 @@ def merge_results( :param spdx_downloads: dictionary of spdx parsed results. :param path_to_scan: path to the scanned directory for constructing absolute file paths. :param run_kb: if True, load kb result. + :param excluded_files: set of relative paths to exclude from KB-only file discovery. :return merged_result: list of merged result in SourceItem. """ + if excluded_files is None: + excluded_files = set() scancode_result.extend([item for item in scanoss_result if item not in scancode_result]) @@ -319,16 +360,27 @@ def merge_results( new_result_item.licenses = licenses new_result_item.is_manifest_file = True scancode_result.append(new_result_item) - if run_kb and not check_kb_server_reachable(): - run_kb = False - if run_kb: - logger.info("KB server is reachable. Loading data from OSS KB.") - else: - logger.info("Skipping KB lookup.") for item in scancode_result: item.set_oss_item(path_to_scan, run_kb) + # Add OSSItem for files in path_to_scan that are not in scancode_result + # when KB returns an origin URL for their MD5 hash (skip excluded_files) + if run_kb: + abs_path_to_scan = os.path.abspath(path_to_scan) + scancode_paths = {item.source_name_or_path for item in scancode_result} + for root, _dirs, files in os.walk(path_to_scan): + for file in files: + file_path = os.path.join(root, file) + rel_path = os.path.relpath(file_path, abs_path_to_scan).replace("\\", "/") + if rel_path in scancode_paths or rel_path in excluded_files: + continue + extra_item = SourceItem(rel_path) + extra_item.set_oss_item(path_to_scan, run_kb) + if extra_item.download_location: + scancode_result.append(extra_item) + scancode_paths.add(rel_path) + return scancode_result @@ -338,7 +390,7 @@ def run_scanners( called_by_cli: bool = True, print_matched_text: bool = False, formats: list = [], time_out: int = 120, correct_mode: bool = True, correct_filepath: str = "", - selected_scanner: str = 'all', path_to_exclude: list = [], + selected_scanner: str = ALL_MODE, path_to_exclude: list = [], all_exclude_mode: tuple = () ) -> Tuple[bool, str, 'ScannerItem', list, list]: """ @@ -397,8 +449,8 @@ def run_scanners( logger.debug(f"Skipped paths: {excluded_path_with_default_exclusion}") if not selected_scanner: - selected_scanner = 'all' - if selected_scanner in ['scancode', 'all', 'kb']: + selected_scanner = ALL_MODE + if selected_scanner in ['scancode', ALL_MODE]: success, result_log[RESULT_KEY], scancode_result, license_list = run_scan(path_to_scan, output_file_name, write_json_file, num_cores, True, print_matched_text, formats, called_by_cli, @@ -406,20 +458,28 @@ def run_scanners( excluded_path_with_default_exclusion, excluded_files) excluded_files = set(excluded_files) if excluded_files else set() - if selected_scanner in ['scanoss', 'all']: + if selected_scanner in ['scanoss', ALL_MODE]: scanoss_result, api_limit_exceed = run_scanoss_py(path_to_scan, output_path, formats, True, num_cores, excluded_path_with_default_exclusion, excluded_files, write_json_file) + run_kb_msg = "" if selected_scanner in SCANNER_TYPE: - run_kb = True if selected_scanner in ['kb'] else False + run_kb = True if selected_scanner in ['kb', ALL_MODE] else False + if run_kb: + if not check_kb_server_reachable(): + run_kb = False + run_kb_msg = "KB Unreachable" + else: + run_kb_msg = "KB Enabled" + spdx_downloads, manifest_licenses = metadata_collector(path_to_scan, excluded_files) merged_result = merge_results(scancode_result, scanoss_result, spdx_downloads, - path_to_scan, run_kb, manifest_licenses) + path_to_scan, run_kb, manifest_licenses, excluded_files) scan_item = create_report_file(start_time, merged_result, license_list, scanoss_result, selected_scanner, print_matched_text, output_path, output_files, output_extensions, correct_mode, correct_filepath, path_to_scan, excluded_path_without_dot, formats, - api_limit_exceed, cnt_file_except_skipped, final_output_path) + api_limit_exceed, cnt_file_except_skipped, final_output_path, run_kb_msg) else: print_help_msg_source_scanner() result_log[RESULT_KEY] = "Unsupported scanner"