Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este tópico descreve como implementar o padrão produtor-consumidor em seu aplicativo. Nesse padrão, o produtor envia mensagens para um bloco de mensagens e o consumidor lê mensagens desse bloco.
O tópico demonstra dois cenários. No primeiro cenário, o consumidor deve receber cada mensagem que o produtor envia. No segundo cenário, o consumidor pesquisa dados periodicamente e, portanto, não precisa receber cada mensagem.
Ambos os exemplos neste tópico usam agentes, blocos de mensagens e funções de passagem de mensagens para transmitir mensagens do produtor para o consumidor. O agente produtor usa a função concurrency::send para gravar mensagens em um objeto concurrency::ITarget . O agente do consumidor usa a função concurrency::receive para ler mensagens de um objeto concurrency::ISource . Ambos os agentes possuem um valor sentinela para coordenar o fim do processamento.
Para obter mais informações sobre agentes assíncronos, consulte Agentes assíncronos. Para obter mais informações sobre blocos de mensagens e funções de passagem de mensagens, consulte Blocos de mensagens assíncronas e Funções de passagem de mensagens.
Exemplo: Enviar séries de números para o agente do consumidor
Neste exemplo, o agente produtor envia uma série de números para o agente do consumidor. O consumidor recebe cada um destes números e calcula a sua média. O aplicativo grava a média no console.
Este exemplo usa um objeto concurrency::unbounded_buffer para permitir que o produtor enfileire mensagens. A unbounded_buffer classe implementa ITarget e ISource para que o produtor e o consumidor possam enviar e receber mensagens de e para um buffer compartilhado. As send funções e receive coordenam a tarefa de propagar os dados do produtor para o consumidor.
// producer-consumer-average.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<int>& target, unsigned int count, int sentinel)
: _target(target)
, _count(count)
, _sentinel(sentinel)
{
}
protected:
void run()
{
// Send the value of each loop iteration to the target buffer.
while (_count > 0)
{
send(_target, static_cast<int>(_count));
--_count;
}
// Send the sentinel value.
send(_target, _sentinel);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<int>& _target;
// The number of values to send.
unsigned int _count;
// The sentinel value, which informs the consumer agent to stop processing.
int _sentinel;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<int>& source, int sentinel)
: _source(source)
, _sentinel(sentinel)
{
}
// Retrieves the average of all received values.
int average()
{
return receive(_average);
}
protected:
void run()
{
// The sum of all values.
int sum = 0;
// The count of values received.
int count = 0;
// Read from the source block until we receive the
// sentinel value.
int n;
while ((n = receive(_source)) != _sentinel)
{
sum += n;
++count;
}
// Write the average to the message buffer.
send(_average, sum / count);
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<int>& _source;
// The sentinel value, which informs the agent to stop processing.
int _sentinel;
// Holds the average of all received values.
single_assignment<int> _average;
};
int wmain()
{
// Informs the consumer agent to stop processing.
const int sentinel = 0;
// The number of values for the producer agent to send.
const unsigned int count = 100;
// A message buffer that is shared by the agents.
unbounded_buffer<int> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer, count, sentinel);
consumer_agent consumer(buffer, sentinel);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
// Print the average.
wcout << L"The average is " << consumer.average() << L'.' << endl;
}
Este exemplo produz o seguinte resultado.
The average is 50.
Exemplo: Enviar séries de cotações de ações para o agente consumidor
Neste exemplo, o agente produtor envia uma série de cotações de ações para o agente do consumidor. O agente do consumidor lê periodicamente a cotação atual e a imprime no console.
Este exemplo é semelhante ao anterior, exceto que ele usa um objeto concurrency::overwrite_buffer para permitir que o produtor compartilhe uma mensagem com o consumidor. Como no exemplo anterior, a overwrite_buffer classe implementa ITarget e ISource para que o produtor e o consumidor possam agir num buffer de mensagens partilhadas.
// producer-consumer-quotes.cpp
// compile with: /EHsc
#include <agents.h>
#include <array>
#include <algorithm>
#include <iostream>
using namespace concurrency;
using namespace std;
// Demonstrates a basic agent that produces values.
class producer_agent : public agent
{
public:
explicit producer_agent(ITarget<double>& target)
: _target(target)
{
}
protected:
void run()
{
// For illustration, create a predefined array of stock quotes.
// A real-world application would read these from an external source,
// such as a network connection or a database.
array<double, 6> quotes = { 24.44, 24.65, 24.99, 23.76, 22.30, 25.89 };
// Send each quote to the target buffer.
for_each (begin(quotes), end(quotes), [&] (double quote) {
send(_target, quote);
// Pause before sending the next quote.
concurrency::wait(20);
});
// Send a negative value to indicate the end of processing.
send(_target, -1.0);
// Set the agent to the finished state.
done();
}
private:
// The target buffer to write to.
ITarget<double>& _target;
};
// Demonstrates a basic agent that consumes values.
class consumer_agent : public agent
{
public:
explicit consumer_agent(ISource<double>& source)
: _source(source)
{
}
protected:
void run()
{
// Read quotes from the source buffer until we receive
// a negative value.
double quote;
while ((quote = receive(_source)) >= 0.0)
{
// Print the quote.
wcout.setf(ios::fixed);
wcout.precision(2);
wcout << L"Current quote is " << quote << L'.' << endl;
// Pause before reading the next quote.
concurrency::wait(10);
}
// Set the agent to the finished state.
done();
}
private:
// The source buffer to read from.
ISource<double>& _source;
};
int wmain()
{
// A message buffer that is shared by the agents.
overwrite_buffer<double> buffer;
// Create and start the producer and consumer agents.
producer_agent producer(buffer);
consumer_agent consumer(buffer);
producer.start();
consumer.start();
// Wait for the agents to finish.
agent::wait(&producer);
agent::wait(&consumer);
}
Este exemplo gera a seguinte saída.
Current quote is 24.44.
Current quote is 24.44.
Current quote is 24.65.
Current quote is 24.99.
Current quote is 23.76.
Current quote is 22.30.
Current quote is 25.89.
Ao contrário de um unbounded_buffer objeto, a receive função não remove a mensagem do overwrite_buffer objeto. Se o consumidor ler do buffer de mensagens mais de uma vez antes de o produtor substituir essa mensagem, o recetor obterá a mesma mensagem de cada vez.
Compilando o código
Copie o código de exemplo e cole-o em um projeto do Visual Studio ou cole-o em um arquivo chamado producer-consumer.cpp e, em seguida, execute o seguinte comando em uma janela do prompt de comando do Visual Studio.
cl.exe /EHsc producer-consumer.cpp
Ver também
Biblioteca de agentes assíncronos
Agentes assíncronos
Blocos de mensagens assíncronas
Funções de passagem de mensagens