diff --git a/.gitignore b/.gitignore index 3df03d6..d01c1ab 100644 --- a/.gitignore +++ b/.gitignore @@ -91,6 +91,13 @@ ENV/ .idea/ .mypy_cache/ apex/ +LSH/ /data/ results/ outputs/ +lab/ +credentials + +# logs +logs/ +mlruns/ diff --git a/.isort.cfg b/.isort.cfg index c270db0..256241a 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -6,4 +6,4 @@ use_parentheses=True line_length=119 skip_glob=venv/*,stubs/* known_first_party = language_model -known_third_party = ds_shared,pynlple,setuptools,tokenizers,torch,transformers +known_third_party = bs4,datasets,ds_shared,more_itertools,numpy,pynlple,setuptools,tokenizers,torch,transformers,wget diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8beafd0..745640b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,41 +1,14 @@ repos: - - repo: https://github.com/asottile/seed-isort-config - rev: v1.9.1 + - repo: git@github.com:youscan/python-codestyle.git + rev: pre_commit_version hooks: - id: seed-isort-config - - repo: https://github.com/pre-commit/mirrors-isort - rev: v4.3.21 - hooks: - id: isort - args: ["-rc"] - - repo: https://github.com/psf/black - rev: 19.3b0 - hooks: - id: black - args: ["--line-length=119"] - - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.3.0 - hooks: - id: trailing-whitespace - id: check-yaml - id: check-json - id: end-of-file-fixer - id: requirements-txt-fixer - - repo: https://github.com/pycqa/flake8 - rev: 3.8.2 - hooks: - id: flake8 - additional_dependencies: [ flake8-bugbear==20.1.4, flake8-builtins==1.5.3, flake8-debugger==3.2.1, flake8-isort==3.0.0, isort==4.3.21, ] - args: ["--config=setup.cfg"] - - repo: https://github.com/pre-commit/mirrors-mypy - rev: v0.761 - hooks: - id: mypy - args: ["--config=setup.cfg"] - exclude: configs/ diff --git a/README.md b/README.md index b963812..b49e3e9 100644 --- a/README.md +++ b/README.md @@ -28,8 +28,8 @@ Ukrainian Roberta is released via [HuggingFace Transformers library](https://hug ```python from transformers import pipeline, RobertaForMaskedLM, RobertaTokenizer -model = RobertaForMaskedLM.from_pretrained("ukr-roberta-base") -tokenizer = RobertaTokenizer.from_pretrained("ukr-roberta-base") +model = RobertaForMaskedLM.from_pretrained("youscan/ukr-roberta-base") +tokenizer = RobertaTokenizer.from_pretrained("youscan/ukr-roberta-base") fill_mask = pipeline('fill-mask', model=model, tokenizer=tokenizer) fill_mask("Тарас Шевченко – великий українсьский <mask>.") diff --git a/configs/cyr/gpt/README.md b/configs/cyr/gpt/README.md new file mode 100644 index 0000000..0d91687 --- /dev/null +++ b/configs/cyr/gpt/README.md @@ -0,0 +1,10 @@ +# Steps: + +1) `python run.py --task configs/cyr/gpt/load_data/wiki.py` +2) `python -m wikiextractor.WikiExtractor outputs/cyr/gpt/load_data/wiki/ukwiki-latest-pages-articles.xml.bz2 -o outputs/cyr/gpt/load_data/wiki/ukwiki-latest-pages-articles -b 1M --no-templates` +3) `python run.py --task configs/cyr/gpt/load_data/in-house.py` +4) `python run.py --task configs/cyr/gpt/extract_texts/train-validation-open-data.py` +5) `python run.py --task configs/cyr/gpt/extract_texts/in-house-data.py` +6) `python run.py --task configs/cyr/gpt/train_tokenizer/ukr-gpt.py` +7) `python run.py --task configs/cyr/gpt/train_tokenizer/convert-to-transformers.py` +8) `shuf outputs/cyr/gpt/extract_texts/train-validation-open-data/train.txt -o outputs/cyr/gpt/extract_texts/train-validation-open-data/train_shuffled.txt` diff --git a/configs/cyr/gpt/extract_texts/in-house-data.py
b/configs/cyr/gpt/extract_texts/in-house-data.py new file mode 100644 index 0000000..5852472 --- /dev/null +++ b/configs/cyr/gpt/extract_texts/in-house-data.py @@ -0,0 +1,22 @@ +from pynlple.processing.preprocessor import ( + HtmlTagReplacer, + MultiLetterReplacer, + MultiNonLetterReplacer, + StackingPreprocessor, + URLReplacer, +) + +from language_model.data.extract import ExtractTextsFromData, FromLoadedYsDataSource + +YS_FOLDER_PATHS = ["outputs/cyr/gpt/load_data/in-house"] + + +preprocessor = StackingPreprocessor( + [HtmlTagReplacer(), URLReplacer(), MultiNonLetterReplacer(include_digits=False), MultiLetterReplacer()] +) + +ys_train = FromLoadedYsDataSource(source_folder_paths=YS_FOLDER_PATHS) + +task = ExtractTextsFromData( + text_source=ys_train, preprocessor=preprocessor, seeds=100, char_ngram=20, bands=20, min_jaccard=0.9 +) diff --git a/configs/cyr/gpt/extract_texts/train-validation-open-data.py b/configs/cyr/gpt/extract_texts/train-validation-open-data.py new file mode 100644 index 0000000..91418e1 --- /dev/null +++ b/configs/cyr/gpt/extract_texts/train-validation-open-data.py @@ -0,0 +1,28 @@ +from itertools import chain + +from datasets import load_dataset +from pynlple.processing.preprocessor import ( + HtmlTagReplacer, + MultiLetterReplacer, + MultiNonLetterReplacer, + StackingPreprocessor, + URLReplacer, +) + +from language_model.data.extract import PostWikiExtractorDataSource, RandomSplitTextsFromData + +WIKI_EXTRACTED_PATH = "outputs/cyr/gpt/load_data/wiki/ukwiki-latest-pages-articles" + + +preprocessor = StackingPreprocessor( + [HtmlTagReplacer(), URLReplacer(), MultiNonLetterReplacer(include_digits=False), MultiLetterReplacer()] +) + +oscar_train = (item["text"] for item in load_dataset("oscar", "unshuffled_deduplicated_uk", split="train")) +cc100_train = (item["text"] for item in load_dataset("cc100", lang="uk", split="train")) +wiki_train = (item["text"] for item in PostWikiExtractorDataSource(WIKI_EXTRACTED_PATH)) + + +task = RandomSplitTextsFromData( + text_source=chain(oscar_train, cc100_train, wiki_train), preprocessor=preprocessor, test_size=5_000 +) diff --git a/configs/cyr/gpt/extract_vectors/vectorize-train.py b/configs/cyr/gpt/extract_vectors/vectorize-train.py new file mode 100644 index 0000000..7b4a9bd --- /dev/null +++ b/configs/cyr/gpt/extract_vectors/vectorize-train.py @@ -0,0 +1,27 @@ +import os + +from transformers import PreTrainedTokenizerFast + +from language_model.data.extract import ExtractVectorsFromTexts, LineByLineSource, ShuffledSources + +TOKENIZER_PATH = "outputs/cyr/gpt/train_tokenizer/convert-to-transformers/tokenizer/" + +IN_HOUSE_TRAIN_DATA_PATH = "outputs/cyr/gpt/extract_texts/in-house-data/texts.txt" +OPEN_TRAIN_DATA_PATH = "outputs/cyr/gpt/extract_texts/train-validation-open-data/train_shuffled.txt" +MODEL_MAX_LENGTH = 1024 + +# data +train_data_source = ShuffledSources( + (text for text in LineByLineSource(IN_HOUSE_TRAIN_DATA_PATH)), + (text for text in LineByLineSource(OPEN_TRAIN_DATA_PATH)), +) + +os.environ["TOKENIZERS_PARALLELISM"] = "true" + +task = ExtractVectorsFromTexts( + data_source=train_data_source, + tokenizer=PreTrainedTokenizerFast.from_pretrained(TOKENIZER_PATH), + block_size=MODEL_MAX_LENGTH, + process_batch_size=100_000, + workers=18, +) diff --git a/configs/cyr/gpt/extract_vectors/vectorize-validation.py b/configs/cyr/gpt/extract_vectors/vectorize-validation.py new file mode 100644 index 0000000..11fcd68 --- /dev/null +++ b/configs/cyr/gpt/extract_vectors/vectorize-validation.py @@ -0,0 +1,22 @@ +import os + +from 
transformers import PreTrainedTokenizerFast + +from language_model.data.extract import ExtractVectorsFromTexts, LineByLineSource + +TOKENIZER_PATH = "outputs/cyr/gpt/train_tokenizer/convert-to-transformers/tokenizer/" + +OPEN_VALIDATION_DATA_PATH = "outputs/cyr/gpt/extract_texts/train-validation-open-data/validation.txt" +MODEL_MAX_LENGTH = 1024 + +# data +validation_data_source = LineByLineSource(OPEN_VALIDATION_DATA_PATH) +os.environ["TOKENIZERS_PARALLELISM"] = "true" + +task = ExtractVectorsFromTexts( + data_source=validation_data_source, + tokenizer=PreTrainedTokenizerFast.from_pretrained(TOKENIZER_PATH), + block_size=MODEL_MAX_LENGTH, + process_batch_size=100_000, + workers=18, +) diff --git a/configs/cyr/gpt/load_data/in-house.py b/configs/cyr/gpt/load_data/in-house.py new file mode 100644 index 0000000..a6acb90 --- /dev/null +++ b/configs/cyr/gpt/load_data/in-house.py @@ -0,0 +1,9 @@ +from language_model.data.load import YSDataDownloadTask +from language_model.data.processing import LightweightMention + +task = YSDataDownloadTask( + credentials_path="credentials", + topic_id=275648, + query={"from": "2019-01-01", "to": "2021-09-01", "sanitize": False, "dedup": False}, + mention_processor=LightweightMention(), +) diff --git a/configs/cyr/gpt/load_data/wiki.py b/configs/cyr/gpt/load_data/wiki.py new file mode 100644 index 0000000..3323178 --- /dev/null +++ b/configs/cyr/gpt/load_data/wiki.py @@ -0,0 +1,3 @@ +from language_model.data.load import WikiDownloadTask + +task = WikiDownloadTask(url="https://dumps.wikimedia.org/ukwiki/latest/ukwiki-latest-pages-articles.xml.bz2") diff --git a/configs/cyr/gpt/train_model/ukr-gpt.py b/configs/cyr/gpt/train_model/ukr-gpt.py new file mode 100644 index 0000000..300fb8d --- /dev/null +++ b/configs/cyr/gpt/train_model/ukr-gpt.py @@ -0,0 +1,74 @@ +from transformers import ( + GPT2Config, + GPT2LMHeadModel, + IntervalStrategy, + PreTrainedTokenizerFast, + Trainer, + TrainingArguments, +) + +from language_model.data.dataset import ( + DataCollatorForGroupTextForCasualLMDataset, + FromInputIdsDataset, + FromInputIdsIterableDataset, +) +from language_model.modelling.trainer import TransformersTrainTask + +TOKENIZER_PATH = "outputs/cyr/gpt/train_tokenizer/convert-to-transformers/tokenizer/" + +TRAIN_IDS_PATH = "outputs/cyr/gpt/extract_vectors/vectorize-train/processed_batch.jsonl" +VALIDATION_IDS_PATH = "outputs/cyr/gpt/extract_vectors/vectorize-validation/processed_batch.jsonl" +MODEL_MAX_LENGTH = 1024 + + +# tokenizer +tokenizer = PreTrainedTokenizerFast.from_pretrained(TOKENIZER_PATH) +# model +model_config = GPT2Config(vocab_size=len(tokenizer), bos_token_id=tokenizer.bos_token_id) +model = GPT2LMHeadModel(model_config) + + +# data +train_dataset = FromInputIdsIterableDataset(TRAIN_IDS_PATH) +valid_dataset = FromInputIdsDataset(VALIDATION_IDS_PATH) +data_collator = DataCollatorForGroupTextForCasualLMDataset(MODEL_MAX_LENGTH) + + +training_args = TrainingArguments( + do_train=True, + do_eval=True, + evaluation_strategy=IntervalStrategy.STEPS, + eval_steps=20_000, + num_train_epochs=5, + per_device_train_batch_size=4, # overall bs = 4 * 16 * num_gpus (GPT2 used 512) + gradient_accumulation_steps=16, + per_device_eval_batch_size=4, + output_dir="checkpoints", + overwrite_output_dir=False, + save_steps=20_000, + save_total_limit=10, + prediction_loss_only=False, + learning_rate=0.0002, # (was manually tuned in GPT2 on held-out validation) + warmup_ratio=0.004, + fp16=True, + logging_dir="logs", + seed=42, + lr_scheduler_type="cosine", # type: ignore + 
logging_first_step=True, + logging_steps=500, + label_names=["labels"], + load_best_model_at_end=True, + group_by_length=False, + report_to=["mlflow"], + dataloader_num_workers=1, # because of IterableDataset that reads from one opened file +) + +trainer = Trainer( + model=model, + args=training_args, + train_dataset=train_dataset, + eval_dataset=valid_dataset, + data_collator=data_collator, +) + +task = TransformersTrainTask(trainer=trainer) diff --git a/configs/cyr/gpt/train_tokenizer/convert-to-transformers.py b/configs/cyr/gpt/train_tokenizer/convert-to-transformers.py new file mode 100644 index 0000000..4885e79 --- /dev/null +++ b/configs/cyr/gpt/train_tokenizer/convert-to-transformers.py @@ -0,0 +1,22 @@ +from transformers import PreTrainedTokenizerFast + +from language_model.tokenization.factory import FAST_TOKENIZER_DEFAULT_FILE_NAME +from language_model.tokenization.tasks import PreTrainedTokenizerFastSavingTask + +TOKENIZER_PATH = f"outputs/cyr/gpt/train_tokenizer/ukr-gpt/{FAST_TOKENIZER_DEFAULT_FILE_NAME}" + +IN_HOUSE_TRAIN_DATA_PATH = "outputs/cyr/gpt/extract_texts/in-house-data/texts.txt" +OPEN_TRAIN_DATA_PATH = "outputs/cyr/gpt/extract_texts/train-validation-open-data/train_shuffled.txt" +MODEL_MAX_LENGTH = 1024 + + +# tokenizer +tokenizer = PreTrainedTokenizerFast( + tokenizer_file=TOKENIZER_PATH, model_max_length=MODEL_MAX_LENGTH, padding_side="right" +) +tokenizer.add_special_tokens({"bos_token": "<|endoftext|>"}) +# `pad_token` is not actually used during training, since DataCollatorForGroupTextForCasualLMDataset packs sequences +# up to max_length; it is set here only to avoid an error inside DataCollatorForGroupTextForCasualLMDataset +tokenizer.pad_token = tokenizer.bos_token + +task = PreTrainedTokenizerFastSavingTask(pretrained_fast_tokenizer=tokenizer) diff --git a/configs/cyr/gpt/train_tokenizer/ukr-gpt.py b/configs/cyr/gpt/train_tokenizer/ukr-gpt.py new file mode 100644 index 0000000..8148942 --- /dev/null +++ b/configs/cyr/gpt/train_tokenizer/ukr-gpt.py @@ -0,0 +1,23 @@ +from itertools import islice + +from tokenizers import Tokenizer, decoders, models, pre_tokenizers, processors, trainers + +from language_model.data.extract import LineByLineSource +from language_model.tokenization.trainer import TrainTokenizerTask + +tokenizer = Tokenizer(models.BPE()) + +tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=True) +tokenizer.decoder = decoders.ByteLevel() +tokenizer.post_processor = processors.ByteLevel(trim_offsets=True) + + +TRAIN_DATA_PATH = "outputs/cyr/gpt/extract_texts/train-validation-open-data/train.txt" +NUM_TRAIN_LINES = 1_000_000 +TRAIN_SAMPLING_STEP = 200 +train_data_source = islice( + (line for i, line in enumerate(LineByLineSource(TRAIN_DATA_PATH)) if i % TRAIN_SAMPLING_STEP == 0), NUM_TRAIN_LINES +) +trainer = trainers.BpeTrainer(vocab_size=50264, special_tokens=["<|endoftext|>"]) + +task = TrainTokenizerTask(tokenizer=tokenizer, iterator=train_data_source, trainer=trainer) diff --git a/configs/ukr/train_model/ukr-roberta-base.py b/configs/ukr/train_model/ukr-roberta-base.py deleted file mode 100644 index fd0d8f0..0000000 --- a/configs/ukr/train_model/ukr-roberta-base.py +++ /dev/null @@ -1,23 +0,0 @@ -from transformers import RobertaConfig, RobertaForMaskedLM, RobertaTokenizer - -from language_model.modelling.trainer import RobertaForMaskedLMTrainTask - -_model_config = RobertaConfig( - vocab_size=52000, - max_position_embeddings=514, - num_attention_heads=12, - num_hidden_layers=12, - type_vocab_size=1, - intermediate_size=3072, -) - -_model =
RobertaForMaskedLM(_model_config) - -_tokenizer = RobertaTokenizer.from_pretrained("outputs/ukr/train_tokenizer/ukr-roberta-base/tokenizer", max_len=512) - -task = RobertaForMaskedLMTrainTask( - file_path="data/ukr/aggregated_data/ukr-roberta-base/data.txt", - model=_model, - tokenizer=_tokenizer, - batch_size_per_gpu=40, -) diff --git a/configs/ukr/train_tokenizer/ukr-roberta-base.py b/configs/ukr/train_tokenizer/ukr-roberta-base.py deleted file mode 100644 index cf9beeb..0000000 --- a/configs/ukr/train_tokenizer/ukr-roberta-base.py +++ /dev/null @@ -1,11 +0,0 @@ -from tokenizers.implementations import ByteLevelBPETokenizer - -from language_model.tokenization.trainer import ByteLevelBPETokenizerTrainer - -task = ByteLevelBPETokenizerTrainer( - source_folder_path="data/ukr/data/wiki_oscar_data/", - tokenizer=ByteLevelBPETokenizer(), - vocab_size=52000, - min_frequency=5, - special_tokens=["", "", "", "", ""], -) diff --git a/requirements.txt b/requirements.txt index 67488bb..6df5490 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,12 @@ +bs4==0.0.1 +Cython==3.0.0a9 +datasets==1.11.0 +lxml==4.6.3 +more-itertools==8.9.0 +numpy==1.19.5 pyNlple==0.7.5 tokenizers==0.10.1 torch==1.8.1 transformers==4.4.2 +wget==3.2 +wikiextractor==3.0.4 diff --git a/requirements_installation.sh b/requirements_installation.sh index 7e94f07..1d13a40 100644 --- a/requirements_installation.sh +++ b/requirements_installation.sh @@ -9,6 +9,12 @@ pip install -r requirements.dev.txt pip install -v --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" ./ ) +( + git clone https://github.com/mattilyra/LSH || { echo "Failed to download and install LSH"; exit 1; } + cd LSH && \ + python setup.py install +) + pip install -e . pre-commit install diff --git a/src/language_model/data/dataset.py b/src/language_model/data/dataset.py index 307915d..36ebf58 100644 --- a/src/language_model/data/dataset.py +++ b/src/language_model/data/dataset.py @@ -1,4 +1,5 @@ import itertools +import json import logging import math from itertools import chain @@ -6,7 +7,7 @@ import torch from torch._utils import _accumulate -from torch.utils.data import Dataset +from torch.utils.data import Dataset, IterableDataset from torch.utils.data.dataset import T_co from transformers import PreTrainedTokenizer @@ -76,7 +77,7 @@ def _read_chunk(self) -> Iterator[List[str]]: def __linit_entries__(self) -> Sequence[T_co]: logging.info(f"Creating features from dataset files: {self.file_paths}") - entries: List[List[Dict[str, torch.tensor]]] = [] + entries: List[List[Dict[str, torch.Tensor]]] = [] for lines in self._read_chunk(): entries.append(self._extract_batch(lines)) logging.info(f"Currently read total {sum(map(len, entries))} at end") @@ -126,3 +127,65 @@ def __len__(self) -> int: def split_lazy_dataset(dataset: LazyDataset, portions: Sequence[float]) -> List[LazySubset]: portions_provider = Portions(dataset=dataset, portions=portions) return [LazySubset(dataset, portions_provider=portions_provider, portion_id=i) for i in range(len(portions))] + + +class FromInputIdsIterableDataset(IterableDataset): + def __init__(self, input_ids_file_path: str): + super(FromInputIdsIterableDataset, self).__init__() + self.input_ids_file_path = input_ids_file_path + self.length = self._get_number_of_valid_lines() + + def _get_number_of_valid_lines(self) -> int: + number_of_valid_lines = 0 + for _ in self._read_lines(): + number_of_valid_lines += 1 + return number_of_valid_lines + + def _read_lines(self) -> Iterable[str]: + with 
open(self.input_ids_file_path, "r") as f: + for line in f: + line = line.strip() + if line: + yield line + + def __len__(self) -> int: + return self.length + + def __iter__(self) -> Iterator[List[int]]: + for line in self._read_lines(): + yield self._process(line) + + @staticmethod + def _process(line: str) -> List[int]: + return json.loads(line) # type: ignore + + +class FromInputIdsDataset(Dataset): + def __init__(self, input_ids_file_path: str): + self.data = [] + with open(input_ids_file_path, "r") as f: + for line in f: + line = line.strip() + if line: + self.data.append(self._process(line)) + + def __getitem__(self, index: int) -> List[int]: + return self.data[index] + + def __len__(self) -> int: + return len(self.data) + + @staticmethod + def _process(line: str) -> List[int]: + return json.loads(line) # type: ignore + + +class DataCollatorForGroupTextForCasualLMDataset: + def __init__(self, max_length: int): + self.max_length = max_length + + def __call__(self, examples: List[List[int]]) -> Dict[str, torch.Tensor]: + examples = [ids[: self.max_length] for ids in examples] + input_ids = torch.tensor(examples, dtype=torch.long) + labels = torch.tensor(examples, dtype=torch.long) + return {"input_ids": input_ids, "labels": labels} diff --git a/src/language_model/data/extract.py b/src/language_model/data/extract.py index 3f80bca..3692eee 100644 --- a/src/language_model/data/extract.py +++ b/src/language_model/data/extract.py @@ -1,17 +1,28 @@ -import io +import json import logging +import multiprocessing import os +import random from collections import Hashable as HashableType from collections import OrderedDict -from typing import Any, Callable, Dict, Hashable, Iterable, Iterator, Optional +from concurrent import futures +from concurrent.futures import ProcessPoolExecutor +from math import ceil +from pathlib import Path +from typing import Any, Callable, Dict, Generator, Hashable, Iterable, Iterator, List, Optional, Union +import numpy as np +from bs4 import BeautifulSoup from ds_shared.loading import load_pickle -from pynlple.data.corpus import FilteringSource, JsonFieldSource, MappingSource, SplittingSource, StackingSource +from more_itertools import chunked +from pynlple.data.corpus import FilteringSource, JsonFieldSource, MappingSource, StackingSource from pynlple.data.filesource import FilePathSource from pynlple.data.source import Source -from pynlple.processing.preprocessor import BoldTagReplacer, IPreprocessor, StackingPreprocessor +from pynlple.processing.preprocessor import IPreprocessor +from transformers import PreTrainedTokenizer, PreTrainedTokenizerFast from ..pipeline import ITask +from .utils import write_to_texts_file, write_to_train_val_files MIN_TEXT_LEN = 10 MIN_TEXT_TOKEN_LENGTH = 2 @@ -74,6 +85,37 @@ def __iter__(self) -> Iterator[Any]: logging.info(f"f={id(self.feature_extractor)} skipped {skipped}/{total}") +class LineByLineSource(Source): + def __init__(self, text_filepath: str) -> None: + super().__init__() + self.text_filepath = text_filepath + + def __iter__(self) -> Iterator[str]: + with open(self.text_filepath, "r") as f: + for line in f: + line = line.strip() + if line: + yield line + + +class ShuffledSources(Source): + def __init__(self, *sources: Generator[str, None, None]) -> None: + self.sources = list(sources) + + def __iter__(self) -> Iterable[str]: + return self + + def __next__(self) -> str: + if not self.sources: + raise StopIteration + source_id = random.choice(range(len(self.sources))) + try: + return next(iter(self.sources[source_id])) + 
except StopIteration: + self.sources.pop(source_id) + return next(self) + + class PickleDataSource(Source): def __init__(self, pickle_filepath: str) -> None: super().__init__() @@ -83,24 +125,29 @@ def __iter__(self) -> Iterator[Any]: return iter(load_pickle(self.pickle_filepath)) -class ExtractTextsFromData(ITask): - def __init__( - self, - source_folder_paths: Iterable[str], - preprocessor: Optional[IPreprocessor] = None, - min_text_length: int = MIN_TEXT_LEN, - min_text_token_length: int = MIN_TEXT_TOKEN_LENGTH, - ) -> None: +class PostWikiExtractorDataSource(Source): + """Provides wiki articles preprocessed by `python -m wikiextractor.WikiExtractor dump.xml.bz2 ...`""" + + def __init__(self, article_dir: str) -> None: super().__init__() - preprocessors = [BoldTagReplacer()] - if preprocessor is not None: - preprocessors.append(preprocessor) - self.preprocessor = StackingPreprocessor(preprocessor_list=preprocessors) - self.min_text_length = min_text_length - self.min_text_token_length = min_text_token_length + self.article_dir = Path(article_dir) + + def __iter__(self) -> Iterator[Any]: + for subdir in self.article_dir.glob("*"): + for file in subdir.glob("*"): + with open(file, "r") as f: + data = f.read() + soup = BeautifulSoup(data, "lxml") + for doc in soup.find_all("doc"): + yield {"id": doc["id"], "title": doc["title"], "url": doc["url"], "text": doc.text} + + +class FromLoadedYsDataSource(Source): + def __init__(self, source_folder_paths: Iterable[str], preprocessor: Optional[IPreprocessor] = None): self.source_folder_paths = source_folder_paths + self.preprocessor = preprocessor - def execute(self, environment_path: str) -> None: + def __iter__(self) -> Iterator[str]: filepath_source = FilePathSource(paths=self.source_folder_paths, extension_suffix=".p") json_data_source = StackingSource([PickleDataSource(path) for path in filepath_source]) subtitles_filtering_source = FilteringSource( @@ -110,31 +157,164 @@ def execute(self, environment_path: str) -> None: subtitles_filtering_source, cache_size=100000, refresh=False, feature_extractor=pick_text_hash, log=20000 ) text_source = JsonFieldSource(text_hash_filtered_source, key="text", default="") - line_text_source = SplittingSource(text_source, splitting_function=str.splitlines) - processed_text_source = MappingSource(line_text_source, function=self.preprocessor.preprocess) - short_text_filtered_source = FilteringSource( - processed_text_source, + if self.preprocessor is not None: + yield from MappingSource(text_source, function=self.preprocessor.preprocess) + else: + yield from text_source + + +class ExtractTextsFromData(ITask): + def __init__( + self, + text_source: Iterable[str], + preprocessor: Optional[IPreprocessor] = None, + min_text_length: int = MIN_TEXT_LEN, + min_text_token_length: int = MIN_TEXT_TOKEN_LENGTH, + cache_size: int = 100_000, + ) -> None: + super().__init__() + self.preprocessor = preprocessor + self.min_text_length = min_text_length + self.min_text_token_length = min_text_token_length + self.text_source = text_source + self.cache_size = cache_size + + def execute(self, environment_path: str) -> None: + self._write_to_file(self._deduplicate(self._filter(self._preprocess())), environment_path=environment_path) + + def _preprocess(self) -> Union[Source, Iterable[str]]: + if self.preprocessor is not None: + return MappingSource(self.text_source, function=self.preprocessor.preprocess) + return self.text_source + + def _filter(self, preprocessed_source: Union[Source, Iterable[str]]) -> Source: + return 
FilteringSource( + preprocessed_source, condition=lambda x: len(x) >= self.min_text_length and len(x.split()) >= self.min_text_token_length, ) + + def _deduplicate(self, filtered_source: Source) -> Iterable[str]: left_bound_duplicate_filtered_source = CacheDeduplicatingSource( - short_text_filtered_source, - cache_size=100000, + filtered_source, + cache_size=self.cache_size, refresh=False, feature_extractor=lambda text: str(text[:50]), log=10000, ) right_bound_duplicate_filtered_source = CacheDeduplicatingSource( left_bound_duplicate_filtered_source, - cache_size=100000, + cache_size=self.cache_size, refresh=False, feature_extractor=lambda text: str(text[-50:]), log=10000, ) - output_file_path = os.path.join(environment_path, "texts.txt") - lines = 0 - with io.open(output_file_path, mode="wt", encoding="utf-8") as output_stream: - for line in right_bound_duplicate_filtered_source: - output_stream.write(line) - output_stream.write("\n") - lines += 1 - logging.info(f"Completed extraction of texts: {lines} lines written to file.") + return right_bound_duplicate_filtered_source + + def _write_to_file(self, texts: Iterable[str], environment_path: str) -> None: + return write_to_texts_file(texts, environment_path) + + +class RandomSplitTextsFromData(ExtractTextsFromData): + def __init__( + self, + text_source: Iterable[str], + preprocessor: Optional[IPreprocessor] = None, + min_text_length: int = MIN_TEXT_LEN, + min_text_token_length: int = MIN_TEXT_TOKEN_LENGTH, + seeds: Union[int, np.ndarray] = 100, + test_size: Union[float, int] = 0.1, + ) -> None: + super().__init__(text_source, preprocessor, min_text_length, min_text_token_length, seeds) + # if test_size > 1 (an absolute number), there is a chance we will not reach it + # when the full dataset is roughly the same size as test_size, or less than twice as large + self.test_ratio = 0.5 if test_size > 1 else test_size + self.test_size = test_size if test_size > 1 else float("inf") + + def _write_to_file(self, texts: Iterable[str], environment_path: str) -> None: + return write_to_train_val_files( + texts, environment_path, test_ratio=self.test_ratio, test_size=self.test_size # type: ignore + ) + + +class ExtractVectorsFromTexts(ITask): + def __init__( + self, + data_source: Iterable[str], + tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], + block_size: int, + workers: int = -1, + process_batch_size: int = 8192, + ): + self.workers = multiprocessing.cpu_count() if workers == -1 else workers + self.tokenizer = tokenizer + self.block_size = block_size + self.data_source = data_source + self.process_batch_size = process_batch_size + + def execute(self, environment_path: str) -> None: + input_ids_file = os.path.join(environment_path, "processed_batch.jsonl") + if os.path.exists(input_ids_file): + raise FileExistsError(f"{input_ids_file} already exists") + + counter = 1 + for lines in self._read_chunk(): + batch_size = ceil(len(lines) / self.workers) + batched_lines = chunked(lines, batch_size) + + with open(input_ids_file, "a") as fp: + with ProcessPoolExecutor(max_workers=self.workers) as executor: + tasks = [ + executor.submit(self._extract_batch, batch_lines, self.tokenizer, self.block_size) + for batch_lines in batched_lines + ] + for completed_task in futures.as_completed(tasks): + for input_ids in completed_task.result(): + fp.write(json.dumps(input_ids)) + fp.write("\n") + logging.info(f"Currently extracted {counter} batches of size {self.process_batch_size}") + counter += 1 + + logging.info("Vectors extracted") + + def
_read_chunk(self) -> Iterator[List[str]]: + lines: List[str] = [] + for line in self.data_source: + if len(line) > 0 and not line.isspace(): + lines.append(line) + + if len(lines) == self.process_batch_size: + yield lines + lines = [] + if len(lines) > 0: + yield lines + + @staticmethod + def _extract_batch( + lines: List[str], tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], block_size: int + ) -> List[List[int]]: + batch_encoding: List[List[int]] = [] + current_line = [tokenizer.bos_token] + for line in lines: + tokens = tokenizer.tokenize(line) + if len(current_line) + len(tokens) + 1 <= block_size: + current_line.append(tokenizer.bos_token) + current_line.extend(tokens) + elif len(current_line) == block_size: + input_ids = tokenizer.convert_tokens_to_ids(current_line) + batch_encoding.append(input_ids) + current_line = [tokenizer.bos_token] + tokens + else: + current_line.append(tokenizer.bos_token) + n_tokens_to_add = block_size - len(current_line) + current_line.extend(tokens[:n_tokens_to_add]) + input_ids = tokenizer.convert_tokens_to_ids(current_line) + batch_encoding.append(input_ids) + + tokens = tokens[n_tokens_to_add:] + while len(tokens) >= block_size: + input_ids = tokenizer.convert_tokens_to_ids(tokens[:block_size]) + batch_encoding.append(input_ids) + tokens = tokens[block_size:] + + current_line = tokens + return batch_encoding diff --git a/src/language_model/data/fuzzy_dedup.py b/src/language_model/data/fuzzy_dedup.py new file mode 100644 index 0000000..5d9ea7d --- /dev/null +++ b/src/language_model/data/fuzzy_dedup.py @@ -0,0 +1,133 @@ +import gc +import multiprocessing +from concurrent.futures import ProcessPoolExecutor +from itertools import islice +from typing import Iterable, List, Set, Tuple, Union + +import numpy as np + +try: + from lsh import cache, minhash +except ImportError: + cache, minhash = None, None + + +class MinHashLSHDeduplicator: + def __init__(self, seeds: Union[int, np.ndarray], char_ngram: int, bands: int, workers: int = -1): + if cache is None or minhash is None: + raise ImportError( + "It seems like you do not have lsh package. 
To use 'MinHashLSHDeduplicator' you need to install it: " + "$git clone https://github.com/mattilyra/LSH " + "$cd LSH && python setup.py install" + ) + hasher = minhash.MinHasher(seeds=seeds, char_ngram=char_ngram, random_state=42) + self.lsh_cache = cache.Cache(num_bands=bands, hasher=hasher) + self.workers = multiprocessing.cpu_count() if workers == -1 else workers + + def deduplicate(self, docs: List[str], min_jaccard: float, clear: bool = True) -> List[str]: + if clear: + self.lsh_cache.clear() + + duplicate_ids: Set[int] = set() + keep_form_duplicate_ids = set() + for i, j in self.get_all_duplicates(docs, min_jaccard): + if i not in duplicate_ids and j not in duplicate_ids: + keep_form_duplicate_ids.add(i) + elif i in keep_form_duplicate_ids and j in keep_form_duplicate_ids: + keep_form_duplicate_ids.remove(j) + duplicate_ids.add(i) + duplicate_ids.add(j) + + keep = set(range(len(docs))) - duplicate_ids | keep_form_duplicate_ids + + return [docs[i] for i in keep] + + def in_memory_batch_deduplicate(self, docs: Iterable[str], min_jaccard: float, batch_size: int) -> Iterable[str]: + batch_docs = list(islice(docs, batch_size)) + while batch_docs: + batch_docs = self.deduplicate(batch_docs, min_jaccard=min_jaccard, clear=True) + gc.collect() + add_into_batch = batch_size - len(batch_docs) + if add_into_batch > 0: + new_docs = list(islice(docs, add_into_batch)) + if not new_docs: + break + batch_docs.extend(new_docs) + else: + yield from batch_docs + batch_docs = list(islice(docs, batch_size)) + yield from batch_docs + + def lsh_batch_deduplicate( + self, docs: Iterable[str], min_jaccard: float, batch_size: int, clear: bool = True + ) -> Iterable[str]: + """ + batch_size: maximum number of docs deduplicated at a time; the `batch_docs` list keeps being extended + until it holds `batch_size` unique docs + """ + if clear: + self.lsh_cache.clear() + + start_id, end_id = 0, batch_size + keep_form_duplicate_ids: Set[int] = set() + duplicate_ids: Set[int] = set() + + batch_docs = list(islice(docs, batch_size)) + + while batch_docs: + + new_duplicate_ids = set() + # batch_docs[start_id:] processes only the recently appended docs; + # start_id=start_id makes their ids continue from the existing ones + for i, j in self.get_all_duplicates(batch_docs[start_id:], min_jaccard, start_id=start_id): + if i not in duplicate_ids and j not in duplicate_ids: + keep_form_duplicate_ids.add(i) + elif i in keep_form_duplicate_ids and j in keep_form_duplicate_ids: + keep_form_duplicate_ids.remove(j) + duplicate_ids.add(i) + duplicate_ids.add(j) + new_duplicate_ids.add(i) + new_duplicate_ids.add(j) + + # new_duplicate_ids: drop duplicates only among the recently appended + # docs, as earlier duplicate ids have already been cleared + drop_ids = new_duplicate_ids - keep_form_duplicate_ids + + if drop_ids: + for i in drop_ids: + self.lsh_cache.remove_id(i) + + start_id = end_id + end_id = start_id + len(drop_ids) + else: + # all non-duplicated docs + keep_form_duplicate_ids + for i in set(range(len(batch_docs))) - duplicate_ids | keep_form_duplicate_ids: + yield batch_docs[i] + + # new batch + self.lsh_cache.clear() + start_id, end_id = 0, batch_size + duplicate_ids, keep_form_duplicate_ids = set(), set() + batch_docs = [] + + # if there were drop_ids, append the next len(drop_ids) examples + batch_docs.extend(islice(docs, end_id - start_id)) + + if batch_docs: + # all non-duplicated docs + keep_form_duplicate_ids + for i in set(range(len(batch_docs))) - duplicate_ids | keep_form_duplicate_ids: + yield batch_docs[i] + + def
get_all_duplicates(self, docs: Iterable[str], min_jaccard: float, start_id: int = 0) -> Set[Tuple[int, int]]: + self._cache_texts_parallel(docs, start_id=start_id) + return self.lsh_cache.get_all_duplicates(min_jaccard) # type: ignore + + def _cache_texts(self, docs: Iterable[str], start_id: int = 0) -> None: + for i, doc in enumerate(docs, start_id): + self.lsh_cache.add_doc(doc, i) + + def _cache_texts_parallel(self, docs: Iterable[str], start_id: int = 0) -> None: + encoded_docs = (doc.encode("utf8") for doc in docs) + with ProcessPoolExecutor(max_workers=self.workers) as executor: + for i, fingerprint in enumerate(executor.map(self.lsh_cache.hasher.fingerprint, encoded_docs), start_id): + self.lsh_cache.add_fingerprint(fingerprint, i) diff --git a/src/language_model/data/load.py b/src/language_model/data/load.py index ce1bfa5..2bbca11 100644 --- a/src/language_model/data/load.py +++ b/src/language_model/data/load.py @@ -3,6 +3,7 @@ import os from typing import Any, Callable, Dict, Optional +import wget from ds_shared.download import YsDownloader from ds_shared.saving import save_pickle @@ -43,3 +44,12 @@ def execute(self, environment_path: str) -> None: mentions_chunk = downloader.download(self.topic_id, last_mention_id=last_mention_id) logging.info("Download completed.") + + +class WikiDownloadTask(ITask): + def __init__(self, url: str): + self.url = url + + def execute(self, environment_path: str) -> None: + wget.download(self.url, os.path.join(environment_path, self.url.split("/")[-1])) + logging.info("Download completed.") diff --git a/src/language_model/data/utils.py b/src/language_model/data/utils.py new file mode 100644 index 0000000..9f672f2 --- /dev/null +++ b/src/language_model/data/utils.py @@ -0,0 +1,41 @@ +import io +import logging +import os +import random +from typing import Iterable + + +def write_to_texts_file(texts: Iterable[str], environment_path: str) -> None: + output_file_path = os.path.join(environment_path, "texts.txt") + lines = 0 + with io.open(output_file_path, mode="wt", encoding="utf-8") as output_stream: + for line in texts: + output_stream.write(line) + output_stream.write("\n") + lines += 1 + logging.info(f"Completed extraction of texts: {lines} lines written to file.") + + +def write_to_train_val_files(texts: Iterable[str], environment_path: str, test_ratio: float, test_size: int) -> None: + train_file_path = os.path.join(environment_path, "train.txt") + validation_file_path = os.path.join(environment_path, "validation.txt") + train_lines = 0 + test_lines = 0 + train_stream = io.open(train_file_path, mode="wt", encoding="utf-8") + validation_stream = io.open(validation_file_path, mode="wt", encoding="utf-8") + for line in texts: + if test_ratio > random.random() and test_lines <= test_size: + validation_stream.write(line) + validation_stream.write("\n") + test_lines += 1 + else: + train_stream.write(line) + train_stream.write("\n") + train_lines += 1 + + validation_stream.close() + train_stream.close() + logging.info( + f"Completed extraction of texts: {test_lines} lines written to test file " + f"and {train_lines} lines written to train file" + ) diff --git a/src/language_model/tokenization/tasks.py b/src/language_model/tokenization/tasks.py index fcc1497..5a784f5 100644 --- a/src/language_model/tokenization/tasks.py +++ b/src/language_model/tokenization/tasks.py @@ -1,6 +1,7 @@ import os from tokenizers import Tokenizer +from transformers import PreTrainedTokenizerFast from ..pipeline import ITask from .factory import FAST_TOKENIZER_DEFAULT_FILE_NAME 
@@ -13,3 +14,17 @@ def __init__(self, fast_tokenizer: Tokenizer) -> None: def execute(self, environment_path: str) -> None: self.fast_tokenizer.save(os.path.join(environment_path, FAST_TOKENIZER_DEFAULT_FILE_NAME), pretty=True) + + +class PreTrainedTokenizerFastSavingTask(ITask): + def __init__( + self, pretrained_fast_tokenizer: PreTrainedTokenizerFast, tokenizer_folder_name: str = "tokenizer" + ) -> None: + super().__init__() + self.pretrained_fast_tokenizer = pretrained_fast_tokenizer + self.tokenizer_folder_name = tokenizer_folder_name + + def execute(self, environment_path: str) -> None: + self.pretrained_fast_tokenizer.save_pretrained( + os.path.join(environment_path, self.tokenizer_folder_name), legacy_format=False + ) diff --git a/src/language_model/tokenization/trainer.py b/src/language_model/tokenization/trainer.py index 0f5b0ae..db5863f 100644 --- a/src/language_model/tokenization/trainer.py +++ b/src/language_model/tokenization/trainer.py @@ -61,4 +61,4 @@ def __init__( def execute(self, environment_path: str) -> None: self.tokenizer.train_from_iterator(self.iterator, trainer=self.trainer) - self.tokenizer.save(path=os.path.join(environment_path, self.tokenizer_file_name), pretty=True) + self.tokenizer.save(os.path.join(environment_path, self.tokenizer_file_name), pretty=True)
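
Two short usage sketches for the pieces added above; every literal value in them is illustrative only.

The JSONL files produced by `ExtractVectorsFromTexts` hold one list of exactly `block_size` token ids per line, so `DataCollatorForGroupTextForCasualLMDataset` never needs to pad; it only stacks the ids and copies them into `labels`. A minimal sketch, assuming the package is installed with `pip install -e .` and using made-up ids instead of real tokenizer output:

```python
import json
import tempfile

import torch

from language_model.data.dataset import DataCollatorForGroupTextForCasualLMDataset, FromInputIdsDataset

BLOCK_SIZE = 8  # toy value; the real configs use MODEL_MAX_LENGTH = 1024

# ExtractVectorsFromTexts writes one JSON list of exactly BLOCK_SIZE token ids per line.
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as f:
    for block in ([1, 5, 6, 7, 1, 8, 9, 10], [1, 2, 3, 4, 5, 6, 7, 1]):  # hypothetical ids
        f.write(json.dumps(block) + "\n")
    path = f.name

dataset = FromInputIdsDataset(path)
collator = DataCollatorForGroupTextForCasualLMDataset(max_length=BLOCK_SIZE)
batch = collator([dataset[i] for i in range(len(dataset))])

# Every example already has BLOCK_SIZE ids, so nothing is padded and labels are a
# plain copy of input_ids (GPT2LMHeadModel shifts them internally for the LM loss).
assert batch["input_ids"].shape == (2, BLOCK_SIZE)
assert torch.equal(batch["input_ids"], batch["labels"])
```

The new `fuzzy_dedup` module can also be driven directly. The sketch below reuses the `seeds`/`char_ngram`/`bands`/`min_jaccard` values from `configs/cyr/gpt/extract_texts/in-house-data.py`, with made-up documents; the main-guard matters because the class hashes documents in a `ProcessPoolExecutor`, and the LSH package from https://github.com/mattilyra/LSH must be installed (see `requirements_installation.sh`):

```python
from language_model.data.fuzzy_dedup import MinHashLSHDeduplicator

# Hypothetical documents: two near-duplicates and one unrelated text.
DOCS = [
    "Тарас Шевченко народився 9 березня 1814 року в селі Моринці.",
    "Тарас Шевченко народився 9 березня 1814 року в селі Моринці!",
    "Іван Франко писав поезію, прозу та публіцистику.",
]

if __name__ == "__main__":
    dedup = MinHashLSHDeduplicator(seeds=100, char_ngram=20, bands=20, workers=2)
    kept = dedup.deduplicate(DOCS, min_jaccard=0.9)
    # MinHash is an estimate, so results can vary slightly; typically one of the two
    # near-identical sentences is dropped and the unrelated one is kept.
    print(f"kept {len(kept)} of {len(DOCS)} documents")
```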