Source code for pybgl.functions.script

from struct import unpack, pack
from pybgl.opcodes import *

from pybgl.functions.tools import bytes_from_hex, int_to_bytes, get_stream, get_bytes, bytes_to_int
from pybgl.functions.hash import hash160, sha256
from pybgl.functions.address import hash_to_address
from pybgl.functions.key import is_wif_valid, wif_to_private_key
from pybgl.crypto import __secp256k1_ecdsa_verify__
from pybgl.crypto import __secp256k1_ecdsa_sign__
from pybgl.crypto import __secp256k1_ecdsa_recover__


def public_key_to_pubkey_script(key, hex=True):
    key = get_bytes(key)
    s = b"%s%s%s" % (bytes([len(key)]), key, OP_CHECKSIG)
    return s.hex() if hex else s


[docs]def parse_script(script, segwit=True): """ Parse script and return script type, script address and required signatures count. :param script: script in bytes string or HEX encoded string format. :param segwit: (optional) If set to True recognize P2WPKH and P2WSH sripts, by default is True. :return: dictionary: - nType - numeric script type - type - script type - addressHash - address hash in case address recognized - script - script if no address recognized - reqSigs - required signatures count """ if not script: return {"nType": 7, "type": "NON_STANDARD", "reqSigs": 0, "script": b""} script = get_bytes(script) l = len(script) if segwit: if l == 22 and script[0] == 0: return {"nType": 5, "type": "P2WPKH", "reqSigs": 1, "addressHash": script[2:]} if l == 34 and script[0] == 0: return {"nType": 6, "type": "P2WSH", "reqSigs": None, "addressHash": script[2:]} if l == 25 and \ script[:2] == b"\x76\xa9" and \ script[-2:] == b"\x88\xac": return {"nType": 0, "type": "P2PKH", "reqSigs": 1, "addressHash": script[3:-2]} if l == 23 and \ script[0] == 169 and \ script[-1] == 135: return {"nType": 1, "type": "P2SH", "reqSigs": None, "addressHash": script[2:-1]} if l == 67 and script[-1] == 172: return {"nType": 2, "type": "PUBKEY", "reqSigs": 1, "addressHash": hash160(script[1:-1])} if l == 35 and script[-1] == 172: return {"nType": 2, "type": "PUBKEY", "reqSigs": 1, "addressHash": hash160(script[1:-1])} if script[0] == OPCODE["OP_RETURN"]: if l == 1: return {"nType": 3, "type": "NULL_DATA", "reqSigs": 0, "data": b""} elif script[1] < OPCODE["OP_PUSHDATA1"]: if script[1] == l - 2: return {"nType": 3, "type": "NULL_DATA", "reqSigs": 0, "data": script[2:]} elif script[1] == OPCODE["OP_PUSHDATA1"]: if l > 2: if script[2] == l - 3 and script[2] <= 80: return {"nType": 3, "type": "NULL_DATA", "reqSigs": 0, "data": script[3:]} return {"nType": 8, "type": "NULL_DATA_NON_STANDARD", "reqSigs": 0, "script": script} if script[0] >= 81 and script[0] <= 96: if script[-1] == 174: if script[-2] >= 81 and script[-2] <= 96: if script[-2] >= script[0]: c, s = 0, 1 while l - 2 - s > 0: if script[s] < 0x4c: s += script[s] c += 1 else: c = 0 break s += 1 if c == script[-2] - 80: return {"nType": 4, "type": "MULTISIG", "reqSigs": script[0] - 80, "pubKeys": c, "script": script} s, m, n, last, req_sigs = 0, 0, 0, 0, 0 while l - s > 0: # OP_1 -> OP_16 if script[s] >= 81 and script[s] <= 96: if not n: n = script[s] - 80 elif not m: n, m = script[s] - 80, 0 elif n > m: n, m = script[s] - 80, 0 elif m == script[s] - 80: last = 0 if last else 2 elif script[s] < 0x4c: s += script[s] m += 1 if m > 16: n, m = 0, 0 elif script[s] == OPCODE["OP_PUSHDATA1"]: try: s += 1 + script[s + 1] except: break elif script[s] == OPCODE["OP_PUSHDATA2"]: try: s += 2 + unpack('<H', script[s+1: s + 3])[0] except: break elif script[s] == OPCODE["OP_PUSHDATA4"]: try: s += 4 + unpack('<L', script[s+1: s + 5])[0] except: break else: if script[s] == OPCODE["OP_CHECKSIG"]: req_sigs += 1 elif script[s] == OPCODE["OP_CHECKSIGVERIFY"]: req_sigs += 1 elif script[s] in (OPCODE["OP_CHECKMULTISIG"], OPCODE["OP_CHECKMULTISIGVERIFY"]): if last: req_sigs += n else: req_sigs += 20 n, m = 0, 0 if last: last -= 1 s += 1 return {"nType": 7, "type": "NON_STANDARD", "reqSigs": req_sigs, "script": script}
def script_to_address(script, testnet=False): """ Decode script to address (base58/bech32 format). :param script: script in bytes string or HEX encoded string format. :param testnet: (optional) flag for testnet network, by default is False. :return: address in base58/bech32 format or None. """ d = parse_script(script) if "addressHash" in d: witness_version = 0 if d["nType"] in (5, 6) else None script_hash = True if d["nType"] in (1, 6) else False return hash_to_address(d["addressHash"], testnet=testnet, script_hash=script_hash, witness_version=witness_version) return None
[docs]def decode_script(script, asm=False): """ Decode script to ASM format or to human readable OPCODES string. :param script: script in bytes string or HEX encoded string format. :param asm: (optional) If set to True decode to ASM format, by default is False. :return: script in ASM format string or OPCODES string. """ script = get_bytes(script) l = len(script) s = 0 result = [] append = result.append try: while l - s > 0: if script[s] < 0x4c and script[s]: if asm: append("OP_PUSHBYTES[%s]" % script[s] ) append(script[s + 1:s + 1 + script[s]].hex()) else: append('[%s]' % script[s]) s += script[s] + 1 continue if script[s] == OPCODE["OP_PUSHDATA1"]: if asm: ld = script[s + 1] append("OP_PUSHDATA1[%s]" % ld) append(script[s + 2:s + 2 + ld].hex()) else: append(RAW_OPCODE[script[s]]) ld = script[s + 1] append('[%s]' % ld) s += 1 + script[s + 1] + 1 elif script[s] == OPCODE["OP_PUSHDATA2"]: if asm: ld = unpack('<H', script[s + 1: s + 3])[0] append("OP_PUSHDATA2[%s]" % ld) append(script[s + 3:s + 3 + ld].hex()) else: ld = unpack('<H', script[s + 1: s + 3])[0] append(RAW_OPCODE[script[s]]) append('[%s]' % ld) s += 2 + 1 + ld elif script[s] == OPCODE["OP_PUSHDATA4"]: if asm: ld = unpack('<L', script[s + 1: s + 5])[0] append("OP_PUSHDATA4[%s]" % ld) append(script[s + 5:s + 5 + ld].hex()) else: ld = unpack('<L', script[s + 1: s + 5])[0] append(RAW_OPCODE[script[s]]) append('[%s]' % ld) s += 5 + 1 + ld else: append(RAW_OPCODE[script[s]]) s += 1 except: append("[SCRIPT_DECODE_FAILED]") return ' '.join(result)
[docs]def delete_from_script(script, sub_script): """ Delete OP_CODE or subscript from script. :param script: target script in bytes or HEX encoded string. :param sub_script: sub_script which is necessary to remove from target script in bytes or HEX encoded string. :return: script in bytes or HEX encoded string corresponding to the format of target script. """ s_hex = isinstance(script, str) script = get_bytes(script) sub_script = get_bytes(sub_script) stream = get_stream(script) if not sub_script: return script.hex() if s_hex else script r = b'' offset = 0 skip_until = 0 o, d = read_opcode(stream) while o: if script[offset:offset + len(sub_script)] == sub_script: skip_until = offset + len(sub_script) r += d[len(sub_script) - 1:] if d is not None else b"" if offset >= skip_until: r += o if d is not None: if o == OP_PUSHDATA1: r += bytes([len(d)]) elif o == OP_PUSHDATA2: r += pack('<H',len(d)) elif o == OP_PUSHDATA4: r += pack('<L',len(d)) r += d offset += 1 if d is not None: offset += len(d) if o == OP_PUSHDATA1: offset += 1 elif o == OP_PUSHDATA2: offset += 2 elif o == OP_PUSHDATA4: offset += 4 o, d = read_opcode(stream) return r.hex() if s_hex else r
[docs]def script_to_hash(script, witness=False, hex=True): """ Encode script to hash HASH160 or SHA256 in dependency of the witness. :param script: script in bytes or HEX encoded string. :param witness: (optional) If set to True return SHA256 hash for P2WSH, by default is False. :param hex: (optional) If set to True return key in HEX format, by default is True. :param sub_script: sub_script which is necessary to remove from target script in bytes or HEX encoded string. :return: hash HASH160 or SHA256 of script in bytes or HEX encoded. """ script = get_bytes(script) return sha256(script, hex) if witness else hash160(script, hex)
def op_push_data(data): data = get_bytes(data) if len(data) <= 0x4b: return b''.join([bytes([len(data)]), data]) elif len(data) <= 0xff: return b''.join([OP_PUSHDATA1, bytes([len(data)]), data]) elif len(data) <= 0xffff: return b''.join([OP_PUSHDATA2, pack('<H',len(data)), data]) else: return b''.join([OP_PUSHDATA4, pack('<L',len(data)), data]) def get_multisig_public_keys(script, hex=False): script = get_bytes(script) pub_keys = [] s = get_stream(script) o, d = read_opcode(s) while o: o, d = read_opcode(s) if d: pub_keys.append(d.hex() if hex else d) return pub_keys def read_opcode(stream): read = stream.read b = read(1) if not b: return None, None if b[0] <= 0x4b: return b, read(b[0]) elif b == OP_PUSHDATA1: return b, read(read(1)[0]) elif b == OP_PUSHDATA2: return b, read(unpack("<H", read(2))[0]) elif b == OP_PUSHDATA4: return b, read(unpack("<L", read(4))[0]) else: return b, None
[docs]def sign_message(msg, private_key, hex=True): """ Sign message :param msg: message to sign bytes or HEX encoded string. :param private_key: private key (bytes, hex encoded string or WIF format) :param hex: (optional) If set to True return key in HEX format, by default is True. :return: DER encoded signature in bytes or HEX encoded string. """ msg = get_bytes(msg) if isinstance(private_key, bytearray): private_key = bytes(private_key) if isinstance(private_key, str): try: private_key = bytes_from_hex(private_key) except: if is_wif_valid(private_key): private_key = wif_to_private_key(private_key, hex=False) if not isinstance(private_key, bytes): raise TypeError("private key must be a bytes, hex encoded string or in WIF format") signature = __secp256k1_ecdsa_sign__(msg, private_key) return signature.hex() if hex else signature
[docs]def verify_signature(sig, pub_key, msg, encoding="hex"): """ Verify signature for message and given public key :param sig: signature in bytes or HEX encoded string. :param pub_key: public key in bytes or HEX encoded string. :param msg: message in bytes or HEX encoded string. :return: boolean. """ sig = get_bytes(sig) pub_key = get_bytes(pub_key) msg = get_bytes(msg, encoding=encoding) r = __secp256k1_ecdsa_verify__(sig, pub_key, msg) if r == 1: return True return False
def public_key_recovery(signature, messsage, rec_id, compressed=True, hex=True): signature = get_bytes(signature) messsage = get_bytes(messsage) pub = __secp256k1_ecdsa_recover__(signature, messsage, rec_id, compressed) if isinstance(pub, int): if pub == 0: return None else: raise RuntimeError("signature recovery error %s" % pub) return pub.hex() if hex else pub
[docs]def is_valid_signature_encoding(sig): """ Check is valid signature encoded in DER format :param sig: signature in bytes or HEX encoded string. :return: boolean. """ # Format: 0x30 [total-length] 0x02 [R-length] [R] 0x02 [S-length] [S] [sighash] # * total-length: 1-byte length descriptor of everything that follows, # excluding the sighash byte. # * R-length: 1-byte length descriptor of the R value that follows. # * R: arbitrary-length big-endian encoded R value. It must use the shortest # possible encoding for a positive integers (which means no null bytes at # the start, except a single one when the next byte has its highest bit set). # * S-length: 1-byte length descriptor of the S value that follows. # * S: arbitrary-length big-endian encoded S value. The same rules apply. # * sighash: 1-byte value indicating what data is hashed (not part of the DER # signature) sig = get_bytes(sig) length = len(sig) # Minimum and maximum size constraints. if (length < 9) or (length > 73): return False # A signature is of type 0x30 (compound). if sig[0] != 0x30: return False # Make sure the length covers the entire signature. if sig[1] != (length - 3): return False # Extract the length of the R element. len_r = sig[3] # Zero-length integers are not allowed for R. if len_r == 0: return False # Make sure the length of the S element is still inside the signature. if (5 + len_r) >= length: return False # Extract the length of the S element. len_s = sig[5 + len_r] # Verify that the length of the signature matches the sum of the length # of the elements. # Zero-length integers are not allowed for S. if len_s == 0: return False if (len_r + len_s + 7) != length: return False # Check whether the R element is an integer. if sig[2] != 0x02: return False # Negative numbers are not allowed for R. if sig[4] & 0x80: return False # Null bytes at the start of R are not allowed, unless R would # otherwise be interpreted as a negative number. if (len_r > 1) and (sig[4] == 0x00) and (not sig[5] & 0x80): return False # Check whether the S element is an integer. if sig[len_r + 4] != 0x02: return False # Negative numbers are not allowed for S. if sig[len_r + 6] & 0x80: return False # Null bytes at the start of S are not allowed, unless S would otherwise be # interpreted as a negative number. if (len_s > 1) and (sig[len_r + 6] == 0x00) and (not sig[len_r + 7] & 0x80): return False return True
def parse_signature(sig): """ Check is valid signature encoded in DER format :param sig: signature in bytes or HEX encoded string. :return: boolean. """ # Format: 0x30 [total-length] 0x02 [R-length] [R] 0x02 [S-length] [S] [sighash] # * total-length: 1-byte length descriptor of everything that follows, # excluding the sighash byte. # * R-length: 1-byte length descriptor of the R value that follows. # * R: arbitrary-length big-endian encoded R value. It must use the shortest # possible encoding for a positive integers (which means no null bytes at # the start, except a single one when the next byte has its highest bit set). # * S-length: 1-byte length descriptor of the S value that follows. # * S: arbitrary-length big-endian encoded S value. The same rules apply. # * sighash: 1-byte value indicating what data is hashed (not part of the DER # signature) sig = get_bytes(sig) if not is_valid_signature_encoding(sig): raise ValueError("invalid signature") len_r = sig[3] r = sig[5:4+ len_r] s = sig[len_r + 6:-1] return r, s