Нет ничего сложного в том, чтобы отправить потоку асинхронное сообщение длинной в пару байт. Гораздо интереснее проделать этот же трюк с неограниченным объёмом информации. Итак, цели прежние: организовать асинхронное взаимодействие между потоками с минимальным количеством блокировок. Лишь немного изменились условия: теперь сообщение – это строка произвольной длинны.
Для начала нужно понять, чем же новые условия усложняют реализацию. На первый взгляд может показаться, что нет никакой разницы: просто вместо числа мы должны передавать функции QueueUserAPC адрес отправляемой строки. Так? – Нет! Задумаемся на секунду над вопросом: кто и когда будет выделять и освобождать память для хранения передаваемых данных? Для успешной реализации нужно решить сразу несколько проблем, которые никак напрямую не связаны с параллельным программированием и асинхронным взаимодействием. Рассмотрим каждую из них.
Три проблемы
Первая проблема, с которой мы сталкиваемся – это выделение памяти. Совершенно очевидно, что мы не можем передать адрес переменной из локального контекста: как только мы выйдем за границы видимости, переменная будет уничтожена и память будет освобождена. Следовательно, прежде чем передать потоку адрес строки с помощью функции QueueUserAPC, мы должны динамически выделить буфер памяти и скопировать туда передаваемые данные целиком. Адрес именно этого буфера и отправится в виде асинхронного сообщения.
Вторая проблема напрямую следует из первой. Раз память была выделена – её нужно освободить. Вопрос лишь в том, кто и когда это сделает. Самое прямое и простое решение – это поручить освобождение памяти принимающему потоку. Оно же и самое не правильное. Принимающий поток может быть уничтожен извне или аварийно завершиться при обработке предыдущего сообщения. В этом случае все сообщения, посланные функцией QueueUserAPC, теряются. Вместе с ними теряются и адреса выделенных блоков памяти, что приводит к невозможности их освобождения. Есть и ещё одна причина, о которой я расскажу в самом конце.
Наиболее правильное решение выглядит примерно так:
Ну и последняя проблема – чисто техническая. Логично поместить реализацию обработки сообщений прямо в теле принимающего потока, а функцию, отвечающую непосредственно за их "получение" реализовать как обработчик APC. В этом случае мы убиваем сразу 2-ух зайцев:
А теперь слайды
Сначала приведу некоторые глобальные определения и код функций producer и consumer. Они практически ничем не отличаются от аналогичных функций из предыдущей заметки. Все отличия сводятся лишь к типу сообщения (в данном случае – это просто std::string) и небольшому коду, инициализирующему очередь сообщений в TLS:
Всё самое интересное сосредоточено в 3-ёх достаточно простых функциях: sendMessage (вызывается из кода функции producer для отправки сообщений), receiveMessage (вызывается в потоке функции consumer для получения очередного сообщения) и receiver (обработчик APC, выполняющий "приём" сообщения и его копирование в очередь сообщений потока в TLS).
Начнём с функции отправки сообщений – sendMessage. Эта функция принимает 3 аргумента: описатель потока-получателя, список ранее отправленных сообщений и новое сообщение для отправки. Первым делом она пробегает по списку отправленных сообщений и удаляет те, которые были успешно "приняты". После чего формирует новое сообщение, сохраняет в список отправленных и отправляет потоку-получателю APC, передав в качестве параметра указатель на новое сообщение:
При вызове функции QueueUserAPC в качестве обработчика APC указывается функция receiver. Она копирует полученное сообщение в очередь, указатель на которую поток-consumer разместил в TLS при старте, и сбрасывает флаг notHandled в 0, сообщив тем самым отправителю, что сообщение принято и может быть очищено:
И наконец функция, извлекающая очередное сообщение из очереди и передающая его получателю. Надо отметить, что она самая тривиальная из этой троицы: извлекает очередное сообщение из очереди, которая передаётся ей в качестве аргумента, и возвращает его. Если же очередь пуста, то уходит в тревожное ожидание на событии завершения:
Ты не видишь суслика, а он есть!
К сожалению, в этом коротком и простом примере я не смог устранить все лишние блокировки. И хотя они не заметны невооружённым взглядом, на самом деле они присутствуют.
Для упрощения понимания и экономии своего времени я повсеместно использую стандартную библиотеку, которая в свою очередь активно использует операторы и функции распределения памяти С++ и CRT (new/delete и malloc/free). Эти вызовы в конечном итоге приводят к обращениям к той или иной куче (heap). Причём все потоки одного процесса будут общаться с одной и той же кучей. Вот здесь и возникают те самые скрытые блокировки. Компания Microsoft утверждает, что до минимума сведено как само количество блокировок при обращении к куче, так и время, на которое эти блокировки срабатывают. Но полностью устранить их не удалось.
При решении большинства обычных задач, вы никогда не столкнётесь с падением производительности, вызванным блокировками в куче Windows. Но в случае высоконагруженных систем с ними придётся считаться. В этом случае необходимо полностью отказаться от использования стандартных механизмов распределения памяти C++ в потоках-получателях (кстати, концепция аллокаторов стандартной библиотеки позволяет добиться этого наименьшей кровью).
Выше я упоминал об ещё одной причине, почему нельзя поручать потоку-получателю освобождение памяти под сообщением. Как не сложно догадаться, этой причиной как раз и являются скрытые блокировки при выделении и освобождении памяти в куче. Если вы будете выделять память в одном потоке, а освобождать в другом, то когда вы упрётесь в проблемы производительности, переделать такой код будет гораздо сложнее.
Для начала нужно понять, чем же новые условия усложняют реализацию. На первый взгляд может показаться, что нет никакой разницы: просто вместо числа мы должны передавать функции QueueUserAPC адрес отправляемой строки. Так? – Нет! Задумаемся на секунду над вопросом: кто и когда будет выделять и освобождать память для хранения передаваемых данных? Для успешной реализации нужно решить сразу несколько проблем, которые никак напрямую не связаны с параллельным программированием и асинхронным взаимодействием. Рассмотрим каждую из них.
Три проблемы
Первая проблема, с которой мы сталкиваемся – это выделение памяти. Совершенно очевидно, что мы не можем передать адрес переменной из локального контекста: как только мы выйдем за границы видимости, переменная будет уничтожена и память будет освобождена. Следовательно, прежде чем передать потоку адрес строки с помощью функции QueueUserAPC, мы должны динамически выделить буфер памяти и скопировать туда передаваемые данные целиком. Адрес именно этого буфера и отправится в виде асинхронного сообщения.
Вторая проблема напрямую следует из первой. Раз память была выделена – её нужно освободить. Вопрос лишь в том, кто и когда это сделает. Самое прямое и простое решение – это поручить освобождение памяти принимающему потоку. Оно же и самое не правильное. Принимающий поток может быть уничтожен извне или аварийно завершиться при обработке предыдущего сообщения. В этом случае все сообщения, посланные функцией QueueUserAPC, теряются. Вместе с ними теряются и адреса выделенных блоков памяти, что приводит к невозможности их освобождения. Есть и ещё одна причина, о которой я расскажу в самом конце.
Наиболее правильное решение выглядит примерно так:
- Передающая сторона сохраняет адрес каждой отправленной строки в динамическом списке вместе с флагом, являющимся признаком получения сообщения.
- Адрес структуры, сохранённой в списке и содержащей адрес строки и флаг, передаётся функции QueueUserAPC.
- Поток, получивший сообщение, копирует строку в свой собственный буфер для дальнейшей обработки и тут же изменяет значение флага, сигнализируя противоположной стороне о том, что сообщение получено.
- Передающая сторона периодически обходит список отправленных сообщений (например, при отправке очередного) и, сверяясь с флагом, очищает те, которые были успешно приняты.
- Передающая сторона может использовать список переданных сообщений для корректной очистки памяти при завершении приложения или для повторной отправки сообщений при аварийном завершении одного или нескольких потоков-обработчиков.
Ну и последняя проблема – чисто техническая. Логично поместить реализацию обработки сообщений прямо в теле принимающего потока, а функцию, отвечающую непосредственно за их "получение" реализовать как обработчик APC. В этом случае мы убиваем сразу 2-ух зайцев:
- передающая сторона будет избавлена от необходимости выбора значения 1-ого параметра при вызове функции QueueUserAPC;
- решена проблема вложенного вызова обработчиков APC, которая возникает, если текущий обработчик APC косвенно или открыто выполнит тревожное ожидание – в этом случае может произойти вызов следующего обработчика, ожидающего в очереди, что бывает фатально для не подготовленного к таким трюкам кода.
А теперь слайды
Сначала приведу некоторые глобальные определения и код функций producer и consumer. Они практически ничем не отличаются от аналогичных функций из предыдущей заметки. Все отличия сводятся лишь к типу сообщения (в данном случае – это просто std::string) и небольшому коду, инициализирующему очередь сообщений в TLS:
const DWORD ON_EXIT = 0xFFFFFFFF;
struct ControlEvents {
HANDLE onExit;
HANDLE onStart;
};
struct Message {
std::string message;
volatile LONG notHandled;
};
// declaration of variable in TLS
// this code is specific for MS VS
__declspec(thread)
volatile std::queue<std::string> *messagesQueuePtrTLS = 0;
unsigned int __stdcall
consumer(void *data)
{
ControlEvents events = *reinterpret_cast<ControlEvents*>(data);
// define messages queue
volatile std::queue<std::string> messagesQueue;
// and store pointer to the messages queue in TLS
messagesQueuePtrTLS = &messagesQueue;
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Consumer." << std::endl;
::SetEvent(events.onStart);
DWORD result = ERROR_SUCCESS;
std::string message;
while (ERROR_SUCCESS == (result = receiveMessage(
events.onExit, messagesQueue, message))) {
std::cout
<< "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Message: " << message << std::endl;
}
messagesQueuePtrTLS = 0;
return result == ON_EXIT ? ERROR_SUCCESS : result;
}
DWORD
producer()
{
typedef std::vector<std::string> TWorkMessages;
TWorkMessages works;
works.reserve(12);
works.push_back("stand up");
works.push_back("dress");
works.push_back("cook");
works.push_back("eat");
works.push_back("dish up");
works.push_back("wash");
works.push_back("close the door");
works.push_back("drive");
works.push_back("work");
works.push_back("work");
works.push_back("work");
works.push_back("work");
ControlEvents events = {0};
HANDLE workerThreadH = 0;
DWORD ret = ERROR_SUCCESS;
try
{
events.onExit = ::CreateEvent(0, true, false, 0);
if (0 == events.onExit) {
throw ::GetLastError();
}
events.onStart = ::CreateEvent(0, true, false, 0);
if (0 == events.onStart) {
throw ::GetLastError();
}
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Producer." << std::endl;
workerThreadH = reinterpret_cast<HANDLE>(
_beginthreadex(0, 0, &consumer, &events, 0, 0));
if (0 == workerThreadH) {
throw ::GetLastError();
}
if (WAIT_OBJECT_0 != ::WaitForSingleObject(
events.onStart, INFINITE)) {
throw ::GetLastError();
}
std::list<Message> sentMessages;
for(TWorkMessages::const_iterator it = works.begin();
works.end() != it; ++it)
{
DWORD res = sendMessage(
workerThreadH, *it, sentMessages);
if (ERROR_SUCCESS != res) {
throw res;
}
}
// wait a bit
::Sleep(1000);
} catch (DWORD errCode) {
ret = errCode;
}
if (0 != events.onExit) {
if (0 != workerThreadH) {
::SetEvent(events.onExit);
if (WAIT_OBJECT_0 != ::WaitForSingleObject(
workerThreadH, 5000)) {
::TerminateThread(workerThreadH, 1);
}
::CloseHandle(workerThreadH);
}
::CloseHandle(events.onExit);
}
if (0 != events.onStart) {
::CloseHandle(events.onStart);
}
return ret;
}
struct ControlEvents {
HANDLE onExit;
HANDLE onStart;
};
struct Message {
std::string message;
volatile LONG notHandled;
};
// declaration of variable in TLS
// this code is specific for MS VS
__declspec(thread)
volatile std::queue<std::string> *messagesQueuePtrTLS = 0;
unsigned int __stdcall
consumer(void *data)
{
ControlEvents events = *reinterpret_cast<ControlEvents*>(data);
// define messages queue
volatile std::queue<std::string> messagesQueue;
// and store pointer to the messages queue in TLS
messagesQueuePtrTLS = &messagesQueue;
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Consumer." << std::endl;
::SetEvent(events.onStart);
DWORD result = ERROR_SUCCESS;
std::string message;
while (ERROR_SUCCESS == (result = receiveMessage(
events.onExit, messagesQueue, message))) {
std::cout
<< "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Message: " << message << std::endl;
}
messagesQueuePtrTLS = 0;
return result == ON_EXIT ? ERROR_SUCCESS : result;
}
DWORD
producer()
{
typedef std::vector<std::string> TWorkMessages;
TWorkMessages works;
works.reserve(12);
works.push_back("stand up");
works.push_back("dress");
works.push_back("cook");
works.push_back("eat");
works.push_back("dish up");
works.push_back("wash");
works.push_back("close the door");
works.push_back("drive");
works.push_back("work");
works.push_back("work");
works.push_back("work");
works.push_back("work");
ControlEvents events = {0};
HANDLE workerThreadH = 0;
DWORD ret = ERROR_SUCCESS;
try
{
events.onExit = ::CreateEvent(0, true, false, 0);
if (0 == events.onExit) {
throw ::GetLastError();
}
events.onStart = ::CreateEvent(0, true, false, 0);
if (0 == events.onStart) {
throw ::GetLastError();
}
std::cout << "[0x"
<< std::hex
<< std::setw(8)
<< std::setfill('0')
<< ::GetCurrentThreadId()
<< "] Producer." << std::endl;
workerThreadH = reinterpret_cast<HANDLE>(
_beginthreadex(0, 0, &consumer, &events, 0, 0));
if (0 == workerThreadH) {
throw ::GetLastError();
}
if (WAIT_OBJECT_0 != ::WaitForSingleObject(
events.onStart, INFINITE)) {
throw ::GetLastError();
}
std::list<Message> sentMessages;
for(TWorkMessages::const_iterator it = works.begin();
works.end() != it; ++it)
{
DWORD res = sendMessage(
workerThreadH, *it, sentMessages);
if (ERROR_SUCCESS != res) {
throw res;
}
}
// wait a bit
::Sleep(1000);
} catch (DWORD errCode) {
ret = errCode;
}
if (0 != events.onExit) {
if (0 != workerThreadH) {
::SetEvent(events.onExit);
if (WAIT_OBJECT_0 != ::WaitForSingleObject(
workerThreadH, 5000)) {
::TerminateThread(workerThreadH, 1);
}
::CloseHandle(workerThreadH);
}
::CloseHandle(events.onExit);
}
if (0 != events.onStart) {
::CloseHandle(events.onStart);
}
return ret;
}
Всё самое интересное сосредоточено в 3-ёх достаточно простых функциях: sendMessage (вызывается из кода функции producer для отправки сообщений), receiveMessage (вызывается в потоке функции consumer для получения очередного сообщения) и receiver (обработчик APC, выполняющий "приём" сообщения и его копирование в очередь сообщений потока в TLS).
Начнём с функции отправки сообщений – sendMessage. Эта функция принимает 3 аргумента: описатель потока-получателя, список ранее отправленных сообщений и новое сообщение для отправки. Первым делом она пробегает по списку отправленных сообщений и удаляет те, которые были успешно "приняты". После чего формирует новое сообщение, сохраняет в список отправленных и отправляет потоку-получателю APC, передав в качестве параметра указатель на новое сообщение:
DWORD
sendMessage(
HANDLE target,
const std::string &message,
std::list<Message> &sentMessages)
{
// iterate over alls sent messages
for(std::list<Message>::iterator it = sentMessages.begin();
sentMessages.end() != it; /* do nothing */) {
// check if message is already handled
if (InterlockedExchangePointer(&(it->notHandled),
it->notHandled) == 0) {
// message was already handled - delete it
it = sentMessages.erase(it);
} else {
++it;
}
}
// prepare new message
sentMessages.push_back(Message());
sentMessages.back().message = message;
sentMessages.back().notHandled = 1;
// place memory barier to prevent a re-ordering of read
// and write operations; this, for example, flushes CPU
// caches (see details here and here); we need to do this
// to guarantee that the write operations which update the
// message's fields will be completed before the write
// operation which passes the message's address into
// the consumer thread by means of QueueUserAPC call,
// the point is the documentation does not specify whether
// QueueUserAPC function places memory barier or not
MemoryBarrier();
// and send message
if (::QueueUserAPC(&receiver, target, reinterpret_cast<
ULONG_PTR>(&sentMessages.back())) == 0) {
// no-zero result code means error
return ::GetLastError();
}
return ERROR_SUCCESS;
}
sendMessage(
HANDLE target,
const std::string &message,
std::list<Message> &sentMessages)
{
// iterate over alls sent messages
for(std::list<Message>::iterator it = sentMessages.begin();
sentMessages.end() != it; /* do nothing */) {
// check if message is already handled
if (InterlockedExchangePointer(&(it->notHandled),
it->notHandled) == 0) {
// message was already handled - delete it
it = sentMessages.erase(it);
} else {
++it;
}
}
// prepare new message
sentMessages.push_back(Message());
sentMessages.back().message = message;
sentMessages.back().notHandled = 1;
// place memory barier to prevent a re-ordering of read
// and write operations; this, for example, flushes CPU
// caches (see details here and here); we need to do this
// to guarantee that the write operations which update the
// message's fields will be completed before the write
// operation which passes the message's address into
// the consumer thread by means of QueueUserAPC call,
// the point is the documentation does not specify whether
// QueueUserAPC function places memory barier or not
MemoryBarrier();
// and send message
if (::QueueUserAPC(&receiver, target, reinterpret_cast<
ULONG_PTR>(&sentMessages.back())) == 0) {
// no-zero result code means error
return ::GetLastError();
}
return ERROR_SUCCESS;
}
При вызове функции QueueUserAPC в качестве обработчика APC указывается функция receiver. Она копирует полученное сообщение в очередь, указатель на которую поток-consumer разместил в TLS при старте, и сбрасывает флаг notHandled в 0, сообщив тем самым отправителю, что сообщение принято и может быть очищено:
void __stdcall receiver(ULONG_PTR data)
{
Message *message = reinterpret_cast<Message*>(data);
// copy the received message into TLS
const_cast<std::queue<std::string>*>(
messagesQueuePtrTLS)->push(message->message);
// and mark it as handled
InterlockedExchangePointer(&(message->notHandled), 0);
}
{
Message *message = reinterpret_cast<Message*>(data);
// copy the received message into TLS
const_cast<std::queue<std::string>*>(
messagesQueuePtrTLS)->push(message->message);
// and mark it as handled
InterlockedExchangePointer(&(message->notHandled), 0);
}
И наконец функция, извлекающая очередное сообщение из очереди и передающая его получателю. Надо отметить, что она самая тривиальная из этой троицы: извлекает очередное сообщение из очереди, которая передаётся ей в качестве аргумента, и возвращает его. Если же очередь пуста, то уходит в тревожное ожидание на событии завершения:
DWORD
receiveMessage(
HANDLE onExit,
volatile std::queue<std::string> &messagesQueue,
std::string &message)
{
if (const_cast<std::queue<std::string>&>(
messagesQueue).empty()) {
const DWORD waitRes = ::WaitForSingleObjectEx(
onExit, INFINITE, true);
if (WAIT_OBJECT_0 == waitRes) {
return ON_EXIT;
}
if (WAIT_IO_COMPLETION != waitRes) {
return ::GetLastError();
}
}
message = const_cast<std::queue<std::string>&>(
messagesQueue).front();
const_cast<std::queue<std::string>&>(
messagesQueue).pop();
return ERROR_SUCCESS;
}
receiveMessage(
HANDLE onExit,
volatile std::queue<std::string> &messagesQueue,
std::string &message)
{
if (const_cast<std::queue<std::string>&>(
messagesQueue).empty()) {
const DWORD waitRes = ::WaitForSingleObjectEx(
onExit, INFINITE, true);
if (WAIT_OBJECT_0 == waitRes) {
return ON_EXIT;
}
if (WAIT_IO_COMPLETION != waitRes) {
return ::GetLastError();
}
}
message = const_cast<std::queue<std::string>&>(
messagesQueue).front();
const_cast<std::queue<std::string>&>(
messagesQueue).pop();
return ERROR_SUCCESS;
}
Ты не видишь суслика, а он есть!
К сожалению, в этом коротком и простом примере я не смог устранить все лишние блокировки. И хотя они не заметны невооружённым взглядом, на самом деле они присутствуют.
Для упрощения понимания и экономии своего времени я повсеместно использую стандартную библиотеку, которая в свою очередь активно использует операторы и функции распределения памяти С++ и CRT (new/delete и malloc/free). Эти вызовы в конечном итоге приводят к обращениям к той или иной куче (heap). Причём все потоки одного процесса будут общаться с одной и той же кучей. Вот здесь и возникают те самые скрытые блокировки. Компания Microsoft утверждает, что до минимума сведено как само количество блокировок при обращении к куче, так и время, на которое эти блокировки срабатывают. Но полностью устранить их не удалось.
При решении большинства обычных задач, вы никогда не столкнётесь с падением производительности, вызванным блокировками в куче Windows. Но в случае высоконагруженных систем с ними придётся считаться. В этом случае необходимо полностью отказаться от использования стандартных механизмов распределения памяти C++ в потоках-получателях (кстати, концепция аллокаторов стандартной библиотеки позволяет добиться этого наименьшей кровью).
Выше я упоминал об ещё одной причине, почему нельзя поручать потоку-получателю освобождение памяти под сообщением. Как не сложно догадаться, этой причиной как раз и являются скрытые блокировки при выделении и освобождении памяти в куче. Если вы будете выделять память в одном потоке, а освобождать в другом, то когда вы упрётесь в проблемы производительности, переделать такой код будет гораздо сложнее.
Комментариев нет:
Отправить комментарий