From 67bced2cde83e8c01e0beb6d1ecae027e17e3d66 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Tue, 27 Jan 2026 21:56:15 -0600 Subject: [PATCH] fix(types): resolve all 54 mypy type errors Add TypedDict for SIG_CONFIG, widen bytes/bytearray types in Signer base class, fix Optional narrowing in StreamingDataItem, and correct return type annotations across sign, dataitem, and signer modules. Co-Authored-By: Claude Opus 4.5 --- turbo_sdk/bundle/constants.py | 11 ++++++++- turbo_sdk/bundle/dataitem.py | 6 ++--- turbo_sdk/bundle/sign.py | 8 +++---- turbo_sdk/bundle/stream.py | 44 +++++++++++++++++++++-------------- turbo_sdk/bundle/utils.py | 5 ++-- turbo_sdk/client.py | 5 ++-- turbo_sdk/signers/arweave.py | 16 +++++++------ turbo_sdk/signers/ethereum.py | 27 ++++++++++----------- turbo_sdk/signers/signer.py | 15 ++++++++---- 9 files changed, 82 insertions(+), 55 deletions(-) diff --git a/turbo_sdk/bundle/constants.py b/turbo_sdk/bundle/constants.py index 3f0e142..e09fb31 100644 --- a/turbo_sdk/bundle/constants.py +++ b/turbo_sdk/bundle/constants.py @@ -1,4 +1,13 @@ -SIG_CONFIG = { +from typing import Dict, TypedDict + + +class SigConfigEntry(TypedDict): + sigLength: int + pubLength: int + sigName: str + + +SIG_CONFIG: Dict[int, SigConfigEntry] = { 1: {"sigLength": 512, "pubLength": 512, "sigName": "arweave"}, 2: {"sigLength": 64, "pubLength": 32, "sigName": "ed25519"}, 3: {"sigLength": 65, "pubLength": 65, "sigName": "ethereum"}, diff --git a/turbo_sdk/bundle/dataitem.py b/turbo_sdk/bundle/dataitem.py index 18eab87..7795e00 100644 --- a/turbo_sdk/bundle/dataitem.py +++ b/turbo_sdk/bundle/dataitem.py @@ -17,7 +17,7 @@ class DataItem: def __init__(self, buffer: bytearray): self.binary = buffer - self._id = [] + self._id = b"" @staticmethod def is_data_item(item: Any) -> bool: @@ -75,7 +75,7 @@ def raw_signature(self): return self.binary[2 : 2 + self.signature_length] @property - def signature(self) -> bytearray: + def signature(self) -> bytes: return base64.urlsafe_b64encode(self.raw_signature) @property @@ -142,7 +142,7 @@ def get_tags_size(self) -> int: def get_raw(self) -> bytearray: return self.binary - def sign(self, signer: "Signer") -> bytearray: + def sign(self, signer: "Signer") -> bytes: from .sign import sign self._id = sign(self, signer) diff --git a/turbo_sdk/bundle/sign.py b/turbo_sdk/bundle/sign.py index 3b770c3..5e0ae6b 100644 --- a/turbo_sdk/bundle/sign.py +++ b/turbo_sdk/bundle/sign.py @@ -2,7 +2,7 @@ from typing import BinaryIO, Callable, Optional -def deep_hash(data) -> bytearray: +def deep_hash(data) -> bytes: """ Create a deep hash using the exact Irys/ANS-104 algorithm """ @@ -17,7 +17,7 @@ def deep_hash(data) -> bytearray: return hashlib.sha384(tagged_hash).digest() -def deep_hash_chunks(chunks, acc: bytearray): +def deep_hash_chunks(chunks, acc: bytes) -> bytes: """ Recursively hash chunks for deep hash algorithm """ @@ -28,7 +28,7 @@ def deep_hash_chunks(chunks, acc: bytearray): return deep_hash_chunks(chunks[1:], new_acc) -def get_signature_data(dataitem) -> bytearray: +def get_signature_data(dataitem) -> bytes: """ Get the data that needs to be signed for a DataItem Using exact Irys implementation @@ -232,4 +232,4 @@ def sign_stream( on_progress=on_progress, ) - return signer.sign(signature_data) + return bytes(signer.sign(signature_data)) diff --git a/turbo_sdk/bundle/stream.py b/turbo_sdk/bundle/stream.py index 83f6841..e3620e5 100644 --- a/turbo_sdk/bundle/stream.py +++ b/turbo_sdk/bundle/stream.py @@ -144,15 +144,17 @@ def is_signed(self) -> bool: @property def signature_type(self) -> int: - if not self._signed: + signer = self._signer + if signer is None: raise RuntimeError("Must call sign() first") - return self._signer.signature_type + return int(signer.signature_type) @property def raw_signature(self) -> bytes: - if not self._signed: + signature = self._signature + if signature is None: raise RuntimeError("Must call sign() first") - return self._signature + return signature @property def signature(self) -> bytes: @@ -168,9 +170,10 @@ def id(self) -> str: @property def raw_owner(self) -> bytes: - if not self._signed: + signer = self._signer + if signer is None: raise RuntimeError("Must call sign() first") - return self._signer.public_key + return bytes(signer.public_key) @property def owner(self) -> bytes: @@ -182,9 +185,10 @@ def raw_target(self) -> bytes: @property def raw_anchor(self) -> bytes: - if not self._signed: + anchor = self._anchor + if anchor is None: raise RuntimeError("Must call sign() first") - return self._anchor + return anchor @property def tags(self) -> List[Dict[str, str]]: @@ -193,16 +197,18 @@ def tags(self) -> List[Dict[str, str]]: @property def total_size(self) -> int: """Total size in bytes (header + data). Must call sign() first.""" - if not self._signed: + header = self._header + if header is None: raise RuntimeError("Must call sign() first") - return len(self._header) + self._data_size + return len(header) + self._data_size @property def header_size(self) -> int: """Size of the header in bytes. Must call sign() first.""" - if not self._signed: + header = self._header + if header is None: raise RuntimeError("Must call sign() first") - return len(self._header) + return len(header) def read(self, size: int = -1) -> bytes: """ @@ -220,7 +226,9 @@ def read(self, size: int = -1) -> bytes: Raises: RuntimeError: If sign() has not been called """ - if not self._signed: + header = self._header + data_stream = self._data_stream + if header is None or data_stream is None: raise RuntimeError("Must call sign() first") if size == 0: @@ -236,10 +244,10 @@ def read(self, size: int = -1) -> bytes: remaining = size # Read from header first - if self._header_offset < len(self._header): - header_remaining = len(self._header) - self._header_offset + if self._header_offset < len(header): + header_remaining = len(header) - self._header_offset to_read = min(header_remaining, remaining) - header_chunk = self._header[self._header_offset : self._header_offset + int(to_read)] + header_chunk = header[self._header_offset : self._header_offset + int(to_read)] result.extend(header_chunk) self._header_offset += len(header_chunk) remaining -= len(header_chunk) @@ -247,9 +255,9 @@ def read(self, size: int = -1) -> bytes: # Then read from data stream if remaining > 0: if remaining == float("inf"): - data_chunk = self._data_stream.read() + data_chunk = data_stream.read() else: - data_chunk = self._data_stream.read(int(remaining)) + data_chunk = data_stream.read(int(remaining)) if data_chunk: result.extend(data_chunk) diff --git a/turbo_sdk/bundle/utils.py b/turbo_sdk/bundle/utils.py index cfcacbd..f99819f 100644 --- a/turbo_sdk/bundle/utils.py +++ b/turbo_sdk/bundle/utils.py @@ -1,4 +1,5 @@ import base64 +from typing import Union def b64url_decode(s: str) -> bytes: @@ -9,13 +10,13 @@ def b64url_decode(s: str) -> bytes: return base64.urlsafe_b64decode(s) -def set_bytes(dest: bytearray, src: bytearray, offset: int): +def set_bytes(dest: bytearray, src: Union[bytes, bytearray], offset: int) -> None: """Set bytes in destination array at offset""" for i in range(offset, offset + len(src), 1): dest[i] = src[i - offset] -def byte_array_to_long(byte_array: bytearray): +def byte_array_to_long(byte_array: Union[bytes, bytearray]) -> int: """Convert byte array to long using little-endian""" value = 0 for i in range(len(byte_array) - 1, -1, -1): diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py index bdfb630..880106d 100644 --- a/turbo_sdk/client.py +++ b/turbo_sdk/client.py @@ -126,7 +126,7 @@ def factory(): size, size, ) - raw = data.read() + raw = data.read() # type: ignore[union-attr] def factory(): return io.BytesIO(raw) @@ -218,6 +218,7 @@ def _upload_chunked( total_size = streaming_item.total_size # Create chunked uploader + assert self.token is not None uploader = ChunkedUploader( upload_url=self.upload_url, token=self.token, @@ -226,7 +227,7 @@ def _upload_chunked( # Upload using the streaming item as a file-like object return uploader.upload( - data=streaming_item, + data=streaming_item, # type: ignore[arg-type] total_size=total_size, on_progress=on_progress, ) diff --git a/turbo_sdk/signers/arweave.py b/turbo_sdk/signers/arweave.py index 1fb8602..94fbd85 100644 --- a/turbo_sdk/signers/arweave.py +++ b/turbo_sdk/signers/arweave.py @@ -1,3 +1,5 @@ +from typing import Any, Union + from turbo_sdk.signers.signer import Signer from turbo_sdk.bundle.utils import b64url_decode from cryptography.hazmat.primitives import hashes @@ -7,11 +9,11 @@ class ArweaveSigner(Signer): - signature_type = 1 - signature_length = 512 - owner_length = 512 - public_key = None - private_key = None + signature_type: int = 1 + signature_length: int = 512 + owner_length: int = 512 + public_key: bytearray = bytearray() + private_key: Any = None def __init__(self, jwk: dict): """ @@ -51,7 +53,7 @@ def _jwk_to_public_key_bytes(self, jwk: dict) -> bytearray: raise ValueError(f"Invalid Arweave public key length: {len(n_bytes)} (expected 512)") return bytearray(n_bytes) - def sign(self, message: bytearray) -> bytearray: + def sign(self, message: Union[bytes, bytearray]) -> bytearray: """Sign using RSA-PSS SHA-256""" signature = self.private_key.sign( bytes(message), @@ -61,7 +63,7 @@ def sign(self, message: bytearray) -> bytearray: return bytearray(signature) @staticmethod - def verify(pubkey: bytearray, message: bytearray, signature: bytearray) -> bool: + def verify(pubkey: Union[bytes, bytearray], message: Union[bytes, bytearray], signature: Union[bytes, bytearray], **opts: Any) -> bool: """Verify RSA-PSS signature""" try: # Convert modulus bytes to RSA public key diff --git a/turbo_sdk/signers/ethereum.py b/turbo_sdk/signers/ethereum.py index 372aaf6..9c999da 100644 --- a/turbo_sdk/signers/ethereum.py +++ b/turbo_sdk/signers/ethereum.py @@ -1,5 +1,5 @@ import codecs -from typing import Any +from typing import Any, Union from turbo_sdk.signers.signer import Signer from turbo_sdk.bundle.constants import SIG_CONFIG from eth_account import Account @@ -9,11 +9,11 @@ class EthereumSigner(Signer): - public_key = None - private_key = None - signature_type = 3 - signature_length = SIG_CONFIG[3]["sigLength"] - owner_length = SIG_CONFIG[3]["pubLength"] + public_key: bytes = b"" + private_key: Any = None + signature_type: int = 3 + signature_length: int = SIG_CONFIG[3]["sigLength"] + owner_length: int = SIG_CONFIG[3]["pubLength"] def __init__(self, private_key: str): """ @@ -27,7 +27,7 @@ def __init__(self, private_key: str): self.private_key = keys.PrivateKey(dec) self.public_key = b"\x04" + self.private_key.public_key.to_bytes() - def sign(self, message: bytearray) -> bytearray: + def sign(self, message: Union[bytes, bytearray]) -> bytearray: """ Sign a message using Ethereum's personal_sign format @@ -43,17 +43,18 @@ def sign(self, message: bytearray) -> bytearray: return bytearray.fromhex(signature[2:] if signature.startswith("0x") else signature) @staticmethod - def verify(pubkey: bytearray, message: bytearray, signature: bytearray, **opts: Any) -> bool: + def verify(pubkey: Union[bytes, bytearray], message: Union[bytes, bytearray], signature: Union[bytes, bytearray], **opts: Any) -> bool: msg = encode_defunct(primitive=message) msg_hash = _hash_eip191_message(msg) # trim pubkey - pubkey = keys.PublicKey(pubkey if len(pubkey) == 64 else pubkey[1:]) + pk = keys.PublicKey(pubkey if len(pubkey) == 64 else pubkey[1:]) # standardize v value - signature[64] = to_standard_v(signature[64]) - signature = keys.Signature(signature) - valid = keys.ecdsa_verify(msg_hash, signature, pubkey) + sig_array = bytearray(signature) + sig_array[64] = to_standard_v(sig_array[64]) + sig = keys.Signature(sig_array) + valid: bool = keys.ecdsa_verify(msg_hash, sig, pk) # type: ignore[arg-type] return valid def get_wallet_address(self) -> str: """Get the Ethereum wallet address (checksum format)""" - return self.private_key.public_key.to_checksum_address() + return str(self.private_key.public_key.to_checksum_address()) diff --git a/turbo_sdk/signers/signer.py b/turbo_sdk/signers/signer.py index 75df69e..b4580e9 100644 --- a/turbo_sdk/signers/signer.py +++ b/turbo_sdk/signers/signer.py @@ -1,5 +1,5 @@ from abc import abstractmethod -from typing import Any, Dict +from typing import Any, Dict, Union import base64 import secrets @@ -8,7 +8,7 @@ class Signer: @property @abstractmethod - def public_key(self) -> bytearray: + def public_key(self) -> Union[bytes, bytearray]: pass @property @@ -27,12 +27,17 @@ def owner_length(self) -> int: pass @abstractmethod - def sign(self, message: bytearray, **opts: Any) -> bytearray: + def sign(self, message: Union[bytes, bytearray]) -> Union[bytes, bytearray]: pass + @staticmethod @abstractmethod - # @staticmethod - def verify(pubkey: bytearray, message: bytearray, signature: bytearray, **opts: Any) -> bool: + def verify( + pubkey: Union[bytes, bytearray], + message: Union[bytes, bytearray], + signature: Union[bytes, bytearray], + **opts: Any, + ) -> bool: pass @abstractmethod