У меня возникли проблемы с получением pytest
, чтобы проверить задачу celery
, которая является частью проекта django
. Единственный способ заставить его работать - это импортировать задачу в методе тестирования.pytest/сельдерей - Ошибка: доступ к базе данных не разрешен
# tasks.py
from __future__ import absolute_import
import logging
import django
django.setup() # flake8: noqa
from .celery import app
from .models import Run
from celery.exceptions import TaskError
@app.task(bind=True, throws=(TaskError,))
def get_runs_task(self, testrun_id=None):
...
return True
Работы:
# test_tasks.py
# flake8: noqa
from django.test import override_settings
import pytest
@override_settings(CELERY_ALWAYS_EAGER=True)
@pytest.mark.django_db
def test_get_runs_one():
from ami.tasks import get_runs_task
result = get_runs_task.delay(testrun_id='123456')
assert result.successful()
Отображение Failed: Database access not allowed, use the "django_db" mark...
ошибки:
# test_tasks.py
# flake8: noqa
from django.test import override_settings
import pytest
from ami.tasks import get_runs_task
@override_settings(CELERY_ALWAYS_EAGER=True)
@pytest.mark.django_db
def test_get_runs_one():
result = get_runs_task.delay(testrun_id='123456')
assert result.successful()
Любая помощь будет принята с благодарностью, спасибо.
ОБНОВЛЕНО: (Есть несколько полей, но я для краткости опускаю)
# models.py
from django.db import models
class Run(models.Model):
run_id = models.CharField(
max_length=15,
verbose_name='Run ID')
# test_models.py
from ami.models import Run
from django.core.exceptions import ValidationError
import pytest
@pytest.fixture
def create_good_run():
run = Run.objects.create(
run_id='12345'
return run
@pytest.mark.django_db
def test_inherit_values():
run = create_good_run()
assert run
Я имел import django
и django.setup()
в файл tasks.py
, потому что я получаю django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Я с тех пор переехал те celery.py
, которые ниже:
# celery.py
from __future__ import absolute_import
import os
import django
django.setup() # flake8: noqa
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'amiproj.settings')
from django.conf import settings # flake8: noqa
app = Celery('ami')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
if __name__ == '__main__':
app.start()
Спасибо, Фрэнк, к сожалению, я все еще получаю ту же ошибку. Я должен добавить, что 'test_models.py' настроен таким же образом, и он работает. Это происходит только при попытке импортировать метод из 'tasks.py'. – mkerins
Hm. Не могли бы вы включить содержимое своих 'models.py' и' test_models.py' в вопрос? Кроме того, вы можете объяснить, почему у вас есть 'django.setup()' в 'task.py'? Обычно это используется для автономного скрипта, который 'tasks.py' на самом деле не должен быть. Может ли ваша настройка/конфигурация Django быть не совсем правильной? Возможно, попробуйте выполнить 'django.setup()' в вашем тесте - может быть, что вызов 'django.setup()' является причиной того, что импорт из модуля 'tasks' фиксирует вещи. –
Привет, Франк, я добавил код. – mkerins