feat: add automatic requeue for stalled tasks via cron job #591
🦋 Changeset detected
Latest commit: 8f162cb
The changes in this PR will be included in the next version bump. This PR includes changesets to release 5 packages.
This stack of pull requests is managed by Graphite.
with stalled_tasks as (
  select
    st.run_id,
    st.step_slug,
    st.task_index,
    st.message_id,
    r.flow_slug,
    st.requeued_count,
    f.opt_timeout
  from pgflow.step_tasks st
  join pgflow.runs r on r.run_id = st.run_id
  join pgflow.flows f on f.flow_slug = r.flow_slug
  where st.status = 'started'
    and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
  for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
  select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
  select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
  update pgflow.step_tasks st
  set
    status = 'queued',
    started_at = null,
    last_worker_id = null,
    requeued_count = st.requeued_count + 1,
    last_requeued_at = now()
  from to_requeue tr
  where st.run_id = tr.run_id
    and st.step_slug = tr.step_slug
    and st.task_index = tr.task_index
  returning tr.flow_slug as queue_name, tr.message_id
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
  select pgflow.set_vt_batch(
    r.queue_name,
    array_agg(r.message_id),
    array_agg(0) -- all offsets are 0 (immediate visibility)
  )
  from requeued r
  where r.message_id is not null
  group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status remains 'started' with requeued_count >= 3 for easy identification
archived as (
  select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
  from to_archive ta
  where ta.message_id is not null
  group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived)
select count(*) into result_count
from requeued, _vr, _ar;

return result_count;
end;
$$;
Critical Bug: Infinite reprocessing of archived tasks
Tasks that exceed the max requeue limit (3) will be reprocessed on every cron run (every 15 seconds) indefinitely. Here's why:
1. When requeued_count >= 3, tasks go into the to_archive CTE
2. Their messages are archived in the queue
3. BUT the task record itself is never updated - it remains with status = 'started' and requeued_count >= 3
4. On the next cron run, the same task matches the stalled_tasks condition again
5. Steps 1-4 repeat forever, causing:
   - Unnecessary CPU load every 15 seconds
   - Repeated archive operations on already-archived messages (will likely error)
   - Database lock contention
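The loop described in the steps above can be reproduced with a small in-memory model. This is only a sketch, not pgflow code: the Task class, the cron_run helper, and the timed_out flag (a stand-in for the started_at comparison) are hypothetical names introduced for illustration, and the limit of 3 is taken from the comment.

```python
from dataclasses import dataclass

MAX_REQUEUES = 3  # limit named in the review comment


@dataclass
class Task:
    status: str
    requeued_count: int
    timed_out: bool = True  # stand-in for the started_at < cutoff check


def cron_run(tasks):
    """One pass of the unpatched logic; returns (requeued, archive_attempts)."""
    stalled = [t for t in tasks if t.status == "started" and t.timed_out]
    requeued = archive_attempts = 0
    for t in stalled:
        if t.requeued_count < MAX_REQUEUES:
            t.status = "queued"
            t.requeued_count += 1
            requeued += 1
        else:
            # The message is archived, but the task row is left untouched,
            # so it still matches the filter on the next pass.
            archive_attempts += 1
    return requeued, archive_attempts


tasks = [Task(status="started", requeued_count=3)]
history = [cron_run(tasks) for _ in range(4)]
print(history)  # [(0, 1), (0, 1), (0, 1), (0, 1)]
```

Every simulated run selects the same task and attempts another archive, which is the repeated work the comment warns about.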
Fix: Add an UPDATE statement for to_archive tasks to change their status:
-- After the 'requeued' CTE, add:
archived_updated as (
update pgflow.step_tasks st
set status = 'failed' -- or 'archived'
from to_archive ta
where st.run_id = ta.run_id
and st.step_slug = ta.step_slug
and st.task_index = ta.task_index
returning 1
),
Or alternatively, exclude already-maxed-out tasks from the initial query:
where st.status = 'started'
  and st.requeued_count < max_requeues -- Add this condition
  and st.started_at < ...
Suggested change:
with stalled_tasks as (
  select
    st.run_id,
    st.step_slug,
    st.task_index,
    st.message_id,
    r.flow_slug,
    st.requeued_count,
    f.opt_timeout
  from pgflow.step_tasks st
  join pgflow.runs r on r.run_id = st.run_id
  join pgflow.flows f on f.flow_slug = r.flow_slug
  where st.status = 'started'
    and st.started_at < now() - (f.opt_timeout * interval '1 second') - interval '30 seconds'
  for update of st skip locked
),
-- Separate tasks that can be requeued from those that exceeded max requeues
to_requeue as (
  select * from stalled_tasks where requeued_count < max_requeues
),
to_archive as (
  select * from stalled_tasks where requeued_count >= max_requeues
),
-- Update tasks that will be requeued
requeued as (
  update pgflow.step_tasks st
  set
    status = 'queued',
    started_at = null,
    last_worker_id = null,
    requeued_count = st.requeued_count + 1,
    last_requeued_at = now()
  from to_requeue tr
  where st.run_id = tr.run_id
    and st.step_slug = tr.step_slug
    and st.task_index = tr.task_index
  returning tr.flow_slug as queue_name, tr.message_id
),
-- Update tasks that exceeded max requeues to prevent infinite reprocessing
archived_updated as (
  update pgflow.step_tasks st
  set status = 'failed'
  from to_archive ta
  where st.run_id = ta.run_id
    and st.step_slug = ta.step_slug
    and st.task_index = ta.task_index
  returning 1
),
-- Make requeued messages visible immediately (batched per queue)
visibility_reset as (
  select pgflow.set_vt_batch(
    r.queue_name,
    array_agg(r.message_id),
    array_agg(0) -- all offsets are 0 (immediate visibility)
  )
  from requeued r
  where r.message_id is not null
  group by r.queue_name
),
-- Archive messages for tasks that exceeded max requeues (batched per queue)
-- Task status is now updated to 'failed' to prevent reprocessing
archived as (
  select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
  from to_archive ta
  where ta.message_id is not null
  group by ta.flow_slug
),
-- Force execution of visibility_reset CTE
_vr as (select count(*) from visibility_reset),
-- Force execution of archived CTE
_ar as (select count(*) from archived),
-- Force execution of archived_updated CTE
_au as (select count(*) from archived_updated)
select count(*) into result_count
from requeued, _vr, _ar, _au;
return result_count;
end;
$$;
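The two fixes proposed in this comment are not equivalent, and a small model may make the difference easier to see. The sketch below is hypothetical Python, not pgflow code: Task, run_with_status_update, and run_with_selection_filter are illustrative names, and every task is assumed to be past its timeout. Each function returns whether the task was selected on that run.

```python
from dataclasses import dataclass

MAX_REQUEUES = 3  # limit named in the review comment


@dataclass
class Task:
    status: str
    requeued_count: int
    archived: bool = False  # whether the queue message was archived


def run_with_status_update(task):
    """First fix: a maxed-out task is archived and marked 'failed'."""
    if task.status != "started":
        return False  # no longer matches the stalled_tasks filter
    if task.requeued_count >= MAX_REQUEUES:
        task.archived = True
        task.status = "failed"
    else:
        task.status = "queued"
        task.requeued_count += 1
    return True


def run_with_selection_filter(task):
    """Alternative: maxed-out tasks are excluded when selecting."""
    if task.status != "started" or task.requeued_count >= MAX_REQUEUES:
        return False  # never selected, so never archived either
    task.status = "queued"
    task.requeued_count += 1
    return True


a = Task("started", 3)
b = Task("started", 3)
picked_a = [run_with_status_update(a) for _ in range(3)]
picked_b = [run_with_selection_filter(b) for _ in range(3)]
print(picked_a, a.status, a.archived)  # [True, False, False] failed True
print(picked_b, b.status, b.archived)  # [False, False, False] started False
```

In this model the status update handles the task once and leaves it in a terminal state. The selection filter also stops the loop, but the task stays in 'started' and its message is never archived, because to_archive would never receive a row.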
Spotted by Graphite Agent
… logic
- Introduced requeued_count and last_requeued_at columns to step_tasks table
- Developed requeue_stalled_tasks function to requeue or fail stalled tasks based on max requeues
- Created setup_requeue_stalled_tasks_cron function to schedule automatic requeue checks
- Updated migration scripts to include new columns and functions
- Added comprehensive tests for requeue behavior, max requeue limit, and cron setup

Add automatic requeue for stalled tasks via cron job
This PR implements a system to automatically detect and requeue tasks that have stalled due to worker crashes or other issues. Key features:
- requeue_stalled_tasks() function that identifies tasks stuck in 'started' status beyond their timeout window
- New columns on the step_tasks table: requeued_count and last_requeued_at
- setup_requeue_stalled_tasks_cron(), which schedules the check to run every 15 seconds by default

This enhancement improves system resilience by ensuring tasks don't remain stuck when workers crash unexpectedly, addressing issue #586.
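As a rough illustration of what "beyond their timeout window" means, the check in the PR's query treats a task as stalled once it has been in 'started' for longer than the flow's opt_timeout plus a fixed 30-second margin. The sketch below mirrors that comparison in Python; is_stalled is a hypothetical helper, and the 60-second timeout and the timestamp are arbitrary example values.

```python
from datetime import datetime, timedelta, timezone

GRACE = timedelta(seconds=30)  # the fixed 30-second margin in the query


def is_stalled(status, started_at, opt_timeout_s, now):
    """Mirror of the stalled_tasks WHERE clause for a single task."""
    if status != "started" or started_at is None:
        return False
    cutoff = now - timedelta(seconds=opt_timeout_s) - GRACE
    return started_at < cutoff


now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)  # arbitrary instant
# With a 60 s timeout, a task counts as stalled once started more than 90 s ago.
print(is_stalled("started", now - timedelta(seconds=91), 60, now))  # True
print(is_stalled("started", now - timedelta(seconds=90), 60, now))  # False
print(is_stalled("queued", now - timedelta(seconds=500), 60, now))  # False
```

The comparison is strict, so a task started exactly timeout plus 30 seconds ago is not yet picked up. It is picked up on a later cron run.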