Я пишу модульные тесты для задания сельдерея, используя django-нос. Это довольно типично; пустую тестовую базу данных (REUSE_DB = 0), которая предварительно заполняется через прибор во время тестирования.django-носовой блок, проверяющий задачу сельдерея ... отсутствующие данные базы данных
Проблема заключается в том, что даже несмотря на то, что TestCase загружает прибор и я могу получить доступ к объектам из метода тестирования, тот же запрос выходит из строя при выполнении задачи async celery.
Я проверил, что настройки.DATABASES ["default"] ["name"] одинаковы как в методе тестирования, так и в тестируемой задаче. Я также подтвердил, что задача, которая находится под тестом, ведет себя корректно при вызове как обычный вызов метода.
И это о том, где я из идей.
Вот пример:
class MyTest(TestCase):
fixtures = ['test_data.json']
def setUp(self):
settings.CELERY_ALWAYS_EAGER = True # seems to be required; if not I get socket errors for Rabbit
settings.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True # exposes errors in the code under test.
def test_city(self):
self.assertIsNotNone(City.objects.get(name='brisbane'))
myTask.delay(city_name='brisbane').get()
# The following works fine: myTask('brisbane')
from celery.task import task
@task()
def myTask(city_name):
c = City.objects.count() # gives 0
my_city = City.objects.get(name=city_name) # raises DoesNotExist exception
return
Какая база данных и какая версия сельдерея/джанго-сельдерея вы используете? –