Приложение С. Каркас передачи сообщений и полный пример программы банкомата
В разделе 4.1 мы познакомились с каркасом передачи сообщений между потоками, продемонстрировав его на примере программы банкомата. В этом приложении приводится полный код примера, включая и код каркаса передачи сообщений.
В листинге С.1 показан код очереди сообщений. Сообщения хранятся в списке и представлены указателями на базовый класс. Сообщения конкретного типа обрабатываются шаблонным классом, производным от этого базового класса. В момент помещения сообщения в очередь конструируется подходящий экземпляр обертывающего класса и сохраняется указатель на него; операция извлечения возвращает именно этот указатель. Поскольку в классе message_base нет функций-членов, извлекающий поток должен привести указатель к нужному типу wrapped_message<T>, прежде чем сможет получить хранящееся сообщение.
Листинг С.1. Простая очередь сообщений
#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>
namespace messaging
{ │ Базовый класс
struct message_base {←┘ элементов очереди
virtual ~message_base() {}
};
template <typename Msg> │ Для каждого типа сообщений
struct wrapped_message:←┘ имеется специализация
message_base {
Msg contents;
explicit wrapped_message(Msg const& contents_):
contents(contents_) {}
};
│ Наша очередь
class queue←┘ сообщений
{ │ В настоящей
std::mutex m; │ очереди хранят-
std::condition_variable с; │ ся указатели на
std::queue<std::shared_ptr<message_base> > q;←┘ message_base
public:
template<typename T> │ Обернуть добав-
void push(T const& msg) │ ленное сообще-
{ │ ние и сохранить
std::lock_guard<std::mutex> lk(m);│ указатель
q.push( ←┘
std::make_shared<wrapped_message<T> >(msg));
с.notify_all();
}
std::shared_ptr<message_base> wait_and_pop()│ Блокирует до
{ │ появления в
std::unique_lock<std::mutex> lk(m); │ очереди хотя бы
c.wait(lk, [&]{ return !q.empty(); }); ←┘ одного элемента
auto res = q.front();
q.pop();
return res;
}
};
}
Отправкой сообщений занимается объект класса sender, показанного в листинге С.2. Это не более чем тонкая обертка вокруг очереди сообщений, которая позволяет только добавлять сообщения. При копировании экземпляров sender копируется только указатель на очередь, а не сама очередь.
Листинг С.2. Класс sender
namespace messaging {
class sender {│ sender обертывает указатель
queue* q; ←┘ на очередь
public: │ У сконструированного по умолчанию
sender() :←┘ sender'a нет очереди
q(nullptr) {}
│ Разрешаем конструирование
explicit sender(queue* q_):←┘ из указателя на очередь
q(q_) {}
template<typename Message>
void send(Message const& msg) {
if (q){ │ Отправка сообщения сводится
q->push(msg);←┘ к помещению его в очередь
}
}
};
}
Получение сообщений несколько сложнее. Мы не только должны дождаться появления сообщения в очереди, но еще и проверить, совпадает ли его тип с одним из известных нам типов, и вызвать соответствующий обработчик. Эта процедура начинается в классе receiver, показанном в листинге ниже.
Листинг С.3. Класс receiver
namespace messaging {
class receiver {
queue q; ← receiver владеет очередью
public: │ Разрешить неявное преобразование в объект
operator sender() {←┘ sender, ссылающийся на эту очередь
return sender(&q);
}
│ При обращении к функции ожидания
dispatcher wait() {←┘ очереди создается диспетчер
return dispatcher(&q);
}
};
}
Если sender только ссылается на очередь сообщений, то receiver ей владеет. Мы можем получить объект sender, ссылающийся на очередь, воспользовавшись неявным преобразованием. Процедура диспетчеризации сообщения начинается с обращения к функции wait(). При этом создается объект dispatcher, ссылающийся на очередь, которой владеет receiver. Класс dispatcher показан в следующем листинге; как видите, содержательная работа производится в его деструкторе. В данном случае работа состоит в ожидании сообщения и его диспетчеризации.
Листинг С.4. Класс dispatcher
namespace messaging {
class close_queue {}; ← Сообщение о закрытии очереди
class dispatcher {
queue* q; │ Экземпляры
bool chained; │ диспетчера нельзя
│ копировать
dispatcher(dispatcher const&)=delete;←┘
dispatcher& operator=(dispatcher const&)=delete;
template<
typename Dispatcher,│ Разрешить экземплярам
typename Msg, │ TemplateDispatcher доступ
typename Func> ←┘ к закрытым частям класса
friend class TemplateDispatcher;
void wait_and_dispatch()
{ (1) В цикле ждем и диспетчеризуем
for (;;) {←┘ сообщения
auto msg = q->wait_and_pop();
dispatch(msg);
}
} (2) dispatch() смотрит, не пришло ли
│ сообщение close_queue, и, если
bool dispatch (←┘ да, возбуждает исключение
std::shared_ptr<message_base> const& msg) {
if (dynamic_cast<wrapped_message<close_queue>*>(msg.get())) {
throw close_queue();
}
return false;
}
public: │ Экземпляры диспетчера
dispatcher(dispatcher&& other):←┘ можно перемещать
q(other.q), chained(other.chained) {│ Объект-источник не должен
other.chained = true; ←┘ ждать сообщений
}
explicit dispatcher(queue* q_): q(q_), chained(false) {}
template<typename Message, typename Func>
TemplateDispatcher<dispatcher, Message, Func>
handle(Func&& f)←┐ Сообщения конкретного типа
{ (3) обрабатывает TemplateDispatcher
return TemplateDispatcher<dispatcher, Message, Func>(
q, this, std::forward<Func>(f));
}
~dispatcher() noexcept(false)←┐ Деструктор может
{ (4) возбудить исключение
if (!chained) {
wait_and_dispatch();
}
}
};
}
Экземпляр dispatcher, возвращенный функцией wait(), немедленно уничтожается, так как является временным объектом, и, как уже было сказало, вся работа выполняется в его деструкторе. Деструктор вызывает функцию wait_and_dispatch(), которая в цикле (1) ожидает сообщения и передает его функции dispatch(). Сама функция dispatch() (2) проста, как правда: она проверяет, не получено ли сообщение типа close_queue, и, если так, то возбуждает исключение; в противном случае возвращает false, извещая, что сообщение не обработало. Именно из-за исключения close_queue деструктор и помечен как noexcept(false) (4); без этой аннотации действовала бы подразумеваемая спецификация исключений для деструктора — noexcept(true), означающая, что исключения не допускаются, и тогда исключение close_queue привело бы к завершению программы.
Но просто вызывать функцию wait() особого смысла не имеет — как правило, нам нужно обработать полученное сообщение. Для этого предназначена функция-член handle() (3). Это шаблон, и тип сообщения нельзя вывести, поэтому необходимо явно указать, сообщение какого типа обрабатывается, и передать функцию (или допускающий вызов объект) для его обработки. Сама функция handle() передает очередь, текущий объект dispatcher и функцию-обработчик новому экземпляру шаблонного класса TemplateDispatcher, который обрабатывает сообщения указанного типа. Код этого класса показан в листинге С.5. Именно поэтому мы проверяем флаг chained в деструкторе перед тем, как приступить к ожиданию сообщения; он не только предотвращает ожидание объектами, содержимое которых перемещено, но и позволяет передать ответственность за ожидание новому экземпляру TemplateDispatcher.
Листинг С.5. Шаблон класса TemplateDispatcher
namespace messaging {
template<
typename PreviousDispatcher, typename Msg, typename Func>
class TemplateDispatcher {
queue* q;
PreviousDispatcher* prev;
Func f;
bool chained;
TemplateDispatcher(TemplateDispatcher const&) = delete;
TemplateDispatcher& operator=(
TemplateDispatcher const&) = delete;
template<
typename Dispatcher, typename OtherMsg, typename OtherFunc>
friend class TemplateDispatcher;←┐ Все конкретизации
void wait_and_dispatch() │ TemplateDispatcher
{ │ дружат между собой
for (;;) {
auto msg = q->wait_and_pop();
if (dispatch(msg))←┐ Если мы обработали
break; │ сообщение выходим
} (1) из цикла
}
bool dispatch(std::shared_ptr<message_base> const& msg) {
if (wrapped_message<Msg>* wrapper =
dynamic_cast<wrapped_message<Msg>*>(
msg.get())) { ←┐ Проверяем тип
f(wrapper->contents);│ сообщения и
return true; │ вызываем
} (2) функцию
else {
return prev->dispatch(msg);←┐ Вызываем предыдущий
} (3) диспетчер в цепочке
}
public:
TemplateDispatcher(TemplateDispatcher&& other):
q(other.q), prev(other.prev), f(std::move(other.f)),
chained(other.chained) {
other.chained = true;
}
TemplateDispatcher(
queue* q_, PreviousDispatcher* prev_, Func&& f_):
q(q_), prev(prev_), f(std::forward<Func>(f_)), chained(false)
{
prev_->chained = true;
}
template<typename OtherMsg, typename OtherFunc>
TemplateDispatcher<TemplateDispatcher, OtherMsg, OtherFunc>
handle(OtherFunc&& of)←┐ Дополнительные обработчики
{ (4) можно связать в цепочку
return TemplateDispatcher<
TemplateDispatcher, OtherMsg, OtherFunc>(
q, this, std::forward<OtherFunc>(of));
}
~TemplateDispatcher() noexcept(false)←┐ Деструктор снова
{ │ помечен как
if (!chained) { (5) noexcept(false)
wait_and_dispatch();
}
}
};
}
Шаблон класса TemplateDispatcher<> устроен по образцу класса dispatcher и почти ничем не отличается от него. В частности, деструктор тоже вызывает wait_and_dispatch(), чтобы дождаться сообщения.
Поскольку мы не возбуждаем исключения, если сообщение обработало, то теперь в цикле (1) нужно проверять, обработали мы сообщение или нет. Обработка прекращается, как только сообщение успешно обработало, чтобы в следующий раз можно было ждать очередного набора сообщений. Если найдено соответствие указанному типу сообщения, то вызывается предоставленная функция (2), а не возбуждается исключение (хотя функция-обработчик может и сама возбудить исключение). Если же соответствие не найдено, то мы передаем сообщение предыдущему диспетчеру в цепочке (3). В самом первом экземпляре это будет объект dispatcher, но если в функции handle() (4) вызовы сцеплялись, чтобы можно было обработать несколько типов сообщений, то предыдущим диспетчером может быть ранее созданный экземпляр TemplateDispatcher<>, который в свою очередь передаст сообщение предшествующему ему диспетчеру в цепочке, если не сможет обработать его сам. Поскольку любой обработчик может возбудить исключение (в том числе и обработчик самого первого объекта dispatcher, если встретит сообщение close_queue), то деструктор снова необходимо снабдить аннотацией noexcept(false) (5).
Этот простенький каркас позволяет помещать в очередь сообщения любого типа, а затем на принимающем конце отбирать те из них, которые мы можем обработать. Кроме того, он позволяет передавать ссылку на очередь, чтобы в нее можно было добавлять новые сообщения, оставляя при этом прижимающий конец недоступным извне.
И чтобы закончить пример из главы 4, в листинге С.6 приведён код сообщений, в листингах С.7, С.8 и С.9 — различные конечные автоматы, а в листинге С.10 — управляющая программа.
Листинг С.6. Сообщения банкомата
struct withdraw {
std::string account;
unsigned amount;
mutable messaging::sender atm_queue;
withdraw(std::string const& account_,
unsigned amount_, messaging::sender atm_queue_):
account(account_), amount(amount_), atm_queue(atm_queue_) {}
};
struct withdraw_ok {};
struct withdraw_denied {};
struct cancel_withdrawal {
std::string account;
unsigned amount;
cancel_withdrawal(std::string const& account_,
unsigned amount_):
account(account_), amount(amount_) {}
};
struct withdrawal_processed {
std::string account;
unsigned amount;
withdrawal_processed(std::string const& account_,
unsigned amount_):
account(account_), amount(amount_) {}
};
struct card_inserted {
std::string account;
explicit card_inserted(std::string const& account_):
account(account_) {}
};
struct digit_pressed {
char digit;
explicit digit_pressed(char digit_):
digit(digit_) {}
};
struct clear_last_pressed {};
struct eject_card {};
struct withdraw_pressed {
unsigned amount;
explicit withdraw_pressed(unsigned amount_):
amount(amount_) {}
};
struct cancel_pressed {};
struct issue_money {
unsigned amount;
issue_money(unsigned amount_):
amount(amount_) {}
};
struct verify_pin {
std::string account;
std::string pin;
mutable messaging::sender atm_queue;
verify_pin(std::string const& account_, std::string const& pin_,
messaging::sender atm_queue_):
account(account_), pin(pin_), atm_queue(atm_queue_) {}
};
struct pin_verified {};
struct pin_incorrect {};
struct display_enter_pin {};
struct display_enter_card {};
struct display_insufficient_funds {};
struct display_withdrawal_cancelled {};
struct display_pin_incorrect_message {};
struct display_withdrawal_options (};
struct get_balance {
std::string account;
mutable messaging::sender atm_queue;
get_balance(
std::string const& account_, messaging::sender atm_queue_):
account(account_), atm_queue(atm_queue_) {}
};
struct balance {
unsigned amount;
explicit balance(unsigned amount_):
amount(amount_) {}
};
struct display_balance {
unsigned amount;
explicit display_balance(unsigned amount_):
amount(amount_) {}
};
struct balance_pressed {};
Листинг С.7. Конечный автомат банкомата
class atm {
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;
void (atm::*state)();
std::string account;
unsigned withdrawal_amount;
std::string pin;
void process_withdrawal() {
incoming.wait().handle<withdraw_ok>(
[&](withdraw_ok const& msg) {
interface_hardware.send(
issue_money(withdrawal_amount));
bank.send(
withdrawal_processed(account, withdrawal_amount));
state = &atm::done_processing;
}
).handle<withdraw_denied>(
[&](withdraw_denied const& msg) {
interface_hardware.send(display_insufficient_funds());
state = &atm::done_processing;
}
).handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
bank.send(
cancel_withdrawal(account, withdrawal_amount));
interface_hardware.send(
display_withdrawal_cancelled());
state = &atm::done_processing;
}
);
}
void process_balance() {
incoming.wait().handle<balance>(
[&](balance const& msg) {
interface_hardware.send(display_balance(msg.amount));
state = &atm::wait_for_action;
}
).handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
state = &atm::done_processing;
}
);
}
void wait_for_action() {
interface_hardware.send(display_withdrawal_options());
incoming.wait().handle<withdraw_pressed>(
[&](withdraw_pressed const& msg) {
withdrawal_amount = msg.amount;
bank.send(withdraw(account, msg.amount, incoming));
state = &atm::process_withdrawal;
}
).handle<balance_pressed>(
[&](balance_pressed const& msg) {
bank.send(get_balance(account, incoming));
state = &atm::process_balance;
}
).handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
state = &atm::done_processing;
}
);
}
void verifying_pin() {
incoming.wait().handle<pin_verified>(
[&](pin_verified const& msg) {
state = &atm::wait_for_action;
}
).handle<pin_incorrect>(
[&](pin_incorrect const& msg) {
interface_hardware.send(
display_pin_incorrect_message());
state = &atm::done_processing;
}
).handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
state = &atm::done_processing;
}
);
}
void getting_pin() {
incoming.wait().handle<digit_pressed>(
[&](digit_pressed const& msg) {
unsigned const pin_length = 4;
pin += msg.digit;
if (pin.length() == pin_length) {
bank.send(verify_pin(account, pin, incoming));
state = &atm::verifying_pin;
}
}
).handle<clear_last_pressed>(
[&](clear_last_pressed const& msg) {
if (!pin.empty()) {
pin.pop_back();
}
}
).handle<cancel_pressed>(
[&](cancel_pressed const& msg) {
state = &atm::done_processing;
}
);
}
void waiting_for_card() {
interface_hardware.send(display_enter_card());
incoming.wait().handle<card_inserted>(
[&](card_inserted const& msg) {
account = msg.account;
pin = "";
interface_hardware.send(display_enter_pin());
state = &atm::getting_pin;
}
);
}
void done_processing() {
interface_hardware.send(eject_card());
state = &atm::waiting_for_card;
}
atm(atm const&) = delete;
atm& operator=(atm const&) = delete;
public:
atm(messaging::sender bank_,
messaging::sender interface_hardware_):
bank(bank_), interface_hardware(interface_hardware_) {}
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
state = &atm::waiting_for_card;
try {
for (;;) {
(this->*state)();
}
} catch(messaging::close_queue const&) {
}
}
messaging::sender get_sender() {
return incoming;
}
};
Листинг С.8. Конечный автомат банка
class bank_machine {
messaging::receiver incoming;
unsigned balance;
public:
bank_machine():
balance(199) {}
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
try {
for (;;) {
incoming.wait().handle<verify_pin>(
[&](verify_pin const& msg) {
if (msg.pin == "1937") {
msg.atm_queue.send(pin_verified());
} else {
msg.atm_queue.send(pin_incorrect());
}
}
).handle<withdraw>(
[&](withdraw const& msg) {
if (balance >= msg.amount) {
msg.atm_queue.send(withdraw_ok());
balance -= msg.amount;
} else {
msg.atm_queue.send(withdraw_denied());
}
}
).handle<get_balance>(
[&](get_balance const& msg) {
msg.atm_queue.send(::balance(balance));
}
).handle<withdrawal_processed>(
[&](withdrawal_processed const& msg) {
}
).handle<cancel_withdrawal>(
[&](cancel_withdrawal const& msg) {
}
);
}
} catch(messaging::close_queue const&) {
}
}
messaging::sender get_sender() {
return incoming;
}
};
Листинг С.9. Конечный автомат пользовательского интерфейса
class interface_machine {
messaging::receiver incoming;
public:
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
try {
for (;;) {
incoming.wait().handle<issue_money> (
[&](issue_money const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout << "Issuing "
<< msg.amount << std::endl;
}
}
).handle<display_insufficient_funds>(
[&](display_insufficient_funds const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout << "Insufficient funds" << std::endl;
}
}
).handle<display_enter_pin>(
[&](display_enter_pin const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout
<< "Please enter your PIN (0-9)" << std::endl;
}
}
).handle<display_enter_card>(
[&](display_enter_card const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout << "Please enter your card (I)"
<< std::endl;
}
}
).handle<display_balance>(
[&](display_balance const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout
<< "The balance of your account is "
<< msg.amount << std::endl;
}
}
).handle<display_withdrawal_options>(
[&](display_withdrawal_options const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout << "Withdraw 50? (w)" << std::endl;
std::cout << "Display Balance? (b)"
<< std::endl;
std::cout << "Cancel? (c) " << std::endl;
}
}
).handle<display_withdrawal_cancelled>(
[&](display_withdrawal_cancelled const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout << "Withdrawal cancelled"
<< std::endl;
}
}
).handle<display_pin_incorrect_message>(
[&](display_pin_incorrect_message const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout << "PIN incorrect" << std::endl;
}
}
).handle<eject_card>(
[&](eject_card const& msg) {
{
std::lock_guard<std::mutex> lk(iom);
std::cout << "Ejecting card" << std::endl;
}
}
);
}
} catch (messaging::close_queue&) {
}
}
messaging::sender get_sender() {
return incoming;
}
};
Листинг С.10. Управляющая программа
int main() {
bank_machine bank;
interface_machine interface_hardware;
atm machine(bank.get_sender(), interface_hardware.get_sender());
std::thread bank_thread(&bank_machine::run, &bank);
std::thread if_thread(&interface_machine::run,
&interface_hardware);
std::thread atm_thread(&atm::run, &machine);
messaging::sender atmqueue(machine.get_sender());
bool quit_pressed = false;
while (!quit_pressed) {
char c = getchar();
switch(с) {
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
atmqueue.send(digit_pressed(с));
break;
case 'b':
atmqueue.send(balance_pressed());
break;
case 'w':
atmqueue.send(withdraw_pressed(50));
break;
case 'с':
atmqueue.send(cancel_pressed());
break;
case 'q':
quit_pressed = true;
break;
case 'i':
atmqueue.send(card_inserted("acc1234"));
break;
}
}
bank.done();
machine.done();
interface_hardware.done();
atm_thread.join();
bank_thread.join();
if_thread.join();
}