psycopg2: insere várias linhas com uma consulta

141

Preciso inserir várias linhas com uma consulta (o número de linhas não é constante), portanto, preciso executar uma consulta como esta:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

A única maneira que eu sei é

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

mas eu quero uma maneira mais simples.

Sergey Fedoseev
fonte

Respostas:

219

Criei um programa que insere várias linhas em um servidor localizado em outra cidade.

Eu descobri que usar esse método era cerca de 10 vezes mais rápido que executemany. No meu caso, tupé uma tupla contendo cerca de 2000 linhas. Demorou cerca de 10 segundos ao usar este método:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

e 2 minutos ao usar este método:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)
ant32
fonte
15
Ainda muito relevante quase dois anos depois. Hoje, uma experiência sugere que, à medida que o número de linhas que você deseja enviar aumenta, melhor é usar a executeestratégia. Vi uma aceleração de cerca de 100x graças a isso!
Rob Watts
4
Talvez executemanyexecute uma confirmação após cada inserção. Se você envolver a coisa toda em uma transação, talvez isso acelere as coisas?
Richard
4
Acabei de confirmar esta melhoria eu mesmo. Pelo que li, o psycopg2 executemanynão faz nada ideal, apenas faz um loop e faz muitas executedeclarações. Usando esse método, uma inserção de 700 linhas em um servidor remoto passou de 60 para <2s.
Nelson
5
Talvez eu esteja sendo paranóico, mas concatenar a consulta com uma +aparência que possa abrir para injeção de sql, acho que a execute_values()solução @Clodoaldo Neto é mais segura.
Will Munn
26
caso alguém encontre o seguinte erro: [TypeError: item de sequência 0: ocorrência esperada de str, bytes encontrados] execute este comando em vez disso [args_str = ','. join (cur.mogrify ("(% s,% s)", x ) .decode ("utf-8") para x em tup)]
mrt
146

Novo execute_valuesmétodo no Psycopg 2.7:

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
    cursor, insert_query, data, template=None, page_size=100
)

A maneira pitônica de fazer isso no Psycopg 2.6:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)

Explicação: Se os dados a serem inseridos forem fornecidos como uma lista de tuplas, como em

data = [(1,'x'), (2,'y')]

já está no formato exato necessário, como

  1. a valuessintaxe da insertcláusula espera uma lista de registros como em

    insert into t (a, b) values (1, 'x'),(2, 'y')

  2. Psycopgadapta um Python tuplea um Postgresql record.

O único trabalho necessário é fornecer um modelo de lista de registros a ser preenchido pelo psycopg

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))

e coloque-o na insertconsulta

insert_query = 'insert into t (a, b) values {}'.format(records_list_template)

Imprimir as insert_querysaídas

insert into t (a, b) values %s,%s

Agora, para a Psycopgsubstituição de argumentos usuais

cursor.execute(insert_query, data)

Ou apenas testando o que será enviado ao servidor

print (cursor.mogrify(insert_query, data).decode('utf8'))

Resultado:

insert into t (a, b) values (1, 'x'),(2, 'y')
Clodoaldo Neto
fonte
1
Como o desempenho desse método se compara ao cur.copy_from?
Michael Goldshteyn 03/03
1
Aqui está uma essência com uma referência . copy_from é escalado para cerca de 6,5X mais rápido na minha máquina com 10 milhões de registros.
Joseph Sheedy
Parece bom - acho que você tem um desvio, no final de sua definição inicial de insert_query (a menos que você esteja tentando torná-la uma tupla?) E está faltando depois do% para% s também na definição inicial de insert_query.
deadcode
2
usando execute_valueseu era capaz de obter o meu sistema em execução no 1k registros de um minuto até 128k registros de um minuto
Conrad.Dean
66

Atualize com o psycopg2 2.7:

O clássico executemany()é cerca de 60 vezes mais lento que a implementação do @ ant32 (chamada "dobrada"), conforme explicado neste tópico: https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

Esta implementação foi adicionada ao psycopg2 na versão 2.7 e é chamada execute_values():

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

Resposta anterior:

Para inserir várias linhas, o uso da VALUESsintaxe de múltiplas linhas execute()é cerca de 10x mais rápido que o uso do psycopg2 executemany(). Na verdade, executemany()apenas executa muitosINSERT declarações .

O código do @ ant32 funciona perfeitamente no Python 2. Mas no Python 3, cursor.mogrify()retorna bytes, cursor.execute()pega bytes ou seqüências de caracteres e ','.join()esperastr instância.

Portanto, no Python 3, você pode precisar modificar o código do @ ant32, adicionando .decode('utf-8'):

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

Ou usando apenas bytes (com b''ou b""):

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 
Antoine Dusséaux
fonte
26

cursor.copy_from é a solução mais rápida que encontrei para inserções em massa de longe. Aqui está uma síntese que eu criei contendo uma classe chamada IteratorFile que permite que um iterador que produza strings seja lido como um arquivo. Podemos converter cada registro de entrada em uma string usando uma expressão de gerador. Então a solução seria

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

Para esse tamanho trivial de argumentos, não fará muita diferença de velocidade, mas vejo grandes acelerações ao lidar com milhares de linhas. Também será mais eficiente em termos de memória do que construir uma string de consulta gigante. Um iterador apenas manteria um registro de entrada na memória de cada vez, onde, em algum momento, você ficará sem memória no processo Python ou no Postgres criando a string de consulta.

Joseph Sheedy
fonte
3
Aqui está uma referência comparando copy_from / IteratorFile com uma solução do construtor de consultas. copy_from é escalado para cerca de 6,5X mais rápido na minha máquina com 10 milhões de registros.
Joseph Sheedy
3
você tem que mexer com cordas e timestamps, etc?
CpILL
Sim, você precisará verificar se possui registros TSV bem formados.
23817 Joseph Sheedy
24

Um trecho da página de tutorial do Psycopg2 em Postgresql.org (veja abaixo) :

Um último item que gostaria de mostrar é como inserir várias linhas usando um dicionário. Se você tivesse o seguinte:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})

Você pode inserir facilmente todas as três linhas no dicionário usando:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

Não salva muito código, mas definitivamente parece melhor.

ptrn
fonte
35
Isso executará muitas INSERTinstruções individuais . Útil, mas não o mesmo que uma única VALUEinserção múltipla .
Craig Ringer
7

Todas essas técnicas são chamadas 'Inserções estendidas "na terminologia do Postgres e, em 24 de novembro de 2016, ainda é muito mais rápido que o executemany () do psychopg2 e todos os outros métodos listados neste segmento (que tentei antes de abordar este assunto) responda).

Aqui está um código que não usa cur.mogrify e é legal e simples de entender:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

Mas deve-se notar que, se você pode usar copy_from (), deve usar copy_from;)

JJ
fonte
Criando dos mortos, mas o que acontece na situação das últimas fileiras? Suponho que você realmente execute essa cláusula final novamente nas últimas linhas restantes, caso você tenha um número par de linhas?
Mcpeterson
Correto, desculpe, eu devo ter esquecido de fazer isso quando escrevi o exemplo - isso é muito estúpido da minha parte. Não fazer isso não teria dado às pessoas um erro, o que me preocupa quantas pessoas copiam / colam a solução e cuidam dos seus negócios ... De qualquer forma, mcpeterson muito grato - obrigado!
JJ
2

Estou usando a resposta do ant32 acima há vários anos. No entanto, descobri que há um erro no python 3 porque mogrifyretorna uma string de bytes.

A conversão explícita em bytes de strings é uma solução simples para tornar o código python 3 compatível.

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)
jprockbelly
fonte
1

Outra abordagem interessante e eficiente - é passar linhas para inserção como 1 argumento, que é uma matriz de objetos json.

Por exemplo, você passando o argumento:

[ {id: 18, score: 1}, { id: 19, score: 5} ]

É uma matriz, que pode conter qualquer quantidade de objetos dentro. Então seu SQL se parece com:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

Aviso: Seu postgress deve ser novo o suficiente para suportar json

Daniel Garmoshka
fonte
1

A solução cursor.copyfrom , fornecida por @ jopseph.sheedy ( https://stackoverflow.com/users/958118/joseph-sheedy ) acima ( https://stackoverflow.com/a/30721460/11100064 ), é realmente muito rápida.

No entanto, o exemplo que ele fornece não é genericamente utilizável para um registro com qualquer número de campos e levei um tempo para descobrir como usá-lo corretamente.

O IteratorFile precisa ser instanciado com campos separados por tabulação como este ( ré uma lista de dictos em que cada dict é um registro):

    f = IteratorFile("{0}\t{1}\t{2}\t{3}\t{4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

Para generalizar para um número arbitrário de campos, primeiro criaremos uma sequência de linhas com a quantidade correta de guias e espaços reservados para campos: "{}\t{}\t{}....\t{}"e depois usaremos .format()para preencher os valores dos campos *list(r.values())) for r in records:

        line = "\t".join(["{}"] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

função completa em essência aqui .

Bart Jonk
fonte
0

Se você estiver usando SQLAlchemy, não precisará mexer na criação manual da sequência, porque o SQLAlchemy suporta a geração de uma VALUEScláusula de várias linhas para uma única INSERTinstrução :

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)
Jeff Widman
fonte
Sob o capô, o SQLAlchemy usa o executemany () do psychopg2 para chamadas como essa e, portanto, esta resposta terá problemas graves de desempenho para consultas grandes. Veja o método execute docs.sqlalchemy.org/en/latest/orm/session_api.html .
precisa saber é o seguinte
2
Eu não acho que é esse o caso. Já faz um tempo desde que olhei para isso, mas IIRC, na verdade, está criando uma única instrução de inserção na insert_querylinha. Então, session.execute()basta chamar a execute()declaração do psycopg2 com uma única sequência massiva. Portanto, o "truque" é criar o objeto inteiro da instrução de inserção primeiro. Estou usando isso para inserir 200.000 linhas por vez e vi um desempenho maciço aumentar usando esse código em comparação com o normal executemany().
quer
1
O documento SQLAlchemy ao qual você vinculou possui uma seção que mostra exatamente como isso funciona e até diz: "É essencial observar que a passagem de vários valores NÃO é a mesma que o uso do formulário executemany () tradicional". Portanto, está explicitamente dizendo que isso funciona.
quer
1
Eu estou corrigido. Eu não notei o uso do método values ​​() (sem ele o SQLAlchemy apenas executa muitos). Eu diria que edite a resposta para incluir um link para esse documento para que eu possa alterar meu voto, mas obviamente você já o incluiu. Talvez mencione que isso não é o mesmo que chamar um insert () com execute () com uma lista de dictos?
sage88
como ele funciona em comparação com execute_values?
MRR
0

execute_batch foi adicionado ao psycopg2 desde que esta pergunta foi publicada.

É mais lento que execute_values, mas mais simples de usar.

gerardw
fonte
2
Veja outros comentários. O método do psycopg2 execute_valuesé mais rápido que oexecute_batch
Fierr 5/04
0

executar aceita array de tuplas

https://www.postgresqltutorial.com/postgresql-python/insert/

    """ array of tuples """
    vendor_list = [(value1,)]

    """ insert multiple vendors into the vendors table  """
    sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
    conn = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.executemany(sql,vendor_list)
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
Grigory
fonte
-1

Se você deseja inserir várias linhas em um status de inserção (supondo que você não esteja usando o ORM), a maneira mais fácil até agora para mim seria usar a lista de dicionários. Aqui está um exemplo:

 t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
      {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
      {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]

conn.execute("insert into campaign_dates
             (id, start_date, end_date, campaignid) 
              values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
             t)

Como você pode ver, apenas uma consulta será executada:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT
Alex
fonte
Mostrar o log do mecanismo sqlalchemy NÃO é uma demonstração de apenas uma consulta sendo executada, apenas significa que o mecanismo sqlalchemy executou um comando. Sob o capô, está usando muitos executores do psychopg2, que são muito ineficientes. Veja o método execute docs.sqlalchemy.org/en/latest/orm/session_api.html .
precisa saber é
-3

Usando aiopg - O trecho abaixo funciona perfeitamente bem

    # items = [10, 11, 12, 13]
    # group = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)
Nihal Sharma
fonte
-4

Finalmente, na versão SQLalchemy1.2, essa nova implementação é adicionada para usar psycopg2.extras.execute_batch () em vez de executemany quando você inicializa seu mecanismo com use_batch_mode = True, como:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

Então alguém teria que usar SQLalchmey não se importaria em tentar diferentes combinações de sqla e psycopg2 e direcionar SQL juntos ..

user2189731
fonte