Анализ класса Queue
<?php
namespace App\Task;
use App\Connect;
use Opis\Closure\SerializableClosure;
class Queue
{
/**
* Adds a function and its variables to the queue for execution at a specified time.
*
* @param object $function The function to be executed.
* @param array $array_vars The array of variables to be passed to the function.
* @param int $execute_at The timestamp indicating when the function should be executed. Defaults to the current time.
* @return int The ID of the inserted record in the queue table.
*/
public static function add($function, $array_vars, $execute_at = 0)
{
$serialize = serialize([
'function' => new SerializableClosure($function),
'array_vars' => $array_vars
]);
$execute_at = date('Y-m-d H:i:s', $execute_at ?: time());
$dbh = Connect::getInstance();
$sth = $dbh->prepare("INSERT INTO queue (serialize, execute_at) VALUES (:serialize, :execute_at)");
$sth->bindValue(':serialize', $serialize);
$sth->bindValue(':execute_at', $execute_at);
$sth->execute();
return $dbh->lastInsertId();
}
/**
* Run the next queued function based on the scheduled execution time.
*
* @throws \Exception When an error occurs during function execution.
*/
public static function run()
{
$dbh = Connect::getInstance();
$dbh->beginTransaction();
try {
$sth = $dbh->prepare("SELECT * FROM queue WHERE execute_at <= NOW() ORDER BY execute_at ASC LIMIT 1 FOR UPDATE");
$sth->execute();
$row = $sth->fetch();
if ($row) {
$sth = $dbh->prepare("DELETE FROM queue WHERE id = :id");
$sth->bindValue(':id', $row->id);
$sth->execute();
}
$dbh->commit();
if ($row) {
$data = unserialize($row->serialize);
$data['function'] = $data['function']->getClosure();
$data['function']($data['array_vars']);
}
} catch (\Exception $e) {
$dbh->rollBack();
throw $e;
}
}
}
Структура таблицы queue
CREATE TABLE `queue` (
`id` int(11) UNSIGNED NOT NULL,
`serialize` mediumtext NOT NULL,
`execute_at` datetime NOT NULL DEFAULT current_timestamp()
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
—
— Индексы сохранённых таблиц
—
—
— Индексы таблицы `queue`
—
ALTER TABLE `queue`
ADD PRIMARY KEY (`id`),
ADD KEY `execute_at` (`execute_at`);
—
— AUTO_INCREMENT для сохранённых таблиц
—
—
— AUTO_INCREMENT для таблицы `queue`
—
ALTER TABLE `queue`
MODIFY `id` int(11) UNSIGNED NOT NULL AUTO_INCREMENT;
Преимущества данного метода очередей
Сравнение с сериализацией и выполнением через eval
Пример использования:
Добавляем задачу в стек
\App\Task\Queue::add(function ($array) {
$dbh = \App\Connect::getInstance();
$sth = $dbh->prepare("INSERT INTO messages (user_id, role, thread_id, text) VALUES (:user_id, :role, :thread_id, :text)");
$sth->bindValue(':user_id', $array[0]);
$sth->bindValue(':role', 'user');
$sth->bindValue(':thread_id', $array[1]);
$sth->bindValue(':text', $array[2]);
$sth->execute();
return true;
}, [$user->id, $_POST['thread_id'], $_POST['text']]);
\App\Task\Queue::run();