Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este documento descreve como fazer uso efetivo da Biblioteca de Agentes Assíncronos. A Biblioteca de Agentes promove um modelo de programação baseado em atores e passagem de mensagens dentro do processo para tarefas de fluxo de dados e pipelining de granularidade grossa.
Para obter mais informações sobre a Biblioteca de agentes, consulte Biblioteca de agentes assíncronos.
Secções
Este documento contém as seguintes secções:
Usar agentes para isolar o estado
A Biblioteca de Agentes fornece alternativas ao estado compartilhado, permitindo que você conecte componentes isolados por meio de um mecanismo assíncrono de passagem de mensagens. Os agentes assíncronos são mais eficazes quando isolam seu estado interno de outros componentes. Ao isolar o estado, vários componentes normalmente não atuam em dados compartilhados. O isolamento de estado pode permitir que seu aplicativo seja dimensionado porque reduz a contenção na memória compartilhada. O isolamento de estado também reduz a chance de deadlock e condições de corrida porque os componentes não precisam sincronizar o acesso aos dados compartilhados.
Normalmente, isola-se o estado em um agente mantendo os dados nas seções private ou protected da classe do agente e usando buffers de mensagens para comunicar alterações de estado. O exemplo a seguir mostra a classe basic_agent, que deriva de concurrency::agent. A basic_agent classe usa dois buffers de mensagens para se comunicar com componentes externos. Um buffer de mensagens contém mensagens recebidas; O outro buffer de mensagens contém mensagens enviadas.
// basic-agent.cpp
// compile with: /c /EHsc
#include <agents.h>
// An agent that uses message buffers to isolate state and communicate
// with other components.
class basic_agent : public concurrency::agent
{
public:
basic_agent(concurrency::unbounded_buffer<int>& input)
: _input(input)
{
}
// Retrieves the message buffer that holds output messages.
concurrency::unbounded_buffer<int>& output()
{
return _output;
}
protected:
void run()
{
while (true)
{
// Read from the input message buffer.
int value = concurrency::receive(_input);
// TODO: Do something with the value.
int result = value;
// Write the result to the output message buffer.
concurrency::send(_output, result);
}
done();
}
private:
// Holds incoming messages.
concurrency::unbounded_buffer<int>& _input;
// Holds outgoing messages.
concurrency::unbounded_buffer<int> _output;
};
Para obter exemplos completos sobre como definir e usar agentes, consulte Passo a passo: Criando um aplicativo Agent-Based e Passo a passo: Criando um agente de fluxo de dados.
[Topo]
Utilizar um mecanismo de controlo de fluxo para limitar o número de mensagens num pipeline de dados
Muitos tipos de buffer de mensagens, como concurrency::unbounded_buffer, podem conter um número ilimitado de mensagens. Quando um produtor de mensagens envia mensagens para um pipeline de dados mais rápido do que o consumidor pode processar essas mensagens, o aplicativo pode entrar em um estado de pouca memória ou falta de memória. Você pode usar um mecanismo de limitação, por exemplo, um semáforo, para limitar o número de mensagens que estão simultaneamente ativas em um pipeline de dados.
O exemplo básico a seguir demonstra como usar um semáforo para limitar o número de mensagens em um pipeline de dados. O pipeline de dados usa a função concurrency::wait para simular uma operação que leva pelo menos 100 milissegundos. Como o remetente produz mensagens mais rápido do que o consumidor pode processar essas mensagens, este exemplo define a semaphore classe para permitir que o aplicativo limite o número de mensagens ativas.
// message-throttling.cpp
// compile with: /EHsc
#include <windows.h> // for GetTickCount()
#include <atomic>
#include <agents.h>
#include <concrt.h>
#include <concurrent_queue.h>
#include <sstream>
#include <iostream>
using namespace concurrency;
using namespace std;
// A semaphore type that uses cooperative blocking semantics.
class semaphore
{
public:
explicit semaphore(long long capacity)
: _semaphore_count(capacity)
{
}
// Acquires access to the semaphore.
void acquire()
{
// The capacity of the semaphore is exceeded when the semaphore count
// falls below zero. When this happens, add the current context to the
// back of the wait queue and block the current context.
if (--_semaphore_count < 0)
{
_waiting_contexts.push(Context::CurrentContext());
Context::Block();
}
}
// Releases access to the semaphore.
void release()
{
// If the semaphore count is negative, unblock the first waiting context.
if (++_semaphore_count <= 0)
{
// A call to acquire might have decremented the counter, but has not
// yet finished adding the context to the queue.
// Create a spin loop that waits for the context to become available.
Context* waiting = NULL;
while (!_waiting_contexts.try_pop(waiting))
{
(Context::Yield)(); // <windows.h> defines Yield as a macro. The parenthesis around Yield prevent the macro expansion so that Context::Yield() is called.
}
// Unblock the context.
waiting->Unblock();
}
}
private:
// The semaphore count.
atomic<long long> _semaphore_count;
// A concurrency-safe queue of contexts that must wait to
// acquire the semaphore.
concurrent_queue<Context*> _waiting_contexts;
};
// A synchronization primitive that is signaled when its
// count reaches zero.
class countdown_event
{
public:
countdown_event(long long count)
: _current(count)
{
// Set the event if the initial count is zero.
if (_current == 0LL)
_event.set();
}
// Decrements the event counter.
void signal() {
if(--_current == 0LL) {
_event.set();
}
}
// Increments the event counter.
void add_count() {
if(++_current == 1LL) {
_event.reset();
}
}
// Blocks the current context until the event is set.
void wait() {
_event.wait();
}
private:
// The current count.
atomic<long long> _current;
// The event that is set when the counter reaches zero.
event _event;
// Disable copy constructor.
countdown_event(const countdown_event&);
// Disable assignment.
countdown_event const & operator=(countdown_event const&);
};
int wmain()
{
// The number of messages to send to the consumer.
const long long MessageCount = 5;
// The number of messages that can be active at the same time.
const long long ActiveMessages = 2;
// Used to compute the elapsed time.
DWORD start_time;
// Computes the elapsed time, rounded-down to the nearest
// 100 milliseconds.
auto elapsed = [&start_time] {
return (GetTickCount() - start_time)/100*100;
};
// Limits the number of active messages.
semaphore s(ActiveMessages);
// Enables the consumer message buffer to coordinate completion
// with the main application.
countdown_event e(MessageCount);
// Create a data pipeline that has three stages.
// The first stage of the pipeline prints a message.
transformer<int, int> print_message([&elapsed](int n) -> int {
wstringstream ss;
ss << elapsed() << L": received " << n << endl;
wcout << ss.str();
// Send the input to the next pipeline stage.
return n;
});
// The second stage of the pipeline simulates a
// time-consuming operation.
transformer<int, int> long_operation([](int n) -> int {
wait(100);
// Send the input to the next pipeline stage.
return n;
});
// The third stage of the pipeline releases the semaphore
// and signals to the main appliation that the message has
// been processed.
call<int> release_and_signal([&](int unused) {
// Enable the sender to send the next message.
s.release();
// Signal that the message has been processed.
e.signal();
});
// Connect the pipeline.
print_message.link_target(&long_operation);
long_operation.link_target(&release_and_signal);
// Send several messages to the pipeline.
start_time = GetTickCount();
for(auto i = 0; i < MessageCount; ++i)
{
// Acquire access to the semaphore.
s.acquire();
// Print the message to the console.
wstringstream ss;
ss << elapsed() << L": sending " << i << L"..." << endl;
wcout << ss.str();
// Send the message.
send(print_message, i);
}
// Wait for the consumer to process all messages.
e.wait();
}
/* Sample output:
0: sending 0...
0: received 0
0: sending 1...
0: received 1
100: sending 2...
100: received 2
200: sending 3...
200: received 3
300: sending 4...
300: received 4
*/
O semaphore objeto limita o pipeline a processar no máximo duas mensagens ao mesmo tempo.
Neste exemplo, o produtor envia relativamente poucas mensagens ao consumidor. Portanto, este exemplo não demonstra uma condição potencial de pouca memória ou falta de memória. No entanto, esse mecanismo é útil quando um pipeline de dados contém um número relativamente alto de mensagens.
Para obter mais informações sobre como criar a classe semaphore usada neste exemplo, consulte Como: Usar a classe de contexto para implementar um semaphore cooperativo.
[Topo]
Não executar Fine-Grained tarefas num pipeline de dados
A Biblioteca de Agentes é mais útil quando o trabalho executado por um pipeline de dados é relativamente menos detalhado. Por exemplo, um componente de aplicativo pode ler dados de um arquivo ou de uma conexão de rede e, ocasionalmente, enviar esses dados para outro componente. O protocolo que a Biblioteca de Agentes usa para propagar mensagens faz com que o mecanismo de passagem de mensagens tenha mais sobrecarga do que as construções paralelas de tarefas fornecidas pela Biblioteca de Padrões Paralelos (PPL). Portanto, certifique-se de que o trabalho executado por um pipeline de dados seja longo o suficiente para compensar essa sobrecarga.
Embora um pipeline de dados seja mais eficaz quando suas tarefas têm granularidade elevada, cada estágio do pipeline de dados pode usar estruturas da PPL, como grupos de tarefas e algoritmos paralelos, para executar um trabalho mais detalhado. Para obter um exemplo de uma rede de dados de grão grosso que usa paralelismo refinado em cada estágio de processamento, consulte Passo a passo: Criando uma rede Image-Processing.
[Topo]
Não passe cargas de mensagens grandes por valor
Em alguns casos, o tempo de execução cria uma cópia de cada mensagem que passa de um buffer de mensagens para outro buffer de mensagens. Por exemplo, a classe concurrency::overwrite_buffer oferece uma cópia de cada mensagem que recebe para cada um de seus destinos. O runtime também cria uma cópia dos dados da mensagem quando usa funções de passagem de mensagens, como concorrência::send e concurrency::receive, para gravar e ler mensagens de um buffer de mensagens. Embora esse mecanismo ajude a eliminar o risco de gravar simultaneamente em dados compartilhados, ele pode levar a um desempenho de memória ruim quando a carga útil da mensagem é relativamente grande.
Você pode usar ponteiros ou referências para melhorar o desempenho da memória ao passar mensagens com uma grande carga útil. O exemplo seguinte compara a passagem de grandes mensagens por valor com a passagem de ponteiros para o mesmo tipo de mensagem. O exemplo define dois tipos de agente, producer e consumer, que atuam em message_data objetos. O exemplo compara o tempo necessário para o produtor enviar vários message_data objetos ao consumidor com o tempo necessário para o agente produtor enviar vários ponteiros para message_data objetos ao consumidor.
// message-payloads.cpp
// compile with: /EHsc
#include <Windows.h>
#include <agents.h>
#include <iostream>
using namespace concurrency;
using namespace std;
// Calls the provided work function and returns the number of milliseconds
// that it takes to call that function.
template <class Function>
__int64 time_call(Function&& f)
{
__int64 begin = GetTickCount();
f();
return GetTickCount() - begin;
}
// A message structure that contains large payload data.
struct message_data
{
int id;
string source;
unsigned char binary_data[32768];
};
// A basic agent that produces values.
template <typename T>
class producer : public agent
{
public:
explicit producer(ITarget<T>& target, unsigned int message_count)
: _target(target)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The target buffer to write to.
ITarget<T>& _target;
// The number of messages to send.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void producer<message_data>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data message;
message.id = _message_count;
message.source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// Template specialization for message_data*.
template <>
void producer<message_data*>::run()
{
// Send a number of messages to the target buffer.
while (_message_count > 0)
{
message_data* message = new message_data;
message->id = _message_count;
message->source = "Application";
send(_target, message);
--_message_count;
}
// Set the agent to the finished state.
done();
}
// A basic agent that consumes values.
template <typename T>
class consumer : public agent
{
public:
explicit consumer(ISource<T>& source, unsigned int message_count)
: _source(source)
, _message_count(message_count)
{
}
protected:
void run();
private:
// The source buffer to read from.
ISource<T>& _source;
// The number of messages to receive.
unsigned int _message_count;
};
// Template specialization for message_data.
template <>
void consumer<message_data>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
}
// Set the agent to the finished state.
done();
}
template <>
void consumer<message_data*>::run()
{
// Receive a number of messages from the source buffer.
while (_message_count > 0)
{
message_data* message = receive(_source);
--_message_count;
// TODO: Do something with the message.
// ...
// Release the memory for the message.
delete message;
}
// Set the agent to the finished state.
done();
}
int wmain()
{
// The number of values for the producer agent to send.
const unsigned int count = 10000;
__int64 elapsed;
// Run the producer and consumer agents.
// This version uses message_data as the message payload type.
wcout << L"Using message_data..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data> buffer;
// Create and start the producer and consumer agents.
producer<message_data> prod(buffer, count);
consumer<message_data> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
// Run the producer and consumer agents a second time.
// This version uses message_data* as the message payload type.
wcout << L"Using message_data*..." << endl;
elapsed = time_call([count] {
// A message buffer that is shared by the agents.
unbounded_buffer<message_data*> buffer;
// Create and start the producer and consumer agents.
producer<message_data*> prod(buffer, count);
consumer<message_data*> cons(buffer, count);
prod.start();
cons.start();
// Wait for the agents to finish.
agent::wait(&prod);
agent::wait(&cons);
});
wcout << L"took " << elapsed << L"ms." << endl;
}
Este exemplo produz a seguinte saída:
Using message_data...
took 437ms.
Using message_data*...
took 47ms.
A versão que usa ponteiros tem um desempenho melhor porque elimina a necessidade de a execução em tempo real criar uma cópia completa de cada objeto message_data que se transmite entre o produtor e o consumidor.
[Topo]
Usar shared_ptr em uma rede de dados quando a propriedade estiver indefinida
Ao enviar mensagens por ponteiro através de uma rede ou canal de passagem de mensagens, normalmente aloca-se a memória para cada mensagem no início da rede e liberta-se essa memória no final da rede. Embora este mecanismo funcione frequentemente bem, há casos em que é difícil ou impossível utilizá-lo. Por exemplo, considere o caso em que a rede de dados contém vários nós finais. Nesse caso, não há um local claro para liberar a memória para as mensagens.
Para resolver esse problema, você pode usar um mecanismo, por exemplo, std::shared_ptr, que permite que um ponteiro seja propriedade de vários componentes. Quando o objeto final shared_ptr que possui um recurso é destruído, o recurso também é liberado.
O exemplo a seguir demonstra como usar shared_ptr para compartilhar valores de ponteiro entre vários buffers de mensagens. O exemplo conecta um objeto concurrency::overwrite_buffer a três objetos concurrency::call . A overwrite_buffer classe oferece mensagens para cada um de seus alvos. Como há vários proprietários dos dados no final da rede de dados, este exemplo usa shared_ptr para permitir que cada call objeto compartilhe a propriedade das mensagens.
// message-sharing.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// A type that holds a resource.
class resource
{
public:
resource(int id) : _id(id)
{
wcout << L"Creating resource " << _id << L"..." << endl;
}
~resource()
{
wcout << L"Destroying resource " << _id << L"..." << endl;
}
// Retrieves the identifier for the resource.
int id() const { return _id; }
// TODO: Add additional members here.
private:
// An identifier for the resource.
int _id;
// TODO: Add additional members here.
};
int wmain()
{
// A message buffer that sends messages to each of its targets.
overwrite_buffer<shared_ptr<resource>> input;
// Create three call objects that each receive resource objects
// from the input message buffer.
call<shared_ptr<resource>> receiver1(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver1: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
call<shared_ptr<resource>> receiver2(
[](shared_ptr<resource> res) {
wstringstream ss;
ss << L"receiver2: received resource " << res->id() << endl;
wcout << ss.str();
},
[](shared_ptr<resource> res) {
return res != nullptr;
}
);
event e;
call<shared_ptr<resource>> receiver3(
[&e](shared_ptr<resource> res) {
e.set();
},
[](shared_ptr<resource> res) {
return res == nullptr;
}
);
// Connect the call objects to the input message buffer.
input.link_target(&receiver1);
input.link_target(&receiver2);
input.link_target(&receiver3);
// Send a few messages through the network.
send(input, make_shared<resource>(42));
send(input, make_shared<resource>(64));
send(input, shared_ptr<resource>(nullptr));
// Wait for the receiver that accepts the nullptr value to
// receive its message.
e.wait();
}
Este exemplo produz a seguinte saída:
Creating resource 42...
receiver1: received resource 42
Creating resource 64...
receiver2: received resource 42
receiver1: received resource 64
Destroying resource 42...
receiver2: received resource 64
Destroying resource 64...
Ver também
Práticas recomendadas de tempo de execução de simultaneidade
Biblioteca de agentes assíncronos
Passo a passo: Criando um aplicativo Agent-Based
Passo a passo: Criando um agente de fluxo de dados
Passo a passo: Criando uma rede Image-Processing
Práticas recomendadas na biblioteca de padrões paralelos
Boas práticas gerais no runtime de simultaneidade