Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions config/Migrations/20260129000000_MigrationQueueOutput.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php
declare(strict_types=1);

use Migrations\BaseMigration;

class MigrationQueueOutput extends BaseMigration {

/**
* Change Method.
*
* @return void
*/
public function change(): void {
$this->table('queued_jobs')
->addColumn('output', 'text', [
'default' => null,
'null' => true,
])
->update();
}

}
6 changes: 6 additions & 0 deletions config/app.example.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
// determine whether logging is enabled
'log' => true,

// capture task output (stdout/stderr) and store in database
'captureOutput' => false,

// maximum size in bytes for captured output (0 = unlimited)
'maxOutputSize' => 65536, // 64KB

// set default Mailer class
'mailerClass' => 'Cake\Mailer\Email',

Expand Down
48 changes: 29 additions & 19 deletions src/Command/JobCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,26 @@ public function execute(Arguments $args, ConsoleIo $io) {
* @return int
*/
protected function rerunAll(ConsoleIo $io): int {
/** @var array<\Queue\Model\Entity\QueuedJob> $queuedJobs */
$queuedJobs = $this->QueuedJobs->find()
->where(['completed IS NOT' => null])
->all()
->toArray();

$status = static::CODE_SUCCESS;
foreach ($queuedJobs as $queuedJob) {
$status |= $this->rerun($io, $queuedJob);
$fields = [
'completed' => null,
'fetched' => null,
'progress' => null,
'attempts' => 0,
'workerkey' => null,
'failure_message' => null,
'output' => null,
'memory' => null,
];
$count = $this->QueuedJobs->updateAll($fields, ['completed IS NOT' => null]);
if (!$count) {
$io->out('No completed jobs to rerun.');

return static::CODE_SUCCESS;
}

return $status;
$io->success($count . ' job(s) queued for rerun');

return static::CODE_SUCCESS;
}

/**
Expand All @@ -161,18 +169,16 @@ protected function rerunAll(ConsoleIo $io): int {
* @return int
*/
protected function resetAll(ConsoleIo $io): int {
/** @var array<\Queue\Model\Entity\QueuedJob> $queuedJobs */
$queuedJobs = $this->QueuedJobs->find()
->where(['completed IS' => null])
->all()
->toArray();
$count = $this->QueuedJobs->reset(null, true);
if (!$count) {
$io->out('No incomplete jobs to reset.');

$status = static::CODE_SUCCESS;
foreach ($queuedJobs as $queuedJob) {
$status |= $this->reset($io, $queuedJob);
return static::CODE_SUCCESS;
}

return $status;
$io->success($count . ' job(s) reset for rerun');

return static::CODE_SUCCESS;
}

/**
Expand Down Expand Up @@ -266,6 +272,10 @@ protected function view(ConsoleIo $io, QueuedJob $queuedJob): int {
if ($queuedJob->attempts) {
$io->out('Failure message: ' . ($queuedJob->failure_message ?: '-'));
}
if ($queuedJob->output) {
$io->out('Output:');
$io->out($queuedJob->output);
}

$data = $queuedJob->data;
$io->out('Data: ' . Debugger::exportVar($data, 9));
Expand Down
2 changes: 1 addition & 1 deletion src/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function initialize(): void {
* @inheritDoc
*/
public static function defaultName(): string {
return 'queue job';
return 'queue worker';
}

/**
Expand Down
136 changes: 135 additions & 1 deletion src/Console/Io.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,133 @@ class Io {
*/
protected ConsoleIo $_io;

/**
* @var array<array{level: string, message: string, time: int}>
*/
protected array $outputLog = [];

/**
* @var bool
*/
protected bool $captureOutput = false;

/**
* @var int
*/
protected int $capturedBytes = 0;

/**
* @var int
*/
protected int $maxCaptureBytes = 0;

/**
* @param \Cake\Console\ConsoleIo $io
*/
public function __construct(ConsoleIo $io) {
$this->_io = $io;
}

/**
* Enable output capture.
*
* @param int $maxBytes Maximum bytes to capture (0 for unlimited).
*
* @return void
*/
public function enableOutputCapture(int $maxBytes = 0): void {
$this->captureOutput = true;
$this->outputLog = [];
$this->capturedBytes = 0;
$this->maxCaptureBytes = $maxBytes;
}

/**
* Disable output capture.
*
* @return void
*/
public function disableOutputCapture(): void {
$this->captureOutput = false;
}

/**
* Get the captured output log.
*
* @return array<array{level: string, message: string, time: int}>
*/
public function getOutputLog(): array {
return $this->outputLog;
}

/**
* Get the captured output as plain text string.
*
* @param int $maxLength Maximum length in bytes before truncation. 0 for unlimited.
*
* @return string|null
*/
public function getOutputAsText(int $maxLength = 0): ?string {
if (!$this->outputLog) {
return null;
}

$lines = [];
foreach ($this->outputLog as $entry) {
$prefix = strtoupper($entry['level']);
$lines[] = '[' . date('H:i:s', $entry['time']) . '] ' . $prefix . ': ' . $entry['message'];
}

$output = implode("\n", $lines);
if ($maxLength > 0 && strlen($output) > $maxLength) {
$output = substr($output, 0, $maxLength) . "\n... [output truncated]";
}

return $output;
}

/**
* Reset the captured output log.
*
* @return void
*/
public function resetOutputLog(): void {
$this->outputLog = [];
$this->capturedBytes = 0;
}

/**
* @param string $level
* @param array<string>|string $message
*
* @return void
*/
protected function capture(string $level, array|string $message): void {
if (!$this->captureOutput) {
return;
}

if ($this->maxCaptureBytes > 0 && $this->capturedBytes >= $this->maxCaptureBytes) {
return;
}

$messages = (array)$message;
foreach ($messages as $msg) {
// Strip console formatting tags
$clean = (string)preg_replace('/<\/?[a-z]+>/', '', $msg);
$this->capturedBytes += strlen($clean);
$this->outputLog[] = [
'level' => $level,
'message' => $clean,
'time' => time(),
];

if ($this->maxCaptureBytes > 0 && $this->capturedBytes >= $this->maxCaptureBytes) {
break;
}
}
}

/**
* Output at the verbose level.
*
Expand All @@ -34,6 +154,8 @@ public function __construct(ConsoleIo $io) {
* @return int|null The number of bytes returned from writing to stdout.
*/
public function verbose(array|string $message, int $newlines = 1): ?int {
$this->capture('verbose', $message);

return $this->_io->verbose($message, $newlines);
}

Expand All @@ -46,6 +168,8 @@ public function verbose(array|string $message, int $newlines = 1): ?int {
* @return int|null The number of bytes returned from writing to stdout.
*/
public function quiet(array|string $message, int $newlines = 1): ?int {
$this->capture('info', $message);

return $this->_io->quiet($message, $newlines);
}

Expand All @@ -69,6 +193,8 @@ public function quiet(array|string $message, int $newlines = 1): ?int {
* @return int|null The number of bytes returned from writing to stdout.
*/
public function out(array|string $message = '', int $newlines = 1, int $level = ConsoleIo::NORMAL): ?int {
$this->capture('info', $message);

return $this->_io->out($message, $newlines, $level);
}

Expand All @@ -82,6 +208,8 @@ public function out(array|string $message = '', int $newlines = 1, int $level =
* @return int|null The number of bytes returned from writing to stderr.
*/
public function error(array|string $message = '', int $newlines = 1): ?int {
$this->capture('error', $message);

$messages = (array)$message;

return $this->_io->error($messages, $newlines);
Expand Down Expand Up @@ -138,6 +266,8 @@ public function comment(array|string $message = '', int $newlines = 1, int $leve
* @return int|null The number of bytes returned from writing to stderr.
*/
public function warn(array|string $message = '', int $newlines = 1): ?int {
$this->capture('warning', $message);

$messages = (array)$message;
foreach ($messages as $key => $message) {
$messages[$key] = '<warning>' . $message . '</warning>';
Expand Down Expand Up @@ -190,6 +320,8 @@ public function nl(int $multiplier = 1): string {
* @return void
*/
public function hr(int $newlines = 0, int $width = 63): void {
$this->capture('info', str_repeat('-', $width));

$this->_io->hr($newlines, $width);
}

Expand All @@ -207,7 +339,7 @@ public function hr(int $newlines = 0, int $width = 63): void {
* @return void
*/
public function abort(string $message, int $exitCode = CommandInterface::CODE_ERROR): void {
$this->_io->error($message);
$this->error($message);

throw new StopException($message, $exitCode);
}
Expand Down Expand Up @@ -241,6 +373,8 @@ public function helper(string $name, array $settings = []): Helper {
* @return void
*/
public function overwrite(array|string $message, int $newlines = 1, ?int $size = null): void {
$this->capture('info', $message);

$this->_io->overwrite($message, $newlines, $size);
}

Expand Down
1 change: 1 addition & 0 deletions src/Model/Entity/QueuedJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* @property int $priority
* @property \Queue\Model\Entity\QueueProcess $worker_process
* @property int|null $memory
* @property string|null $output
* @property string|null $headers !
* @property string|null $message !
*/
Expand Down
6 changes: 5 additions & 1 deletion src/Model/Table/QueueProcessesTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,11 @@ public function terminateProcess(string $pid, int $sig = 0): void {
$killed = posix_kill((int)$pid, $sig);
}
if (!$killed) {
exec('kill -' . $sig . ' ' . $pid);
$safePid = (int)$pid;
$safeSig = (int)$sig;
if ($safePid > 0) {
exec('kill -' . $safeSig . ' ' . $safePid);
}
}
sleep(1);

Expand Down
Loading