2016-10-14 2 views
0

Итак, я пишу бесплатный сервер задач python для Autodesk Maya, который имеет очередь из x числа «работников». В любой момент сервер может принять «задачу» и бросить эту задачу в очередь, в которую проваливаются рабочие.Многопоточный сервер python (mp.Pool) с очередью задач

Из очереди каждый работник получает «taskDict», который является словарем, отправленным на сервер, где говорится, что файл майя, и какой код запускается при открытии безгласного приложения Maya (mayapy.exe/standalone)

Я переписал это много раз, сначала используя свою собственную систему очередей, но потом решил использовать python. Затем, используя пул, используя Queue.Queue, используя mp.Manager.Queue и пул и т. Д. Мне трудно найти примеры простого многопоточного сервера, который получает информацию и запускает поток, но использует очередь для того, когда это получает слишком много запросов.

Я просто принципиально не понимаю, как размещать информацию в очереди, и у вас есть очередь mp.pool в очереди, начиная с процессов apply_async, которые используют эти данные и сообщают очередь, когда она будет завершена.

Вот текущее состояние кода:

import tempfile 
import os 
import subprocess 
import threading 
import multiprocessing as mp 
import socket 
import sys 

from PySide import QtGui, QtCore 

import serverUtils 

selfDirectory = os.path.dirname(__file__) 
uiFile = selfDirectory + '/server.ui' 
if os.path.isfile(uiFile): 
    form_class, base_class = serverUtils.loadUiType(uiFile) 
else: 
    print('Cannot find UI file: ' + uiFile) 


def show(): 
    global mayaTaskServerWindow 
    try: 
     mayaTaskServerWindow.close() 
    except: 
     pass 

     mayaTaskServerWindow = mayaTaskServer() 
     mayaTaskServerWindow.show() 
    return mayaTaskServerWindow 

class MayaTaskServer(base_class, form_class): 
    refreshSignal = QtCore.Signal() 

    def __init__(self): 
     super(MayaTaskServer, self).__init__() 

     self.setupUi(self) 

     self.mainJobServer = None 
     self.mpPool = None 
     self.manager = None 
     self.q = None 

     self.workerDict = {} 

     self.refreshSignal.connect(self.refreshTree) 
     self.startLocalCoresBTN.clicked.connect(self.startLocalCoresFn) 
     self.killLocalCoresBTN.clicked.connect(self.killLocalCoresFn) 
     self.jobTree.setContextMenuPolicy(QtCore.Qt.CustomContextMenu) 
     self.jobTree.customContextMenuRequested.connect(self.openMenu) 

     self.startJobServer(6006) 
     self.startQueue() 

     # set the default temp folder 
     filepath = os.path.realpath(__file__) 
     self.localTempFolderEDT.setText(filepath.replace(filepath.split('\\')[-1], '')) 

    ## JOB SYSTEM 
    #################################################################### 

    class MayaWorker(object): 
     def __init__(self, host, port, cpuID): 
      self.host = host 
      self.port = port 
      self.location = None 
      self.cpuID = cpuID 

      self.location = self.host 

      self.busy = False 
      self.task = None 
      self.taskHistory = {} 

     def runTask(self, task): 
      print 'starting task - ', self.task['description'] 
      self.busy = True 
      serverUtils.spawnMaya(task) 
      win.refreshSignal.emit() 

     def taskComplete(self, arg): 
      self.busy = False 
      self.task = None 
      self.mayaFile = None 
      win.refreshSignal.emit() 

    def bootUpLocalWorkers(self, numProcs): 
     self.mpPool = mp.Pool(processes=numProcs) 
     for i in range(0, numProcs): 
      mw = self.MayaWorker('localhost', 6006, i) 
      win.mpPool.apply_async(mw, args=(win.q)) 
      win.workerDict['CPU_' + str(i)] = mw 

    ## USER INTERFACE 
    #################################################################### 

    #UI code here you don't care about 

    ## JOB LISTENER/SERVER/QUEUE 
    #################################################################### 
    class JobServer(threading.Thread): 
     def __init__(self, port): 
      threading.Thread.__init__(self) 
      self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
      self.server_socket.bind(('localhost', port)) 
      self.server_socket.listen(5) 

      self.port = port 
      self.running = True 

      self.mpPool = None 

     def addToQueue(self, task): 
      #add to queue 
      win.q.put(task, timeout=1000) 

      #update UI 
      wid1 = QtGui.QTreeWidgetItem() 
      wid1.setText(0, str(task)) 
      win.queTree.addTopLevelItem(wid1) 

     def run(self, debug=1): 
      print 'Starting Task Server @' + socket.gethostbyname(socket.gethostname()) + ':' + str(self.port) 
      while self.running: 
       client_socket, address = self.server_socket.accept() 
       ip = str(address[0]) 
       data = client_socket.recv(512) 
       if 'runTask' in data: 
        taskDict = eval(data.split(' >> ')[-1]) 
        print 'SERVER>> Received task:', str(taskDict) 
        self.addToQueue(taskDict) 

    class TaskQueueServer(threading.Thread): 
     def __init__(self): 
      q = self.q_in 
      while True: 
       if self.q_in: 
        worker = win.findLocalWorker() 
        if worker: 
         taskDict = self.q_in[0] 
         worker.task = taskDict 
         worker.startTask() 
         self.q_in.pop[0] 


    def startJobServer(self, port): 
     self.mainJobServer = self.JobServer(port) 
     self.mainJobServer.start() 

    def startQueue(self): 
     self.manager = mp.Manager() 
     self.q = self.manager.Queue() 


if __name__ == "__main__": 
    app = QtGui.QApplication(sys.argv) 
    win = MayaTaskServer() 
    win.show() 
    sys.exit(app.exec_()) 

ответ

0

Так вот как я это сделал. Очень простое, прагматичное решение.

У меня есть метод под названием «finaLocalWorker», вы можете видеть, что рабочий класс может быть отмечен как «занятый». Если рабочий не занят, ему отправляется поступающая задача.

Если все рабочие заняты, то входящая задача добавляется в простой список под названием self.q.

Когда работник завершает задачу, mpPool.apply_async имеет обратный вызов, который запускает метод taskComplete. Этот метод говорит: «если self.q, возьмите элемент [0] в списке и поп (удалите) его. Else отмечают, что я не занят ».

Это позволяет переполнять входящие запросы, такие как партия из 500 анимаций, помещаться в очередь в списке задач, но также сервер не может получать никаких заданий в течение некоторого времени и сразу же работать над любой задачей, которая приходит.

Я поставлю окончательный код на github.

 Смежные вопросы

  • Нет связанных вопросов^_^