Эффективная передача сообщений между процессами в C ++

Взаимодействие между процессами

Введение в процессы

Процесс  —  это экземпляр компьютерной программы, выполняемый в одном или в нескольких потоках.

Рассмотрим пример. Мы разрабатываем приложение, исполняемый файл которого назвали app или app.exe для Windows. При запуске ./app из оболочки получим экземпляр работающей программы app. Можно несколько раз запустить ./app и получить несколько экземпляров (процессов) app, выполняемых в ОС.

Разрабатываемое app может работать как в одном, так и в нескольких потоках.

Упрощенная иллюстрация процесса

Из иллюстрации видно, что каждому процессу назначается отдельное пространство виртуальной памяти. process1 ничего не знает о пространстве виртуальной памяти process2 и не имеет к нему доступа.

С другой стороны, потоки находятся в одном и том же процессе, они совместно используют одно и то же пространство виртуальной памяти. Иллюстрация объясняет различие между процессом и потоком.

Межпоточное и межпроцессное взаимодействие

Взаимодействие между потоками реализовать проще, поскольку они используют единое адресное пространство. Для обмена данными/сообщениями между потоками с надлежащим механизмом синхронизации можно использовать общую глобальную переменную.

Потоки взаимодействуют, используя единое пространство памяти

Для обмена данными между процессами нужен другой механизм, поскольку у них разные адресные пространства. Глобальная переменная одного процесса не существует в другом. Один из подходов —  простое использование файлов.

Межпроцессный обмен данными с использованием файла

Рассмотрим теперь общие подходы к межпроцессному взаимодействию (IPC).

Различные подходы к IPC в C++

Вот некоторые из механизмов IPC:

  • Файл;
  • Общая память;
  • Конвейер;
  • Доменный сокет Unix;
  • Очередь сообщений и др.

Для IPC имеется множество библиотек C ++, например Boost.Interprocess. Рассмотрим создание уровня абстракции, скрывающего детали базовых механизмов IPC, чтобы разработчики могли сосредоточиться на логике приложения. Им не придется разбираться в деталях процесса, писать много шаблонного кода. Задача состоит в том, чтобы отправлять сообщения в следующей форме:

int main() {
    Coord coord{1, 2, 3};
    LatLong latlong{1.5, 2.5};
    Sender sender;
    
    sender.Send(coord);
    sender.Send(latlong);

    return 0;
}

Можно просто отправлять объекты другим процессам. В процессе получения код выглядит так:

int main() {
    Receiver receiver;
    
    receiver.Handle([](const Coord& coord) {
        std::cout << "Coord: " << coord << "\n";
    });
    
    receiver.Handle([](const LatLong& latlong) {
        std::cout << "LatLong: " << latlong << "\n";
    });
    
    receiver.Run();
    return 0;
}

Просто регистрируем наши обработчики там, где в основном используем лямбды (анонимные функции), и все. Детали рассмотрим в следующих разделах. В качестве основного механизма IPC используем очередь сообщений Boost (Boost’s Message Queue).

Последовательный формат

Зачем нужно переводить объекты в последовательную форму?

При межпроцессном обмене сообщениями данные передаются в последовательном формате (serialize).

После перевода в последовательный формат структуры данных/состояния объектов можно сохранять, передавать и восстанавливать позже на том же или другом компьютере.

Экземпляр класса — объект, который можно сохранить в файл или отправить другому процессу на этом же или другом компьютере.

Как известно, два процесса используют разные пространства памяти. Поэтому, прежде чем отправить объект от одного процесса другому, нужен механизм для преобразования объекта в поток байтов.

Кто-то может спросить: “Почему нельзя просто отправить байты объекта”? Это возможно, но не все так просто. Даже если обе программы написаны на C ++, могут возникнуть проблемы, если они созданы с использованием разных компиляторов, если объекты не относятся к типу POD (Plain Old Data), если программы созданы на компьютерах с разной архитектурой (прямой и обратный порядок байтов) и т. д.

Для сложных типов, к примеру, имеющих виртуальные функции и указатели/ссылки в качестве членов, нельзя просто выгрузить память, поскольку она содержат адреса (указатели, виртуальные указатели), которые действительны только в этом процессе. 

Отправка объекта из Process 1 в Process 2

Для передачи данных можно использовать любой механизм IPC из перечисленных ранее.

Методы последовательного представления данных

При выборе метода последовательного преобразования учитывают целый ряд факторов. Вот некоторые из них:

  • Удобное для человека чтение кода;
  • Скорость кодирования и декодирования;
  • Размер данных в последовательном формате;
  • Независимость от языка/ориентированность на конкретный язык;
  • Поддержка безопасности и т.д.

По категориям методы делятся на:

  • Текстовые;
  • Бинарные.

К последовательным текстовым форматам обмена данными относятся XML, JSON, YAML и другие. Бинарные форматы поддерживают CBOR, Google Protocol Buffers и другие.

Например, есть структура с именем Coord, которая представляет трехмерную декартову систему координат. Имеем х, у и z.

struct Coord {
    int x;
    int y;
    int z;
};

Если для последовательного представления выбрать формат JSON, то получим:

{
  "name":"coord",
  "data":{
    "x":1,
    "y":2,
    "z":3
  }
}

Данные готовы к отправке в виде текстового потока байтов в кодировке UTF-8.

Метод последовательного преобразования выбирается в зависимости от требований приложения. При необходимости это может быть удобочитаемость кода или популярность метода.

Для демонстрации структуры межпроцессной передачи сообщений в C++ будем использовать далее JSON. Это библиотека nlohmann JSON в repo на Github. Хорошо, если вы уже умеете ею пользоваться. Для остальных возможности библиотеки представлены далее.

Структура передачи сообщений

Используемые компоненты:

  • Очередь сообщений Boost для механизма IPC;
  • Формат JSON для последовательного преобразования сообщений.

Приступим к реализации структуры.

Сообщения

Допустим, есть два сообщения для передачи из Process 1 в Process 2. Это Coord и LatLong.

struct Coord {
    static constexpr const char* name = "Coord";
    int x;
    int y;
    int z;
};

struct LatLong {
    static constexpr const char* name = "LatLong";
    double latitude;
    double longitude;
};

В формате JSON они выглядят следующим образом:

{"name":"Coord", "data":{"x":1, "y":2, "z":3}}
{"name":"LatLong", "data":{"latitude":1.5, "longitude":2.5}}

Сначала, используя выбранную библиотеку, напишем для обоих объектов шаблоны функции прямого и обратного последовательного преобразования to_json и from_json.

template <typename T>
json to_json(const T& t);

template <typename T>
T from_json(const json& j);

template <>
json to_json(const Coord& coord) {
    return json{{"name", coord.name},
                {"data",{{"x", coord.x},
                         {"y", coord.y},
                         {"z", coord.z}}}};
}

template <>
Coord from_json<Coord>(const json& j) {
    Coord coord;

    j.at("x").get_to(coord.x);
    j.at("y").get_to(coord.y);
    j.at("z").get_to(coord.z);

    return coord;
}

template <>
json to_json(const LatLong& latlong) {
    return json{{"name", latlong.name}, 
                {"data",{{"latitude", latlong.latitude}, 
                         {"longitude", latlong.longitude}}}};
}

template <>
LatLong from_json<LatLong>(const json& j) {
    LatLong latlong;

    j.at("latitude").get_to(latlong.latitude);
    j.at("longitude").get_to(latlong.longitude);

    return latlong;
}

С помощью этих шаблонов можно преобразовать объект в последовательный код и обратно:

int main()
{
  Coord coord{1, 2, 3};
  LatLong latlong{1.5, 2.5};

  // serialize
  json coord_msg = to_json(coord);
  json latlong_msg = to_json(latlong);
  // json object -> bytes stream

  // deserialize
  // bytes stream -> json object
  Coord coordData = from_json<Coord>(coord_msg);
  LatLong latlongData = from_json<LatLong>(latlong_msg);
  
  return 0;
}

Неструктурированная передача сообщений

Используя прямое и обратное последовательное преобразование можно добавить механизм IPC для обмена сообщениями. Далее представлена реализация процессов sender и receiver с помощью очереди сообщений Boost:

using namespace boost::interprocess;

int main () {
  Coord coord{1, 2, 3};
  LatLong latlong{1.5, 2.5};
  try {
    message_queue::remove("message_queue");
    message_queue mq
       (create_only               //only create
       ,"message_queue"           //name
       ,100                       //max message number
       ,1024                      //max message size
       );
      
      // send Coord object
      {
        json msg_j = to_json(coord);
        std::string msg = msg_j.dump();
        mq.send(msg.c_str(), msg.length()+1, 0);
      }
      // send LatLong object
      {
        json msg_j = to_json(latlong);
        std::string msg = msg_j.dump();
        mq.send(msg.c_str(), msg.length()+1, 0);
      }
  }
  catch(interprocess_exception &ex){
    std::cout << ex.what() << std::endl;
    return 1;
  }

  return 0;
}

Приведенный выше код sender создает очередь из сообщений и перед завершением отправляет два из них. Перед отправкой объекты преобразуются в последовательный (serialized) формат JSON, в строки с завершающим нулевым байтом с помощью функций dump () и c_str ():

using namespace boost::interprocess;

int main () {
  Coord coord{1, 2, 3};
  LatLong latlong{1.5, 2.5};
  try {
    message_queue::remove("message_queue");
    message_queue mq
       (create_only               //only create
       ,"message_queue"           //name
       ,100                       //max message number
       ,1024                      //max message size
       );
      
      // send Coord object
      {
        json msg_j = to_json(coord);
        std::string msg = msg_j.dump();
        mq.send(msg.c_str(), msg.length()+1, 0);
      }
      // send LatLong object
      {
        json msg_j = to_json(latlong);
        std::string msg = msg_j.dump();
        mq.send(msg.c_str(), msg.length()+1, 0);
      }
  }
  catch(interprocess_exception &ex){
    std::cout << ex.what() << std::endl;
    return 1;
  }

  return 0;
}

На стороне receiver открываем созданную очередь сообщений и ждем их поступления. Полученные в виде потока байтов сообщения сначала нужно преобразовать в объекты JSON, затем вызвать функции для преобразования (deserialization) в типы Coord и LatLong.

Теперь код выполняет свою основную функцию, но есть еще ряд возможностей для его совершенствования, например, можно скрыть детали в классах, избавиться от шаблонного кода и пр.

Выразительный код

Всем известно, что код гораздо чаще приходится читать, чем писать. На чтение собственного или чужого кода мы тратим немало времени. Именно поэтому писать его нужно для других разработчиков, а не для машин.

Хотя это и не всегда верно. Работающее в режиме реального времени программное обеспечение оптимизируют под компьютеры, в ущерб удобочитаемости. Но все-таки чаще всего код пишут для людей.

Выразительный код решает поставленную задачу без излишнего усложнения. Но для оценки выразительности кода нет стандартных способов. Это качественный и субъективный показатель.

Код рассматриваемого здесь примера можно сделать более выразительным, написав структуру для передачи сообщений между процессами. Наиболее важны при отправке и получении сообщений send () и handle (). Можно сравнить следующие версии кода и выбрать более выразительную:

Без структуры:

  mq.receive(buffer, 1024, recvd_size, priority);
  json msg_j = json::parse(buffer, buffer+recvd_size);
  if (msg_j["name"] == "Coord") {
    Handle(from_json<Coord>(msg_j));
  } else if (msg_j["name"] == "LatLong") {
    Handle(from_json<LatLong>(msg_j));
  } else {
    // unknown message
  }

Структурированная:

Receiver receiver;

receiver.Handle([](const Coord& coord) {
    std::cout << "Coord: " << coord << "\n";
}); 
receiver.Handle([](const LatLong& latlong) {
    std::cout << "LatLong: " << latlong << "\n";
});

receiver.Run();

Структурированная передача сообщений

Теперь давайте реализуем структуру по шагам. Для этого не так уж много нужно сделать. Можно повторно использовать класс CallbackWrappe.

Класс реализован с использованием метода Type Erasure для обертывания вызываемого объекта и предоставления интерфейсов чтобы:

  • проверить, может ли обернутый объект обрабатывать тип;
  • переадресовать вызов.
struct CallbackWrapper {
    template <typename T>
    CallbackWrapper(T&& obj) :
        wrappedCallback(std::make_unique<Wrapper<typename std::remove_reference<T>::type>>
                        (std::forward<T>(obj))) { }

    struct CallbackBase {
        virtual bool operator()(const std::type_info& type) const = 0;
        virtual void operator()(const std::any& data) = 0;
        virtual ~CallbackBase() {}
    };

    template<typename T>
    struct Wrapper : public CallbackBase {

        typedef typename function_traits<T>::template arg<0> arg_type;

        Wrapper(const T& t) :
            wrappedObject(t) {}

        bool operator()(const std::type_info& type) const override {
            if (typeid(arg_type) == type) {
                return true;
            } else {
                return false;
            }
        }

        void operator()(const std::any& data) override {
            wrappedObject(std::any_cast<arg_type>(data));
        }

        T wrappedObject;
    };

    bool operator()(const std::type_info& type) {
        return (*wrappedCallback)(type);
    }

    void operator()(const std::any& data) {
        return (*wrappedCallback)(data);
    }

    std::unique_ptr<CallbackBase> wrappedCallback;
};

Следующий шаг — реализация класса Sender:

class Sender {
public:
    Sender() :
        mq(boost::interprocess::create_only, "message_queue", MAX_NUM_OF_MESSAGES, KILOBYTE) {}
    
    template <typename T>
    void Send(const T& t) {
        try {
            json msg_j = to_json(t);
            std::string msg = msg_j.dump();
            mq.send(msg.c_str(), msg.length()+1, 0);
        } catch(boost::interprocess::interprocess_exception& ex) {
            std::cout << ex.what() << std::endl;
        }
    }
private:
    boost::interprocess::message_queue mq;
};

И последний шаг — реализация класса Receiver. Этот класс состоит из нескольких частей:

class Receiver {
public:
    Receiver() :
        mq(boost::interprocess::open_only, "message_queue") {}
    
    void Run() {
        while (true) {
            try {
                mq.receive(buffer, KILOBYTE, recvd_size, priority);
                json msg_j = json::parse(buffer, buffer+recvd_size);
                std::any msg = dispatch_table[msg_j["name"]](msg_j["data"]);
                for (CallbackWrapper& callbackWrapper : callbackWrappers) {
                    if (callbackWrapper(msg.type())) {
                        callbackWrapper(msg);
                    }
                }
            } catch(boost::interprocess::interprocess_exception& ex) {
                std::cout << ex.what() << std::endl;
                break;
            }
        }
    }

    template <typename Callback>
    void Handle(Callback cb) {
        static_assert(function_traits<Callback>::arity == 1, "Only one argument is supported");
        typedef typename function_traits<Callback>::template arg<0> arg_type_cv_ref;
        typedef typename std::remove_reference<arg_type_cv_ref>::type arg_type_cv;
        typedef typename std::remove_cv<arg_type_cv>::type arg_type;

        dispatch_table[arg_type::name] = from_json<arg_type>;
        callbackWrappers.emplace_back(cb);
    }

private:
    char buffer[KILOBYTE];
    unsigned int priority;
    boost::interprocess::message_queue::size_type recvd_size;
    boost::interprocess::message_queue mq;
    std::map<std::string, std::function<std::any(const json&)>> dispatch_table;
    std::vector<CallbackWrapper> callbackWrappers;
};

Помимо обертывания прямого/обратного последовательного преобразования и очереди сообщений здесь также реализованы:

  • шаблон функции Handle();
  • таблица-диспетчер.

Шаблон функции Handle принимает вызываемый объект, получает тип его первого аргумента, удаляет из него ссылку и квалификатор cv. Затем он использует этот тип, чтобы выбрать функцию для преобразования объекта JSON в правильный объект. Вызываемый объект при этом хранится в векторе.

После прибытия сообщения выполняются следующие действия:

  • создание объекта JSON;
  • получение правильной функции from_json с помощью таблицы-диспетчера;
  • преобразование объекта JSON в правильный объект;
  • выбор обработчиков с помощью callbackWrappers;
  • вызов обработчиков.

С помощью этой структуры можно достичь изложенной в начале этой статьи цели, просто написав следующий код для отправки и получения сообщений:

int main() {
    Coord coord{1, 2, 3};
    LatLong latlong{1.5, 2.5};
    Sender sender;
    
    sender.Send(coord);
    sender.Send(latlong);

    return 0;
}

int main() {
    Receiver receiver;
    
    receiver.Handle([](const Coord& coord) {
        std::cout << "Coord: " << coord << "\n";
    });
    
    receiver.Handle([](const LatLong& latlong) {
        std::cout << "LatLong: " << latlong << "\n";
    });
    
    receiver.Run();
    return 0;
}

Результат:

Coord: 1, 2, 3
LatLong: 1.5, 2.5

И, конечно же, для правильной печати результата нужно реализовать следующий код:

std::ostream& operator<<(std::ostream& os, const Coord& coord) {
    os << coord.x << ", " << coord.y << ", " << coord.z;
    return os;
}

std::ostream& operator<<(std::ostream& os, const LatLong& latlong) {
    os << latlong.latitude << ", " << latlong.longitude;
    return os;
}

Заключение

  • Обмен сообщениями между процессами отличается от обмена между потоками, потому что процессы используют разные адресные пространства.
  • Поэтому для взаимодействия между процессами нельзя просто использовать глобальную переменную.
  • Среди множества механизмов межпроцессного взаимодействия можно выбрать соответствующий требованиям, например, очередь сообщений.
  • Обмен данными между процессами должен выполняться в последовательном формате, поскольку способы хранения объектов в памяти могут быть разными, зависящими от архитектуры процессора, используемого компилятора и т. д.
  • Разработчики должны стремиться к написанию не просто работающего, но и выразительного кода, понятного для человека, стараясь избегать излишних сложностей, которые будут перегружать код.

Читайте также:

Читайте нас в TelegramVK и Яндекс.Дзен


Перевод статьи Debby Nirwan: C++ Framework for More Expressive Inter-Process Message Passing

Предыдущая статья5 ключевых правил успешного ревью кода
Следующая статья5 вещей, которые следует учесть перед тем, как создать свой бизнес на основе мобильного приложения