Fazendo loop em 16 milhões de registros usando o ArcPy?

13

Eu tenho uma tabela com 8 colunas e ~ 16,7 milhões de registros. Eu preciso executar um conjunto de equações if-else nas colunas. Eu escrevi um script usando o módulo UpdateCursor, mas depois de alguns milhões de registros, ele fica sem memória. Fiquei me perguntando se existe uma maneira melhor de processar esses 16,7 milhões de registros.

import arcpy

arcpy.TableToTable_conversion("combine_2013", "D:/mosaic.gdb", "combo_table")

c_table = "D:/mosaic.gdb/combo_table"

fields = ['dev_agg', 'herb_agg','forest_agg','wat_agg', 'cate_2']

start_time = time.time()
print "Script Started"
with arcpy.da.UpdateCursor(c_table, fields) as cursor:
    for row in cursor:
        # row's 0,1,2,3,4 = dev, herb, forest, water, category
        #classficiation water = 1; herb = 2; dev = 3; forest = 4
        if (row[3] >= 0 and row[3] > row[2]):
            row[4] = 1
        elif (row[2] >= 0 and row[2] > row[3]):
            row[4] = 4
        elif (row[1] > 180):
            row[4] = 2
        elif (row[0] > 1):
            row[4] = 3
        cursor.updateRow(row)
end_time = time.time() - start_time
print "Script Complete - " +  str(end_time) + " seconds"

ATUALIZAÇÃO # 1

Executei o mesmo script em um computador com 40 GB de RAM (o computador original tinha apenas 12 GB de RAM). Concluiu com sucesso após ~ 16 horas. Sinto que 16 horas são muito longas, mas nunca trabalhei com um conjunto de dados tão grande, então não sei o que esperar. A única nova adição a esse script é arcpy.env.parallelProcessingFactor = "100%". Estou tentando dois métodos sugeridos (1) fazendo 1 milhão de registros em lotes e (2) usando o SearchCursor e gravando saídas no CSV. Vou relatar o progresso em breve.

ATUALIZAÇÃO # 2

A atualização do SearchCursor e CSV funcionou de maneira brilhante! Eu não tenho os tempos de execução precisos, atualizarei a postagem quando estiver no escritório amanhã, mas diria que o tempo de execução aproximado é de aproximadamente 5-6 minutos, o que é bastante impressionante. Eu não estava esperando por isso. Estou compartilhando meu código não polido. Todos os comentários e melhorias são bem-vindos:

import arcpy, csv, time
from arcpy import env

arcpy.env.parallelProcessingFactor = "100%"

arcpy.TableToTable_conversion("D:/mosaic.gdb/combine_2013", "D:/mosaic.gdb", "combo_table")
arcpy.AddField_management("D:/mosaic.gdb/combo_table","category","SHORT")

# Table
c_table = "D:/mosaic.gdb/combo_table"
fields = ['wat_agg', 'dev_agg', 'herb_agg','forest_agg','category', 'OBJECTID']

# CSV
c_csv = open("D:/combine.csv", "w")
c_writer = csv.writer(c_csv, delimiter= ';',lineterminator='\n')
c_writer.writerow (['OID', 'CATEGORY'])
c_reader = csv.reader(c_csv)

start_time = time.time()
with arcpy.da.SearchCursor(c_table, fields) as cursor:
    for row in cursor:
        #skip file headers
        if c_reader.line_num == 1:
            continue
        # row's 0,1,2,3,4,5 = water, dev, herb, forest, category, oid
        #classficiation water = 1; dev = 2; herb = 3; ; forest = 4
        if (row[0] >= 0 and row[0] > row[3]):
            c_writer.writerow([row[5], 1])
        elif (row[1] > 1):
            c_writer.writerow([row[5], 2])
        elif (row[2] > 180):
            c_writer.writerow([row[5], 3])
        elif (row[3] >= 0 and row[3] > row[0]):
            c_writer.writerow([row[5], 4])

c_csv.close()
end_time =  time.time() - start_time
print str(end_time) + " - Seconds"

ATUALIZAÇÃO # 3 Atualização final. O tempo total de execução do script é de ~ 199,6 segundos / 3,2 minutos.

cptpython
fonte
1
Você está usando 64 bits (em segundo plano, servidor ou Pro)?
KHibma
Esqueci de mencionar. Estou executando 10,4 x64 em segundo plano.
Cptpython 6/12/16
Advogado do diabo - você já tentou executá-lo em primeiro plano ou no IDLE, pois, olhando para o seu script, não precisa ter o ArcMap aberto?
Hornbydd 06/12/19
executá-lo como um script autônomo ou se você sabe SQL, carregar o shapefile para PostgreSQL e fazê-lo lá
Ziggy
1
Entendo que é de código aberto, mas o processo de aprovação leva de uma a duas semanas, e isso é sensível ao tempo, então não acho que seja viável nesse caso.
Cptpython 6/12/16

Respostas:

4

Você pode gravar o Objectid e o resultado do cálculo (cate_2) em um arquivo csv. Em seguida, junte o csv ao seu arquivo original, preencha um campo para preservar o resultado. Dessa forma, você não está atualizando a tabela usando o cursor DA. Você pode usar um cursor de pesquisa.

klewis
fonte
Eu estava pensando a mesma coisa que há uma discussão aqui e eles estão falando sobre conjuntos de dados ainda maiores.
Hornbydd 06/12/19
Obrigado, Klewis. Isso parece promissor. Vou tentar junto com a sugestão do FelixIP e uma discussão interessante, embora tenha que executar isso algumas dezenas de vezes.
Cptpython 6/12/16
Trabalhou brilhantemente! Atualizei a pergunta com o script mais recente. Obrigado!
Cptpython 8/12/16
2

Desculpas, se eu continuar revivendo esse tópico antigo. A idéia era executar as instruções if-else na varredura combinada e, em seguida, usar o novo campo na Pesquisa para criar uma nova varredura. Eu compliquei o problema exportando os dados como uma tabela e introduzi um fluxo de trabalho ineficiente, abordado por @Alex Tereshenkov. Depois de perceber o óbvio, agrupei os dados em 17 consultas (1 milhão cada), conforme sugerido pelo @FelixIP. Cada lote levou em média ~ 1,5 minutos para concluir e o tempo total de execução foi de ~ 23,3 minutos. Esse método elimina a necessidade de junções e acho que esse método realiza melhor a tarefa. Aqui está um script revisado para referência futura:

import arcpy, time
from arcpy import env

def cursor():
    combine = "D:/mosaic.gdb/combine_2013"
    #arcpy.AddField_management(combine,"cat_1","SHORT")
    fields = ['wat_agg', 'dev_agg', 'herb_agg','forest_agg', 'cat_1']
    batch = ['"OBJECTID" >= 1 AND "OBJECTID" <= 1000000', '"OBJECTID" >= 1000001 AND "OBJECTID" <= 2000000', '"OBJECTID" >= 2000001 AND "OBJECTID" <= 3000000', '"OBJECTID" >= 3000001 AND "OBJECTID" <= 4000000', '"OBJECTID" >= 4000001 AND "OBJECTID" <= 5000000', '"OBJECTID" >= 5000001 AND "OBJECTID" <= 6000000', '"OBJECTID" >= 6000001 AND "OBJECTID" <= 7000000', '"OBJECTID" >= 7000001 AND "OBJECTID" <= 8000000', '"OBJECTID" >= 8000001 AND "OBJECTID" <= 9000000', '"OBJECTID" >= 9000001 AND "OBJECTID" <= 10000000', '"OBJECTID" >= 10000001 AND "OBJECTID" <= 11000000', '"OBJECTID" >= 11000001 AND "OBJECTID" <= 12000000', '"OBJECTID" >= 12000001 AND "OBJECTID" <= 13000000', '"OBJECTID" >= 13000001 AND "OBJECTID" <= 14000000', '"OBJECTID" >= 14000001 AND "OBJECTID" <= 15000000', '"OBJECTID" >= 15000001 AND "OBJECTID" <= 16000000', '"OBJECTID" >= 16000001 AND "OBJECTID" <= 16757856']
    for i in batch:
        start_time = time.time()
        with arcpy.da.UpdateCursor(combine, fields, i) as cursor:
            for row in cursor:
            # row's 0,1,2,3,4,5 = water, dev, herb, forest, category
            #classficiation water = 1; dev = 2; herb = 3; ; forest = 4
                if (row[0] >= 0 and row[0] >= row[3]):
                    row[4] = 1
                elif (row[1] > 1):
                    row[4] = 2
                elif (row[2] > 180):
                    row[4] = 3
                elif (row[3] >= 0 and row[3] > row[0]):
                    row[4] = 4
                cursor.updateRow(row)
        end_time =  time.time() - start_time
        print str(end_time) + " - Seconds"

cursor()
cptpython
fonte
Então, apenas para ter certeza de que estou entendendo isso corretamente. Na sua postagem original, você disse que quando executou isso em um computador com 40 GB de RAM, levou aproximadamente 16 horas no total. Mas agora que você o dividiu em 17 lotes, levou cerca de 23 minutos no total. Isso está correto?
Ianbroad #
Corrigir. A primeira execução levou ~ 16 horas com 40 GB de RAM e a segunda execução levou ~ 23 minutos + outros ~ 15 minutos para executar Lookupe exportar a varredura com categorias recém-definidas.
Cptpython
Apenas uma nota que arcpy.env.parallelProcessingFactor = "100%"não tem efeito no seu script. Não vejo nenhuma ferramenta que aproveite esse ambiente.
KHibma
Você está correto. Vou editar o código.
Cptpython
1

Você pode tentar mudar para usar CalculateField_management . Isso evita a repetição do uso de cursores e, pela aparência de suas opções para o valor da categoria, você pode configurá-lo como quatro subprocessos gerados sequencialmente. À medida que cada subprocesso termina, sua memória é liberada antes de iniciar o próximo. Você leva um pequeno golpe (milissegundo) gerando cada subprocesso.

Ou, se você quiser manter sua abordagem atual, tenha um subprocesso que ocupa x linhas por vez. Tenha um processo principal para controlá-lo e, como antes, você continua limpando sua memória cada vez que termina. O bônus de fazê-lo dessa maneira (especialmente através de um processo python independente) é que você pode usar mais todos os seus núcleos como subprocessos geradores no multithreading de python que você contorna o GIL. Isso é possível com o ArcPy e uma abordagem que eu usei no passado para fazer grandes rotações de dados. Obviamente, mantenha seus blocos de dados baixos, caso contrário você ficará sem memória mais rapidamente!

MappaGnosis
fonte
Na minha experiência, usar arcpy.da.UpdateCursor é muito mais rápido que arcpy.CalculateField_management. Eu escrevi um script que roda em 55.000.000 recursos de construção, era cerca de 5 vezes mais lento com a ferramenta CalculateField.
offermann
O objetivo é configurar quatro subprocessos e limpar a memória, pois esse é o ponto de pitada real aqui. Como descrevi no segundo parágrafo, você pode dividir os subprocessos por linhas, mas isso exige um pouco mais de gerenciamento do que uma única seleção.
MappaGnosis
1

A lógica de manipulação de dados pode ser escrita como uma instrução UPDATE SQL usando uma expressão CASE, que você pode executar usando GDAL / OGR, por exemplo, via OSGeo4W com gdal-filegdbinstalado.

Aqui está o fluxo de trabalho, que usa em osgeo.ogrvez de arcpy:

import time
from osgeo import ogr

ds = ogr.Open('D:/mosaic.gdb', 1)
if ds is None:
    raise ValueError("You don't have a 'FileGDB' driver, or the dataset doesn't exist")
sql = '''\
UPDATE combo_table SET cate_2 = CASE
    WHEN wat_agg >= 0 AND wat_agg > forest_agg THEN 1
    WHEN dev_agg > 1 THEN 2
    WHEN herb_agg > 180 THEN 3
    WHEN forest_agg >= 0 AND forest_agg > wat_agg THEN 4
    END
'''
start_time = time.time()
ds.ExecuteSQL(sql, dialect='sqlite')
ds = None  # save, close
end_time =  time.time() - start_time
print("that took %.1f seconds" % end_time)

Em uma tabela semelhante com pouco mais de 1 milhão de registros, essa consulta levou 18 minutos. Portanto, ainda pode levar de 4 a 5 horas para processar 16 milhões de registros.

Mike T
fonte
Infelizmente, o script faz parte de um script maior escrito usando, arcpymas agradeço a resposta. Estou lentamente tentando usar mais o GDAL.
Cptpython
1

A atualização do código na seção 2 da sua pergunta não mostra como você está ingressando no .csvarquivo de volta à tabela original no geodatabase do arquivo. Você diz que seu script levou ~ 5 minutos para ser executado. Parece justo se você exportou o .csvarquivo apenas sem fazer nenhuma junção. Ao tentar trazer o .csvarquivo de volta ao ArcGIS, você encontrará os problemas de desempenho.

1) Você não pode fazer junções diretamente da .csvtabela de geodatabase, porque o .csvarquivo não possui um OID (ter um campo calculado com valores exclusivos não ajudará, pois você ainda precisará converter seu .csvarquivo em uma tabela de geodatabase). Portanto, alguns minutos para a Table To Tableferramenta GP (você pode usar o in_memoryespaço de trabalho para criar uma tabela temporária lá, será um pouco mais rápido).

2) Após carregar a .csvtabela em um geodatabase, convém criar um índice no campo em que você faria a junção (no seu caso, o valor da fonte objectiddo .csvarquivo. Isso levaria alguns minutos na tabela de linhas de 16 mln).

3) Então você precisaria usar as ferramentas Add Joinou Join FieldGP. Nenhum deles terá um bom desempenho em suas tabelas grandes.

4) Depois, você precisa fazer a Calculate Fieldferramenta GP para calcular os campos recém-unidos. Muitos minutos vão aqui; ainda mais, o cálculo do campo leva mais tempo quando os campos que participam do cálculo são provenientes de uma tabela unida.

Em uma palavra, você não terá nada perto de cinco minutos mencionados. Se você chegar em uma hora, eu ficaria impressionado.

Para evitar lidar com o processamento de grandes conjuntos de dados no ArcGIS, sugiro levar seus dados para fora do ArcGIS em um pandasquadro de dados e fazer todos os seus cálculos lá. Quando terminar, basta escrever as linhas do quadro de dados novamente em uma nova tabela de geodatabase com da.InsertCursor(ou você pode truncar sua tabela existente e gravar suas linhas na fonte).

O código completo que escrevi para comparar isso está abaixo:

import time
from functools import wraps
import arcpy
import pandas as pd

def report_time(func):
    '''Decorator reporting the execution time'''
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(func.__name__, round(end-start,3))
        return result
    return wrapper

#----------------------------------------------------------------------
@report_time
def make_df(in_table,limit):
    columns = [f.name for f in arcpy.ListFields(in_table) if f.name != 'OBJECTID']
    cur = arcpy.da.SearchCursor(in_table,columns,'OBJECTID < {}'.format(limit))
    rows = (row for row in cur)
    df = pd.DataFrame(rows,columns=columns)
    return df

#----------------------------------------------------------------------
@report_time
def calculate_field(df):
    df.ix[(df['DataField2'] % 2 == 0), 'Category'] = 'two'
    df.ix[(df['DataField2'] % 4 == 0), 'Category'] = 'four'
    df.ix[(df['DataField2'] % 5 == 0), 'Category'] = 'five'
    df.ix[(df['DataField2'] % 10 == 0), 'Category'] = 'ten'
    df['Category'].fillna('other', inplace=True)
    return df

#----------------------------------------------------------------------
@report_time
def save_gdb_table(df,out_table):
    rows_to_write = [tuple(r[1:]) for r in df.itertuples()]
    with arcpy.da.InsertCursor(out_table,df.columns) as ins_cur:
        for row in rows_to_write:
            ins_cur.insertRow(row)

#run for tables of various sizes
for limit in [100000,500000,1000000,5000000,15000000]:
    print '{:,}'.format(limit).center(50,'-')

    in_table = r'C:\ArcGIS\scratch.gdb\BigTraffic'
    out_table = r'C:\ArcGIS\scratch.gdb\BigTrafficUpdated'
    if arcpy.Exists(out_table):
        arcpy.TruncateTable_management(out_table)

    df = make_df(in_table,limit=limit)
    df = calculate_field(df)
    save_gdb_table(df, out_table)
    print

Abaixo está a saída do Debug IO (o número relatado é o número de linhas em uma tabela usada) com informações sobre o tempo de execução para funções individuais:

---------------------100,000----------------------
('make_df', 1.141)
('calculate_field', 0.042)
('save_gdb_table', 1.788)

---------------------500,000----------------------
('make_df', 4.733)
('calculate_field', 0.197)
('save_gdb_table', 8.84)

--------------------1,000,000---------------------
('make_df', 9.315)
('calculate_field', 0.392)
('save_gdb_table', 17.605)

--------------------5,000,000---------------------
('make_df', 45.371)
('calculate_field', 1.903)
('save_gdb_table', 90.797)

--------------------15,000,000--------------------
('make_df', 136.935)
('calculate_field', 5.551)
('save_gdb_table', 275.176)

Inserir uma linha com da.InsertCursorleva um tempo constante, ou seja, se inserir 1 linha leva, digamos, 0,1 segundo, para inserir 100 linhas, leva 10 segundos. Infelizmente, mais de 95% do tempo total de execução é gasto lendo a tabela de geodatabase e inserindo as linhas novamente no geodatabase.

O mesmo se aplica à criação de um pandasquadro de dados a partir de um da.SearchCursorgerador e ao cálculo do (s) campo (s). À medida que o número de linhas na tabela de geodatabase de origem dobra, o tempo de execução do script acima também. Obviamente, você ainda precisará usar o Python de 64 bits, pois durante a execução, algumas estruturas de dados maiores serão tratadas na memória.

Alex Tereshenkov
fonte
Na verdade, eu faria outra pergunta que falaria sobre as limitações do método que usei, porque me deparei com os problemas que você abordou acima, então muito obrigado! O que estou tentando realizar: combine quatro rasters e, em seguida, execute a instrução if-else com base nas colunas, escreva as saídas em uma nova coluna e, finalmente, execute Lookuppara criar uma varredura com base nos valores da nova coluna. Meu método teve muitas etapas desnecessárias e fluxo de trabalho ineficiente. Eu deveria ter mencionado isso na minha pergunta original. Viva e aprenda. Vou tentar o seu script ainda esta semana.
Cptpython