class CreateNotifications(Task): name = 'create-notifications' def run(self, *args, **kwargs): # код таски
Хочу запустить её в тестах (не замокать, а именно запустить) в синхронном режиме, как обычную функцию, чтобы не требовался redis или другой брокер.
Как это сделать?
Думал использовать вот такую штуку https://docs.celeryq.dev/en/v5.2.3/userguide/testing.html#celery-worker-embed-live-worker
import pytest @pytest.mark.django_db class TestClass: def test_celery_notification_create(self, celery_worker, user): # some test logic
но получаю ошибку psycopg2.InterfaceError: connection already closed
В чём может быть проблема?