Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from collections import deque
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Union,
)

from pydantic import ConfigDict, Field, field_validator
from pydantic import ConfigDict, Field, TypeAdapter, field_validator
from requests import HTTPError, Session
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt

Expand All @@ -36,6 +37,16 @@
)
from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
from pyiceberg.catalog.rest.response import _handle_non_200_response
from pyiceberg.catalog.rest.scan_planning import (
FetchScanTasksRequest,
PlanCancelled,
PlanCompleted,
PlanFailed,
PlanningResponse,
PlanSubmitted,
PlanTableScanRequest,
ScanTasks,
)
from pyiceberg.exceptions import (
AuthorizationExpiredError,
CommitFailedException,
Expand All @@ -44,6 +55,7 @@
NamespaceNotEmptyError,
NoSuchIdentifierError,
NoSuchNamespaceError,
NoSuchPlanTaskError,
NoSuchTableError,
NoSuchViewError,
TableAlreadyExistsError,
Expand All @@ -56,6 +68,7 @@
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
FileScanTask,
StagedTable,
Table,
TableIdentifier,
Expand Down Expand Up @@ -315,6 +328,9 @@ class ListViewsResponse(IcebergBaseModel):
identifiers: list[ListViewResponseEntry] = Field()


_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)


class RestCatalog(Catalog):
uri: str
_session: Session
Expand Down Expand Up @@ -384,6 +400,107 @@ def is_rest_scan_planning_enabled(self) -> bool:
self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
)

@retry(**_RETRY_ARGS)
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:
    """POST a scan plan request for a table and parse the server's reply.

    The call is wrapped by the module's retry configuration (``_RETRY_ARGS``).

    Args:
        identifier: Table identifier.
        request: The scan plan request parameters. Serialized to JSON using
            field aliases, with ``None``-valued fields left out.

    Returns:
        The parsed PlanningResponse representing the status of the plan request.

    Raises:
        NoSuchTableError: If the server responds with 404 for the given identifier.
    """
    self._check_endpoint(Capability.V1_SUBMIT_TABLE_SCAN_PLAN)

    endpoint = self.url(Endpoints.plan_table_scan, prefixed=True, **self._split_identifier_for_path(identifier))
    payload = request.model_dump_json(by_alias=True, exclude_none=True).encode(UTF8)
    http_response = self._session.post(endpoint, data=payload)

    try:
        http_response.raise_for_status()
    except HTTPError as http_error:
        # Error statuses are routed through the shared handler with the 404 mapping.
        _handle_non_200_response(http_error, {404: NoSuchTableError})

    return _PLANNING_RESPONSE_ADAPTER.validate_json(http_response.text)

@retry(**_RETRY_ARGS)
def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> ScanTasks:
    """Exchange a plan-task token for the scan tasks it stands for.

    The call is wrapped by the module's retry configuration (``_RETRY_ARGS``).

    Args:
        identifier: Table identifier.
        plan_task: The plan task token from a previous response.

    Returns:
        ScanTasks containing file scan tasks and possibly more plan-task tokens.

    Raises:
        NoSuchPlanTaskError: If the server responds with 404 for the given
            identifier and plan task.
    """
    self._check_endpoint(Capability.V1_TABLE_SCAN_PLAN_TASKS)

    fetch_request = FetchScanTasksRequest(plan_task=plan_task)
    endpoint = self.url(Endpoints.fetch_scan_tasks, prefixed=True, **self._split_identifier_for_path(identifier))
    payload = fetch_request.model_dump_json(by_alias=True).encode(UTF8)
    http_response = self._session.post(endpoint, data=payload)

    try:
        http_response.raise_for_status()
    except HTTPError as http_error:
        # Error statuses are routed through the shared handler with the 404 mapping.
        _handle_non_200_response(http_error, {404: NoSuchPlanTaskError})

    return ScanTasks.model_validate_json(http_response.text)

def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> list[FileScanTask]:
"""Plan a table scan and return FileScanTasks.

Handles the full scan planning lifecycle including pagination.

Args:
identifier: Table identifier.
request: The scan plan request parameters.

Returns:
List of FileScanTask objects ready for execution.

Raises:
RuntimeError: If planning fails, is cancelled, or returns unexpected response.
NotImplementedError: If async planning is required but not yet supported.
"""
response = self._plan_table_scan(identifier, request)

if isinstance(response, PlanFailed):
error_msg = response.error.message if response.error else "unknown error"
raise RuntimeError(f"Received status: failed: {error_msg}")

if isinstance(response, PlanCancelled):
raise RuntimeError("Received status: cancelled")

if isinstance(response, PlanSubmitted):
# TODO: implement polling for async planning
raise NotImplementedError(f"Async scan planning not yet supported for planId: {response.plan_id}")

if not isinstance(response, PlanCompleted):
raise RuntimeError(f"Invalid planStatus for response: {type(response).__name__}")

tasks: list[FileScanTask] = []

# Collect tasks from initial response
for task in response.file_scan_tasks:
tasks.append(FileScanTask.from_rest_response(task, response.delete_files))

# Fetch and collect from additional batches
pending_tasks = deque(response.plan_tasks)
while pending_tasks:
plan_task = pending_tasks.popleft()
batch = self._fetch_scan_tasks(identifier, plan_task)
for task in batch.file_scan_tasks:
tasks.append(FileScanTask.from_rest_response(task, batch.delete_files))
pending_tasks.extend(batch.plan_tasks)

return tasks

def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
"""Create the LegacyOAuth2AuthManager by fetching required properties.

Expand Down
9 changes: 8 additions & 1 deletion pyiceberg/catalog/rest/scan_planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@

from pyiceberg.catalog.rest.response import ErrorResponseMessage
from pyiceberg.expressions import BooleanExpression, SerializableBooleanExpression
from pyiceberg.manifest import FileFormat
from pyiceberg.manifest import DataFileContent, FileFormat
from pyiceberg.typedef import IcebergBaseModel

# Maps the REST `content` string of a content file to the manifest-level
# DataFileContent enum; looked up by key when a REST content file is
# converted into a DataFile.
CONTENT_TYPE_MAP: dict[str, DataFileContent] = {
    "data": DataFileContent.DATA,
    "position-deletes": DataFileContent.POSITION_DELETES,
    "equality-deletes": DataFileContent.EQUALITY_DELETES,
}

# Primitive types that can appear in partition values and bounds
PrimitiveTypeValue: TypeAlias = bool | int | float | str | Decimal | UUID | date | time | datetime | bytes

Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class NoSuchNamespaceError(Exception):
"""Raised when a referenced name-space is not found."""


class NoSuchPlanTaskError(Exception):
    """Raised when a scan plan task is not found.

    The REST catalog maps a 404 response from the fetch-scan-tasks endpoint
    to this error.
    """


class RESTError(Exception):
    """Raised when there is an unknown response from the REST Catalog."""

Expand Down
128 changes: 123 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@
from pyiceberg_core.datafusion import IcebergDataFusionTable

from pyiceberg.catalog import Catalog
from pyiceberg.catalog.rest.scan_planning import (
RESTContentFile,
RESTDeleteFile,
RESTFileScanTask,
)

ALWAYS_TRUE = AlwaysTrue()
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
Expand Down Expand Up @@ -1168,6 +1173,8 @@ def scan(
snapshot_id=snapshot_id,
options=options,
limit=limit,
catalog=self.catalog,
table_identifier=self._identifier,
)

@property
Expand Down Expand Up @@ -1684,6 +1691,8 @@ class TableScan(ABC):
snapshot_id: int | None
options: Properties
limit: int | None
catalog: Catalog | None
table_identifier: Identifier | None

def __init__(
self,
Expand All @@ -1695,6 +1704,8 @@ def __init__(
snapshot_id: int | None = None,
options: Properties = EMPTY_DICT,
limit: int | None = None,
catalog: Catalog | None = None,
table_identifier: Identifier | None = None,
):
self.table_metadata = table_metadata
self.io = io
Expand All @@ -1704,6 +1715,8 @@ def __init__(
self.snapshot_id = snapshot_id
self.options = options
self.limit = limit
self.catalog = catalog
self.table_identifier = table_identifier

def snapshot(self) -> Snapshot | None:
if self.snapshot_id:
Expand Down Expand Up @@ -1798,6 +1811,74 @@ def __init__(
self.delete_files = delete_files or set()
self.residual = residual

@staticmethod
def from_rest_response(
    rest_task: RESTFileScanTask,
    delete_files: list[RESTDeleteFile],
) -> FileScanTask:
    """Convert a RESTFileScanTask to a FileScanTask.

    Args:
        rest_task: The REST file scan task.
        delete_files: The list of delete files from the ScanTasks response.
            ``rest_task.delete_file_references`` holds positions in this list.

    Returns:
        A FileScanTask with the converted data file, the resolved delete files,
        and the task's residual filter (ALWAYS_TRUE when the task has none).

    Raises:
        IndexError: If a delete file reference is negative or does not point
            at an entry of ``delete_files``.
        NotImplementedError: If equality delete files are encountered.
    """
    from pyiceberg.catalog.rest.scan_planning import RESTEqualityDeleteFile

    data_file = _rest_file_to_data_file(rest_task.data_file)

    available_deletes = delete_files or []
    resolved_deletes: set[DataFile] = set()
    for idx in rest_task.delete_file_references or ():
        # Plain list indexing would let a negative reference wrap around and
        # silently pick the wrong delete file, so the range is checked explicitly.
        if not 0 <= idx < len(available_deletes):
            raise IndexError(
                f"Delete file reference {idx} is out of range for {len(available_deletes)} delete file(s) "
                f"referenced by data file: {rest_task.data_file.file_path}"
            )
        delete_file = available_deletes[idx]
        if isinstance(delete_file, RESTEqualityDeleteFile):
            raise NotImplementedError(f"PyIceberg does not yet support equality deletes: {delete_file.file_path}")
        resolved_deletes.add(_rest_file_to_data_file(delete_file))

    # Fall back to ALWAYS_TRUE only when no residual was supplied; an explicit
    # expression must be kept regardless of how it evaluates for truthiness.
    residual = rest_task.residual_filter
    return FileScanTask(
        data_file=data_file,
        delete_files=resolved_deletes,
        residual=residual if residual is not None else ALWAYS_TRUE,
    )


def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile:
    """Build a manifest DataFile from a REST content file.

    Column-level statistics (sizes and value/null/NaN counts) are copied only
    when ``rest_file`` is a RESTDataFile; for any other content file they are
    passed as ``None``. The spec id is assigned on the resulting DataFile after
    construction.
    """
    from pyiceberg.catalog.rest.scan_planning import CONTENT_TYPE_MAP, RESTDataFile

    stats: dict[str, Any] = dict.fromkeys(("column_sizes", "value_counts", "null_value_counts", "nan_value_counts"))
    if isinstance(rest_file, RESTDataFile):
        for stat_name in stats:
            metric = getattr(rest_file, stat_name)
            stats[stat_name] = metric.to_dict() if metric else None

    content = CONTENT_TYPE_MAP[rest_file.content]
    # A missing or empty partition becomes an empty Record.
    partition = Record(*rest_file.partition) if rest_file.partition else Record()

    converted = DataFile.from_args(
        content=content,
        file_path=rest_file.file_path,
        file_format=rest_file.file_format,
        partition=partition,
        record_count=rest_file.record_count,
        file_size_in_bytes=rest_file.file_size_in_bytes,
        **stats,
        split_offsets=rest_file.split_offsets,
        sort_order_id=rest_file.sort_order_id,
    )
    converted.spec_id = rest_file.spec_id
    return converted


def _open_manifest(
io: FileIO,
Expand Down Expand Up @@ -1970,12 +2051,35 @@ def scan_plan_helper(self) -> Iterator[list[ManifestEntry]]:
],
)

def plan_files(self) -> Iterable[FileScanTask]:
"""Plans the relevant files by filtering on the PartitionSpecs.
def _should_use_rest_planning(self) -> bool:
    """Tell whether this scan should be planned by the REST catalog.

    Returns:
        False when the scan's catalog is not a RestCatalog; otherwise whatever
        the catalog's ``is_rest_scan_planning_enabled()`` reports.
    """
    from pyiceberg.catalog.rest import RestCatalog

    catalog = self.catalog
    if isinstance(catalog, RestCatalog):
        return catalog.is_rest_scan_planning_enabled()
    return False

def _plan_files_rest(self) -> Iterable[FileScanTask]:
    """Plan files by delegating to the REST catalog's scan planning.

    Builds a PlanTableScanRequest from this scan's snapshot id, projection,
    row filter and case sensitivity, then hands it to ``catalog.plan_scan``.

    Returns:
        The FileScanTasks returned by the catalog.

    Raises:
        TypeError: If the scan's catalog is not a RestCatalog.
        ValueError: If the scan has no table identifier.
    """
    from pyiceberg.catalog.rest import RestCatalog
    from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest

    catalog = self.catalog
    if not isinstance(catalog, RestCatalog):
        raise TypeError("REST scan planning requires a RestCatalog")

    identifier = self.table_identifier
    if identifier is None:
        raise ValueError("REST scan planning requires a table identifier")

    # A select-all projection and an always-true filter are passed as None.
    projection = None
    if self.selected_fields != ("*",):
        projection = list(self.selected_fields)

    row_filter = None
    if self.row_filter != ALWAYS_TRUE:
        row_filter = self.row_filter

    plan_request = PlanTableScanRequest(
        snapshot_id=self.snapshot_id,
        select=projection,
        filter=row_filter,
        case_sensitive=self.case_sensitive,
    )
    return catalog.plan_scan(identifier, plan_request)

def _plan_files_local(self) -> Iterable[FileScanTask]:
"""Plan files locally by reading manifests."""
data_entries: list[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

Expand Down Expand Up @@ -2006,6 +2110,20 @@ def plan_files(self) -> Iterable[FileScanTask]:
for data_entry in data_entries
]

def plan_files(self) -> Iterable[FileScanTask]:
    """Plan the files this scan has to read.

    Server-side planning is used when ``_should_use_rest_planning()`` says so
    (a REST catalog with scan planning enabled); otherwise the files are
    planned locally.

    Returns:
        List of FileScanTasks that contain both data and delete files.
    """
    planner = self._plan_files_rest if self._should_use_rest_planning() else self._plan_files_local
    return planner()

def to_arrow(self) -> pa.Table:
"""Read an Arrow table eagerly from this DataScan.

Expand Down
Loading