Usando o Apache Spark para fazer ML. Continue recebendo erros de serialização

7

então estou usando o Spark para fazer análises de sentimentos e continuo recebendo erros com os serializadores que ele usa (acho) para transmitir objetos python.

PySpark worker failed with exception:
Traceback (most recent call last):
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: __init__() takes exactly 3 arguments (2 given)

e o código para serializadores está disponível aqui

e meu código está aqui

seashark97
fonte

Respostas:

10

Na maioria das vezes erro serialização em (Py) meios de ignição que alguma parte de seu código distribuído (por exemplo, funções passado para map) tem dependências sobre dados não-serializáveis . Considere o seguinte exemplo:

rdd = sc.parallelize(range(5))
rdd = rdd.map(lambda x: x + 1)
rdd.collect()

Aqui você distribuiu a função de coleção e lambda para enviar a todos os trabalhadores. O Lambda é completamente independente, portanto, é fácil copiar sua representação binária para outros nós sem preocupações.

Agora vamos tornar as coisas um pouco mais interessantes:

f = open("/etc/hosts")
rdd = sc.parallelize(range(100))
rdd = rdd.map(lambda x: f.read())
rdd.collect()
f.close()

Estrondo! Erro estranho no módulo de serialização! O que aconteceu é que tentamos passar f, que é um objeto de arquivo, para os trabalhadores. Obviamente, o objeto de arquivo é um identificador para dados locais e, portanto, não pode ser enviado para outras máquinas.


Então, o que está acontecendo no seu código específico? Sem dados reais e conhecendo o formato do registro, não posso depurá-lo completamente, mas acho que esse problema decorre dessa linha:

def vectorizer(text, vocab=vocab_dict):

No Python, os argumentos das palavras-chave são inicializados quando a função é chamada pela primeira vez. Quando você liga sc.parallelize(...).map(vectorizer)logo após sua definição, vocab_dictestá disponível localmente , mas os funcionários remotos não sabem absolutamente nada sobre isso. Assim, a função é chamada com menos parâmetros do que o esperado, o que resulta em __init__() takes exactly 3 arguments (2 given)erro.

Observe também que você segue um padrão muito ruim de sc.parallelize(...)...collect()chamadas. Primeiro, você espalha sua coleção por todo o cluster, faz alguns cálculos e depois obtém o resultado. Mas enviar dados para lá e para cá é inútil aqui. Em vez disso, você pode fazer esses cálculos localmente e executar os processos paralelos do Spark apenas quando você trabalha com conjuntos de dados realmente grandes (como o principal amazon_dataset, eu acho).

amiga
fonte