В Tirebase около 15 сервисов на FastAPI, каталог на ~250K SKU. Большая часть бэкенда — асинхронный Python. За два года я нашёл несколько проблем с asyncio, которые не описаны в туториалах и которые стоили мне часов отладки на проде. Ниже — четыре неочевидных случая, каждый с контекстом и решением.
Исчерпание пула соединений под нагрузкой
Эта проблема проявилась не сразу. У нас есть сервис, который при обновлении каталога ходит к нескольким поставщикам за ценами. При полном обновлении это ~250K запросов.
aiohttp.ClientSession по умолчанию использует TCPConnector с лимитом в 100 одновременных соединений. Когда ты запускаешь тысячи корутин, каждая из которых делает HTTP-запрос — большинство стоит в очереди. А если каждый запрос длится 2 секунды, последние в очереди ждут минуты.
# ПРОБЛЕМА: тысячи корутин, 100 соединений
async def fetch_all_prices(product_ids: list[str]):
async with aiohttp.ClientSession() as session:
tasks = [fetch_price(session, pid) for pid in product_ids]
return await asyncio.gather(*tasks) # всё разом
Решение: явно ограничивать параллелизм через asyncio.Semaphore:
async def fetch_all_prices(product_ids: list[str], max_concurrent: int = 50):
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(limit=max_concurrent)
async def fetch_with_limit(session, pid):
async with semaphore:
return await fetch_price(session, pid)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_with_limit(session, pid) for pid in product_ids]
return await asyncio.gather(*tasks)
Отдельная история — пул соединений к PostgreSQL через asyncpg. У нас пул исчерпывался из-за долгих транзакций при генерации отчётов. Мониторинг показывал рост pool_size до максимума, а потом — таймауты для всех остальных запросов. Решение: statement_timeout на уровне пула и алерт на pool.get_size() > pool.get_max_size() * 0.8. Не уверен, что это лучший подход — возможно, стоило вынести отчёты в отдельный сервис с собственным пулом, но пока работает.
Event loop блокируется JSON-сериализацией
Этот кейс нашёлся не сразу. У нас был эндпоинт, который возвращал весь каталог (порядка 250K позиций) в JSON для внутреннего сервиса синхронизации. Стандартный json.dumps() — синхронная операция, и на таком объёме данных она занимала сотни миллисекунд. На это время event loop замирал полностью — все остальные запросы вставали в очередь.
# ПРОБЛЕМА: блокировка event loop на сериализации
async def get_catalog(request):
products = await db.fetch_all_products()
return web.json_response(products) # json.dumps внутри
Решение: перейти на orjson и для больших ответов — сериализовать в отдельном потоке:
import orjson
async def get_catalog(request):
products = await db.fetch_all_products()
body = await asyncio.to_thread(orjson.dumps, products)
return web.Response(body=body, content_type="application/json")
На нашем каталоге orjson отработал примерно в 6-8 раз быстрее стандартного json — точные цифры зависели от структуры данных, но разница была ощутимой. Ретроспективно, стоило с самого начала не отдавать весь каталог одним запросом, а сделать пагинацию. Но legacy есть legacy.
Graceful shutdown — это сложнее, чем кажется
Ctrl+C или SIGTERM от Docker — и твоё приложение должно корректно завершить все текущие запросы, закрыть соединения с БД, дождаться фоновых задач. В асинхронном мире это нетривиально, и документация по shutdown не покрывает реальных сценариев.
Первая версия моего shutdown-хендлера просто вызывала loop.stop(). Результат: потерянные транзакции, незакрытые соединения, corrupted данные в Redis.
async def graceful_shutdown(app):
# 1. Перестаём принимать новые запросы
app["accepting_requests"] = False
# 2. Ждём завершения текущих запросов (максимум 30 сек)
if app["active_requests"]:
await asyncio.wait_for(
app["all_requests_done"].wait(),
timeout=30.0
)
# 3. Завершаем фоновые задачи
for task in app["background_tasks"]:
task.cancel()
await asyncio.gather(*app["background_tasks"], return_exceptions=True)
# 4. Закрываем пулы соединений
await app["db_pool"].close()
await app["redis"].close()
Порядок здесь критичен. Если закрыть пул БД до завершения активных запросов — получишь ошибки. Если не отменить фоновые задачи — процесс зависнет. Я потратил полдня, выстраивая правильную последовательность, и до сих пор не уверен, что покрыл все edge cases. Например, что если фоновая задача сама держит соединение к БД и не реагирует на cancel? У нас такое было с задачей импорта прайсов.
asyncio.gather vs TaskGroup: не всё однозначно
asyncio.gather() по умолчанию продолжает выполнять задачи, даже если одна упала с исключением. Это значит, что ты получишь результат через минуту, хотя ошибка произошла на первой секунде. Остальные корутины отработают зря.
# ПРОБЛЕМА: если первый запрос падает, остальные 99 всё равно выполняются
results = await asyncio.gather(*[fetch(url) for url in urls])
В Python 3.11+ появился TaskGroup, который отменяет все задачи при первой ошибке:
# РЕШЕНИЕ: TaskGroup отменяет всё при первой ошибке
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch(url)) for url in urls]
results = [t.result() for t in tasks]
Но и TaskGroup не панацея. Он бросает ExceptionGroup, который нужно обрабатывать через except* (Python 3.11+). Это ломает совместимость со старым кодом и библиотеками. У нас часть кода ещё на gather с return_exceptions=True, часть — на TaskGroup, и единого подхода пока нет. Возможно, стоит мигрировать всё на TaskGroup, но это рефакторинг, до которого руки пока не дошли.
Отдельно стоит упомянуть anyio — библиотеку, которая предоставляет единый API поверх asyncio и trio, включая task groups с более предсказуемым поведением. Мы её не используем, но слышал хорошие отзывы от знакомых разработчиков.
Итог
Асинхронный Python — мощный инструмент, но дьявол в деталях. Каждый из этих случаев стоил мне от нескольких часов до дня отладки. Общий паттерн: asyncio отлично работает, пока нагрузка небольшая. Проблемы начинаются, когда данных становится много — большие JSON, тысячи соединений, долгие транзакции. Главное правило: если latency растёт нелинейно — ищи, что блокирует event loop.