diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 00000000..23cc6e70 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,58 @@ +name: Tests + +on: + push: + branches: [master, develop] + pull_request: + branches: [master, develop] + +jobs: + test-python: + name: Python ${{ matrix.python-version }} + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install package + run: pip install -e . + + - name: Run tests + run: | + cd tests + python run_tests.py + + test-micropython: + name: MicroPython (Unix port) + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install MicroPython Unix port + run: | + sudo apt-get update + sudo apt-get install -y build-essential libffi-dev pkg-config + git clone --depth 1 https://github.com/micropython/micropython.git /tmp/micropython + cd /tmp/micropython/ports/unix + make submodules + make + + - name: Run tests on MicroPython + run: | + cd tests + /tmp/micropython/ports/unix/build-standard/micropython run_tests.py + continue-on-error: true # MicroPython may lack some deps diff --git a/pyproject.toml b/pyproject.toml index fb8fcfe1..b37b810d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,16 @@ +[tool.poetry] +name = "embit" +version = "0.8.0" +description="A minimal bitcoin library for MicroPython and Python3 with a focus on embedded systems." +license="MIT" +authors= ["Stepan Snigirev "] + +[tool.poetry.urls] +repository = "https://github.com/diybitcoinhardware/embit" + +[tool.poetry.dependencies] +python = "^3.0" + [build-system] requires = ["setuptools>=42.0", "wheel"] build-backend = "setuptools.build_meta" diff --git a/src/embit/bip39.py b/src/embit/bip39.py index 418f8b39..18e55f40 100644 --- a/src/embit/bip39.py +++ b/src/embit/bip39.py @@ -10,8 +10,8 @@ def mnemonic_to_bytes(mnemonic: str, ignore_checksum: bool = False, wordlist=WORDLIST): # this function is copied from Jimmy Song's HDPrivateKey.from_mnemonic() method - words = mnemonic.strip().split() - if len(words) % 3 != 0 or len(words) < 12: + words = mnemonic.split(" ") + if len(words) % 3 != 0 or not 12 <= len(words) <= 24: raise ValueError("Invalid recovery phrase") binary_seed = bytearray() @@ -68,7 +68,7 @@ def mnemonic_is_valid(mnemonic: str, wordlist=WORDLIST): try: mnemonic_to_bytes(mnemonic, wordlist=wordlist) return True - except Exception as e: + except Exception: return False @@ -97,7 +97,7 @@ def _extract_index(bits, b, n): def mnemonic_from_bytes(entropy, wordlist=WORDLIST): - if len(entropy) % 4 != 0: + if len(entropy) % 4 != 0 or not 16 <= len(entropy) <= 32: raise ValueError("Byte array should be multiple of 4 long (16, 20, ..., 32)") total_bits = len(entropy) * 8 checksum_bits = total_bits // 32 diff --git a/src/embit/bip85.py b/src/embit/bip85.py index 5867abd7..97013e5e 100644 --- a/src/embit/bip85.py +++ b/src/embit/bip85.py @@ -17,7 +17,8 @@ def derive_entropy(root, app_index, path): """ Derive app-specific bip85 entropy using path m/83696968'/app_index'/...path' """ - assert max(path) < HARDENED_INDEX + if max(path) >= HARDENED_INDEX: + raise ValueError("Path elements must be less than 2^31") derivation = [HARDENED_INDEX + 83696968, HARDENED_INDEX + app_index] + [ p + HARDENED_INDEX for p in path ] @@ -27,7 +28,8 @@ def derive_entropy(root, app_index, path): def derive_mnemonic(root, num_words=12, index=0, language=LANGUAGES.ENGLISH): """Derive a new mnemonic with num_words using language (code, wordlist)""" - assert num_words in [12, 18, 24] + if num_words not in [12, 18, 24]: + raise ValueError("Number of words must be 12, 18 or 24") langcode, wordlist = language path = [langcode, num_words, index] entropy = derive_entropy(root, 39, path) @@ -49,7 +51,9 @@ def derive_xprv(root, index=0): def derive_hex(root, num_bytes=32, index=0): """Derive raw entropy from 16 to 64 bytes long""" - assert num_bytes <= 64 - assert num_bytes >= 16 + if num_bytes > 64: + raise ValueError("Number of bytes must not exceed 64") + if num_bytes < 16: + raise ValueError("Number of bytes must be at least 16") entropy = derive_entropy(root, 128169, [num_bytes, index]) return entropy[:num_bytes] diff --git a/src/embit/descriptor/arguments.py b/src/embit/descriptor/arguments.py index 3f92500a..0c8d4d52 100644 --- a/src/embit/descriptor/arguments.py +++ b/src/embit/descriptor/arguments.py @@ -15,7 +15,8 @@ def __init__(self, fingerprint: bytes, derivation: list): def from_string(cls, s: str): arr = s.split("/") mfp = unhexlify(arr[0]) - assert len(mfp) == 4 + if len(mfp) != 4: + raise ArgumentError("Invalid fingerprint length") arr[0] = "m" path = "/".join(arr) derivation = bip32.parse_path(path) @@ -315,7 +316,8 @@ def xonly(self): return self.key.xonly() def taproot_tweak(self, h=b""): - assert self.taproot + if not self.taproot: + raise ArgumentError("Key is not taproot") return self.key.taproot_tweak(h) def serialize(self): diff --git a/src/embit/descriptor/descriptor.py b/src/embit/descriptor/descriptor.py index 8bc25137..fd3c3057 100644 --- a/src/embit/descriptor/descriptor.py +++ b/src/embit/descriptor/descriptor.py @@ -59,6 +59,9 @@ def script_len(self): @property def num_branches(self): + if self.miniscript is not None: + return max({k.num_branches for k in self.miniscript.keys}) + return max([k.num_branches for k in self.keys]) def branch(self, branch_index=None): @@ -292,7 +295,7 @@ def from_string(cls, desc): @classmethod def read_from(cls, s): # starts with sh(wsh()), sh() or wsh() - start = s.read(7) + start = s.read(8) sh = False wsh = False wpkh = False @@ -301,30 +304,30 @@ def read_from(cls, s): taptree = TapTree() if start.startswith(b"tr("): taproot = True - s.seek(-4, 1) + s.seek(-5, 1) elif start.startswith(b"sh(wsh("): sh = True wsh = True + s.seek(-1, 1) elif start.startswith(b"wsh("): sh = False wsh = True - s.seek(-3, 1) - elif start.startswith(b"sh(wpkh"): + s.seek(-4, 1) + elif start.startswith(b"sh(wpkh("): is_miniscript = False sh = True wpkh = True - assert s.read(1) == b"(" elif start.startswith(b"wpkh("): is_miniscript = False wpkh = True - s.seek(-2, 1) + s.seek(-3, 1) elif start.startswith(b"pkh("): is_miniscript = False - s.seek(-3, 1) + s.seek(-4, 1) elif start.startswith(b"sh("): sh = True wsh = False - s.seek(-4, 1) + s.seek(-5, 1) else: raise ValueError("Invalid descriptor (starts with '%s')" % start.decode()) # taproot always has a key, and may have taptree miniscript diff --git a/src/embit/descriptor/miniscript.py b/src/embit/descriptor/miniscript.py index 990e3e56..9783afc1 100644 --- a/src/embit/descriptor/miniscript.py +++ b/src/embit/descriptor/miniscript.py @@ -54,11 +54,25 @@ def type(self): @classmethod def read_from(cls, s, taproot=False): - op, char = read_until(s, b"(") + def wrapped(m_script): + for w in reversed(wrappers): + if w not in WRAPPER_NAMES: + raise MiniscriptError("Unknown wrapper") + WrapperCls = WRAPPERS[WRAPPER_NAMES.index(w)] + m_script = WrapperCls(m_script, taproot=taproot) + return m_script + + op, char = read_until(s, b"(,)") + if char in (b",", b")"): + s.seek(-1, 1) op = op.decode() wrappers = "" if ":" in op: wrappers, op = op.split(":") + # handle boolean literals: 0 or 1 + if op in ("0", "1"): + miniscript = JustOne() if op == "1" else JustZero() + return wrapped(miniscript) if char != b"(": raise MiniscriptError("Missing operator") if op not in OPERATOR_NAMES: @@ -67,12 +81,7 @@ def read_from(cls, s, taproot=False): MiniscriptCls = OPERATORS[OPERATOR_NAMES.index(op)] args = MiniscriptCls.read_arguments(s, taproot=taproot) miniscript = MiniscriptCls(*args, taproot=taproot) - for w in reversed(wrappers): - if w not in WRAPPER_NAMES: - raise MiniscriptError("Unknown wrapper") - WrapperCls = WRAPPERS[WRAPPER_NAMES.index(w)] - miniscript = WrapperCls(miniscript, taproot=taproot) - return miniscript + return wrapped(miniscript) @classmethod def read_arguments(cls, s, taproot=False): @@ -119,6 +128,28 @@ def len_args(self): ########### Known fragments (miniscript operators) ############## +class JustZero(Miniscript): + TYPE = "B" + PROPS = "zud" + + def inner_compile(self): + return Number(0).compile() + + def __str__(self): + return "0" + + +class JustOne(Miniscript): + TYPE = "B" + PROPS = "zu" + + def inner_compile(self): + return Number(1).compile() + + def __str__(self): + return "1" + + class OneArg(Miniscript): NARGS = 1 @@ -870,6 +901,11 @@ def inner_compile(self): def __len__(self): return len(self.arg) + 1 + + def verify(self): + super().verify() + if self.arg.type != "V": + raise MiniscriptError("t: X must be of type V") @property def properties(self): diff --git a/src/embit/ec.py b/src/embit/ec.py index a93fc714..b36d861d 100644 --- a/src/embit/ec.py +++ b/src/embit/ec.py @@ -26,7 +26,8 @@ def read_from(cls, stream): class SchnorrSig(EmbitBase): def __init__(self, sig): - assert len(sig) == 64 + if len(sig) != 64: + raise ECError("Invalid schnorr signature") self._sig = sig def write_to(self, stream) -> int: @@ -93,7 +94,8 @@ def _xonly(self): @classmethod def from_xonly(cls, data: bytes): - assert len(data) == 32 + if len(data) != 32: + raise ECError("Invalid xonly pubkey") return cls.parse(b"\x02" + data) def schnorr_verify(self, sig, msg_hash) -> bool: diff --git a/src/embit/liquid/pset.py b/src/embit/liquid/pset.py index 3a2c8b85..290c59ac 100644 --- a/src/embit/liquid/pset.py +++ b/src/embit/liquid/pset.py @@ -96,9 +96,11 @@ def unblind(self, blinding_key): return # verify gen = secp256k1.generator_generate_blinded(asset, in_abf) - assert gen == secp256k1.generator_parse(self.utxo.asset) + if gen != secp256k1.generator_parse(self.utxo.asset): + raise PSBTError("Invalid asset commitment") cmt = secp256k1.pedersen_commit(vbf, value, gen) - assert cmt == secp256k1.pedersen_commitment_parse(self.utxo.value) + if cmt != secp256k1.pedersen_commitment_parse(self.utxo.value): + raise PSBTError("Invalid value commitment") self.asset = asset self.value = value @@ -506,7 +508,8 @@ def unblind(self, blinding_key): inp.unblind(blinding_key) def txseed(self, seed: bytes): - assert len(seed) == 32 + if len(seed) != 32: + raise PSBTError("Seed should be 32 bytes") # get unique seed for this tx: # we use seed + txid:vout + scriptpubkey as unique data for tagged hash data = b"".join( diff --git a/src/embit/liquid/psetview.py b/src/embit/liquid/psetview.py index dbedbbd2..70dda48e 100644 --- a/src/embit/liquid/psetview.py +++ b/src/embit/liquid/psetview.py @@ -5,7 +5,8 @@ def skip_commitment(stream): c = stream.read(1) - assert len(c) == 1 + if len(c) != 1: + raise PSBTError("Unexpected end of stream") if c == b"\x00": # None return 1 if c == b"\x01": # unconfidential diff --git a/src/embit/liquid/transaction.py b/src/embit/liquid/transaction.py index b64dd028..ca7af0a5 100644 --- a/src/embit/liquid/transaction.py +++ b/src/embit/liquid/transaction.py @@ -40,16 +40,19 @@ class LTransactionError(TransactionError): def read_commitment(stream): c = stream.read(1) - assert len(c) == 1 + if len(c) != 1: + raise TransactionError("Invalid commitment") if c == b"\x00": # None return None if c == b"\x01": # unconfidential r = stream.read(8) - assert len(r) == 8 + if len(r) != 8: + raise TransactionError("Invalid commitment") return int.from_bytes(r, "big") # confidential r = stream.read(32) - assert len(r) == 32 + if len(r) != 32: + raise TransactionError("Invalid commitment") return c + r @@ -71,10 +74,14 @@ def unblind( message_length=64, ) -> tuple: """Unblinds a range proof and returns value, asset, value blinding factor, asset blinding factor, extra data, min and max values""" - assert len(pubkey) in [33, 65] - assert len(blinding_key) == 32 - assert len(value_commitment) == 33 - assert len(asset_commitment) == 33 + if len(pubkey) not in [33, 65]: + raise TransactionError("Invalid pubkey length") + if len(blinding_key) != 32: + raise TransactionError("Invalid blinding key length") + if len(value_commitment) != 33: + raise TransactionError("Invalid value commitment length") + if len(asset_commitment) != 33: + raise TransactionError("Invalid asset commitment length") pub = secp256k1.ec_pubkey_parse(pubkey) secp256k1.ec_pubkey_tweak_mul(pub, blinding_key) sec = secp256k1.ec_pubkey_serialize(pub) @@ -397,9 +404,11 @@ def __init__(self, nonce, entropy, amount_commitment, token_commitment=None): @classmethod def read_from(cls, stream): nonce = stream.read(32) - assert len(nonce) == 32 + if len(nonce) != 32: + raise TransactionError("Invalid nonce") entropy = stream.read(32) - assert len(entropy) == 32 + if len(entropy) != 32: + raise TransactionError("Invalid entropy") amount_commitment = read_commitment(stream) token_commitment = read_commitment(stream) return cls(nonce, entropy, amount_commitment, token_commitment) diff --git a/src/embit/misc.py b/src/embit/misc.py index fc2c8046..97669ade 100644 --- a/src/embit/misc.py +++ b/src/embit/misc.py @@ -35,7 +35,8 @@ def secure_randint(vmin: int, vmax: int) -> int: """ import math - assert vmax > vmin + if vmax <= vmin: + raise ValueError("vmax must be greater than vmin") delta = vmax - vmin nbits = math.ceil(math.log2(delta + 1)) randn = getrandbits(nbits) diff --git a/src/embit/networks.py b/src/embit/networks.py index 6f1a5418..dc853a4f 100644 --- a/src/embit/networks.py +++ b/src/embit/networks.py @@ -1,5 +1,20 @@ from .misc import const + +def get_network(name): + """ + Get network by name with testnet4 fallback. + Bitcoin Core 28.0+ uses 'testnet4' as the network name, + which has the same address parameters as testnet3. + """ + if name in NETWORKS: + return NETWORKS[name] + # testnet4 uses same parameters as testnet3 + if name == "testnet4": + return NETWORKS["test"] + return None + + NETWORKS = { "main": { "name": "Mainnet", @@ -73,4 +88,24 @@ "Zpub": b"\x02\x57\x54\x83", "bip32": const(1), }, + # testnet4: Bitcoin Core 28.0+ replacement for testnet3 + # Uses identical address parameters (bech32 "tb", same xpub/xprv versions) + "testnet4": { + "name": "Testnet4", + "wif": b"\xEF", + "p2pkh": b"\x6F", + "p2sh": b"\xC4", + "bech32": "tb", + "xprv": b"\x04\x35\x83\x94", + "xpub": b"\x04\x35\x87\xcf", + "yprv": b"\x04\x4a\x4e\x28", + "zprv": b"\x04\x5f\x18\xbc", + "Yprv": b"\x02\x42\x85\xb5", + "Zprv": b"\x02\x57\x50\x48", + "ypub": b"\x04\x4a\x52\x62", + "zpub": b"\x04\x5f\x1c\xf6", + "Ypub": b"\x02\x42\x89\xef", + "Zpub": b"\x02\x57\x54\x83", + "bip32": const(1), + }, } diff --git a/src/embit/psbt.py b/src/embit/psbt.py index 54497c2e..d20c88bb 100644 --- a/src/embit/psbt.py +++ b/src/embit/psbt.py @@ -142,6 +142,7 @@ def __init__(self, unknown: dict = {}, vin=None, compress=CompressMode.KEEP_ALL) self.taproot_bip32_derivations = OrderedDict() self.taproot_internal_key = None self.taproot_merkle_root = None + self.taproot_key_sig = None self.taproot_sigs = OrderedDict() self.taproot_scripts = OrderedDict() @@ -187,6 +188,7 @@ def update(self, other): self.taproot_bip32_derivations.update(other.taproot_bip32_derivations) self.taproot_internal_key = other.taproot_internal_key self.taproot_merkle_root = other.taproot_merkle_root or self.taproot_merkle_root + self.taproot_key_sig = other.taproot_key_sig or self.taproot_key_sig self.taproot_sigs.update(other.taproot_sigs) self.taproot_scripts.update(other.taproot_scripts) self.final_scriptsig = other.final_scriptsig or self.final_scriptsig @@ -350,7 +352,15 @@ def read_value(self, stream, k): elif k == b"\x10": self.sequence = int.from_bytes(v, "little") - # TODO: 0x13 - tap key signature + # PSBT_IN_TAP_KEY_SIG + elif k[0] == 0x13: + # read the taproot key sig + if len(k) != 1: + raise PSBTError("Invalid taproot key signature key") + if self.taproot_key_sig is not None: + raise PSBTError("Duplicated taproot key signature") + self.taproot_key_sig = v + # PSBT_IN_TAP_SCRIPT_SIG elif k[0] == 0x14: if len(k) != 65: @@ -434,6 +444,11 @@ def write_to(self, stream, skip_separator=False, version=None, **kwargs) -> int: r += ser_string(stream, b"\x10") r += ser_string(stream, self.sequence.to_bytes(4, "little")) + # PSBT_IN_TAP_KEY_SIG + if self.taproot_key_sig is not None: + r += ser_string(stream, b"\x13") + r += ser_string(stream, self.taproot_key_sig) + # PSBT_IN_TAP_SCRIPT_SIG for pub, leaf in self.taproot_sigs: r += ser_string(stream, b"\x14" + pub.xonly() + leaf) @@ -881,11 +896,11 @@ def sign_input_with_tapkey( sighash=sighash, ) sig = pk.schnorr_sign(h) - wit = sig.serialize() + sigdata = sig.serialize() if sighash != SIGHASH.DEFAULT: - wit += bytes([sighash]) - # TODO: maybe better to put into internal key sig field - inp.final_scriptwitness = Witness([wit]) + sigdata += bytes([sighash]) + inp.taproot_key_sig = sigdata + inp.final_scriptwitness = Witness([sigdata]) # no need to sign anything else return 1 counter = 0 @@ -977,22 +992,25 @@ def sign_with(self, root, sighash=SIGHASH.DEFAULT) -> int: continue # get all possible derivations with matching fingerprint - bip32_derivations = set() + bip32_derivations = OrderedDict() # OrderedDict to keep order if fingerprint: # if taproot derivations are present add them for pub in inp.taproot_bip32_derivations: (_leafs, derivation) = inp.taproot_bip32_derivations[pub] if derivation.fingerprint == fingerprint: - bip32_derivations.add((pub, derivation)) + # Add only if not already present + if (pub, derivation) not in bip32_derivations: + bip32_derivations[(pub, derivation)] = True # segwit and legacy derivations for pub in inp.bip32_derivations: derivation = inp.bip32_derivations[pub] if derivation.fingerprint == fingerprint: - bip32_derivations.add((pub, derivation)) + if (pub, derivation) not in bip32_derivations: + bip32_derivations[(pub, derivation)] = True # get derived keys for signing - derived_keypairs = set() # (prv, pub) + derived_keypairs = OrderedDict() # (prv, pub) for pub, derivation in bip32_derivations: der = derivation.derivation # descriptor key has origin derivation that we take into account @@ -1008,7 +1026,9 @@ def sign_with(self, root, sighash=SIGHASH.DEFAULT) -> int: if hdkey.xonly() != pub.xonly(): raise PSBTError("Derivation path doesn't look right") - derived_keypairs.add((hdkey.key, pub)) + # Insert into derived_keypairs if not present + if (hdkey.key, pub) not in derived_keypairs: + derived_keypairs[(hdkey.key, pub)] = True # sign with taproot key if inp.is_taproot: diff --git a/src/embit/psbtview.py b/src/embit/psbtview.py index 8012654d..c66bcb5b 100644 --- a/src/embit/psbtview.py +++ b/src/embit/psbtview.py @@ -14,6 +14,7 @@ Makes sense to run gc.collect() after processing of each scope to free memory. """ # TODO: refactor, a lot of code is duplicated here from transaction.py +from collections import OrderedDict import hashlib from . import compact from . import ec @@ -239,8 +240,10 @@ def view(cls, stream, offset=None, compress=CompressMode.KEEP_ALL): num_outputs = compact.from_bytes(value) elif key == b"\x00": # we found global transaction - assert version != 2 - assert (num_inputs is None) and (num_outputs is None) + if version == 2: + raise PSBTError("Global transaction with version 2 PSBT") + if (num_inputs is not None) or (num_outputs is not None): + raise PSBTError("Invalid global transaction") tx_len = compact.read_from(stream) cur += len(compact.to_bytes(tx_len)) tx_offset = cur @@ -742,22 +745,25 @@ def sign_input( return 0 # get all possible derivations with matching fingerprint - bip32_derivations = set() + bip32_derivations = OrderedDict() if fingerprint: # if taproot derivations are present add them for pub in inp.taproot_bip32_derivations: (_leafs, derivation) = inp.taproot_bip32_derivations[pub] if derivation.fingerprint == fingerprint: - bip32_derivations.add((pub, derivation)) + # Add only if not already present + if (pub, derivation) not in bip32_derivations: + bip32_derivations[(pub, derivation)] = True # segwit and legacy derivations for pub in inp.bip32_derivations: derivation = inp.bip32_derivations[pub] if derivation.fingerprint == fingerprint: - bip32_derivations.add((pub, derivation)) + if (pub, derivation) not in bip32_derivations: + bip32_derivations[(pub, derivation)] = True # get derived keys for signing - derived_keypairs = set() # (prv, pub) + derived_keypairs = OrderedDict() # (prv, pub) for pub, derivation in bip32_derivations: der = derivation.derivation # descriptor key has origin derivation that we take into account @@ -773,7 +779,9 @@ def sign_input( if hdkey.xonly() != pub.xonly(): raise PSBTError("Derivation path doesn't look right") - derived_keypairs.add((hdkey.key, pub)) + # Insert into derived_keypairs if not present + if (hdkey.key, pub) not in derived_keypairs: + derived_keypairs[(hdkey.key, pub)] = True counter = 0 # sign with taproot key diff --git a/src/embit/util/ctypes_secp256k1.py b/src/embit/util/ctypes_secp256k1.py index 232abda1..8068355e 100644 --- a/src/embit/util/ctypes_secp256k1.py +++ b/src/embit/util/ctypes_secp256k1.py @@ -761,16 +761,20 @@ def xonly_pubkey_from_pubkey(pubkey, context=_secp.ctx): @locked def schnorrsig_verify(sig, msg, pubkey, context=_secp.ctx): - assert len(sig) == 64 - assert len(msg) == 32 - assert len(pubkey) == 64 + if len(sig) != 64: + raise ValueError("Signature should be 64 bytes long") + if len(msg) != 32: + raise ValueError("Message should be 32 bytes long") + if len(pubkey) != 64: + raise ValueError("Public key should be 64 bytes long") res = _secp.secp256k1_schnorrsig_verify(context, sig, msg, pubkey) return bool(res) @locked def keypair_create(secret, context=_secp.ctx): - assert len(secret) == 32 + if len(secret) != 32: + raise ValueError("Secret key should be 32 bytes long") keypair = bytes(96) r = _secp.secp256k1_keypair_create(context, keypair, secret) if r == 0: @@ -782,11 +786,13 @@ def keypair_create(secret, context=_secp.ctx): def schnorrsig_sign( msg, keypair, nonce_function=None, extra_data=None, context=_secp.ctx ): - assert len(msg) == 32 + if len(msg) != 32: + raise ValueError("Message should be 32 bytes long") if len(keypair) == 32: keypair = keypair_create(keypair, context=context) with _lock: - assert len(keypair) == 96 + if len(keypair) != 96: + raise ValueError("Keypair should be 96 bytes long") sig = bytes(64) r = _secp.secp256k1_schnorrsig_sign( context, sig, msg, keypair, nonce_function, extra_data @@ -916,7 +922,8 @@ def pedersen_blind_generator_blind_sum( if res == 0: raise ValueError("Failed to get the last blinding factor.") res = (c_char * 32).from_address(address).raw - assert len(res) == 32 + if len(res) != 32: + raise ValueError("Blinding factor should be 32 bytes long") return res diff --git a/src/embit/util/key.py b/src/embit/util/key.py index 13b01d95..f573ec5b 100644 --- a/src/embit/util/key.py +++ b/src/embit/util/key.py @@ -43,7 +43,8 @@ def jacobi_symbol(n, k): For our application k is always prime, so this is the same as the Legendre symbol. """ - assert k > 0 and k & 1, "jacobi symbol is only defined for positive odd k" + if k <= 0 or k % 2 == 0: + raise ValueError("jacobi symbol is only defined for positive odd k") n %= k t = 0 while n != 0: @@ -165,7 +166,8 @@ def add_mixed(self, p1, p2): """ x1, y1, z1 = p1 x2, y2, z2 = p2 - assert z2 == 1 + if z2 != 1: + raise ValueError("p2 must be an affine point") # Adding to the point at infinity is a no-op if z1 == 0: return p2 @@ -299,7 +301,8 @@ def is_valid(self): return self.valid def get_bytes(self): - assert self.valid + if not self.valid: + raise ValueError("Invalid public key") p = SECP256K1.affine(self.p) if p is None: return None @@ -313,7 +316,8 @@ def verify_ecdsa(self, sig, msg, low_s=True): See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the ECDSA verifier algorithm""" - assert self.valid + if not self.valid: + raise ValueError("Invalid public key") # Extract r and s from the DER formatted signature. Return false for # any DER encoding errors. @@ -378,7 +382,8 @@ def __init__(self): def set(self, secret, compressed): """Construct a private key object with given 32-byte secret and compressed flag.""" - assert len(secret) == 32 + if len(secret) != 32: + raise ValueError("Invalid secret key length") secret = int.from_bytes(secret, "big") self.valid = secret > 0 and secret < SECP256K1_ORDER if self.valid: @@ -391,7 +396,8 @@ def generate(self, compressed=True): def get_bytes(self): """Retrieve the 32-byte representation of this key.""" - assert self.valid + if not self.valid: + raise ValueError("Invalid private key") return self.secret.to_bytes(32, "big") @property @@ -404,7 +410,8 @@ def is_compressed(self): def get_pubkey(self): """Compute an ECPubKey object for this secret key.""" - assert self.valid + if not self.valid: + raise ValueError("Invalid private key") ret = ECPubKey() p = SECP256K1.mul([(SECP256K1_G, self.secret)]) ret.p = p @@ -417,7 +424,8 @@ def sign_ecdsa(self, msg, nonce_function=None, extra_data=None, low_s=True): See https://en.wikipedia.org/wiki/Elliptic_Curve_Digital_Signature_Algorithm for the ECDSA signer algorithm.""" - assert self.valid + if not self.valid: + raise ValueError("Invalid private key") z = int.from_bytes(msg, "big") if nonce_function is None: nonce_function = deterministic_k @@ -470,7 +478,8 @@ def compute_xonly_pubkey(key): This also returns whether the resulting public key was negated. """ - assert len(key) == 32 + if len(key) != 32: + raise ValueError("Invalid private key length") x = int.from_bytes(key, "big") if x == 0 or x >= SECP256K1_ORDER: return (None, None) @@ -481,8 +490,10 @@ def compute_xonly_pubkey(key): def tweak_add_privkey(key, tweak): """Tweak a private key (after negating it if needed).""" - assert len(key) == 32 - assert len(tweak) == 32 + if len(key) != 32: + raise ValueError("Invalid private key length") + if len(tweak) != 32: + raise ValueError("Invalid tweak length") x = int.from_bytes(key, "big") if x == 0 or x >= SECP256K1_ORDER: @@ -501,8 +512,10 @@ def tweak_add_privkey(key, tweak): def tweak_add_pubkey(key, tweak): """Tweak a public key and return whether the result had to be negated.""" - assert len(key) == 32 - assert len(tweak) == 32 + if len(key) != 32: + raise ValueError("Invalid public key length") + if len(tweak) != 32: + raise ValueError("Invalid tweak length") x_coord = int.from_bytes(key, "big") if x_coord >= SECP256K1_FIELD_SIZE: @@ -525,9 +538,12 @@ def verify_schnorr(key, sig, msg): - sig is a 64-byte Schnorr signature - msg is a 32-byte message """ - assert len(key) == 32 - assert len(msg) == 32 - assert len(sig) == 64 + if len(key) != 32: + raise ValueError("Invalid public key length") + if len(msg) != 32: + raise ValueError("Invalid message length") + if len(sig) != 64: + raise ValueError("Invalid signature length") x_coord = int.from_bytes(key, "big") if x_coord == 0 or x_coord >= SECP256K1_FIELD_SIZE: @@ -556,10 +572,13 @@ def verify_schnorr(key, sig, msg): def sign_schnorr(key, msg, aux=None, flip_p=False, flip_r=False): """Create a Schnorr signature (see BIP 340).""" - assert len(key) == 32 - assert len(msg) == 32 + if len(key) != 32: + raise ValueError("Invalid private key length") + if len(msg) != 32: + raise ValueError("Invalid message length") if aux is not None: - assert len(aux) == 32 + if len(aux) != 32: + raise ValueError("Invalid aux length") sec = int.from_bytes(key, "big") if sec == 0 or sec >= SECP256K1_ORDER: @@ -579,7 +598,8 @@ def sign_schnorr(key, msg, aux=None, flip_p=False, flip_r=False): ) % SECP256K1_ORDER ) - assert kp != 0 + if kp == 0: + raise ValueError("k is zero") R = SECP256K1.affine(SECP256K1.mul([(SECP256K1_G, kp)])) k = kp if SECP256K1.has_even_y(R) != flip_r else SECP256K1_ORDER - kp e = ( diff --git a/src/embit/util/py_ripemd160.py b/src/embit/util/py_ripemd160.py index d42c4a3c..d995b269 100644 --- a/src/embit/util/py_ripemd160.py +++ b/src/embit/util/py_ripemd160.py @@ -359,7 +359,7 @@ def fi(x, y, z, i): elif i == 4: return x ^ (y | ~z) else: - assert False + raise ValueError("Invalid function index") def rol(x, i): diff --git a/src/embit/util/py_secp256k1.py b/src/embit/util/py_secp256k1.py index 85140863..0f1f8e1d 100644 --- a/src/embit/util/py_secp256k1.py +++ b/src/embit/util/py_secp256k1.py @@ -283,9 +283,12 @@ def xonly_pubkey_from_pubkey(pubkey, context=None): def schnorrsig_verify(sig, msg, pubkey, context=None): - assert len(sig) == 64 - assert len(msg) == 32 - assert len(pubkey) == 64 + if len(sig) != 64: + raise ValueError("Signature should be 64 bytes long") + if len(msg) != 32: + raise ValueError("Message should be 32 bytes long") + if len(pubkey) != 64: + raise ValueError("Public key should be 64 bytes long") sec = ec_pubkey_serialize(pubkey) return _key.verify_schnorr(sec[1:33], sig, msg) @@ -298,10 +301,12 @@ def keypair_create(secret, context=None): def schnorrsig_sign(msg, keypair, nonce_function=None, extra_data=None, context=None): - assert len(msg) == 32 + if len(msg) != 32: + raise ValueError("Message should be 32 bytes long") if len(keypair) == 32: keypair = keypair_create(keypair, context=context) - assert len(keypair) == 96 + if len(keypair) != 96: + raise ValueError("Keypair should be 96 bytes long") return _key.sign_schnorr(keypair[:32], msg, extra_data) diff --git a/tests/tests/test_bip39.py b/tests/tests/test_bip39.py index 685cb4e0..ae783fa1 100644 --- a/tests/tests/test_bip39.py +++ b/tests/tests/test_bip39.py @@ -173,8 +173,24 @@ def test_bip39(self): self.assertEqual(act_xkey.to_base58(), xprv) def test_invalid_length(self): - words = "panel trumpet seek bridge income piano history car flower aim loan accident embark canoe" - self.assertFalse(mnemonic_is_valid(words)) + invalid_length = [ + # not divisible by 3, too short, too long + "panel trumpet seek bridge income piano history car flower aim loan accident embark canoe", + "zoo " * 8 + "zebra", + "zoo " * 26 + "valley", + ] + for words in invalid_length: + self.assertFalse(mnemonic_is_valid(words)) + self.assertRaises(ValueError, mnemonic_to_bytes, words) + + invalid_length = [ + # not divisible by 4, too short, too long + b"\x00" * 19, + b"\x00" * 12, + b"\x00" * 36, + ] + for entropy in invalid_length: + self.assertRaises(ValueError, mnemonic_from_bytes, entropy) def test_invalid_word(self): words = "fljsafk minute glow ride mask ceiling old limb rookie discover cotton biology" @@ -190,6 +206,21 @@ def test_invalid_seed(self): seed = "0000000000000000000000000000000042" self.assertRaises(ValueError, lambda x: mnemonic_from_bytes(unhexlify(x)), seed) + def test_spaces(self): + """Test that mnemonic with leading / trailing / double spaces are invalid.""" + # valid mnemonic + mnemonic = "abandon " * 11 + "about" + self.assertTrue(mnemonic_is_valid(mnemonic)) + # leading / trailing space + self.assertFalse(mnemonic_is_valid(" " + mnemonic)) + self.assertFalse(mnemonic_is_valid(mnemonic + " ")) + # double space + mnemonic = "abandon " * 11 + " about" + self.assertFalse(mnemonic_is_valid(mnemonic)) + # new line instead of space + mnemonic = "abandon\n" * 11 + "about" + self.assertFalse(mnemonic_is_valid(mnemonic)) + def test_find_candidates_happy(self): prefix = "a" exp_candidates = [ diff --git a/tests/tests/test_bip85.py b/tests/tests/test_bip85.py index e5ec9c44..7b6280b9 100644 --- a/tests/tests/test_bip85.py +++ b/tests/tests/test_bip85.py @@ -48,12 +48,44 @@ class Bip85Test(TestCase): + + def test_derive_entropy(self): + for app_index, path, expected in [ + (39, [0, 12, 0], unhexlify("6250b68daf746d12a24d58b4787a714bf1b58d69e4c2a466276fb16fe93dc52b6fac6b756894072241447cad56f6405ee326dbb473d2f5e943543590082927c0")), + (2, [0], unhexlify("7040bb53104f27367f317558e78a994ada7296c6fde36a364e5baf206e502bb1f988080b7dd814e7ae7d6d83edbb6689886a560e165f4a740877cdf3beecacf8")), + (32, [0], unhexlify("52405cd0dd21c5be78314a7c1a3c65ffd8d896536cc7dee3157db5824f0c92e2ead0b33988a616cf6a497f1c169d9e92562604e38305ccd3fc96f2252c177682")), + ]: + result = bip85.derive_entropy(ROOT, app_index, path) + self.assertEqual(result, expected) + + def test_derive_entropy_fail_path_ge_hardened_index(self): + with self.assertRaises(ValueError) as exc: + bip85.derive_entropy(ROOT, 39, [bip32.HARDENED_INDEX + 1]) + self.assertEqual(str(exc.exception), "Path elements must be less than 2^31") + def test_bip39(self): for num_words, index, lang, expected in VECTORS_BIP39: self.assertEqual( bip85.derive_mnemonic(ROOT, num_words, index, language=lang), expected ) + def test_bip39_fail_num_words(self): + cases = [ + (11, 0, bip85.LANGUAGES.ENGLISH), + (13, 0, bip85.LANGUAGES.ENGLISH), + (15, 0, bip85.LANGUAGES.ENGLISH), + (17, 0, bip85.LANGUAGES.ENGLISH), + (19, 0, bip85.LANGUAGES.ENGLISH), + (21, 0, bip85.LANGUAGES.ENGLISH), + (23, 0, bip85.LANGUAGES.ENGLISH), + (25, 0, bip85.LANGUAGES.ENGLISH), + ] + + for num_words, index, lang in cases: + with self.assertRaises(ValueError) as exc: + bip85.derive_mnemonic(ROOT, num_words, index, language=lang) + self.assertEqual(str(exc.exception), "Number of words must be 12, 18 or 24") + def test_wif(self): for idx, expected in VECTORS_WIF: self.assertEqual(bip85.derive_wif(ROOT, idx).wif(), expected) @@ -67,3 +99,15 @@ def test_hex(self): self.assertEqual( bip85.derive_hex(ROOT, num_bytes, idx), unhexlify(expected) ) + + def test_hex_fail_num_bytes_ge_64(self): + for num_bytes in [65, 100, 1000, 10000]: + with self.assertRaises(ValueError) as exc: + bip85.derive_hex(ROOT, num_bytes, 1) + self.assertEqual(str(exc.exception), "Number of bytes must not exceed 64") + + def test_hex_fail_num_bytes_le_16(self): + for num_bytes in [15, 14, 10, 0]: + with self.assertRaises(ValueError) as exc: + bip85.derive_hex(ROOT, num_bytes, 2) + self.assertEqual(str(exc.exception), "Number of bytes must be at least 16") diff --git a/tests/tests/test_descriptor.py b/tests/tests/test_descriptor.py index b3f70827..eeaac5bb 100644 --- a/tests/tests/test_descriptor.py +++ b/tests/tests/test_descriptor.py @@ -5,6 +5,7 @@ from embit.descriptor.miniscript import OPERATORS, WRAPPERS from embit.descriptor.errors import MiniscriptError from embit.descriptor.checksum import add_checksum, DescriptorError +from embit.networks import NETWORKS from embit import ec @@ -34,6 +35,7 @@ def test_descriptors(self): "c:pk_k(0250863ad64a87ae8a2fe83c1af1a8403cb53f53e486d8511dad8a04887e5b2352)" "))", "21020e0338c96a8870479f2396c373cc7696ba124e8635d41b0ea581112b67817261ac7364210250863ad64a87ae8a2fe83c1af1a8403cb53f53e486d8511dad8a04887e5b2352ac68", + 1, ), # # pkh - 8e5d7457d33a978d1c3c1e440f92a195e00cc7d8 # ("wsh(v:pk_h(03e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b14130))", None), @@ -41,36 +43,44 @@ def test_descriptors(self): "sh(wsh(and_v(or_c(pk(%s),or_c(pk(%s),v:older(1000))),pk(%s))))" % tuple(keys[-3:]), "2103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b14130ac642103b8fa5d5959fa4027ccbf0736a86ccde4242e3051ea363437b4ff0d52598d7cecac6402e803b26968682103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b14130ac", + 2, ), ( "sh(or_b(pk(%s),s:pk(%s)))" % tuple(keys[:2]), "2103801b3a4e3ca0d61d469445621561c47f6c1424d0fd353a44c2c3ebb84ae78f59ac7c2103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b14130ac9b", + 2, ), ( "wsh(or_d(pk(%s),pkh(%s)))" % tuple(keys[-2:]), "2103b8fa5d5959fa4027ccbf0736a86ccde4242e3051ea363437b4ff0d52598d7cecac736476a9148e5d7457d33a978d1c3c1e440f92a195e00cc7d888ac68", + 2, ), ( "wsh(and_v(v:pk(%s),or_d(pk(%s),older(12960))))" % tuple(keys[:2]), "2103801b3a4e3ca0d61d469445621561c47f6c1424d0fd353a44c2c3ebb84ae78f59ad2103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b14130ac736402a032b268", + 2, ), ( "wsh(andor(pk(%s),older(1008),pk(%s)))" % tuple(keys[:2]), "2103801b3a4e3ca0d61d469445621561c47f6c1424d0fd353a44c2c3ebb84ae78f59ac642103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b14130ac6702f003b268", + 2, ), ( "wsh(t:or_c(pk(%s),and_v(v:pk(%s),or_c(pk(%s),v:hash160(e7d285b4817f83f724cd29394da75dfc84fe639e)))))" % tuple(keys[:3]), "2103801b3a4e3ca0d61d469445621561c47f6c1424d0fd353a44c2c3ebb84ae78f59ac642103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b14130ad2103b8fa5d5959fa4027ccbf0736a86ccde4242e3051ea363437b4ff0d52598d7cecac6482012088a914e7d285b4817f83f724cd29394da75dfc84fe639e88686851", + 2, ), ( "wsh(andor(pk(%s),or_i(and_v(v:pkh(%s),hash160(e7d285b4817f83f724cd29394da75dfc84fe639e)),older(1008)),pk(%s)))" % tuple(keys[:3]), "2103801b3a4e3ca0d61d469445621561c47f6c1424d0fd353a44c2c3ebb84ae78f59ac642103b8fa5d5959fa4027ccbf0736a86ccde4242e3051ea363437b4ff0d52598d7cecac676376a9148e5d7457d33a978d1c3c1e440f92a195e00cc7d888ad82012088a914e7d285b4817f83f724cd29394da75dfc84fe639e876702f003b26868", + 2, ), ( "wsh(multi(2,%s,%s,%s))" % tuple(keys[:3]), "522103801b3a4e3ca0d61d469445621561c47f6c1424d0fd353a44c2c3ebb84ae78f592103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b141302103b8fa5d5959fa4027ccbf0736a86ccde4242e3051ea363437b4ff0d52598d7cec53ae", + 2, ), # TODO: invalid miniscript for segwit, but valid for taproot # ("wsh(thresh(3,pk(%s),s:pk(%s),s:pk(%s),sdv:older(12960)))" % tuple(keys[:3]), @@ -88,6 +98,7 @@ def test_descriptors(self): "038c8f919f70062c084376223fd8b4f0c08958e70499df496411dde83a1bb64b0d," "02d0ea7084e344b56625277b074d15a15301b9d96b0b2dd9fc905e01fc3de408e1))", "5a210373b665b6fe153c5872de1344339ee60588491257d2c34567aa026af237143a6c2102916ee61974fc4892afb2d3cad4c13472138b5521411de24a78910afb97b95f22210244efc096ea3b7df99071b1cfa1630144e20d8ccd1540e726034a051aa1802d3b2102d9c51dc3f4088d5ce0b83f188fb14901b98c1c9e8cf771c49b7b441e56272b8a2103094990a34af21ef3ed766c8e0cb1e44f5e0d80412bbe00a2ade82a024ca91d232102722a386ad0f6d7f1261808a3e70fab143303bd2264283486411c3183ea3ed1c321036070b1f2995d8ffda8478ef55affd39795689a3982d54b12180397b1ad1f5f7521026515fa7603c10c44f6d316ae7592b5899d46d87ac1e574ec53de8b59f95efad621038c8f919f70062c084376223fd8b4f0c08958e70499df496411dde83a1bb64b0d2102d0ea7084e344b56625277b074d15a15301b9d96b0b2dd9fc905e01fc3de408e15aae", + 1, ), ( "wsh(andor(" @@ -113,18 +124,25 @@ def test_descriptors(self): "a:pkh(379ed952eb4740386acc59c2d28d9aa62e63968d)," "a:pkh(c30d2795e70b1ee6f8af0b33d9460d60cfcf10b3))))", "5421036070b1f2995d8ffda8478ef55affd39795689a3982d54b12180397b1ad1f5f7521026515fa7603c10c44f6d316ae7592b5899d46d87ac1e574ec53de8b59f95efad621038c8f919f70062c084376223fd8b4f0c08958e70499df496411dde83a1bb64b0d2102d0ea7084e344b56625277b074d15a15301b9d96b0b2dd9fc905e01fc3de408e154ae6476a9141ad3ca2d247b8e8888e41f89ac8bef217d83f33f88ac6b76a914f94f2eadc9c1bc3a8b8c2c6364af2c070fd4120688ac6c936b76a9143c306c2c97e4ba62ac0d7fb3965aba66b28e895988ac6c936b76a914ba7b9e846eb6b16420976c6bead54d9bb2b08d3588ac6c936b76a914379ed952eb4740386acc59c2d28d9aa62e63968d88ac6c936b76a914c30d2795e70b1ee6f8af0b33d9460d60cfcf10b388ac6c93558767562103856d447f1b890cc6e0e0114cd5bac58662c37ce7f458c458b72bd396597edfc72103e080e99896384aa8a07da837b2042a4c0d824eeaa8d51e6c9cff20682be75d4f2102c6d258e728005d4d00e55ac4b87786df507921b3ba3efec244a47f4a2e61b4b02102edfc1d6088f9b6470ed4550d8bf2326ebebc0464a7f78581fa7283fc54edecf02102f3630d1f51b2ebaaf1c7ebae9c24318279d4cff5ad16cb290b6d26edf96dca9c210353ecc8e7b1cc90d405cd6fc9d9f24d44b6b5649abc2773f28a6ca4fa7a4cd62956af029000b268", + 1, ), ( "wsh(sortedmulti(2,%s,%s,%s))" % tuple(keys[:3]), "522103801b3a4e3ca0d61d469445621561c47f6c1424d0fd353a44c2c3ebb84ae78f592103b8fa5d5959fa4027ccbf0736a86ccde4242e3051ea363437b4ff0d52598d7cec2103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b1413053ae", + 2, ), - ("wpkh(%s)" % keys[0], "0014f8f93df2160de8fd3ca716e2f905c74da3f9839f"), - ("sh(wpkh(%s))" % keys[0], "0014f8f93df2160de8fd3ca716e2f905c74da3f9839f"), - ("pkh(%s)" % keys[0], "76a914f8f93df2160de8fd3ca716e2f905c74da3f9839f88ac"), + ( + "wsh(sortedmulti(2,%s,%s,%s))" % tuple(key.replace("<0;1>", "0") for key in keys[:3]), + "522103801b3a4e3ca0d61d469445621561c47f6c1424d0fd353a44c2c3ebb84ae78f592103b8fa5d5959fa4027ccbf0736a86ccde4242e3051ea363437b4ff0d52598d7cec2103e7d285b4817f83f724cd29394da75dfc84fe639ed147a944e7e6064703b1413053ae", + 1, + ), + ("wpkh(%s)" % keys[0], "0014f8f93df2160de8fd3ca716e2f905c74da3f9839f", 2,), + ("sh(wpkh(%s))" % keys[0], "0014f8f93df2160de8fd3ca716e2f905c74da3f9839f", 2,), + ("pkh(%s)" % keys[0], "76a914f8f93df2160de8fd3ca716e2f905c74da3f9839f88ac", 2,), ] error_cases = [1,2,3,4,5,6,7,8,11] - for i, (d, a) in enumerate(dd): + for i, (d, a, n_branches) in enumerate(dd): if i in error_cases: self.assertRaises(DescriptorError, Descriptor.from_string, d) else: @@ -136,6 +154,7 @@ def test_descriptors(self): schex = hexlify(scc.data).decode() self.assertEqual(schex, a) self.assertEqual(str(sc), d) + assert sc.num_branches == n_branches def test_descriptor_from_string_validation(self): """These tests don't verify taproot scripts""" @@ -290,11 +309,22 @@ def test_miniscript_compat(self): "wsh(andor(thresh(1,pk(xpub6BaZSKgpaVvibu2k78QsqeDWXp92xLHZxiu1WoqLB9hKhsBf3miBUDX7PJLgSPvkj66ThVHTqdnbXpeu8crXFmDUd4HeM4s4miQS2xsv3Qb/*)),and_v(v:multi(2,03b506a1dbe57b4bf48c95e0c7d417b87dd3b4349d290d2e7e9ba72c912652d80a,0295e7f5d12a2061f1fd2286cefec592dff656a19f55f4f01305d6aa56630880ce),older(6)),thresh(2,pkh(xpub6AHA9hZDN11k2ijHMeS5QqHx2KP9aMBRhTDqANMnwVtdyw2TDYRmF8PjpvwUFcL1Et8Hj59S3gTSMcUQ5gAqTz3Wd8EsMTmF3DChhqPQBnU/*),a:pkh(xpub6AaffFGfH6WXfm6pwWzmUMuECQnoLeB3agMKaLyEBZ5ZVfwtnS5VJKqXBt8o5ooCWVy2H87GsZshp7DeKE25eWLyd1Ccuh2ZubQUkgpiVux/*))))#76jsyzdg", "wsh(or_d(pk([40259ab7/48'/1'/0'/2']tpubDFcY6nTiMAWBd5d2bS8JZvjcaLjC6GE6XnPAJAPUkVj5wa5Pyb4gumx1ZWvnXQ8tmorCmpAyai69K9hD2mGQUeNkuXfjztsfqnE5FMk1CCh/<0;1>/*),and_v(v:pkh([842a626e/48'/1'/0'/2']tpubDENBboujRvpkS8SgZsrpqG2BCUBoaAc4c57jHFe1NwKAtfVjDZDUadQKYv4pkAEF2afPv6TtQ2BoYFJAPLbuKpL1usiySERZekGo4JmnWhh/<0;1>/*),older(65535))))#deguz53x", "wsh(or_d(pk([40259ab7/48h/1h/0h/2h]tpubDFcY6nTiMAWBd5d2bS8JZvjcaLjC6GE6XnPAJAPUkVj5wa5Pyb4gumx1ZWvnXQ8tmorCmpAyai69K9hD2mGQUeNkuXfjztsfqnE5FMk1CCh/<0;1>/*),and_v(v:pkh([842a626e/48H/1H/0H/2H]tpubDENBboujRvpkS8SgZsrpqG2BCUBoaAc4c57jHFe1NwKAtfVjDZDUadQKYv4pkAEF2afPv6TtQ2BoYFJAPLbuKpL1usiySERZekGo4JmnWhh/<0;1>/*),older(65535))))#deguz53x", + "wsh(and_v(v:0,and_v(v:pk([40259ab7/48h/1h/0h/2h]tpubDFcY6nTiMAWBd5d2bS8JZvjcaLjC6GE6XnPAJAPUkVj5wa5Pyb4gumx1ZWvnXQ8tmorCmpAyai69K9hD2mGQUeNkuXfjztsfqnE5FMk1CCh/<0;1>/*),pk([40259ab7/48h/1h/0h/2h]tpubDFcY6nTiMAWBd5d2bS8JZvjcaLjC6GE6XnPAJAPUkVj5wa5Pyb4gumx1ZWvnXQ8tmorCmpAyai69K9hD2mGQUeNkuXfjztsfqnE5FMk1CCh/<0;1>/*))))", + "wsh(and_v(v:0,and_v(v:0,0)))", ] for desc in generalistic_descs: - Descriptor.from_string(desc) + desc = Descriptor.from_string(desc) + desc.derive(0, 0).address(network=NETWORKS["main"]) + def test_invalid_miniscript(self): + """Ensure an error is raised when parsing invalid miniscript""" + invalid_descs = [ + "wsh(ttvtvtvtvtvtvtv:after(230775))", + ] + for desc in invalid_descs: + self.assertRaises(MiniscriptError, Descriptor.from_string, desc) + def test_len(self): """Checks that len(miniscript) returns correct length""" for op in OPERATORS: @@ -318,32 +348,39 @@ def test_multisig(self): "wsh(c:andor(multi(1,%s,%s),pk_k(%s),pk_k(%s)))" % keys, False, False, + 1 ), ( "wsh(multi(2,%s,%s,%s,%s))" % keys, True, False, + 1 ), ( "wsh(sortedmulti(2,%s,%s,%s,%s))" % keys, True, True, + 1 ), ( "tr(%s,multi_a(2,%s,%s,%s))" % keys, False, False, + 1 ), ( "tr(%s,sortedmulti_a(2,%s,%s,%s))" % keys, False, False, + 1 ), ] - for dstr, is_basic, is_sorted in descriptors: + for dstr, is_basic, is_sorted, n_branches in descriptors: d = Descriptor.from_string(dstr) self.assertEqual(d.is_basic_multisig, is_basic) self.assertEqual(d.is_sorted, is_sorted) + + assert n_branches == d.num_branches # test that: diff --git a/tests/tests/test_networks.py b/tests/tests/test_networks.py new file mode 100644 index 00000000..acc522a1 --- /dev/null +++ b/tests/tests/test_networks.py @@ -0,0 +1,65 @@ +"""Tests for network definitions and get_network function.""" + +from embit import networks +from unittest import TestCase + + +class TestNetworks(TestCase): + """Test Bitcoin network definitions.""" + + def test_all_networks_present(self): + """All expected Bitcoin networks should be defined.""" + expected = ["main", "test", "regtest", "signet", "testnet4"] + for name in expected: + self.assertIn(name, networks.NETWORKS) + + def test_testnet4_parameters(self): + """testnet4 should have same parameters as testnet3.""" + test = networks.NETWORKS["test"] + testnet4 = networks.NETWORKS["testnet4"] + + # Same address parameters + self.assertEqual(testnet4["bech32"], test["bech32"]) + self.assertEqual(testnet4["bech32"], "tb") + self.assertEqual(testnet4["p2pkh"], test["p2pkh"]) + self.assertEqual(testnet4["p2sh"], test["p2sh"]) + self.assertEqual(testnet4["wif"], test["wif"]) + + # Same xpub/xprv versions + self.assertEqual(testnet4["xpub"], test["xpub"]) + self.assertEqual(testnet4["xprv"], test["xprv"]) + self.assertEqual(testnet4["zpub"], test["zpub"]) + self.assertEqual(testnet4["zprv"], test["zprv"]) + + # Same coin type + self.assertEqual(testnet4["bip32"], test["bip32"]) + + def test_get_network_direct(self): + """get_network should return network by name.""" + main = networks.get_network("main") + self.assertEqual(main["name"], "Mainnet") + self.assertEqual(main["bech32"], "bc") + + test = networks.get_network("test") + self.assertEqual(test["name"], "Testnet") + self.assertEqual(test["bech32"], "tb") + + def test_get_network_testnet4(self): + """get_network should handle testnet4.""" + testnet4 = networks.get_network("testnet4") + self.assertIsNotNone(testnet4) + self.assertEqual(testnet4["name"], "Testnet4") + self.assertEqual(testnet4["bech32"], "tb") + + def test_get_network_unknown(self): + """get_network should return None for unknown networks.""" + result = networks.get_network("unknown_network") + self.assertIsNone(result) + + def test_mainnet_bech32(self): + """Mainnet should use 'bc' prefix.""" + self.assertEqual(networks.NETWORKS["main"]["bech32"], "bc") + + def test_regtest_bech32(self): + """Regtest should use 'bcrt' prefix.""" + self.assertEqual(networks.NETWORKS["regtest"]["bech32"], "bcrt")