diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 16d211aef..df6a7f87d 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -88,6 +88,8 @@ from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter from google.cloud.bigtable.data.row_filters import RowFilterChain from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController +from google.cloud.bigtable.data._metrics import OperationType +from google.cloud.bigtable.data._metrics import tracked_retry from google.cloud.bigtable.data._cross_sync import CrossSync @@ -1419,26 +1421,28 @@ async def sample_row_keys( retryable_excs = _get_retryable_errors(retryable_errors, self) predicate = retries.if_exception_type(*retryable_excs) - sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) - - @CrossSync.convert - async def execute_rpc(): - results = await self.client._gapic_client.sample_row_keys( - request=SampleRowKeysRequest( - app_profile_id=self.app_profile_id, **self._request_path - ), - timeout=next(attempt_timeout_gen), - retry=None, + with self._metrics.create_operation( + OperationType.SAMPLE_ROW_KEYS + ) as operation_metric: + + @CrossSync.convert + async def execute_rpc(): + results = await self.client._gapic_client.sample_row_keys( + request=SampleRowKeysRequest( + app_profile_id=self.app_profile_id, **self._request_path + ), + timeout=next(attempt_timeout_gen), + retry=None, + ) + return [(s.row_key, s.offset_bytes) async for s in results] + + return await tracked_retry( + retry_fn=CrossSync.retry_target, + operation=operation_metric, + target=execute_rpc, + predicate=predicate, + timeout=operation_timeout, ) - return [(s.row_key, s.offset_bytes) async for s in results] - - return await CrossSync.retry_target( - execute_rpc, - predicate, - sleep_generator, - operation_timeout, - exception_factory=_retry_exception_factory, - ) @CrossSync.convert(replace_symbols={"MutationsBatcherAsync": "MutationsBatcher"}) def mutations_batcher( @@ -1549,28 +1553,29 @@ async def mutate_row( # mutations should not be retried predicate = retries.if_exception_type() - sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) - - target = partial( - self.client._gapic_client.mutate_row, - request=MutateRowRequest( - row_key=row_key.encode("utf-8") - if isinstance(row_key, str) - else row_key, - mutations=[mutation._to_pb() for mutation in mutations_list], - app_profile_id=self.app_profile_id, - **self._request_path, - ), - timeout=attempt_timeout, - retry=None, - ) - return await CrossSync.retry_target( - target, - predicate, - sleep_generator, - operation_timeout, - exception_factory=_retry_exception_factory, - ) + with self._metrics.create_operation( + OperationType.MUTATE_ROW + ) as operation_metric: + target = partial( + self.client._gapic_client.mutate_row, + request=MutateRowRequest( + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + mutations=[mutation._to_pb() for mutation in mutations_list], + app_profile_id=self.app_profile_id, + **self._request_path, + ), + timeout=attempt_timeout, + retry=None, + ) + return await tracked_retry( + retry_fn=CrossSync.retry_target, + operation=operation_metric, + target=target, + predicate=predicate, + timeout=operation_timeout, + ) @CrossSync.convert async def bulk_mutate_rows( @@ -1681,21 +1686,25 @@ async def check_and_mutate_row( ): false_case_mutations = [false_case_mutations] false_case_list = [m._to_pb() for 
m in false_case_mutations or []] - result = await self.client._gapic_client.check_and_mutate_row( - request=CheckAndMutateRowRequest( - true_mutations=true_case_list, - false_mutations=false_case_list, - predicate_filter=predicate._to_pb() if predicate is not None else None, - row_key=row_key.encode("utf-8") - if isinstance(row_key, str) - else row_key, - app_profile_id=self.app_profile_id, - **self._request_path, - ), - timeout=operation_timeout, - retry=None, - ) - return result.predicate_matched + + with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE): + result = await self.client._gapic_client.check_and_mutate_row( + request=CheckAndMutateRowRequest( + true_mutations=true_case_list, + false_mutations=false_case_list, + predicate_filter=predicate._to_pb() + if predicate is not None + else None, + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + app_profile_id=self.app_profile_id, + **self._request_path, + ), + timeout=operation_timeout, + retry=None, + ) + return result.predicate_matched @CrossSync.convert async def read_modify_write_row( @@ -1735,20 +1744,22 @@ async def read_modify_write_row( rules = [rules] if not rules: raise ValueError("rules must contain at least one item") - result = await self.client._gapic_client.read_modify_write_row( - request=ReadModifyWriteRowRequest( - rules=[rule._to_pb() for rule in rules], - row_key=row_key.encode("utf-8") - if isinstance(row_key, str) - else row_key, - app_profile_id=self.app_profile_id, - **self._request_path, - ), - timeout=operation_timeout, - retry=None, - ) - # construct Row from result - return Row._from_pb(result.row) + + with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE): + result = await self.client._gapic_client.read_modify_write_row( + request=ReadModifyWriteRowRequest( + rules=[rule._to_pb() for rule in rules], + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + app_profile_id=self.app_profile_id, + **self._request_path, + ), + timeout=operation_timeout, + retry=None, + ) + # construct Row from result + return Row._from_pb(result.row) @CrossSync.convert async def close(self): diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index e848ebc6f..13bcfcc29 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -103,6 +103,7 @@ def _retry_exception_factory( tuple[Exception, Exception|None]: tuple of the exception to raise, and a cause exception if applicable """ + exc_list = exc_list.copy() if reason == RetryFailureReason.TIMEOUT: timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else "" # if failed due to timeout, raise deadline exceeded as primary exception diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 6be50079b..b72543ebb 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -75,6 +75,8 @@ from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter from google.cloud.bigtable.data.row_filters import RowFilterChain from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController +from google.cloud.bigtable.data._metrics import OperationType +from google.cloud.bigtable.data._metrics import tracked_retry from google.cloud.bigtable.data._cross_sync import CrossSync from typing import Iterable from grpc import insecure_channel @@ -1169,25 +1171,27 @@ def 
sample_row_keys( ) retryable_excs = _get_retryable_errors(retryable_errors, self) predicate = retries.if_exception_type(*retryable_excs) - sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) - - def execute_rpc(): - results = self.client._gapic_client.sample_row_keys( - request=SampleRowKeysRequest( - app_profile_id=self.app_profile_id, **self._request_path - ), - timeout=next(attempt_timeout_gen), - retry=None, + with self._metrics.create_operation( + OperationType.SAMPLE_ROW_KEYS + ) as operation_metric: + + def execute_rpc(): + results = self.client._gapic_client.sample_row_keys( + request=SampleRowKeysRequest( + app_profile_id=self.app_profile_id, **self._request_path + ), + timeout=next(attempt_timeout_gen), + retry=None, + ) + return [(s.row_key, s.offset_bytes) for s in results] + + return tracked_retry( + retry_fn=CrossSync._Sync_Impl.retry_target, + operation=operation_metric, + target=execute_rpc, + predicate=predicate, + timeout=operation_timeout, ) - return [(s.row_key, s.offset_bytes) for s in results] - - return CrossSync._Sync_Impl.retry_target( - execute_rpc, - predicate, - sleep_generator, - operation_timeout, - exception_factory=_retry_exception_factory, - ) def mutations_batcher( self, @@ -1288,27 +1292,29 @@ def mutate_row( ) else: predicate = retries.if_exception_type() - sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) - target = partial( - self.client._gapic_client.mutate_row, - request=MutateRowRequest( - row_key=row_key.encode("utf-8") - if isinstance(row_key, str) - else row_key, - mutations=[mutation._to_pb() for mutation in mutations_list], - app_profile_id=self.app_profile_id, - **self._request_path, - ), - timeout=attempt_timeout, - retry=None, - ) - return CrossSync._Sync_Impl.retry_target( - target, - predicate, - sleep_generator, - operation_timeout, - exception_factory=_retry_exception_factory, - ) + with self._metrics.create_operation( + OperationType.MUTATE_ROW + ) as operation_metric: + target = partial( + self.client._gapic_client.mutate_row, + request=MutateRowRequest( + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + mutations=[mutation._to_pb() for mutation in mutations_list], + app_profile_id=self.app_profile_id, + **self._request_path, + ), + timeout=attempt_timeout, + retry=None, + ) + return tracked_retry( + retry_fn=CrossSync._Sync_Impl.retry_target, + operation=operation_metric, + target=target, + predicate=predicate, + timeout=operation_timeout, + ) def bulk_mutate_rows( self, @@ -1412,21 +1418,24 @@ def check_and_mutate_row( ): false_case_mutations = [false_case_mutations] false_case_list = [m._to_pb() for m in false_case_mutations or []] - result = self.client._gapic_client.check_and_mutate_row( - request=CheckAndMutateRowRequest( - true_mutations=true_case_list, - false_mutations=false_case_list, - predicate_filter=predicate._to_pb() if predicate is not None else None, - row_key=row_key.encode("utf-8") - if isinstance(row_key, str) - else row_key, - app_profile_id=self.app_profile_id, - **self._request_path, - ), - timeout=operation_timeout, - retry=None, - ) - return result.predicate_matched + with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE): + result = self.client._gapic_client.check_and_mutate_row( + request=CheckAndMutateRowRequest( + true_mutations=true_case_list, + false_mutations=false_case_list, + predicate_filter=predicate._to_pb() + if predicate is not None + else None, + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else 
row_key, + app_profile_id=self.app_profile_id, + **self._request_path, + ), + timeout=operation_timeout, + retry=None, + ) + return result.predicate_matched def read_modify_write_row( self, @@ -1463,19 +1472,20 @@ def read_modify_write_row( rules = [rules] if not rules: raise ValueError("rules must contain at least one item") - result = self.client._gapic_client.read_modify_write_row( - request=ReadModifyWriteRowRequest( - rules=[rule._to_pb() for rule in rules], - row_key=row_key.encode("utf-8") - if isinstance(row_key, str) - else row_key, - app_profile_id=self.app_profile_id, - **self._request_path, - ), - timeout=operation_timeout, - retry=None, - ) - return Row._from_pb(result.row) + with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE): + result = self.client._gapic_client.read_modify_write_row( + request=ReadModifyWriteRowRequest( + rules=[rule._to_pb() for rule in rules], + row_key=row_key.encode("utf-8") + if isinstance(row_key, str) + else row_key, + app_profile_id=self.app_profile_id, + **self._request_path, + ), + timeout=operation_timeout, + retry=None, + ) + return Row._from_pb(result.row) def close(self): """Called to close the Table instance and release any resources held by it.""" diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 8c0eb30b1..0bfe7be84 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -23,10 +23,6 @@ script_path = os.path.dirname(os.path.realpath(__file__)) sys.path.append(script_path) -pytest_plugins = [ - "data.setup_fixtures", -] - @pytest.fixture(scope="session") def event_loop(): diff --git a/tests/system/data/__init__.py b/tests/system/data/__init__.py index 2b35cea8f..6f836fb96 100644 --- a/tests/system/data/__init__.py +++ b/tests/system/data/__init__.py @@ -13,7 +13,242 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import pytest +import os +import uuid TEST_FAMILY = "test-family" TEST_FAMILY_2 = "test-family-2" TEST_AGGREGATE_FAMILY = "test-aggregate-family" + +# authorized view subset to allow all qualifiers +ALLOW_ALL = "" +ALL_QUALIFIERS = {"qualifier_prefixes": [ALLOW_ALL]} + + +class SystemTestRunner: + """ + configures a system test class with configuration for clusters/tables/etc + + used by standard system tests, and metrics tests + """ + + @pytest.fixture(scope="session") + def init_table_id(self): + """ + The table_id to use when creating a new test table + """ + return f"test-table-{uuid.uuid4().hex}" + + @pytest.fixture(scope="session") + def cluster_config(self, project_id): + """ + Configuration for the clusters to use when creating a new instance + """ + from google.cloud.bigtable_admin_v2 import types + + cluster = { + "test-cluster": types.Cluster( + location=f"projects/{project_id}/locations/us-central1-b", + serve_nodes=1, + ) + } + return cluster + + @pytest.fixture(scope="session") + def column_family_config(self): + """ + specify column families to create when creating a new test table + """ + from google.cloud.bigtable_admin_v2 import types + + int_aggregate_type = types.Type.Aggregate( + input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}), + sum={}, + ) + return { + TEST_FAMILY: types.ColumnFamily(), + TEST_FAMILY_2: types.ColumnFamily(), + TEST_AGGREGATE_FAMILY: types.ColumnFamily( + value_type=types.Type(aggregate_type=int_aggregate_type) + ), + } + + @pytest.fixture(scope="session") + def admin_client(self): + """ + Client for interacting with Table and Instance admin APIs + """ + from google.cloud.bigtable.client import Client + + client = Client(admin=True) + yield client + + @pytest.fixture(scope="session") + def instance_id(self, admin_client, project_id, cluster_config): + """ + Returns BIGTABLE_TEST_INSTANCE if set, otherwise creates a new temporary instance for the test session + """ + from google.cloud.bigtable_admin_v2 import types + from google.api_core import exceptions + from google.cloud.environment_vars import BIGTABLE_EMULATOR + + # use user-specified instance if available + user_specified_instance = os.getenv("BIGTABLE_TEST_INSTANCE") + if user_specified_instance: + print("Using user-specified instance: {}".format(user_specified_instance)) + yield user_specified_instance + return + + # create a new temporary test instance + instance_id = f"python-bigtable-tests-{uuid.uuid4().hex[:6]}" + if os.getenv(BIGTABLE_EMULATOR): + # don't create instance if in emulator mode + yield instance_id + else: + try: + operation = admin_client.instance_admin_client.create_instance( + parent=f"projects/{project_id}", + instance_id=instance_id, + instance=types.Instance( + display_name="Test Instance", + # labels={"python-system-test": "true"}, + ), + clusters=cluster_config, + ) + operation.result(timeout=240) + except exceptions.AlreadyExists: + pass + yield instance_id + admin_client.instance_admin_client.delete_instance( + name=f"projects/{project_id}/instances/{instance_id}" + ) + + @pytest.fixture(scope="session") + def column_split_config(self): + """ + specify initial splits to create when creating a new test table + """ + return [(num * 1000).to_bytes(8, "big") for num in range(1, 10)] + + @pytest.fixture(scope="session") + def table_id( + self, + admin_client, + project_id, + instance_id, + column_family_config, + init_table_id, + column_split_config, + ): + """ + Returns BIGTABLE_TEST_TABLE if set, otherwise creates a new temporary table for 
the test session
+
+        Args:
+          - admin_client: Client for interacting with the Table Admin API. Supplied by the admin_client fixture.
+          - project_id: The project ID of the GCP project to test against. Supplied by the project_id fixture.
+          - instance_id: The ID of the Bigtable instance to test against. Supplied by the instance_id fixture.
+          - column_family_config: The column families to initialize the table with, if a pre-initialized table is not given with BIGTABLE_TEST_TABLE.
+              Supplied by the column_family_config fixture.
+          - init_table_id: The table ID to give to the test table, if a pre-initialized table is not given with BIGTABLE_TEST_TABLE.
+              Supplied by the init_table_id fixture.
+          - column_split_config: A list of row keys to use as initial splits when creating the test table.
+        """
+        from google.api_core import exceptions
+        from google.api_core import retry
+
+        # use user-specified table if available
+        user_specified_table = os.getenv("BIGTABLE_TEST_TABLE")
+        if user_specified_table:
+            print("Using user-specified table: {}".format(user_specified_table))
+            yield user_specified_table
+            return
+
+        retry = retry.Retry(
+            predicate=retry.if_exception_type(exceptions.FailedPrecondition)
+        )
+        try:
+            parent_path = f"projects/{project_id}/instances/{instance_id}"
+            print(f"Creating table: {parent_path}/tables/{init_table_id}")
+            admin_client.table_admin_client.create_table(
+                request={
+                    "parent": parent_path,
+                    "table_id": init_table_id,
+                    "table": {"column_families": column_family_config},
+                    "initial_splits": [{"key": key} for key in column_split_config],
+                },
+                retry=retry,
+            )
+        except exceptions.AlreadyExists:
+            pass
+        yield init_table_id
+        print(f"Deleting table: {parent_path}/tables/{init_table_id}")
+        try:
+            admin_client.table_admin_client.delete_table(
+                name=f"{parent_path}/tables/{init_table_id}"
+            )
+        except exceptions.NotFound:
+            print(f"Table {init_table_id} not found, skipping deletion")
+
+    @pytest.fixture(scope="session")
+    def authorized_view_id(
+        self,
+        admin_client,
+        project_id,
+        instance_id,
+        table_id,
+    ):
+        """
+        Creates and returns a new temporary authorized view for the test session
+
+        Args:
+          - admin_client: Client for interacting with the Table Admin API. Supplied by the admin_client fixture.
+          - project_id: The project ID of the GCP project to test against. Supplied by the project_id fixture.
+          - instance_id: The ID of the Bigtable instance to test against. Supplied by the instance_id fixture.
+          - table_id: The ID of the table to create the authorized view for. Supplied by the table_id fixture.
+        """
+        from google.api_core import exceptions
+        from google.api_core import retry
+
+        retry = retry.Retry(
+            predicate=retry.if_exception_type(exceptions.FailedPrecondition)
+        )
+        new_view_id = uuid.uuid4().hex[:8]
+        parent_path = f"projects/{project_id}/instances/{instance_id}/tables/{table_id}"
+        new_path = f"{parent_path}/authorizedViews/{new_view_id}"
+        try:
+            print(f"Creating view: {new_path}")
+            admin_client.table_admin_client.create_authorized_view(
+                request={
+                    "parent": parent_path,
+                    "authorized_view_id": new_view_id,
+                    "authorized_view": {
+                        "subset_view": {
+                            "row_prefixes": [ALLOW_ALL],
+                            "family_subsets": {
+                                TEST_FAMILY: ALL_QUALIFIERS,
+                                TEST_FAMILY_2: ALL_QUALIFIERS,
+                                TEST_AGGREGATE_FAMILY: ALL_QUALIFIERS,
+                            },
+                        },
+                    },
+                },
+                retry=retry,
+            )
+        except exceptions.AlreadyExists:
+            pass
+        except exceptions.MethodNotImplemented:
+            # will occur when run in emulator. 
Pass empty id + new_view_id = None + yield new_view_id + if new_view_id: + print(f"Deleting view: {new_path}") + try: + admin_client.table_admin_client.delete_authorized_view(name=new_path) + except exceptions.NotFound: + print(f"View {new_view_id} not found, skipping deletion") + + @pytest.fixture(scope="session") + def project_id(self, client): + """Returns the project ID from the client.""" + yield client.project diff --git a/tests/system/data/setup_fixtures.py b/tests/system/data/setup_fixtures.py deleted file mode 100644 index 169e2396b..000000000 --- a/tests/system/data/setup_fixtures.py +++ /dev/null @@ -1,210 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Contains a set of pytest fixtures for setting up and populating a -Bigtable database for testing purposes. -""" - -import pytest -import os -import uuid - -from . import TEST_FAMILY, TEST_FAMILY_2, TEST_AGGREGATE_FAMILY - -# authorized view subset to allow all qualifiers -ALLOW_ALL = "" -ALL_QUALIFIERS = {"qualifier_prefixes": [ALLOW_ALL]} - - -@pytest.fixture(scope="session") -def admin_client(): - """ - Client for interacting with Table and Instance admin APIs - """ - from google.cloud.bigtable.client import Client - - client = Client(admin=True) - yield client - - -@pytest.fixture(scope="session") -def instance_id(admin_client, project_id, cluster_config): - """ - Returns BIGTABLE_TEST_INSTANCE if set, otherwise creates a new temporary instance for the test session - """ - from google.cloud.bigtable_admin_v2 import types - from google.api_core import exceptions - from google.cloud.environment_vars import BIGTABLE_EMULATOR - - # use user-specified instance if available - user_specified_instance = os.getenv("BIGTABLE_TEST_INSTANCE") - if user_specified_instance: - print("Using user-specified instance: {}".format(user_specified_instance)) - yield user_specified_instance - return - - # create a new temporary test instance - instance_id = f"python-bigtable-tests-{uuid.uuid4().hex[:6]}" - if os.getenv(BIGTABLE_EMULATOR): - # don't create instance if in emulator mode - yield instance_id - else: - try: - operation = admin_client.instance_admin_client.create_instance( - parent=f"projects/{project_id}", - instance_id=instance_id, - instance=types.Instance( - display_name="Test Instance", - # labels={"python-system-test": "true"}, - ), - clusters=cluster_config, - ) - operation.result(timeout=240) - except exceptions.AlreadyExists: - pass - yield instance_id - admin_client.instance_admin_client.delete_instance( - name=f"projects/{project_id}/instances/{instance_id}" - ) - - -@pytest.fixture(scope="session") -def column_split_config(): - """ - specify initial splits to create when creating a new test table - """ - return [(num * 1000).to_bytes(8, "big") for num in range(1, 10)] - - -@pytest.fixture(scope="session") -def table_id( - admin_client, - project_id, - instance_id, - column_family_config, - init_table_id, - column_split_config, -): - """ - Returns BIGTABLE_TEST_TABLE if set, otherwise 
creates a new temporary table for the test session - - Args: - - admin_client: Client for interacting with the Table Admin API. Supplied by the admin_client fixture. - - project_id: The project ID of the GCP project to test against. Supplied by the project_id fixture. - - instance_id: The ID of the Bigtable instance to test against. Supplied by the instance_id fixture. - - init_column_families: A list of column families to initialize the table with, if pre-initialized table is not given with BIGTABLE_TEST_TABLE. - Supplied by the init_column_families fixture. - - init_table_id: The table ID to give to the test table, if pre-initialized table is not given with BIGTABLE_TEST_TABLE. - Supplied by the init_table_id fixture. - - column_split_config: A list of row keys to use as initial splits when creating the test table. - """ - from google.api_core import exceptions - from google.api_core import retry - - # use user-specified instance if available - user_specified_table = os.getenv("BIGTABLE_TEST_TABLE") - if user_specified_table: - print("Using user-specified table: {}".format(user_specified_table)) - yield user_specified_table - return - - retry = retry.Retry( - predicate=retry.if_exception_type(exceptions.FailedPrecondition) - ) - try: - parent_path = f"projects/{project_id}/instances/{instance_id}" - print(f"Creating table: {parent_path}/tables/{init_table_id}") - admin_client.table_admin_client.create_table( - request={ - "parent": parent_path, - "table_id": init_table_id, - "table": {"column_families": column_family_config}, - "initial_splits": [{"key": key} for key in column_split_config], - }, - retry=retry, - ) - except exceptions.AlreadyExists: - pass - yield init_table_id - print(f"Deleting table: {parent_path}/tables/{init_table_id}") - try: - admin_client.table_admin_client.delete_table( - name=f"{parent_path}/tables/{init_table_id}" - ) - except exceptions.NotFound: - print(f"Table {init_table_id} not found, skipping deletion") - - -@pytest.fixture(scope="session") -def authorized_view_id( - admin_client, - project_id, - instance_id, - table_id, -): - """ - Creates and returns a new temporary authorized view for the test session - - Args: - - admin_client: Client for interacting with the Table Admin API. Supplied by the admin_client fixture. - - project_id: The project ID of the GCP project to test against. Supplied by the project_id fixture. - - instance_id: The ID of the Bigtable instance to test against. Supplied by the instance_id fixture. - - table_id: The ID of the table to create the authorized view for. Supplied by the table_id fixture. - """ - from google.api_core import exceptions - from google.api_core import retry - - retry = retry.Retry( - predicate=retry.if_exception_type(exceptions.FailedPrecondition) - ) - new_view_id = uuid.uuid4().hex[:8] - parent_path = f"projects/{project_id}/instances/{instance_id}/tables/{table_id}" - new_path = f"{parent_path}/authorizedViews/{new_view_id}" - try: - print(f"Creating view: {new_path}") - admin_client.table_admin_client.create_authorized_view( - request={ - "parent": parent_path, - "authorized_view_id": new_view_id, - "authorized_view": { - "subset_view": { - "row_prefixes": [ALLOW_ALL], - "family_subsets": { - TEST_FAMILY: ALL_QUALIFIERS, - TEST_FAMILY_2: ALL_QUALIFIERS, - TEST_AGGREGATE_FAMILY: ALL_QUALIFIERS, - }, - }, - }, - }, - retry=retry, - ) - except exceptions.AlreadyExists: - pass - except exceptions.MethodNotImplemented: - # will occur when run in emulator. 
Pass empty id - new_view_id = None - yield new_view_id - if new_view_id: - print(f"Deleting view: {new_path}") - try: - admin_client.table_admin_client.delete_authorized_view(name=new_path) - except exceptions.NotFound: - print(f"View {new_view_id} not found, skipping deletion") - - -@pytest.fixture(scope="session") -def project_id(client): - """Returns the project ID from the client.""" - yield client.project diff --git a/tests/system/data/test_metrics_async.py b/tests/system/data/test_metrics_async.py new file mode 100644 index 000000000..3e19ef368 --- /dev/null +++ b/tests/system/data/test_metrics_async.py @@ -0,0 +1,943 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +import os +import pytest +import uuid + +from grpc import StatusCode + +from google.api_core.exceptions import Aborted +from google.api_core.exceptions import GoogleAPICallError +from google.api_core.exceptions import PermissionDenied +from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler +from google.cloud.bigtable.data._metrics.data_model import ( + CompletedOperationMetric, + CompletedAttemptMetric, +) +from google.cloud.bigtable_v2.types import ResponseParams + +from google.cloud.bigtable.data._cross_sync import CrossSync + +from . 
import TEST_FAMILY, SystemTestRunner
+
+if CrossSync.is_async:
+    from grpc.aio import UnaryUnaryClientInterceptor
+    from grpc.aio import UnaryStreamClientInterceptor
+    from grpc.aio import AioRpcError
+    from grpc.aio import Metadata
+else:
+    from grpc import UnaryUnaryClientInterceptor
+    from grpc import UnaryStreamClientInterceptor
+    from grpc import RpcError
+    from grpc import intercept_channel
+
+__CROSS_SYNC_OUTPUT__ = "tests.system.data.test_metrics_autogen"
+
+
+class _MetricsTestHandler(MetricsHandler):
+    """
+    Store completed metrics events in internal lists for testing
+    """
+
+    def __init__(self, **kwargs):
+        self.completed_operations = []
+        self.completed_attempts = []
+
+    def on_operation_complete(self, op):
+        self.completed_operations.append(op)
+
+    def on_attempt_complete(self, attempt, _):
+        self.completed_attempts.append(attempt)
+
+    def clear(self):
+        self.completed_operations.clear()
+        self.completed_attempts.clear()
+
+    def __repr__(self):
+        return f"{self.__class__}(completed_operations={len(self.completed_operations)}, completed_attempts={len(self.completed_attempts)})"
+
+
+@CrossSync.convert_class
+class _ErrorInjectorInterceptor(
+    UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor
+):
+    """
+    gRPC interceptor used to inject errors into RPC calls, to test failures
+    """
+
+    def __init__(self):
+        self._exc_list = []
+        self.fail_mid_stream = False
+
+    def push(self, exc: Exception):
+        self._exc_list.append(exc)
+
+    def clear(self):
+        self._exc_list.clear()
+        self.fail_mid_stream = False
+
+    @CrossSync.convert
+    async def intercept_unary_unary(self, continuation, client_call_details, request):
+        if self._exc_list:
+            raise self._exc_list.pop(0)
+        return await continuation(client_call_details, request)
+
+    @CrossSync.convert
+    async def intercept_unary_stream(self, continuation, client_call_details, request):
+        if not self.fail_mid_stream and self._exc_list:
+            raise self._exc_list.pop(0)
+
+        response = await continuation(client_call_details, request)
+
+        if self.fail_mid_stream and self._exc_list:
+            exc = self._exc_list.pop(0)
+
+            class CallWrapper:
+                def __init__(self, call, exc_to_raise):
+                    self._call = call
+                    self._exc = exc_to_raise
+                    self._raised = False
+
+                @CrossSync.convert(sync_name="__iter__")
+                def __aiter__(self):
+                    return self
+
+                @CrossSync.convert(
+                    sync_name="__next__", replace_symbols={"__anext__": "__next__"}
+                )
+                async def __anext__(self):
+                    if not self._raised:
+                        self._raised = True
+                        if self._exc:
+                            raise self._exc
+                    return await self._call.__anext__()
+
+                def __getattr__(self, name):
+                    return getattr(self._call, name)
+
+            return CallWrapper(response, exc)
+
+        return response
+
+
+@CrossSync.convert_class(sync_name="TestMetrics")
+class TestMetricsAsync(SystemTestRunner):
+    def _make_client(self):
+        project = os.getenv("GOOGLE_CLOUD_PROJECT") or None
+        return CrossSync.DataClient(project=project)
+
+    def _make_exception(self, status, cluster_id=None, zone_id=None):
+        if cluster_id or zone_id:
+            metadata = (
+                "x-goog-ext-425905942-bin",
+                ResponseParams.serialize(
+                    ResponseParams(cluster_id=cluster_id, zone_id=zone_id)
+                ),
+            )
+        else:
+            metadata = None
+        if CrossSync.is_async:
+            metadata = Metadata(metadata) if metadata else Metadata()
+            return AioRpcError(status, Metadata(), metadata)
+        else:
+            exc = RpcError(status)
+            exc.trailing_metadata = lambda: [metadata] if metadata else []
+            exc.initial_metadata = lambda: []
+            exc.code = lambda: status
+            exc.details = lambda: None
+
+            def _result():
+                raise exc
+
+            exc.result = _result
+            return exc
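+
+    # note: the handler and error_injector fixtures below are session-scoped and
+    # shared across all tests; the autouse _clear_state fixture resets both
+    # between tests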
+ + @pytest.fixture(scope="session") + def handler(self): + return _MetricsTestHandler() + + @pytest.fixture(scope="session") + def error_injector(self): + return _ErrorInjectorInterceptor() + + @CrossSync.convert + @CrossSync.pytest_fixture(scope="function", autouse=True) + async def _clear_state(self, handler, error_injector): + """Clear handler and interceptor between each test""" + handler.clear() + error_injector.clear() + + @CrossSync.convert + @CrossSync.pytest_fixture(scope="session") + async def client(self, error_injector): + async with self._make_client() as client: + if CrossSync.is_async: + client.transport.grpc_channel._unary_unary_interceptors.append( + error_injector + ) + client.transport.grpc_channel._unary_stream_interceptors.append( + error_injector + ) + else: + # inject interceptor after bigtable metrics interceptors + metrics_channel = client.transport._grpc_channel._channel._channel + client.transport._grpc_channel._channel._channel = intercept_channel( + metrics_channel, error_injector + ) + yield client + + @CrossSync.convert + @CrossSync.pytest_fixture(scope="function") + async def temp_rows(self, table): + builder = CrossSync.TempRowBuilder(table) + yield builder + await builder.delete_rows() + + @CrossSync.convert + @CrossSync.pytest_fixture(scope="session") + async def table(self, client, table_id, instance_id, handler): + async with client.get_table(instance_id, table_id) as table: + table._metrics.add_handler(handler) + yield table + + @CrossSync.convert + @CrossSync.pytest_fixture(scope="session") + async def authorized_view( + self, client, table_id, instance_id, authorized_view_id, handler + ): + async with client.get_authorized_view( + instance_id, table_id, authorized_view_id + ) as table: + table._metrics.add_handler(handler) + yield table + + @CrossSync.pytest + async def test_mutate_row(self, table, temp_rows, handler, cluster_config): + row_key = b"mutate" + new_value = uuid.uuid4().hex.encode() + row_key, mutation = await temp_rows.create_row_and_mutation( + table, new_value=new_value + ) + handler.clear() + await table.mutate_row(row_key, mutation) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.value[0] == 0 + assert operation.is_streaming is False + assert operation.op_type.value == "MutateRow" + assert len(operation.completed_attempts) == 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + assert operation.duration_ns > 0 and operation.duration_ns < 1e9 + assert ( + operation.first_response_latency_ns is None + ) # populated for read_rows only + assert operation.flow_throttling_time_ns == 0 + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns + assert attempt.end_status.value[0] == 0 + assert attempt.backoff_before_attempt_ns == 0 + assert ( + attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns + ) + assert attempt.application_blocking_time_ns == 0 + + @CrossSync.pytest + async def test_mutate_row_failure_with_retries( + self, table, handler, error_injector + ): + """ + 
Test failure in grpc layer by injecting errors into an interceptor + with retryable errors, then a terminal one + """ + from google.cloud.bigtable.data.mutations import SetCell + + row_key = b"row_key_1" + mutation = SetCell(TEST_FAMILY, b"q", b"v") + + handler.clear() + expected_zone = "my_zone" + expected_cluster = "my_cluster" + num_retryable = 2 + for i in range(num_retryable): + error_injector.push( + self._make_exception(StatusCode.ABORTED, cluster_id=expected_cluster) + ) + error_injector.push( + self._make_exception(StatusCode.PERMISSION_DENIED, zone_id=expected_zone) + ) + with pytest.raises(PermissionDenied): + await table.mutate_row(row_key, [mutation], retryable_errors=[Aborted]) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == num_retryable + 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.op_type.value == "MutateRow" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == num_retryable + 1 + assert operation.cluster_id == expected_cluster + assert operation.zone == expected_zone + # validate attempts + for i in range(num_retryable): + attempt = handler.completed_attempts[i] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "ABORTED" + assert attempt.gfe_latency_ns is None + final_attempt = handler.completed_attempts[num_retryable] + assert isinstance(final_attempt, CompletedAttemptMetric) + assert final_attempt.end_status.name == "PERMISSION_DENIED" + assert final_attempt.gfe_latency_ns is None + + @CrossSync.pytest + async def test_mutate_row_failure_timeout(self, table, temp_rows, handler): + """ + Test failure in gapic layer by passing very low timeout + + No grpc headers expected + """ + from google.cloud.bigtable.data.mutations import SetCell + + row_key = b"row_key_1" + mutation = SetCell(TEST_FAMILY, b"q", b"v") + + with pytest.raises(GoogleAPICallError): + await table.mutate_row(row_key, [mutation], operation_timeout=0.001) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.op_type.value == "MutateRow" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == 1 + assert operation.cluster_id == "" + assert operation.zone == "global" + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "DEADLINE_EXCEEDED" + assert attempt.gfe_latency_ns is None + + @CrossSync.pytest + async def test_mutate_row_failure_unauthorized( + self, handler, authorized_view, cluster_config + ): + """ + Test failure in backend by accessing an unauthorized family + """ + from google.cloud.bigtable.data.mutations import SetCell + + row_key = b"row_key_1" + mutation = SetCell("unauthorized", b"q", b"v") + + with pytest.raises(GoogleAPICallError) as e: + await authorized_view.mutate_row(row_key, [mutation]) + assert e.value.grpc_status_code.name == "PERMISSION_DENIED" + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = 
handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.op_type.value == "MutateRow" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == 1 + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "PERMISSION_DENIED" + assert ( + attempt.gfe_latency_ns >= 0 + and attempt.gfe_latency_ns < operation.duration_ns + ) + + @CrossSync.pytest + async def test_mutate_row_failure_unauthorized_with_retries( + self, handler, authorized_view, cluster_config + ): + """ + retry unauthorized request multiple times before timing out + """ + from google.cloud.bigtable.data.mutations import SetCell + + row_key = b"row_key_1" + mutation = SetCell("unauthorized", b"q", b"v") + + with pytest.raises(GoogleAPICallError) as e: + await authorized_view.mutate_row( + row_key, + [mutation], + retryable_errors=[PermissionDenied], + operation_timeout=0.5, + ) + assert e.value.grpc_status_code.name == "DEADLINE_EXCEEDED" + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) > 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.op_type.value == "MutateRow" + assert operation.is_streaming is False + assert len(operation.completed_attempts) > 1 + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + # validate attempts + for attempt in handler.completed_attempts: + assert attempt.end_status.name in ["PERMISSION_DENIED", "DEADLINE_EXCEEDED"] + + @CrossSync.pytest + async def test_sample_row_keys(self, table, temp_rows, handler, cluster_config): + await table.sample_row_keys() + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.value[0] == 0 + assert operation.is_streaming is False + assert operation.op_type.value == "SampleRowKeys" + assert len(operation.completed_attempts) == 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + assert operation.duration_ns > 0 and operation.duration_ns < 1e9 + assert ( + operation.first_response_latency_ns is None + ) # populated for read_rows only + assert operation.flow_throttling_time_ns == 0 + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns + assert attempt.end_status.value[0] == 0 + assert attempt.backoff_before_attempt_ns == 0 + assert ( + attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns + ) + assert attempt.application_blocking_time_ns == 0 + + @CrossSync.drop + @CrossSync.pytest + async def 
test_sample_row_keys_failure_cancelled( + self, table, temp_rows, handler, error_injector + ): + """ + Test failure in grpc layer by injecting errors into an interceptor + test with retryable errors, then a terminal one + + No headers expected + """ + num_retryable = 3 + for i in range(num_retryable): + error_injector.push(self._make_exception(StatusCode.ABORTED)) + error_injector.push(asyncio.CancelledError) + with pytest.raises(asyncio.CancelledError): + await table.sample_row_keys(retryable_errors=[Aborted]) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == num_retryable + 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "UNKNOWN" + assert operation.op_type.value == "SampleRowKeys" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == num_retryable + 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == "" + assert operation.zone == "global" + # validate attempts + for i in range(num_retryable): + attempt = handler.completed_attempts[i] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "ABORTED" + assert attempt.gfe_latency_ns is None + final_attempt = handler.completed_attempts[num_retryable] + assert isinstance(final_attempt, CompletedAttemptMetric) + assert final_attempt.end_status.name == "UNKNOWN" + assert final_attempt.gfe_latency_ns is None + + @CrossSync.pytest + async def test_sample_row_keys_failure_with_retries( + self, table, temp_rows, handler, error_injector, cluster_config + ): + """ + Test failure in grpc layer by injecting errors into an interceptor + with retryable errors, then a success + """ + num_retryable = 3 + for i in range(num_retryable): + error_injector.push(self._make_exception(StatusCode.ABORTED)) + await table.sample_row_keys(retryable_errors=[Aborted]) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == num_retryable + 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "OK" + assert operation.op_type.value == "SampleRowKeys" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == num_retryable + 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + # validate attempts + for i in range(num_retryable): + attempt = handler.completed_attempts[i] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "ABORTED" + assert attempt.gfe_latency_ns is None + final_attempt = handler.completed_attempts[num_retryable] + assert isinstance(final_attempt, CompletedAttemptMetric) + assert final_attempt.end_status.name == "OK" + assert ( + final_attempt.gfe_latency_ns > 0 + and final_attempt.gfe_latency_ns < operation.duration_ns + ) + + @CrossSync.pytest + async def test_sample_row_keys_failure_timeout(self, table, handler): + """ + Test failure in gapic layer by passing very low timeout + + No grpc headers expected + """ + with pytest.raises(GoogleAPICallError): + await table.sample_row_keys(operation_timeout=0.001) + # validate counts + assert 
len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.op_type.value == "SampleRowKeys" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == 1 + assert operation.cluster_id == "" + assert operation.zone == "global" + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "DEADLINE_EXCEEDED" + assert attempt.gfe_latency_ns is None + + @CrossSync.pytest + async def test_sample_row_keys_failure_mid_stream( + self, table, temp_rows, handler, error_injector + ): + """ + Test failure in grpc stream + """ + error_injector.fail_mid_stream = True + error_injector.push(self._make_exception(StatusCode.ABORTED)) + error_injector.push(self._make_exception(StatusCode.PERMISSION_DENIED)) + with pytest.raises(PermissionDenied): + await table.sample_row_keys(retryable_errors=[Aborted]) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 2 + # validate operation + operation = handler.completed_operations[0] + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.op_type.value == "SampleRowKeys" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == 2 + # validate retried attempt + attempt = handler.completed_attempts[0] + assert attempt.end_status.name == "ABORTED" + # validate final attempt + final_attempt = handler.completed_attempts[-1] + assert final_attempt.end_status.name == "PERMISSION_DENIED" + + @CrossSync.pytest + async def test_read_modify_write(self, table, temp_rows, handler, cluster_config): + from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + await temp_rows.add_row(row_key, value=0, family=family, qualifier=qualifier) + rule = IncrementRule(family, qualifier, 1) + await table.read_modify_write_row(row_key, rule) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.value[0] == 0 + assert operation.is_streaming is False + assert operation.op_type.value == "ReadModifyWriteRow" + assert len(operation.completed_attempts) == 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + assert operation.duration_ns > 0 and operation.duration_ns < 1e9 + assert ( + operation.first_response_latency_ns is None + ) # populated for read_rows only + assert operation.flow_throttling_time_ns == 0 + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns + assert attempt.end_status.value[0] == 0 + assert attempt.backoff_before_attempt_ns == 0 + assert ( + attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns + ) + assert attempt.application_blocking_time_ns == 0 + + 
@CrossSync.drop + @CrossSync.pytest + async def test_read_modify_write_failure_cancelled( + self, table, temp_rows, handler, error_injector + ): + """ + Test failure in grpc layer by injecting an error into an interceptor + + No headers expected + """ + from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + await temp_rows.add_row(row_key, value=0, family=family, qualifier=qualifier) + rule = IncrementRule(family, qualifier, 1) + + # trigger an exception + exc = asyncio.CancelledError("injected") + error_injector.push(exc) + with pytest.raises(asyncio.CancelledError): + await table.read_modify_write_row(row_key, rule) + + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "UNKNOWN" + assert operation.is_streaming is False + assert operation.op_type.value == "ReadModifyWriteRow" + assert len(operation.completed_attempts) == len(handler.completed_attempts) + assert operation.completed_attempts == handler.completed_attempts + assert operation.cluster_id == "" + assert operation.zone == "global" + assert operation.duration_ns > 0 and operation.duration_ns < 1e9 + assert ( + operation.first_response_latency_ns is None + ) # populated for read_rows only + assert operation.flow_throttling_time_ns == 0 + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 + assert attempt.end_status.name == "UNKNOWN" + assert attempt.backoff_before_attempt_ns == 0 + assert attempt.gfe_latency_ns is None + assert attempt.application_blocking_time_ns == 0 + + @CrossSync.pytest + async def test_read_modify_write_failure_timeout(self, table, temp_rows, handler): + """ + Test failure in gapic layer by passing very low timeout + + No grpc headers expected + """ + from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + await temp_rows.add_row(row_key, value=0, family=family, qualifier=qualifier) + rule = IncrementRule(family, qualifier, 1) + with pytest.raises(GoogleAPICallError): + await table.read_modify_write_row(row_key, rule, operation_timeout=0.001) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.op_type.value == "ReadModifyWriteRow" + assert operation.cluster_id == "" + assert operation.zone == "global" + # validate attempt + attempt = handler.completed_attempts[0] + assert attempt.gfe_latency_ns is None + + @CrossSync.pytest + async def test_read_modify_write_failure_unauthorized( + self, handler, authorized_view, cluster_config + ): + """ + Test failure in backend by accessing an unauthorized family + """ + from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule + + row_key = b"test-row-key" + qualifier = b"test-qualifier" + rule = IncrementRule("unauthorized", qualifier, 1) + with pytest.raises(GoogleAPICallError): + await authorized_view.read_modify_write_row(row_key, rule) + # validate counts + 
assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.op_type.value == "ReadModifyWriteRow" + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + # validate attempt + attempt = handler.completed_attempts[0] + assert ( + attempt.gfe_latency_ns >= 0 + and attempt.gfe_latency_ns < operation.duration_ns + ) + + @CrossSync.pytest + async def test_check_and_mutate_row( + self, table, temp_rows, handler, cluster_config + ): + from google.cloud.bigtable.data.mutations import SetCell + from google.cloud.bigtable.data.row_filters import ValueRangeFilter + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + await temp_rows.add_row(row_key, value=1, family=family, qualifier=qualifier) + + true_mutation_value = b"true-mutation-value" + true_mutation = SetCell( + family=TEST_FAMILY, qualifier=qualifier, new_value=true_mutation_value + ) + predicate = ValueRangeFilter(0, 2) + await table.check_and_mutate_row( + row_key, + predicate, + true_case_mutations=true_mutation, + ) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.value[0] == 0 + assert operation.is_streaming is False + assert operation.op_type.value == "CheckAndMutateRow" + assert len(operation.completed_attempts) == 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + assert operation.duration_ns > 0 and operation.duration_ns < 1e9 + assert ( + operation.first_response_latency_ns is None + ) # populated for read_rows only + assert operation.flow_throttling_time_ns == 0 + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns + assert attempt.end_status.value[0] == 0 + assert attempt.backoff_before_attempt_ns == 0 + assert ( + attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns + ) + assert attempt.application_blocking_time_ns == 0 + + @CrossSync.drop + @CrossSync.pytest + async def test_check_and_mutate_row_failure_cancelled( + self, table, temp_rows, handler, error_injector + ): + """ + Test failure in grpc layer by injecting an error into an interceptor + + No headers expected + """ + from google.cloud.bigtable.data.row_filters import ValueRangeFilter + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + await temp_rows.add_row(row_key, value=1, family=family, qualifier=qualifier) + + # trigger an exception + exc = asyncio.CancelledError("injected") + error_injector.push(exc) + with pytest.raises(asyncio.CancelledError): + await table.check_and_mutate_row( + row_key, + predicate=ValueRangeFilter(0, 2), + ) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = 
handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "UNKNOWN" + assert operation.is_streaming is False + assert operation.op_type.value == "CheckAndMutateRow" + assert len(operation.completed_attempts) == len(handler.completed_attempts) + assert operation.completed_attempts == handler.completed_attempts + assert operation.cluster_id == "" + assert operation.zone == "global" + assert operation.duration_ns > 0 and operation.duration_ns < 1e9 + assert ( + operation.first_response_latency_ns is None + ) # populated for read_rows only + assert operation.flow_throttling_time_ns == 0 + # validate attempt + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 + assert attempt.end_status.name == "UNKNOWN" + assert attempt.backoff_before_attempt_ns == 0 + assert attempt.gfe_latency_ns is None + assert attempt.application_blocking_time_ns == 0 + + @CrossSync.pytest + async def test_check_and_mutate_row_failure_timeout( + self, table, temp_rows, handler + ): + """ + Test failure in gapic layer by passing very low timeout + + No grpc headers expected + """ + from google.cloud.bigtable.data.mutations import SetCell + from google.cloud.bigtable.data.row_filters import ValueRangeFilter + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + await temp_rows.add_row(row_key, value=1, family=family, qualifier=qualifier) + + true_mutation_value = b"true-mutation-value" + true_mutation = SetCell( + family=TEST_FAMILY, qualifier=qualifier, new_value=true_mutation_value + ) + with pytest.raises(GoogleAPICallError): + await table.check_and_mutate_row( + row_key, + predicate=ValueRangeFilter(0, 2), + true_case_mutations=true_mutation, + operation_timeout=0.001, + ) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.cluster_id == "" + assert operation.zone == "global" + # validate attempt + attempt = handler.completed_attempts[0] + assert attempt.gfe_latency_ns is None + + @CrossSync.pytest + async def test_check_and_mutate_row_failure_unauthorized( + self, handler, authorized_view, cluster_config + ): + """ + Test failure in backend by accessing an unauthorized family + """ + from google.cloud.bigtable.data.mutations import SetCell + from google.cloud.bigtable.data.row_filters import ValueRangeFilter + + row_key = b"test-row-key" + qualifier = b"test-qualifier" + mutation_value = b"true-mutation-value" + mutation = SetCell( + family="unauthorized", qualifier=qualifier, new_value=mutation_value + ) + with pytest.raises(GoogleAPICallError): + await authorized_view.check_and_mutate_row( + row_key, + predicate=ValueRangeFilter(0, 2), + true_case_mutations=mutation, + false_case_mutations=mutation, + ) + # validate counts + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + # validate operation + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + # validate attempt 
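+        # gfe_latency comes from server-reported response metadata, so it should
+        # be populated here (the request reached the backend), unlike the
+        # timeout/cancelled cases above where it stays None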
+ attempt = handler.completed_attempts[0] + assert ( + attempt.gfe_latency_ns >= 0 + and attempt.gfe_latency_ns < operation.duration_ns + ) diff --git a/tests/system/data/test_metrics_autogen.py b/tests/system/data/test_metrics_autogen.py new file mode 100644 index 000000000..947d25bca --- /dev/null +++ b/tests/system/data/test_metrics_autogen.py @@ -0,0 +1,654 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is automatically generated by CrossSync. Do not edit manually. + +import os +import pytest +import uuid +from grpc import StatusCode +from google.api_core.exceptions import Aborted +from google.api_core.exceptions import GoogleAPICallError +from google.api_core.exceptions import PermissionDenied +from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler +from google.cloud.bigtable.data._metrics.data_model import ( + CompletedOperationMetric, + CompletedAttemptMetric, +) +from google.cloud.bigtable_v2.types import ResponseParams +from google.cloud.bigtable.data._cross_sync import CrossSync +from . import TEST_FAMILY, SystemTestRunner +from grpc import UnaryUnaryClientInterceptor +from grpc import UnaryStreamClientInterceptor +from grpc import RpcError +from grpc import intercept_channel + + +class _MetricsTestHandler(MetricsHandler): + """ + Store completed metrics events in internal lists for testing + """ + + def __init__(self, **kwargs): + self.completed_operations = [] + self.completed_attempts = [] + + def on_operation_complete(self, op): + self.completed_operations.append(op) + + def on_attempt_complete(self, attempt, _): + self.completed_attempts.append(attempt) + + def clear(self): + self.completed_operations.clear() + self.completed_attempts.clear() + + def __repr__(self): + return f"{self.__class__}(completed_operations={len(self.completed_operations)}, completed_attempts={len(self.completed_attempts)})" + + +class _ErrorInjectorInterceptor( + UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor +): + """ + gRPC interceptor used to inject errors into RPC calls, to test failures + """ + + def __init__(self): + self._exc_list = [] + self.fail_mid_stream = False + + def push(self, exc: Exception): + self._exc_list.append(exc) + + def clear(self): + self._exc_list.clear() + self.fail_mid_stream = False + + def intercept_unary_unary(self, continuation, client_call_details, request): + if self._exc_list: + raise self._exc_list.pop(0) + return continuation(client_call_details, request) + + def intercept_unary_stream(self, continuation, client_call_details, request): + if not self.fail_mid_stream and self._exc_list: + raise self._exc_list.pop(0) + response = continuation(client_call_details, request) + if self.fail_mid_stream and self._exc_list: + exc = self._exc_list.pop(0) + + class CallWrapper: + def __init__(self, call, exc_to_raise): + self._call = call + self._exc = exc_to_raise + self._raised = False + + def __iter__(self): + return self + + def __next__(self): + if not self._raised: + self._raised = True + if 
self._exc: + raise self._exc + return self._call.__next__() + + def __getattr__(self, name): + return getattr(self._call, name) + + return CallWrapper(response, exc) + return response + + +class TestMetrics(SystemTestRunner): + def _make_client(self): + project = os.getenv("GOOGLE_CLOUD_PROJECT") or None + return CrossSync._Sync_Impl.DataClient(project=project) + + def _make_exception(self, status, cluster_id=None, zone_id=None): + if cluster_id or zone_id: + metadata = ( + "x-goog-ext-425905942-bin", + ResponseParams.serialize( + ResponseParams(cluster_id=cluster_id, zone_id=zone_id) + ), + ) + else: + metadata = None + exc = RpcError(status) + exc.trailing_metadata = lambda: [metadata] if metadata else [] + exc.initial_metadata = lambda: [] + exc.code = lambda: status + exc.details = lambda: None + + def _result(): + raise exc + + exc.result = _result + return exc + + @pytest.fixture(scope="session") + def handler(self): + return _MetricsTestHandler() + + @pytest.fixture(scope="session") + def error_injector(self): + return _ErrorInjectorInterceptor() + + @pytest.fixture(scope="function", autouse=True) + def _clear_state(self, handler, error_injector): + """Clear handler and interceptor between each test""" + handler.clear() + error_injector.clear() + + @pytest.fixture(scope="session") + def client(self, error_injector): + with self._make_client() as client: + metrics_channel = client.transport._grpc_channel._channel._channel + client.transport._grpc_channel._channel._channel = intercept_channel( + metrics_channel, error_injector + ) + yield client + + @pytest.fixture(scope="function") + def temp_rows(self, table): + builder = CrossSync._Sync_Impl.TempRowBuilder(table) + yield builder + builder.delete_rows() + + @pytest.fixture(scope="session") + def table(self, client, table_id, instance_id, handler): + with client.get_table(instance_id, table_id) as table: + table._metrics.add_handler(handler) + yield table + + @pytest.fixture(scope="session") + def authorized_view( + self, client, table_id, instance_id, authorized_view_id, handler + ): + with client.get_authorized_view( + instance_id, table_id, authorized_view_id + ) as table: + table._metrics.add_handler(handler) + yield table + + def test_mutate_row(self, table, temp_rows, handler, cluster_config): + row_key = b"mutate" + new_value = uuid.uuid4().hex.encode() + (row_key, mutation) = temp_rows.create_row_and_mutation( + table, new_value=new_value + ) + handler.clear() + table.mutate_row(row_key, mutation) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.value[0] == 0 + assert operation.is_streaming is False + assert operation.op_type.value == "MutateRow" + assert len(operation.completed_attempts) == 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + assert operation.duration_ns > 0 and operation.duration_ns < 1000000000.0 + assert operation.first_response_latency_ns is None + assert operation.flow_throttling_time_ns == 0 + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns + assert attempt.end_status.value[0] == 0 + assert 
attempt.backoff_before_attempt_ns == 0 + assert ( + attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns + ) + assert attempt.application_blocking_time_ns == 0 + + def test_mutate_row_failure_with_retries(self, table, handler, error_injector): + """Test failure in grpc layer by injecting errors into an interceptor + with retryable errors, then a terminal one""" + from google.cloud.bigtable.data.mutations import SetCell + + row_key = b"row_key_1" + mutation = SetCell(TEST_FAMILY, b"q", b"v") + handler.clear() + expected_zone = "my_zone" + expected_cluster = "my_cluster" + num_retryable = 2 + for i in range(num_retryable): + error_injector.push( + self._make_exception(StatusCode.ABORTED, cluster_id=expected_cluster) + ) + error_injector.push( + self._make_exception(StatusCode.PERMISSION_DENIED, zone_id=expected_zone) + ) + with pytest.raises(PermissionDenied): + table.mutate_row(row_key, [mutation], retryable_errors=[Aborted]) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == num_retryable + 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.op_type.value == "MutateRow" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == num_retryable + 1 + assert operation.cluster_id == expected_cluster + assert operation.zone == expected_zone + for i in range(num_retryable): + attempt = handler.completed_attempts[i] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "ABORTED" + assert attempt.gfe_latency_ns is None + final_attempt = handler.completed_attempts[num_retryable] + assert isinstance(final_attempt, CompletedAttemptMetric) + assert final_attempt.end_status.name == "PERMISSION_DENIED" + assert final_attempt.gfe_latency_ns is None + + def test_mutate_row_failure_timeout(self, table, temp_rows, handler): + """Test failure in gapic layer by passing very low timeout + + No grpc headers expected""" + from google.cloud.bigtable.data.mutations import SetCell + + row_key = b"row_key_1" + mutation = SetCell(TEST_FAMILY, b"q", b"v") + with pytest.raises(GoogleAPICallError): + table.mutate_row(row_key, [mutation], operation_timeout=0.001) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.op_type.value == "MutateRow" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == 1 + assert operation.cluster_id == "" + assert operation.zone == "global" + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "DEADLINE_EXCEEDED" + assert attempt.gfe_latency_ns is None + + def test_mutate_row_failure_unauthorized( + self, handler, authorized_view, cluster_config + ): + """Test failure in backend by accessing an unauthorized family""" + from google.cloud.bigtable.data.mutations import SetCell + + row_key = b"row_key_1" + mutation = SetCell("unauthorized", b"q", b"v") + with pytest.raises(GoogleAPICallError) as e: + authorized_view.mutate_row(row_key, [mutation]) + assert e.value.grpc_status_code.name == "PERMISSION_DENIED" + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = 
handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.op_type.value == "MutateRow" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == 1 + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "PERMISSION_DENIED" + assert ( + attempt.gfe_latency_ns >= 0 + and attempt.gfe_latency_ns < operation.duration_ns + ) + + def test_mutate_row_failure_unauthorized_with_retries( + self, handler, authorized_view, cluster_config + ): + """retry unauthorized request multiple times before timing out""" + from google.cloud.bigtable.data.mutations import SetCell + + row_key = b"row_key_1" + mutation = SetCell("unauthorized", b"q", b"v") + with pytest.raises(GoogleAPICallError) as e: + authorized_view.mutate_row( + row_key, + [mutation], + retryable_errors=[PermissionDenied], + operation_timeout=0.5, + ) + assert e.value.grpc_status_code.name == "DEADLINE_EXCEEDED" + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) > 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.op_type.value == "MutateRow" + assert operation.is_streaming is False + assert len(operation.completed_attempts) > 1 + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + for attempt in handler.completed_attempts: + assert attempt.end_status.name in ["PERMISSION_DENIED", "DEADLINE_EXCEEDED"] + + def test_sample_row_keys(self, table, temp_rows, handler, cluster_config): + table.sample_row_keys() + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.value[0] == 0 + assert operation.is_streaming is False + assert operation.op_type.value == "SampleRowKeys" + assert len(operation.completed_attempts) == 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + assert operation.duration_ns > 0 and operation.duration_ns < 1000000000.0 + assert operation.first_response_latency_ns is None + assert operation.flow_throttling_time_ns == 0 + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns + assert attempt.end_status.value[0] == 0 + assert attempt.backoff_before_attempt_ns == 0 + assert ( + attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns + ) + assert attempt.application_blocking_time_ns == 0 + + def test_sample_row_keys_failure_with_retries( + self, table, temp_rows, handler, error_injector, cluster_config + ): + """Test failure in grpc layer by injecting errors into an interceptor + with retryable errors, then a success""" + num_retryable = 3 + for i in range(num_retryable): + 
error_injector.push(self._make_exception(StatusCode.ABORTED)) + table.sample_row_keys(retryable_errors=[Aborted]) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == num_retryable + 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "OK" + assert operation.op_type.value == "SampleRowKeys" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == num_retryable + 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + for i in range(num_retryable): + attempt = handler.completed_attempts[i] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "ABORTED" + assert attempt.gfe_latency_ns is None + final_attempt = handler.completed_attempts[num_retryable] + assert isinstance(final_attempt, CompletedAttemptMetric) + assert final_attempt.end_status.name == "OK" + assert ( + final_attempt.gfe_latency_ns > 0 + and final_attempt.gfe_latency_ns < operation.duration_ns + ) + + def test_sample_row_keys_failure_timeout(self, table, handler): + """Test failure in gapic layer by passing very low timeout + + No grpc headers expected""" + with pytest.raises(GoogleAPICallError): + table.sample_row_keys(operation_timeout=0.001) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.op_type.value == "SampleRowKeys" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == 1 + assert operation.cluster_id == "" + assert operation.zone == "global" + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.end_status.name == "DEADLINE_EXCEEDED" + assert attempt.gfe_latency_ns is None + + def test_sample_row_keys_failure_mid_stream( + self, table, temp_rows, handler, error_injector + ): + """Test failure in grpc stream""" + error_injector.fail_mid_stream = True + error_injector.push(self._make_exception(StatusCode.ABORTED)) + error_injector.push(self._make_exception(StatusCode.PERMISSION_DENIED)) + with pytest.raises(PermissionDenied): + table.sample_row_keys(retryable_errors=[Aborted]) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 2 + operation = handler.completed_operations[0] + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.op_type.value == "SampleRowKeys" + assert operation.is_streaming is False + assert len(operation.completed_attempts) == 2 + attempt = handler.completed_attempts[0] + assert attempt.end_status.name == "ABORTED" + final_attempt = handler.completed_attempts[-1] + assert final_attempt.end_status.name == "PERMISSION_DENIED" + + def test_read_modify_write(self, table, temp_rows, handler, cluster_config): + from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + temp_rows.add_row(row_key, value=0, family=family, qualifier=qualifier) + rule = IncrementRule(family, qualifier, 1) + table.read_modify_write_row(row_key, rule) + assert 
len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.value[0] == 0 + assert operation.is_streaming is False + assert operation.op_type.value == "ReadModifyWriteRow" + assert len(operation.completed_attempts) == 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + assert operation.duration_ns > 0 and operation.duration_ns < 1000000000.0 + assert operation.first_response_latency_ns is None + assert operation.flow_throttling_time_ns == 0 + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns + assert attempt.end_status.value[0] == 0 + assert attempt.backoff_before_attempt_ns == 0 + assert ( + attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns + ) + assert attempt.application_blocking_time_ns == 0 + + def test_read_modify_write_failure_timeout(self, table, temp_rows, handler): + """Test failure in gapic layer by passing very low timeout + + No grpc headers expected""" + from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + temp_rows.add_row(row_key, value=0, family=family, qualifier=qualifier) + rule = IncrementRule(family, qualifier, 1) + with pytest.raises(GoogleAPICallError): + table.read_modify_write_row(row_key, rule, operation_timeout=0.001) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.op_type.value == "ReadModifyWriteRow" + assert operation.cluster_id == "" + assert operation.zone == "global" + attempt = handler.completed_attempts[0] + assert attempt.gfe_latency_ns is None + + def test_read_modify_write_failure_unauthorized( + self, handler, authorized_view, cluster_config + ): + """Test failure in backend by accessing an unauthorized family""" + from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule + + row_key = b"test-row-key" + qualifier = b"test-qualifier" + rule = IncrementRule("unauthorized", qualifier, 1) + with pytest.raises(GoogleAPICallError): + authorized_view.read_modify_write_row(row_key, rule) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.op_type.value == "ReadModifyWriteRow" + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + attempt = handler.completed_attempts[0] + assert ( + attempt.gfe_latency_ns >= 0 + and attempt.gfe_latency_ns < operation.duration_ns + ) + + def test_check_and_mutate_row(self, table, temp_rows, handler, cluster_config): + from google.cloud.bigtable.data.mutations import SetCell + from google.cloud.bigtable.data.row_filters import ValueRangeFilter + + row_key = 
b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + temp_rows.add_row(row_key, value=1, family=family, qualifier=qualifier) + true_mutation_value = b"true-mutation-value" + true_mutation = SetCell( + family=TEST_FAMILY, qualifier=qualifier, new_value=true_mutation_value + ) + predicate = ValueRangeFilter(0, 2) + table.check_and_mutate_row( + row_key, predicate, true_case_mutations=true_mutation + ) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.value[0] == 0 + assert operation.is_streaming is False + assert operation.op_type.value == "CheckAndMutateRow" + assert len(operation.completed_attempts) == 1 + assert operation.completed_attempts[0] == handler.completed_attempts[0] + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + assert operation.duration_ns > 0 and operation.duration_ns < 1000000000.0 + assert operation.first_response_latency_ns is None + assert operation.flow_throttling_time_ns == 0 + attempt = handler.completed_attempts[0] + assert isinstance(attempt, CompletedAttemptMetric) + assert attempt.duration_ns > 0 and attempt.duration_ns < operation.duration_ns + assert attempt.end_status.value[0] == 0 + assert attempt.backoff_before_attempt_ns == 0 + assert ( + attempt.gfe_latency_ns > 0 and attempt.gfe_latency_ns < attempt.duration_ns + ) + assert attempt.application_blocking_time_ns == 0 + + def test_check_and_mutate_row_failure_timeout(self, table, temp_rows, handler): + """Test failure in gapic layer by passing very low timeout + + No grpc headers expected""" + from google.cloud.bigtable.data.mutations import SetCell + from google.cloud.bigtable.data.row_filters import ValueRangeFilter + + row_key = b"test-row-key" + family = TEST_FAMILY + qualifier = b"test-qualifier" + temp_rows.add_row(row_key, value=1, family=family, qualifier=qualifier) + true_mutation_value = b"true-mutation-value" + true_mutation = SetCell( + family=TEST_FAMILY, qualifier=qualifier, new_value=true_mutation_value + ) + with pytest.raises(GoogleAPICallError): + table.check_and_mutate_row( + row_key, + predicate=ValueRangeFilter(0, 2), + true_case_mutations=true_mutation, + operation_timeout=0.001, + ) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "DEADLINE_EXCEEDED" + assert operation.cluster_id == "" + assert operation.zone == "global" + attempt = handler.completed_attempts[0] + assert attempt.gfe_latency_ns is None + + def test_check_and_mutate_row_failure_unauthorized( + self, handler, authorized_view, cluster_config + ): + """Test failure in backend by accessing an unauthorized family""" + from google.cloud.bigtable.data.mutations import SetCell + from google.cloud.bigtable.data.row_filters import ValueRangeFilter + + row_key = b"test-row-key" + qualifier = b"test-qualifier" + mutation_value = b"true-mutation-value" + mutation = SetCell( + family="unauthorized", qualifier=qualifier, new_value=mutation_value + ) + with pytest.raises(GoogleAPICallError): + authorized_view.check_and_mutate_row( + row_key, + predicate=ValueRangeFilter(0, 2), + true_case_mutations=mutation, + false_case_mutations=mutation, + 
) + assert len(handler.completed_operations) == 1 + assert len(handler.completed_attempts) == 1 + operation = handler.completed_operations[0] + assert isinstance(operation, CompletedOperationMetric) + assert operation.final_status.name == "PERMISSION_DENIED" + assert operation.cluster_id == next(iter(cluster_config.keys())) + assert ( + operation.zone + == cluster_config[operation.cluster_id].location.split("/")[-1] + ) + attempt = handler.completed_attempts[0] + assert ( + attempt.gfe_latency_ns >= 0 + and attempt.gfe_latency_ns < operation.duration_ns + ) diff --git a/tests/system/data/test_system_async.py b/tests/system/data/test_system_async.py index 39c454996..d0376004b 100644 --- a/tests/system/data/test_system_async.py +++ b/tests/system/data/test_system_async.py @@ -26,7 +26,7 @@ from google.cloud.bigtable.data._cross_sync import CrossSync -from . import TEST_FAMILY, TEST_FAMILY_2, TEST_AGGREGATE_FAMILY +from . import TEST_FAMILY, TEST_FAMILY_2, TEST_AGGREGATE_FAMILY, SystemTestRunner if CrossSync.is_async: from google.cloud.bigtable_v2.services.bigtable.transports.grpc_asyncio import ( @@ -116,9 +116,43 @@ async def delete_rows(self): } await self.target.client._gapic_client.mutate_rows(request) + @CrossSync.convert + async def retrieve_cell_value(self, target, row_key): + """ + Helper to read an individual row + """ + from google.cloud.bigtable.data import ReadRowsQuery + + row_list = await target.read_rows(ReadRowsQuery(row_keys=row_key)) + assert len(row_list) == 1 + row = row_list[0] + cell = row.cells[0] + return cell.value + + @CrossSync.convert + async def create_row_and_mutation( + self, table, *, start_value=b"start", new_value=b"new_value" + ): + """ + Helper to create a new row, and a sample set_cell mutation to change its value + """ + from google.cloud.bigtable.data.mutations import SetCell + + row_key = uuid.uuid4().hex.encode() + family = TEST_FAMILY + qualifier = b"test-qualifier" + await self.add_row( + row_key, family=family, qualifier=qualifier, value=start_value + ) + # ensure cell is initialized + assert await self.retrieve_cell_value(table, row_key) == start_value + + mutation = SetCell(family=TEST_FAMILY, qualifier=qualifier, new_value=new_value) + return row_key, mutation + @CrossSync.convert_class(sync_name="TestSystem") -class TestSystemAsync: +class TestSystemAsync(SystemTestRunner): def _make_client(self): project = os.getenv("GOOGLE_CLOUD_PROJECT") or None return CrossSync.DataClient(project=project) @@ -148,82 +182,6 @@ async def target(self, client, table_id, authorized_view_id, instance_id, reques else: raise ValueError(f"unknown target type: {request.param}") - @pytest.fixture(scope="session") - def column_family_config(self): - """ - specify column families to create when creating a new test table - """ - from google.cloud.bigtable_admin_v2 import types - - int_aggregate_type = types.Type.Aggregate( - input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}), - sum={}, - ) - return { - TEST_FAMILY: types.ColumnFamily(), - TEST_FAMILY_2: types.ColumnFamily(), - TEST_AGGREGATE_FAMILY: types.ColumnFamily( - value_type=types.Type(aggregate_type=int_aggregate_type) - ), - } - - @pytest.fixture(scope="session") - def init_table_id(self): - """ - The table_id to use when creating a new test table - """ - return f"test-table-{uuid.uuid4().hex}" - - @pytest.fixture(scope="session") - def cluster_config(self, project_id): - """ - Configuration for the clusters to use when creating a new instance - """ - from google.cloud.bigtable_admin_v2 
import types - - cluster = { - "test-cluster": types.Cluster( - location=f"projects/{project_id}/locations/us-central1-b", - serve_nodes=1, - ) - } - return cluster - - @CrossSync.convert - @pytest.mark.usefixtures("target") - async def _retrieve_cell_value(self, target, row_key): - """ - Helper to read an individual row - """ - from google.cloud.bigtable.data import ReadRowsQuery - - row_list = await target.read_rows(ReadRowsQuery(row_keys=row_key)) - assert len(row_list) == 1 - row = row_list[0] - cell = row.cells[0] - return cell.value - - @CrossSync.convert - async def _create_row_and_mutation( - self, table, temp_rows, *, start_value=b"start", new_value=b"new_value" - ): - """ - Helper to create a new row, and a sample set_cell mutation to change its value - """ - from google.cloud.bigtable.data.mutations import SetCell - - row_key = uuid.uuid4().hex.encode() - family = TEST_FAMILY - qualifier = b"test-qualifier" - await temp_rows.add_row( - row_key, family=family, qualifier=qualifier, value=start_value - ) - # ensure cell is initialized - assert await self._retrieve_cell_value(table, row_key) == start_value - - mutation = SetCell(family=TEST_FAMILY, qualifier=qualifier, new_value=new_value) - return row_key, mutation - @CrossSync.convert @CrossSync.pytest_fixture(scope="function") async def temp_rows(self, target): @@ -321,13 +279,13 @@ async def test_mutation_set_cell(self, target, temp_rows): """ row_key = b"bulk_mutate" new_value = uuid.uuid4().hex.encode() - row_key, mutation = await self._create_row_and_mutation( - target, temp_rows, new_value=new_value + row_key, mutation = await temp_rows.create_row_and_mutation( + target, new_value=new_value ) await target.mutate_row(row_key, mutation) # ensure cell is updated - assert (await self._retrieve_cell_value(target, row_key)) == new_value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == new_value @CrossSync.pytest @pytest.mark.usefixtures("target") @@ -349,14 +307,14 @@ async def test_mutation_add_to_cell(self, target, temp_rows): await target.mutate_row( row_key, AddToCell(family, qualifier, 1, timestamp_micros=0) ) - encoded_result = await self._retrieve_cell_value(target, row_key) + encoded_result = await temp_rows.retrieve_cell_value(target, row_key) int_result = int.from_bytes(encoded_result, byteorder="big") assert int_result == 1 # update again await target.mutate_row( row_key, AddToCell(family, qualifier, 9, timestamp_micros=0) ) - encoded_result = await self._retrieve_cell_value(target, row_key) + encoded_result = await temp_rows.retrieve_cell_value(target, row_key) int_result = int.from_bytes(encoded_result, byteorder="big") assert int_result == 10 @@ -398,15 +356,15 @@ async def test_bulk_mutations_set_cell(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry new_value = uuid.uuid4().hex.encode() - row_key, mutation = await self._create_row_and_mutation( - target, temp_rows, new_value=new_value + row_key, mutation = await temp_rows.create_row_and_mutation( + target, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) await target.bulk_mutate_rows([bulk_mutation]) # ensure cell is updated - assert (await self._retrieve_cell_value(target, row_key)) == new_value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == new_value @CrossSync.pytest async def test_bulk_mutations_raise_exception(self, client, target): @@ -444,11 +402,11 @@ async def test_mutations_batcher_context_manager(self, client, target, temp_rows from 
google.cloud.bigtable.data.mutations import RowMutationEntry new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] - row_key, mutation = await self._create_row_and_mutation( - target, temp_rows, new_value=new_value + row_key, mutation = await temp_rows.create_row_and_mutation( + target, new_value=new_value ) - row_key2, mutation2 = await self._create_row_and_mutation( - target, temp_rows, new_value=new_value2 + row_key2, mutation2 = await temp_rows.create_row_and_mutation( + target, new_value=new_value2 ) bulk_mutation = RowMutationEntry(row_key, [mutation]) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) @@ -457,7 +415,7 @@ async def test_mutations_batcher_context_manager(self, client, target, temp_rows await batcher.append(bulk_mutation) await batcher.append(bulk_mutation2) # ensure cell is updated - assert (await self._retrieve_cell_value(target, row_key)) == new_value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == new_value assert len(batcher._staged_entries) == 0 @pytest.mark.usefixtures("client") @@ -473,8 +431,8 @@ async def test_mutations_batcher_timer_flush(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry new_value = uuid.uuid4().hex.encode() - row_key, mutation = await self._create_row_and_mutation( - target, temp_rows, new_value=new_value + row_key, mutation = await temp_rows.create_row_and_mutation( + target, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) flush_interval = 0.1 @@ -485,7 +443,7 @@ async def test_mutations_batcher_timer_flush(self, client, target, temp_rows): await CrossSync.sleep(flush_interval + 0.1) assert len(batcher._staged_entries) == 0 # ensure cell is updated - assert (await self._retrieve_cell_value(target, row_key)) == new_value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == new_value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -500,12 +458,12 @@ async def test_mutations_batcher_count_flush(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] - row_key, mutation = await self._create_row_and_mutation( - target, temp_rows, new_value=new_value + row_key, mutation = await temp_rows.create_row_and_mutation( + target, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - row_key2, mutation2 = await self._create_row_and_mutation( - target, temp_rows, new_value=new_value2 + row_key2, mutation2 = await temp_rows.create_row_and_mutation( + target, new_value=new_value2 ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) @@ -525,8 +483,8 @@ async def test_mutations_batcher_count_flush(self, client, target, temp_rows): assert len(batcher._staged_entries) == 0 assert len(batcher._flush_jobs) == 0 # ensure cells were updated - assert (await self._retrieve_cell_value(target, row_key)) == new_value - assert (await self._retrieve_cell_value(target, row_key2)) == new_value2 + assert (await temp_rows.retrieve_cell_value(target, row_key)) == new_value + assert (await temp_rows.retrieve_cell_value(target, row_key2)) == new_value2 @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -541,12 +499,12 @@ async def test_mutations_batcher_bytes_flush(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry new_value, new_value2 = [uuid.uuid4().hex.encode() for _ in range(2)] - row_key, mutation = await 
self._create_row_and_mutation( - target, temp_rows, new_value=new_value + row_key, mutation = await temp_rows.create_row_and_mutation( + target, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - row_key2, mutation2 = await self._create_row_and_mutation( - target, temp_rows, new_value=new_value2 + row_key2, mutation2 = await temp_rows.create_row_and_mutation( + target, new_value=new_value2 ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) @@ -566,8 +524,8 @@ async def test_mutations_batcher_bytes_flush(self, client, target, temp_rows): # for sync version: grab result future.result() # ensure cells were updated - assert (await self._retrieve_cell_value(target, row_key)) == new_value - assert (await self._retrieve_cell_value(target, row_key2)) == new_value2 + assert (await temp_rows.retrieve_cell_value(target, row_key)) == new_value + assert (await temp_rows.retrieve_cell_value(target, row_key2)) == new_value2 @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -580,12 +538,12 @@ async def test_mutations_batcher_no_flush(self, client, target, temp_rows): new_value = uuid.uuid4().hex.encode() start_value = b"unchanged" - row_key, mutation = await self._create_row_and_mutation( - target, temp_rows, start_value=start_value, new_value=new_value + row_key, mutation = await temp_rows.create_row_and_mutation( + target, start_value=start_value, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - row_key2, mutation2 = await self._create_row_and_mutation( - target, temp_rows, start_value=start_value, new_value=new_value + row_key2, mutation2 = await temp_rows.create_row_and_mutation( + target, start_value=start_value, new_value=new_value ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) @@ -602,8 +560,10 @@ async def test_mutations_batcher_no_flush(self, client, target, temp_rows): assert len(batcher._staged_entries) == 2 assert len(batcher._flush_jobs) == 0 # ensure cells were not updated - assert (await self._retrieve_cell_value(target, row_key)) == start_value - assert (await self._retrieve_cell_value(target, row_key2)) == start_value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == start_value + assert ( + await temp_rows.retrieve_cell_value(target, row_key2) + ) == start_value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -674,7 +634,7 @@ async def test_read_modify_write_row_increment( assert result[0].qualifier == qualifier assert int(result[0]) == expected # ensure that reading from server gives same value - assert (await self._retrieve_cell_value(target, row_key)) == result[0].value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == result[0].value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -714,7 +674,7 @@ async def test_read_modify_write_row_append( assert result[0].qualifier == qualifier assert result[0].value == expected # ensure that reading from server gives same value - assert (await self._retrieve_cell_value(target, row_key)) == result[0].value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == result[0].value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -751,7 +711,7 @@ async def test_read_modify_write_row_chained(self, client, target, temp_rows): + b"helloworld!" 
) # ensure that reading from server gives same value - assert (await self._retrieve_cell_value(target, row_key)) == result[0].value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == result[0].value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -800,7 +760,7 @@ async def test_check_and_mutate( expected_value = ( true_mutation_value if expected_result else false_mutation_value ) - assert (await self._retrieve_cell_value(target, row_key)) == expected_value + assert (await temp_rows.retrieve_cell_value(target, row_key)) == expected_value @pytest.mark.skipif( bool(os.environ.get(BIGTABLE_EMULATOR)), diff --git a/tests/system/data/test_system_autogen.py b/tests/system/data/test_system_autogen.py index 37c00f2ae..66ca27a66 100644 --- a/tests/system/data/test_system_autogen.py +++ b/tests/system/data/test_system_autogen.py @@ -26,7 +26,7 @@ from google.cloud.environment_vars import BIGTABLE_EMULATOR from google.type import date_pb2 from google.cloud.bigtable.data._cross_sync import CrossSync -from . import TEST_FAMILY, TEST_FAMILY_2, TEST_AGGREGATE_FAMILY +from . import TEST_FAMILY, TEST_FAMILY_2, TEST_AGGREGATE_FAMILY, SystemTestRunner from google.cloud.bigtable_v2.services.bigtable.transports.grpc import ( _LoggingClientInterceptor as GapicInterceptor, ) @@ -100,8 +100,32 @@ def delete_rows(self): } self.target.client._gapic_client.mutate_rows(request) + def retrieve_cell_value(self, target, row_key): + """Helper to read an individual row""" + from google.cloud.bigtable.data import ReadRowsQuery + + row_list = target.read_rows(ReadRowsQuery(row_keys=row_key)) + assert len(row_list) == 1 + row = row_list[0] + cell = row.cells[0] + return cell.value -class TestSystem: + def create_row_and_mutation( + self, table, *, start_value=b"start", new_value=b"new_value" + ): + """Helper to create a new row, and a sample set_cell mutation to change its value""" + from google.cloud.bigtable.data.mutations import SetCell + + row_key = uuid.uuid4().hex.encode() + family = TEST_FAMILY + qualifier = b"test-qualifier" + self.add_row(row_key, family=family, qualifier=qualifier, value=start_value) + assert self.retrieve_cell_value(table, row_key) == start_value + mutation = SetCell(family=TEST_FAMILY, qualifier=qualifier, new_value=new_value) + return (row_key, mutation) + + +class TestSystem(SystemTestRunner): def _make_client(self): project = os.getenv("GOOGLE_CLOUD_PROJECT") or None return CrossSync._Sync_Impl.DataClient(project=project) @@ -127,67 +151,6 @@ def target(self, client, table_id, authorized_view_id, instance_id, request): else: raise ValueError(f"unknown target type: {request.param}") - @pytest.fixture(scope="session") - def column_family_config(self): - """specify column families to create when creating a new test table""" - from google.cloud.bigtable_admin_v2 import types - - int_aggregate_type = types.Type.Aggregate( - input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}), - sum={}, - ) - return { - TEST_FAMILY: types.ColumnFamily(), - TEST_FAMILY_2: types.ColumnFamily(), - TEST_AGGREGATE_FAMILY: types.ColumnFamily( - value_type=types.Type(aggregate_type=int_aggregate_type) - ), - } - - @pytest.fixture(scope="session") - def init_table_id(self): - """The table_id to use when creating a new test table""" - return f"test-table-{uuid.uuid4().hex}" - - @pytest.fixture(scope="session") - def cluster_config(self, project_id): - """Configuration for the clusters to use when creating a new instance""" - from google.cloud.bigtable_admin_v2 
import types - - cluster = { - "test-cluster": types.Cluster( - location=f"projects/{project_id}/locations/us-central1-b", serve_nodes=1 - ) - } - return cluster - - @pytest.mark.usefixtures("target") - def _retrieve_cell_value(self, target, row_key): - """Helper to read an individual row""" - from google.cloud.bigtable.data import ReadRowsQuery - - row_list = target.read_rows(ReadRowsQuery(row_keys=row_key)) - assert len(row_list) == 1 - row = row_list[0] - cell = row.cells[0] - return cell.value - - def _create_row_and_mutation( - self, table, temp_rows, *, start_value=b"start", new_value=b"new_value" - ): - """Helper to create a new row, and a sample set_cell mutation to change its value""" - from google.cloud.bigtable.data.mutations import SetCell - - row_key = uuid.uuid4().hex.encode() - family = TEST_FAMILY - qualifier = b"test-qualifier" - temp_rows.add_row( - row_key, family=family, qualifier=qualifier, value=start_value - ) - assert self._retrieve_cell_value(table, row_key) == start_value - mutation = SetCell(family=TEST_FAMILY, qualifier=qualifier, new_value=new_value) - return (row_key, mutation) - @pytest.fixture(scope="function") def temp_rows(self, target): builder = CrossSync._Sync_Impl.TempRowBuilder(target) @@ -260,11 +223,11 @@ def test_mutation_set_cell(self, target, temp_rows): """Ensure cells can be set properly""" row_key = b"bulk_mutate" new_value = uuid.uuid4().hex.encode() - (row_key, mutation) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value + (row_key, mutation) = temp_rows.create_row_and_mutation( + target, new_value=new_value ) target.mutate_row(row_key, mutation) - assert self._retrieve_cell_value(target, row_key) == new_value + assert temp_rows.retrieve_cell_value(target, row_key) == new_value @pytest.mark.usefixtures("target") @CrossSync._Sync_Impl.Retry( @@ -279,11 +242,11 @@ def test_mutation_add_to_cell(self, target, temp_rows): qualifier = b"test-qualifier" temp_rows.add_aggregate_row(row_key, family=family, qualifier=qualifier) target.mutate_row(row_key, AddToCell(family, qualifier, 1, timestamp_micros=0)) - encoded_result = self._retrieve_cell_value(target, row_key) + encoded_result = temp_rows.retrieve_cell_value(target, row_key) int_result = int.from_bytes(encoded_result, byteorder="big") assert int_result == 1 target.mutate_row(row_key, AddToCell(family, qualifier, 9, timestamp_micros=0)) - encoded_result = self._retrieve_cell_value(target, row_key) + encoded_result = temp_rows.retrieve_cell_value(target, row_key) int_result = int.from_bytes(encoded_result, byteorder="big") assert int_result == 10 @@ -314,12 +277,12 @@ def test_bulk_mutations_set_cell(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry new_value = uuid.uuid4().hex.encode() - (row_key, mutation) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value + (row_key, mutation) = temp_rows.create_row_and_mutation( + target, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) target.bulk_mutate_rows([bulk_mutation]) - assert self._retrieve_cell_value(target, row_key) == new_value + assert temp_rows.retrieve_cell_value(target, row_key) == new_value def test_bulk_mutations_raise_exception(self, client, target): """If an invalid mutation is passed, an exception should be raised""" @@ -350,18 +313,18 @@ def test_mutations_batcher_context_manager(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry (new_value, new_value2) = 
[uuid.uuid4().hex.encode() for _ in range(2)] - (row_key, mutation) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value + (row_key, mutation) = temp_rows.create_row_and_mutation( + target, new_value=new_value ) - (row_key2, mutation2) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value2 + (row_key2, mutation2) = temp_rows.create_row_and_mutation( + target, new_value=new_value2 ) bulk_mutation = RowMutationEntry(row_key, [mutation]) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) with target.mutations_batcher() as batcher: batcher.append(bulk_mutation) batcher.append(bulk_mutation2) - assert self._retrieve_cell_value(target, row_key) == new_value + assert temp_rows.retrieve_cell_value(target, row_key) == new_value assert len(batcher._staged_entries) == 0 @pytest.mark.usefixtures("client") @@ -374,8 +337,8 @@ def test_mutations_batcher_timer_flush(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry new_value = uuid.uuid4().hex.encode() - (row_key, mutation) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value + (row_key, mutation) = temp_rows.create_row_and_mutation( + target, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) flush_interval = 0.1 @@ -385,7 +348,7 @@ def test_mutations_batcher_timer_flush(self, client, target, temp_rows): assert len(batcher._staged_entries) == 1 CrossSync._Sync_Impl.sleep(flush_interval + 0.1) assert len(batcher._staged_entries) == 0 - assert self._retrieve_cell_value(target, row_key) == new_value + assert temp_rows.retrieve_cell_value(target, row_key) == new_value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -397,12 +360,12 @@ def test_mutations_batcher_count_flush(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry (new_value, new_value2) = [uuid.uuid4().hex.encode() for _ in range(2)] - (row_key, mutation) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value + (row_key, mutation) = temp_rows.create_row_and_mutation( + target, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - (row_key2, mutation2) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value2 + (row_key2, mutation2) = temp_rows.create_row_and_mutation( + target, new_value=new_value2 ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) with target.mutations_batcher(flush_limit_mutation_count=2) as batcher: @@ -416,8 +379,8 @@ def test_mutations_batcher_count_flush(self, client, target, temp_rows): future.result() assert len(batcher._staged_entries) == 0 assert len(batcher._flush_jobs) == 0 - assert self._retrieve_cell_value(target, row_key) == new_value - assert self._retrieve_cell_value(target, row_key2) == new_value2 + assert temp_rows.retrieve_cell_value(target, row_key) == new_value + assert temp_rows.retrieve_cell_value(target, row_key2) == new_value2 @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -429,12 +392,12 @@ def test_mutations_batcher_bytes_flush(self, client, target, temp_rows): from google.cloud.bigtable.data.mutations import RowMutationEntry (new_value, new_value2) = [uuid.uuid4().hex.encode() for _ in range(2)] - (row_key, mutation) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value + (row_key, mutation) = temp_rows.create_row_and_mutation( + target, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - (row_key2, 
mutation2) = self._create_row_and_mutation( - target, temp_rows, new_value=new_value2 + (row_key2, mutation2) = temp_rows.create_row_and_mutation( + target, new_value=new_value2 ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) flush_limit = bulk_mutation.size() + bulk_mutation2.size() - 1 @@ -448,8 +411,8 @@ def test_mutations_batcher_bytes_flush(self, client, target, temp_rows): for future in list(batcher._flush_jobs): future future.result() - assert self._retrieve_cell_value(target, row_key) == new_value - assert self._retrieve_cell_value(target, row_key2) == new_value2 + assert temp_rows.retrieve_cell_value(target, row_key) == new_value + assert temp_rows.retrieve_cell_value(target, row_key2) == new_value2 @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -459,12 +422,12 @@ def test_mutations_batcher_no_flush(self, client, target, temp_rows): new_value = uuid.uuid4().hex.encode() start_value = b"unchanged" - (row_key, mutation) = self._create_row_and_mutation( - target, temp_rows, start_value=start_value, new_value=new_value + (row_key, mutation) = temp_rows.create_row_and_mutation( + target, start_value=start_value, new_value=new_value ) bulk_mutation = RowMutationEntry(row_key, [mutation]) - (row_key2, mutation2) = self._create_row_and_mutation( - target, temp_rows, start_value=start_value, new_value=new_value + (row_key2, mutation2) = temp_rows.create_row_and_mutation( + target, start_value=start_value, new_value=new_value ) bulk_mutation2 = RowMutationEntry(row_key2, [mutation2]) size_limit = bulk_mutation.size() + bulk_mutation2.size() + 1 @@ -478,8 +441,8 @@ def test_mutations_batcher_no_flush(self, client, target, temp_rows): CrossSync._Sync_Impl.yield_to_event_loop() assert len(batcher._staged_entries) == 2 assert len(batcher._flush_jobs) == 0 - assert self._retrieve_cell_value(target, row_key) == start_value - assert self._retrieve_cell_value(target, row_key2) == start_value + assert temp_rows.retrieve_cell_value(target, row_key) == start_value + assert temp_rows.retrieve_cell_value(target, row_key2) == start_value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -537,7 +500,7 @@ def test_read_modify_write_row_increment( assert result[0].family == family assert result[0].qualifier == qualifier assert int(result[0]) == expected - assert self._retrieve_cell_value(target, row_key) == result[0].value + assert temp_rows.retrieve_cell_value(target, row_key) == result[0].value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -570,7 +533,7 @@ def test_read_modify_write_row_append( assert result[0].family == family assert result[0].qualifier == qualifier assert result[0].value == expected - assert self._retrieve_cell_value(target, row_key) == result[0].value + assert temp_rows.retrieve_cell_value(target, row_key) == result[0].value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -602,7 +565,7 @@ def test_read_modify_write_row_chained(self, client, target, temp_rows): == (start_amount + increment_amount).to_bytes(8, "big", signed=True) + b"helloworld!" 
) - assert self._retrieve_cell_value(target, row_key) == result[0].value + assert temp_rows.retrieve_cell_value(target, row_key) == result[0].value @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("target") @@ -640,7 +603,7 @@ def test_check_and_mutate( expected_value = ( true_mutation_value if expected_result else false_mutation_value ) - assert self._retrieve_cell_value(target, row_key) == expected_value + assert temp_rows.retrieve_cell_value(target, row_key) == expected_value @pytest.mark.skipif( bool(os.environ.get(BIGTABLE_EMULATOR)), diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 0fc68af7b..db7bf0d5d 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1400,9 +1400,15 @@ async def test_customizable_retryable_errors( predicate_builder_mock.assert_called_once_with( *expected_retryables, *extra_retryables ) - retry_call_args = retry_fn_mock.call_args_list[0].args # output of if_exception_type should be sent in to retry constructor - assert retry_call_args[1] is expected_predicate + retry_call_kwargs = retry_fn_mock.call_args_list[0].kwargs + # check for predicate passed as kwarg + if "predicate" in retry_call_kwargs: + assert retry_call_kwargs["predicate"] is expected_predicate + else: + # check for predicate passed as arg + retry_call_args = retry_fn_mock.call_args_list[0].args + assert retry_call_args[1] is expected_predicate @pytest.mark.parametrize( "fn_name,fn_args,gapic_fn", diff --git a/tests/unit/data/_sync_autogen/test_client.py b/tests/unit/data/_sync_autogen/test_client.py index 5a8b20407..9c30946c5 100644 --- a/tests/unit/data/_sync_autogen/test_client.py +++ b/tests/unit/data/_sync_autogen/test_client.py @@ -1120,8 +1120,12 @@ def test_customizable_retryable_errors( predicate_builder_mock.assert_called_once_with( *expected_retryables, *extra_retryables ) - retry_call_args = retry_fn_mock.call_args_list[0].args - assert retry_call_args[1] is expected_predicate + retry_call_kwargs = retry_fn_mock.call_args_list[0].kwargs + if "predicate" in retry_call_kwargs: + assert retry_call_kwargs["predicate"] is expected_predicate + else: + retry_call_args = retry_fn_mock.call_args_list[0].args + assert retry_call_args[1] is expected_predicate @pytest.mark.parametrize( "fn_name,fn_args,gapic_fn",
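
For context, the system tests above register metrics handlers through the table's private _metrics controller (see the table and authorized_view fixtures in test_metrics_autogen.py). A minimal sketch of that pattern, assuming the same private API the tests rely on; the instance and table IDs are placeholders, and _metrics/add_handler are internal and may change:

    from google.cloud.bigtable.data._metrics.handlers._base import MetricsHandler
    from google.cloud.bigtable.data._cross_sync import CrossSync


    class PrintingHandler(MetricsHandler):
        # Same hooks the test handler implements: one event per finished
        # operation, one per finished attempt.
        def on_operation_complete(self, op):
            # op is a CompletedOperationMetric: op_type, final_status, duration_ns, ...
            print(f"{op.op_type.value}: {op.final_status.name} in {op.duration_ns}ns")

        def on_attempt_complete(self, attempt, _):
            # attempt is a CompletedAttemptMetric; the second arg is the parent operation
            print(f"  attempt: {attempt.end_status.name}, gfe={attempt.gfe_latency_ns}")


    with CrossSync._Sync_Impl.DataClient() as client:
        with client.get_table("my-instance", "my-table") as table:
            # register on the table's metrics controller, as the fixtures do
            table._metrics.add_handler(PrintingHandler())
            table.sample_row_keys()  # emits one operation event and >=1 attempt events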
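The _make_exception helper fabricates the same trailer the real backend uses to report cluster and zone: a ResponseParams proto serialized under the "x-goog-ext-425905942-bin" metadata key. A round-trip sketch of that encoding, using the proto-plus serialize/deserialize helpers (the cluster and zone values are placeholders):

    from google.cloud.bigtable_v2.types import ResponseParams

    METADATA_KEY = "x-goog-ext-425905942-bin"
    raw = ResponseParams.serialize(
        ResponseParams(cluster_id="my_cluster", zone_id="my_zone")
    )
    trailer = (METADATA_KEY, raw)  # shape of the metadata entry the tests fabricate
    decoded = ResponseParams.deserialize(trailer[1])
    assert decoded.cluster_id == "my_cluster" and decoded.zone_id == "my_zone"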
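The unit-test change above exists because the retry entry point is now called with keyword arguments, so the predicate can land in call_args.kwargs rather than positional call_args.args. A standalone sketch of the lookup the test performs, with a plain Mock standing in for the retry function:

    from unittest import mock

    retry_fn_mock = mock.Mock()
    retry_fn_mock(target=lambda: None, predicate="expected", timeout=1.0)
    call = retry_fn_mock.call_args_list[0]
    # prefer the kwarg form; fall back to the old positional layout
    predicate = call.kwargs.get(
        "predicate", call.args[1] if len(call.args) > 1 else None
    )
    assert predicate == "expected"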