diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 35fe42814..85522e24a 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -1556,6 +1556,33 @@ async def mutate_row( exception_factory=_retry_exception_factory, ) + @CrossSync.convert + def _get_mutate_rows_operation( + self, + mutation_entries: list[RowMutationEntry], + *, + operation_timeout: float | TABLE_DEFAULT, + attempt_timeout: float | None | TABLE_DEFAULT, + retryable_errors: Sequence[type[Exception]] | TABLE_DEFAULT, + ) -> CrossSync._MutateRowsOperation: + """ + Gets the bulk mutate rows operation object for the given mutation entries. + """ + operation_timeout, attempt_timeout = _get_timeouts( + operation_timeout, attempt_timeout, self + ) + retryable_excs = _get_retryable_errors(retryable_errors, self) + + operation = CrossSync._MutateRowsOperation( + self.client._gapic_client, + self, + mutation_entries, + operation_timeout, + attempt_timeout, + retryable_exceptions=retryable_excs, + ) + return operation + @CrossSync.convert async def bulk_mutate_rows( self, @@ -1597,18 +1624,11 @@ async def bulk_mutate_rows( Contains details about any failed entries in .exceptions ValueError: if invalid arguments are provided """ - operation_timeout, attempt_timeout = _get_timeouts( - operation_timeout, attempt_timeout, self - ) - retryable_excs = _get_retryable_errors(retryable_errors, self) - - operation = CrossSync._MutateRowsOperation( - self.client._gapic_client, - self, + operation = self._get_mutate_rows_operation( mutation_entries, - operation_timeout, - attempt_timeout, - retryable_exceptions=retryable_excs, + operation_timeout=operation_timeout, + attempt_timeout=attempt_timeout, + retryable_errors=retryable_errors, ) await operation.start() diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 88136ddad..dc95b6f96 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -1299,6 +1299,29 @@ def mutate_row( exception_factory=_retry_exception_factory, ) + def _get_mutate_rows_operation( + self, + mutation_entries: list[RowMutationEntry], + *, + operation_timeout: float | TABLE_DEFAULT, + attempt_timeout: float | None | TABLE_DEFAULT, + retryable_errors: Sequence[type[Exception]] | TABLE_DEFAULT, + ) -> CrossSync._Sync_Impl._MutateRowsOperation: + """Gets the bulk mutate rows operation object for the given mutation entries.""" + (operation_timeout, attempt_timeout) = _get_timeouts( + operation_timeout, attempt_timeout, self + ) + retryable_excs = _get_retryable_errors(retryable_errors, self) + operation = CrossSync._Sync_Impl._MutateRowsOperation( + self.client._gapic_client, + self, + mutation_entries, + operation_timeout, + attempt_timeout, + retryable_exceptions=retryable_excs, + ) + return operation + def bulk_mutate_rows( self, mutation_entries: list[RowMutationEntry], @@ -1337,17 +1360,11 @@ def bulk_mutate_rows( MutationsExceptionGroup: if one or more mutations fails Contains details about any failed entries in .exceptions ValueError: if invalid arguments are provided""" - (operation_timeout, attempt_timeout) = _get_timeouts( - operation_timeout, attempt_timeout, self - ) - retryable_excs = _get_retryable_errors(retryable_errors, self) - operation = CrossSync._Sync_Impl._MutateRowsOperation( - self.client._gapic_client, - self, + operation = self._get_mutate_rows_operation( mutation_entries, - operation_timeout, - attempt_timeout, - retryable_exceptions=retryable_excs, + operation_timeout=operation_timeout, + attempt_timeout=attempt_timeout, + retryable_errors=retryable_errors, ) operation.start() diff --git a/google/cloud/bigtable/row.py b/google/cloud/bigtable/row.py index 2b02c2f9f..66af79f84 100644 --- a/google/cloud/bigtable/row.py +++ b/google/cloud/bigtable/row.py @@ -153,15 +153,16 @@ def _set_cell(self, column_family_id, column, value, timestamp=None, state=None) integer (8 bytes). :type timestamp: :class:`datetime.datetime` - :param timestamp: (Optional) The timestamp of the operation. + :param timestamp: (Optional) The timestamp of the operation. If a + timestamp is not provided, the current system time + will be used. :type state: bool :param state: (Optional) The state that is passed along to :meth:`_get_mutations`. """ - if timestamp is None: - # Use current Bigtable server time. - timestamp_micros = mutations._SERVER_SIDE_TIMESTAMP + if timestamp is None or timestamp == mutations._SERVER_SIDE_TIMESTAMP: + timestamp_micros = timestamp else: timestamp_micros = _microseconds_from_datetime(timestamp) # Truncate to millisecond granularity. @@ -351,7 +352,9 @@ def set_cell(self, column_family_id, column, value, timestamp=None): integer (8 bytes). :type timestamp: :class:`datetime.datetime` - :param timestamp: (Optional) The timestamp of the operation. + :param timestamp: (Optional) The timestamp of the operation. If a + timestamp is not provided, the current system time + will be used. """ self._set_cell(column_family_id, column, value, timestamp=timestamp, state=None) @@ -651,7 +654,9 @@ def set_cell(self, column_family_id, column, value, timestamp=None, state=True): integer (8 bytes). :type timestamp: :class:`datetime.datetime` - :param timestamp: (Optional) The timestamp of the operation. + :param timestamp: (Optional) The timestamp of the operation. If a + timestamp is not provided, the current system time + will be used. :type state: bool :param state: (Optional) The state that the mutation should be diff --git a/google/cloud/bigtable/table.py b/google/cloud/bigtable/table.py index 39e02e4c3..654d87201 100644 --- a/google/cloud/bigtable/table.py +++ b/google/cloud/bigtable/table.py @@ -17,11 +17,10 @@ from typing import Set import warnings -from google.api_core import timeout +from google.api_core.exceptions import GoogleAPICallError from google.api_core.exceptions import Aborted from google.api_core.exceptions import DeadlineExceeded from google.api_core.exceptions import NotFound -from google.api_core.exceptions import RetryError from google.api_core.exceptions import ServiceUnavailable from google.api_core.exceptions import InternalServerError from google.api_core.gapic_v1.method import DEFAULT @@ -31,6 +30,9 @@ from google.cloud.bigtable.backup import Backup from google.cloud.bigtable.column_family import _gc_rule_from_pb from google.cloud.bigtable.column_family import ColumnFamily +from google.cloud.bigtable.data._helpers import TABLE_DEFAULT +from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup +from google.cloud.bigtable.data.mutations import RowMutationEntry from google.cloud.bigtable.batcher import MutationsBatcher from google.cloud.bigtable.batcher import FLUSH_COUNT, MAX_MUTATION_SIZE from google.cloud.bigtable.encryption_info import EncryptionInfo @@ -38,10 +40,7 @@ from google.cloud.bigtable.row import AppendRow from google.cloud.bigtable.row import ConditionalRow from google.cloud.bigtable.row import DirectRow -from google.cloud.bigtable.row_data import ( - PartialRowsData, - _retriable_internal_server_error, -) +from google.cloud.bigtable.row_data import PartialRowsData from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS from google.cloud.bigtable.row_set import RowSet from google.cloud.bigtable.row_set import RowRange @@ -52,6 +51,7 @@ from google.cloud.bigtable.admin.types import ( bigtable_table_admin as table_admin_messages_v2_pb2, ) +from google.rpc import code_pb2, status_pb2 # Maximum number of mutations in bulk (MutateRowsRequest message): # (https://cloud.google.com/bigtable/docs/reference/data/rpc/ @@ -714,6 +714,9 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT): specify a ``retry`` strategy of "do-nothing", a deadline of ``0.0`` can be specified. + If a deadline of ``None`` is specified, the deadline defaults to + a table-default of 600 seconds (10 minutes). + :type rows: list :param rows: List or other iterable of :class:`.DirectRow` instances. @@ -731,18 +734,66 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY, timeout=DEFAULT): :returns: A list of response statuses (`google.rpc.status_pb2.Status`) corresponding to success or failure of each row mutation sent. These will be in the same order as the `rows`. + + :raise: ValueError: If a row entry has no mutations, or too many mutations """ if timeout is DEFAULT: timeout = self.mutation_timeout - retryable_mutate_rows = _RetryableMutateRowsWorker( - self._instance._client, - self.name, - rows, - app_profile_id=self._app_profile_id, - timeout=timeout, + # To adhere to the retry strategy of do-nothing being achievable with a deadline + # of 0.0, we modify the retryable errors to be empty if such a deadline is passed. + retryable_errors = RETRYABLE_MUTATION_ERRORS + operation_timeout = retry.deadline + + # The data client cannot take in zero or null values for deadline, so we set it to + # the default if that is the case. It shouldn't affect the behavior of the retry + # if a 0.0 deadline is set. + if not retry.deadline: + operation_timeout = TABLE_DEFAULT.MUTATE_ROWS + if retry.deadline == 0.0: + retryable_errors = [] + + attempt_timeout = timeout + mutation_entries = [ + RowMutationEntry(row.row_key, row._get_mutations()) for row in rows + ] + return_statuses = [status_pb2.Status(code=code_pb2.Code.OK)] * len( + mutation_entries + ) # By default, return status OKs for everything + + operation = self._table_impl._get_mutate_rows_operation( + mutation_entries, + operation_timeout=operation_timeout, + attempt_timeout=attempt_timeout, + retryable_errors=retryable_errors, ) - return retryable_mutate_rows(retry=retry) + + try: + operation.start() + except MutationsExceptionGroup: + # Take the first exception for each error index with a gRPC status code + # and set the status of that row entry to that grpc status. if none of the + # errors for a given index have gRPC status codes, return an UNKNOWN status + # with the first error message of each index. + for idx, errors in operation.errors.items(): + return_statuses[idx] = status_pb2.Status( + code=code_pb2.Code.UNKNOWN, + message=str(errors[0]), + ) + + for error in errors: + if ( + isinstance(error, GoogleAPICallError) + and error.grpc_status_code is not None + ): + return_statuses[idx] = status_pb2.Status( + code=error.grpc_status_code.value[0], + message=error.message, + details=error.details, + ) + break + + return return_statuses def sample_row_keys(self): """Read a sample of row keys in the table. @@ -1070,133 +1121,6 @@ def restore(self, new_table_id, cluster_id=None, backup_id=None, backup_name=Non ) -class _RetryableMutateRowsWorker(object): - """A callable worker that can retry to mutate rows with transient errors. - - This class is a callable that can retry mutating rows that result in - transient errors. After all rows are successful or none of the rows - are retryable, any subsequent call on this callable will be a no-op. - """ - - def __init__(self, client, table_name, rows, app_profile_id=None, timeout=None): - self.client = client - self.table_name = table_name - self.rows = rows - self.app_profile_id = app_profile_id - self.responses_statuses = [None] * len(self.rows) - self.timeout = timeout - - def __call__(self, retry=DEFAULT_RETRY): - """Attempt to mutate all rows and retry rows with transient errors. - - Will retry the rows with transient errors until all rows succeed or - ``deadline`` specified in the `retry` is reached. - - :rtype: list - :returns: A list of response statuses (`google.rpc.status_pb2.Status`) - corresponding to success or failure of each row mutation - sent. These will be in the same order as the ``rows``. - """ - mutate_rows = self._do_mutate_retryable_rows - if retry: - mutate_rows = retry(self._do_mutate_retryable_rows) - - try: - mutate_rows() - except (_BigtableRetryableError, RetryError): - # - _BigtableRetryableError raised when no retry strategy is used - # and a retryable error on a mutation occurred. - # - RetryError raised when retry deadline is reached. - # In both cases, just return current `responses_statuses`. - pass - - return self.responses_statuses - - @staticmethod - def _is_retryable(status): - return status is None or status.code in RETRYABLE_CODES - - def _do_mutate_retryable_rows(self): - """Mutate all the rows that are eligible for retry. - - A row is eligible for retry if it has not been tried or if it resulted - in a transient error in a previous call. - - :rtype: list - :return: The responses statuses, which is a list of - :class:`~google.rpc.status_pb2.Status`. - :raises: One of the following: - - * :exc:`~.table._BigtableRetryableError` if any - row returned a transient error. - * :exc:`RuntimeError` if the number of responses doesn't - match the number of rows that were retried - """ - retryable_rows = [] - index_into_all_rows = [] - for index, status in enumerate(self.responses_statuses): - if self._is_retryable(status): - retryable_rows.append(self.rows[index]) - index_into_all_rows.append(index) - - if not retryable_rows: - # All mutations are either successful or non-retryable now. - return self.responses_statuses - - entries = _compile_mutation_entries(self.table_name, retryable_rows) - data_client = self.client.table_data_client - - kwargs = {} - if self.timeout is not None: - kwargs["timeout"] = timeout.ExponentialTimeout(deadline=self.timeout) - - try: - responses = data_client.mutate_rows( - table_name=self.table_name, - entries=entries, - app_profile_id=self.app_profile_id, - retry=None, - **kwargs - ) - except RETRYABLE_MUTATION_ERRORS as exc: - # If an exception, considered retryable by `RETRYABLE_MUTATION_ERRORS`, is - # returned from the initial call, consider - # it to be retryable. Wrap as a Bigtable Retryable Error. - # For InternalServerError, it is only retriable if the message is related to RST Stream messages - if _retriable_internal_server_error(exc) or not isinstance( - exc, InternalServerError - ): - raise _BigtableRetryableError - else: - # re-raise the original exception - raise - - num_responses = 0 - num_retryable_responses = 0 - for response in responses: - for entry in response.entries: - num_responses += 1 - index = index_into_all_rows[entry.index] - self.responses_statuses[index] = entry.status - if self._is_retryable(entry.status): - num_retryable_responses += 1 - if entry.status.code == 0: - self.rows[index].clear() - - if len(retryable_rows) != num_responses: - raise RuntimeError( - "Unexpected number of responses", - num_responses, - "Expected", - len(retryable_rows), - ) - - if num_retryable_responses: - raise _BigtableRetryableError - - return self.responses_statuses - - class ClusterState(object): """Representation of a Cluster State. @@ -1343,73 +1267,3 @@ def _create_row_request( row_set._update_message_request(message) return message - - -def _compile_mutation_entries(table_name, rows): - """Create list of mutation entries - - :type table_name: str - :param table_name: The name of the table to write to. - - :type rows: list - :param rows: List or other iterable of :class:`.DirectRow` instances. - - :rtype: List[:class:`data_messages_v2_pb2.MutateRowsRequest.Entry`] - :returns: entries corresponding to the inputs. - :raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is - greater than the max ({}) - """.format( - _MAX_BULK_MUTATIONS - ) - entries = [] - mutations_count = 0 - entry_klass = data_messages_v2_pb2.MutateRowsRequest.Entry - - for row in rows: - _check_row_table_name(table_name, row) - _check_row_type(row) - mutations = row._get_mutation_pbs() - entries.append(entry_klass(row_key=row.row_key, mutations=mutations)) - mutations_count += len(mutations) - - if mutations_count > _MAX_BULK_MUTATIONS: - raise TooManyMutationsError( - "Maximum number of mutations is %s" % (_MAX_BULK_MUTATIONS,) - ) - return entries - - -def _check_row_table_name(table_name, row): - """Checks that a row belongs to a table. - - :type table_name: str - :param table_name: The name of the table. - - :type row: :class:`~google.cloud.bigtable.row.Row` - :param row: An instance of :class:`~google.cloud.bigtable.row.Row` - subclasses. - - :raises: :exc:`~.table.TableMismatchError` if the row does not belong to - the table. - """ - if row.table is not None and row.table.name != table_name: - raise TableMismatchError( - "Row %s is a part of %s table. Current table: %s" - % (row.row_key, row.table.name, table_name) - ) - - -def _check_row_type(row): - """Checks that a row is an instance of :class:`.DirectRow`. - - :type row: :class:`~google.cloud.bigtable.row.Row` - :param row: An instance of :class:`~google.cloud.bigtable.row.Row` - subclasses. - - :raises: :class:`TypeError ` if the row is not an - instance of DirectRow. - """ - if not isinstance(row, DirectRow): - raise TypeError( - "Bulk processing can not be applied for " "conditional or append mutations." - ) diff --git a/tests/system/v2_client/test_data_api.py b/tests/system/v2_client/test_data_api.py index 3c0e1cbbf..b5ab5e9b9 100644 --- a/tests/system/v2_client/test_data_api.py +++ b/tests/system/v2_client/test_data_api.py @@ -279,39 +279,10 @@ def test_table_mutate_rows(data_table, rows_to_delete): assert row2_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value == CELL_VAL4 -def _add_test_error_handler(retry): - """Overwrites the current on_error function to assert that backoff values are within expected bounds.""" - import time - - curr_time = time.monotonic() - times_triggered = 0 - - # Assert that the retry handler works properly. - def test_error_handler(exc): - nonlocal curr_time, times_triggered, retry - next_time = time.monotonic() - if times_triggered >= 1: - gap = next_time - curr_time - - # Exponential backoff = uniform randomness from 0 to max_gap - max_gap = min( - retry._initial * retry._multiplier**times_triggered, - retry._maximum, - ) - assert gap <= max_gap + GAP_MARGIN_OF_ERROR - times_triggered += 1 - curr_time = next_time - - retry._on_error = test_error_handler - - def test_table_mutate_rows_retries_timeout(data_table, rows_to_delete): import mock - import copy - from google.api_core import retry as retries - from google.api_core.exceptions import InvalidArgument from google.cloud.bigtable_v2 import MutateRowsResponse - from google.cloud.bigtable.table import DEFAULT_RETRY, _BigtableRetryableError + from google.cloud.bigtable.table import DEFAULT_RETRY from google.rpc.code_pb2 import Code from google.rpc.status_pb2 import Status @@ -365,10 +336,7 @@ def test_table_mutate_rows_retries_timeout(data_table, rows_to_delete): rows_to_delete.append(row_2) row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) - # Testing the default retry - default_retry_copy = copy.copy(DEFAULT_RETRY) - _add_test_error_handler(default_retry_copy) - statuses = data_table.mutate_rows([row, row_2], retry=default_retry_copy) + statuses = data_table.mutate_rows([row, row_2]) assert statuses[0].code == Code.OK assert statuses[1].code == Code.OK @@ -388,28 +356,35 @@ def test_table_mutate_rows_retries_timeout(data_table, rows_to_delete): rows_to_delete.append(row_2) row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) - # Testing the default retry - default_retry_copy = copy.copy(DEFAULT_RETRY) - _add_test_error_handler(default_retry_copy) - statuses = data_table.mutate_rows([row, row_2], retry=default_retry_copy) + statuses = data_table.mutate_rows([row, row_2]) assert statuses[0].code == Code.OK assert statuses[1].code == Code.INTERNAL - # Because of the way the retriable mutate worker class works, unusual things can happen - # when passing in custom retry predicates. - row = data_table.direct_row(ROW_KEY) - rows_to_delete.append(row) + # Retries with deadline 0 should do nothing. + with mock.patch.object( + data_table._instance._client.table_data_client, "mutate_rows" + ) as mutate_mock: + mutate_mock.side_effect = [ + initial_error_response, + followup_error_response, + followup_error_response, + final_success_response, + ] - row_2 = data_table.direct_row(ROW_KEY_ALT) - rows_to_delete.append(row_2) + row = data_table.direct_row(ROW_KEY) + rows_to_delete.append(row) + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) - retry = DEFAULT_RETRY.with_predicate( - retries.if_exception_type(_BigtableRetryableError, InvalidArgument) - ) - _add_test_error_handler(retry) - statuses = data_table.mutate_rows([row, row_2], retry=retry) - assert statuses[0] is None - assert statuses[1] is None + row_2 = data_table.direct_row(ROW_KEY_ALT) + rows_to_delete.append(row_2) + row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) + + do_nothing_retry = DEFAULT_RETRY.with_deadline(0.0) + + statuses = data_table.mutate_rows([row, row_2], retry=do_nothing_retry) + assert statuses[0].code == Code.OK + assert statuses[1].code == Code.INTERNAL + mutate_mock.assert_called_once() def _populate_table( @@ -497,25 +472,23 @@ def test_table_mutate_rows_integers(data_table, rows_to_delete): def test_table_mutate_rows_input_errors(data_table, rows_to_delete): - from google.api_core.exceptions import InvalidArgument - from google.cloud.bigtable.table import TooManyMutationsError, _MAX_BULK_MUTATIONS + from google.cloud.bigtable.table import _MAX_BULK_MUTATIONS row = data_table.direct_row(ROW_KEY) rows_to_delete.append(row) - # Mutate row with 0 mutations gives an API error from the service, not - # from the client library. - with pytest.raises(InvalidArgument): + # Mutate row with 0 mutations gives a ValueError from the client library. + with pytest.raises(ValueError): data_table.mutate_rows([row]) row.clear() - # Mutate row with >100k mutations gives a TooManyMutationsError from the + # Mutate row with >100k mutations gives a ValueError from the # client library. for _ in range(0, _MAX_BULK_MUTATIONS + 1): row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1) - with pytest.raises(TooManyMutationsError): + with pytest.raises(ValueError): data_table.mutate_rows([row]) diff --git a/tests/unit/v2_client/test_row.py b/tests/unit/v2_client/test_row.py index 20edfc254..4c98364a2 100644 --- a/tests/unit/v2_client/test_row.py +++ b/tests/unit/v2_client/test_row.py @@ -20,6 +20,11 @@ _INSTANCE_ID = "test-instance" +_TIME_NS_RETURN_VALUE = 1234567890 +_EXPECTED_DEFAULT_TIMESTAMP_MICROS = _TIME_NS_RETURN_VALUE // 1000 +_EXPECTED_DEFAULT_TIMESTAMP_MICROS = _EXPECTED_DEFAULT_TIMESTAMP_MICROS - ( + _EXPECTED_DEFAULT_TIMESTAMP_MICROS % 1000 +) def _make_client(*args, **kwargs): @@ -140,7 +145,7 @@ def _set_cell_helper( column_bytes=None, value=b"foobar", timestamp=None, - timestamp_micros=-1, + timestamp_micros=_EXPECTED_DEFAULT_TIMESTAMP_MICROS, ): import struct @@ -153,18 +158,20 @@ def _set_cell_helper( table = object() row = _make_direct_row(row_key, table) assert row._mutations == [] - row.set_cell(column_family_id, column, value, timestamp=timestamp) - if isinstance(value, int): - value = struct.pack(">q", value) - expected_mutation = SetCell( - family=column_family_id, - qualifier=column_bytes or column, - new_value=value, - timestamp_micros=timestamp_micros, - ) + with mock.patch("time.time_ns", return_value=_TIME_NS_RETURN_VALUE): + row.set_cell(column_family_id, column, value, timestamp=timestamp) - _assert_mutations_equal(row._mutations, [expected_mutation]) + if isinstance(value, int): + value = struct.pack(">q", value) + expected_mutation = SetCell( + family=column_family_id, + qualifier=column_bytes or column, + new_value=value, + timestamp_micros=timestamp_micros, + ) + + _assert_mutations_equal(row._mutations, [expected_mutation]) def test_direct_row_set_cell(): @@ -204,6 +211,14 @@ def test_direct_row_set_cell_with_non_null_timestamp(): _set_cell_helper(timestamp=timestamp, timestamp_micros=millis_granularity) +def test_direct_row_set_cell_with_server_side_timestamp(): + from google.cloud.bigtable.data.mutations import _SERVER_SIDE_TIMESTAMP + + _set_cell_helper( + timestamp=_SERVER_SIDE_TIMESTAMP, timestamp_micros=_SERVER_SIDE_TIMESTAMP + ) + + def test_direct_row_delete(): from google.cloud.bigtable.data.mutations import DeleteAllFromRow @@ -476,7 +491,6 @@ def test_direct_row_commit_with_unknown_exception(): def test_direct_row_commit_with_invalid_argument(): from google.cloud.bigtable_v2.services.bigtable import BigtableClient - from google.rpc import code_pb2, status_pb2 project_id = "project-id" row_key = b"row_key" diff --git a/tests/unit/v2_client/test_table.py b/tests/unit/v2_client/test_table.py index d1cc518f7..5c3172424 100644 --- a/tests/unit/v2_client/test_table.py +++ b/tests/unit/v2_client/test_table.py @@ -48,105 +48,7 @@ RETRYABLES = (RETRYABLE_1, RETRYABLE_2, RETRYABLE_3) NON_RETRYABLE = StatusCode.CANCELLED.value[0] STATUS_INTERNAL = StatusCode.INTERNAL.value[0] - - -@mock.patch("google.cloud.bigtable.table._MAX_BULK_MUTATIONS", new=3) -def test__compile_mutation_entries_w_too_many_mutations(): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import TooManyMutationsError - from google.cloud.bigtable.table import _compile_mutation_entries - - table = mock.Mock(name="table", spec=["name"]) - table.name = "table" - rows = [ - DirectRow(row_key=b"row_key", table=table), - DirectRow(row_key=b"row_key_2", table=table), - ] - rows[0].set_cell("cf1", b"c1", 1) - rows[0].set_cell("cf1", b"c1", 2) - rows[1].set_cell("cf1", b"c1", 3) - rows[1].set_cell("cf1", b"c1", 4) - - with pytest.raises(TooManyMutationsError): - _compile_mutation_entries("table", rows) - - -def test__compile_mutation_entries_normal(): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import _compile_mutation_entries - from google.cloud.bigtable_v2.types import MutateRowsRequest - from google.cloud.bigtable_v2.types import data - - table = mock.Mock(spec=["name"]) - table.name = "table" - rows = [ - DirectRow(row_key=b"row_key", table=table), - DirectRow(row_key=b"row_key_2"), - ] - rows[0].set_cell("cf1", b"c1", b"1") - rows[1].set_cell("cf1", b"c1", b"2") - - result = _compile_mutation_entries("table", rows) - - entry_1 = MutateRowsRequest.Entry() - entry_1.row_key = b"row_key" - mutations_1 = data.Mutation() - mutations_1.set_cell.family_name = "cf1" - mutations_1.set_cell.column_qualifier = b"c1" - mutations_1.set_cell.timestamp_micros = -1 - mutations_1.set_cell.value = b"1" - entry_1.mutations.append(mutations_1) - - entry_2 = MutateRowsRequest.Entry() - entry_2.row_key = b"row_key_2" - mutations_2 = data.Mutation() - mutations_2.set_cell.family_name = "cf1" - mutations_2.set_cell.column_qualifier = b"c1" - mutations_2.set_cell.timestamp_micros = -1 - mutations_2.set_cell.value = b"2" - entry_2.mutations.append(mutations_2) - assert result == [entry_1, entry_2] - - -def test__check_row_table_name_w_wrong_table_name(): - from google.cloud.bigtable.table import _check_row_table_name - from google.cloud.bigtable.table import TableMismatchError - from google.cloud.bigtable.row import DirectRow - - table = mock.Mock(name="table", spec=["name"]) - table.name = "table" - row = DirectRow(row_key=b"row_key", table=table) - - with pytest.raises(TableMismatchError): - _check_row_table_name("other_table", row) - - -def test__check_row_table_name_w_right_table_name(): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import _check_row_table_name - - table = mock.Mock(name="table", spec=["name"]) - table.name = "table" - row = DirectRow(row_key=b"row_key", table=table) - - assert not _check_row_table_name("table", row) - - -def test__check_row_type_w_wrong_row_type(): - from google.cloud.bigtable.row import ConditionalRow - from google.cloud.bigtable.table import _check_row_type - - row = ConditionalRow(row_key=b"row_key", table="table", filter_=None) - with pytest.raises(TypeError): - _check_row_type(row) - - -def test__check_row_type_w_right_row_type(): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import _check_row_type - - row = DirectRow(row_key=b"row_key", table="table") - assert not _check_row_type(row) +STATUS_UNKNOWN = StatusCode.UNKNOWN.value[0] def _make_client(*args, **kwargs): @@ -811,10 +713,19 @@ def test_table_read_row_still_partial(): def _table_mutate_rows_helper( - mutation_timeout=None, app_profile_id=None, retry=None, timeout=None + mutation_timeout=None, + app_profile_id=None, + retry=None, + timeout=None, + expected_operation_timeout=None, + expected_attempt_timeout=None, + expected_retryable_errors=None, ): - from google.rpc.status_pb2 import Status + from google.api_core import exceptions + from google.rpc import status_pb2 from google.cloud.bigtable.table import DEFAULT_RETRY + from google.cloud.bigtable.table import RETRYABLE_MUTATION_ERRORS + from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup credentials = _make_credentials() client = _make_client(project="project-id", credentials=credentials, admin=True) @@ -827,15 +738,20 @@ def _table_mutate_rows_helper( if app_profile_id is not None: ctor_kwargs["app_profile_id"] = app_profile_id - table = _make_table(TABLE_ID, instance, **ctor_kwargs) + if expected_operation_timeout is None: + expected_operation_timeout = DEFAULT_RETRY.deadline - rows = [mock.MagicMock(), mock.MagicMock()] - response = [Status(code=0), Status(code=1)] - instance_mock = mock.Mock(return_value=response) - klass_mock = mock.patch( - "google.cloud.bigtable.table._RetryableMutateRowsWorker", - new=mock.MagicMock(return_value=instance_mock), - ) + if expected_retryable_errors is None: + expected_retryable_errors = RETRYABLE_MUTATION_ERRORS + + rows = [ + _MockRow(b"first"), + _MockRow(b"second"), + _MockRow(b"third"), + _MockRow(b"fourth"), + ] + + table = _make_table(TABLE_ID, instance, **ctor_kwargs) call_kwargs = {} @@ -843,29 +759,72 @@ def _table_mutate_rows_helper( call_kwargs["retry"] = retry if timeout is not None: - expected_timeout = call_kwargs["timeout"] = timeout - else: - expected_timeout = mutation_timeout + call_kwargs["timeout"] = timeout - with klass_mock: - statuses = table.mutate_rows(rows, **call_kwargs) + with mock.patch.object( + table._table_impl, "_get_mutate_rows_operation" + ) as get_operation_mock: + get_operation_mock.return_value.start.side_effect = MutationsExceptionGroup( + excs=[Exception()], total_entries=4 + ) - result = [status.code for status in statuses] - expected_result = [0, 1] - assert result == expected_result + # First entry = success + # Second entry = api errors + # Third entry = api errors, but not the first item + # Fourth entry = Errors, but no errors with grpc status codes + get_operation_mock.return_value.errors = { + 1: [ + exceptions.InternalServerError("First exception"), + exceptions.InternalServerError("Second exception"), + ], + 2: [ + ValueError("First exception"), + TypeError("Second exception"), + exceptions.InternalServerError("API error"), + ], + 3: [ + ValueError("First exception"), + TypeError("Second exception"), + ], + } + statuses = table.mutate_rows(rows, **call_kwargs) - klass_mock.new.assert_called_once_with( - client, - TABLE_NAME, - rows, - app_profile_id=app_profile_id, - timeout=expected_timeout, - ) + assert statuses == [ + status_pb2.Status( + code=SUCCESS, + message="", + ), + status_pb2.Status( + code=STATUS_INTERNAL, + message="First exception", + ), + status_pb2.Status( + code=STATUS_INTERNAL, + message="API error", + ), + status_pb2.Status( + code=STATUS_UNKNOWN, + message="First exception", + ), + ] - if retry is not None: - instance_mock.assert_called_once_with(retry=retry) - else: - instance_mock.assert_called_once_with(retry=DEFAULT_RETRY) + # Check all call args other than mutation_entries + get_operation_mock.assert_called_once_with( + mock.ANY, + operation_timeout=expected_operation_timeout, + attempt_timeout=expected_attempt_timeout, + retryable_errors=expected_retryable_errors, + ) + + # Check that mutation entries are in order + mutation_entries = get_operation_mock.call_args.args[0] + mutation_entry_keys = [row.row_key for row in mutation_entries] + assert mutation_entry_keys == [ + b"first", + b"second", + b"third", + b"fourth", + ] def test_table_mutate_rows_w_default_mutation_timeout_app_profile_id(): @@ -874,7 +833,9 @@ def test_table_mutate_rows_w_default_mutation_timeout_app_profile_id(): def test_table_mutate_rows_w_mutation_timeout(): mutation_timeout = 123 - _table_mutate_rows_helper(mutation_timeout=mutation_timeout) + _table_mutate_rows_helper( + mutation_timeout=mutation_timeout, expected_attempt_timeout=mutation_timeout + ) def test_table_mutate_rows_w_app_profile_id(): @@ -883,19 +844,49 @@ def test_table_mutate_rows_w_app_profile_id(): def test_table_mutate_rows_w_retry(): + deadline = 456.0 + retry = mock.Mock() + retry.deadline = deadline + _table_mutate_rows_helper(retry=retry, expected_operation_timeout=deadline) + + +def test_table_mutate_rows_w_zero_deadline_retry(): + from google.cloud.bigtable.data._helpers import TABLE_DEFAULT + + deadline = 0.0 + retry = mock.Mock() + retry.deadline = deadline + _table_mutate_rows_helper( + retry=retry, + expected_operation_timeout=TABLE_DEFAULT.MUTATE_ROWS, + expected_retryable_errors=[], + ) + + +def test_table_mutate_rows_w_none_deadline_retry(): + from google.cloud.bigtable.data._helpers import TABLE_DEFAULT + + deadline = None retry = mock.Mock() - _table_mutate_rows_helper(retry=retry) + retry.deadline = deadline + _table_mutate_rows_helper( + retry=retry, expected_operation_timeout=TABLE_DEFAULT.MUTATE_ROWS + ) def test_table_mutate_rows_w_timeout_arg(): timeout = 123 - _table_mutate_rows_helper(timeout=timeout) + _table_mutate_rows_helper(timeout=timeout, expected_attempt_timeout=timeout) def test_table_mutate_rows_w_mutation_timeout_and_timeout_arg(): mutation_timeout = 123 timeout = 456 - _table_mutate_rows_helper(mutation_timeout=mutation_timeout, timeout=timeout) + _table_mutate_rows_helper( + mutation_timeout=mutation_timeout, + timeout=timeout, + expected_attempt_timeout=timeout, + ) def test_table_read_rows(): @@ -1553,505 +1544,6 @@ def test_table_restore_table_w_backup_name(): _table_restore_helper(backup_name=BACKUP_NAME) -def _make_worker(*args, **kwargs): - from google.cloud.bigtable.table import _RetryableMutateRowsWorker - - return _RetryableMutateRowsWorker(*args, **kwargs) - - -def _make_responses_statuses(codes): - from google.rpc.status_pb2 import Status - - response = [Status(code=code) for code in codes] - return response - - -def _make_responses(codes): - from google.cloud.bigtable_v2.types.bigtable import MutateRowsResponse - from google.rpc.status_pb2 import Status - - entries = [ - MutateRowsResponse.Entry(index=i, status=Status(code=codes[i])) - for i in range(len(codes)) - ] - return MutateRowsResponse(entries=entries) - - -def test_rmrw_callable_empty_rows(): - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance) - gapic_api = _make_gapic_api(client) - gapic_api.mutate_rows.return_value = [] - gapic_api.table_path.return_value = ( - f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}" - ) - - worker = _make_worker(client, table.name, []) - statuses = worker() - - assert len(statuses) == 0 - - -def test_rmrw_callable_no_retry_strategy(): - from google.cloud.bigtable.row import DirectRow - - # Setup: - # - Mutate 3 rows. - # Action: - # - Attempt to mutate the rows w/o any retry strategy. - # Expectation: - # - Since no retry, should return statuses as they come back. - # - Even if there are retryable errors, no retry attempt is made. - # - State of responses_statuses should be - # [success, retryable, non-retryable] - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance) - - row_1 = DirectRow(row_key=b"row_key", table=table) - row_1.set_cell("cf", b"col", b"value1") - row_2 = DirectRow(row_key=b"row_key_2", table=table) - row_2.set_cell("cf", b"col", b"value2") - row_3 = DirectRow(row_key=b"row_key_3", table=table) - row_3.set_cell("cf", b"col", b"value3") - - response_codes = [SUCCESS, RETRYABLE_1, NON_RETRYABLE] - response = _make_responses(response_codes) - - gapic_api = _make_gapic_api(client) - gapic_api.mutate_rows.return_value = [response] - gapic_api.table_path.return_value = ( - f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}" - ) - worker = _make_worker(client, table.name, [row_1, row_2, row_3]) - - statuses = worker(retry=None) - - result = [status.code for status in statuses] - assert result == response_codes - - gapic_api.mutate_rows.assert_called_once() - - -def test_rmrw_callable_retry(): - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import DEFAULT_RETRY - - # Setup: - # - Mutate 3 rows. - # Action: - # - Initial attempt will mutate all 3 rows. - # Expectation: - # - First attempt will result in one retryable error. - # - Second attempt will result in success for the retry-ed row. - # - Check MutateRows is called twice. - # - State of responses_statuses should be - # [success, success, non-retryable] - - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance) - row_1 = DirectRow(row_key=b"row_key", table=table) - row_1.set_cell("cf", b"col", b"value1") - row_2 = DirectRow(row_key=b"row_key_2", table=table) - row_2.set_cell("cf", b"col", b"value2") - row_3 = DirectRow(row_key=b"row_key_3", table=table) - row_3.set_cell("cf", b"col", b"value3") - - response_1 = _make_responses([SUCCESS, RETRYABLE_1, NON_RETRYABLE]) - response_2 = _make_responses([SUCCESS]) - gapic_api = _make_gapic_api(client) - gapic_api.mutate_rows.side_effect = [[response_1], [response_2]] - gapic_api.table_path.return_value = ( - f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}" - ) - worker = _make_worker(client, table.name, [row_1, row_2, row_3]) - retry = DEFAULT_RETRY.with_delay(initial=0.1) - - statuses = worker(retry=retry) - - result = [status.code for status in statuses] - - assert result == [SUCCESS, SUCCESS, NON_RETRYABLE] - - assert client._table_data_client._gapic_client.mutate_rows.call_count == 2 - - -def _do_mutate_retryable_rows_helper( - row_cells, - responses, - prior_statuses=None, - expected_result=None, - raising_retry=False, - retryable_error=False, - timeout=None, - mutate_rows_side_effect=None, -): - from google.api_core.exceptions import ServiceUnavailable - from google.cloud.bigtable.row import DirectRow - from google.cloud.bigtable.table import _BigtableRetryableError - from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2 - - # Setup: - # - Mutate 2 rows. - # Action: - # - Initial attempt will mutate all 2 rows. - # Expectation: - # - Expect [success, non-retryable] - - credentials = _make_credentials() - client = _make_client(project="project-id", credentials=credentials, admin=True) - instance = client.instance(instance_id=INSTANCE_ID) - table = _make_table(TABLE_ID, instance) - - rows = [] - for row_key, cell_data in row_cells: - row = DirectRow(row_key=row_key, table=table) - row.set_cell(*cell_data) - rows.append(row) - - response = _make_responses(responses) - - gapic_api = _make_gapic_api(client) - if retryable_error: - if mutate_rows_side_effect is not None: - gapic_api.mutate_rows.side_effect = mutate_rows_side_effect - else: - gapic_api.mutate_rows.side_effect = ServiceUnavailable("testing") - else: - if mutate_rows_side_effect is not None: - gapic_api.mutate_rows.side_effect = mutate_rows_side_effect - gapic_api.mutate_rows.return_value = [response] - - worker = _make_worker(client, table.name, rows=rows) - - if prior_statuses is not None: - assert len(prior_statuses) == len(rows) - worker.responses_statuses = _make_responses_statuses(prior_statuses) - - expected_entries = [] - for row, prior_status in zip(rows, worker.responses_statuses): - if prior_status is None or prior_status.code in RETRYABLES: - entry = data_messages_v2_pb2.MutateRowsRequest.Entry( - row_key=row.row_key, - mutations=row._get_mutation_pbs().copy(), # row clears on success - ) - expected_entries.append(entry) - - expected_kwargs = {} - if timeout is not None: - worker.timeout = timeout - expected_kwargs["timeout"] = mock.ANY - - if retryable_error or raising_retry: - with pytest.raises(_BigtableRetryableError): - worker._do_mutate_retryable_rows() - statuses = worker.responses_statuses - else: - statuses = worker._do_mutate_retryable_rows() - - if not retryable_error: - result = [status.code for status in statuses] - - if expected_result is None: - expected_result = responses - - assert result == expected_result - - if len(responses) == 0 and not retryable_error: - gapic_api.mutate_rows.assert_not_called() - else: - gapic_api.mutate_rows.assert_called_once_with( - table_name=table.name, - entries=expected_entries, - app_profile_id=None, - retry=None, - **expected_kwargs, - ) - if timeout is not None: - called = gapic_api.mutate_rows.mock_calls[0] - assert called.kwargs["timeout"]._deadline == timeout - - -def test_rmrw_do_mutate_retryable_rows_empty_rows(): - # - # Setup: - # - No mutated rows. - # Action: - # - No API call made. - # Expectation: - # - No change. - # - row_cells = [] - responses = [] - - _do_mutate_retryable_rows_helper(row_cells, responses) - - -def test_rmrw_do_mutate_retryable_rows_w_timeout(): - # - # Setup: - # - Mutate 2 rows. - # Action: - # - Initial attempt will mutate all 2 rows. - # Expectation: - # - No retryable error codes, so don't expect a raise. - # - State of responses_statuses should be [success, non-retryable]. - # - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - ] - - responses = [SUCCESS, NON_RETRYABLE] - - timeout = 5 # seconds - - _do_mutate_retryable_rows_helper( - row_cells, - responses, - timeout=timeout, - ) - - -def test_rmrw_do_mutate_retryable_rows_w_retryable_error(): - # - # Setup: - # - Mutate 2 rows. - # Action: - # - Initial attempt will mutate all 2 rows. - # Expectation: - # - No retryable error codes, so don't expect a raise. - # - State of responses_statuses should be [success, non-retryable]. - # - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - ] - - responses = () - - _do_mutate_retryable_rows_helper( - row_cells, - responses, - retryable_error=True, - ) - - -def test_rmrw_do_mutate_retryable_rows_w_retryable_error_internal_rst_stream_error(): - # Mutate two rows - # Raise internal server error with RST STREAM error messages - # There should be no error raised and that the request is retried - from google.api_core.exceptions import InternalServerError - from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES - - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - ] - responses = () - - for retryable_internal_error_message in RETRYABLE_INTERNAL_ERROR_MESSAGES: - for message in [ - retryable_internal_error_message, - retryable_internal_error_message.upper(), - ]: - _do_mutate_retryable_rows_helper( - row_cells, - responses, - retryable_error=True, - mutate_rows_side_effect=InternalServerError(message), - ) - - -def test_rmrw_do_mutate_rows_w_retryable_error_internal_not_retryable(): - # Mutate two rows - # Raise internal server error but not RST STREAM error messages - # mutate_rows should raise Internal Server Error - from google.api_core.exceptions import InternalServerError - - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - ] - responses = () - - with pytest.raises(InternalServerError): - _do_mutate_retryable_rows_helper( - row_cells, - responses, - mutate_rows_side_effect=InternalServerError("Error not retryable."), - ) - - -def test_rmrw_do_mutate_retryable_rows_retry(): - # - # Setup: - # - Mutate 3 rows. - # Action: - # - Initial attempt will mutate all 3 rows. - # Expectation: - # - Second row returns retryable error code, so expect a raise. - # - State of responses_statuses should be - # [success, retryable, non-retryable] - # - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - (b"row_key_3", ("cf", b"col", b"value3")), - ] - - responses = [SUCCESS, RETRYABLE_1, NON_RETRYABLE] - - _do_mutate_retryable_rows_helper( - row_cells, - responses, - raising_retry=True, - ) - - -def test_rmrw_do_mutate_retryable_rows_second_retry(): - # - # Setup: - # - Mutate 4 rows. - # - First try results: - # [success, retryable, non-retryable, retryable] - # Action: - # - Second try should re-attempt the 'retryable' rows. - # Expectation: - # - After second try: - # [success, success, non-retryable, retryable] - # - One of the rows tried second time returns retryable error code, - # so expect a raise. - # - Exception contains response whose index should be '3' even though - # only two rows were retried. - # - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - (b"row_key_3", ("cf", b"col", b"value3")), - (b"row_key_4", ("cf", b"col", b"value4")), - ] - - responses = [SUCCESS, RETRYABLE_1] - - prior_statuses = [ - SUCCESS, - RETRYABLE_1, - NON_RETRYABLE, - RETRYABLE_2, - ] - - expected_result = [ - SUCCESS, - SUCCESS, - NON_RETRYABLE, - RETRYABLE_1, - ] - - _do_mutate_retryable_rows_helper( - row_cells, - responses, - prior_statuses=prior_statuses, - expected_result=expected_result, - raising_retry=True, - ) - - -def test_rmrw_do_mutate_retryable_rows_second_try(): - # - # Setup: - # - Mutate 4 rows. - # - First try results: - # [success, retryable, non-retryable, retryable] - # Action: - # - Second try should re-attempt the 'retryable' rows. - # Expectation: - # - After second try: - # [success, non-retryable, non-retryable, success] - # - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - (b"row_key_3", ("cf", b"col", b"value3")), - (b"row_key_4", ("cf", b"col", b"value4")), - ] - - responses = [NON_RETRYABLE, SUCCESS] - - prior_statuses = [ - SUCCESS, - RETRYABLE_1, - NON_RETRYABLE, - RETRYABLE_2, - ] - - expected_result = [ - SUCCESS, - NON_RETRYABLE, - NON_RETRYABLE, - SUCCESS, - ] - - _do_mutate_retryable_rows_helper( - row_cells, - responses, - prior_statuses=prior_statuses, - expected_result=expected_result, - ) - - -def test_rmrw_do_mutate_retryable_rows_second_try_no_retryable(): - # - # Setup: - # - Mutate 2 rows. - # - First try results: [success, non-retryable] - # Action: - # - Second try has no row to retry. - # Expectation: - # - After second try: [success, non-retryable] - # - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - ] - - responses = [] # no calls will be made - - prior_statuses = [ - SUCCESS, - NON_RETRYABLE, - ] - - expected_result = [ - SUCCESS, - NON_RETRYABLE, - ] - - _do_mutate_retryable_rows_helper( - row_cells, - responses, - prior_statuses=prior_statuses, - expected_result=expected_result, - ) - - -def test_rmrw_do_mutate_retryable_rows_mismatch_num_responses(): - row_cells = [ - (b"row_key_1", ("cf", b"col", b"value1")), - (b"row_key_2", ("cf", b"col", b"value2")), - ] - - responses = [SUCCESS] - - with pytest.raises(RuntimeError): - _do_mutate_retryable_rows_helper(row_cells, responses) - - def test__create_row_request_table_name_only(): from google.cloud.bigtable.table import _create_row_request @@ -2273,6 +1765,14 @@ def _ReadRowsResponsePB(*args, **kw): return messages_v2_pb2.ReadRowsResponse(*args, **kw) +class _MockRow(object): + def __init__(self, row_key): + self.row_key = row_key + + def _get_mutations(self): + return [mock.MagicMock()] + + class _MockReadRowsIterator(object): def __init__(self, *values): self.iter_values = iter(values)