então estou usando o Spark para fazer análises de sentimentos e continuo recebendo erros com os serializadores que ele usa (acho) para transmitir objetos python.
PySpark worker failed with exception:
Traceback (most recent call last):
File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin- hadoop1/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin- hadoop1/python/pyspark/serializers.py", line 191, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin- hadoop1/python/pyspark/serializers.py", line 123, in dump_stream
for obj in iterator:
File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin- hadoop1/python/pyspark/serializers.py", line 180, in _batched
for item in iterator:
TypeError: __init__() takes exactly 3 arguments (2 given)
e o código para serializadores está disponível aqui
e meu código está aqui
apache-spark
pyspark
sentiment-analysis
seashark97
fonte
fonte
Respostas:
Na maioria das vezes erro serialização em (Py) meios de ignição que alguma parte de seu código distribuído (por exemplo, funções passado para
map
) tem dependências sobre dados não-serializáveis . Considere o seguinte exemplo:Aqui você distribuiu a função de coleção e lambda para enviar a todos os trabalhadores. O Lambda é completamente independente, portanto, é fácil copiar sua representação binária para outros nós sem preocupações.
Agora vamos tornar as coisas um pouco mais interessantes:
Estrondo! Erro estranho no módulo de serialização! O que aconteceu é que tentamos passar
f
, que é um objeto de arquivo, para os trabalhadores. Obviamente, o objeto de arquivo é um identificador para dados locais e, portanto, não pode ser enviado para outras máquinas.Então, o que está acontecendo no seu código específico? Sem dados reais e conhecendo o formato do registro, não posso depurá-lo completamente, mas acho que esse problema decorre dessa linha:
No Python, os argumentos das palavras-chave são inicializados quando a função é chamada pela primeira vez. Quando você liga
sc.parallelize(...).map(vectorizer)
logo após sua definição,vocab_dict
está disponível localmente , mas os funcionários remotos não sabem absolutamente nada sobre isso. Assim, a função é chamada com menos parâmetros do que o esperado, o que resulta em__init__() takes exactly 3 arguments (2 given)
erro.Observe também que você segue um padrão muito ruim de
sc.parallelize(...)...collect()
chamadas. Primeiro, você espalha sua coleção por todo o cluster, faz alguns cálculos e depois obtém o resultado. Mas enviar dados para lá e para cá é inútil aqui. Em vez disso, você pode fazer esses cálculos localmente e executar os processos paralelos do Spark apenas quando você trabalha com conjuntos de dados realmente grandes (como o principalamazon_dataset
, eu acho).fonte