Como UPSERT (MERGE, INSERIR… NA ATUALIZAÇÃO DUPLICATIVA) no PostgreSQL?

268

Uma pergunta muito frequente aqui é como fazer um upsert, que é o que o MySQL chama INSERT ... ON DUPLICATE UPDATEe o padrão suporta como parte da MERGEoperação.

Dado que o PostgreSQL não o suporta diretamente (antes da página 9.5), como você faz isso? Considere o seguinte:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Agora imagine que você quer "Upsert" as tuplas (2, 'Joe'), (3, 'Alan'), de modo que os novos conteúdos de tabela seria:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

É disso que as pessoas estão falando quando discutem um upsert. Fundamentalmente, qualquer abordagem deve ser segura na presença de várias transações trabalhando na mesma tabela - usando bloqueio explícito ou defendendo-se contra as condições de corrida resultantes.

Este tópico é discutido extensivamente no Insert, na atualização duplicada no PostgreSQL? , mas trata-se de alternativas à sintaxe do MySQL, e aumentou um pouco de detalhes não relacionados ao longo do tempo. Estou trabalhando em respostas definitivas.

Essas técnicas também são úteis para "inserir se não existir, caso contrário, não faça nada", ou seja, "inserir ... ao ignorar chave duplicada".

Craig Ringer
fonte
1
possível duplicado do Insert, na atualização duplicada no PostgreSQL?
Michael Hampton
8
@MichaelHampton, o objetivo aqui era criar uma versão definitiva que não fosse confundida por várias respostas desatualizadas - e bloqueada, para que ninguém possa fazer nada a respeito. Não concordo com o voto fechado.
Craig Ringer
Ora, isso logo ficaria desatualizado - e bloqueado, para que ninguém pudesse fazer nada a respeito.
Michael Hampton
2
@MichaelHampton Se você estiver preocupado, talvez possa sinalizar o que você ligou e pedir que ele seja desbloqueado para que possa ser limpo, para que possamos juntar isso. Estou cansado de ter o único e óbvio detalhe- as-dup para upsert ser uma bagunça tão confusa e errada.
Craig Ringer
1
Esse Q&A não está bloqueado!
Michael Hampton

Respostas:

396

9.5 e mais recente:

PostgreSQL 9.5 e suporte mais recente INSERT ... ON CONFLICT UPDATE(e ON CONFLICT DO NOTHING), ou seja, upsert.

Comparação comON DUPLICATE KEY UPDATE .

Explicação rápida .

Para uso, consulte o manual - especificamente a cláusula conflito_ação no diagrama de sintaxe e o texto explicativo .

Ao contrário das soluções para a 9.4 e anteriores fornecidas abaixo, esse recurso funciona com várias linhas conflitantes e não requer bloqueio exclusivo ou loop de repetição.

O commit adicionando o recurso está aqui e a discussão sobre seu desenvolvimento está aqui .


Se você está na versão 9.5 e não precisa ser compatível com versões anteriores, pode parar de ler agora .


9.4 e mais velhos:

O PostgreSQL não tem nenhum built-in UPSERTMERGE recurso interno (ou ) e é muito difícil fazê-lo de maneira eficiente em face do uso simultâneo.

Este artigo discute o problema em detalhes úteis .

Em geral, você deve escolher entre duas opções:

  • Operações individuais de inserção / atualização em um loop de repetição; ou
  • Bloqueando a tabela e fazendo a mesclagem em lote

Loop de repetição de linha individual

O uso de upserts de linha individuais em um loop de repetição é a opção razoável se você desejar várias conexões simultaneamente tentando executar inserções.

A documentação do PostgreSQL contém um procedimento útil que permite fazer isso em um loop dentro do banco de dados . Ele protege contra atualizações perdidas e insere corridas, ao contrário das soluções mais ingênuas. Funcionará apenas emREAD COMMITTED modo e só será seguro se for a única coisa que você fizer na transação. A função não funcionará corretamente se acionadores ou chaves exclusivas secundárias causarem violações exclusivas.

Essa estratégia é muito ineficiente. Sempre que possível, você deve enfileirar o trabalho e fazer uma upsert em massa, conforme descrito abaixo.

Muitas soluções tentadas para esse problema não consideram reversões, portanto resultam em atualizações incompletas. Duas transações competem entre si; um deles com sucesso INSERTs; o outro recebe um erro de chave duplicada e, em UPDATEvez disso, executa um . Os UPDATEblocos aguardando INSERTa reversão ou confirmação. Quando é revertida, a UPDATEnova verificação da condição corresponde a zero linhas, mesmo que as UPDATEconfirmações não tenham feito o upsert esperado. Você deve verificar as contagens da linha de resultados e tentar novamente quando necessário.

Algumas soluções tentadas também não consideram as corridas SELECT. Se você tentar o óbvio e simples:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

então, quando dois executam ao mesmo tempo, existem vários modos de falha. Uma é a questão já discutida com uma nova verificação de atualização. Outro é onde ambos UPDATEao mesmo tempo, combinando zero linhas e continuando. Em seguida, ambos fazem o EXISTSteste, o que acontece antes do INSERT. Ambos têm zero linhas, e ambos fazem o INSERT. Um falha com um erro de chave duplicada.

É por isso que você precisa de um loop de repetição. Você pode pensar que pode evitar erros de chave duplicados ou atualizações perdidas com o SQL inteligente, mas não pode. Você precisa verificar a contagem de linhas ou manipular erros de chave duplicados (dependendo da abordagem escolhida) e tentar novamente.

Por favor, não role sua própria solução para isso. Como na fila de mensagens, provavelmente está errado.

Upsert a granel com trava

Às vezes, você deseja fazer uma upsert em massa, em que possui um novo conjunto de dados que deseja mesclar em um conjunto de dados existente mais antigo. Isso é muito mais eficiente do que upserts individuais de linha e deve ser preferido sempre que possível.

Nesse caso, você normalmente segue o seguinte processo:

  • CREATEuma TEMPORARYmesa

  • COPY ou insira em massa os novos dados na tabela temporária

  • LOCKa tabela de destino IN EXCLUSIVE MODE. Isso permite que outras transações SELECT, mas não façam alterações na tabela.

  • Faça um UPDATE ... FROMdos registros existentes usando os valores na tabela temporária;

  • Faça uma INSERTdas linhas que ainda não existem na tabela de destino;

  • COMMIT, liberando a trava.

Por exemplo, para o exemplo fornecido na pergunta, usando valores múltiplos INSERTpara preencher a tabela temporária:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

Leitura relacionada

A respeito MERGE ?

Padrão SQL MERGENa verdade, o possui semânticas de concorrência mal definidas e não é adequado para upserting sem bloquear uma tabela primeiro.

É uma instrução OLAP realmente útil para mesclagem de dados, mas na verdade não é uma solução útil para upsert com segurança de simultaneidade. Há muitos conselhos para as pessoas que usam outros DBMSes para usarMERGE DBMSes para upserts, mas na verdade está errado.

Outros bancos de dados:

Craig Ringer
fonte
No upsert em massa, existe um valor possível ao excluir os newvals em vez de filtrar o INSERT? Por exemplo, WITH upd AS (UPDATE ... RETURNING newvals.id) DELETE FROM newvals USANDO upd WHERE newvals.id = upd.id, seguido por um INSERT INTO testtable SELECT * FROM newvals? Minha idéia com isso: em vez de filtrar duas vezes no INSERT (para o JOIN / WHERE e a restrição exclusiva), reutilize os resultados da verificação de existência do UPDATE, que já estão na RAM, e pode ser muito menor. Isso pode ser uma vitória se poucas linhas corresponderem e / ou newvals for muito menor que a tabela de teste.
Gunnlaugur Briem 31/07
1
Ainda existem problemas não resolvidos e, para os outros fornecedores, não está claro o que funciona e o que não funciona. 1. A solução de loop do Postgres, conforme observado, não funciona no caso de várias chaves exclusivas. 2. A chave on duplicate para mysql também não funciona para várias chaves exclusivas. 3. As outras soluções para MySQL, SQL Server e Oracle postadas acima funcionam? São possíveis exceções nesses casos e precisamos fazer um loop?
dan b
@danb Isso é realmente apenas sobre o PostgreSQL. Não há solução entre fornecedores. A solução para o PostgreSQL não funciona para várias linhas; infelizmente, você deve fazer uma transação por linha. As "soluções" usadas MERGEpara SQL Server e Oracle são incorretas e propensas a condições de corrida, conforme observado acima. Você precisará examinar cada DBMS especificamente para descobrir como lidar com eles; na verdade, só posso oferecer conselhos sobre o PostgreSQL. A única maneira de fazer um upsert seguro com várias linhas no PostgreSQL será se o suporte ao upsert nativo for adicionado ao servidor núcleo.
Craig Ringer
Mesmo para PostGresQL, a solução não funciona no caso em que uma tabela possui várias chaves exclusivas (atualizando apenas uma linha). Nesse caso, você precisa especificar qual chave está sendo atualizada. Pode haver uma solução entre fornecedores usando o jdbc, por exemplo.
dan b
2
O Postgres agora suporta UPSERT - git.postgresql.org/gitweb/…
Chris
32

Estou tentando contribuir com outra solução para o problema de inserção única nas versões anteriores ao 9.5 do PostgreSQL. A idéia é simplesmente tentar executar primeiro a inserção e, caso o registro já esteja presente, atualizá-lo:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

Observe que esta solução pode ser aplicada apenas se não houver exclusões de linhas da tabela .

Eu não sei sobre a eficiência desta solução, mas me parece razoável o suficiente.

Renzo
fonte
3
Obrigado, é exatamente isso que eu estava procurando. Não consigo entender por que era tão difícil de encontrar.
Isapir
4
Sim. Essa simplificação funciona se e somente se não houver exclusões.
Craig Ringer
@CraigRinger Você pode explicar o que exatamente acontecerá se houver exclusões?
turbanoff
@turbanoff A inserção pode falhar porque o registro já está lá, é excluído simultaneamente e a atualização afeta zero linhas porque a linha foi excluída.
Craig Ringer
@CraigRinger So. A exclusão ocorre simultaneamente . Quais são as possíveis outways se este é funciona bem? Se a exclusão estiver funcionando simultaneamente - ela poderá ser executada logo após o nosso bloqueio. O que estou tentando dizer - se tivermos exclusão simultânea - esse código funcionará da mesma maneira que adequadainsert on update
turbanoff
30

Aqui estão alguns exemplos para insert ... on conflict ...(página 9.5+ ):

  • Inserir, em conflito - não faça nada .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`  
  • Inserir, em conflito - atualize , especifique o destino do conflito por meio da coluna .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;  
  • Inserir, em conflito - atualize , especifique o destino do conflito por meio do nome da restrição .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
Eric Wang
fonte
ótima resposta - pergunta: por que ou em que situação deve-se usar a especificação de destino via nome da coluna ou restrição? Existe uma vantagem / desvantagem para vários casos de uso?
Nathan Benton
1
@ NathanBenton Acho que existem pelo menos 2 diferenças: (1) o nome da coluna é especificado pelo programador, enquanto o nome da restrição pode ser especificado pelo programador ou gerado pelo banco de dados de acordo com os nomes de tabela / coluna. (2) cada coluna pode ter várias restrições. Dito isto, depende do seu caso para escolher qual usar.
Eric Wang
8

SQLAlchemy upsert para Postgres> = 9.5

Como a postagem grande acima cobre muitas abordagens SQL diferentes para as versões do Postgres (não apenas as 9.5 como na pergunta), gostaria de adicionar como fazê-lo no SQLAlchemy se você estiver usando o Postgres 9.5. Em vez de implementar seu próprio upsert, você também pode usar as funções do SQLAlchemy (que foram adicionadas no SQLAlchemy 1.1). Pessoalmente, eu recomendaria usá-los, se possível. Não apenas por conveniência, mas também porque permite ao PostgreSQL lidar com quaisquer condições de corrida que possam ocorrer.

Postagem cruzada de outra resposta que eu dei ontem ( https://stackoverflow.com/a/44395983/2156909 )

O SQLAlchemy suporta ON CONFLICTagora com dois métodos on_conflict_do_update()e on_conflict_do_nothing():

Copiando da documentação:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='[email protected]', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

PR
fonte
4
Python e SQLAlchemy não são mencionados na pergunta.
Alexander Emelianov
Costumo usar Python nas soluções que escrevo. Mas eu não procurei no SQLAlchemy (ou estava ciente disso). Esta parece ser uma opção elegante. Obrigado. Se der certo, apresentarei isso à minha organização.
Robert
3
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

Testado no Postgresql 9.3

aristar
fonte
@ CraigRinger: você poderia elaborar isso? o cte não é atômico?
31919 parisni
2
@parisni Não. Cada termo CTE obtém seu próprio instantâneo se ele executa gravações. Além disso, não há nenhum tipo de bloqueio de predicado executado em linhas que não foram encontradas, portanto elas ainda podem ser criadas simultaneamente por outra sessão. Se você usasse o SERIALIZABLEisolamento, abortaria com uma falha de serialização; caso contrário, provavelmente obteria uma violação única. Não reinvente o upsert, a reinvenção estará errada. Use INSERT ... ON CONFLICT .... Se o seu PostgreSQL é muito antigo, atualize-o.
Craig Ringer
O @CraigRinger INSERT ... ON CLONFLICT ...não se destina ao carregamento em massa. Na sua postagem, o LOCK TABLE testtable IN EXCLUSIVE MODE;CTE é uma solução alternativa para obter coisas atômicas. Não ?
parisni 6/12/19
@parisni Não se destina ao carregamento em massa? Quem disse? postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT . Claro, é muito mais lento que o carregamento em massa, sem comportamento semelhante ao upsert, mas isso é óbvio e será o caso, não importa o que você faça. É muito mais rápido do que usar subtransações, com certeza. A abordagem mais rápida é bloquear a tabela de destino e, em seguida, fazer um insert ... where not exists ...ou similar, é claro.
Craig Ringer
1

Como essa pergunta foi encerrada, estou postando aqui como você faz isso usando SQLAlchemy. Por recursão, ele tenta novamente uma inserção ou atualização em massa para combater condições de corrida e erros de validação.

Primeiro as importações

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Agora, algumas funções auxiliares

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

E, finalmente, a função upsert

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

Veja como você o usa

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

A vantagem disso bulk_save_objectsé que ele pode lidar com relacionamentos, verificação de erros etc. na inserção (ao contrário das operações em massa ).

reubano
fonte
Também parece errado para mim. E se uma sessão simultânea inserir uma linha depois de coletar sua lista de IDs? Ou exclui um?
Craig Ringer
good point @CraigRinger Eu faço algo semelhante a isso, mas só tenho 1 sessão executando o trabalho. Qual é a melhor maneira de lidar com várias sessões? Uma transação, talvez?
Reubano 27/04
As transações não são a solução mágica para todos os problemas de simultaneidade. Você pode usar SERIALIZABLE transações e lidar com falhas de serialização, mas é lento. Você precisa de tratamento de erros e um loop de repetição. Veja minha resposta e a seção "leitura relacionada".
Craig Ringer
@CraigRinger gotcha. Na verdade, implementei um loop de repetição no meu próprio caso devido a outras falhas de validação. Vou atualizar esta resposta de acordo.
Reubano 27/04