feat: add automatic requeue for stalled tasks via cron job #591
Open: jumski wants to merge 1 commit into main from 01-12-pgf-aav_implement_requeue_for_stalled_tasks
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| '@pgflow/core': patch | ||
| '@pgflow/edge-worker': patch | ||
| --- | ||
| | ||
| Add automatic requeue for stalled tasks via cron job - tasks stuck beyond timeout+30s are requeued up to 3 times, then archived with status left as 'started' for easy identification (closes #586) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| -- Requeue stalled tasks that have been in 'started' status longer than their timeout + 30s buffer | ||
| -- This handles tasks that got stuck when workers crashed without completing them | ||
| create or replace function pgflow.requeue_stalled_tasks() | ||
| returns int | ||
| language plpgsql | ||
| security definer | ||
| set search_path = '' | ||
| as $$ | ||
| declare | ||
| result_count int := 0; | ||
| max_requeues constant int := 3; | ||
| begin | ||
| -- Find and requeue stalled tasks (where started_at > timeout + 30s buffer) | ||
| -- Tasks with requeued_count >= max_requeues will have their message archived | ||
| -- but status left as 'started' for easy identification via requeued_count column | ||
| with stalled_tasks as ( | ||
| select | ||
| st.run_id, | ||
| st.step_slug, | ||
| st.task_index, | ||
| st.message_id, | ||
| r.flow_slug, | ||
| st.requeued_count, | ||
| f.opt_timeout | ||
| from pgflow.step_tasks st | ||
| join pgflow.runs r on r.run_id = st.run_id | ||
| join pgflow.flows f on f.flow_slug = r.flow_slug | ||
| where st.status = 'started' | ||
| and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds' | ||
| for update of st skip locked | ||
| ), | ||
| -- Separate tasks that can be requeued from those that exceeded max requeues | ||
| to_requeue as ( | ||
| select * from stalled_tasks where requeued_count < max_requeues | ||
| ), | ||
| to_archive as ( | ||
| select * from stalled_tasks where requeued_count >= max_requeues | ||
| ), | ||
| -- Update tasks that will be requeued | ||
| requeued as ( | ||
| update pgflow.step_tasks st | ||
| set | ||
| status = 'queued', | ||
| started_at = null, | ||
| last_worker_id = null, | ||
| requeued_count = st.requeued_count + 1, | ||
| last_requeued_at = now() | ||
| from to_requeue tr | ||
| where st.run_id = tr.run_id | ||
| and st.step_slug = tr.step_slug | ||
| and st.task_index = tr.task_index | ||
| returning tr.flow_slug as queue_name, tr.message_id | ||
| ), | ||
| -- Make requeued messages visible immediately (batched per queue) | ||
| visibility_reset as ( | ||
| select pgflow.set_vt_batch( | ||
| r.queue_name, | ||
| array_agg(r.message_id), | ||
| array_agg(0) -- all offsets are 0 (immediate visibility) | ||
| ) | ||
| from requeued r | ||
| where r.message_id is not null | ||
| group by r.queue_name | ||
| ), | ||
| -- Archive messages for tasks that exceeded max requeues (batched per queue) | ||
| -- Task status remains 'started' with requeued_count >= 3 for easy identification | ||
| archived as ( | ||
| select pgmq.archive(ta.flow_slug, array_agg(ta.message_id)) | ||
| from to_archive ta | ||
| where ta.message_id is not null | ||
| group by ta.flow_slug | ||
| ), | ||
| -- Force execution of visibility_reset CTE | ||
| _vr as (select count(*) from visibility_reset), | ||
| -- Force execution of archived CTE | ||
| _ar as (select count(*) from archived) | ||
| select count(*) into result_count | ||
| from requeued, _vr, _ar; | ||
| | ||
| return result_count; | ||
| end; | ||
| $$; | ||
pkgs/core/schemas/0063_function_setup_requeue_stalled_tasks_cron.sql (39 changes: 39 additions & 0 deletions)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| -- Cron setup function for automatic requeue monitoring | ||
| create or replace function pgflow.setup_requeue_stalled_tasks_cron( | ||
| cron_interval text default '15 seconds' | ||
| ) | ||
| returns text | ||
| language plpgsql | ||
| security definer | ||
| set search_path = pgflow, cron, pg_temp | ||
| as $$ | ||
| declare | ||
| job_id bigint; | ||
| begin | ||
| -- Remove existing job if any | ||
| begin | ||
| perform cron.unschedule('pgflow_requeue_stalled_tasks'); | ||
| exception | ||
| when others then null; | ||
| end; | ||
| | ||
| -- Schedule the new job | ||
| job_id := cron.schedule( | ||
| job_name => 'pgflow_requeue_stalled_tasks', | ||
| schedule => setup_requeue_stalled_tasks_cron.cron_interval, | ||
| command => 'select pgflow.requeue_stalled_tasks()' | ||
| ); | ||
| | ||
| return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)', | ||
| setup_requeue_stalled_tasks_cron.cron_interval, job_id); | ||
| end; | ||
| $$; | ||
| | ||
| comment on function pgflow.setup_requeue_stalled_tasks_cron(text) is | ||
| 'Sets up cron job to automatically requeue stalled tasks. | ||
| Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds). | ||
| Replaces existing job if it exists (idempotent). | ||
| Returns a confirmation message with job ID.'; | ||
| | ||
| -- Automatically set up the cron job when migration runs | ||
| select pgflow.setup_requeue_stalled_tasks_cron(); |
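
For reference, the setup function can also be called by hand to change the interval, and the resulting job can be inspected through pg_cron's `cron.job` table. This is a minimal sketch, assuming pg_cron is installed in a version that accepts interval schedules such as `'30 seconds'`; the 30-second value is only an example and is not part of the PR.

```sql
-- Reschedule with a custom interval; the function unschedules any
-- existing pgflow_requeue_stalled_tasks job before scheduling a new one.
select pgflow.setup_requeue_stalled_tasks_cron('30 seconds');

-- Inspect the scheduled job in pg_cron's catalog table.
select jobid, jobname, schedule, command, active
from cron.job
where jobname = 'pgflow_requeue_stalled_tasks';
```

Because the function replaces the job by name, calling it repeatedly should leave a single `pgflow_requeue_stalled_tasks` entry.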
pkgs/core/supabase/migrations/20260119074418_pgflow_requeue_stalled_tasks.sql (108 changes: 108 additions & 0 deletions)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| -- Modify "step_tasks" table | ||
| ALTER TABLE "pgflow"."step_tasks" ADD COLUMN "requeued_count" integer NOT NULL DEFAULT 0, ADD COLUMN "last_requeued_at" timestamptz NULL; | ||
| -- Create "requeue_stalled_tasks" function | ||
| CREATE FUNCTION "pgflow"."requeue_stalled_tasks" () RETURNS integer LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = '' AS $$ | ||
| declare | ||
| result_count int := 0; | ||
| max_requeues constant int := 3; | ||
| begin | ||
| -- Find and requeue stalled tasks (where started_at > timeout + 30s buffer) | ||
| -- Tasks with requeued_count >= max_requeues will have their message archived | ||
| -- but status left as 'started' for easy identification via requeued_count column | ||
| with stalled_tasks as ( | ||
| select | ||
| st.run_id, | ||
| st.step_slug, | ||
| st.task_index, | ||
| st.message_id, | ||
| r.flow_slug, | ||
| st.requeued_count, | ||
| f.opt_timeout | ||
| from pgflow.step_tasks st | ||
| join pgflow.runs r on r.run_id = st.run_id | ||
| join pgflow.flows f on f.flow_slug = r.flow_slug | ||
| where st.status = 'started' | ||
| and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds' | ||
| for update of st skip locked | ||
| ), | ||
| -- Separate tasks that can be requeued from those that exceeded max requeues | ||
| to_requeue as ( | ||
| select * from stalled_tasks where requeued_count < max_requeues | ||
| ), | ||
| to_archive as ( | ||
| select * from stalled_tasks where requeued_count >= max_requeues | ||
| ), | ||
| -- Update tasks that will be requeued | ||
| requeued as ( | ||
| update pgflow.step_tasks st | ||
| set | ||
| status = 'queued', | ||
| started_at = null, | ||
| last_worker_id = null, | ||
| requeued_count = st.requeued_count + 1, | ||
| last_requeued_at = now() | ||
| from to_requeue tr | ||
| where st.run_id = tr.run_id | ||
| and st.step_slug = tr.step_slug | ||
| and st.task_index = tr.task_index | ||
| returning tr.flow_slug as queue_name, tr.message_id | ||
| ), | ||
| -- Make requeued messages visible immediately (batched per queue) | ||
| visibility_reset as ( | ||
| select pgflow.set_vt_batch( | ||
| r.queue_name, | ||
| array_agg(r.message_id), | ||
| array_agg(0) -- all offsets are 0 (immediate visibility) | ||
| ) | ||
| from requeued r | ||
| where r.message_id is not null | ||
| group by r.queue_name | ||
| ), | ||
| -- Archive messages for tasks that exceeded max requeues (batched per queue) | ||
| -- Task status remains 'started' with requeued_count >= 3 for easy identification | ||
| archived as ( | ||
| select pgmq.archive(ta.flow_slug, array_agg(ta.message_id)) | ||
| from to_archive ta | ||
| where ta.message_id is not null | ||
| group by ta.flow_slug | ||
| ), | ||
| -- Force execution of visibility_reset CTE | ||
| _vr as (select count(*) from visibility_reset), | ||
| -- Force execution of archived CTE | ||
| _ar as (select count(*) from archived) | ||
| select count(*) into result_count | ||
| from requeued, _vr, _ar; | ||
| | ||
| return result_count; | ||
| end; | ||
| $$; | ||
| -- Create "setup_requeue_stalled_tasks_cron" function | ||
| CREATE FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" ("cron_interval" text DEFAULT '15 seconds') RETURNS text LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = pgflow, cron, pg_temp AS $$ | ||
| declare | ||
| job_id bigint; | ||
| begin | ||
| -- Remove existing job if any | ||
| begin | ||
| perform cron.unschedule('pgflow_requeue_stalled_tasks'); | ||
| exception | ||
| when others then null; | ||
| end; | ||
| | ||
| -- Schedule the new job | ||
| job_id := cron.schedule( | ||
| job_name => 'pgflow_requeue_stalled_tasks', | ||
| schedule => setup_requeue_stalled_tasks_cron.cron_interval, | ||
| command => 'select pgflow.requeue_stalled_tasks()' | ||
| ); | ||
| | ||
| return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)', | ||
| setup_requeue_stalled_tasks_cron.cron_interval, job_id); | ||
| end; | ||
| $$; | ||
| -- Set comment to function: "setup_requeue_stalled_tasks_cron" | ||
| COMMENT ON FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" IS 'Sets up cron job to automatically requeue stalled tasks. | ||
| Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds). | ||
| Replaces existing job if it exists (idempotent). | ||
| Returns a confirmation message with job ID.'; | ||
| -- Automatically set up the cron job | ||
| SELECT pgflow.setup_requeue_stalled_tasks_cron(); |
Critical Bug: Infinite reprocessing of archived tasks

Tasks that exceed the max requeue limit (3) will be reprocessed on every cron run (every 15 seconds) indefinitely. Here's why:

1. When `requeued_count >= 3`, tasks go into the `to_archive` CTE.
2. Their messages are archived, but the tasks keep `status = 'started'` and `requeued_count >= 3`.
3. On the next cron run, they match the `stalled_tasks` condition again.

Fix: Add an UPDATE statement for `to_archive` tasks to change their status. Or alternatively, exclude already-maxed-out tasks from the initial query.
Spotted by Graphite Agent
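
To check whether this applies in a given database, a read-only query along the following lines lists the tasks that have exhausted their requeues but still satisfy the `stalled_tasks` filter. It is a sketch that reuses only the tables, columns, and the limit of 3 shown in the diff above; it is not part of the PR and does not implement either suggested fix.

```sql
-- Read-only check: tasks that hit the requeue limit but still match the
-- stalled_tasks filter, so each cron run would select them again.
select
  st.run_id,
  st.step_slug,
  st.task_index,
  st.message_id,
  st.requeued_count,
  st.last_requeued_at
from pgflow.step_tasks st
join pgflow.runs r on r.run_id = st.run_id
join pgflow.flows f on f.flow_slug = r.flow_slug
where st.status = 'started'
  and st.requeued_count >= 3  -- max_requeues in requeue_stalled_tasks()
  and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
order by st.last_requeued_at;
```

Any row returned here would be locked and passed to `pgmq.archive` again on the next run of `pgflow.requeue_stalled_tasks()`, which is the repeated work the comment describes.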
