diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py index 3b2c7812b..13bd948ff 100644 --- a/src/dstack/_internal/core/models/configurations.py +++ b/src/dstack/_internal/core/models/configurations.py @@ -372,6 +372,16 @@ class ProbeConfig(generate_dual_core_model(ProbeConfigConfig)): ), ), ] = None + until_ready: Annotated[ + bool, + Field( + description=( + "If `true`, the probe will stop being executed as soon as it reaches the" + " `ready_after` threshold of successful executions." + " Defaults to `false`" + ), + ), + ] = False @validator("timeout", pre=True) def parse_timeout(cls, v: Optional[Union[int, str]]) -> Optional[int]: diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 27c6f430c..1a9e127b0 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -247,6 +247,7 @@ class ProbeSpec(CoreModel): timeout: int interval: int ready_after: int + until_ready: bool = False class JobSpec(CoreModel): diff --git a/src/dstack/_internal/server/background/tasks/process_probes.py b/src/dstack/_internal/server/background/tasks/process_probes.py index bc1dc0943..4f712ff4c 100644 --- a/src/dstack/_internal/server/background/tasks/process_probes.py +++ b/src/dstack/_internal/server/background/tasks/process_probes.py @@ -73,12 +73,15 @@ async def process_probes(): else: job_spec: JobSpec = JobSpec.__response__.parse_raw(probe.job.job_spec_data) probe_spec = job_spec.probes[probe.probe_num] - # Schedule the next probe execution in case this execution is interrupted - probe.due = get_current_datetime() + _get_probe_async_processing_timeout( - probe_spec - ) - # Execute the probe asynchronously outside of the DB session - PROBES_SCHEDULER.add_job(partial(_process_probe_async, probe, probe_spec)) + if probe_spec.until_ready and probe.success_streak >= probe_spec.ready_after: + probe.active = False + else: + # Schedule the next probe execution in case this execution is interrupted + probe.due = get_current_datetime() + _get_probe_async_processing_timeout( + probe_spec + ) + # Execute the probe asynchronously outside of the DB session + PROBES_SCHEDULER.add_job(partial(_process_probe_async, probe, probe_spec)) await session.commit() finally: probe_lockset.difference_update(probe_ids) diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index df6738a77..4a7fd3c6f 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -444,6 +444,7 @@ def _probe_config_to_spec(c: ProbeConfig) -> ProbeSpec: method=c.method if c.method is not None else DEFAULT_PROBE_METHOD, headers=c.headers, body=c.body, + until_ready=c.until_ready, ) diff --git a/src/tests/_internal/server/background/tasks/test_process_probes.py b/src/tests/_internal/server/background/tasks/test_process_probes.py index ebc3c15b9..928709dd7 100644 --- a/src/tests/_internal/server/background/tasks/test_process_probes.py +++ b/src/tests/_internal/server/background/tasks/test_process_probes.py @@ -163,6 +163,67 @@ async def test_schedules_probe_execution(self, test_db, session: AsyncSession) - + PROCESSING_OVERHEAD_TIMEOUT ) + async def test_deactivates_probe_when_until_ready_and_ready_after_reached( + self, test_db, session: AsyncSession + ) -> None: + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo( + session=session, + project_id=project.id, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=get_run_spec( + run_name="test", + repo_id=repo.name, + configuration=ServiceConfiguration( + port=80, + image="nginx", + probes=[ + ProbeConfig( + type="http", url="/until_ready", until_ready=True, ready_after=3 + ), + ProbeConfig(type="http", url="/regular", until_ready=False, ready_after=3), + ], + ), + ), + ) + instance = await create_instance( + session=session, + project=project, + status=InstanceStatus.BUSY, + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.RUNNING, + job_provisioning_data=get_job_provisioning_data(), + instance=instance, + instance_assigned=True, + ) + + probe_until_ready = await create_probe(session, job, probe_num=0, success_streak=3) + probe_regular = await create_probe(session, job, probe_num=1, success_streak=3) + + with patch( + "dstack._internal.server.background.tasks.process_probes.PROBES_SCHEDULER" + ) as scheduler_mock: + await process_probes() + + await session.refresh(probe_until_ready) + await session.refresh(probe_regular) + + assert not probe_until_ready.active + assert probe_until_ready.success_streak == 3 + + assert probe_regular.active + assert probe_regular.success_streak == 3 + assert scheduler_mock.add_job.call_count == 1 # only the regular probe was scheduled + # TODO: test probe success and failure # (skipping for now - a bit difficult to test and most of the logic will be mocked)