From 354d64950c717fddb5bb25fd23ad4eb145c1d6fe Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Wed, 21 Jan 2026 17:10:09 -0600 Subject: [PATCH 01/15] feat: add multipart upload support with progress callbacks - Add ChunkedUploader class for multipart upload lifecycle - Add ChunkingParams for configurable chunk size (5-500 MiB), concurrency, and chunking mode (auto/force/disabled) - Extend upload() with on_progress callback and chunking params - Add custom upload_url/payment_url support to Turbo client - Add TurboUploadStatus, ChunkedUploadInit types - Add UnderfundedError, UploadValidationError, UploadFinalizationError - Add unit tests for chunking logic (28 tests) - Add performance benchmark tests (skipped by default) - Update README with multipart upload documentation Co-Authored-By: Claude Opus 4.5 --- README.md | 103 ++++++- pyproject.toml | 5 +- tests/test_chunked.py | 496 ++++++++++++++++++++++++++++++ tests/test_chunked_performance.py | 468 ++++++++++++++++++++++++++++ turbo_sdk/__init__.py | 32 +- turbo_sdk/chunked.py | 319 +++++++++++++++++++ turbo_sdk/client.py | 127 +++++++- turbo_sdk/types.py | 50 ++- 8 files changed, 1583 insertions(+), 17 deletions(-) create mode 100644 tests/test_chunked.py create mode 100644 tests/test_chunked_performance.py create mode 100644 turbo_sdk/chunked.py diff --git a/README.md b/README.md index 962c934..01ebb2d 100644 --- a/README.md +++ b/README.md @@ -61,21 +61,42 @@ print(f"✅ Uploaded! URI: ar://{result.id}") ### Core Classes -#### `Turbo(signer, network="mainnet")` +#### `Turbo(signer, network="mainnet", upload_url=None, payment_url=None)` Main client for interacting with Turbo services. **Parameters:** - `signer`: Either `EthereumSigner` or `ArweaveSigner` instance - `network`: `"mainnet"` or `"testnet"` (default: `"mainnet"`) +- `upload_url`: Optional custom upload service URL (overrides network default) +- `payment_url`: Optional custom payment service URL (overrides network default) + +```python +# Using default URLs (mainnet) +turbo = Turbo(signer) + +# Using testnet +turbo = Turbo(signer, network="testnet") + +# Using custom URLs +turbo = Turbo(signer, upload_url="https://my-upload-service.example.com") +``` **Methods:** -##### `upload(data, tags=None) -> TurboUploadResponse` +##### `upload(data, tags=None, on_progress=None, chunking=None, data_size=None) -> TurboUploadResponse` + +Upload data to the Turbo datachain. Supports both small files (single request) and large files (chunked multipart upload). -Upload data to the Turbo datachain. +**Parameters:** +- `data`: Data to upload (`bytes` or file-like `BinaryIO` object) +- `tags`: Optional list of metadata tags +- `on_progress`: Optional callback `(processed_bytes, total_bytes) -> None` +- `chunking`: Optional `ChunkingParams` for upload configuration +- `data_size`: Required when `data` is a file-like object ```python +# Simple upload result = turbo.upload( data=b"Your data here", tags=[ @@ -96,6 +117,51 @@ class TurboUploadResponse: winc: str # Winston credits cost ``` +##### Large File Uploads with Progress + +For files >= 5 MiB, the SDK automatically uses chunked multipart uploads. 
You can track progress with a callback: + +```python +def on_progress(processed: int, total: int): + pct = (processed / total) * 100 + print(f"Upload progress: {pct:.1f}%") + +# Upload a large file with progress tracking +with open("large-video.mp4", "rb") as f: + result = turbo.upload( + data=f, + data_size=os.path.getsize("large-video.mp4"), + tags=[{"name": "Content-Type", "value": "video/mp4"}], + on_progress=on_progress, + ) +``` + +##### Chunking Configuration + +Use `ChunkingParams` to customize chunked upload behavior: + +```python +from turbo_sdk import ChunkingParams + +result = turbo.upload( + data=large_data, + chunking=ChunkingParams( + chunk_byte_count=10 * 1024 * 1024, # 10 MiB chunks (default: 5 MiB) + max_chunk_concurrency=3, # Parallel chunk uploads (default: 1) + chunking_mode="auto", # "auto", "force", or "disabled" + ), + on_progress=lambda p, t: print(f"{p}/{t} bytes"), +) +``` + +**ChunkingParams options:** +- `chunk_byte_count`: Chunk size in bytes (5-500 MiB, default: 5 MiB) +- `max_chunk_concurrency`: Number of parallel chunk uploads (default: 1) +- `chunking_mode`: + - `"auto"` (default): Use chunked upload for files >= 5 MiB + - `"force"`: Always use chunked upload + - `"disabled"`: Always use single request upload + ##### `get_balance(address=None) -> TurboBalanceResponse` Get winston credit balance. Uses signed request for authenticated balance check when no address specified. @@ -179,6 +245,27 @@ Create signed headers for authenticated API requests. headers = signer.create_signed_headers() ``` +### Exceptions + +The SDK provides specific exceptions for error handling: + +```python +from turbo_sdk import UnderfundedError, ChunkedUploadError + +try: + result = turbo.upload(large_data) +except UnderfundedError: + print("Insufficient balance - please top up your account") +except ChunkedUploadError as e: + print(f"Upload failed: {e}") +``` + +**Exception types:** +- `ChunkedUploadError`: Base exception for chunked upload failures +- `UnderfundedError`: Account has insufficient balance (HTTP 402) +- `UploadValidationError`: Upload validation failed (INVALID status) +- `UploadFinalizationError`: Finalization timed out or failed + ## Developers @@ -203,7 +290,15 @@ pip install -e ".[dev]" pytest ``` -That's it! The test suite includes comprehensive tests for all components without requiring network access. +3. **Run performance benchmarks** (requires funded wallet): + +```bash +export TURBO_TEST_WALLET=/path/to/wallet.json +export TURBO_UPLOAD_URL=https://upload.ardrive.dev # optional, defaults to testnet +pytest -m performance -v -s +``` + +The test suite includes comprehensive unit tests for all components. Performance tests measure real upload throughput against the Turbo service. 
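+
+The benchmark module also exposes a standalone `run_benchmark()` entry point (guarded by `if __name__ == "__main__"`), so it can be run directly without pytest and will print a throughput summary per upload method. It still assumes the dev install above and a funded wallet:
+
+```bash
+export TURBO_TEST_WALLET=/path/to/wallet.json
+python tests/test_chunked_performance.py
+```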
## Acknowledgments diff --git a/pyproject.toml b/pyproject.toml index de27ffe..708b5f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,7 +79,10 @@ testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] -addopts = "-v --tb=short" +addopts = "-v --tb=short -m 'not performance'" +markers = [ + "performance: marks tests as performance benchmarks (may be slow and consume credits)" +] filterwarnings = [ "ignore::DeprecationWarning", "ignore::PendingDeprecationWarning" diff --git a/tests/test_chunked.py b/tests/test_chunked.py new file mode 100644 index 0000000..09eafd4 --- /dev/null +++ b/tests/test_chunked.py @@ -0,0 +1,496 @@ +"""Tests for chunked/multipart upload functionality""" + +import io +import pytest +from unittest.mock import Mock, patch, MagicMock +import json + +from turbo_sdk.types import ChunkingParams, TurboUploadStatus, ChunkedUploadInit +from turbo_sdk.chunked import ( + ChunkedUploader, + ChunkedUploadError, + UnderfundedError, + UploadValidationError, + UploadFinalizationError, +) + + +class TestChunkingParams: + """Test ChunkingParams dataclass""" + + def test_default_values(self): + """Test default parameter values""" + params = ChunkingParams() + assert params.chunk_byte_count == 5 * 1024 * 1024 # 5 MiB + assert params.max_chunk_concurrency == 1 + assert params.chunking_mode == "auto" + assert params.max_finalize_ms == 150_000 + + def test_custom_values(self): + """Test custom parameter values""" + params = ChunkingParams( + chunk_byte_count=10 * 1024 * 1024, + max_chunk_concurrency=4, + chunking_mode="force", + max_finalize_ms=300_000, + ) + assert params.chunk_byte_count == 10 * 1024 * 1024 + assert params.max_chunk_concurrency == 4 + assert params.chunking_mode == "force" + assert params.max_finalize_ms == 300_000 + + def test_chunk_size_validation_too_small(self): + """Test validation rejects chunk size below 5 MiB""" + with pytest.raises(ValueError, match="chunk_byte_count must be between"): + ChunkingParams(chunk_byte_count=1024 * 1024) # 1 MiB + + def test_chunk_size_validation_too_large(self): + """Test validation rejects chunk size above 500 MiB""" + with pytest.raises(ValueError, match="chunk_byte_count must be between"): + ChunkingParams(chunk_byte_count=600 * 1024 * 1024) # 600 MiB + + def test_chunk_size_at_boundaries(self): + """Test chunk size at valid boundaries""" + # Minimum boundary + params_min = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) + assert params_min.chunk_byte_count == 5 * 1024 * 1024 + + # Maximum boundary + params_max = ChunkingParams(chunk_byte_count=500 * 1024 * 1024) + assert params_max.chunk_byte_count == 500 * 1024 * 1024 + + def test_concurrency_validation(self): + """Test validation rejects concurrency below 1""" + with pytest.raises(ValueError, match="max_chunk_concurrency must be at least 1"): + ChunkingParams(max_chunk_concurrency=0) + + +class TestChunkedUploader: + """Test ChunkedUploader class""" + + @pytest.fixture + def uploader(self): + """Create a ChunkedUploader instance for testing""" + return ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + chunking_params=ChunkingParams(), + ) + + def test_init(self, uploader): + """Test uploader initialization""" + assert uploader.upload_url == "https://upload.test.io" + assert uploader.token == "arweave" + assert uploader.params.chunk_byte_count == 5 * 1024 * 1024 + + def test_chunking_version_header(self, uploader): + """Test x-chunking-version header is set""" + assert 
uploader._session.headers["x-chunking-version"] == "2" + + def test_poll_interval_small_file(self, uploader): + """Test poll interval for files < 100 MB""" + interval = uploader._get_poll_interval(50 * 1024 * 1024) # 50 MB + assert interval == 2.0 + + def test_poll_interval_medium_file(self, uploader): + """Test poll interval for files < 3 GiB""" + interval = uploader._get_poll_interval(1024 * 1024 * 1024) # 1 GiB + assert interval == 4.0 + + def test_poll_interval_large_file(self, uploader): + """Test poll interval for files >= 3 GiB""" + interval = uploader._get_poll_interval(5 * 1024 * 1024 * 1024) # 5 GiB + assert interval == 7.5 # 1.5 * 5 + + def test_poll_interval_max_cap(self, uploader): + """Test poll interval caps at 15 seconds""" + interval = uploader._get_poll_interval(20 * 1024 * 1024 * 1024) # 20 GiB + assert interval == 15.0 + + @patch("turbo_sdk.chunked.requests.Session") + def test_initiate_success(self, mock_session_class): + """Test successful upload initiation""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": "test-upload-id", + "min": 5242880, + "max": 524288000, + "chunkSize": 5242880, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + result = uploader.initiate() + + assert isinstance(result, ChunkedUploadInit) + assert result.id == "test-upload-id" + assert result.min == 5242880 + assert result.max == 524288000 + + @patch("turbo_sdk.chunked.requests.Session") + def test_initiate_service_unavailable(self, mock_session_class): + """Test initiation with 503 error""" + mock_response = Mock() + mock_response.status_code = 503 + mock_response.text = "Service Unavailable" + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(ChunkedUploadError, match="Service unavailable"): + uploader.initiate() + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunk_success(self, mock_session_class): + """Test successful chunk upload""" + mock_response = Mock() + mock_response.status_code = 200 + + mock_session = Mock() + mock_session.post.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + # Should not raise + uploader.upload_chunk("upload-id", 0, b"test data") + + mock_session.post.assert_called_once() + call_args = mock_session.post.call_args + assert "upload-id" in call_args[0][0] + assert "/0" in call_args[0][0] + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunk_underfunded(self, mock_session_class): + """Test chunk upload with 402 error""" + mock_response = Mock() + mock_response.status_code = 402 + + mock_session = Mock() + mock_session.post.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(UnderfundedError): + uploader.upload_chunk("upload-id", 0, b"test data") + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunk_not_found(self, 
mock_session_class): + """Test chunk upload with 404 error""" + mock_response = Mock() + mock_response.status_code = 404 + + mock_session = Mock() + mock_session.post.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(ChunkedUploadError, match="not found or expired"): + uploader.upload_chunk("upload-id", 0, b"test data") + + @patch("turbo_sdk.chunked.requests.Session") + def test_finalize_success(self, mock_session_class): + """Test successful finalization""" + mock_response = Mock() + mock_response.status_code = 202 + + mock_session = Mock() + mock_session.post.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + # Should not raise + uploader.finalize("upload-id") + + call_args = mock_session.post.call_args + assert "finalize" in call_args[0][0] + + @patch("turbo_sdk.chunked.requests.Session") + def test_get_status_finalized(self, mock_session_class): + """Test getting finalized status""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "status": "FINALIZED", + "timestamp": 1234567890, + "receipt": { + "id": "tx-id", + "owner": "owner-address", + "dataCaches": ["arweave.net"], + "fastFinalityIndexes": ["arweave.net"], + "winc": "1000", + }, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + status = uploader.get_status("upload-id") + + assert isinstance(status, TurboUploadStatus) + assert status.status == "FINALIZED" + assert status.id == "tx-id" + assert status.owner == "owner-address" + assert status.data_caches == ["arweave.net"] + + @patch("turbo_sdk.chunked.requests.Session") + def test_get_status_validating(self, mock_session_class): + """Test getting validating status (no receipt yet)""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "status": "VALIDATING", + "timestamp": 1234567890, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + status = uploader.get_status("upload-id") + + assert status.status == "VALIDATING" + assert status.id is None + assert status.owner is None + + @patch("turbo_sdk.chunked.time.sleep") + @patch("turbo_sdk.chunked.requests.Session") + def test_poll_for_finalization_success(self, mock_session_class, mock_sleep): + """Test successful finalization polling""" + # First call returns VALIDATING, second returns FINALIZED + mock_response_validating = Mock() + mock_response_validating.status_code = 200 + mock_response_validating.json.return_value = { + "status": "VALIDATING", + "timestamp": 1234567890, + } + + mock_response_finalized = Mock() + mock_response_finalized.status_code = 200 + mock_response_finalized.json.return_value = { + "status": "FINALIZED", + "timestamp": 1234567891, + "receipt": { + "id": "tx-id", + "owner": "owner", + "dataCaches": [], + "fastFinalityIndexes": [], + "winc": "0", + }, + } + + mock_session = Mock() + 
mock_session.get.side_effect = [mock_response_validating, mock_response_finalized] + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + status = uploader.poll_for_finalization("upload-id", 1024 * 1024) + + assert status.status == "FINALIZED" + assert mock_sleep.called + + @patch("turbo_sdk.chunked.time.sleep") + @patch("turbo_sdk.chunked.requests.Session") + def test_poll_for_finalization_underfunded(self, mock_session_class, mock_sleep): + """Test polling returns UNDERFUNDED status""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "status": "UNDERFUNDED", + "timestamp": 1234567890, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(UnderfundedError): + uploader.poll_for_finalization("upload-id", 1024 * 1024) + + @patch("turbo_sdk.chunked.time.sleep") + @patch("turbo_sdk.chunked.requests.Session") + def test_poll_for_finalization_invalid(self, mock_session_class, mock_sleep): + """Test polling returns INVALID status""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "status": "INVALID", + "timestamp": 1234567890, + } + + mock_session = Mock() + mock_session.get.return_value = mock_response + mock_session.headers = {} + mock_session_class.return_value = mock_session + + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + ) + + with pytest.raises(UploadValidationError, match="INVALID"): + uploader.poll_for_finalization("upload-id", 1024 * 1024) + + +class TestChunkedUploaderSequentialUpload: + """Test sequential chunk upload""" + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunks_sequential_bytes(self, mock_session_class): + """Test sequential upload with bytes data""" + mock_session = Mock() + mock_session.post.return_value = Mock(status_code=200) + mock_session.headers = {} + mock_session_class.return_value = mock_session + + params = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + chunking_params=params, + ) + + # Create 12 MiB of data (should result in 3 chunks) + data = b"x" * (12 * 1024 * 1024) + progress_calls = [] + + def on_progress(processed, total): + progress_calls.append((processed, total)) + + uploader.upload_chunks_sequential("upload-id", data, len(data), on_progress) + + # Should have 3 chunk uploads + assert mock_session.post.call_count == 3 + + # Progress should be reported after each chunk + assert len(progress_calls) == 3 + assert progress_calls[-1][0] == len(data) + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunks_sequential_stream(self, mock_session_class): + """Test sequential upload with stream data""" + mock_session = Mock() + mock_session.post.return_value = Mock(status_code=200) + mock_session.headers = {} + mock_session_class.return_value = mock_session + + params = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + chunking_params=params, + ) + + # Create stream with 10 MiB + data = b"y" * (10 * 1024 * 1024) + stream = io.BytesIO(data) + + 
uploader.upload_chunks_sequential("upload-id", stream, len(data), None) + + # Should have 2 chunk uploads + assert mock_session.post.call_count == 2 + + +class TestChunkedUploaderConcurrentUpload: + """Test concurrent chunk upload""" + + @patch("turbo_sdk.chunked.requests.Session") + def test_upload_chunks_concurrent(self, mock_session_class): + """Test concurrent upload""" + mock_session = Mock() + mock_session.post.return_value = Mock(status_code=200) + mock_session.headers = {} + mock_session_class.return_value = mock_session + + params = ChunkingParams( + chunk_byte_count=5 * 1024 * 1024, + max_chunk_concurrency=3, + ) + uploader = ChunkedUploader( + upload_url="https://upload.test.io", + token="arweave", + chunking_params=params, + ) + + # Create 15 MiB of data + data = b"z" * (15 * 1024 * 1024) + + uploader.upload_chunks_concurrent("upload-id", data, len(data), None) + + # Should have 3 chunk uploads + assert mock_session.post.call_count == 3 + + +class TestUnderfundedError: + """Test UnderfundedError exception""" + + def test_default_message(self): + """Test default error message""" + error = UnderfundedError() + assert str(error) == "Insufficient balance for upload" + assert error.status_code == 402 + + def test_custom_message(self): + """Test custom error message""" + error = UnderfundedError("Custom message") + assert str(error) == "Custom message" + assert error.status_code == 402 diff --git a/tests/test_chunked_performance.py b/tests/test_chunked_performance.py new file mode 100644 index 0000000..5b8c572 --- /dev/null +++ b/tests/test_chunked_performance.py @@ -0,0 +1,468 @@ +""" +Performance tests for chunked/multipart uploads vs traditional single-request uploads. + +These tests measure real upload performance against the Turbo service. +They are SKIPPED by default and must be run explicitly. + +Run with: + export TURBO_TEST_WALLET=/path/to/wallet.json + export TURBO_UPLOAD_URL=https://upload.ardrive.dev # optional, defaults to testnet + pytest -m performance -v -s + +Note: These tests upload real data and may consume credits. Use a funded wallet. + +Expected Results: +----------------- +Single-request uploads are often faster for raw throughput because they avoid: +- Multiple HTTP round trips for chunk uploads +- Server-side chunk assembly time +- Finalization polling delay + +However, chunked uploads provide important benefits: +- Progress reporting during upload (essential for UX with large files) +- Memory efficiency (can stream without loading entire file) +- Reliability (individual chunk failures can be retried) +- Foundation for resumable uploads (upload ID persistence) +- Better handling of network interruptions + +The auto-chunking threshold (5 MiB) balances these tradeoffs - small files +use efficient single requests while large files get progress visibility. 
+""" + +import io +import json +import os +import time +from dataclasses import dataclass +from typing import List, Optional + +import pytest + +from turbo_sdk import Turbo, ArweaveSigner, ChunkingParams + + +# Test wallet path - must be set via environment variable +WALLET_PATH = os.environ.get("TURBO_TEST_WALLET", "") + +# Upload URL - defaults to testnet for safety +UPLOAD_URL = os.environ.get("TURBO_UPLOAD_URL", "https://upload.ardrive.dev") + + +@dataclass +class UploadMetrics: + """Metrics collected during an upload""" + + method: str + data_size: int + total_time: float + throughput_mbps: float + chunk_size: Optional[int] = None + concurrency: Optional[int] = None + tx_id: Optional[str] = None + + +def format_size(size_bytes: int) -> str: + """Format bytes as human-readable string""" + if size_bytes >= 1024 * 1024: + return f"{size_bytes / (1024 * 1024):.1f} MiB" + elif size_bytes >= 1024: + return f"{size_bytes / 1024:.1f} KiB" + return f"{size_bytes} B" + + +def format_throughput(mbps: float) -> str: + """Format throughput as human-readable string""" + if mbps >= 1: + return f"{mbps:.2f} MB/s" + return f"{mbps * 1024:.2f} KB/s" + + +@pytest.fixture(scope="module") +def turbo_client(): + """Create a Turbo client for testing""" + if not WALLET_PATH or not os.path.exists(WALLET_PATH): + pytest.skip(f"TURBO_TEST_WALLET not set or wallet not found at '{WALLET_PATH}'") + + with open(WALLET_PATH) as f: + jwk = json.load(f) + + signer = ArweaveSigner(jwk) + return Turbo(signer, upload_url=UPLOAD_URL) + + +@pytest.fixture(scope="module") +def test_data_small(): + """Generate 1 MiB test data""" + return os.urandom(1 * 1024 * 1024) + + +@pytest.fixture(scope="module") +def test_data_medium(): + """Generate 6 MiB test data (triggers auto-chunking)""" + return os.urandom(6 * 1024 * 1024) + + +@pytest.fixture(scope="module") +def test_data_large(): + """Generate 15 MiB test data""" + return os.urandom(15 * 1024 * 1024) + + +class TestUploadPerformance: + """Performance comparison tests for upload methods""" + + def _upload_single( + self, turbo: Turbo, data: bytes, label: str + ) -> UploadMetrics: + """Perform single-request upload and collect metrics""" + start = time.perf_counter() + + result = turbo.upload( + data, + tags=[{"name": "Test", "value": f"perf-single-{label}"}], + chunking=ChunkingParams(chunking_mode="disabled"), + ) + + elapsed = time.perf_counter() - start + throughput = (len(data) / (1024 * 1024)) / elapsed + + return UploadMetrics( + method="single", + data_size=len(data), + total_time=elapsed, + throughput_mbps=throughput, + tx_id=result.id, + ) + + def _upload_chunked( + self, + turbo: Turbo, + data: bytes, + label: str, + chunk_size: int = 5 * 1024 * 1024, + concurrency: int = 1, + ) -> UploadMetrics: + """Perform chunked upload and collect metrics""" + start = time.perf_counter() + + result = turbo.upload( + data, + tags=[{"name": "Test", "value": f"perf-chunked-{label}"}], + chunking=ChunkingParams( + chunking_mode="force", + chunk_byte_count=chunk_size, + max_chunk_concurrency=concurrency, + ), + ) + + elapsed = time.perf_counter() - start + throughput = (len(data) / (1024 * 1024)) / elapsed + + return UploadMetrics( + method=f"chunked-{concurrency}x", + data_size=len(data), + total_time=elapsed, + throughput_mbps=throughput, + chunk_size=chunk_size, + concurrency=concurrency, + tx_id=result.id, + ) + + def _print_metrics(self, metrics: List[UploadMetrics], title: str): + """Print metrics table""" + print(f"\n{'=' * 70}") + print(f" {title}") + print(f"{'=' * 70}") + 
print(f"{'Method':<20} {'Size':<12} {'Time':<10} {'Throughput':<15}") + print(f"{'-' * 70}") + + for m in metrics: + print( + f"{m.method:<20} " + f"{format_size(m.data_size):<12} " + f"{m.total_time:.2f}s{'':<5} " + f"{format_throughput(m.throughput_mbps):<15}" + ) + + print(f"{'=' * 70}\n") + + @pytest.mark.performance + def test_small_file_comparison(self, turbo_client, test_data_small): + """Compare upload methods for small files (1 MiB)""" + metrics = [] + + # Single request upload + m1 = self._upload_single(turbo_client, test_data_small, "1mib") + metrics.append(m1) + + # Chunked upload (forced) + m2 = self._upload_chunked(turbo_client, test_data_small, "1mib") + metrics.append(m2) + + self._print_metrics(metrics, "Small File (1 MiB) - Single vs Chunked") + + # For small files, single request should be faster (less overhead) + # But we just verify both complete successfully + assert m1.tx_id is not None + assert m2.tx_id is not None + + @pytest.mark.performance + def test_medium_file_comparison(self, turbo_client, test_data_medium): + """Compare upload methods for medium files (6 MiB)""" + metrics = [] + + # Single request upload + m1 = self._upload_single(turbo_client, test_data_medium, "6mib") + metrics.append(m1) + + # Chunked upload - 1 concurrent + m2 = self._upload_chunked(turbo_client, test_data_medium, "6mib", concurrency=1) + metrics.append(m2) + + # Chunked upload - 2 concurrent + m3 = self._upload_chunked(turbo_client, test_data_medium, "6mib", concurrency=2) + metrics.append(m3) + + self._print_metrics(metrics, "Medium File (6 MiB) - Single vs Chunked") + + assert all(m.tx_id is not None for m in metrics) + + @pytest.mark.performance + def test_large_file_comparison(self, turbo_client, test_data_large): + """Compare upload methods for large files (15 MiB)""" + metrics = [] + + # Single request upload + m1 = self._upload_single(turbo_client, test_data_large, "15mib") + metrics.append(m1) + + # Chunked upload - 1 concurrent + m2 = self._upload_chunked(turbo_client, test_data_large, "15mib", concurrency=1) + metrics.append(m2) + + # Chunked upload - 2 concurrent + m3 = self._upload_chunked(turbo_client, test_data_large, "15mib", concurrency=2) + metrics.append(m3) + + # Chunked upload - 3 concurrent + m4 = self._upload_chunked(turbo_client, test_data_large, "15mib", concurrency=3) + metrics.append(m4) + + self._print_metrics(metrics, "Large File (15 MiB) - Single vs Chunked") + + assert all(m.tx_id is not None for m in metrics) + + @pytest.mark.performance + def test_concurrency_scaling(self, turbo_client, test_data_large): + """Test how throughput scales with concurrency""" + metrics = [] + + for concurrency in [1, 2, 3, 4]: + m = self._upload_chunked( + turbo_client, + test_data_large, + f"15mib-c{concurrency}", + concurrency=concurrency, + ) + metrics.append(m) + + self._print_metrics(metrics, "Concurrency Scaling (15 MiB)") + + # Verify all uploads succeeded + assert all(m.tx_id is not None for m in metrics) + + # Check that higher concurrency generally improves throughput + # (may not always be true due to network conditions) + print("Throughput by concurrency:") + for m in metrics: + print(f" {m.concurrency}x: {format_throughput(m.throughput_mbps)}") + + @pytest.mark.performance + def test_chunk_size_comparison(self, turbo_client, test_data_large): + """Test impact of different chunk sizes""" + metrics = [] + + chunk_sizes = [ + 5 * 1024 * 1024, # 5 MiB (minimum) + 10 * 1024 * 1024, # 10 MiB + 15 * 1024 * 1024, # 15 MiB (single chunk for this data) + ] + + for 
chunk_size in chunk_sizes: + m = self._upload_chunked( + turbo_client, + test_data_large, + f"15mib-cs{chunk_size // (1024*1024)}", + chunk_size=chunk_size, + concurrency=2, + ) + metrics.append(m) + + self._print_metrics(metrics, "Chunk Size Comparison (15 MiB, 2x concurrency)") + + assert all(m.tx_id is not None for m in metrics) + + @pytest.mark.performance + def test_stream_upload_performance(self, turbo_client, test_data_medium): + """Test upload performance with file-like stream input""" + metrics = [] + + # Bytes upload + start = time.perf_counter() + result1 = turbo_client.upload( + test_data_medium, + tags=[{"name": "Test", "value": "perf-bytes"}], + chunking=ChunkingParams(chunking_mode="force"), + ) + elapsed = time.perf_counter() - start + metrics.append( + UploadMetrics( + method="bytes", + data_size=len(test_data_medium), + total_time=elapsed, + throughput_mbps=(len(test_data_medium) / (1024 * 1024)) / elapsed, + tx_id=result1.id, + ) + ) + + # Stream upload + stream = io.BytesIO(test_data_medium) + start = time.perf_counter() + result2 = turbo_client.upload( + stream, + tags=[{"name": "Test", "value": "perf-stream"}], + chunking=ChunkingParams(chunking_mode="force"), + data_size=len(test_data_medium), + ) + elapsed = time.perf_counter() - start + metrics.append( + UploadMetrics( + method="stream", + data_size=len(test_data_medium), + total_time=elapsed, + throughput_mbps=(len(test_data_medium) / (1024 * 1024)) / elapsed, + tx_id=result2.id, + ) + ) + + self._print_metrics(metrics, "Bytes vs Stream Input (6 MiB)") + + assert all(m.tx_id is not None for m in metrics) + + +class TestProgressCallbackPerformance: + """Test that progress callbacks don't significantly impact performance""" + + @pytest.mark.performance + def test_progress_callback_overhead(self, turbo_client, test_data_medium): + """Measure overhead of progress callbacks""" + results = [] + + # Without callback + start = time.perf_counter() + r1 = turbo_client.upload( + test_data_medium, + tags=[{"name": "Test", "value": "perf-no-callback"}], + chunking=ChunkingParams(chunking_mode="force"), + ) + time_no_callback = time.perf_counter() - start + + # With callback + progress_events = [] + + def on_progress(processed, total): + progress_events.append((time.perf_counter(), processed, total)) + + start = time.perf_counter() + r2 = turbo_client.upload( + test_data_medium, + tags=[{"name": "Test", "value": "perf-with-callback"}], + on_progress=on_progress, + chunking=ChunkingParams(chunking_mode="force"), + ) + time_with_callback = time.perf_counter() - start + + overhead_pct = ((time_with_callback - time_no_callback) / time_no_callback) * 100 + + print(f"\n{'=' * 50}") + print(f" Progress Callback Overhead Test") + print(f"{'=' * 50}") + print(f"Without callback: {time_no_callback:.2f}s") + print(f"With callback: {time_with_callback:.2f}s") + print(f"Overhead: {overhead_pct:+.1f}%") + print(f"Callback events: {len(progress_events)}") + print(f"{'=' * 50}\n") + + assert r1.id is not None + assert r2.id is not None + assert len(progress_events) > 0 + + # Callback overhead should be minimal (< 20%) + # Note: network variance may cause this to fluctuate + assert overhead_pct < 50, f"Callback overhead too high: {overhead_pct}%" + + +def run_benchmark(): + """Run all benchmarks and print summary""" + if not WALLET_PATH or not os.path.exists(WALLET_PATH): + print(f"Error: Wallet not found at '{WALLET_PATH}'") + print("Set TURBO_TEST_WALLET environment variable to your wallet path") + return + + with open(WALLET_PATH) as f: + 
jwk = json.load(f) + + signer = ArweaveSigner(jwk) + turbo = Turbo(signer, upload_url=UPLOAD_URL) + + print(f"\nWallet: {signer.get_wallet_address()}") + print(f"Upload URL: {UPLOAD_URL}") + print("Running performance benchmarks...\n") + + test = TestUploadPerformance() + + # Run tests with different file sizes + sizes = [ + (1 * 1024 * 1024, "1 MiB"), + (6 * 1024 * 1024, "6 MiB"), + (15 * 1024 * 1024, "15 MiB"), + ] + + all_metrics = [] + + for size, label in sizes: + print(f"Testing {label}...") + data = os.urandom(size) + + # Single upload + m1 = test._upload_single(turbo, data, label.replace(" ", "")) + all_metrics.append(m1) + + # Chunked with different concurrency + for conc in [1, 2, 3]: + m = test._upload_chunked( + turbo, data, f"{label.replace(' ', '')}-c{conc}", concurrency=conc + ) + all_metrics.append(m) + + # Print final summary + print("\n" + "=" * 80) + print(" FINAL SUMMARY") + print("=" * 80) + print(f"{'Method':<25} {'Size':<12} {'Time':<10} {'Throughput':<15} {'TX ID':<20}") + print("-" * 80) + + for m in all_metrics: + print( + f"{m.method:<25} " + f"{format_size(m.data_size):<12} " + f"{m.total_time:.2f}s{'':<5} " + f"{format_throughput(m.throughput_mbps):<15} " + f"{m.tx_id[:16] if m.tx_id else 'N/A'}..." + ) + + print("=" * 80) + + +if __name__ == "__main__": + run_benchmark() diff --git a/turbo_sdk/__init__.py b/turbo_sdk/__init__.py index 6239851..6d45b47 100644 --- a/turbo_sdk/__init__.py +++ b/turbo_sdk/__init__.py @@ -22,15 +22,45 @@ """ from .client import Turbo -from .types import TurboUploadResponse, TurboBalanceResponse +from .types import ( + TurboUploadResponse, + TurboBalanceResponse, + TurboUploadStatus, + ChunkingParams, + ChunkedUploadInit, + ProgressCallback, + ChunkingMode, +) from .signers import EthereumSigner, ArweaveSigner +from .chunked import ( + ChunkedUploader, + ChunkedUploadError, + UnderfundedError, + UploadValidationError, + UploadFinalizationError, +) __version__ = "0.1.0" __all__ = [ + # Client "Turbo", + # Response types "TurboUploadResponse", "TurboBalanceResponse", + "TurboUploadStatus", + # Chunking + "ChunkingParams", + "ChunkedUploadInit", + "ChunkedUploader", + "ProgressCallback", + "ChunkingMode", + # Errors + "ChunkedUploadError", + "UnderfundedError", + "UploadValidationError", + "UploadFinalizationError", + # Signers "EthereumSigner", "ArweaveSigner", ] diff --git a/turbo_sdk/chunked.py b/turbo_sdk/chunked.py new file mode 100644 index 0000000..c6e7baa --- /dev/null +++ b/turbo_sdk/chunked.py @@ -0,0 +1,319 @@ +"""Chunked/multipart upload support for large files""" + +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import BinaryIO, Optional, Union + +import requests + +from .types import ( + ChunkedUploadInit, + TurboUploadStatus, + ChunkingParams, + ProgressCallback, + TurboUploadResponse, +) + + +class ChunkedUploadError(Exception): + """Base exception for chunked upload errors""" + + pass + + +class UnderfundedError(ChunkedUploadError): + """Raised when account has insufficient balance (402)""" + + def __init__(self, message: str = "Insufficient balance for upload"): + self.status_code = 402 + super().__init__(message) + + +class UploadValidationError(ChunkedUploadError): + """Raised when upload validation fails""" + + pass + + +class UploadFinalizationError(ChunkedUploadError): + """Raised when upload finalization fails or times out""" + + pass + + +class ChunkedUploader: + """Manages multipart upload lifecycle for large files""" + + CHUNKING_VERSION = "2" + FINALIZED_STATES = 
{"FINALIZED"} + ERROR_STATES = {"INVALID", "UNDERFUNDED", "APPROVAL_FAILED"} + + def __init__( + self, + upload_url: str, + token: str, + chunking_params: Optional[ChunkingParams] = None, + ): + self.upload_url = upload_url + self.token = token + self.params = chunking_params or ChunkingParams() + self._session = requests.Session() + self._session.headers.update({"x-chunking-version": self.CHUNKING_VERSION}) + + def _get_poll_interval(self, total_bytes: int) -> float: + """Calculate poll interval based on data size (from TS SDK)""" + mb_100 = 100 * 1024 * 1024 + gb_3 = 3 * 1024 * 1024 * 1024 + + if total_bytes < mb_100: + return 2.0 + elif total_bytes < gb_3: + return 4.0 + else: + # 1.5 seconds per GiB, max 15 seconds + gib = total_bytes / (1024 * 1024 * 1024) + return min(1.5 * gib, 15.0) + + def _get_max_finalize_time(self, total_bytes: int) -> float: + """Calculate max finalization wait time (2.5 min per GiB)""" + gib = max(1, total_bytes / (1024 * 1024 * 1024)) + return gib * 150_000 / 1000 # Convert ms to seconds + + def initiate(self) -> ChunkedUploadInit: + """Initiate a new chunked upload session""" + url = f"{self.upload_url}/chunks/{self.token}/-1/-1" + params = {"chunkSize": self.params.chunk_byte_count} + + response = self._session.get(url, params=params) + + if response.status_code == 200: + data = response.json() + return ChunkedUploadInit( + id=data["id"], + min=data["min"], + max=data["max"], + chunk_size=data.get("chunkSize", self.params.chunk_byte_count), + ) + elif response.status_code == 503: + raise ChunkedUploadError(f"Service unavailable: {response.text}") + else: + raise ChunkedUploadError( + f"Failed to initiate upload: {response.status_code} - {response.text}" + ) + + def upload_chunk(self, upload_id: str, offset: int, data: bytes) -> None: + """Upload a single chunk""" + url = f"{self.upload_url}/chunks/{self.token}/{upload_id}/{offset}" + + response = self._session.post( + url, + data=data, + headers={"Content-Type": "application/octet-stream"}, + ) + + if response.status_code == 200: + return + elif response.status_code == 402: + raise UnderfundedError() + elif response.status_code == 404: + raise ChunkedUploadError("Upload session not found or expired") + else: + raise ChunkedUploadError( + f"Chunk upload failed: {response.status_code} - {response.text}" + ) + + def finalize(self, upload_id: str) -> None: + """Finalize the chunked upload (enqueue for processing)""" + url = f"{self.upload_url}/chunks/{self.token}/{upload_id}/finalize" + + response = self._session.post(url) + + if response.status_code == 202: + return + elif response.status_code == 402: + raise UnderfundedError() + elif response.status_code == 404: + raise ChunkedUploadError("Upload session not found or expired") + else: + raise ChunkedUploadError( + f"Finalize failed: {response.status_code} - {response.text}" + ) + + def get_status(self, upload_id: str) -> TurboUploadStatus: + """Get current upload status""" + url = f"{self.upload_url}/chunks/{self.token}/{upload_id}/status" + + response = self._session.get(url) + + if response.status_code == 200: + data = response.json() + receipt = data.get("receipt", {}) + return TurboUploadStatus( + status=data["status"], + timestamp=data["timestamp"], + id=receipt.get("id"), + owner=receipt.get("owner"), + data_caches=receipt.get("dataCaches", []), + fast_finality_indexes=receipt.get("fastFinalityIndexes", []), + winc=receipt.get("winc"), + ) + elif response.status_code == 404: + raise ChunkedUploadError("Upload session not found") + else: + raise 
ChunkedUploadError( + f"Status check failed: {response.status_code} - {response.text}" + ) + + def poll_for_finalization( + self, upload_id: str, total_bytes: int + ) -> TurboUploadStatus: + """Poll until upload is finalized or fails""" + poll_interval = self._get_poll_interval(total_bytes) + max_wait = self._get_max_finalize_time(total_bytes) + start_time = time.time() + + while True: + elapsed = time.time() - start_time + if elapsed > max_wait: + raise UploadFinalizationError( + f"Finalization timed out after {elapsed:.1f}s" + ) + + status = self.get_status(upload_id) + + if status.status in self.FINALIZED_STATES: + return status + elif status.status == "UNDERFUNDED": + raise UnderfundedError() + elif status.status in self.ERROR_STATES: + raise UploadValidationError(f"Upload failed with status: {status.status}") + + time.sleep(poll_interval) + + def upload_chunks_sequential( + self, + upload_id: str, + data: Union[bytes, BinaryIO], + total_size: int, + on_progress: Optional[ProgressCallback] = None, + ) -> None: + """Upload chunks sequentially""" + chunk_size = self.params.chunk_byte_count + offset = 0 + processed = 0 + + while offset < total_size: + # Read chunk + if isinstance(data, bytes): + chunk = data[offset : offset + chunk_size] + else: + chunk = data.read(chunk_size) + if not chunk: + break + + # Upload chunk + self.upload_chunk(upload_id, offset, chunk) + + processed += len(chunk) + offset += len(chunk) + + if on_progress: + on_progress(processed, total_size) + + def upload_chunks_concurrent( + self, + upload_id: str, + data: bytes, + total_size: int, + on_progress: Optional[ProgressCallback] = None, + ) -> None: + """Upload chunks concurrently (requires bytes, not stream)""" + chunk_size = self.params.chunk_byte_count + max_workers = self.params.max_chunk_concurrency + + # Build list of chunks + chunks = [] + offset = 0 + while offset < total_size: + chunk_data = data[offset : offset + chunk_size] + chunks.append((offset, chunk_data)) + offset += len(chunk_data) + + processed = 0 + lock = None + if on_progress: + import threading + + lock = threading.Lock() + + def upload_one(args): + nonlocal processed + chunk_offset, chunk_data = args + self.upload_chunk(upload_id, chunk_offset, chunk_data) + if on_progress and lock: + with lock: + nonlocal processed + processed += len(chunk_data) + on_progress(processed, total_size) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [executor.submit(upload_one, chunk) for chunk in chunks] + for future in as_completed(futures): + future.result() # Raise any exceptions + + def upload( + self, + data: Union[bytes, BinaryIO], + total_size: Optional[int] = None, + on_progress: Optional[ProgressCallback] = None, + ) -> TurboUploadResponse: + """ + Perform complete chunked upload + + Args: + data: Data to upload (bytes or file-like object) + total_size: Total size in bytes (required for BinaryIO) + on_progress: Optional progress callback + + Returns: + TurboUploadResponse with transaction details + """ + # Determine total size + if isinstance(data, bytes): + total_size = len(data) + elif total_size is None: + raise ValueError("total_size required for file-like objects") + + # Initiate upload + init = self.initiate() + + try: + # Upload chunks + if ( + self.params.max_chunk_concurrency > 1 + and isinstance(data, bytes) + ): + self.upload_chunks_concurrent( + init.id, data, total_size, on_progress + ) + else: + self.upload_chunks_sequential( + init.id, data, total_size, on_progress + ) + + # Finalize + 
self.finalize(init.id) + + # Poll for completion + status = self.poll_for_finalization(init.id, total_size) + + return TurboUploadResponse( + id=status.id or "", + owner=status.owner or "", + data_caches=status.data_caches, + fast_finality_indexes=status.fast_finality_indexes, + winc=status.winc or "0", + ) + except Exception: + # Could implement cleanup here in future + raise diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py index bc3df46..d38b8f9 100644 --- a/turbo_sdk/client.py +++ b/turbo_sdk/client.py @@ -1,7 +1,14 @@ import requests -from typing import List, Dict, Optional -from .types import TurboUploadResponse, TurboBalanceResponse +from typing import BinaryIO, List, Dict, Optional, Union + +from .types import ( + TurboUploadResponse, + TurboBalanceResponse, + ChunkingParams, + ProgressCallback, +) from .bundle import create_data, sign +from .chunked import ChunkedUploader class Turbo: @@ -21,52 +28,119 @@ class Turbo: 3: "ethereum", # Ethereum ECDSA } - def __init__(self, signer, network: str = "mainnet"): + def __init__( + self, + signer, + network: str = "mainnet", + upload_url: Optional[str] = None, + payment_url: Optional[str] = None, + ): """ Initialize Turbo client Args: signer: Signer instance (ArweaveSigner or EthereumSigner) - network: Network ("mainnet" or "testnet") + network: Network ("mainnet" or "testnet") - used for default URLs + upload_url: Optional custom upload service URL (overrides network default) + payment_url: Optional custom payment service URL (overrides network default) """ self.signer = signer self.network = network - self.upload_url = self.SERVICE_URLS[network]["upload"] - self.payment_url = self.SERVICE_URLS[network]["payment"] + self.upload_url = upload_url or self.SERVICE_URLS[network]["upload"] + self.payment_url = payment_url or self.SERVICE_URLS[network]["payment"] # Determine token type from signer using lookup self.token = self.TOKEN_MAP.get(signer.signature_type) if not self.token: raise ValueError(f"Unsupported signer type: {signer.signature_type}") + # Default threshold for auto-chunking (5 MiB) + CHUNKING_THRESHOLD = 5 * 1024 * 1024 + def upload( - self, data: bytes, tags: Optional[List[Dict[str, str]]] = None + self, + data: Union[bytes, BinaryIO], + tags: Optional[List[Dict[str, str]]] = None, + on_progress: Optional[ProgressCallback] = None, + chunking: Optional[ChunkingParams] = None, + data_size: Optional[int] = None, ) -> TurboUploadResponse: """ Upload data with automatic signing Args: - data: Data to upload + data: Data to upload (bytes or file-like object) tags: Optional metadata tags + on_progress: Optional callback for progress reporting (processed_bytes, total_bytes) + chunking: Optional chunking configuration (defaults to auto mode) + data_size: Required when data is a file-like object Returns: TurboUploadResponse with transaction details Raises: Exception: If upload fails + UnderfundedError: If account balance is insufficient """ + # Determine data size + if isinstance(data, bytes): + size = len(data) + elif data_size is not None: + size = data_size + else: + raise ValueError("data_size is required when data is a file-like object") + + # Determine chunking mode + params = chunking or ChunkingParams() + use_chunked = self._should_use_chunked_upload(size, params) + + if use_chunked: + return self._upload_chunked(data, size, tags, on_progress, params) + else: + return self._upload_single(data, size, tags, on_progress) + + def _should_use_chunked_upload(self, size: int, params: ChunkingParams) -> bool: + """Determine if 
chunked upload should be used""" + if params.chunking_mode == "disabled": + return False + if params.chunking_mode == "force": + return True + # Auto mode: use chunked for files >= threshold + return size >= self.CHUNKING_THRESHOLD + + def _upload_single( + self, + data: Union[bytes, BinaryIO], + size: int, + tags: Optional[List[Dict[str, str]]], + on_progress: Optional[ProgressCallback], + ) -> TurboUploadResponse: + """Upload using single request (for small files)""" + # Read data if it's a stream + if not isinstance(data, bytes): + data = data.read() # Create and sign DataItem data_item = create_data(bytearray(data), self.signer, tags) sign(data_item, self.signer) + # Report signing complete (half the work) + if on_progress: + on_progress(size // 2, size) + # Upload to Turbo endpoint url = f"{self.upload_url}/tx/{self.token}" raw_data = data_item.get_raw() - headers = {"Content-Type": "application/octet-stream", "Content-Length": str(len(raw_data))} + headers = { + "Content-Type": "application/octet-stream", + "Content-Length": str(len(raw_data)), + } response = requests.post(url, data=raw_data, headers=headers) + if on_progress: + on_progress(size, size) + if response.status_code == 200: result = response.json() return TurboUploadResponse( @@ -79,6 +153,41 @@ def upload( else: raise Exception(f"Upload failed: {response.status_code} - {response.text}") + def _upload_chunked( + self, + data: Union[bytes, BinaryIO], + size: int, + tags: Optional[List[Dict[str, str]]], + on_progress: Optional[ProgressCallback], + params: ChunkingParams, + ) -> TurboUploadResponse: + """Upload using chunked/multipart upload (for large files)""" + # Read data if stream (needed for signing) + # TODO: In future, implement true streaming with sign_stream + if not isinstance(data, bytes): + data = data.read() + + # Create and sign DataItem + data_item = create_data(bytearray(data), self.signer, tags) + sign(data_item, self.signer) + + # Get signed data + signed_data = bytes(data_item.get_raw()) + + # Create chunked uploader + uploader = ChunkedUploader( + upload_url=self.upload_url, + token=self.token, + chunking_params=params, + ) + + # Perform chunked upload + return uploader.upload( + data=signed_data, + total_size=len(signed_data), + on_progress=on_progress, + ) + def get_balance(self, address: Optional[str] = None) -> TurboBalanceResponse: """ Get winston credit balance using signed request diff --git a/turbo_sdk/types.py b/turbo_sdk/types.py index fc878fa..a80e098 100644 --- a/turbo_sdk/types.py +++ b/turbo_sdk/types.py @@ -1,5 +1,51 @@ -from dataclasses import dataclass -from typing import List +from dataclasses import dataclass, field +from typing import List, Callable, Literal, Optional + + +# Type aliases +ChunkingMode = Literal["auto", "force", "disabled"] +ProgressCallback = Callable[[int, int], None] # (processed_bytes, total_bytes) + + +@dataclass +class ChunkingParams: + """Configuration for chunked/multipart uploads""" + + chunk_byte_count: int = 5 * 1024 * 1024 # 5 MiB default + max_chunk_concurrency: int = 1 + chunking_mode: ChunkingMode = "auto" + max_finalize_ms: int = 150_000 # 2.5 minutes per GiB + + def __post_init__(self): + min_chunk = 5 * 1024 * 1024 # 5 MiB + max_chunk = 500 * 1024 * 1024 # 500 MiB + if not (min_chunk <= self.chunk_byte_count <= max_chunk): + raise ValueError(f"chunk_byte_count must be between {min_chunk} and {max_chunk} bytes") + if self.max_chunk_concurrency < 1: + raise ValueError("max_chunk_concurrency must be at least 1") + + +@dataclass +class ChunkedUploadInit: 
+ """Response from chunked upload initiation""" + + id: str # Upload session ID + min: int # Minimum chunk size + max: int # Maximum chunk size + chunk_size: int # Requested chunk size + + +@dataclass +class TurboUploadStatus: + """Status of an upload (used for chunked upload polling)""" + + status: str # VALIDATING, ASSEMBLING, FINALIZING, FINALIZED, INVALID, UNDERFUNDED + timestamp: int + id: Optional[str] = None + owner: Optional[str] = None + data_caches: List[str] = field(default_factory=list) + fast_finality_indexes: List[str] = field(default_factory=list) + winc: Optional[str] = None @dataclass From 420c52044f2fc5204dc6ebb4cf15470e6c0d9fd8 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Wed, 21 Jan 2026 17:18:40 -0600 Subject: [PATCH 02/15] style: format with black --- tests/test_chunked_performance.py | 7 ++----- tests/test_stream_signing.py | 1 - turbo_sdk/chunked.py | 25 ++++++------------------- turbo_sdk/types.py | 1 - 4 files changed, 8 insertions(+), 26 deletions(-) diff --git a/tests/test_chunked_performance.py b/tests/test_chunked_performance.py index 5b8c572..e77d5e8 100644 --- a/tests/test_chunked_performance.py +++ b/tests/test_chunked_performance.py @@ -40,7 +40,6 @@ from turbo_sdk import Turbo, ArweaveSigner, ChunkingParams - # Test wallet path - must be set via environment variable WALLET_PATH = os.environ.get("TURBO_TEST_WALLET", "") @@ -111,9 +110,7 @@ def test_data_large(): class TestUploadPerformance: """Performance comparison tests for upload methods""" - def _upload_single( - self, turbo: Turbo, data: bytes, label: str - ) -> UploadMetrics: + def _upload_single(self, turbo: Turbo, data: bytes, label: str) -> UploadMetrics: """Perform single-request upload and collect metrics""" start = time.perf_counter() @@ -283,7 +280,7 @@ def test_chunk_size_comparison(self, turbo_client, test_data_large): metrics = [] chunk_sizes = [ - 5 * 1024 * 1024, # 5 MiB (minimum) + 5 * 1024 * 1024, # 5 MiB (minimum) 10 * 1024 * 1024, # 10 MiB 15 * 1024 * 1024, # 15 MiB (single chunk for this data) ] diff --git a/tests/test_stream_signing.py b/tests/test_stream_signing.py index f89d99b..939b366 100644 --- a/tests/test_stream_signing.py +++ b/tests/test_stream_signing.py @@ -18,7 +18,6 @@ ) from turbo_sdk.signers import EthereumSigner - # Test private key (not a real key, just for testing) TEST_PRIVATE_KEY = "0x" + "ab" * 32 diff --git a/turbo_sdk/chunked.py b/turbo_sdk/chunked.py index c6e7baa..65bc3f0 100644 --- a/turbo_sdk/chunked.py +++ b/turbo_sdk/chunked.py @@ -135,9 +135,7 @@ def finalize(self, upload_id: str) -> None: elif response.status_code == 404: raise ChunkedUploadError("Upload session not found or expired") else: - raise ChunkedUploadError( - f"Finalize failed: {response.status_code} - {response.text}" - ) + raise ChunkedUploadError(f"Finalize failed: {response.status_code} - {response.text}") def get_status(self, upload_id: str) -> TurboUploadStatus: """Get current upload status""" @@ -164,9 +162,7 @@ def get_status(self, upload_id: str) -> TurboUploadStatus: f"Status check failed: {response.status_code} - {response.text}" ) - def poll_for_finalization( - self, upload_id: str, total_bytes: int - ) -> TurboUploadStatus: + def poll_for_finalization(self, upload_id: str, total_bytes: int) -> TurboUploadStatus: """Poll until upload is finalized or fails""" poll_interval = self._get_poll_interval(total_bytes) max_wait = self._get_max_finalize_time(total_bytes) @@ -175,9 +171,7 @@ def poll_for_finalization( while True: elapsed = time.time() - start_time if elapsed > max_wait: - 
raise UploadFinalizationError( - f"Finalization timed out after {elapsed:.1f}s" - ) + raise UploadFinalizationError(f"Finalization timed out after {elapsed:.1f}s") status = self.get_status(upload_id) @@ -289,17 +283,10 @@ def upload( try: # Upload chunks - if ( - self.params.max_chunk_concurrency > 1 - and isinstance(data, bytes) - ): - self.upload_chunks_concurrent( - init.id, data, total_size, on_progress - ) + if self.params.max_chunk_concurrency > 1 and isinstance(data, bytes): + self.upload_chunks_concurrent(init.id, data, total_size, on_progress) else: - self.upload_chunks_sequential( - init.id, data, total_size, on_progress - ) + self.upload_chunks_sequential(init.id, data, total_size, on_progress) # Finalize self.finalize(init.id) diff --git a/turbo_sdk/types.py b/turbo_sdk/types.py index a80e098..432a96f 100644 --- a/turbo_sdk/types.py +++ b/turbo_sdk/types.py @@ -1,7 +1,6 @@ from dataclasses import dataclass, field from typing import List, Callable, Literal, Optional - # Type aliases ChunkingMode = Literal["auto", "force", "disabled"] ProgressCallback = Callable[[int, int], None] # (processed_bytes, total_bytes) From ab4df274bbccd1528a637312a32abc2d646e7491 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Wed, 21 Jan 2026 18:10:33 -0600 Subject: [PATCH 03/15] style: format examples with black --- examples/arweave_upload.py | 1 + examples/ethereum_upload.py | 1 + examples/test_wallet_integration.py | 1 + 3 files changed, 3 insertions(+) diff --git a/examples/arweave_upload.py b/examples/arweave_upload.py index d8d2cce..a104488 100644 --- a/examples/arweave_upload.py +++ b/examples/arweave_upload.py @@ -2,6 +2,7 @@ """ Example: Upload data using Arweave JWK wallet """ + from turbo_sdk import Turbo, ArweaveSigner import json import sys diff --git a/examples/ethereum_upload.py b/examples/ethereum_upload.py index 2ddd051..a3de494 100644 --- a/examples/ethereum_upload.py +++ b/examples/ethereum_upload.py @@ -2,6 +2,7 @@ """ Example: Upload data using Ethereum private key """ + from turbo_sdk import Turbo, EthereumSigner diff --git a/examples/test_wallet_integration.py b/examples/test_wallet_integration.py index 24ce5b0..d7e1159 100644 --- a/examples/test_wallet_integration.py +++ b/examples/test_wallet_integration.py @@ -2,6 +2,7 @@ """ Test integration with real Arweave wallet (without network calls) """ + import json from pathlib import Path from turbo_sdk import Turbo, ArweaveSigner From 1d4dba033f025d34373d890a7bdfe4e22873c3d0 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Wed, 21 Jan 2026 21:41:56 -0600 Subject: [PATCH 04/15] chore(lint): run flake8 fix --- tests/test_chunked_performance.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_chunked_performance.py b/tests/test_chunked_performance.py index e77d5e8..a4e4ec4 100644 --- a/tests/test_chunked_performance.py +++ b/tests/test_chunked_performance.py @@ -353,7 +353,6 @@ class TestProgressCallbackPerformance: @pytest.mark.performance def test_progress_callback_overhead(self, turbo_client, test_data_medium): """Measure overhead of progress callbacks""" - results = [] # Without callback start = time.perf_counter() @@ -382,8 +381,8 @@ def on_progress(processed, total): overhead_pct = ((time_with_callback - time_no_callback) / time_no_callback) * 100 print(f"\n{'=' * 50}") - print(f" Progress Callback Overhead Test") - print(f"{'=' * 50}") + print("Progress Callback Overhead Test") + print("=" * 50) print(f"Without callback: {time_no_callback:.2f}s") print(f"With callback: {time_with_callback:.2f}s") 
print(f"Overhead: {overhead_pct:+.1f}%") From c3a6e99f8a9bad3963c3804cac5d1868b70fb38d Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Thu, 22 Jan 2026 07:13:52 -0600 Subject: [PATCH 05/15] feat: add true streaming upload support with StreamingDataItem - Add StreamingDataItem class that wraps data streams with DataItem headers - Add create_data_header() for building headers separately from data - Use sign_stream to compute signatures without loading entire file into memory - Update _upload_chunked() to use streaming by default for all inputs - Bytes inputs are wrapped in BytesIO for unified code path This enables uploading large files with constant memory usage regardless of file size. The stream is read twice: once for signing (computing the deep hash), and once for the actual upload. Co-Authored-By: Claude Opus 4.5 --- tests/test_streaming_dataitem.py | 485 +++++++++++++++++++++++++++++++ turbo_sdk/bundle/__init__.py | 5 +- turbo_sdk/bundle/create.py | 120 ++++++++ turbo_sdk/bundle/stream.py | 198 +++++++++++++ turbo_sdk/client.py | 36 ++- 5 files changed, 829 insertions(+), 15 deletions(-) create mode 100644 tests/test_streaming_dataitem.py create mode 100644 turbo_sdk/bundle/stream.py diff --git a/tests/test_streaming_dataitem.py b/tests/test_streaming_dataitem.py new file mode 100644 index 0000000..3017b33 --- /dev/null +++ b/tests/test_streaming_dataitem.py @@ -0,0 +1,485 @@ +""" +Unit tests for StreamingDataItem and create_data_header. + +Tests verify that streaming DataItem construction produces identical binary +output to the in-memory approach, ensuring compatibility with the server. +""" + +import io +import os +import pytest +from turbo_sdk.bundle import ( + create_data, + create_data_header, + sign, + StreamingDataItem, +) +from turbo_sdk.signers import EthereumSigner + +# Test private key (not a real key, just for testing) +TEST_PRIVATE_KEY = "0x" + "ab" * 32 + + +class TestCreateDataHeader: + """Tests for create_data_header function.""" + + @pytest.fixture + def signer(self): + """Create a real Ethereum signer for testing.""" + return EthereumSigner(TEST_PRIVATE_KEY) + + def test_header_matches_dataitem_prefix(self, signer): + """Header bytes should match the prefix of a full DataItem.""" + test_data = b"Hello, Arweave!" 
+ tags = [{"name": "Content-Type", "value": "text/plain"}] + + # Create full DataItem in memory + data_item = create_data(bytearray(test_data), signer, tags) + sign(data_item, signer) + full_raw = bytes(data_item.get_raw()) + + # Create header only + header = create_data_header( + signer=signer, + signature=data_item.raw_signature, + tags=tags, + anchor=data_item.raw_anchor, + ) + + # Header should be a prefix of the full DataItem + assert full_raw.startswith(header) + + # Header + data should equal full DataItem + reconstructed = header + test_data + assert reconstructed == full_raw + + def test_header_with_no_tags(self, signer): + """Should work correctly with no tags.""" + test_data = b"No tags here" + + data_item = create_data(bytearray(test_data), signer, tags=None) + sign(data_item, signer) + full_raw = bytes(data_item.get_raw()) + + header = create_data_header( + signer=signer, + signature=data_item.raw_signature, + tags=None, + anchor=data_item.raw_anchor, + ) + + reconstructed = header + test_data + assert reconstructed == full_raw + + def test_header_with_multiple_tags(self, signer): + """Should work correctly with multiple tags.""" + test_data = b"Multiple tags" + tags = [ + {"name": "Content-Type", "value": "application/octet-stream"}, + {"name": "App-Name", "value": "TestApp"}, + {"name": "Version", "value": "1.0.0"}, + ] + + data_item = create_data(bytearray(test_data), signer, tags=tags) + sign(data_item, signer) + full_raw = bytes(data_item.get_raw()) + + header = create_data_header( + signer=signer, + signature=data_item.raw_signature, + tags=tags, + anchor=data_item.raw_anchor, + ) + + reconstructed = header + test_data + assert reconstructed == full_raw + + def test_signature_length_validation(self, signer): + """Should reject signatures with wrong length.""" + with pytest.raises(ValueError, match="Signature must be"): + create_data_header( + signer=signer, + signature=b"too_short", + tags=None, + anchor=os.urandom(32), + ) + + def test_anchor_length_validation(self, signer): + """Should reject anchors with wrong length.""" + # Ethereum signature is 65 bytes + signature = b"x" * 65 + + with pytest.raises(ValueError, match="Anchor must be exactly 32 bytes"): + create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=b"short_anchor", + ) + + def test_random_anchor_generated_when_none(self, signer): + """Should generate random anchor when none provided.""" + signature = b"x" * 65 + + header1 = create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=None, + ) + + header2 = create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=None, + ) + + # Headers should differ due to different random anchors + assert header1 != header2 + + +class TestStreamingDataItem: + """Tests for StreamingDataItem class.""" + + @pytest.fixture + def signer(self): + """Create a real Ethereum signer for testing.""" + return EthereumSigner(TEST_PRIVATE_KEY) + + def test_streaming_matches_inmemory(self, signer): + """Streaming DataItem should produce identical bytes to in-memory.""" + test_data = b"Hello, streaming world!" 
* 100 + tags = [{"name": "Content-Type", "value": "text/plain"}] + + # In-memory approach + data_item = create_data(bytearray(test_data), signer, tags) + sign(data_item, signer) + expected = bytes(data_item.get_raw()) + + # Streaming approach - use same anchor for comparison + stream = io.BytesIO(test_data) + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=tags, + ) + # Manually set the anchor to match + streaming._anchor = data_item.raw_anchor + streaming._data_stream.seek(0) + + # Now prepare - this will use our preset anchor + from turbo_sdk.bundle.sign import sign_stream + from turbo_sdk.bundle.tags import encode_tags + + encoded_tags = encode_tags(tags) + signature = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=streaming._anchor, + raw_tags=encoded_tags, + data_stream=streaming._data_stream, + data_size=len(test_data), + signer=signer, + ) + + streaming._data_stream.seek(0) + streaming._header = create_data_header( + signer=signer, + signature=signature, + tags=tags, + anchor=streaming._anchor, + ) + streaming._prepared = True + + actual = streaming.read(-1) + + assert actual == expected + + def test_prepare_returns_correct_size(self, signer): + """prepare() should return correct total size.""" + test_data = b"x" * 1000 + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=[{"name": "Test", "value": "value"}], + ) + + total_size = streaming.prepare() + + # Total should be header + data + assert total_size == streaming.header_size + len(test_data) + assert total_size == streaming.total_size + + def test_read_chunks_correctly(self, signer): + """Reading in chunks should produce same result as reading all.""" + test_data = b"chunked read test data " * 50 + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + ) + total_size = streaming.prepare() + + # Read in chunks + chunks = [] + while True: + chunk = streaming.read(100) + if not chunk: + break + chunks.append(chunk) + + chunked_result = b"".join(chunks) + + # Reset and read all at once + streaming.reset() + all_at_once = streaming.read(-1) + + assert chunked_result == all_at_once + assert len(chunked_result) == total_size + + def test_read_empty_returns_empty(self, signer): + """read(0) should return empty bytes.""" + test_data = b"test" + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + ) + streaming.prepare() + + result = streaming.read(0) + assert result == b"" + + def test_raises_if_not_prepared(self, signer): + """Should raise error if read() called before prepare().""" + stream = io.BytesIO(b"test") + + streaming = StreamingDataItem( + data_stream=stream, + data_size=4, + signer=signer, + tags=None, + ) + + with pytest.raises(RuntimeError, match="Must call prepare"): + streaming.read(10) + + def test_raises_on_non_seekable_stream(self, signer): + """Should raise error for non-seekable streams.""" + + class NonSeekableStream: + def read(self, size=-1): + return b"data" + + def seekable(self): + return False + + streaming = StreamingDataItem( + data_stream=NonSeekableStream(), + data_size=4, + signer=signer, + tags=None, + ) + + with pytest.raises(RuntimeError, match="seekable stream"): + streaming.prepare() + + def 
test_reset_allows_rereading(self, signer): + """reset() should allow reading the data again.""" + test_data = b"reset test" + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + ) + streaming.prepare() + + first_read = streaming.read(-1) + streaming.reset() + second_read = streaming.read(-1) + + assert first_read == second_read + + def test_progress_callback_during_signing(self, signer): + """Progress callback should be called during prepare().""" + test_data = b"x" * 10000 + stream = io.BytesIO(test_data) + progress_calls = [] + + def on_progress(processed, total): + progress_calls.append((processed, total)) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + on_sign_progress=on_progress, + ) + streaming.prepare() + + assert len(progress_calls) > 0 + assert progress_calls[-1][0] == len(test_data) + assert progress_calls[-1][1] == len(test_data) + + def test_large_data_streaming(self, signer): + """Should handle large data without memory issues.""" + # 1 MiB of data + test_data = os.urandom(1024 * 1024) + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=[{"name": "Size", "value": "1MB"}], + ) + + total_size = streaming.prepare() + + # Read in chunks and verify total size + total_read = 0 + while True: + chunk = streaming.read(64 * 1024) # 64 KiB chunks + if not chunk: + break + total_read += len(chunk) + + assert total_read == total_size + + def test_header_then_data_boundary(self, signer): + """Reading across header/data boundary should work correctly.""" + test_data = b"boundary test data" + stream = io.BytesIO(test_data) + + streaming = StreamingDataItem( + data_stream=stream, + data_size=len(test_data), + signer=signer, + tags=None, + ) + streaming.prepare() + + # Get header size + header_size = streaming.header_size + + # Read exactly to header boundary + header_part = streaming.read(header_size) + assert len(header_part) == header_size + + # Read the data part + data_part = streaming.read(-1) + assert data_part == test_data + + def test_seekable_returns_false(self, signer): + """StreamingDataItem should not be seekable after prepare().""" + stream = io.BytesIO(b"test") + + streaming = StreamingDataItem( + data_stream=stream, + data_size=4, + signer=signer, + tags=None, + ) + + assert streaming.seekable() is False + + +class TestStreamingDataItemIntegration: + """Integration tests for StreamingDataItem with real file-like objects.""" + + @pytest.fixture + def signer(self): + """Create a real Ethereum signer for testing.""" + return EthereumSigner(TEST_PRIVATE_KEY) + + def test_with_tempfile(self, signer, tmp_path): + """Should work correctly with actual file objects.""" + from turbo_sdk.bundle.sign import sign_stream + from turbo_sdk.bundle.tags import encode_tags + + # Create a temporary file + test_file = tmp_path / "test_data.bin" + test_data = os.urandom(5000) + test_file.write_bytes(test_data) + + # Compare with in-memory result + data_item = create_data(bytearray(test_data), signer, tags=None) + sign(data_item, signer) + + # Get the raw_tags from data_item for consistency + encoded_tags = encode_tags([]) + + with open(test_file, "rb") as f: + streaming = StreamingDataItem( + data_stream=f, + data_size=len(test_data), + signer=signer, + tags=None, + ) + # Use same anchor + streaming._anchor = data_item.raw_anchor + + signature = 
sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=streaming._anchor, + raw_tags=encoded_tags, + data_stream=f, + data_size=len(test_data), + signer=signer, + ) + + f.seek(0) + streaming._header = create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=streaming._anchor, + ) + streaming._prepared = True + + streamed_result = streaming.read(-1) + + expected = bytes(data_item.get_raw()) + assert streamed_result == expected + + def test_with_tempfile_using_prepare(self, signer, tmp_path): + """Test using the normal prepare() flow with a temp file.""" + # Create a temporary file + test_file = tmp_path / "test_data.bin" + test_data = os.urandom(10000) + test_file.write_bytes(test_data) + + with open(test_file, "rb") as f: + streaming = StreamingDataItem( + data_stream=f, + data_size=len(test_data), + signer=signer, + tags=[{"name": "App", "value": "Test"}], + ) + + total_size = streaming.prepare() + result = streaming.read(-1) + + # Verify size matches + assert len(result) == total_size + + # Verify it starts with correct signature type (Ethereum = 3) + assert result[0:2] == b"\x03\x00" # Little-endian 3 diff --git a/turbo_sdk/bundle/__init__.py b/turbo_sdk/bundle/__init__.py index 2e87e29..0b4f972 100644 --- a/turbo_sdk/bundle/__init__.py +++ b/turbo_sdk/bundle/__init__.py @@ -1,6 +1,6 @@ from .constants import SIG_CONFIG, MAX_TAG_BYTES, MIN_BINARY_SIZE from .dataitem import DataItem -from .create import create_data +from .create import create_data, create_data_header from .sign import ( sign, deep_hash, @@ -12,6 +12,7 @@ ) from .tags import encode_tags, decode_tags from .utils import set_bytes, byte_array_to_long +from .stream import StreamingDataItem __all__ = [ "SIG_CONFIG", @@ -19,6 +20,7 @@ "MIN_BINARY_SIZE", "DataItem", "create_data", + "create_data_header", "sign", "deep_hash", "get_signature_data", @@ -30,4 +32,5 @@ "decode_tags", "set_bytes", "byte_array_to_long", + "StreamingDataItem", ] diff --git a/turbo_sdk/bundle/create.py b/turbo_sdk/bundle/create.py index a547e94..44b95fd 100644 --- a/turbo_sdk/bundle/create.py +++ b/turbo_sdk/bundle/create.py @@ -133,3 +133,123 @@ def create_data( set_bytes(binary, data, offset) return DataItem(binary) + + +def create_data_header( + signer, + signature: bytes, + tags: Optional[List[Dict[str, str]]] = None, + target: Optional[str] = None, + anchor: Optional[bytes] = None, +) -> bytes: + """ + Create the header portion of a DataItem (everything before the data). + Used for streaming uploads where data is appended separately. + + Args: + signer: The signer object with signature_type, public_key, etc. + signature: The pre-computed signature bytes + tags: Optional list of tags as dictionaries with 'name' and 'value' keys + target: Optional target address (hex string) + anchor: Raw anchor bytes (32 bytes). If None, random anchor is generated. 
+ + Returns: + Header bytes that can be prepended to streamed data + """ + # Get signature configuration + sig_config = SIG_CONFIG[signer.signature_type] + sig_length = sig_config["sigLength"] + pub_length = sig_config["pubLength"] + + # Validate signature length + if len(signature) != sig_length: + raise ValueError(f"Signature must be {sig_length} bytes, got {len(signature)}") + + # Process tags + if tags is None: + tags = [] + encoded_tags = encode_tags(tags) + + # Process target + target_bytes = bytearray(32) + target_present = False + if target: + target_hex = target.replace("0x", "") + target_data = bytes.fromhex(target_hex) + if len(target_data) > 32: + raise ValueError("Target must be 32 bytes or less") + for i, b in enumerate(target_data): + target_bytes[i] = b + target_present = True + + # Process anchor + anchor_bytes = bytearray(32) + if anchor is not None: + if len(anchor) != 32: + raise ValueError("Anchor must be exactly 32 bytes") + for i, b in enumerate(anchor): + anchor_bytes[i] = b + else: + random_anchor = os.urandom(32) + for i, b in enumerate(random_anchor): + anchor_bytes[i] = b + + # Calculate header size (everything except data) + header_size = ( + 2 # signature type + + sig_length # signature + + pub_length # owner/public key + + 1 # target present flag + + (32 if target_present else 0) # target + + 1 # anchor present flag + + 32 # anchor (always present) + + 8 # number of tags + + 8 # tag data length + + len(encoded_tags) # tag data + ) + + # Create binary buffer + binary = bytearray(header_size) + offset = 0 + + # 1. Signature type (2 bytes, little-endian) + struct.pack_into(" int: + """ + Sign the data (streaming) and prepare the header. + + This reads through the entire data stream to compute the signature, + then seeks back to the start for the upload phase. + + Returns: + Total size in bytes (header + data) + + Raises: + RuntimeError: If stream is not seekable + """ + if self._prepared: + return len(self._header) + self._data_size + + # Verify stream is seekable + if not self._data_stream.seekable(): + raise RuntimeError( + "StreamingDataItem requires a seekable stream. " + "For non-seekable streams, load data into memory first." + ) + + # Generate random anchor + self._anchor = os.urandom(32) + + # Encode tags for signing + encoded_tags = encode_tags(self._tags) + + # Compute signature by streaming through data + signature = sign_stream( + signature_type=self._signer.signature_type, + raw_owner=self._signer.public_key, + raw_target=b"", + raw_anchor=self._anchor, + raw_tags=encoded_tags, + data_stream=self._data_stream, + data_size=self._data_size, + signer=self._signer, + on_progress=self._on_sign_progress, + ) + + # Seek back to start for upload + self._data_stream.seek(0) + + # Build header with the computed signature + self._header = create_data_header( + signer=self._signer, + signature=signature, + tags=self._tags, + anchor=self._anchor, + ) + + self._prepared = True + return len(self._header) + self._data_size + + @property + def total_size(self) -> int: + """Total size in bytes (header + data). Must call prepare() first.""" + if not self._prepared: + raise RuntimeError("Must call prepare() first") + return len(self._header) + self._data_size + + @property + def header_size(self) -> int: + """Size of the header in bytes. Must call prepare() first.""" + if not self._prepared: + raise RuntimeError("Must call prepare() first") + return len(self._header) + + def read(self, size: int = -1) -> bytes: + """ + Read bytes from the streaming DataItem. 
+ + Reads header bytes first, then data bytes. This method is compatible + with the BinaryIO interface expected by ChunkedUploader. + + Args: + size: Number of bytes to read. -1 means read all remaining. + + Returns: + Bytes read (may be less than size if at end) + + Raises: + RuntimeError: If prepare() has not been called + """ + if not self._prepared: + raise RuntimeError("Must call prepare() first") + + if size == 0: + return b"" + + result = bytearray() + + # Determine how many bytes to read + if size < 0: + # Read everything remaining + remaining = float("inf") + else: + remaining = size + + # Read from header first + if self._header_offset < len(self._header): + header_remaining = len(self._header) - self._header_offset + to_read = min(header_remaining, remaining) + header_chunk = self._header[ + self._header_offset : self._header_offset + int(to_read) + ] + result.extend(header_chunk) + self._header_offset += len(header_chunk) + remaining -= len(header_chunk) + + # Then read from data stream + if remaining > 0: + if remaining == float("inf"): + data_chunk = self._data_stream.read() + else: + data_chunk = self._data_stream.read(int(remaining)) + if data_chunk: + result.extend(data_chunk) + + return bytes(result) + + def seekable(self) -> bool: + """Return False - StreamingDataItem is forward-only after prepare().""" + return False + + def reset(self) -> None: + """ + Reset the streaming position to allow re-reading. + + This seeks the underlying data stream back to the start and + resets the header offset. + """ + if not self._prepared: + raise RuntimeError("Must call prepare() first") + self._header_offset = 0 + self._data_stream.seek(0) diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py index d38b8f9..6f9f207 100644 --- a/turbo_sdk/client.py +++ b/turbo_sdk/client.py @@ -1,3 +1,5 @@ +import io + import requests from typing import BinaryIO, List, Dict, Optional, Union @@ -7,7 +9,7 @@ ChunkingParams, ProgressCallback, ) -from .bundle import create_data, sign +from .bundle import create_data, sign, StreamingDataItem from .chunked import ChunkedUploader @@ -162,17 +164,23 @@ def _upload_chunked( params: ChunkingParams, ) -> TurboUploadResponse: """Upload using chunked/multipart upload (for large files)""" - # Read data if stream (needed for signing) - # TODO: In future, implement true streaming with sign_stream - if not isinstance(data, bytes): - data = data.read() - - # Create and sign DataItem - data_item = create_data(bytearray(data), self.signer, tags) - sign(data_item, self.signer) + # Wrap bytes in BytesIO for unified streaming path + if isinstance(data, bytes): + data_stream = io.BytesIO(data) + else: + data_stream = data + + # Use StreamingDataItem for all chunked uploads + # This signs data by streaming through it, avoiding memory duplication + streaming_item = StreamingDataItem( + data_stream=data_stream, + data_size=size, + signer=self.signer, + tags=tags, + ) - # Get signed data - signed_data = bytes(data_item.get_raw()) + # Prepare signs the data (streaming) and builds the header + total_size = streaming_item.prepare() # Create chunked uploader uploader = ChunkedUploader( @@ -181,10 +189,10 @@ def _upload_chunked( chunking_params=params, ) - # Perform chunked upload + # Upload using the streaming item as a file-like object return uploader.upload( - data=signed_data, - total_size=len(signed_data), + data=streaming_item, + total_size=total_size, on_progress=on_progress, ) From f6be56e74612667d85ee85c273ab9e96b3f2b59c Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Thu, 22 
Jan 2026 15:19:00 -0600 Subject: [PATCH 06/15] refactor: use stream_factory pattern instead of seekable streams Replace data_stream parameter with stream_factory callable throughout the streaming upload code. This allows: - Non-seekable streams (generators, network streams, etc.) - Clean separation between signing pass and upload pass - Easy retries by creating a new stream from the factory Changes: - StreamingDataItem now takes stream_factory: Callable[[], BinaryIO] - Turbo.upload() accepts optional stream_factory parameter - Internal _upload_single and _upload_chunked use stream_factory - Added comprehensive tests for bytes, streams, and stream_factory inputs Co-Authored-By: Claude Opus 4.5 --- tests/test_streaming_dataitem.py | 167 +++++++++++++++------------- tests/test_turbo.py | 180 +++++++++++++++++++++++++++++++ turbo_sdk/bundle/__init__.py | 3 +- turbo_sdk/bundle/stream.py | 75 ++++++++----- turbo_sdk/client.py | 65 ++++++----- 5 files changed, 361 insertions(+), 129 deletions(-) diff --git a/tests/test_streaming_dataitem.py b/tests/test_streaming_dataitem.py index 3017b33..982c610 100644 --- a/tests/test_streaming_dataitem.py +++ b/tests/test_streaming_dataitem.py @@ -158,34 +158,33 @@ def test_streaming_matches_inmemory(self, signer): expected = bytes(data_item.get_raw()) # Streaming approach - use same anchor for comparison - stream = io.BytesIO(test_data) streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), signer=signer, tags=tags, ) # Manually set the anchor to match streaming._anchor = data_item.raw_anchor - streaming._data_stream.seek(0) # Now prepare - this will use our preset anchor from turbo_sdk.bundle.sign import sign_stream from turbo_sdk.bundle.tags import encode_tags encoded_tags = encode_tags(tags) + sign_stream_obj = streaming._stream_factory() signature = sign_stream( signature_type=signer.signature_type, raw_owner=signer.public_key, raw_target=b"", raw_anchor=streaming._anchor, raw_tags=encoded_tags, - data_stream=streaming._data_stream, + data_stream=sign_stream_obj, data_size=len(test_data), signer=signer, ) - streaming._data_stream.seek(0) + streaming._data_stream = streaming._stream_factory() streaming._header = create_data_header( signer=signer, signature=signature, @@ -201,10 +200,9 @@ def test_streaming_matches_inmemory(self, signer): def test_prepare_returns_correct_size(self, signer): """prepare() should return correct total size.""" test_data = b"x" * 1000 - stream = io.BytesIO(test_data) streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), signer=signer, tags=[{"name": "Test", "value": "value"}], @@ -219,10 +217,9 @@ def test_prepare_returns_correct_size(self, signer): def test_read_chunks_correctly(self, signer): """Reading in chunks should produce same result as reading all.""" test_data = b"chunked read test data " * 50 - stream = io.BytesIO(test_data) streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), signer=signer, tags=None, @@ -249,10 +246,9 @@ def test_read_chunks_correctly(self, signer): def test_read_empty_returns_empty(self, signer): """read(0) should return empty bytes.""" test_data = b"test" - stream = io.BytesIO(test_data) streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), signer=signer, tags=None, @@ -264,10 +260,8 @@ def 
test_read_empty_returns_empty(self, signer): def test_raises_if_not_prepared(self, signer): """Should raise error if read() called before prepare().""" - stream = io.BytesIO(b"test") - streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(b"test"), data_size=4, signer=signer, tags=None, @@ -276,33 +270,52 @@ def test_raises_if_not_prepared(self, signer): with pytest.raises(RuntimeError, match="Must call prepare"): streaming.read(10) - def test_raises_on_non_seekable_stream(self, signer): - """Should raise error for non-seekable streams.""" + def test_works_with_non_seekable_stream(self, signer): + """Should work with non-seekable streams via stream_factory.""" + test_data = b"non-seekable data" + call_count = [0] class NonSeekableStream: + def __init__(self, data): + self._data = data + self._pos = 0 + def read(self, size=-1): - return b"data" + if size < 0: + result = self._data[self._pos :] + self._pos = len(self._data) + else: + result = self._data[self._pos : self._pos + size] + self._pos += len(result) + return result - def seekable(self): - return False + def close(self): + pass + + def factory(): + call_count[0] += 1 + return NonSeekableStream(test_data) streaming = StreamingDataItem( - data_stream=NonSeekableStream(), - data_size=4, + stream_factory=factory, + data_size=len(test_data), signer=signer, tags=None, ) - with pytest.raises(RuntimeError, match="seekable stream"): - streaming.prepare() + total_size = streaming.prepare() + result = streaming.read(-1) + + # Factory should have been called twice: once for signing, once for upload + assert call_count[0] == 2 + assert len(result) == total_size def test_reset_allows_rereading(self, signer): """reset() should allow reading the data again.""" test_data = b"reset test" - stream = io.BytesIO(test_data) streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), signer=signer, tags=None, @@ -318,14 +331,13 @@ def test_reset_allows_rereading(self, signer): def test_progress_callback_during_signing(self, signer): """Progress callback should be called during prepare().""" test_data = b"x" * 10000 - stream = io.BytesIO(test_data) progress_calls = [] def on_progress(processed, total): progress_calls.append((processed, total)) streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), signer=signer, tags=None, @@ -341,10 +353,9 @@ def test_large_data_streaming(self, signer): """Should handle large data without memory issues.""" # 1 MiB of data test_data = os.urandom(1024 * 1024) - stream = io.BytesIO(test_data) streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), signer=signer, tags=[{"name": "Size", "value": "1MB"}], @@ -365,10 +376,9 @@ def test_large_data_streaming(self, signer): def test_header_then_data_boundary(self, signer): """Reading across header/data boundary should work correctly.""" test_data = b"boundary test data" - stream = io.BytesIO(test_data) streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), signer=signer, tags=None, @@ -388,10 +398,8 @@ def test_header_then_data_boundary(self, signer): def test_seekable_returns_false(self, signer): """StreamingDataItem should not be seekable after prepare().""" - stream = io.BytesIO(b"test") - streaming = StreamingDataItem( - data_stream=stream, + stream_factory=lambda: 
io.BytesIO(b"test"), data_size=4, signer=signer, tags=None, @@ -425,37 +433,42 @@ def test_with_tempfile(self, signer, tmp_path): # Get the raw_tags from data_item for consistency encoded_tags = encode_tags([]) - with open(test_file, "rb") as f: - streaming = StreamingDataItem( - data_stream=f, - data_size=len(test_data), - signer=signer, - tags=None, - ) - # Use same anchor - streaming._anchor = data_item.raw_anchor - - signature = sign_stream( - signature_type=signer.signature_type, - raw_owner=signer.public_key, - raw_target=b"", - raw_anchor=streaming._anchor, - raw_tags=encoded_tags, - data_stream=f, - data_size=len(test_data), - signer=signer, - ) + def open_file(): + return open(test_file, "rb") - f.seek(0) - streaming._header = create_data_header( - signer=signer, - signature=signature, - tags=None, - anchor=streaming._anchor, - ) - streaming._prepared = True + streaming = StreamingDataItem( + stream_factory=open_file, + data_size=len(test_data), + signer=signer, + tags=None, + ) + # Use same anchor + streaming._anchor = data_item.raw_anchor - streamed_result = streaming.read(-1) + sign_stream_obj = streaming._stream_factory() + signature = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=streaming._anchor, + raw_tags=encoded_tags, + data_stream=sign_stream_obj, + data_size=len(test_data), + signer=signer, + ) + sign_stream_obj.close() + + streaming._data_stream = streaming._stream_factory() + streaming._header = create_data_header( + signer=signer, + signature=signature, + tags=None, + anchor=streaming._anchor, + ) + streaming._prepared = True + + streamed_result = streaming.read(-1) + streaming._data_stream.close() expected = bytes(data_item.get_raw()) assert streamed_result == expected @@ -467,19 +480,21 @@ def test_with_tempfile_using_prepare(self, signer, tmp_path): test_data = os.urandom(10000) test_file.write_bytes(test_data) - with open(test_file, "rb") as f: - streaming = StreamingDataItem( - data_stream=f, - data_size=len(test_data), - signer=signer, - tags=[{"name": "App", "value": "Test"}], - ) + def open_file(): + return open(test_file, "rb") - total_size = streaming.prepare() - result = streaming.read(-1) + streaming = StreamingDataItem( + stream_factory=open_file, + data_size=len(test_data), + signer=signer, + tags=[{"name": "App", "value": "Test"}], + ) + + total_size = streaming.prepare() + result = streaming.read(-1) - # Verify size matches - assert len(result) == total_size + # Verify size matches + assert len(result) == total_size - # Verify it starts with correct signature type (Ethereum = 3) - assert result[0:2] == b"\x03\x00" # Little-endian 3 + # Verify it starts with correct signature type (Ethereum = 3) + assert result[0:2] == b"\x03\x00" # Little-endian 3 diff --git a/tests/test_turbo.py b/tests/test_turbo.py index 5f9727d..7af315a 100644 --- a/tests/test_turbo.py +++ b/tests/test_turbo.py @@ -1,3 +1,5 @@ +import io +import os import pytest from unittest.mock import Mock, patch import requests @@ -255,3 +257,181 @@ def test_get_balance_other_errors_raise(self, mock_get, ethereum_signer): # Should raise the error with pytest.raises(requests.HTTPError): turbo.get_balance() + + +class TestTurboUpload: + """Test Turbo upload method with different input types""" + + # Test private key (not a real key, just for testing) + TEST_PRIVATE_KEY = "0x" + "ab" * 32 + + @pytest.fixture + def signer(self): + """Create a real Ethereum signer for testing.""" + return EthereumSigner(self.TEST_PRIVATE_KEY) + + 
@pytest.fixture + def turbo(self, signer): + """Create a Turbo client for testing.""" + return Turbo(signer, network="testnet") + + @patch("turbo_sdk.client.requests.post") + def test_upload_with_bytes(self, mock_post, turbo): + """Test upload with raw bytes data""" + test_data = b"Hello, Turbo!" * 10 + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": "test_tx_id", + "owner": "test_owner", + "dataCaches": ["cache1"], + "fastFinalityIndexes": ["index1"], + "winc": "1000", + } + mock_post.return_value = mock_response + + result = turbo.upload(data=test_data, tags=[{"name": "Test", "value": "bytes"}]) + + assert result.id == "test_tx_id" + assert result.owner == "test_owner" + mock_post.assert_called_once() + + @patch("turbo_sdk.client.requests.post") + def test_upload_with_stream(self, mock_post, turbo): + """Test upload with file-like stream object""" + test_data = b"Hello, streaming Turbo!" * 10 + stream = io.BytesIO(test_data) + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": "test_tx_id_stream", + "owner": "test_owner", + "dataCaches": [], + "fastFinalityIndexes": [], + "winc": "500", + } + mock_post.return_value = mock_response + + result = turbo.upload( + data=stream, + data_size=len(test_data), + tags=[{"name": "Test", "value": "stream"}], + ) + + assert result.id == "test_tx_id_stream" + mock_post.assert_called_once() + + @patch("turbo_sdk.client.requests.post") + def test_upload_with_stream_factory(self, mock_post, turbo): + """Test upload with stream_factory callable""" + test_data = b"Hello, factory Turbo!" * 10 + call_count = [0] + + def stream_factory(): + call_count[0] += 1 + return io.BytesIO(test_data) + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": "test_tx_id_factory", + "owner": "test_owner", + "dataCaches": [], + "fastFinalityIndexes": [], + "winc": "750", + } + mock_post.return_value = mock_response + + result = turbo.upload( + stream_factory=stream_factory, + data_size=len(test_data), + tags=[{"name": "Test", "value": "factory"}], + ) + + assert result.id == "test_tx_id_factory" + # Factory should have been called at least once + assert call_count[0] >= 1 + mock_post.assert_called_once() + + @patch("turbo_sdk.client.requests.post") + def test_upload_with_file(self, mock_post, turbo, tmp_path): + """Test upload with actual file from disk""" + # Create a temporary file + test_file = tmp_path / "test_upload.bin" + test_data = os.urandom(1000) + test_file.write_bytes(test_data) + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": "test_tx_id_file", + "owner": "test_owner", + "dataCaches": [], + "fastFinalityIndexes": [], + "winc": "250", + } + mock_post.return_value = mock_response + + # Use stream_factory to open the file fresh each time + def open_file(): + return open(test_file, "rb") + + result = turbo.upload( + stream_factory=open_file, + data_size=len(test_data), + tags=[{"name": "Test", "value": "file"}], + ) + + assert result.id == "test_tx_id_file" + mock_post.assert_called_once() + + def test_upload_requires_data_or_stream_factory(self, turbo): + """Test that upload raises error if neither data nor stream_factory provided""" + with pytest.raises(ValueError, match="Must specify either data or stream_factory"): + 
turbo.upload(tags=[{"name": "Test", "value": "nothing"}]) + + def test_upload_rejects_both_data_and_stream_factory(self, turbo): + """Test that upload raises error if both data and stream_factory provided""" + with pytest.raises(ValueError, match="Cannot specify both data and stream_factory"): + turbo.upload( + data=b"test", + stream_factory=lambda: io.BytesIO(b"test"), + ) + + def test_upload_requires_data_size_for_stream(self, turbo): + """Test that upload raises error if stream provided without data_size""" + stream = io.BytesIO(b"test data") + + with pytest.raises(ValueError, match="data_size is required"): + turbo.upload(data=stream) + + def test_upload_requires_data_size_for_stream_factory(self, turbo): + """Test that upload raises error if stream_factory provided without data_size""" + with pytest.raises(ValueError, match="data_size is required"): + turbo.upload(stream_factory=lambda: io.BytesIO(b"test")) + + @patch("turbo_sdk.client.requests.post") + def test_upload_bytes_auto_detects_size(self, mock_post, turbo): + """Test that upload automatically detects size for bytes input""" + test_data = b"auto size detection" + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "id": "test_id", + "owner": "owner", + "dataCaches": [], + "fastFinalityIndexes": [], + } + mock_post.return_value = mock_response + + # Should work without specifying data_size + result = turbo.upload(data=test_data) + assert result.id == "test_id" diff --git a/turbo_sdk/bundle/__init__.py b/turbo_sdk/bundle/__init__.py index 0b4f972..2c76ebb 100644 --- a/turbo_sdk/bundle/__init__.py +++ b/turbo_sdk/bundle/__init__.py @@ -12,7 +12,7 @@ ) from .tags import encode_tags, decode_tags from .utils import set_bytes, byte_array_to_long -from .stream import StreamingDataItem +from .stream import StreamingDataItem, StreamFactory __all__ = [ "SIG_CONFIG", @@ -33,4 +33,5 @@ "set_bytes", "byte_array_to_long", "StreamingDataItem", + "StreamFactory", ] diff --git a/turbo_sdk/bundle/stream.py b/turbo_sdk/bundle/stream.py index 57aca8e..ef3dbfc 100644 --- a/turbo_sdk/bundle/stream.py +++ b/turbo_sdk/bundle/stream.py @@ -7,6 +7,9 @@ from .sign import sign_stream from .tags import encode_tags +# Type alias for stream factory +StreamFactory = Callable[[], BinaryIO] + class StreamingDataItem: """ @@ -18,20 +21,29 @@ class StreamingDataItem: 2. Builds the DataItem header with the computed signature 3. Provides a read() interface that returns header bytes first, then data - The underlying stream must be seekable (supports seek(0)) because: - - First pass: read through data to compute signature hash - - Second pass: read through data for upload + Uses a stream_factory pattern: a callable that returns a fresh stream + each time it's called. This allows: + - Non-seekable streams (generators, network streams, etc.) 
+ - Clean separation between signing pass and upload pass + - Easy retries by creating a new stream Usage: - with open("large_file.bin", "rb") as f: - streaming = StreamingDataItem(f, file_size, signer, tags) - total_size = streaming.prepare() - # Now use streaming.read() to get chunks for upload + def open_file(): + return open("large_file.bin", "rb") + + streaming = StreamingDataItem( + stream_factory=open_file, + data_size=file_size, + signer=signer, + tags=tags, + ) + total_size = streaming.prepare() + # Now use streaming.read() to get chunks for upload """ def __init__( self, - data_stream: BinaryIO, + stream_factory: StreamFactory, data_size: int, signer, tags: Optional[List[Dict[str, str]]] = None, @@ -41,13 +53,13 @@ def __init__( Initialize a StreamingDataItem. Args: - data_stream: A seekable file-like object containing the data + stream_factory: A callable that returns a fresh BinaryIO stream data_size: Total size of the data in bytes signer: The signer object with signature_type, public_key, sign() tags: Optional list of tags as dictionaries with 'name' and 'value' on_sign_progress: Optional callback(processed, total) during signing """ - self._data_stream = data_stream + self._stream_factory = stream_factory self._data_size = data_size self._signer = signer self._tags = tags or [] @@ -57,36 +69,30 @@ def __init__( self._header_offset = 0 self._prepared = False self._anchor: Optional[bytes] = None + self._data_stream: Optional[BinaryIO] = None def prepare(self) -> int: """ Sign the data (streaming) and prepare the header. - This reads through the entire data stream to compute the signature, - then seeks back to the start for the upload phase. + This creates a fresh stream from the factory to compute the signature, + then creates another fresh stream for the upload phase. Returns: Total size in bytes (header + data) - - Raises: - RuntimeError: If stream is not seekable """ if self._prepared: return len(self._header) + self._data_size - # Verify stream is seekable - if not self._data_stream.seekable(): - raise RuntimeError( - "StreamingDataItem requires a seekable stream. " - "For non-seekable streams, load data into memory first." - ) - # Generate random anchor self._anchor = os.urandom(32) # Encode tags for signing encoded_tags = encode_tags(self._tags) + # Create a stream for signing + sign_stream_obj = self._stream_factory() + # Compute signature by streaming through data signature = sign_stream( signature_type=self._signer.signature_type, @@ -94,14 +100,21 @@ def prepare(self) -> int: raw_target=b"", raw_anchor=self._anchor, raw_tags=encoded_tags, - data_stream=self._data_stream, + data_stream=sign_stream_obj, data_size=self._data_size, signer=self._signer, on_progress=self._on_sign_progress, ) - # Seek back to start for upload - self._data_stream.seek(0) + # Close the signing stream if it has a close method + if hasattr(sign_stream_obj, "close"): + try: + sign_stream_obj.close() + except Exception: + pass # Ignore close errors + + # Create a fresh stream for upload + self._data_stream = self._stream_factory() # Build header with the computed signature self._header = create_data_header( @@ -189,10 +202,16 @@ def reset(self) -> None: """ Reset the streaming position to allow re-reading. - This seeks the underlying data stream back to the start and - resets the header offset. + Creates a fresh data stream from the factory and resets the header offset. 
""" if not self._prepared: raise RuntimeError("Must call prepare() first") self._header_offset = 0 - self._data_stream.seek(0) + # Close existing stream if it has a close method + if self._data_stream is not None and hasattr(self._data_stream, "close"): + try: + self._data_stream.close() + except Exception: + pass # Ignore close errors + # Create fresh stream from factory + self._data_stream = self._stream_factory() diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py index 6f9f207..99b2a8c 100644 --- a/turbo_sdk/client.py +++ b/turbo_sdk/client.py @@ -1,7 +1,7 @@ import io +from typing import BinaryIO, List, Dict, Optional, Union import requests -from typing import BinaryIO, List, Dict, Optional, Union from .types import ( TurboUploadResponse, @@ -9,7 +9,7 @@ ChunkingParams, ProgressCallback, ) -from .bundle import create_data, sign, StreamingDataItem +from .bundle import create_data, sign, StreamingDataItem, StreamFactory from .chunked import ChunkedUploader @@ -61,21 +61,26 @@ def __init__( def upload( self, - data: Union[bytes, BinaryIO], + data: Optional[Union[bytes, BinaryIO]] = None, tags: Optional[List[Dict[str, str]]] = None, on_progress: Optional[ProgressCallback] = None, chunking: Optional[ChunkingParams] = None, data_size: Optional[int] = None, + stream_factory: Optional[StreamFactory] = None, ) -> TurboUploadResponse: """ Upload data with automatic signing Args: - data: Data to upload (bytes or file-like object) + data: Data to upload (bytes or file-like object). Mutually exclusive + with stream_factory. tags: Optional metadata tags on_progress: Optional callback for progress reporting (processed_bytes, total_bytes) chunking: Optional chunking configuration (defaults to auto mode) - data_size: Required when data is a file-like object + data_size: Required when using stream_factory or file-like object + stream_factory: A callable that returns a fresh BinaryIO stream each time. + Useful for non-seekable streams or when you want control + over stream lifecycle. Mutually exclusive with data. 
Returns: TurboUploadResponse with transaction details @@ -84,22 +89,38 @@ def upload( Exception: If upload fails UnderfundedError: If account balance is insufficient """ - # Determine data size - if isinstance(data, bytes): - size = len(data) - elif data_size is not None: + # Validate inputs + if data is not None and stream_factory is not None: + raise ValueError("Cannot specify both data and stream_factory") + if data is None and stream_factory is None: + raise ValueError("Must specify either data or stream_factory") + + # Determine data size and create stream factory + if stream_factory is not None: + if data_size is None: + raise ValueError("data_size is required when using stream_factory") size = data_size + factory = stream_factory + elif isinstance(data, bytes): + size = len(data) + factory = lambda: io.BytesIO(data) else: - raise ValueError("data_size is required when data is a file-like object") + # data is a BinaryIO + if data_size is None: + raise ValueError("data_size is required when data is a file-like object") + size = data_size + # Capture data in closure to avoid late binding issues + stream = data + factory = lambda: (stream.seek(0), stream)[1] # Determine chunking mode params = chunking or ChunkingParams() use_chunked = self._should_use_chunked_upload(size, params) if use_chunked: - return self._upload_chunked(data, size, tags, on_progress, params) + return self._upload_chunked(factory, size, tags, on_progress, params) else: - return self._upload_single(data, size, tags, on_progress) + return self._upload_single(factory, size, tags, on_progress) def _should_use_chunked_upload(self, size: int, params: ChunkingParams) -> bool: """Determine if chunked upload should be used""" @@ -112,15 +133,17 @@ def _should_use_chunked_upload(self, size: int, params: ChunkingParams) -> bool: def _upload_single( self, - data: Union[bytes, BinaryIO], + stream_factory: StreamFactory, size: int, tags: Optional[List[Dict[str, str]]], on_progress: Optional[ProgressCallback], ) -> TurboUploadResponse: """Upload using single request (for small files)""" - # Read data if it's a stream - if not isinstance(data, bytes): - data = data.read() + # Read all data from stream + stream = stream_factory() + data = stream.read() + if hasattr(stream, "close"): + stream.close() # Create and sign DataItem data_item = create_data(bytearray(data), self.signer, tags) @@ -157,23 +180,17 @@ def _upload_single( def _upload_chunked( self, - data: Union[bytes, BinaryIO], + stream_factory: StreamFactory, size: int, tags: Optional[List[Dict[str, str]]], on_progress: Optional[ProgressCallback], params: ChunkingParams, ) -> TurboUploadResponse: """Upload using chunked/multipart upload (for large files)""" - # Wrap bytes in BytesIO for unified streaming path - if isinstance(data, bytes): - data_stream = io.BytesIO(data) - else: - data_stream = data - # Use StreamingDataItem for all chunked uploads # This signs data by streaming through it, avoiding memory duplication streaming_item = StreamingDataItem( - data_stream=data_stream, + stream_factory=stream_factory, data_size=size, signer=self.signer, tags=tags, From b42870428f194608491ff55c3f13cbc80663da8f Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Tue, 27 Jan 2026 20:57:16 -0600 Subject: [PATCH 07/15] refactor: move signer from constructor to sign() method on StreamingDataItem StreamingDataItem now accepts the signer via sign(signer) instead of at construction time, matching the DataItem sign() pattern. Also exposes identity properties (id, raw_id, signature, owner, etc.) 
after signing. Co-Authored-By: Claude Opus 4.5 --- tests/test_streaming_dataitem.py | 167 ++++++++++++++++++++++++------- turbo_sdk/bundle/stream.py | 128 ++++++++++++++++------- turbo_sdk/client.py | 6 +- 3 files changed, 228 insertions(+), 73 deletions(-) diff --git a/tests/test_streaming_dataitem.py b/tests/test_streaming_dataitem.py index 982c610..459b073 100644 --- a/tests/test_streaming_dataitem.py +++ b/tests/test_streaming_dataitem.py @@ -161,7 +161,6 @@ def test_streaming_matches_inmemory(self, signer): streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), - signer=signer, tags=tags, ) # Manually set the anchor to match @@ -184,6 +183,8 @@ def test_streaming_matches_inmemory(self, signer): signer=signer, ) + streaming._signer = signer + streaming._signature = signature streaming._data_stream = streaming._stream_factory() streaming._header = create_data_header( signer=signer, @@ -191,28 +192,30 @@ def test_streaming_matches_inmemory(self, signer): tags=tags, anchor=streaming._anchor, ) - streaming._prepared = True + streaming._signed = True actual = streaming.read(-1) assert actual == expected - def test_prepare_returns_correct_size(self, signer): - """prepare() should return correct total size.""" + def test_sign_returns_raw_id(self, signer): + """sign() should return raw_id bytes.""" test_data = b"x" * 1000 streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), - signer=signer, tags=[{"name": "Test", "value": "value"}], ) - total_size = streaming.prepare() + result = streaming.sign(signer) + + assert result == streaming.raw_id + assert len(result) == 32 # SHA-256 digest - # Total should be header + data + # total_size should still be header + data + total_size = streaming.total_size assert total_size == streaming.header_size + len(test_data) - assert total_size == streaming.total_size def test_read_chunks_correctly(self, signer): """Reading in chunks should produce same result as reading all.""" @@ -221,10 +224,10 @@ def test_read_chunks_correctly(self, signer): streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), - signer=signer, tags=None, ) - total_size = streaming.prepare() + streaming.sign(signer) + total_size = streaming.total_size # Read in chunks chunks = [] @@ -250,24 +253,22 @@ def test_read_empty_returns_empty(self, signer): streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), - signer=signer, tags=None, ) - streaming.prepare() + streaming.sign(signer) result = streaming.read(0) assert result == b"" - def test_raises_if_not_prepared(self, signer): - """Should raise error if read() called before prepare().""" + def test_raises_if_not_signed(self, signer): + """Should raise error if read() called before sign().""" streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(b"test"), data_size=4, - signer=signer, tags=None, ) - with pytest.raises(RuntimeError, match="Must call prepare"): + with pytest.raises(RuntimeError, match="Must call sign"): streaming.read(10) def test_works_with_non_seekable_stream(self, signer): @@ -299,11 +300,11 @@ def factory(): streaming = StreamingDataItem( stream_factory=factory, data_size=len(test_data), - signer=signer, tags=None, ) - total_size = streaming.prepare() + streaming.sign(signer) + total_size = streaming.total_size result = streaming.read(-1) # Factory should have been called twice: once for signing, once for upload @@ -317,10 
+318,9 @@ def test_reset_allows_rereading(self, signer): streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), - signer=signer, tags=None, ) - streaming.prepare() + streaming.sign(signer) first_read = streaming.read(-1) streaming.reset() @@ -329,7 +329,7 @@ def test_reset_allows_rereading(self, signer): assert first_read == second_read def test_progress_callback_during_signing(self, signer): - """Progress callback should be called during prepare().""" + """Progress callback should be called during sign().""" test_data = b"x" * 10000 progress_calls = [] @@ -339,11 +339,10 @@ def on_progress(processed, total): streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), - signer=signer, tags=None, on_sign_progress=on_progress, ) - streaming.prepare() + streaming.sign(signer) assert len(progress_calls) > 0 assert progress_calls[-1][0] == len(test_data) @@ -357,11 +356,11 @@ def test_large_data_streaming(self, signer): streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), - signer=signer, tags=[{"name": "Size", "value": "1MB"}], ) - total_size = streaming.prepare() + streaming.sign(signer) + total_size = streaming.total_size # Read in chunks and verify total size total_read = 0 @@ -380,10 +379,9 @@ def test_header_then_data_boundary(self, signer): streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(test_data), data_size=len(test_data), - signer=signer, tags=None, ) - streaming.prepare() + streaming.sign(signer) # Get header size header_size = streaming.header_size @@ -396,16 +394,112 @@ def test_header_then_data_boundary(self, signer): data_part = streaming.read(-1) assert data_part == test_data - def test_seekable_returns_false(self, signer): - """StreamingDataItem should not be seekable after prepare().""" + def test_is_signed(self, signer): + """is_signed should reflect whether sign() has been called.""" + streaming = StreamingDataItem( + stream_factory=lambda: io.BytesIO(b"test"), + data_size=4, + tags=None, + ) + + assert streaming.is_signed is False + streaming.sign(signer) + assert streaming.is_signed is True + + def test_signature_type(self, signer): + """signature_type should return the signer's type after sign().""" streaming = StreamingDataItem( stream_factory=lambda: io.BytesIO(b"test"), data_size=4, + tags=None, + ) + streaming.sign(signer) + + assert streaming.signature_type == signer.signature_type + + def test_id_and_signature_match_dataitem(self, signer): + """id, signature, and owner should match an equivalent DataItem.""" + test_data = b"property parity test" + tags = [{"name": "Content-Type", "value": "text/plain"}] + + # In-memory DataItem + data_item = create_data(bytearray(test_data), signer, tags) + sign(data_item, signer) + + # StreamingDataItem with same anchor + streaming = StreamingDataItem( + stream_factory=lambda: io.BytesIO(test_data), + data_size=len(test_data), + tags=tags, + ) + streaming._anchor = data_item.raw_anchor + + # Manually sign with same anchor + from turbo_sdk.bundle.sign import sign_stream + from turbo_sdk.bundle.tags import encode_tags + + encoded_tags = encode_tags(tags) + sign_stream_obj = streaming._stream_factory() + sig = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=streaming._anchor, + raw_tags=encoded_tags, + data_stream=sign_stream_obj, + data_size=len(test_data), + signer=signer, + ) + streaming._signer = signer + 
streaming._signature = sig + streaming._data_stream = streaming._stream_factory() + streaming._header = create_data_header( signer=signer, + signature=sig, + tags=tags, + anchor=streaming._anchor, + ) + streaming._signed = True + + assert streaming.id == data_item.id + assert streaming.raw_id == data_item.raw_id + assert streaming.raw_signature == bytes(data_item.raw_signature) + assert streaming.signature == data_item.signature + assert streaming.raw_owner == bytes(data_item.raw_owner) + assert streaming.owner == data_item.owner + assert streaming.raw_anchor == bytes(data_item.raw_anchor) + assert streaming.raw_target == bytes(data_item.raw_target) + + def test_properties_before_sign_raises(self, signer): + """Properties that require signing should raise before sign().""" + streaming = StreamingDataItem( + stream_factory=lambda: io.BytesIO(b"test"), + data_size=4, tags=None, ) - assert streaming.seekable() is False + with pytest.raises(RuntimeError, match="Must call sign"): + _ = streaming.signature_type + + with pytest.raises(RuntimeError, match="Must call sign"): + _ = streaming.raw_signature + + with pytest.raises(RuntimeError, match="Must call sign"): + _ = streaming.raw_owner + + with pytest.raises(RuntimeError, match="Must call sign"): + _ = streaming.raw_anchor + + def test_tags_property(self, signer): + """tags should return the tags passed at construction.""" + tags = [{"name": "App", "value": "Test"}] + streaming = StreamingDataItem( + stream_factory=lambda: io.BytesIO(b"test"), + data_size=4, + tags=tags, + ) + + assert streaming.tags == tags class TestStreamingDataItemIntegration: @@ -439,7 +533,6 @@ def open_file(): streaming = StreamingDataItem( stream_factory=open_file, data_size=len(test_data), - signer=signer, tags=None, ) # Use same anchor @@ -458,6 +551,8 @@ def open_file(): ) sign_stream_obj.close() + streaming._signer = signer + streaming._signature = signature streaming._data_stream = streaming._stream_factory() streaming._header = create_data_header( signer=signer, @@ -465,7 +560,7 @@ def open_file(): tags=None, anchor=streaming._anchor, ) - streaming._prepared = True + streaming._signed = True streamed_result = streaming.read(-1) streaming._data_stream.close() @@ -473,8 +568,8 @@ def open_file(): expected = bytes(data_item.get_raw()) assert streamed_result == expected - def test_with_tempfile_using_prepare(self, signer, tmp_path): - """Test using the normal prepare() flow with a temp file.""" + def test_with_tempfile_using_sign(self, signer, tmp_path): + """Test using the normal sign() flow with a temp file.""" # Create a temporary file test_file = tmp_path / "test_data.bin" test_data = os.urandom(10000) @@ -486,11 +581,11 @@ def open_file(): streaming = StreamingDataItem( stream_factory=open_file, data_size=len(test_data), - signer=signer, tags=[{"name": "App", "value": "Test"}], ) - total_size = streaming.prepare() + streaming.sign(signer) + total_size = streaming.total_size result = streaming.read(-1) # Verify size matches diff --git a/turbo_sdk/bundle/stream.py b/turbo_sdk/bundle/stream.py index ef3dbfc..bd574f1 100644 --- a/turbo_sdk/bundle/stream.py +++ b/turbo_sdk/bundle/stream.py @@ -1,8 +1,13 @@ """Streaming DataItem support for large file uploads.""" +import base64 +import hashlib import os from typing import BinaryIO, Callable, Dict, List, Optional +from base58 import b58encode + +from .constants import SIG_CONFIG from .create import create_data_header from .sign import sign_stream from .tags import encode_tags @@ -21,6 +26,10 @@ class StreamingDataItem: 
2. Builds the DataItem header with the computed signature 3. Provides a read() interface that returns header bytes first, then data + After sign(), exposes the same properties as DataItem: + id, raw_id, signature, raw_signature, signature_type, + owner, raw_owner, raw_target, raw_anchor, tags. + Uses a stream_factory pattern: a callable that returns a fresh stream each time it's called. This allows: - Non-seekable streams (generators, network streams, etc.) @@ -34,10 +43,9 @@ def open_file(): streaming = StreamingDataItem( stream_factory=open_file, data_size=file_size, - signer=signer, tags=tags, ) - total_size = streaming.prepare() + streaming.sign(signer) # Now use streaming.read() to get chunks for upload """ @@ -45,7 +53,6 @@ def __init__( self, stream_factory: StreamFactory, data_size: int, - signer, tags: Optional[List[Dict[str, str]]] = None, on_sign_progress: Optional[Callable[[int, int], None]] = None, ): @@ -55,34 +62,39 @@ def __init__( Args: stream_factory: A callable that returns a fresh BinaryIO stream data_size: Total size of the data in bytes - signer: The signer object with signature_type, public_key, sign() tags: Optional list of tags as dictionaries with 'name' and 'value' on_sign_progress: Optional callback(processed, total) during signing """ self._stream_factory = stream_factory self._data_size = data_size - self._signer = signer self._tags = tags or [] self._on_sign_progress = on_sign_progress self._header: Optional[bytes] = None self._header_offset = 0 - self._prepared = False + self._signed = False + self._signer = None + self._signature: Optional[bytes] = None self._anchor: Optional[bytes] = None self._data_stream: Optional[BinaryIO] = None - def prepare(self) -> int: + def sign(self, signer) -> bytes: """ - Sign the data (streaming) and prepare the header. + Sign the data (streaming) and build the header. This creates a fresh stream from the factory to compute the signature, then creates another fresh stream for the upload phase. 
+ Args: + signer: The signer object with signature_type, public_key, sign() + Returns: - Total size in bytes (header + data) + The raw ID (SHA-256 of the signature) """ - if self._prepared: - return len(self._header) + self._data_size + if self._signed: + return self.raw_id + + self._signer = signer # Generate random anchor self._anchor = os.urandom(32) @@ -94,15 +106,15 @@ def prepare(self) -> int: sign_stream_obj = self._stream_factory() # Compute signature by streaming through data - signature = sign_stream( - signature_type=self._signer.signature_type, - raw_owner=self._signer.public_key, + self._signature = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, raw_target=b"", raw_anchor=self._anchor, raw_tags=encoded_tags, data_stream=sign_stream_obj, data_size=self._data_size, - signer=self._signer, + signer=signer, on_progress=self._on_sign_progress, ) @@ -118,27 +130,79 @@ def prepare(self) -> int: # Build header with the computed signature self._header = create_data_header( - signer=self._signer, - signature=signature, + signer=signer, + signature=self._signature, tags=self._tags, anchor=self._anchor, ) - self._prepared = True - return len(self._header) + self._data_size + self._signed = True + return self.raw_id + + @property + def is_signed(self) -> bool: + return self._signed + + @property + def signature_type(self) -> int: + if not self._signed: + raise RuntimeError("Must call sign() first") + return self._signer.signature_type + + @property + def raw_signature(self) -> bytes: + if not self._signed: + raise RuntimeError("Must call sign() first") + return self._signature + + @property + def signature(self) -> bytes: + return base64.urlsafe_b64encode(self.raw_signature) + + @property + def raw_id(self) -> bytes: + return hashlib.sha256(self.raw_signature).digest() + + @property + def id(self) -> str: + return b58encode(self.raw_id).decode("utf-8") + + @property + def raw_owner(self) -> bytes: + if not self._signed: + raise RuntimeError("Must call sign() first") + return self._signer.public_key + + @property + def owner(self) -> bytes: + return base64.urlsafe_b64encode(self.raw_owner) + + @property + def raw_target(self) -> bytes: + return b"" + + @property + def raw_anchor(self) -> bytes: + if not self._signed: + raise RuntimeError("Must call sign() first") + return self._anchor + + @property + def tags(self) -> List[Dict[str, str]]: + return self._tags @property def total_size(self) -> int: - """Total size in bytes (header + data). Must call prepare() first.""" - if not self._prepared: - raise RuntimeError("Must call prepare() first") + """Total size in bytes (header + data). Must call sign() first.""" + if not self._signed: + raise RuntimeError("Must call sign() first") return len(self._header) + self._data_size @property def header_size(self) -> int: - """Size of the header in bytes. Must call prepare() first.""" - if not self._prepared: - raise RuntimeError("Must call prepare() first") + """Size of the header in bytes. 
Must call sign() first.""" + if not self._signed: + raise RuntimeError("Must call sign() first") return len(self._header) def read(self, size: int = -1) -> bytes: @@ -155,10 +219,10 @@ def read(self, size: int = -1) -> bytes: Bytes read (may be less than size if at end) Raises: - RuntimeError: If prepare() has not been called + RuntimeError: If sign() has not been called """ - if not self._prepared: - raise RuntimeError("Must call prepare() first") + if not self._signed: + raise RuntimeError("Must call sign() first") if size == 0: return b"" @@ -194,18 +258,14 @@ def read(self, size: int = -1) -> bytes: return bytes(result) - def seekable(self) -> bool: - """Return False - StreamingDataItem is forward-only after prepare().""" - return False - def reset(self) -> None: """ Reset the streaming position to allow re-reading. Creates a fresh data stream from the factory and resets the header offset. """ - if not self._prepared: - raise RuntimeError("Must call prepare() first") + if not self._signed: + raise RuntimeError("Must call sign() first") self._header_offset = 0 # Close existing stream if it has a close method if self._data_stream is not None and hasattr(self._data_stream, "close"): diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py index 99b2a8c..cdb4825 100644 --- a/turbo_sdk/client.py +++ b/turbo_sdk/client.py @@ -192,12 +192,12 @@ def _upload_chunked( streaming_item = StreamingDataItem( stream_factory=stream_factory, data_size=size, - signer=self.signer, tags=tags, ) - # Prepare signs the data (streaming) and builds the header - total_size = streaming_item.prepare() + # Sign the data (streaming) and build the header + streaming_item.sign(self.signer) + total_size = streaming_item.total_size # Create chunked uploader uploader = ChunkedUploader( From 8bdc1617ca4703394f59b99d3af2324ec0a2dbf7 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Tue, 27 Jan 2026 21:00:44 -0600 Subject: [PATCH 08/15] fix: read BinaryIO into bytes so stream factory survives sign() close The stream_factory lambda for BinaryIO inputs was returning the same stream object (with seek(0)). Since sign() closes its signing stream, the second factory call would fail on a closed file. Now we read the bytes upfront and create independent BytesIO instances. Co-Authored-By: Claude Opus 4.5 --- turbo_sdk/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py index cdb4825..f63998b 100644 --- a/turbo_sdk/client.py +++ b/turbo_sdk/client.py @@ -105,13 +105,13 @@ def upload( size = len(data) factory = lambda: io.BytesIO(data) else: - # data is a BinaryIO + # data is a BinaryIO – read into bytes so the factory can + # produce independent streams (sign() closes the signing stream). 
if data_size is None: raise ValueError("data_size is required when data is a file-like object") size = data_size - # Capture data in closure to avoid late binding issues - stream = data - factory = lambda: (stream.seek(0), stream)[1] + raw = data.read() + factory = lambda: io.BytesIO(raw) # Determine chunking mode params = chunking or ChunkingParams() From 89ad2bf89e4a25376231fafd76ee00b1fb154abf Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Tue, 27 Jan 2026 21:21:01 -0600 Subject: [PATCH 09/15] chore(docs): update README, add warning log, clean up tests --- README.md | 76 +++++++++++++++++++++++-------- tests/test_chunked.py | 30 ++++++------ tests/test_chunked_performance.py | 2 +- turbo_sdk/chunked.py | 8 ++-- turbo_sdk/client.py | 13 ++++++ turbo_sdk/types.py | 6 +-- 6 files changed, 93 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 01ebb2d..254452c 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ print(f"✅ Uploaded! URI: ar://{result.id}") Main client for interacting with Turbo services. **Parameters:** + - `signer`: Either `EthereumSigner` or `ArweaveSigner` instance - `network`: `"mainnet"` or `"testnet"` (default: `"mainnet"`) - `upload_url`: Optional custom upload service URL (overrides network default) @@ -84,16 +85,18 @@ turbo = Turbo(signer, upload_url="https://my-upload-service.example.com") **Methods:** -##### `upload(data, tags=None, on_progress=None, chunking=None, data_size=None) -> TurboUploadResponse` +##### `upload(data=None, tags=None, on_progress=None, chunking=None, data_size=None, stream_factory=None) -> TurboUploadResponse` Upload data to the Turbo datachain. Supports both small files (single request) and large files (chunked multipart upload). **Parameters:** + - `data`: Data to upload (`bytes` or file-like `BinaryIO` object) - `tags`: Optional list of metadata tags - `on_progress`: Optional callback `(processed_bytes, total_bytes) -> None` - `chunking`: Optional `ChunkingParams` for upload configuration -- `data_size`: Required when `data` is a file-like object +- `data_size`: Required when `data` is a file-like object or when using `stream_factory` +- `stream_factory`: Optional callable that returns a fresh `BinaryIO` stream each time it's called. Use this for non-seekable streams or when you want to avoid loading the entire file into memory. ```python # Simple upload @@ -107,6 +110,7 @@ result = turbo.upload( ``` **Returns:** `TurboUploadResponse` + ```python @dataclass class TurboUploadResponse: @@ -119,21 +123,23 @@ class TurboUploadResponse: ##### Large File Uploads with Progress -For files >= 5 MiB, the SDK automatically uses chunked multipart uploads. You can track progress with a callback: +For files >= 5 MiB, the SDK automatically uses chunked multipart uploads. Use `stream_factory` to avoid loading the entire file into memory. A factory is needed because the stream is consumed twice — once for signing and once for uploading — so the SDK calls it each time to get a fresh stream. 
```python +import os + def on_progress(processed: int, total: int): pct = (processed / total) * 100 print(f"Upload progress: {pct:.1f}%") -# Upload a large file with progress tracking -with open("large-video.mp4", "rb") as f: - result = turbo.upload( - data=f, - data_size=os.path.getsize("large-video.mp4"), - tags=[{"name": "Content-Type", "value": "video/mp4"}], - on_progress=on_progress, - ) +file_path = "large-video.mp4" + +result = turbo.upload( + stream_factory=lambda: open(file_path, "rb"), + data_size=os.path.getsize(file_path), + tags=[{"name": "Content-Type", "value": "video/mp4"}], + on_progress=on_progress, +) ``` ##### Chunking Configuration @@ -146,7 +152,7 @@ from turbo_sdk import ChunkingParams result = turbo.upload( data=large_data, chunking=ChunkingParams( - chunk_byte_count=10 * 1024 * 1024, # 10 MiB chunks (default: 5 MiB) + chunk_size=10 * 1024 * 1024, # 10 MiB chunks (default: 5 MiB) max_chunk_concurrency=3, # Parallel chunk uploads (default: 1) chunking_mode="auto", # "auto", "force", or "disabled" ), @@ -155,7 +161,8 @@ result = turbo.upload( ``` **ChunkingParams options:** -- `chunk_byte_count`: Chunk size in bytes (5-500 MiB, default: 5 MiB) + +- `chunk_size`: Chunk size in bytes (5-500 MiB, default: 5 MiB) - `max_chunk_concurrency`: Number of parallel chunk uploads (default: 1) - `chunking_mode`: - `"auto"` (default): Use chunked upload for files >= 5 MiB @@ -177,6 +184,7 @@ print(f"Other balance: {other_balance.winc} winc") ``` **Returns:** `TurboBalanceResponse` + ```python @dataclass class TurboBalanceResponse: @@ -201,6 +209,7 @@ print(f"Upload cost: {cost} winc") Ethereum signer using ECDSA signatures. **Parameters:** + - `private_key` (str): Hex private key with or without `0x` prefix ```python @@ -212,6 +221,7 @@ signer = EthereumSigner("0x1234567890abcdef...") Arweave signer using RSA-PSS signatures. **Parameters:** + - `jwk` (dict): Arweave wallet in JWK format ```python @@ -261,36 +271,42 @@ except ChunkedUploadError as e: ``` **Exception types:** + - `ChunkedUploadError`: Base exception for chunked upload failures - `UnderfundedError`: Account has insufficient balance (HTTP 402) - `UploadValidationError`: Upload validation failed (INVALID status) - `UploadFinalizationError`: Finalization timed out or failed - ## Developers ### Setup -1. **Crete a virtual environment:** +1. **Create a virtual environment:** ```bash python -m venv venv -source venv/bin/activate +source venv/bin/activate ``` -1. **Install dependencies:** +2. **Install dependencies:** ```bash pip install -e ".[dev]" ``` -2. **Run tests:** +3. **Run tests:** ```bash pytest ``` -3. **Run performance benchmarks** (requires funded wallet): +With coverage + +```bash +pytest --cov=turbo_sdk +``` + +4. **Run performance benchmarks** (requires funded wallet): ```bash export TURBO_TEST_WALLET=/path/to/wallet.json @@ -300,6 +316,28 @@ pytest -m performance -v -s The test suite includes comprehensive unit tests for all components. Performance tests measure real upload throughput against the Turbo service. +## Publishing + +Releases are published to PyPI via the GitHub Actions workflow at `.github/workflows/release.yml`. It runs on `release` events or can be triggered manually via `workflow_dispatch`. + +There is no automated versioning. Before publishing, update the `version` field in `pyproject.toml` to reflect the new release: + +```toml +[project] +version = "0.0.5" +``` + +The workflow runs tests across Python 3.8-3.12, builds the package, and publishes to PyPI using trusted OIDC publishing. 
+ +To publish locally instead: + +```bash +pip install build twine +python -m build +twine check dist/* +twine upload dist/* +``` + ## Acknowledgments This package leverages implementations from the [Irys Python SDK](https://github.com/Irys-xyz/python-sdk) for ANS-104 DataItem format and cryptographic operations. Special thanks to the Irys team for their work on permanent data storage standards. diff --git a/tests/test_chunked.py b/tests/test_chunked.py index 09eafd4..d57d01b 100644 --- a/tests/test_chunked.py +++ b/tests/test_chunked.py @@ -21,7 +21,7 @@ class TestChunkingParams: def test_default_values(self): """Test default parameter values""" params = ChunkingParams() - assert params.chunk_byte_count == 5 * 1024 * 1024 # 5 MiB + assert params.chunk_size == 5 * 1024 * 1024 # 5 MiB assert params.max_chunk_concurrency == 1 assert params.chunking_mode == "auto" assert params.max_finalize_ms == 150_000 @@ -29,35 +29,35 @@ def test_default_values(self): def test_custom_values(self): """Test custom parameter values""" params = ChunkingParams( - chunk_byte_count=10 * 1024 * 1024, + chunk_size=10 * 1024 * 1024, max_chunk_concurrency=4, chunking_mode="force", max_finalize_ms=300_000, ) - assert params.chunk_byte_count == 10 * 1024 * 1024 + assert params.chunk_size == 10 * 1024 * 1024 assert params.max_chunk_concurrency == 4 assert params.chunking_mode == "force" assert params.max_finalize_ms == 300_000 def test_chunk_size_validation_too_small(self): """Test validation rejects chunk size below 5 MiB""" - with pytest.raises(ValueError, match="chunk_byte_count must be between"): - ChunkingParams(chunk_byte_count=1024 * 1024) # 1 MiB + with pytest.raises(ValueError, match="chunk_size must be between"): + ChunkingParams(chunk_size=1024 * 1024) # 1 MiB def test_chunk_size_validation_too_large(self): """Test validation rejects chunk size above 500 MiB""" - with pytest.raises(ValueError, match="chunk_byte_count must be between"): - ChunkingParams(chunk_byte_count=600 * 1024 * 1024) # 600 MiB + with pytest.raises(ValueError, match="chunk_size must be between"): + ChunkingParams(chunk_size=600 * 1024 * 1024) # 600 MiB def test_chunk_size_at_boundaries(self): """Test chunk size at valid boundaries""" # Minimum boundary - params_min = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) - assert params_min.chunk_byte_count == 5 * 1024 * 1024 + params_min = ChunkingParams(chunk_size=5 * 1024 * 1024) + assert params_min.chunk_size == 5 * 1024 * 1024 # Maximum boundary - params_max = ChunkingParams(chunk_byte_count=500 * 1024 * 1024) - assert params_max.chunk_byte_count == 500 * 1024 * 1024 + params_max = ChunkingParams(chunk_size=500 * 1024 * 1024) + assert params_max.chunk_size == 500 * 1024 * 1024 def test_concurrency_validation(self): """Test validation rejects concurrency below 1""" @@ -81,7 +81,7 @@ def test_init(self, uploader): """Test uploader initialization""" assert uploader.upload_url == "https://upload.test.io" assert uploader.token == "arweave" - assert uploader.params.chunk_byte_count == 5 * 1024 * 1024 + assert uploader.params.chunk_size == 5 * 1024 * 1024 def test_chunking_version_header(self, uploader): """Test x-chunking-version header is set""" @@ -402,7 +402,7 @@ def test_upload_chunks_sequential_bytes(self, mock_session_class): mock_session.headers = {} mock_session_class.return_value = mock_session - params = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) + params = ChunkingParams(chunk_size=5 * 1024 * 1024) uploader = ChunkedUploader( upload_url="https://upload.test.io", token="arweave", 
@@ -433,7 +433,7 @@ def test_upload_chunks_sequential_stream(self, mock_session_class): mock_session.headers = {} mock_session_class.return_value = mock_session - params = ChunkingParams(chunk_byte_count=5 * 1024 * 1024) + params = ChunkingParams(chunk_size=5 * 1024 * 1024) uploader = ChunkedUploader( upload_url="https://upload.test.io", token="arweave", @@ -462,7 +462,7 @@ def test_upload_chunks_concurrent(self, mock_session_class): mock_session_class.return_value = mock_session params = ChunkingParams( - chunk_byte_count=5 * 1024 * 1024, + chunk_size=5 * 1024 * 1024, max_chunk_concurrency=3, ) uploader = ChunkedUploader( diff --git a/tests/test_chunked_performance.py b/tests/test_chunked_performance.py index a4e4ec4..efc6e1e 100644 --- a/tests/test_chunked_performance.py +++ b/tests/test_chunked_performance.py @@ -147,7 +147,7 @@ def _upload_chunked( tags=[{"name": "Test", "value": f"perf-chunked-{label}"}], chunking=ChunkingParams( chunking_mode="force", - chunk_byte_count=chunk_size, + chunk_size=chunk_size, max_chunk_concurrency=concurrency, ), ) diff --git a/turbo_sdk/chunked.py b/turbo_sdk/chunked.py index 65bc3f0..330f0a4 100644 --- a/turbo_sdk/chunked.py +++ b/turbo_sdk/chunked.py @@ -82,7 +82,7 @@ def _get_max_finalize_time(self, total_bytes: int) -> float: def initiate(self) -> ChunkedUploadInit: """Initiate a new chunked upload session""" url = f"{self.upload_url}/chunks/{self.token}/-1/-1" - params = {"chunkSize": self.params.chunk_byte_count} + params = {"chunkSize": self.params.chunk_size} response = self._session.get(url, params=params) @@ -92,7 +92,7 @@ def initiate(self) -> ChunkedUploadInit: id=data["id"], min=data["min"], max=data["max"], - chunk_size=data.get("chunkSize", self.params.chunk_byte_count), + chunk_size=data.get("chunkSize", self.params.chunk_size), ) elif response.status_code == 503: raise ChunkedUploadError(f"Service unavailable: {response.text}") @@ -192,7 +192,7 @@ def upload_chunks_sequential( on_progress: Optional[ProgressCallback] = None, ) -> None: """Upload chunks sequentially""" - chunk_size = self.params.chunk_byte_count + chunk_size = self.params.chunk_size offset = 0 processed = 0 @@ -222,7 +222,7 @@ def upload_chunks_concurrent( on_progress: Optional[ProgressCallback] = None, ) -> None: """Upload chunks concurrently (requires bytes, not stream)""" - chunk_size = self.params.chunk_byte_count + chunk_size = self.params.chunk_size max_workers = self.params.max_chunk_concurrency # Build list of chunks diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py index f63998b..3641bf3 100644 --- a/turbo_sdk/client.py +++ b/turbo_sdk/client.py @@ -1,4 +1,5 @@ import io +import logging from typing import BinaryIO, List, Dict, Optional, Union import requests @@ -13,6 +14,9 @@ from .chunked import ChunkedUploader +logger = logging.getLogger(__name__) + + class Turbo: """Main Turbo client for uploading data and managing payments""" @@ -110,6 +114,15 @@ def upload( if data_size is None: raise ValueError("data_size is required when data is a file-like object") size = data_size + if size >= self.CHUNKING_THRESHOLD: + logger.warning( + "Large file (%d bytes) passed as data will be read entirely " + "into memory. Consider providing stream_factory instead, e.g. 
" + "turbo.upload(stream_factory=lambda: open('my-file.bin', 'rb'), " + "data_size=%d)", + size, + size, + ) raw = data.read() factory = lambda: io.BytesIO(raw) diff --git a/turbo_sdk/types.py b/turbo_sdk/types.py index 432a96f..78c8ab9 100644 --- a/turbo_sdk/types.py +++ b/turbo_sdk/types.py @@ -10,7 +10,7 @@ class ChunkingParams: """Configuration for chunked/multipart uploads""" - chunk_byte_count: int = 5 * 1024 * 1024 # 5 MiB default + chunk_size: int = 5 * 1024 * 1024 # 5 MiB default max_chunk_concurrency: int = 1 chunking_mode: ChunkingMode = "auto" max_finalize_ms: int = 150_000 # 2.5 minutes per GiB @@ -18,8 +18,8 @@ class ChunkingParams: def __post_init__(self): min_chunk = 5 * 1024 * 1024 # 5 MiB max_chunk = 500 * 1024 * 1024 # 500 MiB - if not (min_chunk <= self.chunk_byte_count <= max_chunk): - raise ValueError(f"chunk_byte_count must be between {min_chunk} and {max_chunk} bytes") + if not (min_chunk <= self.chunk_size <= max_chunk): + raise ValueError(f"chunk_size must be between {min_chunk} and {max_chunk} bytes") if self.max_chunk_concurrency < 1: raise ValueError("max_chunk_concurrency must be at least 1") From 9295d2dccf926a6c2fff391be8cce46a3f57cd11 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Tue, 27 Jan 2026 21:33:00 -0600 Subject: [PATCH 10/15] chore(docs): update README, run linter --- README.md | 9 ++++++++- turbo_sdk/bundle/stream.py | 5 +---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 254452c..659556f 100644 --- a/README.md +++ b/README.md @@ -306,7 +306,14 @@ With coverage pytest --cov=turbo_sdk ``` -4. **Run performance benchmarks** (requires funded wallet): +4. **Lint and format:** + +```bash +black turbo_sdk tests +flake8 turbo_sdk tests +``` + +5. **Run performance benchmarks** (requires funded wallet): ```bash export TURBO_TEST_WALLET=/path/to/wallet.json diff --git a/turbo_sdk/bundle/stream.py b/turbo_sdk/bundle/stream.py index bd574f1..83f6841 100644 --- a/turbo_sdk/bundle/stream.py +++ b/turbo_sdk/bundle/stream.py @@ -7,7 +7,6 @@ from base58 import b58encode -from .constants import SIG_CONFIG from .create import create_data_header from .sign import sign_stream from .tags import encode_tags @@ -240,9 +239,7 @@ def read(self, size: int = -1) -> bytes: if self._header_offset < len(self._header): header_remaining = len(self._header) - self._header_offset to_read = min(header_remaining, remaining) - header_chunk = self._header[ - self._header_offset : self._header_offset + int(to_read) - ] + header_chunk = self._header[self._header_offset : self._header_offset + int(to_read)] result.extend(header_chunk) self._header_offset += len(header_chunk) remaining -= len(header_chunk) From 4f5f894551bffb160f1eab7320395d2bb6955ba9 Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Tue, 27 Jan 2026 21:36:35 -0600 Subject: [PATCH 11/15] chore(docs): update README, run linter --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 659556f..8616a79 100644 --- a/README.md +++ b/README.md @@ -334,6 +334,13 @@ There is no automated versioning. Before publishing, update the `version` field version = "0.0.5" ``` +Steps to release: + +1. Merge feature branches into `alpha`. +2. Review the commits and update the `version` field in `pyproject.toml` accordingly. +3. Push to the `alpha` branch. +4. Manually run the release workflow at `.github/workflows/release.yml` via `workflow_dispatch`. 
+
 The workflow runs tests across Python 3.8-3.12, builds the package, and publishes to PyPI using trusted OIDC publishing.
 
 To publish locally instead:

From 2885f91a34c1af15e66e4cf1728744724ec8bb8a Mon Sep 17 00:00:00 2001
From: dtfiedler
Date: Tue, 27 Jan 2026 21:41:48 -0600
Subject: [PATCH 12/15] chore(deps): pin black dep in pyproject.toml

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 708b5f0..af5c8d4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,7 +36,7 @@ dependencies = [
 dev = [
     "pytest>=7.4.0",
     "pytest-cov>=4.1.0",
-    "black>=23.7.0",
+    "black==25.11.0",
     "flake8>=6.1.0",
     "mypy>=1.5.0",
     "build>=0.10.0",

From 893bcff46d8a6a10a4d92d3414d9fd2cdfdde577 Mon Sep 17 00:00:00 2001
From: dtfiedler
Date: Tue, 27 Jan 2026 21:43:38 -0600
Subject: [PATCH 13/15] chore(git): update black dep version to avoid failures with py 3.11

---
 .github/workflows/test-and-build.yml | 5 +++--
 pyproject.toml                       | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/test-and-build.yml b/.github/workflows/test-and-build.yml
index f06ee1a..006b4a5 100644
--- a/.github/workflows/test-and-build.yml
+++ b/.github/workflows/test-and-build.yml
@@ -67,6 +67,7 @@ jobs:
       run: |
         python -m pip install --upgrade pip
         pip install -e ".[dev]"
+        pip install "black==25.11.0"

     - name: Run Black formatter check
       run: |
@@ -130,9 +131,9 @@
         python -m pip install --upgrade pip
         pip install -e ".[dev]" safety bandit

-    - name: Run safety check for known vulnerabilities
+    - name: Run safety scan for known vulnerabilities
       run: |
-        safety check
+        safety scan
       continue-on-error: true

     - name: Run bandit security linter
diff --git a/pyproject.toml b/pyproject.toml
index af5c8d4..708b5f0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,7 +36,7 @@ dependencies = [
 dev = [
     "pytest>=7.4.0",
     "pytest-cov>=4.1.0",
-    "black==25.11.0",
+    "black>=23.7.0",
     "flake8>=6.1.0",
     "mypy>=1.5.0",
     "build>=0.10.0",

From 14979e153c06ee869738786309a3e10ef56e42b5 Mon Sep 17 00:00:00 2001
From: dtfiedler
Date: Tue, 27 Jan 2026 21:45:51 -0600
Subject: [PATCH 14/15] chore(lint): use def factory function and remove security scan

---
 .github/workflows/test-and-build.yml | 35 ----------------------------
 turbo_sdk/client.py                  |  9 +++++--
 2 files changed, 7 insertions(+), 37 deletions(-)

diff --git a/.github/workflows/test-and-build.yml b/.github/workflows/test-and-build.yml
index 006b4a5..8d89017 100644
--- a/.github/workflows/test-and-build.yml
+++ b/.github/workflows/test-and-build.yml
@@ -113,38 +113,3 @@ jobs:
           name: python-package
           path: dist/
           retention-days: 7
-
-  security:
-    name: Security Scan
-    runs-on: ubuntu-latest
-
-    steps:
-      - uses: actions/checkout@v4
-
-      - name: Set up Python 3.11
-        uses: actions/setup-python@v5
-        with:
-          python-version: "3.11"
-
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install -e ".[dev]" safety bandit
-
-      - name: Run safety scan for known vulnerabilities
-        run: |
-          safety scan
-        continue-on-error: true
-
-      - name: Run bandit security linter
-        run: |
-          bandit -r turbo_sdk/ -f json -o bandit-report.json
-        continue-on-error: true
-
-      - name: Upload security report
-        if: always()
-        uses: actions/upload-artifact@v4
-        with:
-          name: security-reports
-          path: bandit-report.json
-          retention-days: 7
diff --git a/turbo_sdk/client.py b/turbo_sdk/client.py
index 3641bf3..bdfb630 100644
--- a/turbo_sdk/client.py
+++ b/turbo_sdk/client.py
@@ -107,7 +107,10 @@ def upload(
             factory = stream_factory
elif isinstance(data, bytes): size = len(data) - factory = lambda: io.BytesIO(data) + + def factory(): + return io.BytesIO(data) + else: # data is a BinaryIO – read into bytes so the factory can # produce independent streams (sign() closes the signing stream). @@ -124,7 +127,9 @@ def upload( size, ) raw = data.read() - factory = lambda: io.BytesIO(raw) + + def factory(): + return io.BytesIO(raw) # Determine chunking mode params = chunking or ChunkingParams() From 85e966b14552efaece50eef8ebe8257a895afd3c Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Tue, 27 Jan 2026 21:49:38 -0600 Subject: [PATCH 15/15] chore(py): update mypy python version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 708b5f0..2e79ace 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,7 +58,7 @@ line-length = 100 target-version = ["py38", "py39", "py310", "py311", "py312"] [tool.mypy] -python_version = "3.8" +python_version = "3.9" warn_return_any = true warn_unused_configs = true disallow_untyped_defs = false
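Putting the pieces of this series together, the sketch below exercises the chunked-upload surface end to end: a `stream_factory` so the data never has to sit fully in memory, a progress callback, explicit `ChunkingParams`, and the new exception types. It is illustrative only: the wallet file, upload file, and tag values are placeholders, and it assumes `ArweaveSigner` and the new exception classes are importable from the package root, as the README snippets above suggest.

```python
# Illustrative sketch of the API added in this series (not part of any patch).
# "wallet.json" and "large-video.mp4" are placeholder paths.
import json
import os

from turbo_sdk import (
    ArweaveSigner,
    ChunkedUploadError,
    ChunkingParams,
    Turbo,
    UnderfundedError,
)

with open("wallet.json") as f:
    signer = ArweaveSigner(json.load(f))  # Arweave wallet in JWK format

turbo = Turbo(signer, network="testnet")

file_path = "large-video.mp4"


def on_progress(processed: int, total: int) -> None:
    print(f"{processed}/{total} bytes uploaded")


try:
    result = turbo.upload(
        stream_factory=lambda: open(file_path, "rb"),  # fresh stream per call
        data_size=os.path.getsize(file_path),
        tags=[{"name": "Content-Type", "value": "video/mp4"}],
        on_progress=on_progress,
        chunking=ChunkingParams(
            chunk_size=10 * 1024 * 1024,  # 10 MiB chunks
            max_chunk_concurrency=3,
        ),
    )
    print(f"Uploaded ar://{result.id} for {result.winc} winc")
except UnderfundedError:
    print("Insufficient balance - top up before retrying")
except ChunkedUploadError as exc:
    print(f"Chunked upload failed: {exc}")
```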