Приложение С. Каркас передачи сообщений и полный пример программы банкомата

В разделе 4.1 мы познакомились с каркасом передачи сообщений между потоками, продемонстрировав его на примере программы банкомата. В этом приложении приводится полный код примера, включая и код каркаса передачи сообщений.

В листинге С.1 показан код очереди сообщений. Сообщения хранятся в списке и представлены указателями на базовый класс. Сообщения конкретного типа обрабатываются шаблонным классом, производным от этого базового класса. В момент помещения сообщения в очередь конструируется подходящий экземпляр обертывающего класса и сохраняется указатель на него; операция извлечения возвращает именно этот указатель. Поскольку в классе message_base нет функций-членов, извлекающий поток должен привести указатель к нужному типу wrapped_message<T>, прежде чем сможет получить хранящееся сообщение.

Листинг С.1. Простая очередь сообщений

#include <mutex>

#include <condition_variable>

#include <queue>

#include <memory>

namespace messaging

{                     │ Базовый класс

struct message_base {←┘ элементов очереди

 virtual ~message_base() {}

};

template <typename Msg> │ Для каждого типа сообщений

struct wrapped_message:←┘ имеется специализация

message_base {

 Msg contents;

 explicit wrapped_message(Msg const& contents_):

  contents(contents_) {}

};

            │ Наша очередь

class queue←┘ сообщений

{                                              │ В настоящей

 std::mutex m;                                 │ очереди хранят-

 std::condition_variable с;                    │ ся указатели на

 std::queue<std::shared_ptr<message_base> > q;←┘ message_base

public:

 template<typename T>               │ Обернуть добав-

 void push(T const& msg)            │ ленное сообще-

 {                                  │ ние и сохранить

  std::lock_guard<std::mutex> lk(m);│ указатель

  q.push( ←┘

   std::make_shared<wrapped_message<T> >(msg));

  с.notify_all();

 }

 std::shared_ptr<message_base> wait_and_pop()│ Блокирует до

 {                                           │ появления в

  std::unique_lock<std::mutex> lk(m);        │ очереди хотя бы

  c.wait(lk, [&]{ return !q.empty(); });    ←┘ одного элемента

  auto res = q.front();

  q.pop();

  return res;

 }

};

}

Отправкой сообщений занимается объект класса sender, показанного в листинге С.2. Это не более чем тонкая обертка вокруг очереди сообщений, которая позволяет только добавлять сообщения. При копировании экземпляров sender копируется только указатель на очередь, а не сама очередь.

Листинг С.2. Класс sender

namespace messaging {

 class sender {│ sender обертывает указатель

 queue* q;    ←┘ на очередь

public:     │ У сконструированного по умолчанию

 sender() :←┘ sender'a нет очереди

 q(nullptr) {}

                             │ Разрешаем конструирование

 explicit sender(queue* q_):←┘ из указателя на очередь

  q(q_) {}

 template<typename Message>

 void send(Message const& msg) {

  if (q){        │ Отправка сообщения сводится

   q->push(msg);←┘ к помещению его в очередь

  }

 }

};

}

Получение сообщений несколько сложнее. Мы не только должны дождаться появления сообщения в очереди, но еще и проверить, совпадает ли его тип с одним из известных нам типов, и вызвать соответствующий обработчик. Эта процедура начинается в классе receiver, показанном в листинге ниже.

Листинг С.3. Класс receiver

namespace messaging {

class receiver {

 queue q; ← receiver владеет очередью

public:              │ Разрешить неявное преобразование в объект

 operator sender() {←┘ sender, ссылающийся на эту очередь

  return sender(&q);

 }

                     │ При обращении к функции ожидания

 dispatcher wait() {←┘ очереди создается диспетчер

  return dispatcher(&q);

 }

};

}

Если sender только ссылается на очередь сообщений, то receiver ей владеет. Мы можем получить объект sender, ссылающийся на очередь, воспользовавшись неявным преобразованием. Процедура диспетчеризации сообщения начинается с обращения к функции wait(). При этом создается объект dispatcher, ссылающийся на очередь, которой владеет receiver. Класс dispatcher показан в следующем листинге; как видите, содержательная работа производится в его деструкторе. В данном случае работа состоит в ожидании сообщения и его диспетчеризации.

Листинг С.4. Класс dispatcher

namespace messaging {

class close_queue {}; ← Сообщение о закрытии очереди

class dispatcher {

 queue* q;                             │ Экземпляры

 bool chained;                         │ диспетчера нельзя

                                       │ копировать

 dispatcher(dispatcher const&)=delete;←┘

 dispatcher& operator=(dispatcher const&)=delete;

 template<

  typename Dispatcher,│ Разрешить экземплярам

  typename Msg,       │ TemplateDispatcher доступ

  typename Func>     ←┘ к закрытым частям класса

 friend class TemplateDispatcher;

 void wait_and_dispatch()

 {           (1) В цикле ждем и диспетчеризуем

  for (;;) {←┘ сообщения

   auto msg = q->wait_and_pop();

   dispatch(msg);

  }

 }               (2) dispatch() смотрит, не пришло ли

                 │ сообщение close_queue, и, если

 bool dispatch (←┘ да, возбуждает исключение

  std::shared_ptr<message_base> const& msg) {

  if (dynamic_cast<wrapped_message<close_queue>*>(msg.get())) {

   throw close_queue();

  }

  return false;

 }

public:                          │ Экземпляры диспетчера

 dispatcher(dispatcher&& other):←┘ можно перемещать

  q(other.q), chained(other.chained) {│ Объект-источник не должен

  other.chained = true;              ←┘ ждать сообщений

 }

 explicit dispatcher(queue* q_): q(q_), chained(false) {}

 template<typename Message, typename Func>

 TemplateDispatcher<dispatcher, Message, Func>

 handle(Func&& f)←┐ Сообщения конкретного типа

 {                (3) обрабатывает TemplateDispatcher

  return TemplateDispatcher<dispatcher, Message, Func>(

   q, this, std::forward<Func>(f));

 }

 ~dispatcher() noexcept(false)←┐ Деструктор может

 {                             (4) возбудить исключение

  if (!chained) {

   wait_and_dispatch();

  }

 }

};

}

Экземпляр dispatcher, возвращенный функцией wait(), немедленно уничтожается, так как является временным объектом, и, как уже было сказало, вся работа выполняется в его деструкторе. Деструктор вызывает функцию wait_and_dispatch(), которая в цикле (1) ожидает сообщения и передает его функции dispatch(). Сама функция dispatch() (2) проста, как правда: она проверяет, не получено ли сообщение типа close_queue, и, если так, то возбуждает исключение; в противном случае возвращает false, извещая, что сообщение не обработало. Именно из-за исключения close_queue деструктор и помечен как noexcept(false) (4); без этой аннотации действовала бы подразумеваемая спецификация исключений для деструктора — noexcept(true), означающая, что исключения не допускаются, и тогда исключение close_queue привело бы к завершению программы.

Но просто вызывать функцию wait() особого смысла не имеет — как правило, нам нужно обработать полученное сообщение. Для этого предназначена функция-член handle() (3). Это шаблон, и тип сообщения нельзя вывести, поэтому необходимо явно указать, сообщение какого типа обрабатывается, и передать функцию (или допускающий вызов объект) для его обработки. Сама функция handle() передает очередь, текущий объект dispatcher и функцию-обработчик новому экземпляру шаблонного класса TemplateDispatcher, который обрабатывает сообщения указанного типа. Код этого класса показан в листинге С.5. Именно поэтому мы проверяем флаг chained в деструкторе перед тем, как приступить к ожиданию сообщения; он не только предотвращает ожидание объектами, содержимое которых перемещено, но и позволяет передать ответственность за ожидание новому экземпляру TemplateDispatcher.

Листинг С.5. Шаблон класса TemplateDispatcher

namespace messaging {

template<

 typename PreviousDispatcher, typename Msg, typename Func>

class TemplateDispatcher {

 queue* q;

 PreviousDispatcher* prev;

 Func f;

 bool chained;

 TemplateDispatcher(TemplateDispatcher const&) = delete;

 TemplateDispatcher& operator=(

  TemplateDispatcher const&) = delete;

 template<

  typename Dispatcher, typename OtherMsg, typename OtherFunc>

 friend class TemplateDispatcher;←┐ Все конкретизации

 void wait_and_dispatch()         │ TemplateDispatcher

 {                                │ дружат между собой

  for (;;) {

   auto msg = q->wait_and_pop();

   if (dispatch(msg))←┐ Если мы обработали

   break;             │ сообщение выходим

  }                   (1) из цикла

 }

 bool dispatch(std::shared_ptr<message_base> const& msg) {

  if (wrapped_message<Msg>* wrapper =

   dynamic_cast<wrapped_message<Msg>*>(

   msg.get())) {       ←┐ Проверяем тип

   f(wrapper->contents);│ сообщения и

   return true;         │ вызываем

  }                     (2) функцию

  else {

   return prev->dispatch(msg);←┐ Вызываем предыдущий

  }                            (3) диспетчер в цепочке

 }

public:

 TemplateDispatcher(TemplateDispatcher&& other):

  q(other.q), prev(other.prev), f(std::move(other.f)),

  chained(other.chained) {

  other.chained = true;

 }

 TemplateDispatcher(

  queue* q_, PreviousDispatcher* prev_, Func&& f_):

  q(q_), prev(prev_), f(std::forward<Func>(f_)), chained(false)

 {

  prev_->chained = true;

 }

 template<typename OtherMsg, typename OtherFunc>

 TemplateDispatcher<TemplateDispatcher, OtherMsg, OtherFunc>

 handle(OtherFunc&& of)←┐ Дополнительные обработчики

 {                      (4) можно связать в цепочку

  return TemplateDispatcher<

   TemplateDispatcher, OtherMsg, OtherFunc>(

    q, this, std::forward<OtherFunc>(of));

 }

 ~TemplateDispatcher() noexcept(false)←┐ Деструктор снова

 {                                     │ помечен как

  if (!chained) {                     (5) noexcept(false)

   wait_and_dispatch();

  }

 }

};

}

Шаблон класса TemplateDispatcher<> устроен по образцу класса dispatcher и почти ничем не отличается от него. В частности, деструктор тоже вызывает wait_and_dispatch(), чтобы дождаться сообщения.

Поскольку мы не возбуждаем исключения, если сообщение обработало, то теперь в цикле (1) нужно проверять, обработали мы сообщение или нет. Обработка прекращается, как только сообщение успешно обработало, чтобы в следующий раз можно было ждать очередного набора сообщений. Если найдено соответствие указанному типу сообщения, то вызывается предоставленная функция (2), а не возбуждается исключение (хотя функция-обработчик может и сама возбудить исключение). Если же соответствие не найдено, то мы передаем сообщение предыдущему диспетчеру в цепочке (3). В самом первом экземпляре это будет объект dispatcher, но если в функции handle() (4) вызовы сцеплялись, чтобы можно было обработать несколько типов сообщений, то предыдущим диспетчером может быть ранее созданный экземпляр TemplateDispatcher<>, который в свою очередь передаст сообщение предшествующему ему диспетчеру в цепочке, если не сможет обработать его сам. Поскольку любой обработчик может возбудить исключение (в том числе и обработчик самого первого объекта dispatcher, если встретит сообщение close_queue), то деструктор снова необходимо снабдить аннотацией noexcept(false) (5).

Этот простенький каркас позволяет помещать в очередь сообщения любого типа, а затем на принимающем конце отбирать те из них, которые мы можем обработать. Кроме того, он позволяет передавать ссылку на очередь, чтобы в нее можно было добавлять новые сообщения, оставляя при этом прижимающий конец недоступным извне.

И чтобы закончить пример из главы 4, в листинге С.6 приведён код сообщений, в листингах С.7, С.8 и С.9 — различные конечные автоматы, а в листинге С.10 — управляющая программа.

Листинг С.6. Сообщения банкомата

struct withdraw {

 std::string account;

 unsigned amount;

 mutable messaging::sender atm_queue;

 withdraw(std::string const& account_,

  unsigned amount_, messaging::sender atm_queue_):

   account(account_), amount(amount_), atm_queue(atm_queue_) {}

};

struct withdraw_ok {};

struct withdraw_denied {};

struct cancel_withdrawal {

 std::string account;

 unsigned amount;

 cancel_withdrawal(std::string const& account_,

  unsigned amount_):

   account(account_), amount(amount_) {}

};

struct withdrawal_processed {

 std::string account;

 unsigned amount;

 withdrawal_processed(std::string const& account_,

  unsigned amount_):

   account(account_), amount(amount_) {}

};

struct card_inserted {

 std::string account;

 explicit card_inserted(std::string const& account_):

  account(account_) {}

};

struct digit_pressed {

 char digit;

 explicit digit_pressed(char digit_):

  digit(digit_) {}

};

struct clear_last_pressed {};

struct eject_card {};

struct withdraw_pressed {

 unsigned amount;

 explicit withdraw_pressed(unsigned amount_):

  amount(amount_) {}

};

struct cancel_pressed {};

struct issue_money {

 unsigned amount;

 issue_money(unsigned amount_):

  amount(amount_) {}

};

struct verify_pin {

 std::string account;

 std::string pin;

 mutable messaging::sender atm_queue;

 verify_pin(std::string const& account_, std::string const& pin_,

  messaging::sender atm_queue_):

   account(account_), pin(pin_), atm_queue(atm_queue_) {}

};

struct pin_verified {};

struct pin_incorrect {};

struct display_enter_pin {};

struct display_enter_card {};

struct display_insufficient_funds {};

struct display_withdrawal_cancelled {};

struct display_pin_incorrect_message {};

struct display_withdrawal_options (};

struct get_balance {

 std::string account;

 mutable messaging::sender atm_queue;

 get_balance(

  std::string const& account_, messaging::sender atm_queue_):

   account(account_), atm_queue(atm_queue_) {}

};

struct balance {

 unsigned amount;

 explicit balance(unsigned amount_):

  amount(amount_) {}

};

struct display_balance {

 unsigned amount;

 explicit display_balance(unsigned amount_):

  amount(amount_) {}

};

struct balance_pressed {};

Листинг С.7. Конечный автомат банкомата

class atm {

 messaging::receiver incoming;

 messaging::sender bank;

 messaging::sender interface_hardware;

 void (atm::*state)();

 std::string account;

 unsigned withdrawal_amount;

 std::string pin;

 void process_withdrawal() {

  incoming.wait().handle<withdraw_ok>(

   [&](withdraw_ok const& msg) {

    interface_hardware.send(

     issue_money(withdrawal_amount));

    bank.send(

     withdrawal_processed(account, withdrawal_amount));

    state = &atm::done_processing;

   }

  ).handle<withdraw_denied>(

   [&](withdraw_denied const& msg) {

    interface_hardware.send(display_insufficient_funds());

    state = &atm::done_processing;

   }

  ).handle<cancel_pressed>(

   [&](cancel_pressed const& msg) {

    bank.send(

     cancel_withdrawal(account, withdrawal_amount));

    interface_hardware.send(

     display_withdrawal_cancelled());

    state = &atm::done_processing;

   }

  );

 }

 void process_balance() {

  incoming.wait().handle<balance>(

   [&](balance const& msg) {

    interface_hardware.send(display_balance(msg.amount));

    state = &atm::wait_for_action;

   }

  ).handle<cancel_pressed>(

   [&](cancel_pressed const& msg) {

    state = &atm::done_processing;

   }

  );

 }

 void wait_for_action() {

  interface_hardware.send(display_withdrawal_options());

  incoming.wait().handle<withdraw_pressed>(

   [&](withdraw_pressed const& msg) {

    withdrawal_amount = msg.amount;

    bank.send(withdraw(account, msg.amount, incoming));

    state = &atm::process_withdrawal;

   }

  ).handle<balance_pressed>(

   [&](balance_pressed const& msg) {

    bank.send(get_balance(account, incoming));

    state = &atm::process_balance;

   }

  ).handle<cancel_pressed>(

   [&](cancel_pressed const& msg) {

    state = &atm::done_processing;

   }

  );

 }

 void verifying_pin() {

  incoming.wait().handle<pin_verified>(

   [&](pin_verified const& msg) {

    state = &atm::wait_for_action;

   }

  ).handle<pin_incorrect>(

   [&](pin_incorrect const& msg) {

    interface_hardware.send(

     display_pin_incorrect_message());

    state = &atm::done_processing;

   }

  ).handle<cancel_pressed>(

   [&](cancel_pressed const& msg) {

    state = &atm::done_processing;

   }

  );

 }

 void getting_pin() {

  incoming.wait().handle<digit_pressed>(

   [&](digit_pressed const& msg) {

    unsigned const pin_length = 4;

    pin += msg.digit;

    if (pin.length() == pin_length) {

     bank.send(verify_pin(account, pin, incoming));

     state = &atm::verifying_pin;

    }

   }

  ).handle<clear_last_pressed>(

   [&](clear_last_pressed const& msg) {

    if (!pin.empty()) {

     pin.pop_back();

    }

   }

  ).handle<cancel_pressed>(

   [&](cancel_pressed const& msg) {

    state = &atm::done_processing;

   }

  );

 }

 void waiting_for_card() {

  interface_hardware.send(display_enter_card());

  incoming.wait().handle<card_inserted>(

   [&](card_inserted const& msg) {

    account = msg.account;

    pin = "";

    interface_hardware.send(display_enter_pin());

    state = &atm::getting_pin;

   }

  );

 }

 void done_processing() {

  interface_hardware.send(eject_card());

  state = &atm::waiting_for_card;

 }

 atm(atm const&) = delete;

 atm& operator=(atm const&) = delete;

public:

 atm(messaging::sender bank_,

  messaging::sender interface_hardware_):

   bank(bank_), interface_hardware(interface_hardware_) {}

 void done() {

  get_sender().send(messaging::close_queue());

 }

 void run() {

  state = &atm::waiting_for_card;

  try {

   for (;;) {

    (this->*state)();

   }

  } catch(messaging::close_queue const&) {

  }

 }

 messaging::sender get_sender() {

  return incoming;

 }

};

Листинг С.8. Конечный автомат банка

class bank_machine {

 messaging::receiver incoming;

 unsigned balance;

public:

 bank_machine():

  balance(199) {}

 void done() {

  get_sender().send(messaging::close_queue());

 }

 void run() {

  try {

   for (;;) {

    incoming.wait().handle<verify_pin>(

     [&](verify_pin const& msg) {

      if (msg.pin == "1937") {

       msg.atm_queue.send(pin_verified());

      } else {

       msg.atm_queue.send(pin_incorrect());

      }

     }

    ).handle<withdraw>(

     [&](withdraw const& msg) {

      if (balance >= msg.amount) {

       msg.atm_queue.send(withdraw_ok());

       balance -= msg.amount;

      } else {

       msg.atm_queue.send(withdraw_denied());

      }

     }

    ).handle<get_balance>(

     [&](get_balance const& msg) {

      msg.atm_queue.send(::balance(balance));

     }

    ).handle<withdrawal_processed>(

     [&](withdrawal_processed const& msg) {

     }

    ).handle<cancel_withdrawal>(

     [&](cancel_withdrawal const& msg) {

     }

    );

   }

  } catch(messaging::close_queue const&) {

  }

 }

 messaging::sender get_sender() {

  return incoming;

 }

};

Листинг С.9. Конечный автомат пользовательского интерфейса

class interface_machine {

 messaging::receiver incoming;

public:

 void done() {

  get_sender().send(messaging::close_queue());

 }

 void run() {

  try {

   for (;;) {

    incoming.wait().handle<issue_money> (

     [&](issue_money const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout << "Issuing "

                 << msg.amount << std::endl;

      }

     }

    ).handle<display_insufficient_funds>(

     [&](display_insufficient_funds const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout << "Insufficient funds" << std::endl;

      }

     }

    ).handle<display_enter_pin>(

     [&](display_enter_pin const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout

        << "Please enter your PIN (0-9)" << std::endl;

      }

     }

    ).handle<display_enter_card>(

     [&](display_enter_card const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout << "Please enter your card (I)"

                 << std::endl;

      }

     }

    ).handle<display_balance>(

     [&](display_balance const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout

        << "The balance of your account is "

        << msg.amount << std::endl;

      }

     }

    ).handle<display_withdrawal_options>(

     [&](display_withdrawal_options const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout << "Withdraw 50? (w)" << std::endl;

       std::cout << "Display Balance? (b)"

                 << std::endl;

       std::cout << "Cancel? (c) " << std::endl;

      }

     }

    ).handle<display_withdrawal_cancelled>(

     [&](display_withdrawal_cancelled const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout << "Withdrawal cancelled"

                 << std::endl;

      }

     }

    ).handle<display_pin_incorrect_message>(

     [&](display_pin_incorrect_message const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout << "PIN incorrect" << std::endl;

      }

     }

    ).handle<eject_card>(

     [&](eject_card const& msg) {

      {

       std::lock_guard<std::mutex> lk(iom);

       std::cout << "Ejecting card" << std::endl;

      }

     }

    );

   }

  } catch (messaging::close_queue&) {

  }

 }

 messaging::sender get_sender() {

  return incoming;

 }

};

Листинг С.10. Управляющая программа

int main() {

 bank_machine bank;

 interface_machine interface_hardware;

 atm machine(bank.get_sender(), interface_hardware.get_sender());

 std::thread bank_thread(&bank_machine::run, &bank);

 std::thread if_thread(&interface_machine::run,

  &interface_hardware);

 std::thread atm_thread(&atm::run, &machine);

 messaging::sender atmqueue(machine.get_sender());

 bool quit_pressed = false;

 while (!quit_pressed) {

  char c = getchar();

  switch(с) {

  case '0':

  case '1':

  case '2':

  case '3':

  case '4':

  case '5':

  case '6':

  case '7':

  case '8':

  case '9':

   atmqueue.send(digit_pressed(с));

   break;

  case 'b':

   atmqueue.send(balance_pressed());

   break;

  case 'w':

   atmqueue.send(withdraw_pressed(50));

   break;

  case 'с':

   atmqueue.send(cancel_pressed());

   break;

  case 'q':

   quit_pressed = true;

   break;

  case 'i':

   atmqueue.send(card_inserted("acc1234"));

   break;

  }

 }

 bank.done();

 machine.done();

 interface_hardware.done();

 atm_thread.join();

 bank_thread.join();

 if_thread.join();

}