
Conversation


@jumski jumski commented Jan 12, 2026

Add automatic requeue for stalled tasks via cron job

This PR implements a system to automatically detect and requeue tasks that have stalled due to worker crashes or other issues. Key features:

  • Added a requeue_stalled_tasks() function that identifies tasks stuck in 'started' status beyond their timeout window
  • Tasks can be requeued up to 3 times before being marked as failed
  • Added tracking columns to step_tasks table: requeued_count and last_requeued_at
  • Implemented a configurable cron job via setup_requeue_stalled_tasks_cron() that runs every 15 seconds by default
  • Added comprehensive test suite covering basic requeuing, max requeue limits, and multi-flow scenarios
  • Increased default visibility timeout in edge-worker from 2 to 5 seconds for better reliability

This enhancement improves system resilience by ensuring tasks don't remain stuck when workers crash unexpectedly, addressing issue #586.
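
For orientation, a minimal usage sketch based only on the names mentioned above; the exact signatures of setup_requeue_stalled_tasks_cron() and requeue_stalled_tasks() are not spelled out in this description, so the zero-argument calls below are assumptions:

-- Usage sketch (assumed signatures, not the PR's exact API).
-- Schedule the periodic sweep (defaults to every 15 seconds per the description):
select pgflow.setup_requeue_stalled_tasks_cron();

-- Or run a one-off sweep by hand and see how many tasks were requeued:
select pgflow.requeue_stalled_tasks() as requeued_tasks;

-- Inspect the new tracking columns on step_tasks:
select run_id, step_slug, task_index, status, requeued_count, last_requeued_at
from pgflow.step_tasks
where requeued_count > 0;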


changeset-bot bot commented Jan 12, 2026

🦋 Changeset detected

Latest commit: 8f162cb

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 5 packages
Name                 Type
@pgflow/core         Patch
@pgflow/edge-worker  Patch
pgflow               Patch
@pgflow/client       Patch
@pgflow/dsl          Patch



jumski commented Jan 12, 2026

This stack of pull requests is managed by Graphite. Learn more about stacking.


nx-cloud bot commented Jan 12, 2026

View your CI Pipeline Execution ↗ for commit 8f162cb

Command Status Duration Result
nx run edge-worker:test:integration ✅ Succeeded 5m 7s View ↗
nx run client:e2e ✅ Succeeded 2m 53s View ↗
nx run core:pgtap ✅ Succeeded 1m 47s View ↗
nx affected -t verify-exports --base=origin/mai... ✅ Succeeded 3s View ↗
nx affected -t build --configuration=production... ✅ Succeeded 3s View ↗
nx affected -t lint typecheck test --parallel -... ✅ Succeeded 1m 42s View ↗
nx run cli:e2e ✅ Succeeded 4s View ↗
nx run edge-worker:e2e ✅ Succeeded 49s View ↗

☁️ Nx Cloud last updated this comment at 2026-01-19 07:55:19 UTC

@jumski jumski force-pushed the 01-12-pgf-aav_implement_requeue_for_stalled_tasks branch 4 times, most recently from 367abd2 to 3653295 on January 19, 2026 at 07:25
Comment on lines +16 to +82
with stalled_tasks as (
  select
    st.run_id,
    st.step_slug,
    st.task_index,
    st.message_id,
    r.flow_slug,
    st.requeued_count,
    f.opt_timeout
  from pgflow.step_tasks st
  join pgflow.runs r on r.run_id = st.run_id
  join pgflow.flows f on f.flow_slug = r.flow_slug
  where st.status = 'started'
    and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
  for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
  select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
  select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
  update pgflow.step_tasks st
  set
    status = 'queued',
    started_at = null,
    last_worker_id = null,
    requeued_count = st.requeued_count + 1,
    last_requeued_at = now()
  from to_requeue tr
  where st.run_id = tr.run_id
    and st.step_slug = tr.step_slug
    and st.task_index = tr.task_index
  returning tr.flow_slug as queue_name, tr.message_id
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
  select pgflow.set_vt_batch(
    r.queue_name,
    array_agg(r.message_id),
    array_agg(0) -- all offsets are 0 (immediate visibility)
  )
  from requeued r
  where r.message_id is not null
  group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status remains 'started' with requeued_count >= 3 for easy identification
archived as (
  select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
  from to_archive ta
  where ta.message_id is not null
  group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived)
select count(*) into result_count
from requeued, _vr, _ar;

return result_count;
end;
$$;

Critical Bug: Infinite reprocessing of archived tasks

Tasks that exceed the max requeue limit (3) will be reprocessed every cron run (every 15 seconds) indefinitely. Here's why:

  1. When requeued_count >= 3, tasks go into to_archive CTE
  2. Their messages are archived in the queue
  3. BUT the task record itself is never updated - it remains with status = 'started' and requeued_count >= 3
  4. On the next cron run, the same task matches the stalled_tasks condition again
  5. Steps 1-4 repeat forever, causing:
    • Unnecessary CPU load every 15 seconds
    • Repeated archive operations on already-archived messages (will likely error)
    • Database lock contention

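To confirm step 4, a quick diagnostic (a sketch using only columns visible in this diff, not something shipped in the PR) shows the rows that reappear on every sweep:

-- Tasks that exhausted their requeues but were never moved out of 'started';
-- each of these matches the stalled_tasks CTE again on every cron invocation.
select run_id, step_slug, task_index, requeued_count, started_at
from pgflow.step_tasks
where status = 'started'
  and requeued_count >= 3;
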
Fix: Add an UPDATE statement for to_archive tasks to change their status:

-- After the 'requeued' CTE, add:
archived_updated as (
  update pgflow.step_tasks st
  set status = 'failed'  -- or 'archived'
  from to_archive ta
  where st.run_id = ta.run_id
    and st.step_slug = ta.step_slug
    and st.task_index = ta.task_index
  returning 1
),

Or alternatively, exclude already-maxed-out tasks from the initial query:

where st.status = 'started'
  and st.requeued_count < max_requeues  -- Add this condition
  and st.started_at < ...
Suggested change
with stalled_tasks as (
  select
    st.run_id,
    st.step_slug,
    st.task_index,
    st.message_id,
    r.flow_slug,
    st.requeued_count,
    f.opt_timeout
  from pgflow.step_tasks st
  join pgflow.runs r on r.run_id = st.run_id
  join pgflow.flows f on f.flow_slug = r.flow_slug
  where st.status = 'started'
    and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
  for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
  select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
  select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
  update pgflow.step_tasks st
  set
    status = 'queued',
    started_at = null,
    last_worker_id = null,
    requeued_count = st.requeued_count + 1,
    last_requeued_at = now()
  from to_requeue tr
  where st.run_id = tr.run_id
    and st.step_slug = tr.step_slug
    and st.task_index = tr.task_index
  returning tr.flow_slug as queue_name, tr.message_id
),
-- Update tasks that exceeded max requeues to prevent infinite reprocessing
archived_updated as (
  update pgflow.step_tasks st
  set status = 'failed'
  from to_archive ta
  where st.run_id = ta.run_id
    and st.step_slug = ta.step_slug
    and st.task_index = ta.task_index
  returning 1
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
  select pgflow.set_vt_batch(
    r.queue_name,
    array_agg(r.message_id),
    array_agg(0) -- all offsets are 0 (immediate visibility)
  )
  from requeued r
  where r.message_id is not null
  group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status is now updated to 'failed' to prevent reprocessing
archived as (
  select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
  from to_archive ta
  where ta.message_id is not null
  group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived),
-- Force execution of archived_updated CTE
_au as (select count(*) from archived_updated)
select count(*) into result_count
from requeued, _vr, _ar, _au;
return result_count;
end;
$$;

Spotted by Graphite Agent


… logic

- Introduced requeued_count and last_requeued_at columns to step_tasks table
- Developed requeue_stalled_tasks function to requeue or fail stalled tasks based on max requeues
- Created setup_requeue_stalled_tasks_cron function to schedule automatic requeue checks
- Updated migration scripts to include new columns and functions
- Added comprehensive tests for requeue behavior, max requeue limit, and cron setup
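
For reference, the column additions described in the first bullet would look roughly like the sketch below; the types and defaults are assumptions, since the migration file itself is not quoted here.

-- Assumed shape of the tracking-column additions (exact types/defaults may differ).
alter table pgflow.step_tasks
  add column requeued_count int not null default 0,
  add column last_requeued_at timestamptz;
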
@jumski jumski force-pushed the 01-12-pgf-aav_implement_requeue_for_stalled_tasks branch from 3653295 to 8f162cb on January 19, 2026 at 07:47
@jumski jumski mentioned this pull request Jan 19, 2026