diff --git a/batched/aio/inference/model_batch_processor.py b/batched/aio/inference/model_batch_processor.py
index a460a3b..6464d8d 100644
--- a/batched/aio/inference/model_batch_processor.py
+++ b/batched/aio/inference/model_batch_processor.py
@@ -30,6 +30,7 @@ def __init__(
         cache: AsyncCache[dict[str, Feature], Feature] | None = None,
         max_batch_length: int | None = None,
         pad_tokens: Optional[dict[str, int]] = None,
+        padding_side: str = "right",
         priority_strategy: PriorityStrategy = PriorityStrategy.NONE,
         batch_item_cls: type[AsyncBatchItem[dict[str, Feature], Feature]] = AsyncBatchItem[dict[str, Feature], Feature],
         spread_kwargs: bool = False,
@@ -45,6 +46,7 @@ def __init__(
             cache (AsyncCache | None): An optional cache for storing results. Defaults to None.
             max_batch_length (int | None): The maximum length of a batch. Defaults to None.
             pad_tokens (dict[str, int] | None): Dictionary of padding tokens for each feature. Defaults to None.
+            padding_side (str): Side to add padding tokens. Either "left" or "right". Defaults to "right".
             priority_strategy (PriorityStrategy): The strategy to use for prioritizing items.
             batch_item_cls (type[AsyncBatchItem]): The class to use for batch items. Defaults to AsyncBatchItem.
             spread_kwargs (bool): Whether to spread the kwargs over passing dict as args. Defaults to False.
@@ -61,6 +63,7 @@ def __init__(
         )

         self.pad_tokens = pad_tokens or {}
+        self.padding_side = padding_side
         self.spread_kwargs = spread_kwargs

     async def _process_batches(self):
@@ -77,6 +80,7 @@ async def _process_batches(self):
                 batch_inputs = stack_features(
                     [item.content for item in batch],
                     pad_tokens=self.pad_tokens,
+                    padding_side=self.padding_side,
                 )

                 batch_outputs = (
@@ -129,6 +133,7 @@ def dynamically(
     small_batch_threshold: int = 8,
     max_batch_length: int | None = None,
     pad_tokens: Optional[dict[str, int]] = None,
+    padding_side: str = "right",
     priority_strategy: PriorityStrategy = PriorityStrategy.NONE,
     cache: AsyncCache[dict[str, Feature], Feature] | None = None,
     batch_item_cls: type[AsyncBatchItem[dict[str, Feature], Feature]] = AsyncBatchItem[dict[str, Feature], Feature],
@@ -144,6 +149,7 @@ def dynamically(
         small_batch_threshold (int): The threshold to give priority to small batches. Defaults to 8.
         max_batch_length (int | None): The maximum length of a batch. Defaults to None.
         pad_tokens (dict[str, int] | None): Padding token values for each input feature. Defaults to None.
+        padding_side (str): Side to add padding tokens. Either "left" or "right". Defaults to "right".
         priority_strategy (PriorityStrategy): The strategy to use for prioritizing items.
         cache (AsyncCache | None): An optional cache for storing results.
         batch_item_cls (type[AsyncBatchItem]): The class to use for batch items. Defaults to AsyncBatchItem.
@@ -170,6 +176,7 @@ def make_processor(_func: BatchInfer) -> AsyncModelBatchProcessor:
         small_batch_threshold=small_batch_threshold,
         max_batch_length=max_batch_length,
         pad_tokens=pad_tokens,
+        padding_side=padding_side,
         priority_strategy=priority_strategy,
         cache=cache,
         batch_item_cls=batch_item_cls,
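For reviewers, a minimal usage sketch of the new knob on the async side (not part of the diff; the import path follows the file above, and the echo-style batch function is a stand-in for a real model call):

```python
import asyncio

import numpy as np

from batched.aio.inference.model_batch_processor import dynamically


# Stand-in batch function: returns the padded input_ids so the padding is visible.
@dynamically(batch_size=2, pad_tokens={"input_ids": 0}, padding_side="left")
async def infer(features):
    return features["input_ids"]


async def main():
    # Two different-length requests are batched together; the shorter one
    # comes back left-padded to the batch max length: [4, 5] -> [0, 4, 5].
    long_out, short_out = await asyncio.gather(
        infer(input_ids=np.array([[1, 2, 3]])),
        infer(input_ids=np.array([[4, 5]])),
    )
    # long_out == [[1, 2, 3]], short_out == [[0, 4, 5]]


asyncio.run(main())
```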
diff --git a/batched/inference/helper.py b/batched/inference/helper.py
index 239be46..14f5ab6 100644
--- a/batched/inference/helper.py
+++ b/batched/inference/helper.py
@@ -56,13 +56,18 @@ def torch_or_np(item: Any):
         raise ValueError(msg)


-def stack_features(inputs: list[dict[str, Feature]], pad_tokens: dict[str, int]) -> dict[str, Feature]:
+def stack_features(
+    inputs: list[dict[str, Feature]],
+    pad_tokens: dict[str, int],
+    padding_side: str = "right",
+) -> dict[str, Feature]:
     """
     Stack a list of model features into a single batch.

     Args:
         inputs (list[ModelFeatures]): List of input features to stack.
         pad_tokens (dict[str, int]): Dictionary of padding tokens for each feature.
+        padding_side (str): Side to add padding tokens. Either "left" or "right". Defaults to "right".

     Returns:
         ModelFeatures: Stacked features as a single batch.
@@ -76,7 +81,13 @@ def stack_features(inputs: list[dict[str, Feature]], pad_tokens: dict[str, int])
     for i, item in enumerate(inputs):
         for key, tensor in padded_tensors.items():
             tensor_length = item[key].shape[0]
-            tensor[i, :tensor_length] = item[key]
+            if padding_side == "left":
+                # Left padding: right-align the sequence; pad values fill the front
+                start_idx = max_length - tensor_length
+                tensor[i, start_idx:] = item[key]
+            else:
+                # Right padding: left-align the sequence; pad values fill the tail
+                tensor[i, :tensor_length] = item[key]

     return padded_tensors
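A quick sketch of what the two modes produce, mirroring the new helper tests below (each item's feature is assumed to be a 1-D sequence, as in `stack_features`' existing contract):

```python
import numpy as np

from batched.inference.helper import stack_features

inputs = [
    {"input_ids": np.array([1, 2, 3])},
    {"input_ids": np.array([4, 5])},
]
pad_tokens = {"input_ids": 0}

right = stack_features(inputs, pad_tokens)                      # default: "right"
left = stack_features(inputs, pad_tokens, padding_side="left")

# right["input_ids"] -> [[1, 2, 3], [4, 5, 0]]   pads appended
# left["input_ids"]  -> [[1, 2, 3], [0, 4, 5]]   pads prepended
```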
diff --git a/batched/inference/model_batch_processor.py b/batched/inference/model_batch_processor.py
index bf4e7b0..0c430b2 100644
--- a/batched/inference/model_batch_processor.py
+++ b/batched/inference/model_batch_processor.py
@@ -27,6 +27,7 @@ def __init__(
         timeout_ms: float = 5.0,
         small_batch_threshold: int = 8,
         pad_tokens: Optional[dict[str, int]] = None,
+        padding_side: str = "right",
         spread_kwargs: bool = False,
     ):
         """
@@ -38,6 +39,7 @@ def __init__(
             timeout_ms (float): The timeout in milliseconds between batch generation attempts. Defaults to 5.0.
             small_batch_threshold (int): The threshold for considering a batch as small. Defaults to 8.
             pad_tokens (dict[str, int] | None): Dictionary of padding tokens for each feature. Defaults to None.
+            padding_side (str): Side to add padding tokens. Either "left" or "right". Defaults to "right".
             spread_kwargs (bool): Whether to spread the kwargs over passing dict as args. Defaults to False.
         """
         super().__init__(
@@ -48,6 +50,7 @@ def __init__(
         )

         self.pad_tokens = pad_tokens or {}
+        self.padding_side = padding_side
         self.spread_kwargs = spread_kwargs

     def _process_batches(self):
@@ -67,6 +70,7 @@ def _process_batches(self):
                 batch_inputs = stack_features(
                     [item.content for item in batch],
                     pad_tokens=self.pad_tokens,
+                    padding_side=self.padding_side,
                 )

                 batch_outputs = self.batch_func(**batch_inputs) if self.spread_kwargs else self.batch_func(batch_inputs)
@@ -144,6 +148,7 @@ def dynamically(
     timeout_ms: float = 5.0,
     small_batch_threshold: int = 8,
     pad_tokens: Optional[dict[str, int]] = None,
+    padding_side: str = "right",
     spread_kwargs: bool = False,
 ) -> Callable:
     """
@@ -159,6 +164,7 @@ def dynamically(
         timeout_ms (float): The timeout in milliseconds between batch generation attempts. Defaults to 5.0.
         small_batch_threshold (int): The threshold to give priority to small batches. Defaults to 8.
         pad_tokens (dict[str, int] | None): Dictionary of padding tokens for each feature. Defaults to None.
+        padding_side (str): Side to add padding tokens. Either "left" or "right". Defaults to "right".
         spread_kwargs (bool): Whether to spread the kwargs over passing dict as args. Defaults to False.

     Returns:
@@ -193,6 +199,7 @@ def make_processor(_func: BatchInfer) -> ModelBatchProcessor:
             timeout_ms=timeout_ms,
             small_batch_threshold=small_batch_threshold,
             pad_tokens=pad_tokens,
+            padding_side=padding_side,
             spread_kwargs=spread_kwargs,
        )
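And the synchronous equivalent (again illustrative, not from the diff): when left-padding multiple keys, giving `attention_mask` a pad value of 0 keeps the mask aligned with the padded `input_ids`, matching the multiple-key test below. The model call here is a placeholder:

```python
import numpy as np

from batched.inference.model_batch_processor import ModelBatchProcessor


def run_model(features):
    # Placeholder for a real forward pass; features arrive already stacked,
    # left-padded, with the mask padded in lockstep.
    return features["input_ids"]


processor = ModelBatchProcessor(
    run_model,
    batch_size=32,
    pad_tokens={"input_ids": 0, "attention_mask": 0},
    padding_side="left",
)

out = processor(
    input_ids=np.array([[1, 2]]),
    attention_mask=np.array([[1, 1]]),
)
```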
diff --git a/tests/aio/inference/aio_inference_batch_processor_test.py b/tests/aio/inference/aio_inference_batch_processor_test.py
index 7339f72..948edf5 100644
--- a/tests/aio/inference/aio_inference_batch_processor_test.py
+++ b/tests/aio/inference/aio_inference_batch_processor_test.py
@@ -83,3 +83,238 @@ async def batch_func(features: dict[str, Feature]) -> Feature:
     np.testing.assert_array_equal(result, np.array([[1, 2], [3, 4], [5, 6], [7, 8]]))
     assert processor.stats.total_batches == 2
     assert processor.stats.total_processed == 4
+
+
+@pytest.mark.asyncio
+async def test_async_model_batch_processor_with_padding_side_left():
+    """Test AsyncModelBatchProcessor with left padding."""
+    async def batch_func(features: dict[str, Feature]) -> Feature:
+        return features["input_ids"]
+
+    processor = AsyncModelBatchProcessor(
+        batch_func,
+        batch_size=2,
+        pad_tokens={"input_ids": 0},
+        padding_side="left"
+    )
+
+    # Use asyncio.gather to ensure concurrent execution
+    results = await asyncio.gather(
+        processor(input_ids=np.array([[1, 2, 3]])),
+        processor(input_ids=np.array([[4, 5]]))
+    )
+
+    # Both inputs should be batched together and padded to length 3
+    # The second input should be left-padded: [4, 5] -> [0, 4, 5]
+    expected_results = [
+        np.array([[1, 2, 3]]),
+        np.array([[0, 4, 5]])  # Left-padded
+    ]
+
+    # Sort results by first element to ensure consistent ordering
+    results = sorted(results, key=lambda x: x[0, 0])
+    expected_results = sorted(expected_results, key=lambda x: x[0, 0])
+
+    for result, expected in zip(results, expected_results):
+        np.testing.assert_array_equal(result, expected)
+
+
+@pytest.mark.asyncio
+async def test_async_model_batch_processor_with_padding_side_right():
+    """Test AsyncModelBatchProcessor with explicit right padding."""
+    async def batch_func(features: dict[str, Feature]) -> Feature:
+        return features["input_ids"]
+
+    processor = AsyncModelBatchProcessor(
+        batch_func,
+        batch_size=3,
+        pad_tokens={"input_ids": 0},
+        padding_side="right"
+    )
+
+    # Process different length sequences concurrently
+    results = await asyncio.gather(
+        processor(input_ids=np.array([[1, 2, 3]])),
+        processor(input_ids=np.array([[4, 5]]))
+    )
+
+    # The results should be padded correctly
+    expected_results = [
+        np.array([[1, 2, 3]]),
+        np.array([[4, 5, 0]])
+    ]
+
+    # Check that we got the expected results (in any order)
+    assert len(results) == 2
+    found_first = any(np.array_equal(r, expected_results[0]) for r in results)
+    found_second = any(np.array_equal(r, expected_results[1]) for r in results)
+    assert found_first and found_second, f"Expected results not found in {results}"
+
+
+@pytest.mark.asyncio
+async def test_async_model_batch_processor_multiple_keys_left_padding():
+    """Test AsyncModelBatchProcessor with multiple keys and left padding."""
+    async def batch_func(features: dict[str, Feature]) -> Feature:
+        return {
+            "input_ids": features["input_ids"],
+            "attention_mask": features["attention_mask"]
+        }
+
+    processor = AsyncModelBatchProcessor(
+        batch_func,
+        batch_size=2,
+        pad_tokens={"input_ids": 0, "attention_mask": 0},
+        padding_side="left"
+    )
+
+    # Use asyncio.gather to ensure concurrent execution
+    results = await asyncio.gather(
+        processor(
+            input_ids=np.array([[1, 2]]),
+            attention_mask=np.array([[1, 1]])
+        ),
+        processor(
+            input_ids=np.array([[3, 4, 5]]),
+            attention_mask=np.array([[1, 1, 1]])
+        )
+    )
+
+    # Both inputs should be batched together and padded to length 3
+    # The first input should be left-padded: [1, 2] -> [0, 1, 2]
+    expected_results = [
+        {
+            "input_ids": np.array([[0, 1, 2]]),
+            "attention_mask": np.array([[0, 1, 1]])
+        },
+        {
+            "input_ids": np.array([[3, 4, 5]]),
+            "attention_mask": np.array([[1, 1, 1]])
+        }
+    ]
+
+    # Sort results by first element to ensure consistent ordering
+    results = sorted(results, key=lambda x: x["input_ids"][0, 0])
+    expected_results = sorted(expected_results, key=lambda x: x["input_ids"][0, 0])
+
+    for result, expected in zip(results, expected_results):
+        np.testing.assert_array_equal(result["input_ids"], expected["input_ids"])
+        np.testing.assert_array_equal(result["attention_mask"], expected["attention_mask"])
+
+
+@pytest.mark.asyncio
+async def test_async_dynamically_decorator_with_padding_side_left():
+    """Test async dynamically decorator with left padding."""
+    @dynamically(batch_size=2, pad_tokens={"input_ids": 0}, padding_side="left")
+    async def batch_func(features: dict[str, Feature]) -> Feature:
+        return features["input_ids"]
+
+    # Use asyncio.gather to ensure concurrent execution
+    results = await asyncio.gather(
+        batch_func(input_ids=np.array([[1, 2, 3]])),
+        batch_func(input_ids=np.array([[4, 5]]))
+    )
+
+    # Both inputs should be batched together and padded to length 3
+    # The second input should be left-padded: [4, 5] -> [0, 4, 5]
+    expected_results = [
+        np.array([[1, 2, 3]]),
+        np.array([[0, 4, 5]])  # Left-padded
+    ]
+
+    # Sort results by first element to ensure consistent ordering
+    results = sorted(results, key=lambda x: x[0, 0])
+    expected_results = sorted(expected_results, key=lambda x: x[0, 0])
+
+    for result, expected in zip(results, expected_results):
+        np.testing.assert_array_equal(result, expected)
+
+
+@pytest.mark.asyncio
+async def test_async_dynamically_decorator_with_padding_side_right():
+    """Test async dynamically decorator with right padding."""
+    @dynamically(batch_size=2, pad_tokens={"input_ids": 0}, padding_side="right")
+    async def batch_func(features: dict[str, Feature]) -> Feature:
+        return features["input_ids"]
+
+    # Use asyncio.gather to ensure concurrent execution
+    results = await asyncio.gather(
+        batch_func(input_ids=np.array([[1, 2, 3]])),
+        batch_func(input_ids=np.array([[4, 5]]))
+    )
+
+    # Both inputs should be batched together and padded to length 3
+    # The second input should be right-padded: [4, 5] -> [4, 5, 0]
+    expected_results = [
+        np.array([[1, 2, 3]]),
+        np.array([[4, 5, 0]])  # Right-padded
+    ]
+
+    # Sort results by first element to ensure consistent ordering
+    results = sorted(results, key=lambda x: x[0, 0])
+    expected_results = sorted(expected_results, key=lambda x: x[0, 0])
+
+    for result, expected in zip(results, expected_results):
+        np.testing.assert_array_equal(result, expected)
+
+
+@pytest.mark.asyncio
+async def test_async_model_batch_processor_padding_side_initialization():
+    """Test that padding_side is properly stored during initialization."""
+    async def dummy_batch_func(features: dict[str, Feature]) -> Feature:
+        return features["input"]
+
+    processor = AsyncModelBatchProcessor(
+        dummy_batch_func,
+        batch_size=32,
+        timeout_ms=5.0,
+        small_batch_threshold=8,
+        padding_side="left"
+    )
+
+    assert processor.padding_side == "left"
+
+    # Test default
+    processor_default = AsyncModelBatchProcessor(dummy_batch_func)
+    assert processor_default.padding_side == "right"
+
+
+@pytest.mark.asyncio
+async def test_async_model_batch_processor_concurrent_calls_with_padding():
+    """Test AsyncModelBatchProcessor with concurrent calls and left padding."""
+    async def batch_func(features: dict[str, Feature]) -> Feature:
+        await asyncio.sleep(0.01)  # Simulate async processing
+        return features["input_ids"]
+
+    processor = AsyncModelBatchProcessor(
+        batch_func,
+        batch_size=5,
+        timeout_ms=50.0,
+        pad_tokens={"input_ids": 0},
+        padding_side="left"
+    )
+
+    # Create inputs of different lengths
+    inputs = [
+        np.array([[1, 2, 3]]),
+        np.array([[4, 5]]),
+        np.array([[6, 7, 8, 9]]),
+        np.array([[10, 11]]),
+        np.array([[12, 13, 14]])
+    ]
+
+    # Execute concurrent calls
+    results = await asyncio.gather(*[
+        processor(input_ids=inp) for inp in inputs
+    ])
+
+    # Verify results - all should be left-padded to max length (4)
+    expected = [
+        np.array([[0, 1, 2, 3]]),
+        np.array([[0, 0, 4, 5]]),
+        np.array([[6, 7, 8, 9]]),
+        np.array([[0, 0, 10, 11]]),
+        np.array([[0, 12, 13, 14]])
+    ]
+
+    for result, exp in zip(results, expected):
+        np.testing.assert_array_equal(result, exp)
diff --git a/tests/inference/inference_batch_processor_test.py b/tests/inference/inference_batch_processor_test.py
index 3e5d909..f4d487f 100644
--- a/tests/inference/inference_batch_processor_test.py
+++ b/tests/inference/inference_batch_processor_test.py
@@ -5,6 +5,7 @@ import numpy as np
 import torch

 from batched.inference.model_batch_processor import ModelBatchProcessor, dynamically
+from batched.inference.helper import stack_features
 from batched.types import BatchProcessorStats, Feature


@@ -172,3 +173,260 @@ def slow_batch_func(features: dict[str, Feature]) -> Feature:
         np.testing.assert_array_equal(result, np.array([[i * 2]]))
     assert processor.stats.total_batches == 2
     assert processor.stats.total_processed == 10
+
+
+def test_stack_features_right_padding_default():
+    """Test that stack_features uses right padding by default."""
+    inputs = [
+        {"input_ids": np.array([1, 2, 3])},
+        {"input_ids": np.array([4, 5])},
+        {"input_ids": np.array([6, 7, 8, 9])},
+    ]
+    pad_tokens = {"input_ids": 0}
+
+    result = stack_features(inputs, pad_tokens)
+
+    expected = np.array([
+        [1, 2, 3, 0],
+        [4, 5, 0, 0],
+        [6, 7, 8, 9]
+    ])
+
+    np.testing.assert_array_equal(result["input_ids"], expected)
+
+
+def test_stack_features_left_padding():
+    """Test stack_features with left padding."""
+    inputs = [
+        {"input_ids": np.array([1, 2, 3])},
+        {"input_ids": np.array([4, 5])},
+        {"input_ids": np.array([6, 7, 8, 9])},
+    ]
+    pad_tokens = {"input_ids": 0}
+
+    result = stack_features(inputs, pad_tokens, padding_side="left")
+
+    expected = np.array([
+        [0, 1, 2, 3],
+        [0, 0, 4, 5],
+        [6, 7, 8, 9]
+    ])
+
+    np.testing.assert_array_equal(result["input_ids"], expected)
+
+
+def test_stack_features_multiple_keys_left_padding():
+    """Test stack_features with multiple keys and left padding."""
+    inputs = [
+        {"input_ids": np.array([1, 2]), "attention_mask": np.array([1, 1])},
+        {"input_ids": np.array([3, 4, 5]), "attention_mask": np.array([1, 1, 1])},
+    ]
+    pad_tokens = {"input_ids": 0, "attention_mask": 0}
+
+    result = stack_features(inputs, pad_tokens, padding_side="left")
+
+    expected_input_ids = np.array([
+        [0, 1, 2],
5] + ]) + expected_attention_mask = np.array([ + [0, 1, 1], + [1, 1, 1] + ]) + + np.testing.assert_array_equal(result["input_ids"], expected_input_ids) + np.testing.assert_array_equal(result["attention_mask"], expected_attention_mask) + + +def test_stack_features_torch_tensors_left_padding(): + """Test stack_features with PyTorch tensors and left padding.""" + inputs = [ + {"input_ids": torch.tensor([1, 2, 3])}, + {"input_ids": torch.tensor([4, 5])}, + ] + pad_tokens = {"input_ids": 0} + + result = stack_features(inputs, pad_tokens, padding_side="left") + + expected = torch.tensor([ + [1, 2, 3], + [0, 4, 5] + ]) + + torch.testing.assert_close(result["input_ids"], expected) + + +def test_batch_processor_with_padding_side_left(): + """Test ModelBatchProcessor with left padding.""" + def batch_func(features: dict[str, Feature]) -> Feature: + return features["input_ids"] + + processor = ModelBatchProcessor( + batch_func, + batch_size=2, + pad_tokens={"input_ids": 0}, + padding_side="left" + ) + + import threading + + results = [] + def worker1(): + results.append(processor(input_ids=np.array([[1, 2, 3]]))) + + def worker2(): + results.append(processor(input_ids=np.array([[4, 5]]))) + + threads = [threading.Thread(target=worker1), threading.Thread(target=worker2)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Both inputs should be batched together and padded to length 3 + # The second input should be left-padded: [4, 5] -> [0, 4, 5] + expected_results = [ + np.array([[1, 2, 3]]), + np.array([[0, 4, 5]]) # Left-padded + ] + + # Sort results by first element to ensure consistent ordering + results.sort(key=lambda x: x[0, 0]) + expected_results.sort(key=lambda x: x[0, 0]) + + for result, expected in zip(results, expected_results): + np.testing.assert_array_equal(result, expected) + + +def test_batch_processor_with_padding_side_right(): + """Test ModelBatchProcessor with explicit right padding.""" + def batch_func(features: dict[str, Feature]) -> Feature: + return features["input_ids"] + + processor = ModelBatchProcessor( + batch_func, + batch_size=2, + pad_tokens={"input_ids": 0}, + padding_side="right" + ) + + import threading + + results = [] + def worker1(): + results.append(processor(input_ids=np.array([[1, 2, 3]]))) + + def worker2(): + results.append(processor(input_ids=np.array([[4, 5]]))) + + threads = [threading.Thread(target=worker1), threading.Thread(target=worker2)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Both inputs should be batched together and padded to length 3 + # The second input should be right-padded: [4, 5] -> [4, 5, 0] + expected_results = [ + np.array([[1, 2, 3]]), + np.array([[4, 5, 0]]) # Right-padded + ] + + # Sort results by first element to ensure consistent ordering + results.sort(key=lambda x: x[0, 0]) + expected_results.sort(key=lambda x: x[0, 0]) + + for result, expected in zip(results, expected_results): + np.testing.assert_array_equal(result, expected) + + +def test_batch_processor_padding_side_initialization(): + """Test that padding_side is properly stored during initialization.""" + def dummy_batch_func(features: dict[str, Feature]) -> Feature: + return features["input"] + + processor = ModelBatchProcessor( + dummy_batch_func, + batch_size=32, + timeout_ms=5.0, + small_batch_threshold=8, + padding_side="left" + ) + + assert processor.padding_side == "left" + + # Test default + processor_default = ModelBatchProcessor(dummy_batch_func) + assert processor_default.padding_side == "right" + + +def 
+def test_dynamically_decorator_with_padding_side_left():
+    """Test dynamically decorator with left padding."""
+    @dynamically(batch_size=2, pad_tokens={"input_ids": 0}, padding_side="left")
+    def batch_func(features: dict[str, Feature]) -> Feature:
+        return features["input_ids"]
+
+    import threading
+
+    results = []
+    def worker1():
+        results.append(batch_func(input_ids=np.array([[1, 2, 3]])))
+
+    def worker2():
+        results.append(batch_func(input_ids=np.array([[4, 5]])))
+
+    threads = [threading.Thread(target=worker1), threading.Thread(target=worker2)]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+    # Both inputs should be batched together and padded to length 3
+    # The second input should be left-padded: [4, 5] -> [0, 4, 5]
+    expected_results = [
+        np.array([[1, 2, 3]]),
+        np.array([[0, 4, 5]])  # Left-padded
+    ]
+
+    # Sort results by first element to ensure consistent ordering
+    results.sort(key=lambda x: x[0, 0])
+    expected_results.sort(key=lambda x: x[0, 0])
+
+    for result, expected in zip(results, expected_results):
+        np.testing.assert_array_equal(result, expected)
+
+
+def test_dynamically_decorator_with_padding_side_right():
+    """Test dynamically decorator with right padding."""
+    @dynamically(batch_size=2, pad_tokens={"input_ids": 0}, padding_side="right")
+    def batch_func(features: dict[str, Feature]) -> Feature:
+        return features["input_ids"]
+
+    import threading
+
+    results = []
+    def worker1():
+        results.append(batch_func(input_ids=np.array([[1, 2, 3]])))
+
+    def worker2():
+        results.append(batch_func(input_ids=np.array([[4, 5]])))
+
+    threads = [threading.Thread(target=worker1), threading.Thread(target=worker2)]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+    # Both inputs should be batched together and padded to length 3
+    # The second input should be right-padded: [4, 5] -> [4, 5, 0]
+    expected_results = [
+        np.array([[1, 2, 3]]),
+        np.array([[4, 5, 0]])  # Right-padded
+    ]
+
+    # Sort results by first element to ensure consistent ordering
+    results.sort(key=lambda x: x[0, 0])
+    expected_results.sort(key=lambda x: x[0, 0])
+
+    for result, expected in zip(results, expected_results):
+        np.testing.assert_array_equal(result, expected)
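A closing note on why the left mode is useful (illustrative, not from the diff): with left padding every row's last real token lands in the final column, so a single batch-wide slice reads it without tracking per-row lengths, which is the usual reason decoder-style generation left-pads its prompts:

```python
import numpy as np

from batched.inference.helper import stack_features

batch = stack_features(
    [{"input_ids": np.array([1, 2, 3])}, {"input_ids": np.array([4, 5])}],
    pad_tokens={"input_ids": 0},
    padding_side="left",
)["input_ids"]

# Rows are right-aligned ([[1, 2, 3], [0, 4, 5]]), so one slice picks the
# last real token of every sequence: array([3, 5]).
last_tokens = batch[:, -1]
```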