6 changes: 6 additions & 0 deletions .changeset/requeue-stalled-tasks.md
@@ -0,0 +1,6 @@
---
'@pgflow/core': patch
'@pgflow/edge-worker': patch
---

Add automatic requeue for stalled tasks via a cron job: tasks stuck in 'started' beyond timeout + 30s are requeued up to 3 times; after that their queue messages are archived and the task status is left as 'started' for easy identification (closes #586)
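For reference, a minimal sketch of how an operator could spot tasks that hit the requeue limit and had their messages archived (assumes only the columns and statuses introduced in this change):

-- Tasks that exhausted their requeues; messages archived, status left as 'started'
select run_id, step_slug, task_index, requeued_count, last_requeued_at
from pgflow.step_tasks
where status = 'started'
  and requeued_count >= 3;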
3 changes: 3 additions & 0 deletions pkgs/core/schemas/0060_tables_runtime.sql
@@ -81,6 +81,9 @@ create table pgflow.step_tasks (
completed_at timestamptz,
failed_at timestamptz,
last_worker_id uuid references pgflow.workers (worker_id) on delete set null,
-- Requeue tracking columns
requeued_count int not null default 0,
last_requeued_at timestamptz,
constraint step_tasks_pkey primary key (run_id, step_slug, task_index),
foreign key (run_id, step_slug)
references pgflow.step_states (run_id, step_slug),
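The two new columns also lend themselves to ad-hoc monitoring; a sketch (not part of this diff) of a query showing recent requeue activity, assuming only the columns added above:

-- Recent requeues across all flows in the last hour
select flow_slug, run_id, step_slug, task_index, requeued_count, last_requeued_at
from pgflow.step_tasks
where last_requeued_at > now() - interval '1 hour'
order by last_requeued_at desc;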
82 changes: 82 additions & 0 deletions pkgs/core/schemas/0062_function_requeue_stalled_tasks.sql
@@ -0,0 +1,82 @@
-- Requeue stalled tasks that have been in 'started' status longer than their timeout + 30s buffer
-- This handles tasks that got stuck when workers crashed without completing them
create or replace function pgflow.requeue_stalled_tasks()
returns int
language plpgsql
security definer
set search_path = ''
as $$
declare
result_count int := 0;
max_requeues constant int := 3;
begin
-- Find and requeue stalled tasks (where started_at > timeout + 30s buffer)
-- Tasks with requeued_count >= max_requeues will have their message archived
-- but status left as 'started' for easy identification via requeued_count column
with stalled_tasks as (
select
st.run_id,
st.step_slug,
st.task_index,
st.message_id,
r.flow_slug,
st.requeued_count,
f.opt_timeout
from pgflow.step_tasks st
join pgflow.runs r on r.run_id = st.run_id
join pgflow.flows f on f.flow_slug = r.flow_slug
where st.status = 'started'
and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
update pgflow.step_tasks st
set
status = 'queued',
started_at = null,
last_worker_id = null,
requeued_count = st.requeued_count + 1,
last_requeued_at = now()
from to_requeue tr
where st.run_id = tr.run_id
and st.step_slug = tr.step_slug
and st.task_index = tr.task_index
returning tr.flow_slug as queue_name, tr.message_id
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
select pgflow.set_vt_batch(
r.queue_name,
array_agg(r.message_id),
array_agg(0) -- all offsets are 0 (immediate visibility)
)
from requeued r
where r.message_id is not null
group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status remains 'started' with requeued_count >= 3 for easy identification
archived as (
select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
from to_archive ta
where ta.message_id is not null
group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived)
select count(*) into result_count
from requeued, _vr, _ar;

return result_count;
end;
$$;
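For context, the same predicate can be run read-only to preview which tasks the next cron pass would pick up, and the function itself can be invoked manually; a sketch based on the definition above:

-- Preview stalled tasks without modifying anything (mirrors the WHERE clause above)
select st.run_id, st.step_slug, st.task_index, st.requeued_count, st.started_at
from pgflow.step_tasks st
join pgflow.runs r on r.run_id = st.run_id
join pgflow.flows f on f.flow_slug = r.flow_slug
where st.status = 'started'
  and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds';

-- Trigger a requeue pass immediately instead of waiting for cron
select pgflow.requeue_stalled_tasks();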
Comment on lines +16 to +82
Critical Bug: Infinite reprocessing of archived tasks

Tasks that exceed the max requeue limit (3) will be reprocessed every cron run (every 15 seconds) indefinitely. Here's why:

  1. When requeued_count >= 3, tasks go into to_archive CTE
  2. Their messages are archived in the queue
  3. BUT the task record itself is never updated - it remains with status = 'started' and requeued_count >= 3
  4. On the next cron run, the same task matches the stalled_tasks condition again
  5. Steps 1-4 repeat forever, causing:
    • Unnecessary CPU load every 15 seconds
    • Repeated archive operations on already-archived messages (will likely error)
    • Database lock contention

Fix: Add an UPDATE statement for to_archive tasks to change their status:

-- After the 'requeued' CTE, add:
archived_updated as (
  update pgflow.step_tasks st
  set status = 'failed'  -- or 'archived'
  from to_archive ta
  where st.run_id = ta.run_id
    and st.step_slug = ta.step_slug
    and st.task_index = ta.task_index
  returning 1
),

Or alternatively, exclude already-maxed-out tasks from the initial query:

where st.status = 'started'
  and st.requeued_count < max_requeues  -- Add this condition
  and st.started_at < ...
Suggested change
with stalled_tasks as (
select
st.run_id,
st.step_slug,
st.task_index,
st.message_id,
r.flow_slug,
st.requeued_count,
f.opt_timeout
from pgflow.step_tasks st
join pgflow.runs r on r.run_id = st.run_id
join pgflow.flows f on f.flow_slug = r.flow_slug
where st.status = 'started'
and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
update pgflow.step_tasks st
set
status = 'queued',
started_at = null,
last_worker_id = null,
requeued_count = st.requeued_count + 1,
last_requeued_at = now()
from to_requeue tr
where st.run_id = tr.run_id
and st.step_slug = tr.step_slug
and st.task_index = tr.task_index
returning tr.flow_slug as queue_name, tr.message_id
),
-- Update tasks that exceeded max requeues to prevent infinite reprocessing
archived_updated as (
update pgflow.step_tasks st
set status = 'failed'
from to_archive ta
where st.run_id = ta.run_id
and st.step_slug = ta.step_slug
and st.task_index = ta.task_index
returning 1
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
select pgflow.set_vt_batch(
r.queue_name,
array_agg(r.message_id),
array_agg(0) -- all offsets are 0 (immediate visibility)
)
from requeued r
where r.message_id is not null
group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status is now updated to 'failed' to prevent reprocessing
archived as (
select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
from to_archive ta
where ta.message_id is not null
group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived),
-- Force execution of archived_updated CTE
_au as (select count(*) from archived_updated)
select count(*) into result_count
from requeued, _vr, _ar, _au;
return result_count;
end;
$$;

Spotted by Graphite Agent


@@ -0,0 +1,39 @@
-- Cron setup function for automatic requeue monitoring
create or replace function pgflow.setup_requeue_stalled_tasks_cron(
cron_interval text default '15 seconds'
)
returns text
language plpgsql
security definer
set search_path = pgflow, cron, pg_temp
as $$
declare
job_id bigint;
begin
-- Remove existing job if any
begin
perform cron.unschedule('pgflow_requeue_stalled_tasks');
exception
when others then null;
end;

-- Schedule the new job
job_id := cron.schedule(
job_name => 'pgflow_requeue_stalled_tasks',
schedule => setup_requeue_stalled_tasks_cron.cron_interval,
command => 'select pgflow.requeue_stalled_tasks()'
);

return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)',
setup_requeue_stalled_tasks_cron.cron_interval, job_id);
end;
$$;

comment on function pgflow.setup_requeue_stalled_tasks_cron(text) is
'Sets up cron job to automatically requeue stalled tasks.
Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds).
Replaces existing job if it exists (idempotent).
Returns a confirmation message with job ID.';

-- Automatically set up the cron job when migration runs
select pgflow.setup_requeue_stalled_tasks_cron();
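The cron_interval parameter makes the schedule adjustable after the fact; a hedged usage sketch ('30 seconds' is an illustrative value, and this assumes pg_cron accepts second-granularity schedules as the default '15 seconds' does):

-- Reschedule the job with a different interval; the function replaces any existing job
select pgflow.setup_requeue_stalled_tasks_cron('30 seconds');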
15 changes: 15 additions & 0 deletions pkgs/core/src/database-types.ts
@@ -197,10 +197,12 @@ export type Database = {
error_message: string | null
failed_at: string | null
flow_slug: string
last_requeued_at: string | null
last_worker_id: string | null
message_id: number | null
output: Json | null
queued_at: string
requeued_count: number
run_id: string
started_at: string | null
status: string
@@ -213,10 +215,12 @@
error_message?: string | null
failed_at?: string | null
flow_slug: string
last_requeued_at?: string | null
last_worker_id?: string | null
message_id?: number | null
output?: Json | null
queued_at?: string
requeued_count?: number
run_id: string
started_at?: string | null
status?: string
@@ -229,10 +233,12 @@
error_message?: string | null
failed_at?: string | null
flow_slug?: string
last_requeued_at?: string | null
last_worker_id?: string | null
message_id?: number | null
output?: Json | null
queued_at?: string
requeued_count?: number
run_id?: string
started_at?: string | null
status?: string
@@ -445,10 +451,12 @@
error_message: string | null
failed_at: string | null
flow_slug: string
last_requeued_at: string | null
last_worker_id: string | null
message_id: number | null
output: Json | null
queued_at: string
requeued_count: number
run_id: string
started_at: string | null
status: string
@@ -512,10 +520,12 @@
error_message: string | null
failed_at: string | null
flow_slug: string
last_requeued_at: string | null
last_worker_id: string | null
message_id: number | null
output: Json | null
queued_at: string
requeued_count: number
run_id: string
started_at: string | null
status: string
@@ -550,6 +560,7 @@
isSetofReturn: true
}
}
requeue_stalled_tasks: { Args: never; Returns: number }
set_vt_batch: {
Args: { msg_ids: number[]; queue_name: string; vt_offsets: number[] }
Returns: {
@@ -565,6 +576,10 @@
Args: { cron_interval?: string }
Returns: string
}
setup_requeue_stalled_tasks_cron: {
Args: { cron_interval?: string }
Returns: string
}
start_flow: {
Args: { flow_slug: string; input: Json; run_id?: string }
Returns: {
108 changes: 108 additions & 0 deletions pkgs/core/supabase/migrations/20260119074418_pgflow_requeue_stalled_tasks.sql
@@ -0,0 +1,108 @@
-- Modify "step_tasks" table
ALTER TABLE "pgflow"."step_tasks" ADD COLUMN "requeued_count" integer NOT NULL DEFAULT 0, ADD COLUMN "last_requeued_at" timestamptz NULL;
-- Create "requeue_stalled_tasks" function
CREATE FUNCTION "pgflow"."requeue_stalled_tasks" () RETURNS integer LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = '' AS $$
declare
result_count int := 0;
max_requeues constant int := 3;
begin
-- Find and requeue stalled tasks (where started_at > timeout + 30s buffer)
-- Tasks with requeued_count >= max_requeues will have their message archived
-- but status left as 'started' for easy identification via requeued_count column
with stalled_tasks as (
select
st.run_id,
st.step_slug,
st.task_index,
st.message_id,
r.flow_slug,
st.requeued_count,
f.opt_timeout
from pgflow.step_tasks st
join pgflow.runs r on r.run_id = st.run_id
join pgflow.flows f on f.flow_slug = r.flow_slug
where st.status = 'started'
and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
update pgflow.step_tasks st
set
status = 'queued',
started_at = null,
last_worker_id = null,
requeued_count = st.requeued_count + 1,
last_requeued_at = now()
from to_requeue tr
where st.run_id = tr.run_id
and st.step_slug = tr.step_slug
and st.task_index = tr.task_index
returning tr.flow_slug as queue_name, tr.message_id
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
select pgflow.set_vt_batch(
r.queue_name,
array_agg(r.message_id),
array_agg(0) -- all offsets are 0 (immediate visibility)
)
from requeued r
where r.message_id is not null
group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status remains 'started' with requeued_count >= 3 for easy identification
archived as (
select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
from to_archive ta
where ta.message_id is not null
group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived)
select count(*) into result_count
from requeued, _vr, _ar;

return result_count;
end;
$$;
-- Create "setup_requeue_stalled_tasks_cron" function
CREATE FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" ("cron_interval" text DEFAULT '15 seconds') RETURNS text LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = pgflow, cron, pg_temp AS $$
declare
job_id bigint;
begin
-- Remove existing job if any
begin
perform cron.unschedule('pgflow_requeue_stalled_tasks');
exception
when others then null;
end;

-- Schedule the new job
job_id := cron.schedule(
job_name => 'pgflow_requeue_stalled_tasks',
schedule => setup_requeue_stalled_tasks_cron.cron_interval,
command => 'select pgflow.requeue_stalled_tasks()'
);

return format('Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)',
setup_requeue_stalled_tasks_cron.cron_interval, job_id);
end;
$$;
-- Set comment to function: "setup_requeue_stalled_tasks_cron"
COMMENT ON FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" IS 'Sets up cron job to automatically requeue stalled tasks.
Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds).
Replaces existing job if it exists (idempotent).
Returns a confirmation message with job ID.';
-- Automatically set up the cron job
SELECT pgflow.setup_requeue_stalled_tasks_cron();
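After the migration runs, one way to confirm the job was registered is to inspect pg_cron's catalog; a sketch, assuming the standard cron.job table is readable:

-- Verify the scheduled job created by the migration above
select jobname, schedule, command, active
from cron.job
where jobname = 'pgflow_requeue_stalled_tasks';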
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g=
h1:39IWycuue/cvKBP6pWwEZGznxI1MWFwI4W32BsqMOYY=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -16,3 +16,4 @@ h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g=
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260119074418_pgflow_requeue_stalled_tasks.sql h1:A+xCwjYmMTdmlHsqlnY++R8GRWXML0/QOe/OZMhOZag=