У меня есть скрипт, который получает список узлов в качестве аргумента (может быть 10 или даже 50) и подключается к каждому из SSH для запуска команды перезапуска службы. В настоящее время я использую многопроцессорную обработку, чтобы распараллелить скрипт (получив размер партии как аргумент), однако я слышал, что этот модуль потоков может помочь мне с выполнением моих задач быстрее и проще управлять (я использую try..except
KeyboardInterrupt с sys.exit()
и pool.terminate()
, но он не остановит весь скрипт, потому что это другой процесс). Поскольку я понимаю, что многопоточность более легка и удобна в управлении для моего случая, я пытаюсь преобразовать свой сценарий, чтобы использовать потоки вместо многопроцессорности, но он не работает должным образом.Несколько SSH-подключений в скрипте Python 2.7. Многопроцессорное взаимодействие с потоком
Текущий код в многопроцессорной (работы):
def restart_service(node, initd_tup):
"""
Get a node name as an argument, connect to it via SSH and run the service restart command..
"""
command = 'service {0} restart'.format(initd_tup[node])
logger.info('[{0}] Connecting to {0} in order to restart {1} service...'.format(node, initd_tup[node]))
try:
ssh.connect(node)
stdin, stdout, stderr = ssh.exec_command(command)
result = stdout.read()
if not result:
result_err = stderr.read()
print '{0}{1}[{2}] ERROR: {3}{4}'.format(Color.BOLD, Color.RED, node, result_err, Color.END)
logger.error('[{0}] Result of command {1} output: {2}'.format(node, command, result_err))
else:
print '{0}{1}{2}[{3}]{4}\n{5}'.format(Color.BOLD, Color.UNDERLINE, Color.GREEN, node, Color.END, result)
logger.info('[{0}] Result of command {1} output: {2}'.format(node, command, result.replace("\n", "... ")))
ssh.close()
except paramiko.AuthenticationException:
print "{0}{1}ERROR! SSH failed with Authentication Error. Make sure you run the script as root and try again..{2}".format(Color.BOLD, Color.RED, Color.END)
logger.error('SSH Authentication failed, thrown error message to the user to make sure script is run with root permissions')
pool.terminate()
except socket.error as error:
print("[{0}]{1}{2} ERROR! SSH failed with error: {3}{4}\n".format(node, Color.RED, Color.BOLD, error, Color.END))
logger.error("[{0}] SSH failed with error: {1}".format(node, error))
except KeyboardInterrupt:
pool.terminate()
general_utils.terminate(logger)
def convert_to_tuple(a_b):
"""Convert 'f([1,2])' to 'f(1,2)' call."""
return restart_service(*a_b)
def iterate_nodes_and_call_exec_func(nodes_list):
"""
Iterate over the list of nodes to process,
create a list of nodes that shouldn't exceed the batch size provided (or 1 if not provided).
Then using the multiprocessing module, call the restart_service func on x nodes in parallel (where x is the batch size).
If batch_sleep arg was provided, call the sleep func and provide the batch_sleep argument between each batch.
"""
global pool
general_utils.banner('Initiating service restart')
pool = multiprocessing.Pool(10)
manager = multiprocessing.Manager()
work = manager.dict()
for line in nodes_list:
work[line] = general_utils.get_initd(logger, args, line)
if len(work) >= int(args.batch):
pool.map(convert_to_tuple, itertools.izip(work.keys(), itertools.repeat(work)))
work = {}
if int(args.batch_sleep) > 0:
logger.info('*** Sleeping for %d seconds before moving on to next batch ***', int(args.batch_sleep))
general_utils.sleep_func(int(args.batch_sleep))
if len(work) > 0:
try:
pool.map(convert_to_tuple, itertools.izip(work.keys(), itertools.repeat(work)))
except KeyboardInterrupt:
pool.terminate()
general_utils.terminate(logger)
А вот то, что я пытался, чтобы с Threading, которая не работает (когда я назначить batch_size больше 1, то сценарий просто застревает, и я должен убить его силой
def parse_args():
"""Define the argument parser, and the arguments to accept.."""
global args, parser
parser = MyParser(description=__doc__)
parser.add_argument('-H', '--host', help='List of hosts to process, separated by "," and NO SPACES!')
parser.add_argument('--batch', help='Do requests in batches', default=1)
args = parser.parse_args()
# If no arguments were passed, print the help file and exit with ERROR..
if len(sys.argv) == 1:
parser.print_help()
print '\n\nERROR: No arguments passed!\n'
sys.exit(3)
def do_work(node):
logger.info('[{0}]'.format(node))
try:
ssh.connect(node)
stdin, stdout, stderr = ssh.exec_command('hostname ; date')
print stdout.read()
ssh.close()
except:
print 'ERROR!'
sys.exit(2)
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
def iterate():
for item in args.host.split(","):
q.put(item)
for i in range(int(args.batch)):
t = Thread(target=worker)
t.daemon = True
t.start()
q.join()
def main():
parse_args()
try:
iterate()
except KeyboardInterrupt:
exit(1)
в журнале сценария я вижу ПРЕДУПРЕЖДЕНИЕ порожденную Paramiko, как показано ниже:.
2016-01-04 22:51:37,613 WARNING: Oops, unhandled type 3
Я попытался выполнить эту ошибку unhandled type 3
, но не нашел ничего, связанного с моей проблемой, поскольку речь идет о двухфакторной аутентификации или попытке подключения через оба пароля и SSH-ключ одновременно, но я только загружая ключи хоста без предоставления пароля SSH-клиенту.
Я был бы признателен за любую помощь по этому вопросу ..
Выглядит хорошо :) Обратите внимание, что параллельно-SSH является асинхронным и не использует мульти обработки, ни много поточность. Если в коде есть потоки или процессы, они будут преобразованы в асинхронные запросы gevent (используется parallel-ssh). Они могут быть удалены из вашего кода, поскольку они больше не нужны. Если я могу сделать предложение, нужно только проверить код выхода после завершения вывода, а не на каждой строке вывода. – danny