From c328a1aadb9eb8d0fa564e570c99ddf4e9f9ed3a Mon Sep 17 00:00:00 2001
From: jmestwa-coder
Date: Wed, 8 Apr 2026 16:53:01 +0530
Subject: [PATCH] Fix path flattening collisions and make runfiles resolution deterministic in make_corpus_dir

---
 fuzzing/tools/BUILD                   |   7 +
 fuzzing/tools/make_corpus_dir.py      |  92 ++++++++++++-
 fuzzing/tools/make_corpus_dir_test.py | 199 ++++++++++++++++++++++++++
 3 files changed, 296 insertions(+), 2 deletions(-)
 create mode 100644 fuzzing/tools/make_corpus_dir_test.py

diff --git a/fuzzing/tools/BUILD b/fuzzing/tools/BUILD
index 9fbcc787..d40cfdb5 100644
--- a/fuzzing/tools/BUILD
+++ b/fuzzing/tools/BUILD
@@ -83,3 +83,10 @@ py_test(
     srcs = ["dict_validation_test.py"],
     deps = [":dict_validation"],
 )
+
+py_test(
+    name = "make_corpus_dir_test",
+    srcs = ["make_corpus_dir_test.py"],
+    data = ["make_corpus_dir.py"],
+    deps = [requirement("absl-py")],
+)
diff --git a/fuzzing/tools/make_corpus_dir.py b/fuzzing/tools/make_corpus_dir.py
index aeae9e32..5caf0e44 100644
--- a/fuzzing/tools/make_corpus_dir.py
+++ b/fuzzing/tools/make_corpus_dir.py
@@ -36,12 +36,51 @@
 flags.mark_flag_as_required("output_dir")
 
 
+def flatten_corpus_path(corpus):
+    """Flattens a corpus path into a single file name.
+
+    Path separators become "-". A leading "./" (or its backslash form when
+    os.sep is a backslash) becomes a "dot-" prefix, a drive is kept without
+    its trailing ":", and a leading separator becomes a leading "-", so that
+    these spellings of a path flatten to different names.
+
+    Args:
+        corpus: the corpus file path to flatten.
+
+    Returns:
+        The flattened name. Different paths can still map to the same name
+        (for example "a/b" and "a-b"); main() resolves those collisions.
+    """
+    prefix = ""
+    if corpus.startswith("./") or (os.sep == "\\" and corpus.startswith(".\\")):
+        prefix = "dot-"
+        corpus = corpus[2:]
+
+    if os.sep == "\\":
+        corpus = corpus.replace("/", "\\")
+
+    drive, tail = os.path.splitdrive(corpus)
+
+    parts = [part for part in tail.split(os.sep) if part]
+    flattened = "-".join(parts)
+
+    if drive:
+        drive_part = drive.rstrip(":\\/").replace("\\", "-").replace("/", "-")
+        flattened = drive_part + ("-" + flattened if flattened else "")
+    elif tail.startswith(os.sep):
+        flattened = "-" + flattened
+
+    return prefix + flattened
+
+
 def expand_corpus_to_file_list(corpus, file_list):
     if not os.path.exists(corpus):
         raise FileNotFoundError("file " + corpus + " doesn't exist")
     if os.path.isdir(corpus):
         # The first element in glob("dir/**") is "dir/", which needs to be excluded
-        file_list.extend(glob.glob(os.path.join(corpus, "**"), recursive=True)[1:])
+        for expanded_path in glob.glob(os.path.join(corpus, "**"), recursive=True)[1:]:
+            if os.path.isfile(expanded_path):
+                file_list.append(expanded_path)
     else:
         file_list.append(corpus)
 
@@ -59,8 +98,57 @@ def main(argv):
                     corpus_line.rstrip("\n"), expanded_file_list)
 
     if expanded_file_list:
+        max_flattened_length = 200
+        flattened_names = {}
+        flattened_name_counts = {}
+        needs_suffix = set()
+
+        for corpus in expanded_file_list:
+            flattened = flatten_corpus_path(corpus)
+            flattened_names[corpus] = flattened
+            flattened_key = flattened.lower() if os.name == "nt" else flattened
+            flattened_name_counts[flattened_key] = (
+                flattened_name_counts.get(flattened_key, 0) + 1)
+            if len(flattened) > max_flattened_length:
+                needs_suffix.add(corpus)
+
+        for corpus in expanded_file_list:
+            flattened = flattened_names[corpus]
+            flattened_key = flattened.lower() if os.name == "nt" else flattened
+            if flattened_name_counts[flattened_key] > 1:
+                needs_suffix.add(corpus)
+
+        suffix_map = {}
+        if needs_suffix:
+            suffix_width = len(str(len(needs_suffix)))
+            for index, corpus in enumerate(sorted(needs_suffix), start=1):
+                suffix_map[corpus] = f"{index:0{suffix_width}d}"
+
+        final_name_map = {}
+        final_name_counts = {}
+        for corpus in expanded_file_list:
+            flattened = flattened_names[corpus]
+            suffix = suffix_map.get(corpus)
+            if suffix:
+                prefix_budget = max_flattened_length - len(suffix) - 2
+                flattened = flattened[:max(1, prefix_budget)] + "--" + suffix
+            final_name_map[corpus] = flattened
+            flattened_key = flattened.lower() if os.name == "nt" else flattened
+            final_name_counts[flattened_key] = (
+                final_name_counts.get(flattened_key, 0) + 1)
+
+        if any(count > 1 for count in final_name_counts.values()):
+            unique_corpora = sorted(set(expanded_file_list))
+            alias_width = len(str(len(unique_corpora)))
+            alias_map = {
+                corpus: f"entry-{index:0{alias_width}d}"
+                for index, corpus in enumerate(unique_corpora, start=1)
+            }
+            for corpus in expanded_file_list:
+                final_name_map[corpus] = alias_map[corpus]
+
         for corpus in expanded_file_list:
-            dest = os.path.join(FLAGS.output_dir, corpus.replace("/", "-"))
+            dest = os.path.join(FLAGS.output_dir, final_name_map[corpus])
             # Whatever the separator we choose, there is an chance that
             # the dest name conflicts with another file
             if os.path.exists(dest):
diff --git a/fuzzing/tools/make_corpus_dir_test.py b/fuzzing/tools/make_corpus_dir_test.py
new file mode 100644
index 00000000..0c98cb41
--- /dev/null
+++ b/fuzzing/tools/make_corpus_dir_test.py
@@ -0,0 +1,199 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Lint as: python3
+"""Unit tests for make_corpus_dir.py."""
+
+import os
+import subprocess
+import sys
+import tempfile
+import unittest
+from pathlib import Path
+
+
+def resolve_script_path():
+    """Locates make_corpus_dir.py next to this test or in the test runfiles.
+
+    Candidates are tried in a fixed order: the directory of this file, the
+    TEST_SRCDIR and RUNFILES_DIR trees (only when TEST_WORKSPACE is set), then
+    RUNFILES_MANIFEST_FILE entries (TEST_WORKSPACE entry first, then "_main").
+
+    Returns:
+        A Path to an existing make_corpus_dir.py.
+
+    Raises:
+        FileNotFoundError: if none of the candidates is an existing file.
+    """
+    candidates = [Path(__file__).with_name("make_corpus_dir.py")]
+    test_workspace = os.environ.get("TEST_WORKSPACE")
+    if test_workspace:
+        test_srcdir = os.environ.get("TEST_SRCDIR")
+        if test_srcdir:
+            candidates.append(
+                Path(test_srcdir) / test_workspace / "fuzzing" / "tools" /
+                "make_corpus_dir.py")
+        runfiles_dir = os.environ.get("RUNFILES_DIR")
+        if runfiles_dir:
+            candidates.append(
+                Path(runfiles_dir) / test_workspace / "fuzzing" / "tools" /
+                "make_corpus_dir.py")
+
+    for candidate in candidates:
+        if candidate.is_file():
+            return candidate
+
+    manifest_file = os.environ.get("RUNFILES_MANIFEST_FILE")
+    if manifest_file:
+        try:
+            workspace_match = None
+            main_match = None
+            with open(manifest_file, "r", encoding="utf-8") as manifest:
+                for line in manifest:
+                    entry = line.rstrip("\n")
+                    if not entry:
+                        continue
+                    logical_path, separator, real_path = entry.partition(" ")
+                    if not separator:
+                        continue
+                    normalized_path = logical_path.replace("\\", "/")
+                    if not normalized_path.endswith("fuzzing/tools/make_corpus_dir.py"):
+                        continue
+                    candidate = Path(real_path)
+                    if not candidate.is_file():
+                        continue
+                    if test_workspace and normalized_path.startswith(f"{test_workspace}/"):
+                        workspace_match = candidate
+                        break
+                    if normalized_path.startswith("_main/") and not main_match:
+                        main_match = candidate
+            if workspace_match:
+                return workspace_match
+            if main_match:
+                return main_match
+        except OSError:
+            # An unreadable manifest is treated like a missing one; the
+            # FileNotFoundError below reports the failure.
+            pass
+
+    raise FileNotFoundError("could not resolve make_corpus_dir.py in test runfiles")
+
+
+SCRIPT_PATH = resolve_script_path()
+
+
+class MakeCorpusDirTest(unittest.TestCase):
+    """Runs make_corpus_dir.py as a subprocess against temporary corpora."""
+
+    def run_tool(self, args, cwd):
+        """Runs the tool in `cwd` and returns the CompletedProcess."""
+        return subprocess.run(
+            [sys.executable, str(SCRIPT_PATH)] + args,
+            cwd=str(cwd),
+            text=True,
+            capture_output=True,
+            check=False,
+        )
+
+    def test_copies_nested_corpus_directory(self):
+        with tempfile.TemporaryDirectory() as td:
+            tmp = Path(td)
+            corpus = tmp / "corpus"
+            (corpus / "nested").mkdir(parents=True)
+            (corpus / "a.txt").write_text("A", encoding="utf-8")
+            (corpus / "nested" / "b.txt").write_text("B", encoding="utf-8")
+            output_dir = tmp / "out"
+
+            result = self.run_tool(
+                ["--corpus_list=corpus", "--output_dir=out"], cwd=tmp)
+
+            self.assertEqual(result.returncode, 0, msg=result.stderr)
+            copied_files = [path for path in output_dir.iterdir() if path.is_file()]
+            self.assertEqual(len(copied_files), 2)
+            copied_contents = sorted(path.read_text(encoding="utf-8")
+                                     for path in copied_files)
+            self.assertEqual(copied_contents, ["A", "B"])
+
+    def test_copies_absolute_corpus_file(self):
+        with tempfile.TemporaryDirectory() as td:
+            tmp = Path(td)
+            corpus_file = tmp / "corpus-input.txt"
+            corpus_file.write_text("payload", encoding="utf-8")
+            output_dir = tmp / "out"
+
+            result = self.run_tool(
+                [f"--corpus_list={corpus_file}", f"--output_dir={output_dir}"],
+                cwd=tmp,
+            )
+
+            self.assertEqual(result.returncode, 0, msg=result.stderr)
+            copied_files = [path for path in output_dir.iterdir() if path.is_file()]
+            self.assertEqual(len(copied_files), 1)
+            self.assertEqual(copied_files[0].read_text(encoding="utf-8"), "payload")
+
+    def test_distinguishes_dot_prefix_from_plain_relative_path(self):
+        with tempfile.TemporaryDirectory() as td:
+            tmp = Path(td)
+            corpus_file = tmp / "a.txt"
+            corpus_file.write_text("payload", encoding="utf-8")
+            output_dir = tmp / "out"
+
+            result = self.run_tool(
+                ["--corpus_list=./a.txt,a.txt", "--output_dir=out"],
+                cwd=tmp,
+            )
+
+            self.assertEqual(result.returncode, 0, msg=result.stderr)
+            copied_files = [path for path in output_dir.iterdir() if path.is_file()]
+            self.assertEqual(len(copied_files), 2)
+
+    def test_distinguishes_parent_navigation_from_plain_relative_path(self):
+        with tempfile.TemporaryDirectory() as td:
+            tmp = Path(td)
+            (tmp / "dir").mkdir()
+            corpus_file = tmp / "a.txt"
+            corpus_file.write_text("payload", encoding="utf-8")
+            output_dir = tmp / "out"
+
+            result = self.run_tool(
+                ["--corpus_list=dir/../a.txt,a.txt", "--output_dir=out"],
+                cwd=tmp,
+            )
+
+            self.assertEqual(result.returncode, 0, msg=result.stderr)
+            copied_files = [path for path in output_dir.iterdir() if path.is_file()]
+            self.assertEqual(len(copied_files), 2)
+
+    def test_distinguishes_dot_prefix_from_literal_dot_filename(self):
+        with tempfile.TemporaryDirectory() as td:
+            tmp = Path(td)
+            (tmp / "a.txt").write_text("from-a", encoding="utf-8")
+            (tmp / "dot-a.txt").write_text("from-dot-a", encoding="utf-8")
+            output_dir = tmp / "out"
+
+            result = self.run_tool(
+                ["--corpus_list=./a.txt,dot-a.txt", "--output_dir=out"],
+                cwd=tmp,
+            )
+
+            self.assertEqual(result.returncode, 0, msg=result.stderr)
+            copied_files = [path for path in output_dir.iterdir() if path.is_file()]
+            self.assertEqual(len(copied_files), 2)
+            copied_contents = sorted(path.read_text(encoding="utf-8")
+                                     for path in copied_files)
+            self.assertEqual(copied_contents, ["from-a", "from-dot-a"])
+
+
+if __name__ == "__main__":
+    unittest.main()