Fico confuso quando boost :: asio :: io_service executar bloqueia / desbloqueia o método

91

Sendo um iniciante total no Boost.Asio, estou confuso io_service::run(). Eu apreciaria se alguém pudesse me explicar quando esse método bloqueia / desbloqueia. A documentação afirma:

A run()função bloqueia até que todo o trabalho seja concluído e não haja mais manipuladores a serem despachados, ou até io_serviceque seja interrompido.

Vários threads podem chamar a run()função para configurar um pool de threads a partir do qual io_servicepodem executar manipuladores. Todos os threads que estão esperando no pool são equivalentes e o io_servicepode escolher qualquer um deles para invocar um manipulador.

Uma saída normal da run()função implica que o io_serviceobjeto está parado (a stopped()função retorna verdadeiro). As chamadas subseqüentes para run(), run_one(), poll()ou poll_one()retornará imediatamente a menos que haja uma chamada antes reset().

O que significa a seguinte declaração?

[...] não há mais manipuladores para serem despachados [...]


Ao tentar entender o comportamento de io_service::run(), me deparei com este exemplo (exemplo 3a). Dentro dele, observo que io_service->run()bloqueia e aguarda ordens de serviço.

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

No entanto, no código a seguir em que eu estava trabalhando, o cliente se conecta usando TCP / IP e o método de execução bloqueia até que os dados sejam recebidos de forma assíncrona.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

Qualquer explicação run()que descreva seu comportamento nos dois exemplos abaixo seria bem-vinda.

MistyD
fonte

Respostas:

238

Fundação

Vamos começar com um exemplo simplificado e examinar as peças Boost.Asio relevantes:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

O que é um manipulador ?

Um manipulador nada mais é do que um retorno de chamada. No código de exemplo, existem 3 manipuladores:

  • O printmanipulador (1).
  • O handle_async_receivemanipulador (3).
  • O printmanipulador (4).

Mesmo que a mesma print()função seja usada duas vezes, cada uso é considerado para criar seu próprio manipulador identificável exclusivamente. Os manipuladores podem ter várias formas e tamanhos, desde funções básicas como as acima até construções mais complexas, como functores gerados de boost::bind()e lambdas. Independentemente da complexidade, o manipulador ainda permanece nada mais do que um retorno de chamada.

O que é trabalho ?

Trabalho é algum processamento que o Boost.Asio foi solicitado a fazer em nome do código do aplicativo. Às vezes, o Boost.Asio pode iniciar parte do trabalho assim que for informado sobre ele e, outras vezes, pode esperar para fazer o trabalho posteriormente. Depois de concluir o trabalho, Boost.Asio informará o aplicativo chamando o manipulador fornecido .

Boost.Asio garante que manipuladores só será executada dentro de um segmento que está chamando run(), run_one(), poll(), ou poll_one(). Esses são os threads que farão o trabalho e chamarão os manipuladores . Portanto, no exemplo acima, print()não é invocado quando é postado no io_service(1). Em vez disso, ele é adicionado ao io_servicee será chamado posteriormente. Nesse caso, dentro de io_service.run()(5).

O que são operações assíncronas?

Uma operação assíncrona cria trabalho e Boost.Asio invocará um manipulador para informar o aplicativo quando o trabalho for concluído. As operações assíncronas são criadas chamando uma função que tem um nome com o prefixo async_. Essas funções também são conhecidas como funções de inicialização .

As operações assíncronas podem ser decompostas em três etapas exclusivas:

  • Iniciar, ou informar, o associado io_serviceque funciona precisa ser feito. A async_receiveoperação (3) informa io_serviceque será necessário ler dados do soquete de forma assíncrona e async_receiveretorna imediatamente.
  • Fazendo o trabalho real. Neste caso, ao socketreceber dados, os bytes serão lidos e copiados para buffer. O trabalho real será feito em:
    • A função de inicialização (3), se Boost.Asio puder determinar que não bloqueará.
    • Quando o aplicativo executa explicitamente o io_service(5).
  • Invocando o handle_async_receive ReadHandler . Mais uma vez, os manipuladores são invocados apenas em threads que executam o io_service. Assim, independentemente de quando o trabalho é feito (3 ou 5), é garantido que handle_async_receive()só será invocado dentro de io_service.run()(5).

A separação no tempo e no espaço entre essas três etapas é conhecida como inversão de fluxo de controle. É uma das complexidades que tornam a programação assíncrona difícil. No entanto, existem técnicas que podem ajudar a mitigar isso, como o uso de corrotinas .

O que io_service.run()faz?

Quando um thread é chamado io_service.run(), o trabalho e os manipuladores são chamados de dentro desse thread. No exemplo acima, io_service.run()(5) irá bloquear até:

  • Ele foi chamado e retornou de ambos os printmanipuladores, a operação de recebimento foi concluída com sucesso ou falha e seu handle_async_receivemanipulador foi chamado e retornado.
  • O io_serviceé explicitamente parado por meio de io_service::stop().
  • Uma exceção é lançada de dentro de um manipulador.

Um fluxo potencial psuedo pode ser descrito como o seguinte:

criar io_service
criar soquete
adicionar manipulador de impressão a io_service (1)
aguarde o soquete conectar (2)
adicionar uma solicitação de trabalho de leitura assíncrona ao io_service (3)
adicionar manipulador de impressão a io_service (4)
execute o io_service (5)
  há trabalho ou manipuladores?
    sim, há 1 trabalho e 2 manipuladores
      o socket tem dados? não faça nada
      execute o gerenciador de impressão (1)
  há trabalho ou manipuladores?
    sim, há 1 trabalho e 1 manipulador
      o socket tem dados? não faça nada
      execute o manipulador de impressão (4)
  há trabalho ou manipuladores?
    sim, há 1 trabalho
      o socket tem dados? não, continue esperando
  - socket recebe dados -
      socket tem dados, leia no buffer
      adicionar manipulador handle_async_receive a io_service
  há trabalho ou manipuladores?
    sim, há 1 manipulador
      execute o manipulador handle_async_receive (3)
  há trabalho ou manipuladores?
    não, defina io_service como interrompido e retornar

Observe como, quando a leitura terminou, ela adicionou outro manipulador ao io_service. Esse detalhe sutil é um recurso importante da programação assíncrona. Ele permite que os manipuladores sejam encadeados. Por exemplo, se handle_async_receivenão obtiver todos os dados esperados, sua implementação poderá postar outra operação de leitura assíncrona, resultando em io_servicemais trabalho e, portanto, não retornando io_service.run().

Note que quando o io_servicetem correu para fora do trabalho, o aplicativo deve reset()a io_serviceantes de executá-lo novamente.


Pergunta de exemplo e código do exemplo 3a

Agora, vamos examinar as duas partes do código mencionadas na pergunta.

Código da Pergunta

socket->async_receiveadiciona trabalho ao io_service. Portanto, io_service->run()bloqueará até que a operação de leitura seja concluída com sucesso ou erro e ClientReceiveEventtenha concluído a execução ou gere uma exceção.

Exemplo 3a Código

Na esperança de tornar mais fácil de entender, aqui está um exemplo menor anotado 3a:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

Em um alto nível, o programa criará 2 threads que processarão o io_serviceloop de eventos (2). Isso resulta em um pool de threads simples que calculará os números de Fibonacci (3).

A única diferença principal entre o código de pergunta e esse código é que ele invoca io_service::run()(2) antes que o trabalho real e os manipuladores sejam adicionados a io_service(3). Para evitar que o io_service::run()retorne imediatamente, um io_service::workobjeto é criado (1). Este objeto evita que o io_servicetrabalho fique sem trabalho; portanto, io_service::run()não retornará como resultado de nenhum trabalho.

O fluxo geral é o seguinte:

  1. Crie e adicione o io_service::workobjeto adicionado ao io_service.
  2. Conjunto de threads criado que invoca io_service::run(). Esses threads de trabalho não retornarão por io_servicecausa do io_service::workobjeto.
  3. Adicione 3 manipuladores que calculam os números de Fibonacci ao io_servicee retorne imediatamente. Os threads de trabalho, não o thread principal, podem começar a executar esses manipuladores imediatamente.
  4. Exclua o io_service::workobjeto.
  5. Aguarde o término da execução dos threads de trabalho. Isso só ocorrerá quando todos os 3 manipuladores concluírem a execução, pois io_servicenenhum deles tem manipuladores nem trabalho.

O código poderia ser escrito de forma diferente, da mesma maneira que o Código Original, onde manipuladores são adicionados ao io_servicee, em seguida, o io_serviceloop de evento é processado. Isso elimina a necessidade de uso io_service::worke resulta no seguinte código:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Síncrono vs. Assíncrono

Embora o código em questão esteja usando uma operação assíncrona, ele está efetivamente funcionando de forma síncrona, pois está aguardando a conclusão da operação assíncrona:

socket.async_receive(buffer, handler)
io_service.run();

é equivalente a:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

Como regra geral, tente evitar misturar operações síncronas e assíncronas. Muitas vezes, pode transformar um sistema complexo em um sistema complicado. Essa resposta destaca as vantagens da programação assíncrona, algumas das quais também são abordadas na documentação do Boost.Asio .

Tanner Sansbury
fonte
13
Postagem incrível. Gostaria de acrescentar apenas uma coisa porque sinto que não recebe atenção suficiente: depois que run () retornar, você precisa chamar reset () em seu io_service antes de executá-lo () novamente. Caso contrário, ele pode retornar instantaneamente, quer haja ou não operações async_ esperando ou não.
DeVadder
De onde vem o buffer? O que é isso?
ruipacheco
Eu ainda estou confuso. Se a mixagem for sincronizada e assíncrona não for recomendada, qual é o modo assíncrono puro? você pode dar um exemplo mostrando o código sem io_service.run () ;?
Splash
@Splash pode ser usado io_service.poll()para processar o loop de eventos sem bloquear as operações pendentes. A recomendação principal para evitar a mistura de operações síncronas e assíncronas é evitar adicionar complexidade desnecessária e evitar uma capacidade de resposta insatisfatória quando os manipuladores demoram muito para serem concluídos. Existem alguns casos em que é seguro, como quando se sabe que a operação síncrona não será bloqueada.
Tanner Sansbury
O que você quer dizer com "atualmente" em "Boost.Asio garante que os manipuladores serão executados apenas em um segmento que está chamando atualmenterun() ...." ? Se houver N threads (que foram chamados run()), qual é a thread "atual"? Pode haver muitos? Ou você quer dizer o fio que tiver terminado a execução do async_*()(digamos async_read), é garantido para chamar seus manipuladores bem?
Nawaz
19

Para simplificar como o que runfaz, pense nele como um funcionário que deve processar uma pilha de papel; pega uma folha, faz o que a folha manda, joga a folha fora e pega a próxima; quando ele fica sem folhas, sai do escritório. Em cada folha pode haver qualquer tipo de instrução, até mesmo adicionar uma nova folha à pilha. De volta ao asio: você pode dar um io_servicetrabalho de duas maneiras, essencialmente: usando post-o como no exemplo que você vinculou, ou usando outros objetos que chamam internamente posto io_service, como o sockete seus async_*métodos.

Loghorn
fonte