diff --git a/apps/pair.py b/apps/pair.py index 135c9be6..9bfa0a55 100644 --- a/apps/pair.py +++ b/apps/pair.py @@ -20,11 +20,12 @@ import asyncio import logging import os +from typing import ClassVar import click from prompt_toolkit.shortcuts import PromptSession -from bumble import data_types +from bumble import data_types, smp from bumble.a2dp import make_audio_sink_service_sdp_records from bumble.att import ( ATT_INSUFFICIENT_AUTHENTICATION_ERROR, @@ -40,7 +41,7 @@ PhysicalTransport, ProtocolError, ) -from bumble.device import Device, Peer +from bumble.device import Connection, Device, Peer from bumble.gatt import ( GATT_DEVICE_NAME_CHARACTERISTIC, GATT_GENERIC_ACCESS_SERVICE, @@ -53,7 +54,6 @@ from bumble.keys import JsonKeyStore from bumble.pairing import OobData, PairingConfig, PairingDelegate from bumble.smp import OobContext, OobLegacyContext -from bumble.smp import error_name as smp_error_name from bumble.transport import open_transport from bumble.utils import AsyncRunner @@ -65,7 +65,7 @@ # ----------------------------------------------------------------------------- class Waiter: - instance: Waiter | None = None + instance: ClassVar[Waiter | None] = None def __init__(self, linger=False): self.done = asyncio.get_running_loop().create_future() @@ -319,12 +319,13 @@ async def on_classic_pairing(connection): # ----------------------------------------------------------------------------- @AsyncRunner.run_in_task() -async def on_pairing_failure(connection, reason): +async def on_pairing_failure(connection: Connection, reason: smp.ErrorCode): print(color('***-----------------------------------', 'red')) - print(color(f'*** Pairing failed: {smp_error_name(reason)}', 'red')) + print(color(f'*** Pairing failed: {reason.name}', 'red')) print(color('***-----------------------------------', 'red')) await connection.disconnect() - Waiter.instance.terminate() + if Waiter.instance: + Waiter.instance.terminate() # ----------------------------------------------------------------------------- diff --git a/bumble/a2dp.py b/bumble/a2dp.py index 4c9eb320..31ec5528 100644 --- a/bumble/a2dp.py +++ b/bumble/a2dp.py @@ -88,13 +88,6 @@ class CodecType(utils.OpenIntEnum): SBC_STEREO_CHANNEL_MODE = 0x02 SBC_JOINT_STEREO_CHANNEL_MODE = 0x03 -SBC_CHANNEL_MODE_NAMES = { - SBC_MONO_CHANNEL_MODE: 'SBC_MONO_CHANNEL_MODE', - SBC_DUAL_CHANNEL_MODE: 'SBC_DUAL_CHANNEL_MODE', - SBC_STEREO_CHANNEL_MODE: 'SBC_STEREO_CHANNEL_MODE', - SBC_JOINT_STEREO_CHANNEL_MODE: 'SBC_JOINT_STEREO_CHANNEL_MODE' -} - SBC_BLOCK_LENGTHS = [4, 8, 12, 16] SBC_SUBBANDS = [4, 8] @@ -102,11 +95,6 @@ class CodecType(utils.OpenIntEnum): SBC_SNR_ALLOCATION_METHOD = 0x00 SBC_LOUDNESS_ALLOCATION_METHOD = 0x01 -SBC_ALLOCATION_METHOD_NAMES = { - SBC_SNR_ALLOCATION_METHOD: 'SBC_SNR_ALLOCATION_METHOD', - SBC_LOUDNESS_ALLOCATION_METHOD: 'SBC_LOUDNESS_ALLOCATION_METHOD' -} - SBC_MAX_FRAMES_IN_RTP_PAYLOAD = 15 MPEG_2_4_AAC_SAMPLING_FREQUENCIES = [ @@ -129,13 +117,6 @@ class CodecType(utils.OpenIntEnum): MPEG_4_AAC_LTP_OBJECT_TYPE = 0x02 MPEG_4_AAC_SCALABLE_OBJECT_TYPE = 0x03 -MPEG_2_4_OBJECT_TYPE_NAMES = { - MPEG_2_AAC_LC_OBJECT_TYPE: 'MPEG_2_AAC_LC_OBJECT_TYPE', - MPEG_4_AAC_LC_OBJECT_TYPE: 'MPEG_4_AAC_LC_OBJECT_TYPE', - MPEG_4_AAC_LTP_OBJECT_TYPE: 'MPEG_4_AAC_LTP_OBJECT_TYPE', - MPEG_4_AAC_SCALABLE_OBJECT_TYPE: 'MPEG_4_AAC_SCALABLE_OBJECT_TYPE' -} - OPUS_MAX_FRAMES_IN_RTP_PAYLOAD = 15 diff --git a/bumble/hci.py b/bumble/hci.py index 46c41c74..14e01d81 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -247,28 +247,6 @@ class SpecificationVersion(SpecableEnum): HCI_VERSION_BLUETOOTH_CORE_6_1 = SpecificationVersion.BLUETOOTH_CORE_6_1 HCI_VERSION_BLUETOOTH_CORE_6_2 = SpecificationVersion.BLUETOOTH_CORE_6_2 -HCI_VERSION_NAMES = { - HCI_VERSION_BLUETOOTH_CORE_1_0B: 'HCI_VERSION_BLUETOOTH_CORE_1_0B', - HCI_VERSION_BLUETOOTH_CORE_1_1: 'HCI_VERSION_BLUETOOTH_CORE_1_1', - HCI_VERSION_BLUETOOTH_CORE_1_2: 'HCI_VERSION_BLUETOOTH_CORE_1_2', - HCI_VERSION_BLUETOOTH_CORE_2_0_EDR: 'HCI_VERSION_BLUETOOTH_CORE_2_0_EDR', - HCI_VERSION_BLUETOOTH_CORE_2_1_EDR: 'HCI_VERSION_BLUETOOTH_CORE_2_1_EDR', - HCI_VERSION_BLUETOOTH_CORE_3_0_HS: 'HCI_VERSION_BLUETOOTH_CORE_3_0_HS', - HCI_VERSION_BLUETOOTH_CORE_4_0: 'HCI_VERSION_BLUETOOTH_CORE_4_0', - HCI_VERSION_BLUETOOTH_CORE_4_1: 'HCI_VERSION_BLUETOOTH_CORE_4_1', - HCI_VERSION_BLUETOOTH_CORE_4_2: 'HCI_VERSION_BLUETOOTH_CORE_4_2', - HCI_VERSION_BLUETOOTH_CORE_5_0: 'HCI_VERSION_BLUETOOTH_CORE_5_0', - HCI_VERSION_BLUETOOTH_CORE_5_1: 'HCI_VERSION_BLUETOOTH_CORE_5_1', - HCI_VERSION_BLUETOOTH_CORE_5_2: 'HCI_VERSION_BLUETOOTH_CORE_5_2', - HCI_VERSION_BLUETOOTH_CORE_5_3: 'HCI_VERSION_BLUETOOTH_CORE_5_3', - HCI_VERSION_BLUETOOTH_CORE_5_4: 'HCI_VERSION_BLUETOOTH_CORE_5_4', - HCI_VERSION_BLUETOOTH_CORE_6_0: 'HCI_VERSION_BLUETOOTH_CORE_6_0', - HCI_VERSION_BLUETOOTH_CORE_6_1: 'HCI_VERSION_BLUETOOTH_CORE_6_1', - HCI_VERSION_BLUETOOTH_CORE_6_2: 'HCI_VERSION_BLUETOOTH_CORE_6_2', -} - -LMP_VERSION_NAMES = HCI_VERSION_NAMES - # HCI Packet types HCI_COMMAND_PACKET = 0x01 HCI_ACL_DATA_PACKET = 0x02 diff --git a/bumble/pairing.py b/bumble/pairing.py index b6509a6c..d02946a8 100644 --- a/bumble/pairing.py +++ b/bumble/pairing.py @@ -21,18 +21,9 @@ import secrets from dataclasses import dataclass -from bumble import hci +from bumble import hci, smp from bumble.core import AdvertisingData, LeRole from bumble.smp import ( - SMP_DISPLAY_ONLY_IO_CAPABILITY, - SMP_DISPLAY_YES_NO_IO_CAPABILITY, - SMP_ENC_KEY_DISTRIBUTION_FLAG, - SMP_ID_KEY_DISTRIBUTION_FLAG, - SMP_KEYBOARD_DISPLAY_IO_CAPABILITY, - SMP_KEYBOARD_ONLY_IO_CAPABILITY, - SMP_LINK_KEY_DISTRIBUTION_FLAG, - SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, - SMP_SIGN_KEY_DISTRIBUTION_FLAG, OobContext, OobLegacyContext, OobSharedData, @@ -96,11 +87,11 @@ class PairingDelegate: # These are defined abstractly, and can be mapped to specific Classic pairing # and/or SMP constants. class IoCapability(enum.IntEnum): - NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY - KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY - DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY - DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY - DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY + NO_OUTPUT_NO_INPUT = smp.IoCapability.NO_INPUT_NO_OUTPUT + KEYBOARD_INPUT_ONLY = smp.IoCapability.KEYBOARD_ONLY + DISPLAY_OUTPUT_ONLY = smp.IoCapability.DISPLAY_ONLY + DISPLAY_OUTPUT_AND_YES_NO_INPUT = smp.IoCapability.DISPLAY_YES_NO + DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = smp.IoCapability.KEYBOARD_DISPLAY # Direct names for backward compatibility. NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT @@ -111,10 +102,10 @@ class IoCapability(enum.IntEnum): # Key Distribution [LE only] class KeyDistribution(enum.IntFlag): - DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG - DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG - DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG - DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG + DISTRIBUTE_ENCRYPTION_KEY = smp.KeyDistribution.ENC_KEY + DISTRIBUTE_IDENTITY_KEY = smp.KeyDistribution.ID_KEY + DISTRIBUTE_SIGNING_KEY = smp.KeyDistribution.SIGN_KEY + DISTRIBUTE_LINK_KEY = smp.KeyDistribution.LINK_KEY DEFAULT_KEY_DISTRIBUTION: KeyDistribution = ( KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY diff --git a/bumble/smp.py b/bumble/smp.py index 76b4d00b..9d0bb7ca 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -31,14 +31,13 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, ClassVar, TypeVar, cast -from bumble import crypto, utils +from bumble import crypto, hci, utils from bumble.colors import color from bumble.core import ( AdvertisingData, InvalidArgumentError, PhysicalTransport, ProtocolError, - name_or_number, ) from bumble.hci import ( Address, @@ -46,7 +45,6 @@ HCI_LE_Enable_Encryption_Command, HCI_Object, Role, - key_with_value, metadata, ) from bumble.keys import PairingKeys @@ -71,110 +69,110 @@ SMP_CID = 0x06 SMP_BR_CID = 0x07 -SMP_PAIRING_REQUEST_COMMAND = 0x01 -SMP_PAIRING_RESPONSE_COMMAND = 0x02 -SMP_PAIRING_CONFIRM_COMMAND = 0x03 -SMP_PAIRING_RANDOM_COMMAND = 0x04 -SMP_PAIRING_FAILED_COMMAND = 0x05 -SMP_ENCRYPTION_INFORMATION_COMMAND = 0x06 -SMP_MASTER_IDENTIFICATION_COMMAND = 0x07 -SMP_IDENTITY_INFORMATION_COMMAND = 0x08 -SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND = 0x09 -SMP_SIGNING_INFORMATION_COMMAND = 0x0A -SMP_SECURITY_REQUEST_COMMAND = 0x0B -SMP_PAIRING_PUBLIC_KEY_COMMAND = 0x0C -SMP_PAIRING_DHKEY_CHECK_COMMAND = 0x0D -SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E - -SMP_COMMAND_NAMES = { - SMP_PAIRING_REQUEST_COMMAND: 'SMP_PAIRING_REQUEST_COMMAND', - SMP_PAIRING_RESPONSE_COMMAND: 'SMP_PAIRING_RESPONSE_COMMAND', - SMP_PAIRING_CONFIRM_COMMAND: 'SMP_PAIRING_CONFIRM_COMMAND', - SMP_PAIRING_RANDOM_COMMAND: 'SMP_PAIRING_RANDOM_COMMAND', - SMP_PAIRING_FAILED_COMMAND: 'SMP_PAIRING_FAILED_COMMAND', - SMP_ENCRYPTION_INFORMATION_COMMAND: 'SMP_ENCRYPTION_INFORMATION_COMMAND', - SMP_MASTER_IDENTIFICATION_COMMAND: 'SMP_MASTER_IDENTIFICATION_COMMAND', - SMP_IDENTITY_INFORMATION_COMMAND: 'SMP_IDENTITY_INFORMATION_COMMAND', - SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND: 'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND', - SMP_SIGNING_INFORMATION_COMMAND: 'SMP_SIGNING_INFORMATION_COMMAND', - SMP_SECURITY_REQUEST_COMMAND: 'SMP_SECURITY_REQUEST_COMMAND', - SMP_PAIRING_PUBLIC_KEY_COMMAND: 'SMP_PAIRING_PUBLIC_KEY_COMMAND', - SMP_PAIRING_DHKEY_CHECK_COMMAND: 'SMP_PAIRING_DHKEY_CHECK_COMMAND', - SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND' -} - -SMP_DISPLAY_ONLY_IO_CAPABILITY = 0x00 -SMP_DISPLAY_YES_NO_IO_CAPABILITY = 0x01 -SMP_KEYBOARD_ONLY_IO_CAPABILITY = 0x02 -SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03 -SMP_KEYBOARD_DISPLAY_IO_CAPABILITY = 0x04 - -SMP_IO_CAPABILITY_NAMES = { - SMP_DISPLAY_ONLY_IO_CAPABILITY: 'SMP_DISPLAY_ONLY_IO_CAPABILITY', - SMP_DISPLAY_YES_NO_IO_CAPABILITY: 'SMP_DISPLAY_YES_NO_IO_CAPABILITY', - SMP_KEYBOARD_ONLY_IO_CAPABILITY: 'SMP_KEYBOARD_ONLY_IO_CAPABILITY', - SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY', - SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: 'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY' -} - -SMP_PASSKEY_ENTRY_FAILED_ERROR = 0x01 -SMP_OOB_NOT_AVAILABLE_ERROR = 0x02 -SMP_AUTHENTICATION_REQUIREMENTS_ERROR = 0x03 -SMP_CONFIRM_VALUE_FAILED_ERROR = 0x04 -SMP_PAIRING_NOT_SUPPORTED_ERROR = 0x05 -SMP_ENCRYPTION_KEY_SIZE_ERROR = 0x06 -SMP_COMMAND_NOT_SUPPORTED_ERROR = 0x07 -SMP_UNSPECIFIED_REASON_ERROR = 0x08 -SMP_REPEATED_ATTEMPTS_ERROR = 0x09 -SMP_INVALID_PARAMETERS_ERROR = 0x0A -SMP_DHKEY_CHECK_FAILED_ERROR = 0x0B -SMP_NUMERIC_COMPARISON_FAILED_ERROR = 0x0C -SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR = 0x0D -SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E - -SMP_ERROR_NAMES = { - SMP_PASSKEY_ENTRY_FAILED_ERROR: 'SMP_PASSKEY_ENTRY_FAILED_ERROR', - SMP_OOB_NOT_AVAILABLE_ERROR: 'SMP_OOB_NOT_AVAILABLE_ERROR', - SMP_AUTHENTICATION_REQUIREMENTS_ERROR: 'SMP_AUTHENTICATION_REQUIREMENTS_ERROR', - SMP_CONFIRM_VALUE_FAILED_ERROR: 'SMP_CONFIRM_VALUE_FAILED_ERROR', - SMP_PAIRING_NOT_SUPPORTED_ERROR: 'SMP_PAIRING_NOT_SUPPORTED_ERROR', - SMP_ENCRYPTION_KEY_SIZE_ERROR: 'SMP_ENCRYPTION_KEY_SIZE_ERROR', - SMP_COMMAND_NOT_SUPPORTED_ERROR: 'SMP_COMMAND_NOT_SUPPORTED_ERROR', - SMP_UNSPECIFIED_REASON_ERROR: 'SMP_UNSPECIFIED_REASON_ERROR', - SMP_REPEATED_ATTEMPTS_ERROR: 'SMP_REPEATED_ATTEMPTS_ERROR', - SMP_INVALID_PARAMETERS_ERROR: 'SMP_INVALID_PARAMETERS_ERROR', - SMP_DHKEY_CHECK_FAILED_ERROR: 'SMP_DHKEY_CHECK_FAILED_ERROR', - SMP_NUMERIC_COMPARISON_FAILED_ERROR: 'SMP_NUMERIC_COMPARISON_FAILED_ERROR', - SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR: 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR', - SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR' -} - -SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE = 0 -SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE = 1 -SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE = 2 -SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE = 3 -SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4 - -SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = { - SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE', - SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE', - SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE', - SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE', - SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE' -} +class CommandCode(hci.SpecableEnum): + PAIRING_REQUEST = 0x01 + PAIRING_RESPONSE = 0x02 + PAIRING_CONFIRM = 0x03 + PAIRING_RANDOM = 0x04 + PAIRING_FAILED = 0x05 + ENCRYPTION_INFORMATION = 0x06 + MASTER_IDENTIFICATION = 0x07 + IDENTITY_INFORMATION = 0x08 + IDENTITY_ADDRESS_INFORMATION = 0x09 + SIGNING_INFORMATION = 0x0A + SECURITY_REQUEST = 0x0B + PAIRING_PUBLIC_KEY = 0x0C + PAIRING_DHKEY_CHECK = 0x0D + PAIRING_KEYPRESS_NOTIFICATION = 0x0E + + +class IoCapability(hci.SpecableEnum): + DISPLAY_ONLY = 0x00 + DISPLAY_YES_NO = 0x01 + KEYBOARD_ONLY = 0x02 + NO_INPUT_NO_OUTPUT = 0x03 + KEYBOARD_DISPLAY = 0x04 + +SMP_DISPLAY_ONLY_IO_CAPABILITY = IoCapability.DISPLAY_ONLY +SMP_DISPLAY_YES_NO_IO_CAPABILITY = IoCapability.DISPLAY_YES_NO +SMP_KEYBOARD_ONLY_IO_CAPABILITY = IoCapability.KEYBOARD_ONLY +SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = IoCapability.NO_INPUT_NO_OUTPUT +SMP_KEYBOARD_DISPLAY_IO_CAPABILITY = IoCapability.KEYBOARD_DISPLAY + +class ErrorCode(hci.SpecableEnum): + PASSKEY_ENTRY_FAILED = 0x01 + OOB_NOT_AVAILABLE = 0x02 + AUTHENTICATION_REQUIREMENTS = 0x03 + CONFIRM_VALUE_FAILED = 0x04 + PAIRING_NOT_SUPPORTED = 0x05 + ENCRYPTION_KEY_SIZE = 0x06 + COMMAND_NOT_SUPPORTED = 0x07 + UNSPECIFIED_REASON = 0x08 + REPEATED_ATTEMPTS = 0x09 + INVALID_PARAMETERS = 0x0A + DHKEY_CHECK_FAILED = 0x0B + NUMERIC_COMPARISON_FAILED = 0x0C + BD_EDR_PAIRING_IN_PROGRESS = 0x0D + CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED = 0x0E + +SMP_PASSKEY_ENTRY_FAILED_ERROR = ErrorCode.PASSKEY_ENTRY_FAILED +SMP_OOB_NOT_AVAILABLE_ERROR = ErrorCode.OOB_NOT_AVAILABLE +SMP_AUTHENTICATION_REQUIREMENTS_ERROR = ErrorCode.AUTHENTICATION_REQUIREMENTS +SMP_CONFIRM_VALUE_FAILED_ERROR = ErrorCode.CONFIRM_VALUE_FAILED +SMP_PAIRING_NOT_SUPPORTED_ERROR = ErrorCode.PAIRING_NOT_SUPPORTED +SMP_ENCRYPTION_KEY_SIZE_ERROR = ErrorCode.ENCRYPTION_KEY_SIZE +SMP_COMMAND_NOT_SUPPORTED_ERROR = ErrorCode.COMMAND_NOT_SUPPORTED +SMP_UNSPECIFIED_REASON_ERROR = ErrorCode.UNSPECIFIED_REASON +SMP_REPEATED_ATTEMPTS_ERROR = ErrorCode.REPEATED_ATTEMPTS +SMP_INVALID_PARAMETERS_ERROR = ErrorCode.INVALID_PARAMETERS +SMP_DHKEY_CHECK_FAILED_ERROR = ErrorCode.DHKEY_CHECK_FAILED +SMP_NUMERIC_COMPARISON_FAILED_ERROR = ErrorCode.NUMERIC_COMPARISON_FAILED +SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR = ErrorCode.BD_EDR_PAIRING_IN_PROGRESS +SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = ErrorCode.CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED + +class KeypressNotificationType(hci.SpecableEnum): + PASSKEY_ENTRY_STARTED = 0 + PASSKEY_DIGIT_ENTERED = 1 + PASSKEY_DIGIT_ERASED = 2 + PASSKEY_CLEARED = 3 + PASSKEY_ENTRY_COMPLETED = 4 # Bit flags for key distribution/generation -SMP_ENC_KEY_DISTRIBUTION_FLAG = 0b0001 -SMP_ID_KEY_DISTRIBUTION_FLAG = 0b0010 -SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100 -SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000 +class KeyDistribution(hci.SpecableFlag): + ENC_KEY = 0b0001 + ID_KEY = 0b0010 + SIGN_KEY = 0b0100 + LINK_KEY = 0b1000 # AuthReq fields -SMP_BONDING_AUTHREQ = 0b00000001 -SMP_MITM_AUTHREQ = 0b00000100 -SMP_SC_AUTHREQ = 0b00001000 -SMP_KEYPRESS_AUTHREQ = 0b00010000 -SMP_CT2_AUTHREQ = 0b00100000 +class AuthReq(hci.SpecableFlag): + BONDING = 0b00000001 + MITM = 0b00000100 + SC = 0b00001000 + KEYPRESS = 0b00010000 + CT2 = 0b00100000 + + @classmethod + def from_booleans( + cls, + bonding: bool = False, + sc: bool = False, + mitm: bool = False, + keypress: bool = False, + ct2: bool = False, + ) -> AuthReq: + auth_req = AuthReq(0) + if bonding: + auth_req |= AuthReq.BONDING + if sc: + auth_req |= AuthReq.SC + if mitm: + auth_req |= AuthReq.MITM + if keypress: + auth_req |= AuthReq.KEYPRESS + if ct2: + auth_req |= AuthReq.CT2 + return auth_req # Crypto salt SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031') @@ -188,8 +186,6 @@ # ----------------------------------------------------------------------------- # Utils # ----------------------------------------------------------------------------- -def error_name(error_code: int) -> str: - return name_or_number(SMP_ERROR_NAMES, error_code) # ----------------------------------------------------------------------------- @@ -201,20 +197,20 @@ class SMP_Command: See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL ''' - smp_classes: ClassVar[dict[int, type[SMP_Command]]] = {} + smp_classes: ClassVar[dict[CommandCode, type[SMP_Command]]] = {} fields: ClassVar[Fields] - code: int = field(default=0, init=False) + code: CommandCode = field(default=CommandCode(0), init=False) name: str = field(default='', init=False) _payload: bytes | None = field(default=None, init=False) @classmethod def from_bytes(cls, pdu: bytes) -> SMP_Command: - code = pdu[0] + code = CommandCode(pdu[0]) subclass = SMP_Command.smp_classes.get(code) if subclass is None: instance = SMP_Command() - instance.name = SMP_Command.command_name(code) + instance.name = code.name instance.code = code instance.payload = pdu return instance @@ -222,59 +218,14 @@ def from_bytes(cls, pdu: bytes) -> SMP_Command: instance.payload = pdu[1:] return instance - @staticmethod - def command_name(code: int) -> str: - return name_or_number(SMP_COMMAND_NAMES, code) - - @staticmethod - def auth_req_str(value: int) -> str: - bonding_flags = value & 3 - mitm = (value >> 2) & 1 - sc = (value >> 3) & 1 - keypress = (value >> 4) & 1 - ct2 = (value >> 5) & 1 - - return ( - f'bonding_flags={bonding_flags}, ' - f'MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}' - ) - - @staticmethod - def io_capability_name(io_capability: int) -> str: - return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability) - - @staticmethod - def key_distribution_str(value: int) -> str: - key_types: list[str] = [] - if value & SMP_ENC_KEY_DISTRIBUTION_FLAG: - key_types.append('ENC') - if value & SMP_ID_KEY_DISTRIBUTION_FLAG: - key_types.append('ID') - if value & SMP_SIGN_KEY_DISTRIBUTION_FLAG: - key_types.append('SIGN') - if value & SMP_LINK_KEY_DISTRIBUTION_FLAG: - key_types.append('LINK') - return ','.join(key_types) - - @staticmethod - def keypress_notification_type_name(notification_type: int) -> str: - return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type) - _Command = TypeVar("_Command", bound="SMP_Command") @classmethod def subclass(cls, subclass: type[_Command]) -> type[_Command]: - subclass.name = subclass.__name__.upper() - subclass.code = key_with_value(SMP_COMMAND_NAMES, subclass.name) - if subclass.code is None: - raise KeyError( - f'Command name {subclass.name} not found in SMP_COMMAND_NAMES' - ) subclass.fields = HCI_Object.fields_from_dataclass(subclass) - + subclass.name = subclass.__name__.upper() # Register a factory for this class SMP_Command.smp_classes[subclass.code] = subclass - return subclass @property @@ -308,19 +259,17 @@ class SMP_Pairing_Request_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request ''' - io_capability: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name}) - ) + code = CommandCode.PAIRING_REQUEST + + io_capability: IoCapability = field(metadata=IoCapability.type_metadata(1)) oob_data_flag: int = field(metadata=metadata(1)) - auth_req: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str}) - ) + auth_req: AuthReq = field(metadata=AuthReq.type_metadata(1)) maximum_encryption_key_size: int = field(metadata=metadata(1)) - initiator_key_distribution: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) + initiator_key_distribution: KeyDistribution = field( + metadata=KeyDistribution.type_metadata(1) ) - responder_key_distribution: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) + responder_key_distribution: KeyDistribution = field( + metadata=KeyDistribution.type_metadata(1) ) @@ -332,19 +281,17 @@ class SMP_Pairing_Response_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response ''' - io_capability: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.io_capability_name}) - ) + code = CommandCode.PAIRING_RESPONSE + + io_capability: IoCapability = field(metadata=IoCapability.type_metadata(1)) oob_data_flag: int = field(metadata=metadata(1)) - auth_req: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str}) - ) + auth_req: AuthReq = field(metadata=AuthReq.type_metadata(1)) maximum_encryption_key_size: int = field(metadata=metadata(1)) - initiator_key_distribution: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) + initiator_key_distribution: KeyDistribution = field( + metadata=KeyDistribution.type_metadata(1) ) - responder_key_distribution: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.key_distribution_str}) + responder_key_distribution: KeyDistribution = field( + metadata=KeyDistribution.type_metadata(1) ) @@ -356,6 +303,8 @@ class SMP_Pairing_Confirm_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm ''' + code = CommandCode.PAIRING_CONFIRM + confirm_value: bytes = field(metadata=metadata(16)) @@ -367,6 +316,8 @@ class SMP_Pairing_Random_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random ''' + code = CommandCode.PAIRING_RANDOM + random_value: bytes = field(metadata=metadata(16)) @@ -378,7 +329,9 @@ class SMP_Pairing_Failed_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed ''' - reason: int = field(metadata=metadata({'size': 1, 'mapper': error_name})) + code = CommandCode.PAIRING_FAILED + + reason: ErrorCode = field(metadata=ErrorCode.type_metadata(1)) # ----------------------------------------------------------------------------- @@ -389,6 +342,8 @@ class SMP_Pairing_Public_Key_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key ''' + code = CommandCode.PAIRING_PUBLIC_KEY + public_key_x: bytes = field(metadata=metadata(32)) public_key_y: bytes = field(metadata=metadata(32)) @@ -401,6 +356,8 @@ class SMP_Pairing_DHKey_Check_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check ''' + code = CommandCode.PAIRING_DHKEY_CHECK + dhkey_check: bytes = field(metadata=metadata(16)) @@ -412,10 +369,10 @@ class SMP_Pairing_Keypress_Notification_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification ''' - notification_type: int = field( - metadata=metadata( - {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name} - ) + code = CommandCode.PAIRING_KEYPRESS_NOTIFICATION + + notification_type: KeypressNotificationType = field( + metadata=KeypressNotificationType.type_metadata(1) ) @@ -427,6 +384,8 @@ class SMP_Encryption_Information_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information ''' + code = CommandCode.ENCRYPTION_INFORMATION + long_term_key: bytes = field(metadata=metadata(16)) @@ -438,6 +397,8 @@ class SMP_Master_Identification_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification ''' + code = CommandCode.MASTER_IDENTIFICATION + ediv: int = field(metadata=metadata(2)) rand: bytes = field(metadata=metadata(8)) @@ -450,6 +411,8 @@ class SMP_Identity_Information_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information ''' + code = CommandCode.IDENTITY_INFORMATION + identity_resolving_key: bytes = field(metadata=metadata(16)) @@ -461,6 +424,8 @@ class SMP_Identity_Address_Information_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information ''' + code = CommandCode.IDENTITY_ADDRESS_INFORMATION + addr_type: int = field(metadata=metadata(Address.ADDRESS_TYPE_SPEC)) bd_addr: Address = field(metadata=metadata(Address.parse_address_preceded_by_type)) @@ -473,6 +438,8 @@ class SMP_Signing_Information_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information ''' + code = CommandCode.SIGNING_INFORMATION + signature_key: bytes = field(metadata=metadata(16)) @@ -484,25 +451,9 @@ class SMP_Security_Request_Command(SMP_Command): See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request ''' - auth_req: int = field( - metadata=metadata({'size': 1, 'mapper': SMP_Command.auth_req_str}) - ) + code = CommandCode.SECURITY_REQUEST - -# ----------------------------------------------------------------------------- -def smp_auth_req(bonding: bool, mitm: bool, sc: bool, keypress: bool, ct2: bool) -> int: - value = 0 - if bonding: - value |= SMP_BONDING_AUTHREQ - if mitm: - value |= SMP_MITM_AUTHREQ - if sc: - value |= SMP_SC_AUTHREQ - if keypress: - value |= SMP_KEYPRESS_AUTHREQ - if ct2: - value |= SMP_CT2_AUTHREQ - return value + auth_req: AuthReq = field(metadata=AuthReq.type_metadata(1)) # ----------------------------------------------------------------------------- @@ -676,8 +627,8 @@ def __init__( self.ltk_rand = bytes(8) self.link_key: bytes | None = None self.maximum_encryption_key_size: int = 0 - self.initiator_key_distribution: int = 0 - self.responder_key_distribution: int = 0 + self.initiator_key_distribution: KeyDistribution = KeyDistribution(0) + self.responder_key_distribution: KeyDistribution = KeyDistribution(0) self.peer_random_value: bytes | None = None self.peer_public_key_x: bytes = bytes(32) self.peer_public_key_y = bytes(32) @@ -728,10 +679,10 @@ def __init__( ) # Key Distribution (default values before negotiation) - self.initiator_key_distribution = ( + self.initiator_key_distribution = KeyDistribution( pairing_config.delegate.local_initiator_key_distribution ) - self.responder_key_distribution = ( + self.responder_key_distribution = KeyDistribution( pairing_config.delegate.local_responder_key_distribution ) @@ -743,7 +694,7 @@ def __init__( self.ct2: bool = False # I/O Capabilities - self.io_capability = pairing_config.delegate.io_capability + self.io_capability = IoCapability(pairing_config.delegate.io_capability) self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY # OOB @@ -822,8 +773,14 @@ def nb(self) -> bytes: return self.nx[0 if self.is_responder else 1] @property - def auth_req(self) -> int: - return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2) + def auth_req(self) -> AuthReq: + return AuthReq.from_booleans( + bonding=self.bonding, + sc=self.sc, + mitm=self.mitm, + keypress=self.keypress, + ct2=self.ct2, + ) def get_long_term_key(self, rand: bytes, ediv: int) -> bytes | None: if not self.sc and not self.completed: @@ -843,7 +800,7 @@ def decide_pairing_method( if self.connection.transport == PhysicalTransport.BR_EDR: self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC return - if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0): + if (not self.mitm) and (auth_req & AuthReq.MITM == 0): self.pairing_method = PairingMethod.JUST_WORKS return @@ -861,7 +818,7 @@ def decide_pairing_method( self.passkey_display = details[1 if self.is_initiator else 2] def check_expected_value( - self, expected: bytes, received: bytes, error: int + self, expected: bytes, received: bytes, error: ErrorCode ) -> bool: logger.debug(f'expected={expected.hex()} got={received.hex()}') if expected != received: @@ -881,7 +838,7 @@ async def prompt() -> None: except Exception: logger.exception('exception while confirm') - self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) + self.send_pairing_failed(ErrorCode.CONFIRM_VALUE_FAILED) self.connection.cancel_on_disconnection(prompt()) @@ -900,7 +857,7 @@ async def prompt() -> None: except Exception: logger.exception('exception while prompting') - self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) + self.send_pairing_failed(ErrorCode.CONFIRM_VALUE_FAILED) self.connection.cancel_on_disconnection(prompt()) @@ -911,13 +868,13 @@ async def prompt() -> None: passkey = await self.pairing_config.delegate.get_number() if passkey is None: logger.debug('Passkey request rejected') - self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) + self.send_pairing_failed(ErrorCode.PASSKEY_ENTRY_FAILED) return logger.debug(f'user input: {passkey}') next_steps(passkey) except Exception: logger.exception('exception while prompting') - self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) + self.send_pairing_failed(ErrorCode.PASSKEY_ENTRY_FAILED) self.connection.cancel_on_disconnection(prompt()) @@ -972,7 +929,7 @@ async def display_passkey(): def send_command(self, command: SMP_Command) -> None: self.manager.send_command(self.connection, command) - def send_pairing_failed(self, error: int) -> None: + def send_pairing_failed(self, error: ErrorCode) -> None: self.send_command(SMP_Pairing_Failed_Command(reason=error)) self.on_pairing_failure(error) @@ -1144,7 +1101,7 @@ async def get_link_key_and_derive_ltk(self) -> None: 'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!' ) self.send_pairing_failed( - SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR + ErrorCode.CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED ) else: self.ltk = self.derive_ltk(self.link_key, self.ct2) @@ -1155,14 +1112,14 @@ def distribute_keys(self) -> None: # CTKD: Derive LTK from LinkKey if ( self.connection.transport == PhysicalTransport.BR_EDR - and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG + and self.initiator_key_distribution & KeyDistribution.ENC_KEY ): self.ctkd_task = self.connection.cancel_on_disconnection( self.get_link_key_and_derive_ltk() ) elif not self.sc: # Distribute the LTK, EDIV and RAND - if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: + if self.initiator_key_distribution & KeyDistribution.ENC_KEY: self.send_command( SMP_Encryption_Information_Command(long_term_key=self.ltk) ) @@ -1173,7 +1130,7 @@ def distribute_keys(self) -> None: ) # Distribute IRK & BD ADDR - if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: + if self.initiator_key_distribution & KeyDistribution.ID_KEY: self.send_command( SMP_Identity_Information_Command( identity_resolving_key=self.manager.device.irk @@ -1183,25 +1140,25 @@ def distribute_keys(self) -> None: # Distribute CSRK csrk = bytes(16) # FIXME: testing - if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: + if self.initiator_key_distribution & KeyDistribution.SIGN_KEY: self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) # CTKD, calculate BR/EDR link key - if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: + if self.initiator_key_distribution & KeyDistribution.LINK_KEY: self.link_key = self.derive_link_key(self.ltk, self.ct2) else: # CTKD: Derive LTK from LinkKey if ( self.connection.transport == PhysicalTransport.BR_EDR - and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG + and self.responder_key_distribution & KeyDistribution.ENC_KEY ): self.ctkd_task = self.connection.cancel_on_disconnection( self.get_link_key_and_derive_ltk() ) # Distribute the LTK, EDIV and RAND elif not self.sc: - if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: + if self.responder_key_distribution & KeyDistribution.ENC_KEY: self.send_command( SMP_Encryption_Information_Command(long_term_key=self.ltk) ) @@ -1212,7 +1169,7 @@ def distribute_keys(self) -> None: ) # Distribute IRK & BD ADDR - if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: + if self.responder_key_distribution & KeyDistribution.ID_KEY: self.send_command( SMP_Identity_Information_Command( identity_resolving_key=self.manager.device.irk @@ -1222,30 +1179,30 @@ def distribute_keys(self) -> None: # Distribute CSRK csrk = bytes(16) # FIXME: testing - if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: + if self.responder_key_distribution & KeyDistribution.SIGN_KEY: self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) # CTKD, calculate BR/EDR link key - if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: + if self.responder_key_distribution & KeyDistribution.LINK_KEY: self.link_key = self.derive_link_key(self.ltk, self.ct2) def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None: # Set our expectations for what to wait for in the key distribution phase self.peer_expected_distributions = [] if not self.sc and self.connection.transport == PhysicalTransport.LE: - if key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0: + if key_distribution_flags & KeyDistribution.ENC_KEY != 0: self.peer_expected_distributions.append( SMP_Encryption_Information_Command ) self.peer_expected_distributions.append( SMP_Master_Identification_Command ) - if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0: + if key_distribution_flags & KeyDistribution.ID_KEY != 0: self.peer_expected_distributions.append(SMP_Identity_Information_Command) self.peer_expected_distributions.append( SMP_Identity_Address_Information_Command ) - if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0: + if key_distribution_flags & KeyDistribution.SIGN_KEY != 0: self.peer_expected_distributions.append(SMP_Signing_Information_Command) logger.debug( 'expecting distributions: ' @@ -1258,7 +1215,7 @@ def check_key_distribution(self, command_class: type[SMP_Command]) -> None: logger.warning( color('received key distribution on a non-encrypted connection', 'red') ) - self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) + self.send_pairing_failed(ErrorCode.UNSPECIFIED_REASON) return # Check that this command class is expected @@ -1278,7 +1235,7 @@ def check_key_distribution(self, command_class: type[SMP_Command]) -> None: 'red', ) ) - self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) + self.send_pairing_failed(ErrorCode.UNSPECIFIED_REASON) async def pair(self) -> None: # Start pairing as an initiator @@ -1389,34 +1346,56 @@ async def on_pairing(self) -> None: ) await self.manager.on_pairing(self, peer_address, keys) - def on_pairing_failure(self, reason: int) -> None: - logger.warning(f'pairing failure ({error_name(reason)})') + def on_pairing_failure(self, reason: ErrorCode) -> None: + logger.warning('pairing failure (%s)', reason.name) if self.completed: return self.completed = True - error = ProtocolError(reason, 'smp', error_name(reason)) + error = ProtocolError(reason, 'smp', reason.name) if self.pairing_result is not None and not self.pairing_result.done(): self.pairing_result.set_exception(error) self.manager.on_pairing_failure(self, reason) def on_smp_command(self, command: SMP_Command) -> None: - # Find the handler method - handler_name = f'on_{command.name.lower()}' - handler = getattr(self, handler_name, None) - if handler is not None: - try: - handler(command) - except Exception: - logger.exception(color("!!! Exception in handler:", "red")) - response = SMP_Pairing_Failed_Command( - reason=SMP_UNSPECIFIED_REASON_ERROR - ) - self.send_command(response) - else: - logger.error(color('SMP command not handled???', 'red')) + try: + match command: + case SMP_Pairing_Request_Command(): + self.on_smp_pairing_request_command(command) + case SMP_Pairing_Response_Command(): + self.on_smp_pairing_response_command(command) + case SMP_Pairing_Confirm_Command(): + self.on_smp_pairing_confirm_command(command) + case SMP_Pairing_Random_Command(): + self.on_smp_pairing_random_command(command) + case SMP_Pairing_Failed_Command(): + self.on_smp_pairing_failed_command(command) + case SMP_Encryption_Information_Command(): + self.on_smp_encryption_information_command(command) + case SMP_Master_Identification_Command(): + self.on_smp_master_identification_command(command) + case SMP_Identity_Information_Command(): + self.on_smp_identity_information_command(command) + case SMP_Identity_Address_Information_Command(): + self.on_smp_identity_address_information_command(command) + case SMP_Signing_Information_Command(): + self.on_smp_signing_information_command(command) + case SMP_Pairing_Public_Key_Command(): + self.on_smp_pairing_public_key_command(command) + case SMP_Pairing_DHKey_Check_Command(): + self.on_smp_pairing_dhkey_check_command(command) + # case SMP_Security_Request_Command(): + # self.on_smp_security_request_command(command) + # case SMP_Pairing_Keypress_Notification_Command(): + # self.on_smp_pairing_keypress_notification_command(command) + case _: + logger.error(color('SMP command not handled', 'red')) + except Exception: + logger.exception(color("!!! Exception in handler:", "red")) + response = SMP_Pairing_Failed_Command(reason=ErrorCode.UNSPECIFIED_REASON) + self.send_command(response) def on_smp_pairing_request_command( self, command: SMP_Pairing_Request_Command @@ -1436,16 +1415,16 @@ async def on_smp_pairing_request_command_async( accepted = False if not accepted: logger.debug('pairing rejected by delegate') - self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR) + self.send_pairing_failed(ErrorCode.PAIRING_NOT_SUPPORTED) return # Save the request self.preq = bytes(command) # Bonding and SC require both sides to request/support it - self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) - self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) - self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) + self.bonding = self.bonding and (command.auth_req & AuthReq.BONDING != 0) + self.sc = self.sc and (command.auth_req & AuthReq.SC != 0) + self.ct2 = self.ct2 and (command.auth_req & AuthReq.CT2 != 0) # Infer the pairing method if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( @@ -1456,7 +1435,7 @@ async def on_smp_pairing_request_command_async( if not self.sc and self.tk is None: # For legacy OOB, TK is required. logger.warning("legacy OOB without TK") - self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) + self.send_pairing_failed(ErrorCode.OOB_NOT_AVAILABLE) return if command.oob_data_flag == 0: # The peer doesn't have OOB data, use r=0 @@ -1475,8 +1454,11 @@ async def on_smp_pairing_request_command_async( ( self.initiator_key_distribution, self.responder_key_distribution, - ) = await self.pairing_config.delegate.key_distribution_response( - command.initiator_key_distribution, command.responder_key_distribution + ) = map( + KeyDistribution, + await self.pairing_config.delegate.key_distribution_response( + command.initiator_key_distribution, command.responder_key_distribution + ), ) self.compute_peer_expected_distributions(self.initiator_key_distribution) @@ -1514,8 +1496,8 @@ def on_smp_pairing_response_command( self.peer_io_capability = command.io_capability # Bonding and SC require both sides to request/support it - self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) - self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) + self.bonding = self.bonding and (command.auth_req & AuthReq.BONDING != 0) + self.sc = self.sc and (command.auth_req & AuthReq.SC != 0) # Infer the pairing method if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( @@ -1526,7 +1508,7 @@ def on_smp_pairing_response_command( if not self.sc and self.tk is None: # For legacy OOB, TK is required. logger.warning("legacy OOB without TK") - self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) + self.send_pairing_failed(ErrorCode.OOB_NOT_AVAILABLE) return if command.oob_data_flag == 0: # The peer doesn't have OOB data, use r=0 @@ -1546,7 +1528,7 @@ def on_smp_pairing_response_command( command.responder_key_distribution & ~self.responder_key_distribution != 0 ): # The response isn't a subset of the request - self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR) + self.send_pairing_failed(ErrorCode.INVALID_PARAMETERS) return self.initiator_key_distribution = command.initiator_key_distribution self.responder_key_distribution = command.responder_key_distribution @@ -1624,7 +1606,7 @@ def on_smp_pairing_random_command_legacy( ) assert self.confirm_value if not self.check_expected_value( - self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR + self.confirm_value, confirm_verifier, ErrorCode.CONFIRM_VALUE_FAILED ): return @@ -1665,7 +1647,7 @@ def on_smp_pairing_random_command_secure_connections( self.pkb, self.pka, command.random_value, bytes([0]) ) if not self.check_expected_value( - self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR + self.confirm_value, confirm_verifier, ErrorCode.CONFIRM_VALUE_FAILED ): return elif self.pairing_method == PairingMethod.PASSKEY: @@ -1678,7 +1660,7 @@ def on_smp_pairing_random_command_secure_connections( bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), ) if not self.check_expected_value( - self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR + self.confirm_value, confirm_verifier, ErrorCode.CONFIRM_VALUE_FAILED ): return @@ -1707,7 +1689,7 @@ def on_smp_pairing_random_command_secure_connections( bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), ) if not self.check_expected_value( - self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR + self.confirm_value, confirm_verifier, ErrorCode.CONFIRM_VALUE_FAILED ): return @@ -1824,7 +1806,7 @@ def on_smp_pairing_public_key_command( if not self.check_expected_value( self.peer_oob_data.c, confirm_verifier, - SMP_CONFIRM_VALUE_FAILED_ERROR, + ErrorCode.CONFIRM_VALUE_FAILED, ): return @@ -1858,7 +1840,7 @@ def on_smp_pairing_dhkey_check_command( expected = self.eb if self.is_initiator else self.ea assert expected if not self.check_expected_value( - expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR + expected, command.dhkey_check, ErrorCode.DHKEY_CHECK_FAILED ): return @@ -1962,7 +1944,7 @@ def on_smp_pdu(self, connection: Connection, pdu: bytes) -> None: ) # Security request is more than just pairing, so let applications handle them - if command.code == SMP_SECURITY_REQUEST_COMMAND: + if command.code == CommandCode.SECURITY_REQUEST: self.on_smp_security_request_command( connection, cast(SMP_Security_Request_Command, command) ) @@ -2002,15 +1984,13 @@ async def pair(self, connection: Connection) -> None: def request_pairing(self, connection: Connection) -> None: pairing_config = self.pairing_config_factory(connection) if pairing_config: - auth_req = smp_auth_req( - pairing_config.bonding, - pairing_config.mitm, - pairing_config.sc, - False, - False, + auth_req = AuthReq.from_booleans( + bonding=pairing_config.bonding, + sc=pairing_config.sc, + mitm=pairing_config.mitm, ) else: - auth_req = 0 + auth_req = AuthReq(0) self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req)) def on_session_start(self, session: Session) -> None: @@ -2026,7 +2006,7 @@ async def on_pairing( # Notify the device self.device.on_pairing(session.connection, identity_address, keys, session.sc) - def on_pairing_failure(self, session: Session, reason: int) -> None: + def on_pairing_failure(self, session: Session, reason: ErrorCode) -> None: self.device.on_pairing_failure(session.connection, reason) def on_session_end(self, session: Session) -> None: diff --git a/tests/self_test.py b/tests/self_test.py index 17acfdb2..069c9a6a 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -29,8 +29,7 @@ from bumble.hci import Role from bumble.pairing import PairingConfig, PairingDelegate from bumble.smp import ( - SMP_CONFIRM_VALUE_FAILED_ERROR, - SMP_PAIRING_NOT_SUPPORTED_ERROR, + ErrorCode, OobContext, OobLegacyContext, ) @@ -378,7 +377,7 @@ async def accept(self): await _test_self_smp_with_configs(None, rejecting_pairing_config) paired = True except ProtocolError as error: - assert error.error_code == SMP_PAIRING_NOT_SUPPORTED_ERROR + assert error.error_code == ErrorCode.PAIRING_NOT_SUPPORTED assert not paired @@ -403,7 +402,7 @@ async def compare_numbers(self, number, digits): ) paired = True except ProtocolError as error: - assert error.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR + assert error.error_code == ErrorCode.CONFIRM_VALUE_FAILED assert not paired @@ -534,11 +533,11 @@ async def test_self_smp_oob_sc(): with pytest.raises(ProtocolError) as error: await _test_self_smp_with_configs(pairing_config_1, pairing_config_4) - assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR + assert error.value.error_code == ErrorCode.CONFIRM_VALUE_FAILED with pytest.raises(ProtocolError): await _test_self_smp_with_configs(pairing_config_4, pairing_config_1) - assert error.value.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR + assert error.value.error_code == ErrorCode.CONFIRM_VALUE_FAILED # -----------------------------------------------------------------------------