Concurso: a maneira mais rápida de classificar uma grande variedade de dados distribuídos gaussianos

71

Seguindo o interesse nesta pergunta , achei interessante tornar as respostas um pouco mais objetivas e quantitativas ao propor um concurso.

A ideia é simples: eu gerei um arquivo binário contendo 50 milhões de duplicados gaussianos distribuídos (média: 0, stdev 1). O objetivo é criar um programa que os classifique na memória o mais rápido possível. Uma implementação de referência muito simples em python leva 1m4s para ser concluída. Quão baixo nós podemos ir?

As regras são as seguintes: responda com um programa que abra o arquivo "gaussian.dat" e classifique os números na memória (não é necessário produzi-los) e instruções para criar e executar o programa. O programa deve poder funcionar na minha máquina Arch Linux (o que significa que você pode usar qualquer linguagem ou biblioteca de programação que seja facilmente instalável neste sistema).

O programa deve ser razoavelmente legível, para que eu possa ter certeza de que é seguro iniciar (nenhuma solução somente para montadores, por favor!).

Vou executar as respostas na minha máquina (quad core, 4 Gigabytes de RAM). A solução mais rápida receberá a resposta aceita e uma recompensa de 100 pontos :)

O programa usado para gerar os números:

#!/usr/bin/env python
import random
from array import array
from sys import argv
count=int(argv[1])
a=array('d',(random.gauss(0,1) for x in xrange(count)))
f=open("gaussian.dat","wb")
a.tofile(f)

A implementação de referência simples:

#!/usr/bin/env python
from array import array
from sys import argv
count=int(argv[1])
a=array('d')
a.fromfile(open("gaussian.dat"),count)
print "sorting..."
b=sorted(a)

EDIT: apenas 4 GB de RAM, desculpe

EDIÇÃO # 2: Observe que o objetivo do concurso é verificar se podemos usar informações anteriores sobre os dados . não é para ser uma partida irritante entre diferentes implementações de linguagem de programação!

static_rtti
fonte
11
Pegue cada valor e mova-o diretamente para a posição "esperada", repita para o valor deslocado. Não tenho certeza de como resolver um problema com isso. Quando terminar, classifique as bolhas até que estejam concluídas (algumas passagens devem ser necessárias).
11
Vou postar uma solução tipo balde de amanhã à noite, se este não tiver sido fechada pelo então :)
11
@static_rtti - como um usuário pesado de CG, esse é exatamente o tipo de coisa que "gostamos" de invadir no CG.SE. Para qualquer mods de leitura, mova para CG, não feche.
arrdem
11
Bem-vindo ao CodeGolf.SE! Limpei muitos comentários do SO original sobre onde isso pertence ou não, e remarcado para estar mais próximo do mainstream do CodeGolf.SE.
precisa
2
A questão complicada aqui é que procuramos critérios objetivos de vitória e "mais rápido" introduz dependências de plataforma ... um algoritmo O (n ^ {1.2}) implementado na máquina virtual cpython supera um O (n ^ {1.3} ) com uma constante semelhante implementada em c? Geralmente, sugiro uma discussão sobre as características de desempenho de cada solução, pois isso pode ajudar as pessoas a julgar o que está acontecendo.
precisa

Respostas:

13

Aqui está uma solução em C ++ que primeiro particiona os números em intervalos com o mesmo número esperado de elementos e depois classifica cada intervalo separadamente. Ele pré-calcula uma tabela da função de distribuição cumulativa com base em algumas fórmulas da Wikipedia e interpola os valores dessa tabela para obter uma aproximação rápida.

Várias etapas são executadas em vários threads para fazer uso dos quatro núcleos.

#include <cstdlib>
#include <math.h>
#include <stdio.h>
#include <algorithm>

#include <tbb/parallel_for.h>

using namespace std;

typedef unsigned long long ull;

double signum(double x) {
    return (x<0) ? -1 : (x>0) ? 1 : 0;
}

const double fourOverPI = 4 / M_PI;

double erf(double x) {
    double a = 0.147;
    double x2 = x*x;
    double ax2 = a*x2;
    double f1 = -x2 * (fourOverPI + ax2) / (1 + ax2);
    double s1 = sqrt(1 - exp(f1));
    return signum(x) * s1;
}

const double sqrt2 = sqrt(2);

double cdf(double x) {
    return 0.5 + erf(x / sqrt2) / 2;
}

const int cdfTableSize = 200;
const double cdfTableLimit = 5;
double* computeCdfTable(int size) {
    double* res = new double[size];
    for (int i = 0; i < size; ++i) {
        res[i] = cdf(cdfTableLimit * i / (size - 1));
    }
    return res;
}
const double* const cdfTable = computeCdfTable(cdfTableSize);

double cdfApprox(double x) {
    bool negative = (x < 0);
    if (negative) x = -x;
    if (x > cdfTableLimit) return negative ? cdf(-x) : cdf(x);
    double p = (cdfTableSize - 1) * x / cdfTableLimit;
    int below = (int) p;
    if (p == below) return negative ? -cdfTable[below] : cdfTable[below];
    int above = below + 1;
    double ret = cdfTable[below] +
            (cdfTable[above] - cdfTable[below])*(p - below);
    return negative ? 1 - ret : ret;
}

void print(const double* arr, int len) {
    for (int i = 0; i < len; ++i) {
        printf("%e; ", arr[i]);
    }
    puts("");
}

void print(const int* arr, int len) {
    for (int i = 0; i < len; ++i) {
        printf("%d; ", arr[i]);
    }
    puts("");
}

void fillBuckets(int N, int bucketCount,
        double* data, int* partitions,
        double* buckets, int* offsets) {
    for (int i = 0; i < N; ++i) {
        ++offsets[partitions[i]];
    }

    int offset = 0;
    for (int i = 0; i < bucketCount; ++i) {
        int t = offsets[i];
        offsets[i] = offset;
        offset += t;
    }
    offsets[bucketCount] = N;

    int next[bucketCount];
    memset(next, 0, sizeof(next));
    for (int i = 0; i < N; ++i) {
        int p = partitions[i];
        int j = offsets[p] + next[p];
        ++next[p];
        buckets[j] = data[i];
    }
}

class Sorter {
public:
    Sorter(double* data, int* offsets) {
        this->data = data;
        this->offsets = offsets;
    }

    static void radixSort(double* arr, int len) {
        ull* encoded = (ull*)arr;
        for (int i = 0; i < len; ++i) {
            ull n = encoded[i];
            if (n & signBit) {
                n ^= allBits;
            } else {
                n ^= signBit;
            }
            encoded[i] = n;
        }

        const int step = 11;
        const ull mask = (1ull << step) - 1;
        int offsets[8][1ull << step];
        memset(offsets, 0, sizeof(offsets));

        for (int i = 0; i < len; ++i) {
            for (int b = 0, j = 0; b < 64; b += step, ++j) {
                int p = (encoded[i] >> b) & mask;
                ++offsets[j][p];
            }
        }

        int sum[8] = {0};
        for (int i = 0; i <= mask; i++) {
            for (int b = 0, j = 0; b < 64; b += step, ++j) {
                int t = sum[j] + offsets[j][i];
                offsets[j][i] = sum[j];
                sum[j] = t;
            }
        }

        ull* copy = new ull[len];
        ull* current = encoded;
        for (int b = 0, j = 0; b < 64; b += step, ++j) {
            for (int i = 0; i < len; ++i) {
                int p = (current[i] >> b) & mask;
                copy[offsets[j][p]] = current[i];
                ++offsets[j][p];
            }

            ull* t = copy;
            copy = current;
            current = t;
        }

        if (current != encoded) {
            for (int i = 0; i < len; ++i) {
                encoded[i] = current[i];
            }
        }

        for (int i = 0; i < len; ++i) {
            ull n = encoded[i];
            if (n & signBit) {
                n ^= signBit;
            } else {
                n ^= allBits;
            }
            encoded[i] = n;
        }
    }

    void operator() (tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); ++i) {
            double* begin = &data[offsets[i]];
            double* end = &data[offsets[i+1]];
            //std::sort(begin, end);
            radixSort(begin, end-begin);
        }
    }

private:
    double* data;
    int* offsets;
    static const ull signBit = 1ull << 63;
    static const ull allBits = ~0ull;
};

void sortBuckets(int bucketCount, double* data, int* offsets) {
    Sorter sorter(data, offsets);
    tbb::blocked_range<int> range(0, bucketCount);
    tbb::parallel_for(range, sorter);
    //sorter(range);
}

class Partitioner {
public:
    Partitioner(int bucketCount, double* data, int* partitions) {
        this->data = data;
        this->partitions = partitions;
        this->bucketCount = bucketCount;
    }

    void operator() (tbb::blocked_range<int>& range) const {
        for (int i = range.begin(); i < range.end(); ++i) {
            double d = data[i];
            int p = (int) (cdfApprox(d) * bucketCount);
            partitions[i] = p;
        }
    }

private:
    double* data;
    int* partitions;
    int bucketCount;
};

const int bucketCount = 512;
int offsets[bucketCount + 1];

int main(int argc, char** argv) {
    if (argc != 2) {
        printf("Usage: %s N\n N = the size of the input\n", argv[0]);
        return 1;
    }

    puts("initializing...");
    int N = atoi(argv[1]);
    double* data = new double[N];
    double* buckets = new double[N];
    memset(offsets, 0, sizeof(offsets));
    int* partitions = new int[N];

    puts("loading data...");
    FILE* fp = fopen("gaussian.dat", "rb");
    if (fp == 0 || fread(data, sizeof(*data), N, fp) != N) {
        puts("Error reading data");
        return 1;
    }
    //print(data, N);

    puts("assigning partitions...");
    tbb::parallel_for(tbb::blocked_range<int>(0, N),
            Partitioner(bucketCount, data, partitions));

    puts("filling buckets...");
    fillBuckets(N, bucketCount, data, partitions, buckets, offsets);
    data = buckets;

    puts("sorting buckets...");
    sortBuckets(bucketCount, data, offsets);

    puts("done.");

    /*
    for (int i = 0; i < N-1; ++i) {
        if (data[i] > data[i+1]) {
            printf("error at %d: %e > %e\n", i, data[i], data[i+1]);
        }
    }
    */

    //print(data, N);

    return 0;
}

Para compilar e executá-lo, use este comando:

g++ -O3 -ltbb -o gsort gsort.cpp && time ./gsort 50000000

EDIT: agora todos os buckets são colocados na mesma matriz para remover a necessidade de copiar os buckets novamente para a matriz. Além disso, o tamanho da tabela com valores pré-computados foi reduzido, porque os valores são precisos o suficiente. Ainda assim, se eu alterar o número de buckets acima de 256, o programa levará mais tempo para ser executado do que com esse número de buckets.

EDIT: O mesmo algoritmo, linguagem de programação diferente. Usei C ++ em vez de Java e o tempo de execução foi reduzido de ~ 3,2s para ~ 2,35s na minha máquina. O número ideal de buckets ainda está em torno de 256 (novamente, no meu computador).

By the way, tbb é realmente incrível.

EDIT: Fui inspirado pela ótima solução de Alexandru e substituí o std :: sort na última fase por uma versão modificada de sua classificação radix. Eu usei um método diferente para lidar com os números positivos / negativos, mesmo que ele precise de mais passagens pela matriz. Também decidi classificar a matriz exatamente e remover a classificação de inserção. Mais tarde, passarei algum tempo testando como essas alterações influenciam o desempenho e, possivelmente, as revertem. No entanto, usando a classificação radix, o tempo diminuiu de ~ 2,35s para ~ 1,63s.

k21
fonte
Agradável. Eu tenho 3.055 no meu. O menor que eu consegui ter o meu foi 6,3. Estou escolhendo a sua para melhorar as estatísticas. Por que você escolheu 256 como o número de buckets? Eu tentei 128 e 512, mas 256 funcionaram melhor.
Scott
Por que escolhi 256 como o número de baldes? Eu tentei 128 e 512, mas 256 funcionaram melhor. :) Encontrei empiricamente e não sei por que aumentar o número de buckets diminui o algoritmo - a alocação de memória não deve demorar tanto. Talvez algo relacionado ao tamanho do cache?
k21
2.725s na minha máquina. Muito bom para uma solução java, levando em consideração o tempo de carregamento da JVM.
static_rtti
2
Troquei seu código para usar os pacotes nio, de acordo com a minha solução e a de Arjan (usava a sintaxe dele, pois era mais limpa que a minha) e consegui obtê-lo 0,3 segundos mais rápido. Eu tenho um ssd, gostaria de saber quais seriam as implicações, se não. Ele também se livra de alguns dos seus truques. As seções modificadas estão aqui.
Scott
3
Esta é a solução paralela mais rápida dos meus testes (16core cpu). 1.22s longe do 1.94s segundo lugar.
Alexandru
13

Sem ser esperto, apenas para fornecer um classificador ingênuo muito mais rápido, aqui está um em C que deve ser praticamente equivalente ao seu Python:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int cmp(const void* av, const void* bv) {
    double a = *(const double*)av;
    double b = *(const double*)bv;
    return a < b ? -1 : a > b ? 1 : 0;
}
int main(int argc, char** argv) {
    if (argc <= 1)
        return puts("No argument!");
    unsigned count = atoi(argv[1]);

    double *a = malloc(count * sizeof *a);

    FILE *f = fopen("gaussian.dat", "rb");
    if (fread(a, sizeof *a, count, f) != count)
        return puts("fread failed!");
    fclose(f);

    puts("sorting...");
    double *b = malloc(count * sizeof *b);
    memcpy(b, a, count * sizeof *b);
    qsort(b, count, sizeof *b, cmp);
    return 0;
}

Compilado com gcc -O3, na minha máquina, isso leva mais de um minuto a menos que o Python: cerca de 11 s comparado a 87 s.


fonte
11
Tomou 10.086s na minha máquina, o que faz de você o líder atual! Mas eu tenho certeza que podemos fazer melhor :)
11
Você poderia tentar remover o segundo operador ternário e simplesmente retornar 1 para esse caso, porque as duplas aleatórias não são iguais entre si nessa quantidade de dados.
Codism
@ Codism: eu acrescentaria que não nos importamos em trocar locais de dados equivalentes, portanto, mesmo que pudéssemos obter valores equivalentes, seria uma simplificação apropriada.
10

Particionei em segmentos com base no desvio padrão que melhor deveria ser dividido em 4º. Editar: reescrito para particionar com base no valor x em http://en.wikipedia.org/wiki/Error_function#Table_of_values

http://www.wolframalpha.com/input/?i=percentages+by++normal+distribution

Tentei usar baldes menores, mas parecia ter pouco efeito uma vez 2 * além do número de núcleos disponíveis. Sem coleções paralelas, levaria 37 segundos na minha caixa e 24 nas coleções paralelas. Se particionar via distribuição, você não pode simplesmente usar uma matriz, para que haja mais sobrecarga. Não sei ao certo quando um valor seria colocado na caixa / fora da caixa no scala.

Estou usando o scala 2.9, para a coleção paralela. Você pode simplesmente fazer o download da distribuição tar.gz dele.

Para compilar: scalac SortFile.scala (acabei de copiá-lo diretamente na pasta scala / bin.

Para executar: JAVA_OPTS = "- Xmx4096M" ./scala SortFile (executei-o com 2 GB de RAM e obtive o mesmo tempo)

Editar: Removido assignateDirect, mais lento que alocar. Removido o priming do tamanho inicial para buffers de matriz. Na verdade, fez ler todos os valores 50000000. Reescreva para evitar problemas de autoboxing (ainda mais lento que o ingênuo c)

import java.io.FileInputStream;
import java.nio.ByteBuffer
import java.nio.ByteOrder
import scala.collection.mutable.ArrayBuilder


object SortFile {

//used partition numbers from Damascus' solution
val partList = List(0, 0.15731, 0.31864, 0.48878, 0.67449, 0.88715, 1.1503, 1.5341)

val listSize = partList.size * 2;
val posZero = partList.size;
val neg = partList.map( _ * -1).reverse.zipWithIndex
val pos = partList.map( _ * 1).zipWithIndex.reverse

def partition(dbl:Double): Int = { 

//for each partition, i am running through the vals in order
//could make this a binary search to be more performant... but our list size is 4 (per side)

  if(dbl < 0) { return neg.find( dbl < _._1).get._2  }
  if(dbl > 0) { return posZero  + pos.find( dbl > _._1).get._2  }
      return posZero; 

}

  def main(args: Array[String])
    { 

    var l = 0
    val dbls = new Array[Double](50000000)
    val partList = new Array[Int](50000000)
    val pa = Array.fill(listSize){Array.newBuilder[Double]}
    val channel = new FileInputStream("../../gaussian.dat").getChannel()
    val bb = ByteBuffer.allocate(50000000 * 8)
    bb.order(ByteOrder.LITTLE_ENDIAN)
    channel.read(bb)
    bb.rewind
    println("Loaded" + System.currentTimeMillis())
    var dbl = 0.0
    while(bb.hasRemaining)
    { 
      dbl = bb.getDouble
      dbls.update(l,dbl) 

      l+=1
    }
    println("Beyond first load" + System.currentTimeMillis());

    for( i <- (0 to 49999999).par) { partList.update(i, partition(dbls(i)))}

    println("Partition computed" + System.currentTimeMillis() )
    for(i <- (0 to 49999999)) { pa(partList(i)) += dbls(i) }
    println("Partition completed " + System.currentTimeMillis())
    val toSort = for( i <- pa) yield i.result()
    println("Arrays Built" + System.currentTimeMillis());
    toSort.par.foreach{i:Array[Double] =>scala.util.Sorting.quickSort(i)};

    println("Read\t" + System.currentTimeMillis());

  }
}
Scott
fonte
11
8.185s! Bom para uma solução scala, eu acho ... Além disso, bravo por fornecer a primeira solução que realmente usa a distribuição gaussiana de alguma maneira!
11
Eu estava apenas com o objetivo de competir com a solução c #. Não imaginava que eu venceria c / c ++. Além disso ... está se comportando de maneira muito diferente para você e para mim. Estou usando o openJDK do meu lado e é muito mais lento. Gostaria de saber se adicionar mais partições ajudaria em seu ambiente.
Scott
9

Basta colocar isso em um arquivo cs e compilá-lo com o csc na teoria: (Requer mono)

using System;
using System.IO;
using System.Threading;

namespace Sort
{
    class Program
    {
        const int count = 50000000;
        static double[][] doubles;
        static WaitHandle[] waiting = new WaitHandle[4];
        static AutoResetEvent[] events = new AutoResetEvent[4];

        static double[] Merge(double[] left, double[] right)
        {
            double[] result = new double[left.Length + right.Length];
            int l = 0, r = 0, spot = 0;
            while (l < left.Length && r < right.Length)
            {
                if (right[r] < left[l])
                    result[spot++] = right[r++];
                else
                    result[spot++] = left[l++];
            }
            while (l < left.Length) result[spot++] = left[l++];
            while (r < right.Length) result[spot++] = right[r++];
            return result;
        }

        static void ThreadStart(object data)
        {
            int index = (int)data;
            Array.Sort(doubles[index]);
            events[index].Set();
        }

        static void Main(string[] args)
        {
            System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
            watch.Start();
            byte[] bytes = File.ReadAllBytes(@"..\..\..\SortGuassian\Data.dat");
            doubles = new double[][] { new double[count / 4], new double[count / 4], new double[count / 4], new double[count / 4] };
            for (int i = 0; i < 4; i++)
            {
                for (int j = 0; j < count / 4; j++)
                {
                    doubles[i][j] = BitConverter.ToDouble(bytes, i * count/4 + j * 8);
                }
            }
            Thread[] threads = new Thread[4];
            for (int i = 0; i < 4; i++)
            {
                threads[i] = new Thread(ThreadStart);
                waiting[i] = events[i] = new AutoResetEvent(false);
                threads[i].Start(i);
            }
            WaitHandle.WaitAll(waiting);
            double[] left = Merge(doubles[0], doubles[1]);
            double[] right = Merge(doubles[2], doubles[3]);
            double[] result = Merge(left, right);
            watch.Stop();
            Console.WriteLine(watch.Elapsed.ToString());
            Console.ReadKey();
        }
    }
}
Guvante
fonte
Posso executar suas soluções com o Mono? Como devo fazer isso?
Não usei o Mono, não pense nisso, você deve poder compilar o F # e depois executá-lo.
11
Atualizado para usar quatro threads para melhorar o desempenho. Agora me dá 6 segundos. Observe que isso pode ser significativamente melhorado (provavelmente em 5 segundos) se você usar apenas uma matriz sobressalente e evitar inicializar uma tonelada de memória para zero, o que é feito pelo CLR, pois tudo está sendo gravado pelo menos uma vez.
11
9.598s na minha máquina! Você é o atual líder :)
11
Minha mãe me disse para ficar longe dos caras com Mono!
8

Como você sabe qual é a distribuição, é possível usar uma classificação O (N) de indexação direta. (Se você está se perguntando o que é isso, suponha que você tenha um baralho de 52 cartas e queira classificá-lo. Basta ter 52 posições e jogar cada carta em sua própria posição.)

Você tem 5e7 duplos. Aloque uma matriz de resultados R de 5e7 duplos. Pegue cada número xe pegue i = phi(x) * 5e7. Basicamente faça R[i] = x. Tenha uma maneira de lidar com colisões, como mover o número com o qual ela pode estar colidindo (como na codificação de hash simples). Como alternativa, você pode tornar R um pouco maior, preenchido com um valor vazio exclusivo . No final, você apenas varre os elementos de R.

phié apenas a função de distribuição cumulativa gaussiana. Ele converte um número distribuído gaussiano entre +/- infinito em um número distribuído uniforme entre 0 e 1. Uma maneira simples de calculá-lo é com consulta e interpolação de tabela.


fonte
3
Cuidado: você conhece a distribuição aproximada, não a distribuição exata. Você sabe que os dados foram gerados usando uma lei gaussiana, mas, como são finitos, não seguem exatamente uma gaussiana.
@static_rtti: neste caso, a aproximação necessária de phi criaria um aborrecimento maior do que quaisquer irregularidades no conjunto de dados IMO.
11
@static_rtti: não precisa ser exato. Ele só precisa espalhar os dados para que sejam aproximadamente uniformes, para que não se amontoem em alguns lugares.
Suponha que você tenha 5e7 duplos. Por que não fazer de cada entrada em R um vetor de, digamos, 5e6 vetores de double. Em seguida, empurre_back cada duplo em seu vetor apropriado. Classifique os vetores e pronto. Isso deve levar tempo linear no tamanho da entrada.
Neil G
Na verdade, vejo que o mdkess já veio com essa solução.
Neil G
8

Aqui está outra solução seqüencial:

#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <ctime>

typedef unsigned long long ull;

int size;
double *dbuf, *copy;
int cnt[8][1 << 16];

void sort()
{
  const int step = 10;
  const int start = 24;
  ull mask = (1ULL << step) - 1;

  ull *ibuf = (ull *) dbuf;
  for (int i = 0; i < size; i++) {
    for (int w = start, v = 0; w < 64; w += step, v++) {
      int p = (~ibuf[i] >> w) & mask;
      cnt[v][p]++;
    }
  }

  int sum[8] = { 0 };
  for (int i = 0; i <= mask; i++) {
    for (int w = start, v = 0; w < 64; w += step, v++) {
      int tmp = sum[v] + cnt[v][i];
      cnt[v][i] = sum[v];
      sum[v] = tmp;
    }
  }

  for (int w = start, v = 0; w < 64; w += step, v++) {
    ull *ibuf = (ull *) dbuf;
    for (int i = 0; i < size; i++) {
      int p = (~ibuf[i] >> w) & mask;
      copy[cnt[v][p]++] = dbuf[i];
    }

    double *tmp = copy;
    copy = dbuf;
    dbuf = tmp;
  }

  for (int p = 0; p < size; p++)
    if (dbuf[p] >= 0.) {
      std::reverse(dbuf + p, dbuf + size);
      break;
    }

  // Insertion sort
  for (int i = 1; i < size; i++) {
    double value = dbuf[i];
    if (value < dbuf[i - 1]) {
      dbuf[i] = dbuf[i - 1];
      int p = i - 1;
      for (; p > 0 && value < dbuf[p - 1]; p--)
        dbuf[p] = dbuf[p - 1];
      dbuf[p] = value;
    }
  }
}

int main(int argc, char **argv) {
  size = atoi(argv[1]);
  dbuf = new double[size];
  copy = new double[size];

  FILE *f = fopen("gaussian.dat", "r");
  fread(dbuf, size, sizeof(double), f);
  fclose(f);

  clock_t c0 = clock();
  sort();
  printf("Finished after %.3f\n", (double) ((clock() - c0)) / CLOCKS_PER_SEC);
  return 0;
}

Duvido que seja melhor que a solução multiencadeada, mas os tempos no meu laptop i7 são (stdsort é a solução C ++ fornecida em outra resposta):

$ g++ -O3 mysort.cpp -o mysort && ./mysort 50000000
Finished after 2.10
$ g++ -O3 stdsort.cpp -o stdsort && ./stdsort
Finished after 7.12

Observe que esta solução possui complexidade de tempo linear (porque usa a representação especial de duplas).

EDIT : Corrigida a ordem dos elementos a serem aumentados.

EDIT : Melhor velocidade em quase meio segundo.

EDIT : Melhor velocidade por mais 0,7 segundos. Tornou o algoritmo mais amigável ao cache.

EDIT : Melhor velocidade por mais 1 segundo. Como existem apenas 50.000.000 de elementos, posso classificar parcialmente a mantissa e usar a classificação por inserção (que é compatível com o cache) para corrigir elementos fora do local. Essa idéia remove cerca de duas iterações do último loop de classificação de raiz.

EDIT : 0,16 menos segundos. O primeiro std :: reverse pode ser eliminado se a ordem de classificação for revertida.

Alexandru
fonte
Agora isso está ficando interessante! Que tipo de algoritmo de classificação é esse?
static_rtti
2
Classificação de raiz de dígito menos significativo . Você pode classificar a mantissa, o expoente e o sinal. O algoritmo apresentado aqui leva essa idéia um passo adiante. Pode ser paralelizado usando uma ideia de particionamento fornecida em uma resposta diferente.
Alexandru
Muito rápido para uma única solução de rosca: 2.552s! Você acha que poderia mudar sua solução para usar o fato de que os dados são normalmente distribuídos? Você provavelmente poderia fazer melhor do que as melhores soluções multiencadeadas atuais.
static_rtti
11
@static_rtti: Vejo que o Damascus Steel já postou uma versão multithread desta implementação. Aprimorei o comportamento de armazenamento em cache desse algoritmo, então você deve obter melhores tempos agora. Por favor, teste esta nova versão.
Alexandru
2
1.459s nos meus últimos testes. Embora essa solução não seja a vencedora de acordo com minhas regras, ela realmente merece muitos elogios. Parabéns!
static_rtti
6

Tomando a solução de Christian Ammer e paralelizando-a com os blocos de construção rosqueados da Intel

#include <iostream>
#include <fstream>
#include <algorithm>
#include <vector>
#include <ctime>
#include <tbb/parallel_sort.h>

int main(void)
{
    std::ifstream ifs("gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<double> values;
    values.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
    values.push_back(d);
    clock_t c0 = clock();
    tbb::parallel_sort(values.begin(), values.end());
    std::cout << "Finished after "
              << static_cast<double>((clock() - c0)) / CLOCKS_PER_SEC
              << std::endl;
}

Se você tiver acesso à biblioteca IPP (Performance Primitives) da Intel, poderá usar sua classificação radix. Apenas substitua

#include <tbb/parallel_sort.h>

com

#include "ipps.h"

e

tbb::parallel_sort(values.begin(), values.end());

com

std::vector<double> copy(values.size());
ippsSortRadixAscend_64f_I(&values[0], &copy[0], values.size());

No meu laptop dual core, os tempos são

C               16.4 s
C#              20 s
C++ std::sort   7.2 s
C++ tbb         5 s
C++ ipp         4.5 s
python          too long
Aço de Damasco
fonte
11
2.958s! O TBB parece bem legal e fácil de usar!
2
TBB é absurdamente incrível. É exatamente o nível certo de abstração para o trabalho algorítmico.
drxzcl
5

Que tal uma implementação do quicksort paralelo que escolha seus valores de pivô com base nas estatísticas da distribuição, garantindo assim partições de tamanhos iguais? O primeiro pivô estaria na média (zero neste caso), o próximo par estaria nos percentis 25 e 75 (desvios padrão de +/- -0,67449) e assim por diante, com cada partição cortando pela metade o conjunto de dados restante mais ou menos menos perfeitamente.

jardineiro
fonte
Isso foi efetivamente o que eu fiz no meu ... é claro que você postou este post antes que eu pudesse terminar minha redação.
5

Muito feio (por que usar matrizes quando posso usar variáveis ​​que terminam com números), mas código rápido (minha primeira tentativa de std :: threads), tempo inteiro (tempo real) no meu sistema 1,8 s (comparando com std :: sort () 4,8 s), compile com g ++ -std = c ++ 0x -O3 -march = native -pthread Apenas passe os dados pelo stdin (funciona apenas para 50M).

#include <cstdio>
#include <iostream>
#include <algorithm>
#include <thread>
using namespace std;
const size_t size=50000000;

void pivot(double* start,double * end, double middle,size_t& koniec){
    double * beg=start;
    end--;
    while (start!=end){
        if (*start>middle) swap (*start,*end--);
        else start++;
    }
    if (*end<middle) start+=1;
    koniec= start-beg;
}
void s(double * a, double* b){
    sort(a,b);
}
int main(){
    double *data=new double[size];
    FILE *f = fopen("gaussian.dat", "rb");
    fread(data,8,size,f);
    size_t end1,end2,end3,temp;
    pivot(data, data+size,0,end2);
    pivot(data, data+end2,-0.6745,end1);
    pivot(data+end2,data+size,0.6745,end3);
    end3+=end2;
    thread ts1(s,data,data+end1);
    thread ts2(s,data+end1,data+end2);
    thread ts3(s,data+end2,data+end3);
    thread ts4(s,data+end3,data+size);
    ts1.join(),ts2.join(),ts3.join(),ts4.join();
    //for (int i=0; i<size-1; i++){
    //  if (data[i]>data[i+1]) cerr<<"BLAD\n";
    //}
    fclose(f);
    //fwrite(data,8,size,stdout);
}

// Edição alterada para ler o arquivo gaussian.dat.

Zjarek
fonte
Você poderia alterá-lo para ler gaussian.dat, como fazem as soluções C ++ acima?
Vou tentar mais tarde quando chegar em casa.
static_rtti
Solução muito boa, você é o atual líder (1.949s)! E bom uso da distribuição de Gauss :)
static_rtti
4

Uma solução C ++ usando std::sort(eventualmente mais rápido que qsort, em relação ao desempenho de qsort vs std :: sort )

#include <iostream>
#include <fstream>
#include <algorithm>
#include <vector>
#include <ctime>

int main(void)
{
    std::ifstream ifs("C:\\Temp\\gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<double> values;
    values.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
        values.push_back(d);
    clock_t c0 = clock();
    std::sort(values.begin(), values.end());
    std::cout << "Finished after "
              << static_cast<double>((clock() - c0)) / CLOCKS_PER_SEC
              << std::endl;
}

Não sei dizer quanto tempo leva, porque eu tenho apenas 1 GB na minha máquina e, com o código Python fornecido, eu só conseguia criar um gaussian.datarquivo com apenas 25 milhões de pares (sem obter um erro de memória). Mas estou muito interessado em quanto tempo o algoritmo std :: sort é executado.

Christian Ammer
fonte
6.425s! Como esperado, C ++ brilha :)
@static_rtti: Eu tentei Swensons timsort algoritmo (como sugerido a partir de Matthieu M. em sua primeira pergunta ). Eu tive que fazer algumas alterações no sort.harquivo para compilá-lo com C ++. Foi duas vezes mais lento que std::sort. Não sei por que, talvez por causa das otimizações do compilador?
Christian Ammer
4

Aqui está uma mistura do tipo de raiz de Alexandru com o giro inteligente rosqueado de Zjarek. Compile com

g++ -std=c++0x -pthread -O3 -march=native sorter_gaussian_radix.cxx -o sorter_gaussian_radix

Você pode alterar o tamanho da raiz definindo STEP (por exemplo, adicione -DSTEP = 11). Eu achei o melhor para o meu laptop é 8 (o padrão).

Por padrão, ele divide o problema em quatro partes e o executa em vários segmentos. Você pode mudar isso passando um parâmetro de profundidade para a linha de comando. Portanto, se você tiver dois núcleos, execute-o como

sorter_gaussian_radix 50000000 1

e se você tem 16 núcleos

sorter_gaussian_radix 50000000 4

A profundidade máxima agora é 6 (64 threads). Se você colocar muitos níveis, apenas abrandará o código.

Uma coisa que eu também tentei foi a classificação radix da biblioteca Intel Performance Primitives (IPP). A implementação da Alexandru supera profundamente o IPP, com o IPP sendo cerca de 30% mais lento. Essa variação também está incluída aqui (comentada).

#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <ctime>
#include <iostream>
#include <thread>
#include <vector>
#include <boost/cstdint.hpp>
// #include "ipps.h"

#ifndef STEP
#define STEP 8
#endif

const int step = STEP;
const int start_step=24;
const int num_steps=(64-start_step+step-1)/step;
int size;
double *dbuf, *copy;

clock_t c1, c2, c3, c4, c5;

const double distrib[]={-2.15387,
                        -1.86273,
                        -1.67594,
                        -1.53412,
                        -1.4178,
                        -1.31801,
                        -1.22986,
                        -1.15035,
                        -1.07752,
                        -1.00999,
                        -0.946782,
                        -0.887147,
                        -0.830511,
                        -0.776422,
                        -0.724514,
                        -0.67449,
                        -0.626099,
                        -0.579132,
                        -0.53341,
                        -0.488776,
                        -0.445096,
                        -0.40225,
                        -0.36013,
                        -0.318639,
                        -0.27769,
                        -0.237202,
                        -0.197099,
                        -0.157311,
                        -0.11777,
                        -0.0784124,
                        -0.0391761,
                        0,
                        0.0391761,
                        0.0784124,
                        0.11777,
                        0.157311,
                        0.197099,
                        0.237202,
                        0.27769,
                        0.318639,
                        0.36013,
                        0.40225,
                        0.445097,
                        0.488776,
                        0.53341,
                        0.579132,
                        0.626099,
                        0.67449,
                        0.724514,
                        0.776422,
                        0.830511,
                        0.887147,
                        0.946782,
                        1.00999,
                        1.07752,
                        1.15035,
                        1.22986,
                        1.31801,
                        1.4178,
                        1.53412,
                        1.67594,
                        1.86273,
                        2.15387};


class Distrib
{
  const int value;
public:
  Distrib(const double &v): value(v) {}

  bool operator()(double a)
  {
    return a<value;
  }
};


void recursive_sort(const int start, const int end,
                    const int index, const int offset,
                    const int depth, const int max_depth)
{
  if(depth<max_depth)
    {
      Distrib dist(distrib[index]);
      const int middle=std::partition(dbuf+start,dbuf+end,dist) - dbuf;

      // const int middle=
      //   std::partition(dbuf+start,dbuf+end,[&](double a)
      //                  {return a<distrib[index];})
      //   - dbuf;

      std::thread lower(recursive_sort,start,middle,index-offset,offset/2,
                        depth+1,max_depth);
      std::thread upper(recursive_sort,middle,end,index+offset,offset/2,
                        depth+1,max_depth);
      lower.join(), upper.join();
    }
  else
    {
  // ippsSortRadixAscend_64f_I(dbuf+start,copy+start,end-start);

      c1=clock();

      double *dbuf_local(dbuf), *copy_local(copy);
      boost::uint64_t mask = (1 << step) - 1;
      int cnt[num_steps][mask+1];

      boost::uint64_t *ibuf = reinterpret_cast<boost::uint64_t *> (dbuf_local);

      for(int i=0;i<num_steps;++i)
        for(uint j=0;j<mask+1;++j)
          cnt[i][j]=0;

      for (int i = start; i < end; i++)
        {
          for (int w = start_step, v = 0; w < 64; w += step, v++)
            {
              int p = (~ibuf[i] >> w) & mask;
              (cnt[v][p])++;
            }
        }

      c2=clock();

      std::vector<int> sum(num_steps,0);
      for (uint i = 0; i <= mask; i++)
        {
          for (int w = start_step, v = 0; w < 64; w += step, v++)
            {
              int tmp = sum[v] + cnt[v][i];
              cnt[v][i] = sum[v];
              sum[v] = tmp;
            }
        }

      c3=clock();

      for (int w = start_step, v = 0; w < 64; w += step, v++)
        {
          ibuf = reinterpret_cast<boost::uint64_t *>(dbuf_local);

          for (int i = start; i < end; i++)
            {
              int p = (~ibuf[i] >> w) & mask;
              copy_local[start+((cnt[v][p])++)] = dbuf_local[i];
            }
          std::swap(copy_local,dbuf_local);
        }

      // Do the last set of reversals
      for (int p = start; p < end; p++)
        if (dbuf_local[p] >= 0.)
          {
            std::reverse(dbuf_local+p, dbuf_local + end);
            break;
          }

      c4=clock();

      // Insertion sort
      for (int i = start+1; i < end; i++) {
        double value = dbuf_local[i];
        if (value < dbuf_local[i - 1]) {
          dbuf_local[i] = dbuf_local[i - 1];
          int p = i - 1;
          for (; p > 0 && value < dbuf_local[p - 1]; p--)
            dbuf_local[p] = dbuf_local[p - 1];
          dbuf_local[p] = value;
        }
      }
      c5=clock();

    }
}


int main(int argc, char **argv) {
  size = atoi(argv[1]);
  copy = new double[size];

  dbuf = new double[size];
  FILE *f = fopen("gaussian.dat", "r");
  fread(dbuf, size, sizeof(double), f);
  fclose(f);

  clock_t c0 = clock();

  const int max_depth= (argc > 2) ? atoi(argv[2]) : 2;

  // ippsSortRadixAscend_64f_I(dbuf,copy,size);

  recursive_sort(0,size,31,16,0,max_depth);

  if(num_steps%2==1)
    std::swap(dbuf,copy);

  // for (int i=0; i<size-1; i++){
  //   if (dbuf[i]>dbuf[i+1])
  //     std::cout << "BAD "
  //               << i << " "
  //               << dbuf[i] << " "
  //               << dbuf[i+1] << " "
  //               << "\n";
  // }

  std::cout << "Finished after "
            << (double) (c1 - c0) / CLOCKS_PER_SEC << " "
            << (double) (c2 - c1) / CLOCKS_PER_SEC << " "
            << (double) (c3 - c2) / CLOCKS_PER_SEC << " "
            << (double) (c4 - c3) / CLOCKS_PER_SEC << " "
            << (double) (c5 - c4) / CLOCKS_PER_SEC << " "
            << "\n";

  // delete [] dbuf;
  // delete [] copy;
  return 0;
}

EDIT : Eu implementei as melhorias de cache do Alexandru, e isso diminuiu cerca de 30% do tempo na minha máquina.

EDIT : Isso implementa uma classificação recursiva, portanto deve funcionar bem na máquina de 16 núcleos do Alexandru. Ele também usa o último aprimoramento do Alexandru e remove um dos reversos. Para mim, isso deu uma melhoria de 20%.

EDIT : Corrigido um bug de sinal que causava ineficiência quando há mais de 2 núcleos.

EDIT : Removido o lambda, para que ele compile com versões mais antigas do gcc. Inclui a variação do código IPP comentada. Também corrigi a documentação para rodar em 16 núcleos. Tanto quanto posso dizer, esta é a implementação mais rápida.

EDIT : Corrigido um erro quando STEP não era 8. Aumentado o número máximo de threads para 64. Adicionadas algumas informações de tempo.

Aço de Damasco
fonte
Agradável. A classificação Radix é muito hostil ao cache. Veja se você pode obter melhores resultados alterando step(11 foi o ideal no meu laptop).
Alexandru
Você tem um bug: int cnt[mask]deveria ser int cnt[mask + 1]. Para melhores resultados, use um valor fixo int cnt[1 << 16].
Alexandru
Hoje vou tentar todas essas soluções quando chegar em casa.
static_rtti
1.534s !!! Acho que temos um :-D líder
static_rtti
@static_rtti: Você poderia tentar isso de novo? Ficou significativamente mais rápido que na última vez em que você tentou. Na minha máquina, é substancialmente mais rápido que qualquer outra solução.
Damascus Steel
2

Eu acho que isso realmente depende do que você quer fazer. Se você quiser classificar um monte de gaussianos, isso não ajudará. Mas se você quiser um monte de gaussianos classificados, isso vai acontecer. Mesmo que isso perca um pouco o problema, acho que será interessante comparar as rotinas reais de classificação.

Se você quiser que algo seja rápido, faça menos.

Em vez de gerar várias amostras aleatórias a partir da distribuição normal e, em seguida, classificá-las, é possível gerar várias amostras a partir da distribuição normal na ordem classificada.

Você pode usar a solução aqui para gerar n números aleatórios uniformes em ordem classificada. Em seguida, você pode usar o cdf inverso (scipy.stats.norm.ppf) da distribuição normal para transformar os números aleatórios uniformes em números da distribuição normal via amostragem por transformação inversa .

import scipy.stats
import random

# slightly modified from linked stackoverflow post
def n_random_numbers_increasing(n):
  """Like sorted(random() for i in range(n))),                                
  but faster because we avoid sorting."""
  v = 1.0
  while n:
    v *= random.random() ** (1.0 / n)
    yield 1 - v
    n -= 1

def n_normal_samples_increasing(n):
  return map(scipy.stats.norm.ppf, n_random_numbers_increasing(n))

Se você quiser sujar as mãos, acho que poderá acelerar os muitos cálculos inversos de cdf usando algum tipo de método iterativo e usando o resultado anterior como seu palpite inicial. Como as suposições serão muito próximas, provavelmente uma única iteração fornecerá grande precisão.

rrenaud
fonte
2
Boa resposta, mas isso seria trapaça :) A idéia da minha pergunta é que, embora os algoritmos de classificação tenham recebido uma atenção enorme, quase não há literatura sobre o uso de conhecimento prévio sobre os dados para classificação, mesmo com os poucos trabalhos que abordou a questão relataram bons ganhos. Então vamos ver o que é possível!
2

Experimente esta solução em mudança da Guvante com este Main (), ele começa a classificar assim que a leitura de 1/4 IO é concluída, é mais rápido no meu teste:

    static void Main(string[] args)
    {
        FileStream filestream = new FileStream(@"..\..\..\gaussian.dat", FileMode.Open, FileAccess.Read);
        doubles = new double[][] { new double[count / 4], new double[count / 4], new double[count / 4], new double[count / 4] };
        Thread[] threads = new Thread[4];

        for (int i = 0; i < 4; i++)
        {
            byte[] bytes = new byte[count * 4];
            filestream.Read(bytes, 0, count * 4);

            for (int j = 0; j < count / 4; j++)
            {
                doubles[i][j] = BitConverter.ToDouble(bytes, i * count/4 + j * 8);
            }

            threads[i] = new Thread(ThreadStart);
            waiting[i] = events[i] = new AutoResetEvent(false);
            threads[i].Start(i);    
        }

        WaitHandle.WaitAll(waiting);
        double[] left = Merge(doubles[0], doubles[1]);
        double[] right = Merge(doubles[2], doubles[3]);
        double[] result = Merge(left, right);
        Console.ReadKey();
    }
}

fonte
8.933s. Um pouco mais rápido :)
2

Como você conhece a distribuição, minha idéia seria fazer k buckets, cada um com o mesmo número esperado de elementos (já que você conhece a distribuição, é possível calcular isso). Então, em O (n) tempo, varra a matriz e coloque elementos em seus baldes.

Em seguida, classifique os baldes simultaneamente. Suponha que você tenha k buckets e n elementos. Um balde levará (n / k) lg (n / k) tempo para classificar. Agora, suponha que você tenha processadores p que você pode usar. Como as caçambas podem ser classificadas independentemente, você tem um multiplicador de teto (k / p) para lidar. Isso fornece um tempo de execução final de n + ceil (k / p) * (n / k) lg (n / k), que deve ser muito mais rápido que n lg n se você escolher k bem.

mdkess
fonte
Eu acho que essa é a melhor solução.
Neil G
Você não sabe exatamente o número de elementos que acabarão em um balde; portanto, a matemática está realmente errada. Dito isto, acho que é uma boa resposta.
poulejapon
@pouejapon: Você está certo.
9111 Neil G
Essa resposta parece muito legal. O problema é - não é realmente rápido. Eu implementei isso no C99 (veja minha resposta), e certamente bate facilmente std::sort(), mas é bem mais lento que a solução radixsort da Alexandru.
Sven Marnach
2

Uma idéia de otimização de baixo nível é ajustar duas duplas em um registro SSE, para que cada thread funcione com dois itens por vez. Isso pode ser complicado para alguns algoritmos.

Outra coisa a fazer é classificar a matriz em pedaços compatíveis com o cache e depois mesclar os resultados. Dois níveis devem ser usados: por exemplo, primeiro 4 KB para L1 e 64 KB para L2.

Isso deve ser muito compatível com o cache, pois a classificação do bucket não sai do cache e a mesclagem final percorre a memória sequencialmente.

Atualmente, o cálculo é muito mais barato que o acesso à memória. No entanto, temos um grande número de itens, por isso é difícil dizer qual é o tamanho da matriz quando a classificação com reconhecimento de cache estúpida é mais lenta que uma versão sem reconhecimento de cache de baixa complexidade.

Mas não fornecerei uma implementação do acima, pois o faria no Windows (VC ++).


fonte
2

Aqui está uma implementação de classificação de balde de verificação linear. Eu acho que é mais rápido do que todas as implementações atuais de thread único, exceto a classificação radix. Deveria ter um tempo de execução linear esperado, se eu estiver estimando o CD com precisão suficiente (estou usando a interpolação linear dos valores que encontrei na Web) e não cometer nenhum erro que possa causar uma verificação excessiva:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <ctime>

using std::fill;

const double q[] = {
  0.0,
  9.865E-10,
  2.8665150000000003E-7,
  3.167E-5,
  0.001349898,
  0.022750132,
  0.158655254,
  0.5,
  0.8413447460000001,
  0.9772498679999999,
  0.998650102,
  0.99996833,
  0.9999997133485,
  0.9999999990134999,
  1.0,
};
int main(int argc, char** argv) {
  if (argc <= 1)
    return puts("No argument!");
  unsigned count = atoi(argv[1]);
  unsigned count2 = 3 * count;

  bool *ba = new bool[count2 + 1000];
  fill(ba, ba + count2 + 1000, false);
  double *a = new double[count];
  double *c = new double[count2 + 1000];

  FILE *f = fopen("gaussian.dat", "rb");
  if (fread(a, 8, count, f) != count)
    return puts("fread failed!");
  fclose(f);

  int i;
  int j;
  bool s;
  int t;
  double z;
  double p;
  double d1;
  double d2;
  for (i = 0; i < count; i++) {
    s = a[i] < 0;
    t = a[i];
    if (s) t--;
    z = a[i] - t;
    t += 7;
    if (t < 0) {
      t = 0;
      z = 0;
    } else if (t >= 14) {
      t = 13;
      z = 1;
    }
    p = q[t] * (1 - z) + q[t + 1] * z;
    j = count2 * p;
    while (ba[j] && c[j] < a[i]) {
      j++;
    }
    if (!ba[j]) {
      ba[j] = true;
      c[j] = a[i];
    } else {
      d1 = c[j];
      c[j] = a[i];
      j++;
      while (ba[j]) {
        d2 = c[j];
        c[j] = d1;
        d1 = d2;
        j++;
      }
      c[j] = d1;
      ba[j] = true;
    }
  }
  i = 0;
  int max = count2 + 1000;
  for (j = 0; j < max; j++) {
    if (ba[j]) {
      a[i++] = c[j];
    }
  }
  // for (i = 0; i < count; i += 1) {
  //   printf("here %f\n", a[i]);
  // }
  return 0;
}
jonderry
fonte
11
Vou tentar isso hoje mais tarde, quando chegar em casa. Enquanto isso, posso dizer que seu código é muito feio? :-D
static_rtti
3.071s! Nada mal para uma solução de thread único!
static_rtti
2

Eu não sei, por que não consigo editar minha postagem anterior, então aqui está a nova versão, 0,2 segundos mais rápida (mas cerca de 1,5 s mais rápida no tempo da CPU (usuário)). Essa solução possui 2 programas, primeiro pré-calcula quantis para distribuição normal para classificação de buckets e armazena-os na tabela t [double * scale] = índice de buckets, em que scale é um número arbitrário que torna possível a conversão para o dobro. Em seguida, o programa principal pode usar esses dados para colocar duplas no balde correto. Ele tem uma desvantagem: se os dados não forem gaussianos, eles não funcionarão corretamente (e também há quase zero chance de funcionar incorretamente para a distribuição normal), mas a modificação para casos especiais é fácil e rápida (apenas o número de verificações de baldes e a queda para std ::ordenar()).

Compilando: g ++ => http://pastebin.com/WG7pZEzH programa auxiliar

g ++ -std = c ++ 0x -O3 -march = native -pthread => http://pastebin.com/T3yzViZP principal programa de classificação

Zjarek
fonte
1.621s! Eu acho que você é o líder, mas estou perdendo rapidamente o controle com todas essas respostas :) #
static_rtti
2

Aqui está outra solução seqüencial. Este usa o fato de que os elementos são distribuídos normalmente, e acho que a idéia é geralmente aplicável para obter uma classificação próxima ao tempo linear.

O algoritmo é assim:

  • CDF aproximado (consulte a phi()função na implementação)
  • Para todos os elementos, calcule a posição aproximada na matriz classificada: size * phi(x)
  • Coloque elementos em uma nova matriz perto de sua posição final
    • Na minha matriz de destino de implementação, existem algumas lacunas para que eu não precise mudar muitos elementos ao inserir.
  • Use inserttsort para classificar os elementos finais (inserttsort é linear se a distância da posição final for menor que uma constante).

Infelizmente, a constante oculta é muito grande e esta solução é duas vezes mais lenta que o algoritmo de classificação de raiz.

Alexandru
fonte
11
2.470s! Idéias muito legais. Não importa que a solução não seja a mais rápida se as idéias forem interessantes :) #
static_rtti
11
É o mesmo que o meu, mas agrupando os cálculos phi e as mudanças para obter melhor desempenho do cache, certo?
jonderry
@onderry: votei na sua solução, agora que entendo o que ela faz. Não quis roubar sua ideia. Eu incluí a sua implementação no meu (não oficial) conjunto de testes
Alexandru
2

Meu favorito pessoal usando os Threaded Building Blocks da Intel já foi publicado, mas aqui está uma solução paralela grosseira usando o JDK 7 e sua nova API de junção / junção:

import java.io.FileInputStream;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.*;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;
import static java.nio.ByteOrder.LITTLE_ENDIAN;


/**
 * 
 * Original Quicksort: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel
 *
 */
public class ForkJoinQuicksortTask extends RecursiveAction {

    public static void main(String[] args) throws Exception {

        double[] array = new double[Integer.valueOf(args[0])];

        FileChannel fileChannel = new FileInputStream("gaussian.dat").getChannel();
        fileChannel.map(READ_ONLY, 0, fileChannel.size()).order(LITTLE_ENDIAN).asDoubleBuffer().get(array);

        ForkJoinPool mainPool = new ForkJoinPool();

        System.out.println("Starting parallel computation");

        mainPool.invoke(new ForkJoinQuicksortTask(array));        
    }

    private static final long serialVersionUID = -642903763239072866L;
    private static final int SERIAL_THRESHOLD = 0x1000;

    private final double a[];
    private final int left, right;

    public ForkJoinQuicksortTask(double[] a) {this(a, 0, a.length - 1);}

    private ForkJoinQuicksortTask(double[] a, int left, int right) {
        this.a = a;
        this.left = left;
        this.right = right;
    }

    @Override
    protected void compute() {
        if (right - left < SERIAL_THRESHOLD) {
            Arrays.sort(a, left, right + 1);
        } else {
            int pivotIndex = partition(a, left, right);
            ForkJoinTask<Void> t1 = null;

            if (left < pivotIndex)
                t1 = new ForkJoinQuicksortTask(a, left, pivotIndex).fork();
            if (pivotIndex + 1 < right)
                new ForkJoinQuicksortTask(a, pivotIndex + 1, right).invoke();

            if (t1 != null)
                t1.join();
        }
    }

    public static int partition(double[] a, int left, int right) {
        // chose middle value of range for our pivot
        double pivotValue = a[left + (right - left) / 2];

        --left;
        ++right;

        while (true) {
            do
                ++left;
            while (a[left] < pivotValue);

            do
                --right;
            while (a[right] > pivotValue);

            if (left < right) {
                double tmp = a[left];
                a[left] = a[right];
                a[right] = tmp;
            } else {
                return right;
            }
        }
    }    
}

Isenção de responsabilidade importante : Aceitei a adaptação de classificação rápida para fork / join em: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel

Para executar isso, você precisa de uma versão beta do JDK 7 (http://jdk7.java.net/download.html).

No meu quad core 2.97Ghz i7 (OS X):

Referência do Python

time python sort.py 50000000
sorting...

real    1m13.885s
user    1m11.942s
sys     0m1.935s

Java JDK 7 bifurcação / junção

time java ForkJoinQuicksortTask 50000000
Starting parallel computation

real    0m2.404s
user    0m10.195s
sys     0m0.347s

Também tentei fazer algumas experiências com leitura paralela e converter os bytes em duplos, mas não vi nenhuma diferença lá.

Atualizar:

Se alguém quiser experimentar o carregamento paralelo dos dados, a versão do carregamento paralelo está abaixo. Em teoria, isso poderia torná-lo um pouco mais rápido ainda, se o seu dispositivo IO tiver capacidade paralela suficiente (os SSDs costumam ter). Também há alguma sobrecarga na criação de Doubles a partir de bytes, de modo que também poderia ser mais rápido em paralelo. Nos meus sistemas (Ubuntu 10.10 / Nehalem Quad / Intel X25M SSD e OS X 10.6 / i7 Quad / Samsung SSD), não vi nenhuma diferença real.

import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.channels.FileChannel.MapMode.READ_ONLY;

import java.io.FileInputStream;
import java.nio.DoubleBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;


/**
 *
 * Original Quicksort: https://github.com/pmbauer/parallel/tree/master/src/main/java/pmbauer/parallel
 *
 */
public class ForkJoinQuicksortTask extends RecursiveAction {

   public static void main(String[] args) throws Exception {

       ForkJoinPool mainPool = new ForkJoinPool();

       double[] array = new double[Integer.valueOf(args[0])];
       FileChannel fileChannel = new FileInputStream("gaussian.dat").getChannel();
       DoubleBuffer buffer = fileChannel.map(READ_ONLY, 0, fileChannel.size()).order(LITTLE_ENDIAN).asDoubleBuffer();

       mainPool.invoke(new ReadAction(buffer, array, 0, array.length));
       mainPool.invoke(new ForkJoinQuicksortTask(array));
   }

   private static final long serialVersionUID = -642903763239072866L;
   private static final int SERIAL_THRESHOLD = 0x1000;

   private final double a[];
   private final int left, right;

   public ForkJoinQuicksortTask(double[] a) {this(a, 0, a.length - 1);}

   private ForkJoinQuicksortTask(double[] a, int left, int right) {
       this.a = a;
       this.left = left;
       this.right = right;
   }

   @Override
   protected void compute() {
       if (right - left < SERIAL_THRESHOLD) {
           Arrays.sort(a, left, right + 1);
       } else {
           int pivotIndex = partition(a, left, right);
           ForkJoinTask<Void> t1 = null;

           if (left < pivotIndex)
               t1 = new ForkJoinQuicksortTask(a, left, pivotIndex).fork();
           if (pivotIndex + 1 < right)
               new ForkJoinQuicksortTask(a, pivotIndex + 1, right).invoke();

           if (t1 != null)
               t1.join();
       }
   }

   public static int partition(double[] a, int left, int right) {
       // chose middle value of range for our pivot
       double pivotValue = a[left + (right - left) / 2];

       --left;
       ++right;

       while (true) {
           do
               ++left;
           while (a[left] < pivotValue);

           do
               --right;
           while (a[right] > pivotValue);

           if (left < right) {
               double tmp = a[left];
               a[left] = a[right];
               a[right] = tmp;
           } else {
               return right;
           }
       }
   }

}

class ReadAction extends RecursiveAction {

   private static final long serialVersionUID = -3498527500076085483L;

   private final DoubleBuffer buffer;
   private final double[] array;
   private final int low, high;

   public ReadAction(DoubleBuffer buffer, double[] array, int low, int high) {
       this.buffer = buffer;
       this.array = array;
       this.low = low;
       this.high = high;
   }

   @Override
   protected void compute() {
       if (high - low < 100000) {
           buffer.position(low);
           buffer.get(array, low, high-low);
       } else {
           int middle = (low + high) >>> 1;

           invokeAll(new ReadAction(buffer.slice(), array, low, middle),  new ReadAction(buffer.slice(), array, middle, high));
       }
   }
}

Update2:

Eu executei o código em uma das nossas 12 máquinas de desenvolvimento com uma pequena modificação para definir uma quantidade fixa de núcleos. Isso deu os seguintes resultados:

Cores  Time
1      7.568s
2      3.903s
3      3.325s
4      2.388s
5      2.227s
6      1.956s
7      1.856s
8      1.827s
9      1.682s
10     1.698s
11     1.620s
12     1.503s

Nesse sistema, também tentei a versão Python, que possuía 1m2.994s, e a versão C ++ de Zjarek, que levou 1.925s (por alguma razão, a versão C ++ de Zjarek parece correr relativamente mais rápido no computador do static_rtti).

Eu também tentei o que aconteceu se dobrar o tamanho do arquivo para 100.000.000 duplos:

Cores  Time
1      15.056s
2      8.116s
3      5.925s
4      4.802s
5      4.430s
6      3.733s
7      3.540s
8      3.228s
9      3.103s
10     2.827s
11     2.784s
12     2.689s

Nesse caso, a versão C ++ de Zjarek levou 3.968s. O Python demorou muito tempo aqui.

150.000.000 duplos:

Cores  Time
1      23.295s
2      12.391s
3      8.944s
4      6.990s
5      6.216s
6      6.211s
7      5.446s
8      5.155s
9      4.840s
10     4.435s
11     4.248s
12     4.174s

Nesse caso, a versão C ++ de Zjarek era 6.044s. Eu nem tentei Python.

A versão C ++ é muito consistente com seus resultados, onde o Java oscila um pouco. Primeiro, fica um pouco mais eficiente quando o problema aumenta, mas depois é menos eficiente novamente.

arjan
fonte
11
Este código não analisa os valores duplos corretamente para mim. O Java 7 é necessário para analisar corretamente os valores do arquivo?
jonderry
11
Ah, bobo eu. Esqueci de definir o endianness novamente depois de refatorar localmente o código IO de várias linhas para uma. O Java 7 normalmente seria necessário, a menos que você incluísse fork / junção separadamente no Java 6, é claro.
arjan
3.411s na minha máquina. Não é ruim, mas mais lento do que de koumes21 solução java :)
static_rtti
11
Vou tentar a solução do koumes21 aqui também localmente para ver quais são as diferenças relativas no meu sistema. Enfim, não há vergonha em 'perder' o koumes21, pois é uma solução muito mais inteligente. Este é apenas um-tipo rápida quase padrão jogado em um fork / join piscina;)
Arjan
1

Uma versão usando pthreads tradicionais. Código de fusão copiado da resposta de Guvante. Compile com g++ -O3 -pthread.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <algorithm>

static unsigned int nthreads = 4;
static unsigned int size = 50000000;

typedef struct {
  double *array;
  int size;
} array_t;


void 
merge(double *left, int leftsize,
      double *right, int rightsize,
      double *result)
{
  int l = 0, r = 0, insertat = 0;
  while (l < leftsize && r < rightsize) {
    if (left[l] < right[r])
      result[insertat++] = left[l++];
    else
      result[insertat++] = right[r++];
  }

  while (l < leftsize) result[insertat++] = left[l++];
  while (r < rightsize) result[insertat++] = right[r++];
}


void *
run_thread(void *input)
{
  array_t numbers = *(array_t *)input;
  std::sort(numbers.array, numbers.array+numbers.size); 
  pthread_exit(NULL);
}

int 
main(int argc, char **argv) 
{
  double *numbers = (double *) malloc(size * sizeof(double));

  FILE *f = fopen("gaussian.dat", "rb");
  if (fread(numbers, sizeof(double), size, f) != size)
    return printf("Reading gaussian.dat failed");
  fclose(f);

  array_t worksets[nthreads];
  int worksetsize = size / nthreads;
  for (int i = 0; i < nthreads; i++) {
    worksets[i].array=numbers+(i*worksetsize);
    worksets[i].size=worksetsize;
  }

  pthread_attr_t attributes;
  pthread_attr_init(&attributes);
  pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE);

  pthread_t threads[nthreads];
  for (int i = 0; i < nthreads; i++) {
    pthread_create(&threads[i], &attributes, &run_thread, &worksets[i]);
  }

  for (int i = 0; i < nthreads; i++) {
    pthread_join(threads[i], NULL);
  }

  double *tmp = (double *) malloc(size * sizeof(double));
  merge(numbers, worksetsize, numbers+worksetsize, worksetsize, tmp);
  merge(numbers+(worksetsize*2), worksetsize, numbers+(worksetsize*3), worksetsize, tmp+(size/2));
  merge(tmp, worksetsize*2, tmp+(size/2), worksetsize*2, numbers);

  /*
  printf("Verifying result..\n");
  for (int i = 0; i < size - 1; i++) {
    if (numbers[i] > numbers[i+1])
      printf("Result is not correct\n");
  }
  */

  pthread_attr_destroy(&attributes);
  return 0;
}  

No meu laptop, obtenho os seguintes resultados:

real    0m6.660s
user    0m9.449s
sys     0m1.160s
Vendedor
fonte
1

Aqui está uma implementação seqüencial do C99 que tenta realmente fazer uso da distribuição conhecida. Basicamente, ele executa uma única rodada de classificação de balde usando as informações de distribuição, depois algumas rodadas de classificação rápida em cada intervalo, assumindo uma distribuição uniforme dentro dos limites do balde e, finalmente, uma classificação de seleção modificada para copiar os dados de volta ao buffer original. O quicksort memoriza os pontos de divisão, portanto, a classificação por seleção precisa apenas operar em pequenos baús. E apesar (por quê?) De toda essa complexidade, nem é muito rápido.

Para acelerar a avaliação, os valores são amostrados em alguns pontos e, posteriormente, somente a interpolação linear é usada. Na verdade, não importa se Φ é avaliado exatamente, desde que a aproximação seja estritamente monotônica.

Os tamanhos dos compartimentos são escolhidos de forma que a chance de transbordamento seja desprezível. Mais precisamente, com os parâmetros atuais, a chance de um conjunto de dados de 50000000 elementos causar um estouro de caixa é 3,65e-09. (Isso pode ser calculado usando a função de sobrevivência da distribuição de Poisson .)

Para compilar, use

gcc -std=c99 -msse3 -O3 -ffinite-math-only

Como há consideravelmente mais computação do que nas outras soluções, esses sinalizadores do compilador são necessários para torná-lo pelo menos razoavelmente rápido. Sem que -msse3as conversões doublese inttornem realmente lentas. Se sua arquitetura não suportar SSE3, essas conversões também poderão ser feitas usando a lrint()função

O código é bastante feio - não tenho certeza se isso atende ao requisito de ser "razoavelmente legível" ...

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <math.h>

#define N 50000000
#define BINSIZE 720
#define MAXBINSIZE 880
#define BINCOUNT (N / BINSIZE)
#define SPLITS 64
#define PHI_VALS 513

double phi_vals[PHI_VALS];

int bin_index(double x)
{
    double y = (x + 8.0) * ((PHI_VALS - 1) / 16.0);
    int interval = y;
    y -= interval;
    return (1.0 - y) * phi_vals[interval] + y * phi_vals[interval + 1];
}

double bin_value(int bin)
{
    int left = 0;
    int right = PHI_VALS - 1;
    do
    {
        int centre = (left + right) / 2;
        if (bin < phi_vals[centre])
            right = centre;
        else
            left = centre;
    } while (right - left > 1);
    double frac = (bin - phi_vals[left]) / (phi_vals[right] - phi_vals[left]);
    return (left + frac) * (16.0 / (PHI_VALS - 1)) - 8.0;
}

void gaussian_sort(double *restrict a)
{
    double *b = malloc(BINCOUNT * MAXBINSIZE * sizeof(double));
    double **pos = malloc(BINCOUNT * sizeof(double*));
    for (size_t i = 0; i < BINCOUNT; ++i)
        pos[i] = b + MAXBINSIZE * i;
    for (size_t i = 0; i < N; ++i)
        *pos[bin_index(a[i])]++ = a[i];
    double left_val, right_val = bin_value(0);
    for (size_t bin = 0, i = 0; bin < BINCOUNT; ++bin)
    {
        left_val = right_val;
        right_val = bin_value(bin + 1);
        double *splits[SPLITS + 1];
        splits[0] = b + bin * MAXBINSIZE;
        splits[SPLITS] = pos[bin];
        for (int step = SPLITS; step > 1; step >>= 1)
            for (int left_split = 0; left_split < SPLITS; left_split += step)
            {
                double *left = splits[left_split];
                double *right = splits[left_split + step] - 1;
                double frac = (double)(left_split + (step >> 1)) / SPLITS;
                double pivot = (1.0 - frac) * left_val + frac * right_val;
                while (1)
                {
                    while (*left < pivot && left <= right)
                        ++left;
                    while (*right >= pivot && left < right)
                        --right;
                    if (left >= right)
                        break;
                    double tmp = *left;
                    *left = *right;
                    *right = tmp;
                    ++left;
                    --right;
                }
                splits[left_split + (step >> 1)] = left;
            }
        for (int left_split = 0; left_split < SPLITS; ++left_split)
        {
            double *left = splits[left_split];
            double *right = splits[left_split + 1] - 1;
            while (left <= right)
            {
                double *min = left;
                for (double *tmp = left + 1; tmp <= right; ++tmp)
                    if (*tmp < *min)
                        min = tmp;
                a[i++] = *min;
                *min = *right--;
            }
        }
    }
    free(b);
    free(pos);
}

int main()
{
    double *a = malloc(N * sizeof(double));
    FILE *f = fopen("gaussian.dat", "rb");
    assert(fread(a, sizeof(double), N, f) == N);
    fclose(f);
    for (int i = 0; i < PHI_VALS; ++i)
    {
        double x = (i * (16.0 / PHI_VALS) - 8.0) / sqrt(2.0);
        phi_vals[i] =  (erf(x) + 1.0) * 0.5 * BINCOUNT;
    }
    gaussian_sort(a);
    free(a);
}
Sven Marnach
fonte
4.098s! Eu tive que adicionar -lm para compilá-lo (para erf).
static_rtti
1
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <memory.h>
#include <algorithm>

// maps [-inf,+inf] to (0,1)
double normcdf(double x) {
        return 0.5 * (1 + erf(x * M_SQRT1_2));
}

int calcbin(double x, int bins) {
        return (int)floor(normcdf(x) * bins);
}

int *docensus(int bins, int n, double *arr) {
        int *hist = calloc(bins, sizeof(int));
        int i;
        for(i = 0; i < n; i++) {
                hist[calcbin(arr[i], bins)]++;
        }
        return hist;
}

void partition(int bins, int *orig_counts, double *arr) {
        int *counts = malloc(bins * sizeof(int));
        memcpy(counts, orig_counts, bins*sizeof(int));
        int *starts = malloc(bins * sizeof(int));
        int b, i;
        starts[0] = 0;
        for(i = 1; i < bins; i++) {
                starts[i] = starts[i-1] + counts[i-1];
        }
        for(b = 0; b < bins; b++) {
                while (counts[b] > 0) {
                        double v = arr[starts[b]];
                        int correctbin;
                        do {
                                correctbin = calcbin(v, bins);
                                int swappos = starts[correctbin];
                                double tmp = arr[swappos];
                                arr[swappos] = v;
                                v = tmp;
                                starts[correctbin]++;
                                counts[correctbin]--;
                        } while (correctbin != b);
                }
        }
        free(counts);
        free(starts);
}


void sortbins(int bins, int *counts, double *arr) {
        int start = 0;
        int b;
        for(b = 0; b < bins; b++) {
                std::sort(arr + start, arr + start + counts[b]);
                start += counts[b];
        }
}


void checksorted(double *arr, int n) {
        int i;
        for(i = 1; i < n; i++) {
                if (arr[i-1] > arr[i]) {
                        printf("out of order at %d: %lf %lf\n", i, arr[i-1], arr[i]);
                        exit(1);
                }
        }
}


int main(int argc, char *argv[]) {
        if (argc == 1 || argv[1] == NULL) {
                printf("Expected data size as argument\n");
                exit(1);
        }
        int n = atoi(argv[1]);
        const int cachesize = 128 * 1024; // a guess
        int bins = (int) (1.1 * n * sizeof(double) / cachesize);
        if (argc > 2) {
                bins = atoi(argv[2]);
        }
        printf("Using %d bins\n", bins);
        FILE *f = fopen("gaussian.dat", "rb");
        if (f == NULL) {
                printf("Couldn't open gaussian.dat\n");
                exit(1);
        }
        double *arr = malloc(n * sizeof(double));
        fread(arr, sizeof(double), n, f);
        fclose(f);

        int *counts = docensus(bins, n, arr);
        partition(bins, counts, arr);
        sortbins(bins, counts, arr);
        checksorted(arr, n);

        return 0;
}

Isso usa erf () para colocar cada elemento adequadamente em uma posição e depois classifica cada posição. Mantém a matriz totalmente no local.

Primeira passagem: docensus () conta o número de elementos em cada posição.

Segunda passagem: partition () permite a matriz, colocando cada elemento em sua bandeja apropriada

Terceira passagem: sortbins () executa um qsort em cada posição.

É meio ingênuo e chama a função erf () cara duas vezes para cada valor. O primeiro e o terceiro passes são potencialmente paralelos. O segundo é altamente serial e provavelmente é desacelerado por seus padrões de acesso à memória altamente aleatórios. Também pode valer a pena armazenar em cache o número de cada compartimento duplo, dependendo da taxa de velocidade da CPU e da velocidade da memória.

Este programa permite escolher o número de posições a serem usadas. Basta adicionar um segundo número à linha de comando. Eu o compilei com gcc -O3, mas minha máquina é tão fraca que não posso contar nenhum bom número de desempenho.

Editar: Poof! Meu programa C se transformou magicamente em um programa C ++ usando std :: sort!

frud
fonte
Você pode usar phi para um stdnormal_cdf mais rápido.
Alexandru
Quantas caixas devo colocar, aproximadamente?
static_rtti
@Alexandru: adicionei uma aproximação linear por partes ao normcdf e ganhei apenas cerca de 5% de velocidade.
frud
@static_rtti: Você não precisa colocar nenhum. Por padrão, o código escolhe a contagem de posições no depósito, para que o tamanho médio da bandeja seja 10/11 de 128kb. Poucas caixas e você não obtém o benefício do particionamento. Muitos e a fase da partição é interrompida devido ao estouro do cache.
FRUD
10.6s! Tentei jogar um pouco com o número de posições e obtive os melhores resultados com 5000 (um pouco acima do valor padrão de 3356). Devo dizer que era esperado que eu visse um desempenho muito melhor para sua solução ... Talvez seja o fato de você estar usando o qsort em vez do potencial std :: sort das soluções C ++?
static_rtti
1

Dê uma olhada na implementação de classificação radix por Michael Herf ( Radix Tricks ). Na minha máquina, a classificação foi 5 vezes mais rápida em comparação com o std::sortalgoritmo da minha primeira resposta. O nome da função de classificação é RadixSort11.

int main(void)
{
    std::ifstream ifs("C:\\Temp\\gaussian.dat", std::ios::binary | std::ios::in);
    std::vector<float> v;
    v.reserve(50000000);
    double d;
    while (ifs.read(reinterpret_cast<char*>(&d), sizeof(double)))
        v.push_back(static_cast<float>(d));
    std::vector<float> vres(v.size(), 0.0);
    clock_t c0 = clock();
    RadixSort11(&v[0], &vres[0], v.size());
    std::cout << "Finished after: "
              << static_cast<double>(clock() - c0) / CLOCKS_PER_SEC << std::endl;
    return 0;
}
Christian Ammer
fonte