diff --git a/llmfoundry/command_utils/data_prep/convert_dataset_hf.py b/llmfoundry/command_utils/data_prep/convert_dataset_hf.py index 69b5e68..dd43b96 100644 --- a/llmfoundry/command_utils/data_prep/convert_dataset_hf.py +++ b/llmfoundry/command_utils/data_prep/convert_dataset_hf.py @@ -377,7 +377,7 @@ def convert_dataset_hf( ) loader = build_dataloader( dataset=hf_dataset, - batch_size=512, + batch_size=1, num_workers=num_workers, ) samples = generate_samples( @@ -405,6 +405,7 @@ def convert_dataset_hf( columns=columns, out=os.path.join(out_root, folder_split), compression=compression, + size_limit="128mb", ) as out: if denominator is not None: for sample in tqdm( diff --git a/llmfoundry/command_utils/data_prep/convert_finetuning_dataset.py b/llmfoundry/command_utils/data_prep/convert_finetuning_dataset.py index cbd1bd2..7048484 100644 --- a/llmfoundry/command_utils/data_prep/convert_finetuning_dataset.py +++ b/llmfoundry/command_utils/data_prep/convert_finetuning_dataset.py @@ -211,6 +211,7 @@ def convert_finetuning_dataset( out=out, compression=compression, keep_local=keep_local, + size_limit="128mb", ) as out: examples_removed = 0 for sample in tqdm(samples, desc=split_name): diff --git a/llmfoundry/data/data.py b/llmfoundry/data/data.py index 17b28e1..3a99c88 100644 --- a/llmfoundry/data/data.py +++ b/llmfoundry/data/data.py @@ -161,13 +161,11 @@ def __iter__(self) -> Iterable[dict[str, NDArray]]: ) iids = encoded['input_ids'] buffer = buffer + self.bos_tokens + iids + self.eos_tokens - while len(buffer) >= self.max_length: - concat_sample = buffer[:self.max_length] - buffer = buffer[self.max_length:] if self.should_wrap else [] - yield { - # convert to ndarray to store in MDS format - 'tokens': np.asarray(concat_sample, dtype=np.int32), - } + yield { + # convert to ndarray to store in MDS format + 'tokens': np.asarray(buffer, dtype=np.int32), + } + buffer = [] def stream_remote_local_validate( diff --git a/scripts/data_prep/data_lib/__init__.py b/scripts/data_prep/data_lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/data_prep/data_lib/utils.py b/scripts/data_prep/data_lib/utils.py new file mode 100644 index 0000000..cfbd225 --- /dev/null +++ b/scripts/data_prep/data_lib/utils.py @@ -0,0 +1,92 @@ +#% utils +def _banner(msg): + print("#"*len(msg)) + print(msg) + print("#"*len(msg)) + +def str_rows_features(ds): + return f"rows: {len(ds)} features: {ds['train'].features}" + +def get_datasets(): # target_repo): + ds_config = { + "tulu": { + "original": "allenai/tulu-3-sft-olmo-2-mixture", + "decontaminated": "LocalResearchGroup/split-tulu-3-sft-olmo-2-mixture-decontaminated", + "kind": "instruct", + "template": "tulu-with-template", + }, + "numina": { + "original": "AI-MO/NuminaMath-CoT", + "decontaminated": "LocalResearchGroup/split-NuminaMath-CoT-decontaminated", + "kind": "instruct", + "template": "numina-with-template", + }, + "glaive": { + "original": "glaiveai/glaive-code-assistant-v3", + "decontaminated": "LocalResearchGroup/split-glaive-code-assistant-v3-decontaminated", + "kind": "instruct", + "template": "glaive-with-template", + }, + "finemath": { + "original": "HuggingFaceTB/finemath", + "decontaminated": "LocalResearchGroup/split-finemath-decontaminated", + "kind": "pretrain", + "ds_name": "finemath-4plus", + }, + "pythonedu": { + "original": "Avelina/python-edu", + "decontaminated": "LocalResearchGroup/split-avelina-python-edu-decontaminated", + "kind": "pretrain", + }, + } + return ds_config + + +def rel_path(name, decontaminated): + return f"{name}" \ + f"{'-with-template' if get_datasets()[name]['kind'] == 'instruct' else ''}" \ + f"{'-decontaminated' if decontaminated else ''}" + +#% Allow to add extra datasets to CONSTS + +def add_dataset_config(name, splits): + from llmfoundry.command_utils.data_prep.convert_dataset_hf import CONSTS + CONSTS[name] = splits + + +def generate_constants(total_rows, chars_per_sample, chars_per_token): + from llmfoundry.command_utils.data_prep.convert_dataset_hf import CONSTS, DataSplitConstants, DatasetConstants + + ds_const = DatasetConstants( + chars_per_sample=chars_per_sample, + chars_per_token=chars_per_token, + ) + ds_const.splits["train"] = DataSplitConstants( + hf_split="train", + folder_split="train", + raw_samples=total_rows, + truncated_samples=None, + ) + + ds_const.splits["test"] = DataSplitConstants( + hf_split="test", + folder_split="test", + raw_samples=total_rows, + truncated_samples=None, + ) + return ds_const + + +def register_new_datasets(target = "LocalResearchGroup"): + constants = { + "finemath": generate_constants(6_700_000, 6212, 4), + "tulu": generate_constants(939_000, 6212, 4), + "numina": generate_constants(859_00, 6212, 4), + "pythonedu": generate_constants(7_680_000, 6212, 4), + "glaive": generate_constants(950_000, 6212, 4), + } + ds = get_datasets() + for name in ds.keys(): + add_dataset_config(ds[name]["original"], constants[name]) + add_dataset_config(ds[name]["decontaminated"], constants[name]) + diff --git a/scripts/data_prep/download_tokens.py b/scripts/data_prep/download_tokens.py new file mode 100644 index 0000000..b39dce7 --- /dev/null +++ b/scripts/data_prep/download_tokens.py @@ -0,0 +1,62 @@ +from argparse import ArgumentParser, Namespace, BooleanOptionalAction +from huggingface_hub import HfApi, login +import os + +from data_lib.utils import get_datasets, rel_path + + +def main(args): + api = HfApi() + + for ds in args.datasets: + ld = f"{args.out}/{ds}-tokens" + datadown = f"{args.user_org}/{rel_path(ds, args.decontaminated)}-tokenized" + print(f"downloading {datadown=} to {ld=}\n") + local_dir = api.snapshot_download( + repo_id=datadown, + repo_type="dataset", + local_dir=ld, + ) + +def parse_args() -> Namespace: + """Parse commandline arguments.""" + parser = ArgumentParser( + description= + "Downloads tokenized versions of train/test 1M, 100k, 10k, 1k", + ) + datasets = get_datasets().keys() + parser.add_argument( + "--datasets", + nargs="+", + choices=datasets, + default=datasets, + ) + + parser.add_argument( + "--user_org", + default="LocalResearchGroup", + help="user/org containing tokenizations", + ) + + parser.add_argument( + "--out", + default=".", + help="local download folder", + ) + + parser.add_argument( + "--decontaminated", + action=BooleanOptionalAction, + default=False, + help="use decontaminated dataset instead of original one", + ) + parsed = parser.parse_args() + return parsed + + +if __name__ == "__main__": + args = parse_args() + if not os.environ.get("HUGGING_FACE_HUB_TOKEN"): + print("No Hugging Face token found. Please login.") + login() + main(args) diff --git a/scripts/data_prep/text_dataset_preproc.py b/scripts/data_prep/text_dataset_preproc.py new file mode 100644 index 0000000..b1927fc --- /dev/null +++ b/scripts/data_prep/text_dataset_preproc.py @@ -0,0 +1,161 @@ + +from argparse import ArgumentParser, Namespace, BooleanOptionalAction +from datasets import load_dataset, load_from_disk, DatasetDict +from llmfoundry.data.finetuning.tasks import dataset_constructor +from data_lib.utils import get_datasets, register_new_datasets, _banner, str_rows_features + + +def create_refactor(dataset, decontaminated): + ds_name = dataset["ds_name"] if "ds_name" in dataset else None + process = dataset["after_pull"] if "after_pull" in dataset else None + ds = dataset["decontaminated"] if decontaminated else dataset["original"] + original = pull_orifinal_ds(ds, decontaminated, ds_name, process) + return original + +def pull_orifinal_ds( + hf_ds_src, + decontaminated, + ds_name=None, + after_pull=None, +): + _banner(f"Loading dataset {hf_ds_src}/{'default' if ds_name is None else ds_name}") + if ds_name: _banner(ds_name) + from llmfoundry.command_utils.data_prep.convert_dataset_hf import CONSTS + register_new_datasets() + dataset = load_dataset(path=hf_ds_src, name=ds_name) + if after_pull is not None: + dataset = after_pull(dataset, decontaminated) + return dataset + + +#% main loop +def _main_loop(args): + ds_config = get_datasets() + # Add after pull call to process instruct datasets with template + ds_config["tulu"]["after_pull"] = filter_tulu + ds_config["numina"]["after_pull"] = process_numina + ds_config["glaive"]["after_pull"] = process_glaive + for ds in args.datasets: + dataset = create_refactor(ds_config[ds], args.decontaminated) + private=False + hf_repo = f"{args.user_org}/{ds}-with-template{'-decontaminated' if args.decontaminated else ''}" + label="default" + shard_size = "128MB" + dataset.push_to_hub(hf_repo, config_name=label, private=private, max_shard_size=shard_size) + + +#% chat ml template and filtering of original datasets +def apply_chatml_template(inp: dict, k_prompt: str, k_response: str): + """Format dataset into ChatML template.""" + prompt = ( + "<|im_start|>system\nYou are a helpful AI assistant named SmolLM, trained by Local Research Group<|im_end|>\n" + f"<|im_start|>user\n{inp[k_prompt]}\n<|im_end|>\n" + ) + response = ( + f"<|im_start|>assistant\n{inp[k_response]}<|im_end|>\n" + "<|endoftext|>" + ) + return {"prompt": prompt, "response": response} + + +def template_to_tulu(inp: dict): + return apply_chatml_template(inp, "prompt", "response") + + +def template_to_numina(inp: dict): + return apply_chatml_template(inp, "problem", "solution") + + +def template_to_glaive(inp: dict): + return apply_chatml_template(inp, "question", "answer") + + +def filter_tulu(dataset, decontaminated): + print(f"\n\ntulu {str_rows_features(dataset)}\n\n") + if not decontaminated: + dataset = dataset.filter(lambda r: r["source"] is not None and "aya" not in r["source"] and len(r["messages"]) == 2) + dataset = dataset.remove_columns(["source", "dataset"]) + dataset = dataset.remove_columns(["id"]) + + def extract_qa(messages): + user_question = next((msg["content"] for msg in messages if msg["role"] == "user"), None) + assistant_response = next((msg["content"] for msg in messages if msg["role"] == "assistant"), None) + return {"prompt": user_question, "response": assistant_response} + + # Apply function to dataset + dataset = dataset.map(lambda example: extract_qa(example["messages"])) if not decontaminated else dataset + dataset = dataset.remove_columns(["messages"]) if not decontaminated else dataset + dataset = dataset.map(lambda example: template_to_tulu(example)) if not decontaminated else dataset + print(f"tulu after {str_rows_features(dataset)}") + return dataset + + +def process_numina(dataset, decontaminated): + print(f"numina {str_rows_features(dataset)}") + # remove conflictlict that breaks pytorch collate with 2 row per batch! + dataset = dataset.map(lambda example: template_to_numina(example)) + colums = ["source", "problem", "solution"] + if not decontaminated: colums.append("messages") + dataset = dataset.remove_columns(colums) + print(f"numina processed: {str_rows_features(dataset)}") + return dataset + + +def process_glaive(dataset, decontaminated): + print(f"glaive {str_rows_features(dataset)}") + + def extract_qa(messages): + return template_to_glaive(messages) + + dataset = dataset.map(lambda example: extract_qa(example)) + dataset = dataset.remove_columns(["question", "answer"]) + print(f"glaive processed: {str_rows_features(dataset)}") + + return dataset + +#% argument parsing section +def main(args): + if args.datasets: + _main_loop(args) + + +def parse_args() -> Namespace: + """Parse commandline arguments.""" + parser = ArgumentParser( + description="""Refactor instruct datasets with and witout decontamination + """, + ) + ds = [k for k in get_datasets() if get_datasets()[k]["kind"] == "instruct"] + + parser.add_argument( + "--datasets", + nargs="+", + choices=ds, + default=ds, + ) + + parser.add_argument( + "--user_org", + default="LocalResearchGroup", + help="user/org base namespace default is `LocalResearchGroup`", + ) + + parser.add_argument( + "--decontaminated", + action=BooleanOptionalAction, + default=False, + help="use decontaminated dataset instead of original one", + ) + + parsed = parser.parse_args() + return parsed + + +if __name__ == "__main__": + args = parse_args() + import os + if not os.environ.get("HUGGING_FACE_HUB_TOKEN"): + print("No Hugging Face token found. Please login.") + login() + main(args) + diff --git a/scripts/data_prep/text_dataset_tokenize.py b/scripts/data_prep/text_dataset_tokenize.py new file mode 100644 index 0000000..3bf34e4 --- /dev/null +++ b/scripts/data_prep/text_dataset_tokenize.py @@ -0,0 +1,157 @@ +from argparse import ArgumentParser, Namespace, BooleanOptionalAction +from huggingface_hub import HfApi, login +from pathlib import Path + +import os + + +from convert_finetuning_dataset import convert_finetuning_dataset_from_args +from llmfoundry.command_utils import convert_dataset_hf_from_args + +from data_lib.utils import get_datasets, _banner, register_new_datasets, rel_path + +#% + +def upload_token_folder(folder_path, namespace, path_in_repo): + _banner(f"Uploading {folder_path} to {namespace}") + api = HfApi() + cr = api.create_repo(namespace, repo_type="dataset", exist_ok=True) + r = api.upload_folder( + repo_id=namespace, + repo_type="dataset", + folder_path=folder_path, + path_in_repo=path_in_repo, + ) + print(f"token uploaded result: {r}@{cr}") + + + +def create_tokenized_upload(name, user_org, decontaminated): + dataset = get_datasets()[name] + ablations = ["train", "test"] if decontaminated else ["train"] + data_subset = dataset["ds_name"] if "ds_name" in dataset else "default" + if name in ["finemath"] and decontaminated: + data_subset = "default" + for ablation in ablations: + namespace = f"{user_org}/{rel_path(name, decontaminated)}-tokenized" + local_path = Path(".") / f"tokenized/{rel_path(name, decontaminated)}/{data_subset}/{ablation}" # out_root + upload_token_folder(local_path, namespace, f"/{ablation}") + print("upload finished.") + +def create_tokens(name, user_org, decontaminated): + dataset = get_datasets()[name] + max_seq_len = 8192 + data_subset = dataset["ds_name"] if "ds_name" in dataset else "default" + if name in ["finemath"] and decontaminated: + data_subset = "default" + + if dataset["kind"] == "pretrain": + print("\nconvert_dataset_hf_from_args for", name, data_subset) + print(f"{dataset['decontaminated'] if decontaminated else dataset['original']}\n\n") + tokenizer="HuggingFaceTB/SmolLM2-135M" + convert_dataset_hf_from_args( + dataset=f"{dataset['decontaminated'] if decontaminated else dataset['original']}", + data_subset=data_subset, + splits=["train", "test"] if decontaminated else ['train'], + out_root=f"tokenized/{rel_path(name, decontaminated)}/{data_subset}", + compression="zstd", + concat_tokens=max_seq_len, + tokenizer=tokenizer, + tokenizer_kwargs=f'{{"device_map":"auto"}}', + bos_text=None, + eos_text="<|endoftext|>", + no_wrap=True, + num_workers=None, + ) + elif dataset["kind"] == "instruct": + print(f"\nconvert_finetuning_dataset_from_args for", data_subset) + print(f"{user_org}/{rel_path(name,decontaminated)}\n\n") + tokenizer="HuggingFaceTB/SmolLM2-135M-instruct" + convert_finetuning_dataset_from_args( + f"{user_org}/{rel_path(name,decontaminated)}", + ###### f"{dataset['decontaminated'] if decontaminated else dataset['original']}", + f"{data_subset}", # data_subset + ["train", "test"] if decontaminated else ['train'], + None, # no preprocessing dataset is ready + [], + True, + f"tokenized/{rel_path(name, decontaminated)}/{data_subset}", # out_root + None, + "zstd", + None, # num_workers + tokenizer, # tokenizer + None, + max_seq_len, # max_seq_len + "none", # target_prompts + "last", # target_responses + False, # encoder_decoder + ) + else: + raise RuntimeError(f"Unknow dataset kind: {d['kind']}") + + + +#% main loop +def main(args): + register_new_datasets() + for ds in args.datasets: + if args.tokenize: + _banner(f"Making tokens for {ds} {args.decontaminated}") + dataset = create_tokens(ds, args.user_org, args.decontaminated) + if args.upload_tokens: + _banner(f"Uploading tokens for {ds} {args.decontaminated}") + create_tokenized_upload(ds, args.user_org, args.decontaminated) + + +def parse_args() -> Namespace: + """Parse commandline arguments.""" + parser = ArgumentParser( + description="""Tool to refactor instruct datasets + """, + ) + datasets = get_datasets() + parser.add_argument( + "--datasets", + nargs="+", + choices=datasets.keys(), + default=datasets.keys(), + ) + + parser.add_argument( + "--user_org", + default="LocalResearchGroup", + help="user/org base namespace to upload tokens default is `LocalResearchGroup`", + ) + + parser.add_argument( + "--tokenize", + action=BooleanOptionalAction, + default=True, + help="generate local tokenization for splits", + ) + parser.add_argument( + "--upload-tokens", + action=BooleanOptionalAction, + default=True, + help="upload local tokenization to user/org", + ) + + parser.add_argument( + "--decontaminated", + action=BooleanOptionalAction, + default=False, + help="use decontaminated dataset instead of original one", + ) + + + parsed = parser.parse_args() + return parsed + + +if __name__ == "__main__": + args = parse_args() + if not os.environ.get("HUGGING_FACE_HUB_TOKEN"): + print("No Hugging Face token found. Please login.") + login() + main(args) + diff --git a/scripts/modal/modal_script.py b/scripts/modal/modal_script.py index 32cbbdc..ccf7c0e 100644 --- a/scripts/modal/modal_script.py +++ b/scripts/modal/modal_script.py @@ -365,62 +365,85 @@ def push_folder_to_hf(folder_path: str, repo_id: str | None = None, repo_type: s api.upload_folder(folder_path=folder_path, repo_id=repo_id, use_auth_token=True, repo_type=repo_type) print(f'Folder "{folder_path}" uploaded to: "{repo_id}" successfully.') -@app.function(gpu=TRAINING_GPU, image=image, timeout=10800, secrets=[Secret.from_name("LRG")], +@app.function(gpu=TRAINING_GPU, image=image, timeout=3*3600, secrets=[Secret.from_name("LRG")], volumes={DATASETS_VOLUME_MOUNT_PATH: DATASETS_VOLUME}, - concurrency_limit=1) + max_containers=1) def pull_hf_to_folder(): import subprocess import os - + # Change to llm-foundry/scripts directory at the start os.chdir("/llm-foundry/scripts") print(f"Working directory: {os.getcwd()}") - + # Step 1: pull all tokens print(f"Downloading repos to {DATASETS_VOLUME_MOUNT_PATH}/") data_prep_cmd = [ PYTHON_PATH, # Use the correct Python interpreter - "data_prep/download_repo.py", + "data_prep/download_tokens.py", + "--decontaminated", "--out", f"{DATASETS_VOLUME_MOUNT_PATH}/", ] result = subprocess.run(data_prep_cmd, capture_output=True, text=True) print(result.stdout) if result.stderr: print("Download data errors:", result.stderr) - + DATASETS_VOLUME.commit() @app.function(gpu=TRAINING_GPU, image=image, timeout=3600, secrets=[Secret.from_name("LRG")], - concurrency_limit=1) -def process_datasets(): + max_containers=1) +def preprocess_datasets(): import subprocess import os - - # Change to llm-foundry/scripts directory at the start + os.chdir("/llm-foundry/scripts") print(f"Working directory: {os.getcwd()}") - - # Step 1: pull all tokens - print(f"Processing datasets...") + + print(f"Preprocessing datasets...") data_prep_cmd = [ - PYTHON_PATH, # Use the correct Python interpreter - "data_prep/convert_dataset_hf.py", + PYTHON_PATH, + "data_prep/text_dataset_preproc.py", + "--decontaminated", ] result = subprocess.run(data_prep_cmd, capture_output=True, text=True) print(result.stdout) if result.stderr: print("Process dataset errors:", result.stderr) + +@app.function(cpu=8, image=image, timeout=24*3600, secrets=[Secret.from_name("LRG")], + concurrency_limit=1) +def tokenize_datasets(): + import subprocess + import os + + os.chdir("/llm-foundry/scripts") + print(f"Working directory: {os.getcwd()}") + + print(f"Tokenizing datasets...") + data_prep_cmd = [ + PYTHON_PATH, + "data_prep/text_dataset_tokenize.py", + "--decontaminated", + ] + result = subprocess.run(data_prep_cmd, capture_output=False, text=True) + if result.stderr: + print("Process dataset errors:", result.stderr) + @app.local_entrypoint() def main(): from pathlib import Path import time run_ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + print(run_ts) + preprocess_datasets.remote() if False else None + tokenize_datasets.remote() if False else None get_stats.remote() time.sleep(1) - pull_hf_to_folder.remote() # run once to download the datasets - time.sleep(1) + #pull_hf_to_folder.remote() # run once to download the datasets + #time.sleep(1) # uncomment the next three lines to train the model # model_path = train_with_aim.remote(run_ts, yaml_path=f"train/yamls/finetune/{TRAIN_YAML}")