В Tirebase (5 точек в Петербурге, каталог ~250К SKU) нужно было отправлять уведомления менеджерам, генерировать PDF-каталоги и обновлять поисковый индекс --- всё асинхронно, в фоне. Стандартный ответ --- Celery. Я поставил Celery, настроил RabbitMQ, написал таски. И через неделю понял, что для нашего масштаба это перебор.
Почему Celery не подошёл
Celery — мощный инструмент. Для крупных проектов с десятками типов задач, приоритетами, расписаниями, цепочками — он незаменим. Но для нашего случая он был как кувалда для гвоздя:
- RabbitMQ как зависимость. Ещё один сервис, который нужно мониторить, обновлять и понимать. У меня уже был Redis для кеширования --- зачем второй брокер?
- Магия конфигурации.
CELERY_TASK_SERIALIZER,CELERY_RESULT_BACKEND,CELERY_ACCEPT_CONTENT,CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP… Десятки настроек, каждая из которых может сломать поведение неочевидным образом. - Отладка. Задача упала --- где логи? В worker-процессе. Какой из четырёх? Зависит от prefork/eventlet/gevent. Результат где? В result backend. Или потерялся, если бэкенд не настроен.
- Overhead. Для трёх типов задач Celery добавлял ощутимый расход RAM (worker + RabbitMQ) и усложнял деплой.
У меня и так был Redis. Почему бы не использовать его напрямую?
Redis Streams: основы
Redis Streams --- структура данных, появившаяся в Redis 5.0, специально для задач типа event streaming и message queues. В отличие от LPUSH/BRPOP (который тоже можно использовать как очередь), Streams поддерживают:
- Consumer groups — несколько воркеров читают из одного стрима, каждое сообщение доставляется только одному
- Acknowledgment — сообщение считается обработанным только после явного подтверждения
- Pending entries — необработанные сообщения можно перечитать после падения воркера
- Персистентность — сообщения хранятся даже после чтения
Producer: отправка задач
import redis
import json
import uuid
from datetime import datetime
class TaskProducer:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.stream = "task_queue"
def send_task(
self,
task_type: str,
payload: dict,
max_retries: int = 3,
) -> str:
task_id = str(uuid.uuid4())
message = {
"task_id": task_id,
"task_type": task_type,
"payload": json.dumps(payload),
"created_at": datetime.utcnow().isoformat(),
"max_retries": str(max_retries),
"retry_count": "0",
}
# MAXLEN ~ 10000 — ограничиваем размер стрима
self.redis.xadd(self.stream, message, maxlen=10000)
return task_id
# Использование
producer = TaskProducer()
producer.send_task("send_email", {
"to": "user@example.com",
"template": "order_confirmation",
"order_id": 12345,
})
producer.send_task("generate_pdf", {
"report_type": "monthly",
"period": "2025-12",
})
XADD добавляет сообщение в стрим с автоматическим ID (timestamp + sequence). MAXLEN ~ — приблизительное ограничение длины стрима, чтобы Redis не рос бесконечно. Тильда означает «приблизительно» — Redis удаляет записи блоками, что эффективнее точного ограничения.
Consumer: обработка задач
import signal
import traceback
from typing import Callable
class TaskConsumer:
def __init__(
self,
redis_url: str = "redis://localhost:6379",
group: str = "workers",
consumer_name: str = None,
):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.stream = "task_queue"
self.group = group
self.consumer = consumer_name or f"worker-{uuid.uuid4().hex[:8]}"
self.handlers: dict[str, Callable] = {}
self.running = True
self._ensure_group()
def _ensure_group(self):
"""Создаём consumer group, если она ещё не существует"""
try:
self.redis.xgroup_create(self.stream, self.group, id="0", mkstream=True)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise # Группа уже существует — нормально
def register(self, task_type: str, handler: Callable):
self.handlers[task_type] = handler
def run(self):
signal.signal(signal.SIGTERM, lambda *_: setattr(self, 'running', False))
signal.signal(signal.SIGINT, lambda *_: setattr(self, 'running', False))
# Сначала обрабатываем pending (незавершённые после краша)
self._process_pending()
# Основной цикл — читаем новые сообщения
while self.running:
messages = self.redis.xreadgroup(
groupname=self.group,
consumername=self.consumer,
streams={self.stream: ">"}, # ">" = только новые
count=10,
block=5000, # Ждём 5 сек если нет сообщений
)
if not messages:
continue
for stream_name, entries in messages:
for msg_id, data in entries:
self._handle_message(msg_id, data)
def _handle_message(self, msg_id: str, data: dict):
task_type = data.get("task_type")
task_id = data.get("task_id")
handler = self.handlers.get(task_type)
if not handler:
print(f"[WARN] Нет обработчика для {task_type}, пропускаю")
self.redis.xack(self.stream, self.group, msg_id)
return
try:
payload = json.loads(data["payload"])
handler(payload)
self.redis.xack(self.stream, self.group, msg_id)
print(f"[OK] {task_type}:{task_id} обработан")
except Exception as e:
retry_count = int(data.get("retry_count", 0))
max_retries = int(data.get("max_retries", 3))
if retry_count < max_retries:
self._retry(data, retry_count)
self.redis.xack(self.stream, self.group, msg_id)
else:
self._send_to_dlq(msg_id, data, str(e))
self.redis.xack(self.stream, self.group, msg_id)
print(f"[DLQ] {task_type}:{task_id} исчерпал попытки")
def _retry(self, data: dict, current_retry: int):
"""Повторная отправка с увеличенным счётчиком"""
data["retry_count"] = str(current_retry + 1)
data["payload"] = data["payload"] # уже строка
self.redis.xadd(self.stream, data, maxlen=10000)
def _send_to_dlq(self, msg_id: str, data: dict, error: str):
"""Dead Letter Queue — для задач, которые не удалось обработать"""
data["error"] = error
data["failed_at"] = datetime.utcnow().isoformat()
data["original_id"] = msg_id
self.redis.xadd("task_queue:dlq", data)
def _process_pending(self):
"""Обрабатываем незавершённые сообщения после рестарта воркера"""
pending = self.redis.xpending_range(
self.stream, self.group, "-", "+", count=100
)
if not pending:
return
print(f"[INFO] Найдено {len(pending)} pending сообщений")
for entry in pending:
msg_id = entry["message_id"]
messages = self.redis.xrange(self.stream, msg_id, msg_id)
if messages:
_, data = messages[0]
self._handle_message(msg_id, data)
Регистрация обработчиков
def handle_email(payload: dict):
send_email(
to=payload["to"],
template=payload["template"],
context=payload,
)
def handle_pdf(payload: dict):
report = generate_report(payload["report_type"], payload["period"])
save_to_s3(report)
def handle_reindex(payload: dict):
elasticsearch.index(payload["index"], payload["document"])
# Запуск воркера
consumer = TaskConsumer()
consumer.register("send_email", handle_email)
consumer.register("generate_pdf", handle_pdf)
consumer.register("reindex", handle_reindex)
consumer.run()
Весь код очереди --- около 120 строк. Без магии, без скрытых настроек. Если задача падает --- я вижу это в обычных логах воркера. Понятно, что это не замена промышленного решения --- тут нет, например, приоритетов или отложенных задач. Но для наших нужд хватает.
Мониторинг
Для мониторинга достаточно нескольких Redis-команд:
def get_queue_stats(r: redis.Redis) -> dict:
stream_info = r.xinfo_stream("task_queue")
groups = r.xinfo_groups("task_queue")
dlq_len = r.xlen("task_queue:dlq")
return {
"queue_length": stream_info["length"],
"pending_total": sum(g["pending"] for g in groups),
"consumers_active": sum(g["consumers"] for g in groups),
"dlq_length": dlq_len,
"oldest_message": stream_info.get("first-entry"),
}
Я прокидываю эти метрики в Prometheus через /metrics эндпоинт FastAPI. Алерт в Grafana: если dlq_length > 0 или pending_total > 100 --- значит, что-то идёт не так.
Когда Celery всё-таки нужен
Я не буду делать вид, что Redis Streams заменяет Celery во всех случаях. Celery обоснован, когда:
- Расписания. Celery Beat — полноценный cron-планировщик с персистентностью. На Redis Streams придётся писать свой.
- Цепочки и группы.
chain(task1.s(), task2.s()),group(task.s(i) for i in range(100))— оркестрация задач в Celery продумана и протестирована. - Canvas. Сложные workflow: chord, map, starmap — всё есть.
- Приоритеты. Нативная поддержка очередей с разными приоритетами.
- Команда. Если в команде 10 разработчиков, все знают Celery — не нужно объяснять самописное решение.
Мой порог: если задач больше 5 типов и нужна оркестрация — беру Celery. Если 2–3 типа задач с простой логикой «отправил — обработал» — Redis Streams.
Итог
Redis Streams — надёжная основа для простых очередей задач. Consumer groups обеспечивают распределение нагрузки между воркерами. Acknowledgment гарантирует, что задача не потеряется при падении воркера. Dead letter queue ловит задачи, которые не удалось обработать.
За несколько месяцев в продакшене очередь работает стабильно. Потерянных задач пока не было. DLQ срабатывал несколько десятков раз --- каждый раз из-за внешних сервисов (SMTP недоступен, S3 timeout). После починки внешнего сервиса задачи из DLQ перекидывались обратно в основной стрим одной командой.
Около 120 строк кода вместо Celery + RabbitMQ. Для простых задач --- этого достаточно. Если Tirebase вырастет до масштаба, где нужна оркестрация задач, я перейду на Celery или arq без сожалений.