2016-02-07 6 views
1

Я создал простой рабочий процесс SWF, но, похоже, получает несколько уведомлений о новых задачах решения. Я использую boto3 python sdk.Получение дублирующих задач решения из AWS SWF при использовании boto3

Нет хорошего кода примера boto3 swf, который я могу найти, поэтому я начал с образца boto2 по адресу http://boto.cloudhackers.com/en/latest/swf_tut.html.

Я создал мой простой рабочий процесс с помощью этого сценария, который создает домен, рабочий процесс, и одна задача в этом рабочем процессе:

#!/usr/bin/python 

import boto3 
from botocore.exceptions import ClientError 


swf = boto3.client('swf') 

try: 
    swf.register_domain(
    name="surroundiotest-swf", 
    description="Surroundio test SWF domain", 
    workflowExecutionRetentionPeriodInDays="10" 
) 
except ClientError as e: 
    print "Domain already exists: ", e.response.get("Error", {}).get("Code") 

try: 
    swf.register_workflow_type(
    domain="surroundiotest-swf", 
    name="testflow", 
    version="0.1", 
    description="testworkflow", 
    defaultExecutionStartToCloseTimeout="250", 
    defaultTaskStartToCloseTimeout="NONE", 
    defaultChildPolicy="TERMINATE", 
    defaultTaskList={"name": "testflow"} 
) 
    print "testflow created!" 
except ClientError as e: 
    print "Workflow already exists: ", e.response.get("Error", {}).get("Code") 

try: 
    swf.register_activity_type(
    domain="surroundiotest-swf", 
    name="testworker", 
    version="0.1", 
    description="testworker", 
    defaultTaskStartToCloseTimeout="NONE", 
    defaultTaskList={"name": "testflow"} 
) 
    print "testworker created!" 
except ClientError as e: 
    print "Activity already exists: ", e.response.get("Error", {}).get("Code") 

мой рабочий код:

#!/usr/bin/python 

import boto3 
from botocore.client import Config 

botoConfig = Config(connect_timeout=50, read_timeout=70) 
swf = boto3.client('swf', config=botoConfig) 

print "Listening for Worker Tasks" 

while True: 

    task = swf.poll_for_activity_task(
    domain='surroundiotest-swf', 
    taskList={'name': 'testflow'}, 
    identity='worker-1') 

    if 'taskToken' not in task: 
    print "Poll timed out, no new task. Repoll" 

    else: 
    print "New task arrived" 

    swf.respond_activity_task_completed(
     taskToken=task['taskToken'], 
     result='success' 
    ) 

    print "Task Done" 

мой решающий код:

#!/usr/bin/python 

import boto3 
from botocore.client import Config 

botoConfig = Config(connect_timeout=50, read_timeout=70) 
swf = boto3.client('swf', config=botoConfig) 


print "Listening for Decision Tasks" 

while True: 

    newTask = swf.poll_for_decision_task(
    domain='surroundiotest-swf', 
    taskList={'name': 'testflow'}, 
    identity='decider-1', 
    reverseOrder=True) 

    if 'taskToken' not in newTask: 
    print "Poll timed out, no new task. Repoll" 

    elif 'events' in newTask: 

    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')] 
    lastEvent = eventHistory[-1] 

    if lastEvent['eventType'] == 'WorkflowExecutionStarted': 
     print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType'] 
     swf.respond_decision_task_completed(
     taskToken=newTask['taskToken'], 
     decisions=[ 
      { 
      'decisionType': 'ScheduleActivityTask', 
      'scheduleActivityTaskDecisionAttributes': { 
       'activityType':{ 
        'name': 'testworker', 
        'version': '0.1' 
        }, 
       'activityId': 'activityid-1001', 
       'input': '', 
       'scheduleToCloseTimeout': 'NONE', 
       'scheduleToStartTimeout': 'NONE', 
       'startToCloseTimeout': 'NONE', 
       'heartbeatTimeout': 'NONE', 
       'taskList': {'name': 'testflow'}, 
      } 
      } 
     ] 
    ) 
     print "Task Dispatched" 
     # print json.dumps(newTask, default=json_serial, sort_keys=True, indent=4, separators=(',', ': ')) 

    elif lastEvent['eventType'] == 'ActivityTaskCompleted': 
     swf.respond_decision_task_completed(
     taskToken=newTask['taskToken'], 
     decisions=[ 
      { 
      'decisionType': 'CompleteWorkflowExecution', 
      'completeWorkflowExecutionDecisionAttributes': { 
       'result': 'success' 
      } 
      } 
     ] 
    ) 
     print "Task Completed!" 

Когда я запускаю эти сценарии и отправляю запрос, решение принимает задание и отправляет его, и рабочий получает его и выполняет его. Однако решающая уведомляется задачи на каждом последующем опросе, так что я вижу выход, как это от моего решающему:

Listening for Decision Tasks 
Poll timed out, no new task. Repoll 
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'} 
Task Dispatched 
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'} 
Task Dispatched 
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'} 
Task Dispatched 
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'} 
Task Dispatched 
Dispatching task to worker {u'workflowId': u'surroundtest-1001', u'runId': u'23oPrHZ/d9kR43V/hr0ykZCI7Dks/FzLhfDeA9PPWFuPE='} {u'version': u'0.1', u'name': u'testflow'} 
Task Dispatched 

Казалось бы, что SWF не получает уведомление о том, что решающая разослал задачу, и постоянно доставляет задачу решающему. Есть ли какой-то фрагмент настройки, который я делаю неправильно, есть ли какая-то дополнительная информация, которую мне нужно передать SWF?

ответ

1

У вас есть небольшая ошибка. Вы выполняете опрос для решения задач с использованием reverseOrder=True. Из документации API из PollForDecisionTask:

ReverseOrder
При установке в true, возвращает события в обратном порядке. По умолчанию результаты возвращаются в порядке возрастания eventTimestamp событий.

С reverseOrder=True вы получаете самое старое событие, которое всегда WorkflowExecutionStarted. После того, как задача будет выполнена, вы всегда планируете ее снова.

Как правило, вы хотите использовать previousStartedEventId и обрабатывать только новые события, которые произошли после предыдущей задачи решения. Обработка только последней задачи не всегда достаточно, потому что для обработки может потребоваться несколько событий.

+0

Отлично, я не знаю, как я выбрал эту настройку, но вы поставили меня на правильный путь! Большое спасибо – jhludwig

 Смежные вопросы

  • Нет связанных вопросов^_^