Я пытаюсь заполнить базу данных postgresql с помощью MRjob. Несколько дней назад кто-то любезно предложил мне here разделить по шагам картографа. Я пытался, но дается ошибка:psycopg2.ProgrammingError: отношение * уже существует во время populatig базы данных через MRjob
python db_store_hadoop.py -r local --dbname=en_ws xSparse.txt
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/db_store_hadoop.iarroyof.20160204.074501.695246
writing wrapper script to /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/setup-wrapper.sh
PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols
writing to /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/step-0-mapper_part-00000
> sh -ex setup-wrapper.sh /usr/bin/python db_store_hadoop.py --step-num=0 --mapper --dbname en_ws /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/input_part-00000 > /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/step-0-mapper_part-00000
writing to /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/step-0-mapper_part-00001
> sh -ex setup-wrapper.sh /usr/bin/python db_store_hadoop.py --step-num=0 --mapper --dbname en_ws /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/input_part-00001 > /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/step-0-mapper_part-00001
STDERR: + __mrjob_PWD=/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0
STDERR: + exec
STDERR: + /usr/bin/python -c import fcntl; fcntl.flock(9, fcntl.LOCK_EX)
STDERR: + export PYTHONPATH=/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz:/home/iarroyof/shogun-install/lib/python2.7/dist-packages:/home/iarroyof/shogun/examples/undocumented/python_modular:/home/iarroyof/smo-mkl/python:
STDERR: + exec
STDERR: + cd /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0
STDERR: + /usr/bin/python db_store_hadoop.py --step-num=0 --mapper --dbname en_ws /tmp/db_store_hadoop.iarroyof.20160204.074501.695246/input_part-00000
STDERR: Traceback (most recent call last):
STDERR: File "db_store_hadoop.py", line 86, in <module>
STDERR: MRwordStore().run()
STDERR: File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 461, in run
STDERR: mr_job.execute()
STDERR: File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 470, in execute
STDERR: self.run_mapper(self.options.step_num)
STDERR: File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 530, in run_mapper
STDERR: for out_key, out_value in mapper_init() or():
STDERR: File "db_store_hadoop.py", line 35, in mapper_init
STDERR: create_tables(self.cr0)
STDERR: File "db_store_hadoop.py", line 14, in create_tables
STDERR: cr.execute("create table word_list(id serial primary key, word character varying not null)")
STDERR: psycopg2.ProgrammingError: relation "word_list" already exists
STDERR:
Counters from step 1:
(no counters found)
Traceback (most recent call last):
File "db_store_hadoop.py", line 86, in <module>
MRwordStore().run()
File "/usr/local/lib/python2.7/dist-packages/mrjob/job.py", line 461, in run
mr_job.execute()
File "/usr/local/lib/python2.7/dist-packages/mrjob/job.py", line 479, in execute
super(MRJob, self).execute()
File "/usr/local/lib/python2.7/dist-packages/mrjob/launch.py", line 153, in execute
self.run_job()
File "/usr/local/lib/python2.7/dist-packages/mrjob/launch.py", line 216, in run_job
runner.run()
File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 470, in run
self._run()
File "/usr/local/lib/python2.7/dist-packages/mrjob/sim.py", line 173, in _run
self._invoke_step(step_num, 'mapper')
File "/usr/local/lib/python2.7/dist-packages/mrjob/sim.py", line 264, in _invoke_step
self.per_step_runner_finish(step_num)
File "/usr/local/lib/python2.7/dist-packages/mrjob/local.py", line 152, in per_step_runner_finish
self._wait_for_process(proc_dict, step_num)
File "/usr/local/lib/python2.7/dist-packages/mrjob/local.py", line 268, in _wait_for_process
(proc_dict['args'], returncode, ''.join(tb_lines)))
Exception: Command ['sh', '-ex', 'setup-wrapper.sh', '/usr/bin/python', 'db_store_hadoop.py', '--step-num=0', '--mapper', '--dbname', 'en_ws', '/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/input_part-00000'] returned non-zero exit status 1:
Traceback (most recent call last):
File "db_store_hadoop.py", line 86, in <module>
MRwordStore().run()
File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 461, in run
mr_job.execute()
File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 470, in execute
self.run_mapper(self.options.step_num)
File "/tmp/db_store_hadoop.iarroyof.20160204.074501.695246/job_local_dir/0/mapper/0/mrjob.tar.gz/mrjob/job.py", line 530, in run_mapper
for out_key, out_value in mapper_init() or():
File "db_store_hadoop.py", line 35, in mapper_init
create_tables(self.cr0)
File "db_store_hadoop.py", line 14, in create_tables
cr.execute("create table word_list(id serial primary key, word character varying not null)")
psycopg2.ProgrammingError: relation "word_list" already exists
Это моя работа Код:
# -*- coding: utf-8 -*-
#Script for storing the sparse data into a database
import psycopg2
import re
import argparse
from mrjob.job import MRJob
def unicodize(segment):
if re.match(r'\\u[0-9a-f]{4}', segment):
return segment.decode('unicode-escape')
return segment.decode('utf-8')
def create_tables(cr):
cr.execute("create table word_list(id serial primary key, word character varying not null)")
cr.execute("""create table word_sparse(
id serial primary key,
word_id integer references word_list(id) not null,
pos integer not null,
val float not null)""")
def delete_tables(cr):
cr.execute("drop table word_sparse")
cr.execute("drop table word_list")
class MRwordStore(MRJob):
#conn = psycopg2.connect("dbname=%s user=semeval password=semeval" % args_n)
def configure_options(self):
super(MRwordStore, self).configure_options()
self.add_file_option('--dbname')
def mapper_init(self):
# make sqlite3 database available to mapper
self.conn = psycopg2.connect("dbname="+ self.options.dbname +" user=semeval password=semeval")
self.cr0 = self.conn.cursor()
create_tables(self.cr0)
def mapper(self, _, line):
self.cr = self.conn.cursor()
item = line.strip().split('\t')
replaced = u"".join((unicodize(seg) for seg in re.split(r'(\\u[0-9a-f]{4})', item[0])))
key = u''.join((c for c in replaced if c != '"'))
self.cr.execute("insert into word_list(word) values(%s) returning id", (key,))
word_id = self.cr.fetchone()[0]
#Parse the list, literal_eval is avoided because of memory issues
inside = False
number = ""
pos = 0
val = 0
for c in item[1]:
if c == '[':
inside = True
elif c.isdigit():
number += c
elif c == ',':
if inside:
pos = int(number)
number = ""
elif c == ']':
if inside:
val = int(number)
number = ""
self.cr.execute("insert into word_sparse(word_id, pos, val) values (%s, %s, %s)", (word_id, pos, val))
inside = False
def mapper_final(self):
self.conn.commit()
self.conn.close()
if __name__ == "__main__":
"""
Stores words in the database.
The first time, run with the arguments -cs.
If the database has to be recreated, run again with the d argument (-dcs)
Use the -f argument to specify the input file (sparse data)
Use the -n argument to specify the database name, which must be already created.
It also asumes the owner of the database is a user named semeval with password semeval
"""
MRwordStore().run()
Если кто-то может помочь мне выявления ошибок и заблуждений было бы очень признателен.