Вы должны создать свой собственный кодер и декодер, который будет в основном превратить ваш time.time_struct
объект в нечто сериализуемой (а Словаре), а затем зарегистрировать их в реестре комбу сериализатора, как описано в docs для того, чтобы позволить сельдерей использовать свой новый сериализатор в своей задаче.
import json
import time
import types
import datetime
class FeedContentEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, time_struct):
epoch = int(time.mktime(time_struct))
return {'__type__': '__time__', 'time': epoch}
else:
return json.FeedContentEncoder.default(self, obj)
def decode_feed_content(obj):
if isinstance(obj, types.DictionaryType) and '__type__' in obj:
if obj['__type__'] == '__time__':
return datetime.datetime.fromtimestamp(obj['time']).timetuple()
return obj
Необходимо уведомить kombu о своей новой сериализации, зарегистрировав их в реестре serializer.
from kombu.serialization import register
def feed_content_json_dumps(obj):
return json.dumps(obj, cls=FeedContentEncoder)
def feed_content_json_loads(obj):
return json.loads(obj, object_hook=decode_feed_content)
register('feedcontentjson',
feed_content_json_dumps,
feed_content_json_loads,
content_type='application/x-feedcontent-json',
content_encoding='utf-8')
Наконец, следует сказать, сельдерей использовать новый сериалайзер для сериализации задачи так же, как в celery docs; вы должны вызвать свою задачу с параметром serializer
.
parse_link.apply_async(args=[content, link, json_id, news_category], queue= news_source, serializer='feedcontentjson')
Надеюсь, что это поможет.
Я сделал файл 'serializer.py' в том же каталоге, где был помещен' celeryconfig.py'. Где я могу зарегистрировать свою новую сериализацию? В каком файле? и как сельдерей узнает о моей новой сериализации? Как подключить эти файлы? – PythonEnthusiast
Я решил это, пробрав объект Feedparser. В любом случае спасибо. :) – PythonEnthusiast