Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/dstack/_internal/core/models/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@ class ProbeConfig(generate_dual_core_model(ProbeConfigConfig)):
),
),
] = None
until_ready: Annotated[
bool,
Field(
description=(
"If `true`, the probe will stop being executed as soon as it reaches the"
" `ready_after` threshold of successful executions."
" Defaults to `false`"
),
),
] = False

@validator("timeout", pre=True)
def parse_timeout(cls, v: Optional[Union[int, str]]) -> Optional[int]:
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ class ProbeSpec(CoreModel):
timeout: int
interval: int
ready_after: int
until_ready: bool = False


class JobSpec(CoreModel):
Expand Down
15 changes: 9 additions & 6 deletions src/dstack/_internal/server/background/tasks/process_probes.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,15 @@ async def process_probes():
else:
job_spec: JobSpec = JobSpec.__response__.parse_raw(probe.job.job_spec_data)
probe_spec = job_spec.probes[probe.probe_num]
# Schedule the next probe execution in case this execution is interrupted
probe.due = get_current_datetime() + _get_probe_async_processing_timeout(
probe_spec
)
# Execute the probe asynchronously outside of the DB session
PROBES_SCHEDULER.add_job(partial(_process_probe_async, probe, probe_spec))
if probe_spec.until_ready and probe.success_streak >= probe_spec.ready_after:
probe.active = False
else:
# Schedule the next probe execution in case this execution is interrupted
probe.due = get_current_datetime() + _get_probe_async_processing_timeout(
probe_spec
)
# Execute the probe asynchronously outside of the DB session
PROBES_SCHEDULER.add_job(partial(_process_probe_async, probe, probe_spec))
await session.commit()
finally:
probe_lockset.difference_update(probe_ids)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ def _probe_config_to_spec(c: ProbeConfig) -> ProbeSpec:
method=c.method if c.method is not None else DEFAULT_PROBE_METHOD,
headers=c.headers,
body=c.body,
until_ready=c.until_ready,
)


Expand Down
61 changes: 61 additions & 0 deletions src/tests/_internal/server/background/tasks/test_process_probes.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,67 @@ async def test_schedules_probe_execution(self, test_db, session: AsyncSession) -
+ PROCESSING_OVERHEAD_TIMEOUT
)

async def test_deactivates_probe_when_until_ready_and_ready_after_reached(
    self, test_db, session: AsyncSession
) -> None:
    """Check that an `until_ready` probe at its `ready_after` streak is deactivated.

    Two probes are created for the same running job, both with a success streak
    of 3 (equal to their `ready_after`). After one `process_probes()` pass, the
    probe configured with `until_ready=True` must be inactive, while the regular
    probe must stay active and be the only one passed to the scheduler.
    """
    project = await create_project(session=session)
    user = await create_user(session=session)
    repo = await create_repo(session=session, project_id=project.id)

    # Probe 0 stops once ready; probe 1 keeps running indefinitely.
    service_configuration = ServiceConfiguration(
        port=80,
        image="nginx",
        probes=[
            ProbeConfig(type="http", url="/until_ready", until_ready=True, ready_after=3),
            ProbeConfig(type="http", url="/regular", until_ready=False, ready_after=3),
        ],
    )
    run_spec = get_run_spec(
        run_name="test",
        repo_id=repo.name,
        configuration=service_configuration,
    )
    run = await create_run(
        session=session,
        project=project,
        repo=repo,
        user=user,
        run_spec=run_spec,
    )
    instance = await create_instance(
        session=session,
        project=project,
        status=InstanceStatus.BUSY,
    )
    job = await create_job(
        session=session,
        run=run,
        status=JobStatus.RUNNING,
        job_provisioning_data=get_job_provisioning_data(),
        instance=instance,
        instance_assigned=True,
    )

    # Both probes have already reached the `ready_after` threshold.
    stopping_probe = await create_probe(session, job, probe_num=0, success_streak=3)
    ongoing_probe = await create_probe(session, job, probe_num=1, success_streak=3)

    scheduler_target = "dstack._internal.server.background.tasks.process_probes.PROBES_SCHEDULER"
    with patch(scheduler_target) as mocked_scheduler:
        await process_probes()

    await session.refresh(stopping_probe)
    await session.refresh(ongoing_probe)

    assert not stopping_probe.active
    assert stopping_probe.success_streak == 3

    assert ongoing_probe.active
    assert ongoing_probe.success_streak == 3
    # Only the regular probe was scheduled for execution.
    assert mocked_scheduler.add_job.call_count == 1


# TODO: test probe success and failure
# (skipping for now - a bit difficult to test and most of the logic will be mocked)