Como esperar que todas as goroutines terminem sem usar o tempo. Sono?

108

Este código seleciona todos os arquivos xml na mesma pasta, como o executável invocado e aplica de forma assíncrona o processamento a cada resultado no método de retorno de chamada (no exemplo abaixo, apenas o nome do arquivo é impresso).

Como evito usar o método sleep para impedir que o método principal saia? Tenho problemas para entender os canais (presumo que seja o que for preciso, para sincronizar os resultados), então qualquer ajuda é apreciada!

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "path/filepath"
    "os"
    "runtime"
    "time"
)

func eachFile(extension string, callback func(file string)) {
    exeDir := filepath.Dir(os.Args[0])
    files, _ := ioutil.ReadDir(exeDir)
    for _, f := range files {
            fileName := f.Name()
            if extension == path.Ext(fileName) {
                go callback(fileName)
            }
    }
}


func main() {
    maxProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcs)

    eachFile(".xml", func(fileName string) {
                // Custom logic goes in here
                fmt.Println(fileName)
            })

    // This is what i want to get rid of
    time.Sleep(100 * time.Millisecond)
}
Dante
fonte

Respostas:

173

Você pode usar sync.WaitGroup . Citando o exemplo vinculado:

package main

import (
        "net/http"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) {
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                }(url)
        }
        // Wait for all HTTP fetches to complete.
        wg.Wait()
}
zzzz
fonte
11
Alguma razão para você fazer wg.Add (1) fora da rotina go? Podemos fazer isso antes de adiar wg.Done ()?
sábado
18
sentou, sim, há um motivo, é descrito em sync.WaitGroup.Add docs: Note that calls with positive delta must happen before the call to Wait, or else Wait may wait for too small a group. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. See the WaitGroup example.
wobmene
15
A adaptação desse código me causou uma longa sessão de depuração porque meu goroutine era uma função nomeada e passar no WaitGroup como um valor irá copiá-lo e tornar wg.Done () ineficaz. Embora isso possa ser corrigido passando um ponteiro & wg, a melhor maneira de evitar esses erros é declarar a variável WaitGroup como um ponteiro em primeiro lugar: em wg := new(sync.WaitGroup)vez de var wg sync.WaitGroup.
Robert Jack Will
Acho que é válido escrever wg.Add(len(urls))logo acima da linha for _, url := range urls, acredito que seja melhor já que você usa o Add apenas uma vez.
Victor
@RobertJackWill: Boa nota! A propósito , isso é abordado nos documentos : "Um WaitGroup não deve ser copiado após o primeiro uso. Que pena que Go não tem uma maneira de fazer isso . Na verdade, no entanto, go vetdetecta esse caso e avisa com" passes de função bloqueio por valor : sync.WaitGroup contém sync.noCopy ".
Brent Bradburn
56

WaitGroups são definitivamente a maneira canônica de fazer isso. No entanto, apenas por uma questão de integridade, aqui está a solução que era comumente usada antes da introdução dos WaitGroups. A ideia básica é usar um canal para dizer "terminei" e fazer a goroutine principal esperar até que cada rotina gerada tenha relatado sua conclusão.

func main() {
    c := make(chan struct{}) // We don't need any data to be passed, so use an empty struct
    for i := 0; i < 100; i++ {
        go func() {
            doSomething()
            c <- struct{}{} // signal that the routine has completed
        }()
    }

    // Since we spawned 100 routines, receive 100 messages.
    for i := 0; i < 100; i++ {
        <- c
    }
}
Joshlf
fonte
9
É bom ver uma solução com canais simples. Um bônus adicional: se doSomething()retornar algum resultado, você pode colocá-lo no canal e coletar e processar os resultados no segundo loop for (assim que estiverem prontos)
andras
4
Só funciona se você já souber a quantidade de gorutines que gostaria de iniciar. E se você estiver escrevendo algum tipo de rastreador de html e iniciar gorutines de maneira recursiva para cada link na página?
Brightdev
Você precisará controlar isso de alguma forma. Com WaitGroups é um pouco mais fácil porque cada vez que você gera um novo goroutine, você pode primeiro fazê wg.Add(1)-lo e, portanto, ele irá mantê-los sob controle. Com canais seria um pouco mais difícil.
joshlf
c será bloqueado, pois todas as rotinas go tentarão acessá-lo e ele está sem buffer
Edwin Ikechukwu Okonkwo
Se por "bloquear" você quer dizer que o programa irá travar, isso não é verdade. Você pode tentar executá-lo sozinho. O motivo é que os únicos goroutines que escrevem csão diferentes do goroutine principal, que lê de c. Assim, a goroutine principal está sempre disponível para ler um valor fora do canal, o que acontecerá quando uma das goroutines estiver disponível para gravar um valor no canal. Você está certo que se esse código não gerasse goroutines, mas, em vez disso, executasse tudo em uma única goroutine, haveria um deadlock.
joshlf
8

sync.WaitGroup pode ajudá-lo aqui.

package main

import (
    "fmt"
    "sync"
    "time"
)


func wait(seconds int, wg * sync.WaitGroup) {
    defer wg.Done()

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println("Slept ", seconds, " seconds ..")
}


func main() {
    var wg sync.WaitGroup

    for i := 0; i <= 5; i++ {
        wg.Add(1)   
        go wait(i, &wg)
    }
    wg.Wait()
}
dimmg
fonte
1

Embora sync.waitGroup(wg) seja a maneira canônica de avançar, ele exige que você faça pelo menos algumas de suas wg.Addchamadas antes wg.Waitde todas serem concluídas. Isso pode não ser viável para coisas simples como um rastreador da web, onde você não sabe o número de chamadas recursivas de antemão e leva um tempo para recuperar os dados que conduzem owg.Add chamadas. Afinal, você precisa carregar e analisar a primeira página antes de saber o tamanho do primeiro lote de páginas filhas.

Eu escrevi uma solução usando canais, evitando waitGroupem minha solução o exercício Tour of Go - web crawler . Cada vez que uma ou mais rotinas de go são iniciadas, você envia o número para o childrencanal. Cada vez que uma rotina go está prestes a ser concluída, você envia um 1para o donecanal. Quando a soma dos filhos for igual à soma de done, estamos prontos.

Minha única preocupação restante é o tamanho do resultscanal embutido, mas essa é uma limitação do Go (atual).


// recursionController is a data structure with three channels to control our Crawl recursion.
// Tried to use sync.waitGroup in a previous version, but I was unhappy with the mandatory sleep.
// The idea is to have three channels, counting the outstanding calls (children), completed calls 
// (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
// sufficiently careful to signal any new children before closing your current one, as you may be the last one).
//
type recursionController struct {
    results  chan string
    children chan int
    done     chan int
}

// instead of instantiating one instance, as we did above, use a more idiomatic Go solution
func NewRecursionController() recursionController {
    // we buffer results to 1000, so we cannot crawl more pages than that.  
    return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
}

// recursionController.Add: convenience function to add children to controller (similar to waitGroup)
func (rc recursionController) Add(children int) {
    rc.children <- children
}

// recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
func (rc recursionController) Done() {
    rc.done <- 1
}

// recursionController.Wait will wait until all children are done
func (rc recursionController) Wait() {
    fmt.Println("Controller waiting...")
    var children, done int
    for {
        select {
        case childrenDelta := <-rc.children:
            children += childrenDelta
            // fmt.Printf("children found %v total %v\n", childrenDelta, children)
        case <-rc.done:
            done += 1
            // fmt.Println("done found", done)
        default:
            if done > 0 && children == done {
                fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
                close(rc.results)
                return
            }
        }
    }
}

Código-fonte completo para a solução

dirkjot
fonte
1

Aqui está uma solução que emprega WaitGroup.

Primeiro, defina 2 métodos de utilidade:

package util

import (
    "sync"
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) {
    allNodesWaitGroup.Add(1)
    go func() {
        defer allNodesWaitGroup.Done()
        f()
    }()
}

func WaitForAllNodes() {
    allNodesWaitGroup.Wait()
}

Em seguida, substitua a invocação de callback:

go callback(fileName)

Com uma chamada para sua função de utilidade:

util.GoNode(func() { callback(fileName) })

Última etapa, adicione esta linha no final de seu main, em vez de seu sleep. Isso garantirá que o thread principal esteja aguardando que todas as rotinas sejam concluídas antes que o programa possa ser interrompido.

func main() {
  // ...
  util.WaitForAllNodes()
}
gamliela
fonte