Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/dstack/_internal/cli/commands/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
from dataclasses import asdict

from dstack._internal.cli.commands import APIBaseCommand
from dstack._internal.cli.services.events import EventListFilters, EventPaginator, print_event
from dstack._internal.cli.services.events import (
EventListFilters,
EventPaginator,
EventTracker,
print_event,
)
from dstack._internal.cli.utils.common import (
get_start_time,
)
Expand All @@ -29,6 +34,12 @@ def _register(self):
list_parser.set_defaults(subfunc=self._list)

for parser in [self._parser, list_parser]:
parser.add_argument(
"-w",
"--watch",
help="Watch events in realtime",
action="store_true",
)
parser.add_argument(
"--since",
help=(
Expand Down Expand Up @@ -106,7 +117,11 @@ def _list(self, args: argparse.Namespace):
since = get_start_time(args.since)
filters = _build_filters(args, self.api)

if since is not None:
if args.watch:
events = EventTracker(
client=self.api.client.events, filters=filters, since=since
).stream_forever()
elif since is not None:
events = EventPaginator(self.api.client.events).list(
filters=filters, since=since, ascending=True
)
Expand Down
81 changes: 79 additions & 2 deletions src/dstack/_internal/cli/services/events.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import time
import uuid
from collections.abc import Iterator
from dataclasses import asdict, dataclass
from datetime import datetime
from datetime import datetime, timedelta
from typing import Optional

from rich.text import Text

from dstack._internal.cli.utils.common import console
from dstack._internal.cli.utils.common import LIVE_TABLE_PROVISION_INTERVAL_SECS, console
from dstack._internal.core.models.events import Event, EventTargetType
from dstack._internal.server.schemas.events import LIST_EVENTS_DEFAULT_LIMIT
from dstack.api.server._events import EventsAPIClient
Expand Down Expand Up @@ -50,6 +51,82 @@ def list(
prev_recorded_at = events[-1].recorded_at


class EventTracker:
"""
Tracks new events from the server. Implements a sliding window mechanism to avoid
missing events that are commited with a delay.
"""

def __init__(
self,
client: EventsAPIClient,
filters: EventListFilters,
since: Optional[datetime],
event_delay_tolerance: timedelta = timedelta(seconds=20),
) -> None:
self._client = client
self._filters = filters
self._since = since
self._event_delay_tolerance = event_delay_tolerance
self._seen_events: dict[uuid.UUID, _SeenEvent] = {}
self._latest_event: Optional[Event] = None

def poll(self) -> Iterator[Event]:
"""
Fetches the next batch of events from the server.
"""

if self._since is None and self._latest_event is None:
# First batch without `since` - fetch some recent events
event_stream = reversed(self._client.list(ascending=False, **asdict(self._filters)))
else:
configured_since = self._since or datetime.fromtimestamp(0)
latest_event_recorded_at = (
self._latest_event.recorded_at
if self._latest_event is not None
else datetime.fromtimestamp(0)
)
since = max(
configured_since.astimezone(),
latest_event_recorded_at.astimezone() - self._event_delay_tolerance,
)
self._cleanup_seen_events(before=since)
event_stream = EventPaginator(self._client).list(self._filters, since, ascending=True)

for event in event_stream:
if event.id not in self._seen_events:
self._seen_events[event.id] = _SeenEvent(recorded_at=event.recorded_at)
yield event
self._latest_event = event

def stream_forever(
self,
update_interval: timedelta = timedelta(seconds=LIVE_TABLE_PROVISION_INTERVAL_SECS),
) -> Iterator[Event]:
"""
Yields events as they are received from the server.
"""

while True:
for event in self.poll():
yield event
time.sleep(update_interval.total_seconds())

def _cleanup_seen_events(self, before: datetime) -> None:
ids_to_delete = {
event_id
for event_id, seen_event in self._seen_events.items()
if seen_event.recorded_at.astimezone() < before.astimezone()
}
for event_id in ids_to_delete:
del self._seen_events[event_id]


@dataclass
class _SeenEvent:
recorded_at: datetime


def print_event(event: Event) -> None:
recorded_at = event.recorded_at.astimezone().strftime("%Y-%m-%d %H:%M:%S")
targets = ", ".join(f"{target.type} {target.name}" for target in event.targets)
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/server/routers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ async def list_events(
The results are paginated. To get the next page, pass `recorded_at` and `id` of
the last event from the previous page as `prev_recorded_at` and `prev_id`.
NOTE: Some events may become available in the API with a delay after their `recorded_at`.
This should be taken into account when using the API to monitor recent events,
so that delayed events are not missed during pagination.
"""
return CustomORJSONResponse(
await events_services.list_events(
Expand Down
Loading