como ouvir N canais? (declaração de seleção dinâmica)

116

para iniciar um loop infinito de execução de duas goroutines, posso usar o código abaixo:

após receber a mensagem, ele iniciará uma nova goroutine e continuará para sempre.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

Agora, gostaria de ter o mesmo comportamento para N gorotinas, mas como ficará a instrução select nesse caso?

Este é o bit de código com o qual comecei, mas estou confuso sobre como codificar a instrução select

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}
JohnSmith
fonte
4
Acho que você está querendo multiplexação de canais. golang.org/doc/effective_go.html#chan_of_chan Basicamente, você tem um único canal para ouvir e, em seguida, vários canais filho que se encaminham para o canal principal. Pergunta SO relacionada: stackoverflow.com/questions/10979608/…
Brenden

Respostas:

152

Você pode fazer isso usando a Selectfunção do pacote reflet :

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Selecionar executa uma operação de seleção descrita pela lista de casos. Como a instrução Go select, ela bloqueia até que pelo menos um dos casos possa prosseguir, faz uma escolha pseudo-aleatória uniforme e, em seguida, executa esse caso. Ele retorna o índice do caso escolhido e, se aquele caso for uma operação de recepção, o valor recebido e um booleano indicando se o valor corresponde a um envio no canal (em oposição a um valor zero recebido porque o canal está fechado).

Você passa uma matriz de SelectCaseestruturas que identificam o canal a ser selecionado, a direção da operação e um valor a ser enviado no caso de uma operação de envio.

Então você poderia fazer algo assim:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

Você pode experimentar um exemplo mais detalhado aqui: http://play.golang.org/p/8zwvSk4kjx

James Henstridge
fonte
4
Existe um limite prático para o número de casos em tal seleção? Aquele que se você for além disso, o desempenho será severamente afetado?
Maxim Vladimirsky
4
Talvez seja minha incompetência, mas achei esse padrão muito difícil de trabalhar quando você está enviando e recebendo estruturas complexas através do canal. Passar por um canal "agregado" compartilhado, como disse Tim Allclair, foi muito mais fácil no meu caso.
Bora M. Alper de
90

Você pode fazer isso envolvendo cada canal em uma goroutine que "encaminha" mensagens para um canal "agregado" compartilhado. Por exemplo:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

Se você precisa saber de qual canal a mensagem se originou, pode envolvê-la em uma estrutura com qualquer informação extra antes de encaminhá-la para o canal agregado.

No meu teste (limitado), este método tem um desempenho muito melhor usando o pacote reflet:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

Código de referência aqui

Tim Allclair
fonte
2
Seu código de benchmark está incorreto, você precisa fazer um loopb.N dentro de um benchmark. Caso contrário, os resultados (que são divididos por b.N1 e 2000000000 em sua saída) serão completamente sem sentido.
Dave C
2
@DaveC Obrigado! A conclusão não muda, mas os resultados são muito mais sensatos.
Tim Allclair,
1
Na verdade, eu fiz um hack rápido em seu código de benchmark para obter alguns números reais . Pode muito bem haver algo faltando / errado neste benchmark, mas a única coisa que o código de reflexão mais complicado tem a seu favor é que a configuração é mais rápida (com GOMAXPROCS = 1), já que não precisa de um monte de goroutines. Em todos os outros casos, um canal de fusão de goroutine simples destrói a solução de reflexão (em ~ 2 ordens de magnitude).
Dave C
2
Uma desvantagem importante (em comparação com a reflect.Selectabordagem) é que os goroutines realizam o buffer de mesclagem, no mínimo, um único valor em cada canal sendo mesclado. Normalmente, isso não será um problema, mas em algumas aplicações específicas que podem ser um obstáculo :(.
Dave C
1
um canal de mesclagem com buffer torna o problema pior. O problema é que apenas a solução refletida pode ter uma semântica totalmente sem buffer. Eu fui em frente e postei o código de teste que estava experimentando como uma resposta separada para (espero) esclarecer o que eu estava tentando dizer.
Dave C
22

Para expandir alguns comentários sobre as respostas anteriores e fornecer uma comparação mais clara, aqui está um exemplo de ambas as abordagens apresentadas até agora com a mesma entrada, uma fatia de canais para ler e uma função para chamar para cada valor que também precisa saber qual canal de onde veio o valor.

Existem três diferenças principais entre as abordagens:

  • Complexidade. Embora possa ser parcialmente uma preferência do leitor, acho a abordagem do canal mais idiomática, direta e legível.

  • Atuação. No meu sistema Xeon amd64, o goroutines + channels executa a solução de reflexão em cerca de duas ordens de magnitude (em geral, a reflexão em Go é frequentemente mais lenta e só deve ser usada quando absolutamente necessário). Obviamente, se houver algum atraso significativo no processamento da função dos resultados ou na gravação dos valores nos canais de entrada, essa diferença de desempenho pode facilmente se tornar insignificante.

  • Semântica de bloqueio / buffer. A importância disso depende do caso de uso. Na maioria das vezes, isso não importa ou o pequeno buffer extra na solução de mesclagem de goroutine pode ser útil para o rendimento. No entanto, se for desejável ter a semântica de que apenas um único gravador seja desbloqueado e seu valor totalmente gerenciado antes que qualquer outro gravador seja desbloqueado, então isso só pode ser alcançado com a solução de reflexão.

Observe que ambas as abordagens podem ser simplificadas se o "id" do canal de envio não for necessário ou se os canais de origem nunca forem fechados.

Canal de fusão Goroutine:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Seleção de reflexão:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Código completo no playground Go .]

Dave C
fonte
1
Também é importante notar que a solução goroutines + channels não pode fazer tudo selectou reflect.Selectfaz. Os goroutines continuarão girando até consumirem tudo dos canais, então não há uma maneira clara de você Process1sair mais cedo. Também existe o potencial de problemas se você tiver vários leitores, pois os goroutines armazenam em buffer um item de cada um dos canais, o que não acontecerá com select.
James Henstridge
@JamesHenstridge, sua primeira observação sobre parar não é verdade. Você planejaria interromper o Processo1 exatamente da mesma maneira que planejaria para interromper o Processo2; por exemplo, adicionando um canal de "parada" que é fechado quando os goroutines devem parar. O Processo1 precisaria de dois casos selectem um forloop em vez do for rangeloop mais simples usado atualmente. O Processo2 precisaria colocar outro caso casese lidar com esse valor especial i.
Dave C
Isso ainda não resolve o problema de que você está lendo valores dos canais que não serão usados ​​no caso de parada antecipada.
James Henstridge
0

Por que essa abordagem não funcionaria presumindo que alguém está enviando eventos?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x = <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}
noonex
fonte
8
Este é um loop de rotação. Enquanto se espera que um canal de entrada tenha um valor, isso consome toda a CPU disponível. O ponto principal de selectem vários canais (sem uma defaultcláusula) é que ele espera com eficiência até que pelo menos um esteja pronto sem girar.
Dave C
0

Opção possivelmente mais simples:

Em vez de ter uma matriz de canais, por que não passar apenas um canal como parâmetro para as funções que estão sendo executadas em goroutines separadas e, em seguida, ouvir o canal em uma goroutine de consumidor?

Isso permite que você selecione apenas um canal em seu ouvinte, tornando a seleção simples e evitando a criação de novos goroutines para agregar mensagens de vários canais?

Fernando Sanchez
fonte