Найти - Пользователи
Полная версия: Как запустить celery таску в pytest
Начало » Django » Как запустить celery таску в pytest
1
Headmaster
Есть условная celery таска

 class CreateNotifications(Task):
    name = 'create-notifications'
 
    def run(self,  *args, **kwargs):
        # код таски

Хочу запустить её в тестах (не замокать, а именно запустить) в синхронном режиме, как обычную функцию, чтобы не требовался redis или другой брокер.

Как это сделать?

Думал использовать вот такую штуку https://docs.celeryq.dev/en/v5.2.3/userguide/testing.html#celery-worker-embed-live-worker

 import pytest
 
@pytest.mark.django_db
class TestClass:
    def test_celery_notification_create(self, celery_worker, user):
        # some test logic

но получаю ошибку psycopg2.InterfaceError: connection already closed

В чём может быть проблема?
VadimK
celery задача запускается через .delay() или .async()
https://docs.celeryq.dev/en/stable/userguide/tasks.html

 @app.task
def add(x, y):
    return x + y

Вот запуск как задачи
 add.delay(8, 8)

А вот запуск как обычной функции.
 add(8, 8)
Headmaster
VadimK
celery задача запускается через .delay() или .async() https://docs.celeryq.dev/en/stable/userguide/tasks.html

Так это если таска написана как функция, тут я и сам знаю как делать. А если таска это класс? (пример есть у меня в певром посте)
VadimK
А разве класс не запускается точно таким же образом ? Никогда на классах не делал. Сейчас поискал в гугле, вижу что запуск так же идет как и у функций. Только указывается сам класс а не run()
https://jsatt.com/blog/class-based-celery-tasks/
Headmaster
VadimK
https://jsatt.com/blog/class-based-celery-tasks/
Не запускается. Сам когда гуглил, находил эту статью и ничего

Я глянул исходники celery.Task - запуска не через брокер не нашёл. Либо плохо искал, либо на самом деле нету.

Касательно pytest - там есть несколько fixtures для работы с celery. Но завести их не получилось. Поэтому я и здесь
VadimK
Должно запуститься как :
 cli = CreateNotifications()
cli.run()

Celery же наверное не постгрес в качестве брокера использует. Т.е. ошибка о соединение - это куда то в другую степь. Как вариант всю работу с базой закоментировать и проверить на простых print выводах.
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB