2016-01-13 3 views
1

Я запускаю Hadoop MapReduce и другие команды SSH из скрипта Python с использованием модуля paramiko (код можно увидеть here). После завершения задания MapReduce я запустил шаг getmerge, чтобы получить вывод в текстовый файл.Как запустить шаг окна PSCP cmd в моем скрипте Python

Проблема заключается в том, что я должен открыть окно cmd и запустить PSCP для загрузки файла output.txt из среды HDFS на свой компьютер. Например:

pscp [email protected]:/nfs_home/appers/cnielsen/MROutput_121815_0.txt C:\Users\cnielsen\Desktop\MR_Test 

Как я могу включить этот PSCP шаг в мой сценарий, так что я не должен открыть CMD окно для запуска этого после того, как мой MapReduce и getmerge задачи выполнены? Я хотел бы, чтобы мой скрипт мог выполнить задачу MR, задачу getmerge, а затем автоматически сохранить вывод MR на моем компьютере.

Адрес: code.

ответ

1

Я решил эту проблему со следующим кодом. Хитрость заключалась в том, чтобы использовать модуль scp и импортировать SCPClient. См. Функцию scp_download (ssh) ниже.

Когда задание MapReduce завершается, выполняется команда getmerge, а затем шаг scp_download.

import paramiko 
from scp import SCPClient 
import time 

# Define connection info 
host_ip = 'xx.xx.xx.xx' 
user = 'xxxxxxxx' 
pw = 'xxxxxxxx' 
port = 22 

# Paths 
input_loc = '/nfs_home/appers/extracts/*/*.xml' 
output_loc = '/user/lcmsprod/output/cnielsen/' 
python_path = "/usr/lib/python_2.7.3/bin/python" 
hdfs_home = '/nfs_home/appers/cnielsen/' 
output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt' 

# File names 
xml_lookup_file = 'product_lookups.xml' 
mapper = 'Mapper.py' 
reducer = 'Reducer.py' 
helper_script = 'Process.py' 
product_name = 'test1' 
output_ref = 'test65' 
target_file = 'test_011416_3.txt' 

# ---------------------------------------------------- 
def createSSHClient(host_ip, port, user, pw): 
    client = paramiko.SSHClient() 
    client.load_system_host_keys() 
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
    client.connect(host_ip, port, user, pw) 
    return client 
# ---------------------------------------------------- 
def buildMRcommand(product_name): 
    space = " " 
    mr_command_list = [ 'hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar', 
         '-files', hdfs_home+xml_lookup_file, 
         '-file', hdfs_home+mapper, 
         '-file', hdfs_home+reducer, 
         '-mapper', "'"+python_path, mapper, product_name+"'", 
         '-file', hdfs_home+helper_script, 
         '-reducer', "'"+python_path, reducer+"'", 
         '-input', input_loc, 
         '-output', output_loc+output_ref] 

    MR_command = space.join(mr_command_list) 
    print MR_command 
    return MR_command 
# ---------------------------------------------------- 
def unbuffered_lines(f): 
    line_buf = "" 
    while not f.channel.exit_status_ready(): 
     line_buf += f.read(1) 
     if line_buf.endswith('\n'): 
      yield line_buf 
      line_buf = "" 
# ---------------------------------------------------- 
def stream_output(stdin, stdout, stderr): 
    writer = open(output_log, 'w') 
    # Using line_buffer function 
    for l in unbuffered_lines(stderr): 
     e = '[stderr] ' + l 
     print '[stderr] ' + l.strip('\n') 
     writer.write(e) 

    # gives full listing.. 
    for line in stdout: 
     r = '[stdout]' + line 
     print '[stdout]' + line.strip('\n') 
     writer.write(r) 
    writer.close() 
# ---------------------------------------------------- 
def run_MapReduce(ssh): 
    stdin, stdout, stderr = ssh.exec_command(buildMRcommand(product_name)) 
    stream_output(stdin, stdout, stderr) 
    return 1 
# ---------------------------------------------------- 
def run_list_dir(ssh): 
    list_dir = "ls "+hdfs_home+" -l" 
    stdin, stdout, stderr = ssh.exec_command(list_dir) 
    stream_output(stdin, stdout, stderr) 
# ---------------------------------------------------- 
def run_getmerge(ssh): 
    getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file 
    print getmerge 
    stdin, stdout, stderr = ssh.exec_command(getmerge) 
    for line in stdout: 
     print '[stdout]' + line.strip('\n') 
    time.sleep(1.5) 
    return 1 
# ---------------------------------------------------- 
def scp_download(ssh): 
    scp = SCPClient(ssh.get_transport()) 
    print "Fetching SCP data.." 
    scp.get(hdfs_home+target_file, local_dir) 
    print "File download complete." 
# ---------------------------------------------------- 
def main(): 
    # Get the ssh connection 
    global ssh 
    ssh = createSSHClient(host_ip, port, user, pw) 
    print "Executing command..." 

    # Command list 
    ##run_list_dir(ssh) 
    ##run_getmerge(ssh) 
    ##scp_download(ssh) 

    # Run MapReduce 
    MR_status = 0 
    MR_status = run_MapReduce(ssh) 

    if MR_status == 1: 
     gs = 0 
     gs = run_getmerge(ssh) 
     if gs == 1: 
      scp_download(ssh) 

    # Close ssh connection 
    ssh.close() 

if __name__ == '__main__': 
    main() 
+0

Примечание: Там, кажется, не быть код выхода обеспечивается командой getmerge, поэтому я использовал ожидания 'time.sleep (1.5)' в сценарии, так что шаг scp_download не начинается до того работа getmerge завершена. –