Evitando vazamentos de memória com Scalaz 7 zipWithIndex / group enumeratees

106

fundo

Conforme observado nesta pergunta , estou usando o Scalaz 7 iteratees para processar um grande (ou seja, ilimitado) fluxo de dados em um espaço de heap constante.

Meu código é parecido com este:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))

O problema

Parece que tive um vazamento de memória, mas não estou familiarizado o suficiente com o Scalaz / FP para saber se o bug está no Scalaz ou no meu código. Intuitivamente, espero que este código exija apenas (na ordem de) P vezes o Chunkespaço -size.

Observação: encontrei uma pergunta semelhante na qual um OutOfMemoryErrorfoi encontrado, mas meu código não está usando consume.

Testando

Fiz alguns testes para tentar isolar o problema. Para resumir, o vazamento só parece surgir quando ambos zipWithIndexe groupsão usados.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Código para os testes:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

Questões

  • O bug está no meu código?
  • Como posso fazer isso funcionar em um espaço de heap constante?
Aaron Novstrup
fonte
6
Acabei relatando isso como um problema no Scalaz .
Aaron Novstrup
1
Não será divertido, mas você pode tentar -XX:+HeapDumpOnOutOfMemoryErrore analisar o dump com eclipse MAT eclipse.org/mat para ver que linha de código está segurando os arrays.
Huynhjl,
10
@huynhjl FWIW, tentei analisar o heap com JProfiler e MAT, mas fui completamente incapaz de percorrer todas as referências a classes de função anônimas, etc. Scala realmente precisa de ferramentas dedicadas para esse tipo de coisa.
Aaron Novstrup
E se não houver vazamento e apenas o que você está fazendo requer uma quantidade cada vez maior de memória? Você pode replicar facilmente o zipWithIndex sem aquela construção FP em particular, apenas mantendo um varcontador conforme você avança.
Ezekiel Victor
@EzekielVictor Não tenho certeza se entendi o comentário. Você está sugerindo que adicionar um único Longíndice por bloco mudaria o algoritmo de espaço de heap constante para não constante? A versão não compactada claramente usa espaço de heap constante, porque pode "processar" tantos pedaços quanto você estiver disposto a esperar.
Aaron Novstrup

Respostas:

4

Isso não servirá de consolo para quem está preso à iterateeAPI mais antiga , mas recentemente verifiquei que um teste equivalente foi aprovado na API scalaz-stream . Esta é uma API de processamento de fluxo mais recente que deve ser substituída iteratee.

Para completar, aqui está o código de teste:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

Isso deve funcionar com qualquer valor para o nparâmetro (desde que você esteja disposto a esperar o suficiente) - testei com 2 ^ 14 matrizes de 32 MiB (ou seja, um total de meio TiB de memória alocada ao longo do tempo).

Aaron Novstrup
fonte