2016-02-04 6 views
0

Я пытаюсь заполнить базу данных PostgreSQL с помощью MRJob. Несколько дней назад мне любезно посоветовали (here) разнести работу по шагам маппера. Я попробовал, но при заполнении базы данных через MRJob возникает ошибка `psycopg2.ProgrammingError: relation "word_list" already exists` (отношение уже существует):

python db_store_hadoop.py -r local --dbname=en_ws xSparse.txt 
no configs found; falling back on auto-configuration 
no configs found; falling back on auto-configuration 
creating tmp directory /tmp/db_store_hadoop.iarroyof.20160204.074501.695246 
writing wrapper script to /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/setup-wrapper.sh 

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols 

writing to /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/step-0-mapper_part-00000 
> sh -ex setup-wrapper.sh /usr/bin/python db_store_hadoop.py --step-num=0 --mapper --dbname en_ws /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/input_part-00000 > /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/step-0-mapper_part-00000 
writing to /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/step-0-mapper_part-00001 
> sh -ex setup-wrapper.sh /usr/bin/python db_store_hadoop.py --step-num=0 --mapper --dbname en_ws /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/input_part-00001 > /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/step-0-mapper_part-00001 
STDERR: + __mrjob_PWD=/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0 
STDERR: + exec 
STDERR: + /usr/bin/python -c import fcntl; fcntl.flock(9, fcntl.LOCK_EX) 
STDERR: + export PYTHONPATH=/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz:/home/iarroyof/shogun-install/lib/python2.7/dist-packages:/home/iarroyof/shogun/examples/undocumented/python_modular:/home/iarroyof/smo-mkl/python: 
STDERR: + exec 
STDERR: + cd /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0 
STDERR: + /usr/bin/python db_store_hadoop.py --step-num=0 --mapper --dbname en_ws /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/input_part-00000 
STDERR: Traceback (most recent call last): 
STDERR: File "db_store_hadoop.py", line 86, in <module> 
STDERR:  MRwordStore().run() 
STDERR: File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 461, in run 
STDERR:  mr_job.execute() 
STDERR: File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 470, in execute 
STDERR:  self.run_mapper(self.options.step_num) 
STDERR: File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 530, in run_mapper 
STDERR:  for out_key, out_value in mapper_init() or(): 
STDERR: File "db_store_hadoop.py", line 35, in mapper_init 
STDERR:  create_tables(self.cr0) 
STDERR: File "db_store_hadoop.py", line 14, in create_tables 
STDERR:  cr.execute("create table word_list(id serial primary key, word character varying not null)") 
STDERR: psycopg2.ProgrammingError: relation "word_list" already exists 
STDERR: 
Counters from step 1: 
    (no counters found) 
Traceback (most recent call last): 
    File "db_store_hadoop.py", line 86, in <module> 
    MRwordStore().run() 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/job.py", line 461, in run 
    mr_job.execute() 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/job.py", line 479, in execute 
    super(MRJob, self).execute() 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/launch.py", line 153, in execute 
    self.run_job() 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/launch.py", line 216, in run_job 
    runner.run() 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 470, in run 
    self._run() 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/sim.py", line 173, in _run 
    self._invoke_step(step_num, 'mapper') 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/sim.py", line 264, in _invoke_step 
    self.per_step_runner_finish(step_num) 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/local.py", line 152, in per_step_runner_finish 
    self._wait_for_process(proc_dict, step_num) 
    File "/usr/local/lib/python2.7/dist-packages/mrjob/local.py", line 268, in _wait_for_process 
    (proc_dict['args'], returncode, ''.join(tb_lines))) 
Exception: Command ['sh', '-ex', 'setup-wrapper.sh', '/usr/bin/python', 'db_store_hadoop.py', '--step-num=0', '--mapper', '--dbname', 'en_ws', '/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/input_part-00000'] returned non-zero exit status 1: 
Traceback (most recent call last): 
    File "db_store_hadoop.py", line 86, in <module> 
    MRwordStore().run() 
    File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 461, in run 
    mr_job.execute() 
    File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 470, in execute 
    self.run_mapper(self.options.step_num) 
    File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 530, in run_mapper 
    for out_key, out_value in mapper_init() or(): 
    File "db_store_hadoop.py", line 35, in mapper_init 
    create_tables(self.cr0) 
    File "db_store_hadoop.py", line 14, in create_tables 
    cr.execute("create table word_list(id serial primary key, word character varying not null)") 
psycopg2.ProgrammingError: relation "word_list" already exists 

Вот код моего задания (job):

# -*- coding: utf-8 -*- 
#Script for storing the sparse data into a database 
import psycopg2 
import re 
import argparse 
from mrjob.job import MRJob 

def unicodize(segment):
    """Decode one byte-string segment into unicode text.

    A segment that begins with a backslash-u escape followed by four
    lowercase hex digits is decoded with the 'unicode-escape' codec; every
    other segment is decoded as UTF-8.
    """
    codec = 'unicode-escape' if re.match(r'\\u[0-9a-f]{4}', segment) else 'utf-8'
    return segment.decode(codec)

def create_tables(cr):
    """Create the ``word_list`` and ``word_sparse`` tables through cursor *cr*.

    ``word_list`` is created first because ``word_sparse.word_id`` references
    it.  The statements are plain ``create table`` commands, so the database
    reports an error if either table already exists.
    """
    statements = (
        "create table word_list(id serial primary key, word character varying not null)",
        # Spelled with explicit newlines so the text sent to the server is
        # exactly the original multi-line statement.
        "create table word_sparse(\n"
        "     id serial primary key, \n"
        "     word_id integer references word_list(id) not null, \n"
        "     pos integer not null, \n"
        "     val float not null)",
    )
    for statement in statements:
        cr.execute(statement)

def delete_tables(cr):
    """Drop ``word_sparse`` and then ``word_list`` through cursor *cr*.

    The referencing table (``word_sparse``) is dropped before the table it
    points to.
    """
    for statement in ("drop table word_sparse", "drop table word_list"):
        cr.execute(statement)

class MRwordStore(MRJob):
    """mrjob job that loads tab-separated sparse word vectors into PostgreSQL.

    Each input line is expected to hold a word, a tab, and a bracketed list
    of ``[pos, val]`` pairs.  The word goes into ``word_list`` and every pair
    into ``word_sparse``.

    NOTE(review): ``mapper_init`` runs in every mapper process and calls
    ``create_tables`` unconditionally, so any mapper that starts after the
    tables exist fails with ``relation "word_list" already exists``.
    """
    #conn = psycopg2.connect("dbname=%s user=semeval password=semeval" % args_n)
    def configure_options(self):
     """Register the ``--dbname`` command-line option on top of mrjob's own.

     NOTE(review): the option is registered with ``add_file_option`` although
     its value is used as a database name in ``mapper_init`` -- confirm
     whether a pass-through option was intended.
     """
     super(MRwordStore, self).configure_options()
     self.add_file_option('--dbname')

    def mapper_init(self):
     """Open the PostgreSQL connection for this mapper and create the tables."""
     # Connect to the PostgreSQL database named by --dbname (user/password are hard-coded).
     self.conn = psycopg2.connect("dbname="+ self.options.dbname +" user=semeval password=semeval")
     self.cr0 = self.conn.cursor()
     # Executed once per mapper process, not once per job.
     create_tables(self.cr0)

    def mapper(self, _, line):
     """Insert one word and its sparse vector; emits no key/value pairs.

     ``line`` is split on tabs: field 0 is the word, field 1 the vector text.
     """
     self.cr = self.conn.cursor()
     item = line.strip().split('\t')
     # Decode literal \uXXXX escapes and UTF-8 bytes, then remove every double quote.
     replaced = u"".join((unicodize(seg) for seg in re.split(r'(\\u[0-9a-f]{4})', item[0])))
     key = u''.join((c for c in replaced if c != '"'))

     # "returning id" hands back the serial id generated for the new word row.
     self.cr.execute("insert into word_list(word) values(%s) returning id", (key,))
     word_id = self.cr.fetchone()[0]

      # Parse the list by hand; literal_eval is avoided because of memory issues.
      # Only digit characters are accumulated, so spaces, signs and decimal
      # points in the vector text are ignored.
     inside = False
     number = ""
     pos = 0
     val = 0
     for c in item[1]:
      if c == '[':
       inside = True
      elif c.isdigit():
       number += c
      elif c == ',':
       # Inside a pair the comma ends the position; between pairs it is skipped.
       if inside:
        pos = int(number)
        number = ""
      elif c == ']':
       # The first ']' after a '[' ends the value and stores the pair.
       if inside:
        val = int(number)
        number = ""
        self.cr.execute("insert into word_sparse(word_id, pos, val) values (%s, %s, %s)", (word_id, pos, val))
       inside = False

    def mapper_final(self):
     """Commit everything this mapper inserted and close its connection."""

     self.conn.commit()
     self.conn.close()


if __name__ == "__main__": 
    # NOTE(review): the usage text below mentions -c, -s, -d, -f and -n
    # arguments, but the only option registered in the code shown here is
    # --dbname; the text looks stale -- confirm before relying on it.
    """ 
    Stores words in the database. 

    The first time, run with the arguments -cs. 
    If the database has to be recreated, run again with the d argument (-dcs) 

    Use the -f argument to specify the input file (sparse data) 
    Use the -n argument to specify the database name, which must be already created. 

    It also asumes the owner of the database is a user named semeval with password semeval 
    """ 

    # Script entry point: build the job and let mrjob run it.
    MRwordStore().run() 

Буду очень признателен, если кто-нибудь поможет мне найти ошибки и недочёты.

ответ

0

После нескольких дней попыток я решил задачу так: в блоке `__main__` открываю начальное соединение и создаю таблицы через `CREATE TABLE IF NOT EXISTS ...`, а в `mapper_init()` создаю новое соединение и курсор для каждого маппера. Вот скрипт для заполнения базы данных PostgreSQL с помощью Hadoop:

# -*- coding: utf-8 -*- 
# Script for storing the sparse data into a database. 
# Dependencies: MRjob, psycopg2, postgresql and/or Hadoop. 

import psycopg2 
import re 
import argparse 
from mrjob.job import MRJob 

# Name of the PostgreSQL database used by every psycopg2.connect() call in this script.
dbname = "es_ws"
# Schema/format switch (added by the author for their own application; optional):
#   True  -> tables coocurrencias/words, vectors parsed as {key:value, ...}
#   False -> tables word_list/word_sparse, vectors parsed as [[pos, val], ...]
dsm = True

def unicodize(segment):
    """Return *segment* decoded to unicode.

    UTF-8 is used unless the segment starts with a backslash-u escape
    followed by four lowercase hex digits, in which case the
    'unicode-escape' codec is used.
    """
    starts_with_escape = re.match(r'\\u[0-9a-f]{4}', segment) is not None
    if not starts_with_escape:
        return segment.decode('utf-8')
    return segment.decode('unicode-escape')

def replaced(item):
    """Decode *item* to unicode and strip double quotes from both ends.

    The text is split around literal backslash-u escapes (kept as their own
    pieces by the capturing group), each piece is decoded with
    ``unicodize`` and the pieces are joined back together.
    """
    pieces = re.split(r'(\\u[0-9a-f]{4})', item)
    decoded = u"".join([unicodize(piece) for piece in pieces])
    return decoded.strip('"')

def _parse_number(text):
    """Return *text* as an int when possible, otherwise as a float."""
    try:
        return int(text)
    except ValueError:
        return float(text)


def insert_list_vector(cursor, word_id, vector):
    """Insert every ``[pos, val]`` pair found in *vector* into ``word_sparse``.

    The text is scanned character by character rather than evaluated as a
    Python literal, so a long row never has to be built as nested lists.
    Signs and decimal points are kept, so a value such as ``0.5`` or ``-2``
    is stored as written instead of being collapsed to its digits.  Empty
    brackets (``[]``) insert nothing.

    Args:
        cursor: DB-API cursor; ``execute(sql, params)`` is called once per pair.
        word_id: value stored in the ``word_id`` column of each inserted row.
        vector: text such as ``[[3, 7],[12, 0.5]]``.

    Raises:
        ValueError: if a pair holds a missing or malformed number (for
            example a non-integer position).
    """
    sql = "insert into word_sparse(word_id, pos, val) values (%s, %s, %s)"
    inside = False    # True between a '[' and the first ']' that follows it
    have_pos = False  # True once the current pair's position has been read
    number = ""
    pos = 0
    for c in vector:
        if c == '[':
            inside = True
            have_pos = False
        elif c.isdigit() or c in '.-':
            number += c
        elif c == ',':
            # Inside a pair the comma ends the position; the commas between
            # pairs are ignored.
            if inside:
                pos = int(number)
                number = ""
                have_pos = True
        elif c == ']':
            # Skip brackets that contained nothing at all (an empty vector).
            if inside and (have_pos or number):
                val = _parse_number(number)
                number = ""
                cursor.execute(sql, (word_id, pos, val))
            inside = False

def insert_dict_vector(cursor, word, vector):
    """Insert every ``key:value`` pair of *vector* into ``coocurrencias``.

    The text is scanned character by character: characters before a ``:``
    form the key, characters after it form the value, and ``,`` or ``}``
    ends the pair.  Each pair becomes one row ``(word, key, value)``.  The
    key is whitespace-stripped and passed through ``replaced``; the value is
    converted with ``int``.  Empty pairs (``{}`` or a trailing comma) are
    skipped.

    Values are sent as query parameters rather than formatted into the SQL
    text, so words containing quotes cannot break or alter the statement.

    Args:
        cursor: DB-API cursor; ``execute(sql, params)`` is called once per pair.
        word: the word the vector belongs to (first column of each row).
        vector: text such as ``{"a": 3, "b": 1}``.

    Raises:
        ValueError: if a pair's value is not an integer.
    """
    sql = "INSERT INTO coocurrencias VALUES(%s, %s, %s)"
    reading_key = True
    key = ""
    value = ""
    for c in vector:
        if c == '{':
            continue
        if c == ":":
            reading_key = False
        elif c in (",", "}"):
            # Only store the pair if something was actually read for it.
            if key.strip() or value.strip():
                cursor.execute(sql, (word, replaced(key.strip()), int(value)))
            reading_key = True
            key = ""
            value = ""
        elif reading_key:
            key += c
        else:
            value += c

def create_tables(cr):
    """Create the tables for the active mode if they do not exist yet.

    With the module-level ``dsm`` flag set, ``coocurrencias`` and ``words``
    are created; otherwise ``word_list`` and ``word_sparse``.  Every
    statement uses ``if not exists``, so calling this repeatedly is harmless.
    """
    if dsm:
        # NOTE(review): words.id is a plain integer with no default --
        # confirm where its value is supposed to come from.
        # Alternative layout left by the author:
        # (id integer, word character varying, freq integer)
        statements = (
            "create table if not exists coocurrencias(pal1 character varying, pal2 character varying, valor integer)",
            "create table if not exists words(id integer, word character varying)",
        )
    else:
        statements = (
            "create table if not exists word_list(id serial primary key, word character varying not null)",
            # Explicit newlines reproduce the original multi-line statement exactly.
            "create table if not exists word_sparse(\n"
            "        id serial primary key, word_id integer references word_list(id) not null, \n"
            "        pos integer not null, val float not null)",
        )
    for statement in statements:
        cr.execute(statement)

class MRwordStore(MRJob):
    """mrjob job that loads tab-separated word vectors into PostgreSQL.

    Each input line holds a word, a tab and the vector text.  Depending on
    the module-level ``dsm`` flag the vector is stored through
    ``insert_dict_vector`` (``dsm`` true) or ``insert_list_vector``
    (``dsm`` false).  The tables are expected to exist before the mappers
    start; this class never creates them.
    """

    def mapper_init(self):
        """Open this mapper's own PostgreSQL connection."""
        self.conn = psycopg2.connect("dbname="+ dbname +" user=semeval password=semeval")

    def mapper(self, _, line):
        """Store the word and vector found on *line*; emits nothing.

        Field 0 (before the tab) is the word, field 1 the vector text.
        """
        self.cr = self.conn.cursor()
        item = line.strip().split('\t')
        # Decode literal \uXXXX escapes and UTF-8 bytes, then drop every
        # double quote.
        decoded = u"".join(unicodize(seg) for seg in re.split(r'(\\u[0-9a-f]{4})', item[0]))
        key = decoded.replace('"', '')
        if dsm:
            self.cr.execute("insert into words(word) values(%s) returning id", (key,))
            # Fetched as in the other branch; the dict vector is keyed by the
            # word itself, so the id is not used afterwards.
            word_id = self.cr.fetchone()[0]
            insert_dict_vector(cursor=self.cr, word=key, vector=item[1])
        else:
            self.cr.execute("insert into word_list(word) values(%s) returning id", (key,))
            word_id = self.cr.fetchone()[0]
            insert_list_vector(cursor=self.cr, word_id=word_id, vector=item[1])

    def mapper_final(self):
        """Commit everything this mapper inserted and close its connection."""
        self.conn.commit()
        self.conn.close()

if __name__ == "__main__": 
    """Stores word vectors into a database. Such a db (e.g. here is en_ws) must be previusly created in postgresql. 
    It also asumes the owner of the database is a user named semeval with password semeval. 
    This script parses input_file.txt containing lines in the next example format (dsm=False): 

    "word"<tab> [[number, number],[number, number], ...] 

    or (dsm=True) 

    "word"<tab> {key:value, key:value,...} 

    Use example: 

    python db_store_hadoop.py -r hadoop input_file.txt 
    """ 
    # Create the tables up front, outside the mappers, so that no mapper has
    # to issue DDL itself.
    # NOTE(review): mrjob re-invokes this script for each task (see the
    # "--step-num=0 --mapper" commands in the log), so this block presumably
    # runs in every task process too; "if not exists" keeps that harmless.
    conn = psycopg2.connect("dbname="+ dbname +" user=semeval password=semeval")
    try:
        create_tables(conn.cursor())  # Replace this function to customise your tables
        conn.commit()
    finally:
        # Release the bootstrap connection even if table creation fails.
        conn.close()

    # Run the MR object
    MRwordStore().run()

Если у кого-то есть другие предложения, они только приветствуются.