Уведомления

Группа в Telegram: @pythonsu

#1 Март 16, 2022 20:30:07

Headmaster
Зарегистрирован: 2015-11-06
Сообщения: 16
Репутация: +  0  -
Профиль   Отправить e-mail  

Как запустить celery таску в pytest

Есть условная celery таска

 class CreateNotifications(Task):
    name = 'create-notifications'
 
    def run(self,  *args, **kwargs):
        # код таски

Хочу запустить её в тестах (не замокать, а именно запустить) в синхронном режиме, как обычную функцию, чтобы не требовался redis или другой брокер.

Как это сделать?

Думал использовать вот такую штуку https://docs.celeryq.dev/en/v5.2.3/userguide/testing.html#celery-worker-embed-live-worker

 import pytest
 
@pytest.mark.django_db
class TestClass:
    def test_celery_notification_create(self, celery_worker, user):
        # some test logic

но получаю ошибку psycopg2.InterfaceError: connection already closed

В чём может быть проблема?

Офлайн

#2 Март 24, 2022 01:06:24

VadimK
Зарегистрирован: 2013-07-03
Сообщения: 199
Репутация: +  16  -
Профиль   Отправить e-mail  

Как запустить celery таску в pytest

celery задача запускается через .delay() или .async()
https://docs.celeryq.dev/en/stable/userguide/tasks.html

 @app.task
def add(x, y):
    return x + y

Вот запуск как задачи
 add.delay(8, 8)

А вот запуск как обычной функции.
 add(8, 8)

Отредактировано VadimK (Март 24, 2022 01:06:41)

Офлайн

#3 Март 24, 2022 09:39:16

Headmaster
Зарегистрирован: 2015-11-06
Сообщения: 16
Репутация: +  0  -
Профиль   Отправить e-mail  

Как запустить celery таску в pytest

VadimK
celery задача запускается через .delay() или .async() https://docs.celeryq.dev/en/stable/userguide/tasks.html

Так это если таска написана как функция, тут я и сам знаю как делать. А если таска это класс? (пример есть у меня в певром посте)

Офлайн

#4 Март 24, 2022 18:57:22

VadimK
Зарегистрирован: 2013-07-03
Сообщения: 199
Репутация: +  16  -
Профиль   Отправить e-mail  

Как запустить celery таску в pytest

А разве класс не запускается точно таким же образом ? Никогда на классах не делал. Сейчас поискал в гугле, вижу что запуск так же идет как и у функций. Только указывается сам класс а не run()
https://jsatt.com/blog/class-based-celery-tasks/

Офлайн

#5 Март 24, 2022 19:23:14

Headmaster
Зарегистрирован: 2015-11-06
Сообщения: 16
Репутация: +  0  -
Профиль   Отправить e-mail  

Как запустить celery таску в pytest

VadimK
https://jsatt.com/blog/class-based-celery-tasks/
Не запускается. Сам когда гуглил, находил эту статью и ничего

Я глянул исходники celery.Task - запуска не через брокер не нашёл. Либо плохо искал, либо на самом деле нету.

Касательно pytest - там есть несколько fixtures для работы с celery. Но завести их не получилось. Поэтому я и здесь

Офлайн

#6 Март 25, 2022 11:47:14

VadimK
Зарегистрирован: 2013-07-03
Сообщения: 199
Репутация: +  16  -
Профиль   Отправить e-mail  

Как запустить celery таску в pytest

Должно запуститься как :

 cli = CreateNotifications()
cli.run()

Celery же наверное не постгрес в качестве брокера использует. Т.е. ошибка о соединение - это куда то в другую степь. Как вариант всю работу с базой закоментировать и проверить на простых print выводах.

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version