Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,367 changes: 1,367 additions & 0 deletions bitcoinutils/psbt.py

Large diffs are not rendered by default.

369 changes: 293 additions & 76 deletions bitcoinutils/script.py

Large diffs are not rendered by default.

248 changes: 197 additions & 51 deletions bitcoinutils/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
# No part of python-bitcoin-utils, including this file, may be copied, modified,
# propagated, or distributed except according to the terms contained in the
# LICENSE file.

from typing import Union
import math
import hashlib
import struct
from io import BytesIO
from typing import Optional, Union

from bitcoinutils.constants import (
Expand Down Expand Up @@ -42,6 +43,8 @@
parse_compact_size,
)

def i_to_little_endian(value, length):
return value.to_bytes(length, byteorder='little')

class TxInput:
"""Represents a transaction input.
Expand Down Expand Up @@ -76,7 +79,7 @@ def __init__(
txid: str,
txout_index: int,
script_sig=Script([]),
sequence: str | bytes = DEFAULT_TX_SEQUENCE,
sequence: Union[str, bytes] = DEFAULT_TX_SEQUENCE,
) -> None:
"""See TxInput description"""

Expand Down Expand Up @@ -348,7 +351,31 @@ def copy(cls, txout: "TxOutput") -> "TxOutput":
"""Deep copy of TxOutput"""

return cls(txout.amount, txout.script_pubkey)


@classmethod
def from_bytes(cls, b):
"""Deserialize a TxOutput from bytes."""
from bitcoinutils.script import Script
from bitcoinutils.utils import read_varint
import struct
from io import BytesIO

stream = BytesIO(b)

# Read 8-byte value (little-endian)
value = struct.unpack('<Q', stream.read(8))[0]

# Read varint length of scriptPubKey
script_len, varint_size = read_varint(stream.read(9)) # Read up to 9 bytes for varint
stream.seek(8 + varint_size) # Seek back to right after the varint

# Read the script
script_data = stream.read(script_len)

# Create Script object from the raw bytes
script_pubkey = Script.from_raw(script_data)

return cls(value, script_pubkey)

class Sequence:
"""Helps setting up appropriate sequence. Used to provide the sequence to
Expand Down Expand Up @@ -512,7 +539,7 @@ def __init__(
self,
inputs: Optional[list[TxInput]] = None,
outputs: Optional[list[TxOutput]] = None,
locktime: str | bytes = DEFAULT_TX_LOCKTIME,
locktime: Union[str, bytes, int] = DEFAULT_TX_LOCKTIME,
version: bytes = DEFAULT_TX_VERSION,
has_segwit: bool = False,
witnesses: Optional[list[TxWitnessInput]] = None,
Expand All @@ -532,9 +559,11 @@ def __init__(
self.has_segwit = has_segwit
self.witnesses = witnesses

# if user provided a locktime it would be as string (for now...)
# Always store locktime as int
if isinstance(locktime, str):
self.locktime = h_to_b(locktime)
self.locktime = int.from_bytes(h_to_b(locktime), 'little')
elif isinstance(locktime, bytes):
self.locktime = int.from_bytes(locktime, 'little')
else:
self.locktime = locktime

Expand Down Expand Up @@ -625,7 +654,7 @@ def __str__(self) -> str:
"outputs": self.outputs,
"has_segwit": self.has_segwit,
"witnesses": self.witnesses,
"locktime": self.locktime.hex(),
"locktime": self.locktime,
"version": self.version.hex(),
}
)
Expand Down Expand Up @@ -878,7 +907,7 @@ def get_transaction_segwit_digest(
tx_for_signing += hash_outputs

# add locktime
tx_for_signing += self.locktime
tx_for_signing += self.locktime.to_bytes(4, byteorder="little")

# add sighash type
tx_for_signing += struct.pack("<i", sighash)
Expand All @@ -902,32 +931,32 @@ def get_transaction_taproot_digest(
Also consult Bitcoin Core code at: https://github.com/bitcoin/bitcoin/blob/29c36f070618ea5148cd4b2da3732ee4d37af66b/src/script/interpreter.cpp#L1478
And: https://github.com/bitcoin/bitcoin/blob/b5f33ac1f82aea290b4653af36ac2ad1bf1cce7b/test/functional/test_framework/script.py

| SIGHASH types (see constants.py):
| TAPROOT_SIGHASH_ALL - signs all inputs and outputs (default)
| SIGHASH_ALL - signs all inputs and outputs
| SIGHASH_NONE - signs all of the inputs
| SIGHASH_SINGLE - signs all inputs but only txin_index output
| SIGHASH_ANYONECANPAY (only combined with one of the above)
| - with ALL - signs all outputs but only txin_index input
| - with NONE - signs only the txin_index input
| - with SINGLE - signs txin_index input and output

Attributes
----------
txin_index : int
The index of the input that we wish to sign
script_pubkeys : list(Script)
The scriptPubkeys that correspond to all the inputs/UTXOs
amounts : int/float/Decimal
The amounts that correspond to all the inputs/UTXOs
ext_flag : int
Extension mechanism, default is 0; 1 is for script spending (BIP342)
script : Script object
The script that we are spending (ext_flag=1)
leaf_ver : int
The script version, LEAF_VERSION_TAPSCRIPT for the default tapscript
sighash : int
The type of the signature hash to be created
| SIGHASH types (see constants.py):
| TAPROOT_SIGHASH_ALL - signs all inputs and outputs (default)
| SIGHASH_ALL - signs all inputs and outputs
| SIGHASH_NONE - signs all of the inputs
| SIGHASH_SINGLE - signs all inputs but only txin_index output
| SIGHASH_ANYONECANPAY (only combined with one of the above)
| - with ALL - signs all outputs but only txin_index input
| - with NONE - signs only the txin_index input
| - with SINGLE - signs txin_index input and output

Attributes
----------
txin_index : int
The index of the input that we wish to sign
script_pubkeys : list(Script)
The scriptPubkeys that correspond to all the inputs/UTXOs
amounts : int/float/Decimal
The amounts that correspond to all the inputs/UTXOs
ext_flag : int
Extension mechanism, default is 0; 1 is for script spending (BIP342)
script : Script object
The script that we are spending (ext_flag=1)
leaf_ver : int
The script version, LEAF_VERSION_TAPSCRIPT for the default tapscript
sighash : int
The type of the signature hash to be created
"""

# clone transaction to modify without messing up the real transaction
Expand All @@ -951,7 +980,7 @@ def get_transaction_taproot_digest(
tx_for_signing += self.version

# add locktime
tx_for_signing += self.locktime
tx_for_signing += self.locktime.to_bytes(4, byteorder="little")

# defaults
hash_prevouts = b""
Expand All @@ -974,7 +1003,21 @@ def get_transaction_taproot_digest(

# the SHA256 of the serialization of all input amounts
for a in amounts:
hash_amounts += a.to_bytes(8, "little")
# Fix: Handle both int and bytes types for amounts
if isinstance(a, int):
hash_amounts += a.to_bytes(8, "little")
elif isinstance(a, bytes):
# If it's already bytes, ensure it's 8 bytes little-endian
if len(a) == 8:
hash_amounts += a
else:
# Convert bytes to int then back to 8-byte little-endian
amount_int = int.from_bytes(a, "little")
hash_amounts += amount_int.to_bytes(8, "little")
else:
# Handle other numeric types (float, Decimal)
amount_int = int(a)
hash_amounts += amount_int.to_bytes(8, "little")
hash_amounts = hashlib.sha256(hash_amounts).digest()
tx_for_signing += hash_amounts

Expand Down Expand Up @@ -1017,7 +1060,19 @@ def get_transaction_taproot_digest(
txin.txout_index,
)

tx_for_signing += amounts[txin_index].to_bytes(8, "little")
# Fix: Handle amount type for anyone_can_pay case
amount = amounts[txin_index]
if isinstance(amount, int):
tx_for_signing += amount.to_bytes(8, "little")
elif isinstance(amount, bytes):
if len(amount) == 8:
tx_for_signing += amount
else:
amount_int = int.from_bytes(amount, "little")
tx_for_signing += amount_int.to_bytes(8, "little")
else:
amount_int = int(amount)
tx_for_signing += amount_int.to_bytes(8, "little")

script_pubkey = script_pubkeys[txin_index].to_hex()
script_len = int(len(script_pubkey) / 2)
Expand Down Expand Up @@ -1090,7 +1145,7 @@ def to_bytes(self, has_segwit: bool) -> bytes:
witnesses_count_bytes = encode_varint(len(witness.stack))
data += witnesses_count_bytes
data += witness.to_bytes()
data += self.locktime
data += i_to_little_endian(self.locktime, 4)
return data

def get_txid(self) -> str:
Expand Down Expand Up @@ -1120,30 +1175,30 @@ def get_size(self) -> int:
return len(self.to_bytes(self.has_segwit))

def get_vsize(self) -> int:
"""Gets the virtual size of the transaction.
"""
Gets the virtual size of the transaction.

For non-segwit txs this is identical to get_size(). For segwit txs the
marker and witnesses length needs to be reduced to 1/4 of its original
length. Thus it is substructed from size and then it is divided by 4
before added back to size to produce vsize (always rounded up).
length. Thus it is subtracted from size and then it is divided by 4
before being added back to size to produce vsize (always rounded up).

https://en.bitcoin.it/wiki/Weight_units
"""
# return size if non segwit
# return size if non-segwit
if not self.has_segwit:
return self.get_size()

marker_size = 2

wit_size = 0
data = b""

# count witnesses data
# count witness data
for witness in self.witnesses:
# add witnesses stack count
# add witness stack count
witnesses_count_bytes = chr(len(witness.stack)).encode()
data += witnesses_count_bytes
data += witness.to_bytes()

wit_size = len(data)

size = self.get_size() - (marker_size + wit_size)
Expand All @@ -1152,19 +1207,110 @@ def get_vsize(self) -> int:
return int(math.ceil(vsize))

def to_hex(self) -> str:
"""Converts object to hexadecimal string"""

"""Converts object to hexadecimal string."""
return b_to_h(self.to_bytes(self.has_segwit))

def serialize(self) -> str:
"""Converts object to hexadecimal string"""

"""Alias for to_hex()."""
return self.to_hex()

@staticmethod
def from_bytes(tx_bytes):
"""
Minimal inline deserializer for Transaction from bytes.
Only for testing — not full-featured!
"""
stream = BytesIO(tx_bytes)
return Transaction._parse_from_stream(stream)

@classmethod
def _parse_from_stream(cls, stream):
"""
Parse a Bitcoin transaction from a byte stream (PSBT raw transaction).
"""
def read_varint(s):
i = s.read(1)[0]
if i == 0xfd:
return struct.unpack('<H', s.read(2))[0]
elif i == 0xfe:
return struct.unpack('<I', s.read(4))[0]
elif i == 0xff:
return struct.unpack('<Q', s.read(8))[0]
else:
return i

version = struct.unpack('<I', stream.read(4))[0]
marker = stream.read(1)

if marker == b'\x00': # segwit marker
flag = stream.read(1)
is_segwit = True
else:
is_segwit = False
stream.seek(-1, 1) # rewind one byte

input_count = read_varint(stream)
inputs = []
for _ in range(input_count):
txid = stream.read(32)[::-1].hex()
vout = struct.unpack('<I', stream.read(4))[0]
script_len = read_varint(stream)
script_sig_bytes = stream.read(script_len)
sequence = stream.read(4)

# FIX: Create proper TxInput object instead of dictionary
script_sig = Script.from_raw(script_sig_bytes.hex(), has_segwit=is_segwit)
tx_input = TxInput(
txid=txid,
txout_index=vout,
script_sig=script_sig,
sequence=sequence
)
inputs.append(tx_input)

output_count = read_varint(stream)
outputs = []
for _ in range(output_count):
value = struct.unpack('<Q', stream.read(8))[0]
script_len = read_varint(stream)
script_pubkey_bytes = stream.read(script_len)

# FIX: Create proper TxOutput object instead of dictionary
script_pubkey = Script.from_raw(script_pubkey_bytes.hex(), has_segwit=is_segwit)
tx_output = TxOutput(
amount=value, # This will be accessible as tx_output.amount (int)
script_pubkey=script_pubkey
)
outputs.append(tx_output)

witnesses = []
if is_segwit:
for _ in range(input_count):
witness_count = read_varint(stream)
witness_stack = []
for _ in range(witness_count):
item_len = read_varint(stream)
witness_item = stream.read(item_len).hex()
witness_stack.append(witness_item)
if witness_stack:
witnesses.append(TxWitnessInput(witness_stack))

locktime_bytes = stream.read(4)

# Construct and return the transaction with proper objects
return cls(
inputs=inputs,
outputs=outputs,
locktime=locktime_bytes,
version=version.to_bytes(4, 'little'),
has_segwit=is_segwit,
witnesses=witnesses
)


def main():
pass


if __name__ == "__main__":
main()
main()
Loading