Отложенное выполнение кода и очереди в PHP

Отложенное выполнение кода и использование очередей в PHP являются ключевыми компонентами высокопроизводительных веб-приложений, которые обрабатывают тяжелые или отложенные задачи без блокирования основного потока выполнения. Изучим пример класса очереди на PHP,  и разберем преимущества такого подхода.

Анализ класса Queue

Класс Queue, который использует подход к управлению задачами через очередь. Основные методы класса:
  • add($function, $array_vars, $execute_at): Добавляет задачу в очередь с возможностью задать время выполнения.
  • run(): Выполняет следующую задачу в очереди в зависимости от установленного времени выполнения.
<?php

namespace App\Task;

use App\Connect;
use Opis\Closure\SerializableClosure;

class Queue
{
    /**
     * Adds a function and its variables to the queue for execution at a specified time.
     *
     * @param object $function The function to be executed.
     * @param array $array_vars The array of variables to be passed to the function.
     * @param int $execute_at The timestamp indicating when the function should be executed. Defaults to the current time.
     * @return int The ID of the inserted record in the queue table.
     */
    public static function add($function, $array_vars, $execute_at = 0)
    {
        $serialize = serialize([
            'function' => new SerializableClosure($function),
            'array_vars' => $array_vars
        ]);

        $execute_at = date('Y-m-d H:i:s', $execute_at ?: time());
        $dbh = Connect::getInstance();
        $sth = $dbh->prepare("INSERT INTO queue (serialize, execute_at) VALUES (:serialize, :execute_at)");
        $sth->bindValue(':serialize', $serialize);
        $sth->bindValue(':execute_at', $execute_at);
        $sth->execute();
        return $dbh->lastInsertId();
    }


    /**
     * Run the next queued function based on the scheduled execution time.
     *
     * @throws \Exception When an error occurs during function execution.
     */
    public static function run()
    {
        $dbh = Connect::getInstance();
        $dbh->beginTransaction();
        try {
            $sth = $dbh->prepare("SELECT * FROM queue WHERE execute_at <= NOW() ORDER BY execute_at ASC LIMIT 1 FOR UPDATE");
            $sth->execute();
            $row = $sth->fetch();

            if ($row) {
                $sth = $dbh->prepare("DELETE FROM queue WHERE id = :id");
                $sth->bindValue(':id', $row->id);
                $sth->execute();
            }

            $dbh->commit();

            if ($row) {
                $data = unserialize($row->serialize);
                $data['function'] = $data['function']->getClosure();
                $data['function']($data['array_vars']);
            }
        } catch (\Exception $e) {
            $dbh->rollBack();
            throw $e;
        }
    }
}

Структура таблицы queue

CREATE TABLE `queue` (
`id` int(11) UNSIGNED NOT NULL,
`serialize` mediumtext NOT NULL,
`execute_at` datetime NOT NULL DEFAULT current_timestamp()
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


— Индексы сохранённых таблиц


— Индексы таблицы `queue`

ALTER TABLE `queue`
ADD PRIMARY KEY (`id`),
ADD KEY `execute_at` (`execute_at`);


— AUTO_INCREMENT для сохранённых таблиц


— AUTO_INCREMENT для таблицы `queue`

ALTER TABLE `queue`
MODIFY `id` int(11) UNSIGNED NOT NULL AUTO_INCREMENT;

Структура таблицы базы данных MySQL queue, как указано, предоставляет основу для хранения сериализованных задач и времени их выполнения.

Преимущества данного метода очередей

1. Асинхронная обработка задач: Позволяет выполнить код в фоновом режиме без блокировки пользовательского интерфейса или основного потока выполнения.
2. Точное планирование выполнения: Задачи могут быть запланированы для выполнения в конкретное время, что позволяет обрабатывать их в наиболее подходящие моменты, например, в часы наименьшей нагрузки на сервер.
3. Расширяемость: Можно легко масштабировать систему, добавляя дополнительные рабочие процессы для обработки задач в очереди.

Сравнение с сериализацией и выполнением через eval

Сериализация кода с последующим выполнением через eval может быть опасной, так как eval может выполнить любой переданный код, что открывает двери для потенциальных уязвимостей. Использование очередей избавляет от необходимости использовать eval, повышая безопасность приложения.

Пример использования:

Добавляем задачу в стек

\App\Task\Queue::add(function ($array) {
    $dbh = \App\Connect::getInstance();
    $sth = $dbh->prepare("INSERT INTO messages (user_id, role, thread_id, text) VALUES (:user_id, :role, :thread_id, :text)");
    $sth->bindValue(':user_id', $array[0]);
    $sth->bindValue(':role', 'user');
    $sth->bindValue(':thread_id', $array[1]);
    $sth->bindValue(':text', $array[2]);
    $sth->execute();
    return true;
}, [$user->id, $_POST['thread_id'], $_POST['text']]);
Вызываем выполнение задач:
\App\Task\Queue::run();

Оставить ответ

Ваш адрес email не будет опубликован. Обязательные поля помечены *