У меня есть многопроцессорное приложение, которое нуждается в обновлении (вставка, если существует обновление).Как заставить SQLAlchemy вставить работу с многопользовательским триггером upgert-проверки Postgres?
Я решил подойти к использованию с использованием триггерного решения. (Вы добавляете дополнительный столбец для каждой таблицы с поддержкой upsert с именем is_upsert, и в триггере проверяется это поле, если оно ложно, вы выполняете нормальную вставку, но если это правда, вы выполняете логику upsert - попробуйте обновление и если оно не работает, потому что запись не выполняется 't существует, вы пытаетесь вставить).
Вот триггер логика:
CREATE OR REPLACE FUNCTION upsert_trigger_function_{table}()
RETURNS TRIGGER AS $upsert_trigger_function$
DECLARE
row record;
BEGIN
RAISE NOTICE 'upsert trigger fired, upsert is %%', NEW.{upsert_column};
IF NEW.{upsert_column} THEN
NEW.{upsert_column} := false;
LOOP
UPDATE {table} SET
{update_set}
WHERE
{update_where}
;
IF found THEN
RETURN NULL;
END IF;
BEGIN
INSERT INTO {table} SELECT NEW.*;
RETURN NULL;
EXCEPTION WHEN unique_violation THEN
-- loop
END;
END LOOP;
RETURN NULL;
ELSE
RETURN NEW;
END IF;
END;
$upsert_trigger_function$ LANGUAGE plpgsql;
объект испытаний (add_upsert только делает выше триггер должен быть установлен):
class SimpleItem(PipelinesBase):
__tablename__ = 'simple_item'
id = Column(BigInteger, primary_key=True)
item_type = Column(String, nullable=False, unique=True)
quantity = Column(Integer, nullable=False)
price = Column(Float, nullable=False)
in_stock = Column(Boolean, nullable=False)
arrived = Column(Date)
sys_time = Column(
TSTZRANGE,
nullable=False,
server_default=text("TSTZRANGE(now(), null)"),
)
_upsert = Column(Boolean, nullable=False, server_default=text('false'))
_type_identifier = 1400
add_upsert(SimpleItem, ['item_type'])
тестовый скрипт
from sqlalchemy.engine import create_engine
from pipelines.settings_proxy import TEST_DB
from sqlalchemy.orm.session import sessionmaker
from test_pipelines.test_persistence.mock_items import SimpleItem
from test_pipelines.test_persistence.helpers import random_simple_item
def main():
engine = create_engine(TEST_DB)
values = random_simple_item(_upsert=True)
session = sessionmaker(engine)()
si = SimpleItem(**values)
session.add(si)
session.commit()
si = SimpleItem(**values)
si.price = 1
session.merge(si)
session.commit()
Он работает должным образом, когда используя SQL-состояния, но когда я использую его вместе с объектом добавления ORM SQLAlchemy, есть
Traceback (most recent call last):
File "pipelines/persistence/experiment_with_upsert_field.py", line 59, in <module>
main()
File "pipelines/persistence/experiment_with_upsert_field.py", line 27, in main
session.commit()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 801, in commit
self.transaction.commit()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 392, in commit
self._prepare_impl()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
self.session.flush()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2019, in flush
self._flush(objects)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2137, in _flush
transaction.rollback(_capture_exception=True)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 184, in reraise
raise value
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 2101, in _flush
flush_context.execute()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
rec.execute(self)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
uow
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
mapper, table, insert)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/orm/persistence.py", line 800, in _emit_insert_statements
execute(statement, params)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 914, in execute
return meth(self, multiparams, params)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
compiled_sql, distilled_params
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
result = context._setup_crud_result_proxy()
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 828, in _setup_crud_result_proxy
self._setup_ins_pk_from_implicit_returning(row)
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 893, in _setup_ins_pk_from_implicit_returning
for col in table.primary_key
File "/home/sebastian/local/virtualenvs/perception/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 891, in <listcomp>
for col, value in [
TypeError: 'NoneType' object is not subscriptable
поднять в глубину sqlalchemy.engine.default. Я уверен, что это потому, что мой триггер возвращает NULL, когда выполняет UPSERT, и SQLAlchemy пытается распространять объект со вставленным ID с помощью инструкции RETURNING. Который obviuosly терпит неудачу, потому что невозможно получить правильный идентификатор в триггере из подчиненного INSERT/UPDATE и в то же время блокировать нормальную нормальную вставку.
Обратите внимание, что я уже тестировал upsert как специальную функцию, которая не работает для меня, потому что я жертвуя помощью SQLAlchemy с обновлением сложных элементов (те, у которых есть отношения с другими элементами).
Итак, вот мой вопрос: Как я могу сказать SQLAlchemy, чтобы избежать загрузки вставленных идентификаторов объектов?
вы не в состоянии использовать PostgreSQL 9.5? Можете ли вы опубликовать полную трассировку стека, а также как вы вызываете функцию pl/pgsql из Python? – univerio
Невозможно использовать 9.5. Вот подробности. Однако, вероятно, невозможно избежать загрузки с использованием ORM. Я решил это с помощью ядра SQLAlchemy. – omikron