Inserção em massa com SQLAlchemy ORM

130

Existe alguma maneira de fazer com que o SQLAlchemy faça uma inserção em massa em vez de inserir cada objeto individual. ou seja,

fazendo:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

ao invés de:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Acabei de converter um código para usar sqlalchemy em vez de raw sql e, embora agora seja muito mais agradável trabalhar com ele, parece mais lento agora (até um fator de 10), estou me perguntando se esse é o motivo.

Talvez eu possa melhorar a situação usando as sessões com mais eficiência. No momento eu tenho autoCommit=Falsee faço um session.commit()depois que eu adicionei algumas coisas. Embora isso pareça fazer com que os dados fiquem obsoletos se o banco de dados for alterado em outro lugar, como, mesmo que eu faça uma nova consulta, ainda recupero resultados antigos?

Obrigado pela ajuda!

Nick Holden
fonte
1
Isso pode ajudar: stackoverflow.com/questions/270879/…
Sean Vieira
1
Nick, eu entendo que este é um post muito antigo. Seria possível atualizar o título para algo correto como "inserção de vários registros com SQLAlchemy ORM". Instruções de inserção de vários registros, como a que você forneceu, são bastante diferentes das operações de carregamento em massa no nível do banco de dados. As inserções em massa destinam-se a uploads de 1k + dados, geralmente de grandes conjuntos de dados e feitos por gerentes de aplicativos, não operações REST ou código no nível do aplicativo ... Vamos usar nossa nomenclatura corretamente.
precisa saber é o seguinte
Para aqueles que se deparam com essa pergunta enquanto procuram informações sobre operações em massa no sqlalchemy Core (não ORM), consulte minha resposta para outra pergunta .
Nickolay

Respostas:

173

SQLAlchemy introduziu isso na versão 1.0.0:

Operações em massa - SQLAlchemy docs

Com essas operações, agora você pode fazer inserções ou atualizações em massa!

Por exemplo, você pode fazer:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Aqui, uma inserção em massa será feita.

Pierre
fonte
30
Você também precisa do s.commit () para salvar os registros (demorei um pouco para descobrir isso).
precisa saber é
3
Eu tentei isso com sqlachemy 1.0.11 e ainda faz 3 instruções de inserção. Mas é muito mais rápido que as operações normais da orm.
Zidarsk8
3
Embora não seja pertinente à questão dos POs, vale ressaltar que isso quebra alguns recursos do ORM. docs.sqlalchemy.org/en/rel_1_0/orm/...
dangel
@angel sim, obrigado por postar isso. Embora o título do OP diga respeito ao "carregamento em massa", sua pergunta sobre instruções de inserção de vários registros não tem nada a ver com o recurso de carregamento em massa do sqlalchemy.
precisa saber é o seguinte
Comparado à inserção dos mesmos dados do CSV com \copypsql (do mesmo cliente para o mesmo servidor), vejo uma enorme diferença no desempenho no lado do servidor, resultando em cerca de 10x mais inserções / s. Aparentemente, o carregamento em massa é muito melhor usando \copy(ou COPYno servidor) usando um pacote de comunicação entre cliente e servidor do que usar SQL via SQLAlchemy. Mais informações: grande volume de inserção diferença de desempenho PostgreSQL vs ... .
gertvdijk
42

Os documentos do sqlalchemy têm um resumo do desempenho de várias técnicas que podem ser usadas para inserções em massa:

Os ORMs não se destinam basicamente a pastilhas a granel de alto desempenho - esse é o motivo pelo qual o SQLAlchemy oferece o Core, além do ORM como um componente de primeira classe.

Para o caso de uso de inserções em massa rápidas, o sistema de geração e execução de SQL que o ORM constrói sobre ele faz parte do Core. Usando esse sistema diretamente, podemos produzir um INSERT que seja competitivo com o uso direto da API do banco de dados bruto.

Como alternativa, o SQLAlchemy ORM oferece o conjunto de métodos Bulk Operations, que fornece ganchos nas subseções do processo da unidade de trabalho para emitir construções INSERT e UPDATE no nível do núcleo com um pequeno grau de automação baseada em ORM.

O exemplo abaixo ilustra testes baseados em tempo para vários métodos diferentes de inserção de linhas, passando do mais automatizado para o menos. Com o cPython 2.7, os tempos de execução observados:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Roteiro:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Grant Humphries
fonte
1
Obrigado. Realmente útil e completo.
Steve B.
Eu vi outro exemplo usando bindparams. A sintaxe parece sucinta, isso é bom?
Jay
35

Até onde eu sei, não há como fazer com que o ORM emita inserções em massa. Acredito que a razão subjacente é que o SQLAlchemy precisa acompanhar a identidade de cada objeto (ou seja, novas chaves primárias), e inserções em massa interferem nisso. Por exemplo, supondo que sua footabela contenha uma idcoluna e seja mapeada para uma Fooclasse:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Como o SQLAlchemy selecionou o valor x.idsem emitir outra consulta, podemos inferir que ele obteve o valor diretamente da INSERTinstrução Se você não precisar de acesso subseqüente aos objetos criados pelas mesmas instâncias, poderá pular a camada ORM da sua inserção:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

O SQLAlchemy não pode corresponder essas novas linhas a nenhum objeto existente; portanto, você precisará consultá-las novamente para operações subsequentes.

No que diz respeito aos dados obsoletos, é útil lembrar que a sessão não possui uma maneira integrada de saber quando o banco de dados é alterado fora da sessão. Para acessar dados modificados externamente através de instâncias existentes, as instâncias devem ser marcadas como expiradas . Isso acontece por padrão session.commit(), mas pode ser feito manualmente chamando session.expire_all()ou session.expire(instance). Um exemplo (SQL omitido):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit()expira x, então a primeira instrução de impressão abre implicitamente uma nova transação e consulta novamente os xatributos. Se você comentar a primeira declaração de impressão, notará que a segunda agora seleciona o valor correto, porque a nova consulta não é emitida até depois da atualização.

Isso faz sentido do ponto de vista do isolamento transacional - você só deve captar modificações externas entre transações. Se isso estiver causando problemas, sugiro que você esclareça ou repense os limites de transação do aplicativo em vez de procurar imediatamente session.expire_all().

dhaffey
fonte
Obrigado pela sua resposta, vou tentar. WRT a questão expirando, o que vi não era o mesmo. Estou usando uma sessão de escopo em turbogears. Executar um getSession (). Query (Foo) .filter .... all () retornou coisas diferentes dependendo da solicitação, também não retornou os registros atualizados que estavam no db até que eu o reiniciei. Corrigi esse problema executando um autocommit = True e adicionando algo que .remove () da sessão após a conclusão da solicitação (acho que você deve fazer isso de qualquer maneira).
Nick Holden
Eu acho que ele retornou coisas diferentes, dependendo da solicitação, porque tinha uma sessão com escopo definido por thread no pool e as sessões estavam em estados diferentes? Parecia um pouco estranho que sa não recebesse novos dados após uma nova solicitação. Espero que eu estou missunderstanding o autocommit = False está fazendo
Nick Holden
Com autocommit=False, acredito que você deve ligar session.commit()após a conclusão da solicitação (não estou familiarizado com o TurboGears, então ignore isso se isso for tratado para você no nível da estrutura). Além de garantir que suas alterações tenham sido feitas no banco de dados, isso expiraria tudo na sessão. A próxima transação não começaria até o próximo uso dessa sessão, portanto, solicitações futuras no mesmo encadeamento não veriam dados obsoletos.
dhaffey
10
Estilo alternativo:session.execute(Foo.__table__.insert(), values)
Joril 03/02
6
Observe que as versões mais recentes do sqlalchemy têm recursos de inserção em massa: docs.sqlalchemy.org/en/latest/orm/…
Wayne Werner
18

Eu costumo fazer isso usando add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
reubano
fonte
2
Tem certeza de que isso funciona? Isso não equivale apenas .addà sessão, uma de cada vez?
Alec
Isso seria contra-intuitivo, dado o nome do método, os documentos não entram em detalhes: Add the given collection of instances to this Session.você tem algum motivo para acreditar que ele não faz uma inserção em massa?
Reubano 18/04
3
Eu não acho que seja muito contra-intuitivo - na verdade, adiciona todas as coisas que você pede. Nada sobre a adição de todas as coisas à sessão parece implicar quais instruções SQL subjacentes são emitidas. Olhando para a fonte: github.com/zzzeek/sqlalchemy/blob/… , de fato, parece apenas .addcada item individualmente.
Alec
Funciona bem, comparado a bulk_save_objects(), com a flush(), podemos obter o ID do objeto, mas bulk_save_objects()não podemos (evento com flush()chamado).
coanor
14

Suporte direto foi adicionado ao SQLAlchemy a partir da versão 0.8

De acordo com os documentos , connection.execute(table.insert().values(data))deve fazer o truque. (Observe que isso não é o mesmo connection.execute(table.insert(), data)que resulta em muitas inserções de linha individuais por meio de uma chamada para executemany). Em qualquer coisa, exceto em uma conexão local, a diferença de desempenho pode ser enorme.

user3805082
fonte
10

SQLAlchemy introduziu isso na versão 1.0.0:

Operações em massa - SQLAlchemy docs

Com essas operações, agora você pode fazer inserções ou atualizações em massa!

Por exemplo (se você deseja a menor sobrecarga para INSERTs de tabela simples), pode usar Session.bulk_insert_mappings():

loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Ou, se desejar, pule as loadmetuplas e escreva os dicionários diretamente dicts(mas acho mais fácil deixar toda a wordiness fora dos dados e carregar uma lista de dicionários em um loop).

juanitogan
fonte
7

A resposta de Piere está correta, mas um problema é que, bulk_save_objectspor padrão, não retorna as chaves primárias dos objetos, se isso lhe interessa. Defina return_defaultscomo Truepara obter esse comportamento.

A documentação está aqui .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
Matthew Moisen
fonte
2
Um cuidado deve ser tomado com a bandeira. Ele inserirá um objeto por vez seqüencialmente e o ganho significativo de desempenho pode não estar lá [1]. No meu caso, o desempenho diminuiu e eu suspeitei devido à sobrecarga. [1]: docs.sqlalchemy.org/pt/13/orm/…
dhfromkorea 23/07/19
6

Todas as estradas levam a Roma , mas algumas atravessam montanhas, exigem balsas, mas se você quiser chegar rapidamente, pegue a rodovia.


Nesse caso, a auto-estrada deve usar o recurso execute_batch () do psycopg2 . A documentação diz o melhor:

A implementação atual de executemany()(usando um eufemismo extremamente caridoso) não está sendo executada particularmente. Essas funções podem ser usadas para acelerar a execução repetida de uma instrução contra um conjunto de parâmetros. Ao reduzir o número de viagens de ida e volta ao servidor, o desempenho pode ser uma ordem de magnitude melhor que o uso executemany().

Em meu próprio teste execute_batch()é aproximadamente duas vezes mais rápido como executemany(), e dá a opção de configurar o page_size para mais ajustes (se você quiser espremer a última 2-3% do desempenho fora do motorista).

O mesmo recurso pode ser facilmente ativado se você estiver usando SQLAlchemy, configurando use_batch_mode=Truecomo um parâmetro ao instanciar o mecanismo comcreate_engine()

chjortlund
fonte
Nota: o psycopg2 execute_valuesé mais rápido que o psycopg2 execute_batchao fazer inserções em massa!
Fierr 05/04
5

Esta é uma maneira:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

Isso será inserido assim:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Referência: as perguntas frequentes do SQLAlchemy incluem referências para vários métodos de confirmação.

Eefret
fonte
3

A melhor resposta que encontrei até agora foi na documentação do sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Há um exemplo completo de uma referência de possíveis soluções.

Como mostrado na documentação:

bulk_save_objects não é a melhor solução, mas seu desempenho está correto.

A segunda melhor implementação em termos de legibilidade, acho que foi com o SQLAlchemy Core:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

O contexto dessa função é fornecido no artigo de documentação.

lelabo_m
fonte