C ++ 0x não possui semáforos? Como sincronizar threads?

135

É verdade que o C ++ 0x virá sem semáforos? Já existem algumas perguntas no Stack Overflow sobre o uso de semáforos. Eu os uso (semáforos posix) o tempo todo para deixar um thread aguardar algum evento em outro thread:

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

Se eu faria isso com um mutex:

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomethingth1();

  event1.unlock();

  ...
}

Problema: É feio e não é garantido que o thread1 bloqueie o mutex primeiro (como o mesmo thread deve bloquear e desbloquear um mutex, você também não pode bloquear o evento1 antes do início do thread0 e do thread1).

Portanto, como o boost também não possui semáforos, qual é a maneira mais simples de conseguir o que foi mencionado acima?

tauran
fonte
Talvez use a condição mutex e std :: promessa e std :: future?
Yves

Respostas:

179

Você pode criar facilmente um a partir de um mutex e uma variável de condição:

#include <mutex>
#include <condition_variable>

class semaphore
{
private:
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void notify() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_wait() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};
Maxim Egorushkin
fonte
96
alguém deve enviar uma proposta ao
7
um comentário aqui que me intrigou inicialmente é o bloqueio em espera, pode-se perguntar como um thread pode passar para notificar se o bloqueio é retido em espera? a resposta um tanto mal obscuramente documentado é que condition_variable.wait pulsa o bloqueio, permitindo que outro segmento para passar comunicar de forma atômica, pelo menos é assim que eu entendo
31
Foi deliberadamente excluído do Boost com base no fato de que um semáforo é muita corda para os programadores se pendurarem. As variáveis ​​de condição supostamente são mais gerenciáveis. Entendo o argumento deles, mas me sinto um pouco apadrinhado. Eu assumo que a mesma lógica se aplica ao C ++ 11 - espera-se que os programadores gravem seus programas de uma maneira que "naturalmente" use condvars ou outras técnicas de sincronização aprovadas. O fornecimento de um semáforo iria contra isso, independentemente de ser implementado em cima de condvar ou de forma nativa.
31412 Steve Steveop
5
Nota - Consulte en.wikipedia.org/wiki/Spurious_wakeup para obter a justificativa por trás do while(!count_)loop.
Dan Nissenbaum 16/11/12
3
@ Maxim Me desculpe, eu não acho que você esteja certo. sem_wait e sem_post apenas syscall também na contenção (consulte sourceware.org/git/?p=glibc.git;a=blob;f=nptl/sem_wait.c ) para que o código aqui acabe duplicando a implementação da libc, com possíveis erros. Se você pretende portabilidade em qualquer sistema, pode ser uma solução, mas se você precisar apenas de compatibilidade com o Posix, use o semáforo Posix.
precisa saber é o seguinte
107

Com base na resposta de Maxim Yegorushkin , tentei fazer o exemplo no estilo C ++ 11.

#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
Tsuneo Yoshioka
fonte
34
Você pode fazer wait () também com três linhas:cv.wait(lck, [this]() { return count > 0; });
Domi
2
Adicionar outra classe no espírito de lock_guard também é útil. Da maneira RAII, o construtor, que toma o semáforo como referência, chama a chamada wait () do semáforo e o destruidor chama sua chamada notify (). Isso evita que as exceções falhem ao liberar o semáforo.
Jim Hunziker
não existe um bloqueio, se, por exemplo, N threads chamados wait () e count == 0, então cv.notify_one (); nunca é chamado, já que o mtx não foi lançado?
Marcello
1
@ Marcello Os threads em espera não mantêm a trava. O ponto principal das variáveis ​​de condição é fornecer uma operação atômica de "desbloqueio e espera".
David Schwartz
3
Você deve liberar o bloqueio antes de ligar para notify_one () para evitar o bloqueio imediato da ativação ... veja aqui: en.cppreference.com/w/cpp/thread/condition_variable/notify_all #
kylefinn
38

Decidi escrever o semáforo C ++ 11 mais robusto / genérico que eu pudesse, no estilo do padrão, tanto quanto eu poderia (observe using semaphore = ..., você normalmente usaria o nome semaphoresemelhante ao normalmente usando stringnão basic_string):

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
    using native_handle_type = typename CondVar::native_handle_type;

    explicit basic_semaphore(size_t count = 0);
    basic_semaphore(const basic_semaphore&) = delete;
    basic_semaphore(basic_semaphore&&) = delete;
    basic_semaphore& operator=(const basic_semaphore&) = delete;
    basic_semaphore& operator=(basic_semaphore&&) = delete;

    void notify();
    void wait();
    bool try_wait();
    template<class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& d);
    template<class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& t);

    native_handle_type native_handle();

private:
    Mutex   mMutex;
    CondVar mCv;
    size_t  mCount;
};

using semaphore = basic_semaphore<std::mutex, std::condition_variable>;

template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
    : mCount{count}
{}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
    std::lock_guard<Mutex> lock{mMutex};
    ++mCount;
    mCv.notify_one();
}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
    std::unique_lock<Mutex> lock{mMutex};
    mCv.wait(lock, [&]{ return mCount > 0; });
    --mCount;
}

template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
    std::lock_guard<Mutex> lock{mMutex};

    if (mCount > 0) {
        --mCount;
        return true;
    }

    return false;
}

template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
    return mCv.native_handle();
}
David
fonte
Isso funciona, com uma edição menor. As chamadas wait_forand do wait_untilmétodo com o predicado retornam um valor booleano (não um `std :: cv_status).
Jdknight #
desculpe por escolher tão tarde no jogo. std::size_testá sem sinal, portanto, decrementá-lo abaixo de zero é UB, e sempre será >= 0. IMHO countdeve ser um int.
Richard Hodges
3
@RichardHodges não há como diminuir abaixo de zero, então não há problema, e o que significaria uma contagem negativa em um semáforo? Isso nem faz sentido IMO.
David
1
@ David E se um tópico tivesse que esperar que outros iniciassem as coisas? por exemplo, 1 thread do leitor para aguardar 4 threads, eu chamaria o construtor de semáforo com -3 para fazer com que o thread do leitor esperasse até que todos os outros threads fizessem uma postagem. Eu acho que existem outras maneiras de fazer isso, mas não é razoável? Eu acho que é de fato a pergunta que o OP está fazendo, mas com mais "threads1" s.
jmmut
2
@RichardHodges por ser muito pedante, decrementar um tipo inteiro não assinado abaixo de 0 não é UB.
jcai
15

de acordo com os semáforos posix, eu acrescentaria

class semaphore
{
    ...
    bool trywait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if(count_)
        {
            --count_;
            return true;
        }
        else
        {
            return false;
        }
    }
};

E eu prefiro usar um mecanismo de sincronização em um nível conveniente de abstração, em vez de sempre copiar colando uma versão costurada usando operadores mais básicos.

Michael Zillich
fonte
9

Você também pode conferir o cpp11-on-multicore - ele possui uma implementação de semáforo portátil e ideal.

O repositório também contém outros itens de threading que complementam o threading do c ++ 11.

onqtam
fonte
8

Você pode trabalhar com variáveis ​​mutex e condição. Você obtém acesso exclusivo com o mutex, verifique se deseja continuar ou precisa esperar pelo outro lado. Se precisar esperar, aguarde em uma condição. Quando o outro encadeamento determina que você pode continuar, ele sinaliza a condição.

Há um pequeno exemplo na biblioteca boost :: thread que você provavelmente pode copiar (as bibliotecas C ++ 0x e boost thread são muito semelhantes).

David Rodríguez - dribeas
fonte
A condição sinaliza apenas para threads em espera ou não? Portanto, se o thread0 não estiver esperando quando o thread1 sinalizar, será bloqueado mais tarde? Além disso: não preciso da trava adicional que acompanha a condição - é uma sobrecarga.
Tauran
Sim, a condição sinaliza apenas threads em espera. O padrão comum é ter uma variável com o estado e uma condição, caso você precise esperar. Pense em um produtor / consumidor, haverá uma contagem dos itens no buffer, o produtor trava, adiciona o elemento, incrementa a contagem e os sinais. O consumidor bloqueia, verifica o contador e se não zero consome, enquanto se zero espera na condição.
David Rodríguez - dribeas
2
Você pode simular um semáforo da seguinte maneira: Inicialize uma variável com o valor que você daria ao semáforo e, em seguida, wait()será traduzido para "travar, verifique a contagem se houver um decréscimo diferente de zero e continue; se a espera zero na condição" enquanto postseria "travar, contador de incremento, sinaliza se foi 0 "
David Rodríguez - dribeas
Sim, parece bom. Gostaria de saber se os semáforos posix são implementados da mesma maneira.
Tauran
@tauran: Não sei ao certo (e pode depender de qual sistema operacional Posix), mas acho improvável. Os semáforos são tradicionalmente uma primitiva de sincronização de "nível inferior" do que mutexes e variáveis ​​de condição e, em princípio, podem ser mais eficientes do que seriam se implementados em cima de um condvar. Portanto, o mais provável em um determinado sistema operacional é que todas as primitivas de sincronização no nível do usuário sejam construídas sobre algumas ferramentas comuns que interagem com o planejador.
31412 Steve Steveop
3

Também pode ser útil o invólucro de semáforo RAII nos threads:

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

   ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Exemplo de uso no aplicativo multithread:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();
slasla
fonte
3

O C ++ 20 finalmente terá semáforos - std::counting_semaphore<max_count>.

Estes terão (pelo menos) os seguintes métodos:

  • acquire() (bloqueando)
  • try_acquire() (sem bloqueio, retorna imediatamente)
  • try_acquire_for() (sem bloqueio, dura uma duração)
  • try_acquire_until() (sem bloqueio, leva um tempo para parar de tentar)
  • release()

Isso ainda não está listado na cppreference, mas você pode ler os slides da apresentação do CppCon 2019 ou assistir ao vídeo . Há também a proposta oficial P0514R4 , mas não tenho certeza de que seja a versão mais atualizada.

einpoklum
fonte
2

Eu encontrei o shared_ptr e o fraco_ptr, um longo com uma lista, fez o trabalho que eu precisava. Meu problema era que eu tinha vários clientes que desejavam interagir com os dados internos de um host. Normalmente, o host atualiza os dados por conta própria; no entanto, se um cliente solicita, o host precisa parar de atualizar até que nenhum cliente esteja acessando os dados do host. Ao mesmo tempo, um cliente pode solicitar acesso exclusivo, para que nenhum outro cliente, nem o host, possa modificar os dados do host.

Como fiz isso, criei uma struct:

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

Cada cliente teria um membro desses:

UpdateLock::ptr m_myLock;

Em seguida, o host teria um membro fraco_ptr para exclusividade e uma lista de fraco_ptrs para bloqueios não exclusivos:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

Há uma função para ativar o bloqueio e outra função para verificar se o host está bloqueado:

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

Testo bloqueios em LockUpdate, IsUpdateLocked e periodicamente na rotina de atualização do host. Testar um bloqueio é tão simples quanto verificar se o fraco_ptr expirou e remover qualquer um que expirou da lista m_locks (eu só faço isso durante a atualização do host); posso verificar se a lista está vazia; ao mesmo tempo, recebo o desbloqueio automático quando um cliente redefine o shared_ptr no qual está pendurado, o que também acontece quando um cliente é destruído automaticamente.

O efeito geral é que, como os clientes raramente precisam de exclusividade (normalmente reservados apenas para adições e exclusões), na maioria das vezes uma solicitação para LockUpdate (false), ou seja, não exclusiva, é bem-sucedida desde que (! M_exclusiveLock). E um LockUpdate (true), um pedido de exclusividade, é bem-sucedido apenas quando ambos (! M_exclusiveLock) e (m_locks.empty ()).

Uma fila pode ser adicionada para atenuar entre bloqueios exclusivos e não exclusivos; no entanto, até o momento não tive colisões, então pretendo esperar até que isso aconteça para adicionar a solução (principalmente para que eu tenha uma condição de teste no mundo real).

Até agora, isso está funcionando bem para minhas necessidades; Eu posso imaginar a necessidade de expandir isso, e alguns problemas que podem surgir sobre o uso expandido, no entanto, isso foi rápido de implementar e exigiu muito pouco código personalizado.

Kit10
fonte
-4

Caso alguém esteja interessado na versão atômica, aqui está a implementação. O desempenho é esperado melhor que a versão da variável mutex & condition.

class semaphore_atomic
{
public:
    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    void wait() {
        while (true) {
            int count = count_.load(std::memory_order_relaxed);
            if (count > 0) {
                if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    bool try_wait() {
        int count = count_.load(std::memory_order_relaxed);
        if (count > 0) {
            if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }
private:
    std::atomic_int count_{0};
};
Jeffery
fonte
4
Eu esperaria que o desempenho fosse muito pior. Esse código comete quase literalmente todos os erros possíveis. Como apenas o exemplo mais óbvio, suponha que o waitcódigo tenha que repetir várias vezes. Quando finalmente for desbloqueado, será necessário a mãe de todos os ramos imprevisíveis, pois a previsão de loop da CPU certamente preverá que ele fará um loop novamente. Eu poderia listar muitos outros problemas com esse código.
David Schwartz
1
Aqui está outro óbvio problema de desempenho: o waitloop consumirá recursos de microexecução da CPU à medida que gira. Suponha que ele esteja no mesmo núcleo físico do encadeamento que deveria notify- ele diminuirá muito o encadeamento.
David Schwartz
1
E aqui está apenas mais uma: Nas CPUs x86 (as CPUs mais populares atualmente), uma operação compare_exchange_weak é sempre uma operação de gravação, mesmo se falhar (ela grava de volta o mesmo valor que lê se a comparação falhar). Então, suponha que dois núcleos estejam ambos em um waitloop para o mesmo semáforo. Ambos estão gravando a toda velocidade na mesma linha de cache, o que pode reduzir a velocidade de outros núcleos saturando barramentos entre núcleos.
David Schwartz
@DavidSchwartz Fico feliz em ver seus comentários. Não tenho certeza de entender a parte '... previsão de loop da CPU ...'. Acordou o 2º. Aparentemente, seu terceiro caso pode acontecer, mas compare com o mutex, que faz com que o modo de usuário entre o modo de kernel e a chamada do sistema, a sincronização entre núcleos não seja pior.
Jeffery
1
Não existe um semáforo sem bloqueio. A idéia de estar livre de bloqueios não é escrever código sem usar mutexes, mas escrever código onde um encadeamento nunca bloqueia. Nesse caso, a essência do semáforo é bloquear threads que chamam a função wait ()!
Carlo Wood