From 5e30cc7e670c2f39bb427c9a381ff5a9f38cb2d7 Mon Sep 17 00:00:00 2001 From: mscherer Date: Thu, 29 Jan 2026 09:05:22 +0100 Subject: [PATCH 1/4] Add optional output capture for queue jobs Introduce a new `Queue.captureOutput` config option that, when enabled, captures task stdout/stderr output and stores it in a new `output` text column on the `queued_jobs` table. This allows reviewing job output in the admin panel after execution, which is especially useful for jobs running as daemons/services where terminal output is lost. - Add output capture buffer to Io class with enable/disable/reset API - Add `output` nullable text column via migration - Update Processor to enable capture per job and pass output to markJobDone/markJobFailed - Update QueuedJobsTable reset/rerun/clone to clear output field - Display output in admin job detail view - Add `captureOutput` and `maxOutputSize` config options (off by default) --- .../20260129000000_MigrationQueueOutput.php | 22 ++++ config/app.example.php | 6 + src/Console/Io.php | 106 ++++++++++++++++++ src/Model/Entity/QueuedJob.php | 1 + src/Model/Table/QueuedJobsTable.php | 15 ++- src/Queue/Processor.php | 16 ++- templates/Admin/QueuedJobs/view.php | 8 ++ 7 files changed, 170 insertions(+), 4 deletions(-) create mode 100644 config/Migrations/20260129000000_MigrationQueueOutput.php diff --git a/config/Migrations/20260129000000_MigrationQueueOutput.php b/config/Migrations/20260129000000_MigrationQueueOutput.php new file mode 100644 index 00000000..39109b25 --- /dev/null +++ b/config/Migrations/20260129000000_MigrationQueueOutput.php @@ -0,0 +1,22 @@ +table('queued_jobs') + ->addColumn('output', 'text', [ + 'default' => null, + 'null' => true, + ]) + ->update(); + } + +} diff --git a/config/app.example.php b/config/app.example.php index 3b544587..1c4585a1 100644 --- a/config/app.example.php +++ b/config/app.example.php @@ -47,6 +47,12 @@ // determine whether logging is enabled 'log' => true, + // capture task output (stdout/stderr) and store in database + 'captureOutput' => false, + + // maximum size in bytes for captured output (0 = unlimited) + 'maxOutputSize' => 65536, // 64KB + // set default Mailer class 'mailerClass' => 'Cake\Mailer\Email', diff --git a/src/Console/Io.php b/src/Console/Io.php index 2fd375a6..91e8044c 100644 --- a/src/Console/Io.php +++ b/src/Console/Io.php @@ -18,6 +18,16 @@ class Io { */ protected ConsoleIo $_io; + /** + * @var array + */ + protected array $outputLog = []; + + /** + * @var bool + */ + protected bool $captureOutput = false; + /** * @param \Cake\Console\ConsoleIo $io */ @@ -25,6 +35,92 @@ public function __construct(ConsoleIo $io) { $this->_io = $io; } + /** + * Enable output capture. + * + * @return void + */ + public function enableOutputCapture(): void { + $this->captureOutput = true; + $this->outputLog = []; + } + + /** + * Disable output capture. + * + * @return void + */ + public function disableOutputCapture(): void { + $this->captureOutput = false; + } + + /** + * Get the captured output log. + * + * @return array + */ + public function getOutputLog(): array { + return $this->outputLog; + } + + /** + * Get the captured output as plain text string. + * + * @param int $maxLength Maximum length in bytes before truncation. 0 for unlimited. + * + * @return string|null + */ + public function getOutputAsText(int $maxLength = 0): ?string { + if (!$this->outputLog) { + return null; + } + + $lines = []; + foreach ($this->outputLog as $entry) { + $prefix = strtoupper($entry['level']); + $lines[] = '[' . date('H:i:s', $entry['time']) . '] ' . $prefix . ': ' . $entry['message']; + } + + $output = implode("\n", $lines); + if ($maxLength > 0 && strlen($output) > $maxLength) { + $output = substr($output, 0, $maxLength) . "\n... [output truncated]"; + } + + return $output; + } + + /** + * Reset the captured output log. + * + * @return void + */ + public function resetOutputLog(): void { + $this->outputLog = []; + } + + /** + * @param string $level + * @param array|string $message + * + * @return void + */ + protected function capture(string $level, array|string $message): void { + if (!$this->captureOutput) { + return; + } + + $messages = (array)$message; + foreach ($messages as $msg) { + // Strip console formatting tags + $clean = (string)preg_replace('/<\/?[a-z]+>/', '', $msg); + $this->outputLog[] = [ + 'level' => $level, + 'message' => $clean, + 'time' => time(), + ]; + } + } + /** * Output at the verbose level. * @@ -34,6 +130,8 @@ public function __construct(ConsoleIo $io) { * @return int|null The number of bytes returned from writing to stdout. */ public function verbose(array|string $message, int $newlines = 1): ?int { + $this->capture('verbose', $message); + return $this->_io->verbose($message, $newlines); } @@ -46,6 +144,8 @@ public function verbose(array|string $message, int $newlines = 1): ?int { * @return int|null The number of bytes returned from writing to stdout. */ public function quiet(array|string $message, int $newlines = 1): ?int { + $this->capture('info', $message); + return $this->_io->quiet($message, $newlines); } @@ -69,6 +169,8 @@ public function quiet(array|string $message, int $newlines = 1): ?int { * @return int|null The number of bytes returned from writing to stdout. */ public function out(array|string $message = '', int $newlines = 1, int $level = ConsoleIo::NORMAL): ?int { + $this->capture('info', $message); + return $this->_io->out($message, $newlines, $level); } @@ -82,6 +184,8 @@ public function out(array|string $message = '', int $newlines = 1, int $level = * @return int|null The number of bytes returned from writing to stderr. */ public function error(array|string $message = '', int $newlines = 1): ?int { + $this->capture('error', $message); + $messages = (array)$message; return $this->_io->error($messages, $newlines); @@ -138,6 +242,8 @@ public function comment(array|string $message = '', int $newlines = 1, int $leve * @return int|null The number of bytes returned from writing to stderr. */ public function warn(array|string $message = '', int $newlines = 1): ?int { + $this->capture('warning', $message); + $messages = (array)$message; foreach ($messages as $key => $message) { $messages[$key] = '' . $message . ''; diff --git a/src/Model/Entity/QueuedJob.php b/src/Model/Entity/QueuedJob.php index f73d2247..672b837a 100644 --- a/src/Model/Entity/QueuedJob.php +++ b/src/Model/Entity/QueuedJob.php @@ -23,6 +23,7 @@ * @property int $priority * @property \Queue\Model\Entity\QueueProcess $worker_process * @property int|null $memory + * @property string|null $output * @property string|null $headers ! * @property string|null $message ! */ diff --git a/src/Model/Table/QueuedJobsTable.php b/src/Model/Table/QueuedJobsTable.php index 302ffe8d..8f9702bc 100644 --- a/src/Model/Table/QueuedJobsTable.php +++ b/src/Model/Table/QueuedJobsTable.php @@ -665,6 +665,7 @@ public function requestJob(array $tasks, array $groups = [], array $types = []): 'fetched' => $now, 'progress' => null, 'failure_message' => null, + 'output' => null, 'attempts' => $job->attempts + 1, ]); @@ -710,12 +711,15 @@ public function updateProgress(int $id, float $progress, ?string $status = null) * * @return bool Success */ - public function markJobDone(QueuedJob $job): bool { + public function markJobDone(QueuedJob $job, ?string $output = null): bool { $fields = [ 'progress' => 1, 'completed' => $this->getDateTime(), 'memory' => Memory::usage(), ]; + if ($output !== null) { + $fields['output'] = $output; + } $job = $this->patchEntity($job, $fields); return (bool)$this->save($job); @@ -729,11 +733,14 @@ public function markJobDone(QueuedJob $job): bool { * * @return bool Success */ - public function markJobFailed(QueuedJob $job, ?string $failureMessage = null): bool { + public function markJobFailed(QueuedJob $job, ?string $failureMessage = null, ?string $output = null): bool { $fields = [ 'failure_message' => $failureMessage, 'memory' => Memory::usage(), ]; + if ($output !== null) { + $fields['output'] = $output; + } $job = $this->patchEntity($job, $fields); return (bool)$this->save($job); @@ -773,6 +780,7 @@ public function reset(?int $id = null, bool $full = false): int { 'attempts' => 0, 'workerkey' => null, 'failure_message' => null, + 'output' => null, 'memory' => null, ]; $conditions = [ @@ -806,6 +814,7 @@ public function rerunByTask(string $task, ?string $reference = null): int { 'attempts' => 0, 'workerkey' => null, 'failure_message' => null, + 'output' => null, 'memory' => null, ]; $conditions = [ @@ -832,6 +841,7 @@ public function rerun(int $id): int { 'attempts' => 0, 'workerkey' => null, 'failure_message' => null, + 'output' => null, 'memory' => null, ]; $conditions = [ @@ -855,6 +865,7 @@ public function clone(QueuedJob $queuedJob): ?QueuedJob { 'attempts' => 0, 'workerkey' => null, 'failure_message' => null, + 'output' => null, 'memory' => null, ]; $data = $defaults + $queuedJob->toArray(); diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index 0b0ef045..9e52591c 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -224,6 +224,11 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { ]); EventManager::instance()->dispatch($event); + $captureOutput = (bool)Configure::read('Queue.captureOutput'); + if ($captureOutput) { + $this->io->enableOutputCapture(); + } + $return = $failureMessage = null; try { $this->time = time(); @@ -247,8 +252,15 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { $this->logError($taskName . ' (job ' . $queuedJob->id . ')' . "\n" . $failureMessage, $pid); } + $capturedOutput = null; + if ($captureOutput) { + $maxOutputSize = (int)(Configure::read('Queue.maxOutputSize') ?: 65536); + $capturedOutput = $this->io->getOutputAsText($maxOutputSize); + $this->io->disableOutputCapture(); + } + if ($return === false) { - $this->QueuedJobs->markJobFailed($queuedJob, $failureMessage); + $this->QueuedJobs->markJobFailed($queuedJob, $failureMessage, $capturedOutput); $failedStatus = $this->QueuedJobs->getFailedStatus($queuedJob, $this->getTaskConf()); $this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid); $this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.'); @@ -273,7 +285,7 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { return; } - $this->QueuedJobs->markJobDone($queuedJob); + $this->QueuedJobs->markJobDone($queuedJob, $capturedOutput); // Dispatch completed event $event = new Event('Queue.Job.completed', $this, [ diff --git a/templates/Admin/QueuedJobs/view.php b/templates/Admin/QueuedJobs/view.php index a18a79b3..e85cec09 100644 --- a/templates/Admin/QueuedJobs/view.php +++ b/templates/Admin/QueuedJobs/view.php @@ -146,6 +146,14 @@ ?> + output) { ?> +
+
+

+
output) ?>
+
+
+

From 50c511f7c035a24176d8cdcf7fe0c3ac538acac3 Mon Sep 17 00:00:00 2001 From: mscherer Date: Thu, 29 Jan 2026 10:16:25 +0100 Subject: [PATCH 2/4] Fix CI: add missing output param docs and fixture column - Add @param doc for $output on markJobDone() and markJobFailed() to fix phpcs DocBlockParam.ParamTypeMismatch errors - Add output column to QueuedJobsFixture to fix "no such column" errors in test suite --- src/Model/Table/QueuedJobsTable.php | 2 ++ tests/Fixture/QueuedJobsFixture.php | 1 + 2 files changed, 3 insertions(+) diff --git a/src/Model/Table/QueuedJobsTable.php b/src/Model/Table/QueuedJobsTable.php index 8f9702bc..bf795ace 100644 --- a/src/Model/Table/QueuedJobsTable.php +++ b/src/Model/Table/QueuedJobsTable.php @@ -708,6 +708,7 @@ public function updateProgress(int $id, float $progress, ?string $status = null) * Mark a job as Completed, removing it from the queue. * * @param \Queue\Model\Entity\QueuedJob $job Job + * @param string|null $output Optional captured output. * * @return bool Success */ @@ -730,6 +731,7 @@ public function markJobDone(QueuedJob $job, ?string $output = null): bool { * * @param \Queue\Model\Entity\QueuedJob $job Job * @param string|null $failureMessage Optional message to append to the failure_message field. + * @param string|null $output Optional captured output. * * @return bool Success */ diff --git a/tests/Fixture/QueuedJobsFixture.php b/tests/Fixture/QueuedJobsFixture.php index 82c147b8..2fe8f210 100644 --- a/tests/Fixture/QueuedJobsFixture.php +++ b/tests/Fixture/QueuedJobsFixture.php @@ -33,6 +33,7 @@ class QueuedJobsFixture extends TestFixture { 'status' => ['type' => 'string', 'length' => 255, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null, 'fixed' => null], 'priority' => ['type' => 'integer', 'length' => 3, 'unsigned' => true, 'null' => false, 'default' => 5, 'comment' => '', 'precision' => null, 'autoIncrement' => null], 'memory' => ['type' => 'integer', 'length' => 10, 'unsigned' => true, 'null' => true, 'default' => null, 'comment' => 'MB'], + 'output' => ['type' => 'text', 'length' => 16777215, 'null' => true, 'default' => null, 'comment' => '', 'precision' => null], '_constraints' => [ 'primary' => ['type' => 'primary', 'columns' => ['id'], 'length' => []], ], From 6d11c082739fbc2e191420ddfa648ceefb08840c Mon Sep 17 00:00:00 2001 From: mscherer Date: Thu, 29 Jan 2026 10:49:16 +0100 Subject: [PATCH 3/4] Preserve captured output on SIGTERM worker termination When a worker is killed by signal, any partial output captured before the signal is now saved alongside the failure message. Previously this output was silently discarded. --- src/Queue/Processor.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index 9e52591c..000ed928 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -369,7 +369,12 @@ protected function getTaskConf(): array { protected function exit(int $signal): void { if ($this->currentJob) { $failureMessage = 'Worker process terminated by signal (SIGTERM) - job execution interrupted due to timeout or manual termination'; - $this->QueuedJobs->markJobFailed($this->currentJob, $failureMessage); + $capturedOutput = null; + if ((bool)Configure::read('Queue.captureOutput')) { + $maxOutputSize = (int)(Configure::read('Queue.maxOutputSize') ?: 65536); + $capturedOutput = $this->io->getOutputAsText($maxOutputSize); + } + $this->QueuedJobs->markJobFailed($this->currentJob, $failureMessage, $capturedOutput); $this->logError('Job ' . $this->currentJob->job_task . ' (id ' . $this->currentJob->id . ') failed due to worker termination', $this->pid); $this->io->out('Current job marked as failed due to worker termination.'); } From 5f2fa9624ab2e52506e78aa89d606621bb85357b Mon Sep 17 00:00:00 2001 From: mscherer Date: Thu, 29 Jan 2026 11:14:50 +0100 Subject: [PATCH 4/4] Fix edge cases found in audit Io.php: - Cap output capture memory during capture(), not just retrieval - Capture overwrite() and hr() output - Route abort() through $this->error() for capture - Reset byte counter in resetOutputLog() Processor.php: - abort() signal handler now marks current job as failed with captured output (was silently losing jobs on Ctrl+C) - exit() signal handler now disables capture and clears currentJob - Replace declare(ticks=1) with pcntl_async_signals(true) - Pass maxOutputSize to enableOutputCapture() for early capping - Remove outdated PHP 5.3 gc_enable() guard QueuedJobsTable.php: - Fix SQLite age calculation (was computing now-now instead of notbefore-now) - Fix getFailedStatus() backward compat to use 'Queue.' plugin prefix instead of non-matching 'Queue' concatenation - Use Config::cleanuptimeout() in cleanOldJobs() for consistent defaults - Replace deprecated group()/order() with groupBy()/orderBy() - Remove dead bin2hex null check JobCommand.php: - Display output field in CLI view command - Fix rerunAll()/resetAll() to use bulk updateAll instead of loading all entities into memory WorkerCommand.php: - Fix defaultName() collision with JobCommand (was 'queue job', now 'queue worker') QueueProcessesTable.php: - Sanitize PID/signal as integers in terminateProcess() exec() --- src/Command/JobCommand.php | 48 +++++++++++++++---------- src/Command/WorkerCommand.php | 2 +- src/Console/Io.php | 32 +++++++++++++++-- src/Model/Table/QueueProcessesTable.php | 6 +++- src/Model/Table/QueuedJobsTable.php | 44 +++++++---------------- src/Queue/Processor.php | 27 ++++++++++---- 6 files changed, 99 insertions(+), 60 deletions(-) diff --git a/src/Command/JobCommand.php b/src/Command/JobCommand.php index 35e34fc3..db29b1ec 100644 --- a/src/Command/JobCommand.php +++ b/src/Command/JobCommand.php @@ -141,18 +141,26 @@ public function execute(Arguments $args, ConsoleIo $io) { * @return int */ protected function rerunAll(ConsoleIo $io): int { - /** @var array<\Queue\Model\Entity\QueuedJob> $queuedJobs */ - $queuedJobs = $this->QueuedJobs->find() - ->where(['completed IS NOT' => null]) - ->all() - ->toArray(); - - $status = static::CODE_SUCCESS; - foreach ($queuedJobs as $queuedJob) { - $status |= $this->rerun($io, $queuedJob); + $fields = [ + 'completed' => null, + 'fetched' => null, + 'progress' => null, + 'attempts' => 0, + 'workerkey' => null, + 'failure_message' => null, + 'output' => null, + 'memory' => null, + ]; + $count = $this->QueuedJobs->updateAll($fields, ['completed IS NOT' => null]); + if (!$count) { + $io->out('No completed jobs to rerun.'); + + return static::CODE_SUCCESS; } - return $status; + $io->success($count . ' job(s) queued for rerun'); + + return static::CODE_SUCCESS; } /** @@ -161,18 +169,16 @@ protected function rerunAll(ConsoleIo $io): int { * @return int */ protected function resetAll(ConsoleIo $io): int { - /** @var array<\Queue\Model\Entity\QueuedJob> $queuedJobs */ - $queuedJobs = $this->QueuedJobs->find() - ->where(['completed IS' => null]) - ->all() - ->toArray(); + $count = $this->QueuedJobs->reset(null, true); + if (!$count) { + $io->out('No incomplete jobs to reset.'); - $status = static::CODE_SUCCESS; - foreach ($queuedJobs as $queuedJob) { - $status |= $this->reset($io, $queuedJob); + return static::CODE_SUCCESS; } - return $status; + $io->success($count . ' job(s) reset for rerun'); + + return static::CODE_SUCCESS; } /** @@ -266,6 +272,10 @@ protected function view(ConsoleIo $io, QueuedJob $queuedJob): int { if ($queuedJob->attempts) { $io->out('Failure message: ' . ($queuedJob->failure_message ?: '-')); } + if ($queuedJob->output) { + $io->out('Output:'); + $io->out($queuedJob->output); + } $data = $queuedJob->data; $io->out('Data: ' . Debugger::exportVar($data, 9)); diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index 4e6f3242..484a0874 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -42,7 +42,7 @@ public function initialize(): void { * @inheritDoc */ public static function defaultName(): string { - return 'queue job'; + return 'queue worker'; } /** diff --git a/src/Console/Io.php b/src/Console/Io.php index 91e8044c..9225cb87 100644 --- a/src/Console/Io.php +++ b/src/Console/Io.php @@ -28,6 +28,16 @@ class Io { */ protected bool $captureOutput = false; + /** + * @var int + */ + protected int $capturedBytes = 0; + + /** + * @var int + */ + protected int $maxCaptureBytes = 0; + /** * @param \Cake\Console\ConsoleIo $io */ @@ -38,11 +48,15 @@ public function __construct(ConsoleIo $io) { /** * Enable output capture. * + * @param int $maxBytes Maximum bytes to capture (0 for unlimited). + * * @return void */ - public function enableOutputCapture(): void { + public function enableOutputCapture(int $maxBytes = 0): void { $this->captureOutput = true; $this->outputLog = []; + $this->capturedBytes = 0; + $this->maxCaptureBytes = $maxBytes; } /** @@ -96,6 +110,7 @@ public function getOutputAsText(int $maxLength = 0): ?string { */ public function resetOutputLog(): void { $this->outputLog = []; + $this->capturedBytes = 0; } /** @@ -109,15 +124,24 @@ protected function capture(string $level, array|string $message): void { return; } + if ($this->maxCaptureBytes > 0 && $this->capturedBytes >= $this->maxCaptureBytes) { + return; + } + $messages = (array)$message; foreach ($messages as $msg) { // Strip console formatting tags $clean = (string)preg_replace('/<\/?[a-z]+>/', '', $msg); + $this->capturedBytes += strlen($clean); $this->outputLog[] = [ 'level' => $level, 'message' => $clean, 'time' => time(), ]; + + if ($this->maxCaptureBytes > 0 && $this->capturedBytes >= $this->maxCaptureBytes) { + break; + } } } @@ -296,6 +320,8 @@ public function nl(int $multiplier = 1): string { * @return void */ public function hr(int $newlines = 0, int $width = 63): void { + $this->capture('info', str_repeat('-', $width)); + $this->_io->hr($newlines, $width); } @@ -313,7 +339,7 @@ public function hr(int $newlines = 0, int $width = 63): void { * @return void */ public function abort(string $message, int $exitCode = CommandInterface::CODE_ERROR): void { - $this->_io->error($message); + $this->error($message); throw new StopException($message, $exitCode); } @@ -347,6 +373,8 @@ public function helper(string $name, array $settings = []): Helper { * @return void */ public function overwrite(array|string $message, int $newlines = 1, ?int $size = null): void { + $this->capture('info', $message); + $this->_io->overwrite($message, $newlines, $size); } diff --git a/src/Model/Table/QueueProcessesTable.php b/src/Model/Table/QueueProcessesTable.php index 1450ffed..72452892 100644 --- a/src/Model/Table/QueueProcessesTable.php +++ b/src/Model/Table/QueueProcessesTable.php @@ -308,7 +308,11 @@ public function terminateProcess(string $pid, int $sig = 0): void { $killed = posix_kill((int)$pid, $sig); } if (!$killed) { - exec('kill -' . $sig . ' ' . $pid); + $safePid = (int)$pid; + $safeSig = (int)$sig; + if ($safePid > 0) { + exec('kill -' . $safeSig . ' ' . $safePid); + } } sleep(1); diff --git a/src/Model/Table/QueuedJobsTable.php b/src/Model/Table/QueuedJobsTable.php index bf795ace..feb9d725 100644 --- a/src/Model/Table/QueuedJobsTable.php +++ b/src/Model/Table/QueuedJobsTable.php @@ -21,7 +21,6 @@ use Queue\Queue\Config; use Queue\Queue\TaskFinder; use Queue\Utility\Memory; -use RuntimeException; /** * @author MGriesbach@gmail.com @@ -296,18 +295,9 @@ public function getLength(?string $type = null): int { * @return \Cake\ORM\Query\SelectQuery */ public function getTypes(): SelectQuery { - $findCond = [ - 'fields' => [ - 'job_task', - ], - 'group' => [ - 'job_task', - ], - 'keyField' => 'job_task', - 'valueField' => 'job_task', - ]; - - return $this->find('list', ...$findCond); + return $this->find('list', keyField: 'job_task', valueField: 'job_task') + ->select(['job_task']) + ->groupBy(['job_task']); } /** @@ -357,12 +347,10 @@ public function getStats(bool $disableHydration = false): array { 'conditions' => [ 'completed IS NOT' => null, ], - 'group' => [ - 'job_task', - ], ]; - $query = $this->find('all', ...$options); + $query = $this->find('all', ...$options) + ->groupBy(['job_task']); if ($disableHydration) { $query = $query->disableHydration(); } @@ -505,7 +493,7 @@ public function requestJob(array $tasks, array $groups = [], array $types = []): break; case static::DRIVER_SQLITE: $age = $query->expr() - ->add('IFNULL(CAST(strftime("%s", CURRENT_TIMESTAMP) as integer) - CAST(strftime("%s", "' . $nowStr . '") as integer), 0)'); + ->add('IFNULL(CAST(strftime("%s", notbefore) as integer) - CAST(strftime("%s", "' . $nowStr . '") as integer), 0)'); break; } @@ -517,11 +505,6 @@ public function requestJob(array $tasks, array $groups = [], array $types = []): 'fields' => [ 'age' => $age, ], - 'order' => [ - 'priority' => 'ASC', - 'age' => 'ASC', - 'id' => 'ASC', - ], ]; $costConstraints = []; @@ -631,7 +614,8 @@ public function requestJob(array $tasks, array $groups = [], array $types = []): /** @var \Queue\Model\Entity\QueuedJob|null $job */ $job = $this->getConnection()->transactional(function () use ($query, $options, $now, $driverName) { - $query->find('all', ...$options)->enableAutoFields(true); + $query->find('all', ...$options)->enableAutoFields(true) + ->orderBy(['priority' => 'ASC', 'age' => 'ASC', 'id' => 'ASC']); switch ($driverName) { case static::DRIVER_MYSQL: @@ -944,11 +928,12 @@ public function getScheduledStats(): SelectQuery { * @return int */ public function cleanOldJobs(): int { - if (!Configure::read('Queue.cleanuptimeout')) { + $cleanupTimeout = Config::cleanuptimeout(); + if (!$cleanupTimeout) { return 0; } - $threshold = (new DateTime())->subSeconds((int)Configure::read('Queue.cleanuptimeout')); + $threshold = (new DateTime())->subSeconds($cleanupTimeout); return $this->deleteAll([ 'completed <' => $threshold, @@ -966,8 +951,8 @@ public function getFailedStatus(QueuedJob $queuedTask, array $taskConfiguration) $queuedTaskName = $queuedTask->job_task; if (empty($taskConfiguration[$queuedTaskName])) { - // Try with 'Queue' prefix for backward compatibility - $queuedTaskName = 'Queue' . $queuedTask->job_task; + // Try with plugin prefix for backward compatibility (e.g. "Example" => "Queue.Example") + $queuedTaskName = 'Queue.' . $queuedTask->job_task; if (empty($taskConfiguration[$queuedTaskName])) { return $failureMessageRequeued; } @@ -1005,9 +990,6 @@ public function key(): string { return $this->_key; } $this->_key = bin2hex(random_bytes(20)); - if (!$this->_key) { - throw new RuntimeException('Invalid key generated'); - } return $this->_key; } diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index 000ed928..80ee30a2 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -27,7 +27,7 @@ use const SIGTSTP; use const SIGUSR1; -declare(ticks=1); +// Enable async signal handling (replaces declare(ticks=1) for better performance) /** * Main shell to init and run queue workers. @@ -123,9 +123,10 @@ public function run(array $args): int { return CommandInterface::CODE_ERROR; } - // Enable Garbage Collector (PHP >= 5.3) - if (function_exists('gc_enable')) { - gc_enable(); + gc_enable(); + + if (function_exists('pcntl_async_signals')) { + pcntl_async_signals(true); } if (function_exists('pcntl_signal')) { pcntl_signal(SIGTERM, [&$this, 'exit']); @@ -226,7 +227,8 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { $captureOutput = (bool)Configure::read('Queue.captureOutput'); if ($captureOutput) { - $this->io->enableOutputCapture(); + $maxOutputSize = (int)(Configure::read('Queue.maxOutputSize') ?: 65536); + $this->io->enableOutputCapture($maxOutputSize); } $return = $failureMessage = null; @@ -373,10 +375,11 @@ protected function exit(int $signal): void { if ((bool)Configure::read('Queue.captureOutput')) { $maxOutputSize = (int)(Configure::read('Queue.maxOutputSize') ?: 65536); $capturedOutput = $this->io->getOutputAsText($maxOutputSize); + $this->io->disableOutputCapture(); } $this->QueuedJobs->markJobFailed($this->currentJob, $failureMessage, $capturedOutput); $this->logError('Job ' . $this->currentJob->job_task . ' (id ' . $this->currentJob->id . ') failed due to worker termination', $this->pid); - $this->io->out('Current job marked as failed due to worker termination.'); + $this->currentJob = null; } $this->exit = true; } @@ -389,6 +392,18 @@ protected function exit(int $signal): void { * @return void */ protected function abort(int $signal = 1): void { + if ($this->currentJob) { + $failureMessage = 'Worker process aborted by signal (' . $signal . ') - job execution interrupted'; + $capturedOutput = null; + if ((bool)Configure::read('Queue.captureOutput')) { + $maxOutputSize = (int)(Configure::read('Queue.maxOutputSize') ?: 65536); + $capturedOutput = $this->io->getOutputAsText($maxOutputSize); + $this->io->disableOutputCapture(); + } + $this->QueuedJobs->markJobFailed($this->currentJob, $failureMessage, $capturedOutput); + $this->currentJob = null; + } + $this->deletePid($this->pid); exit($signal);