C++ 实现消息总线

消息总线

消息总线(Message Bus)提供了一种对象之间的交流方式。在消息总线中,对象只通过消息联系,而不必有直接的依赖或关联。这种统一管理的方式能极大地降低对象之间的耦合性,同时提高程序的可维护性。

参考:

  1. 消息总线的优点

  2. 消息总线和消息队列的异同

实现原理

  • 节点的表示:每个节点由其监听的主题组成和一个可调用对象(回调函数)组成。当某个节点接收到消息时,调用对应的可调用对象。

  • 消息的表示:每种消息的类型由其主题以及泛型函数类型(std::function<RetType(Args ...)>)共同确定。

  • 消息的发送:为了在消息总线中发送消息,我们需要传入指定的主题,泛型函数类型(std::function<RetType(Args ...)>),以及泛型函数对应的实参(如果有的话)。主题和泛型函数类型用于确定消息类型,实参是消息的具体内容。当某个消息在消息总线上传输时,需要查找监听了对应主题并具有对应的泛型函数类型的节点,调用它们的可调用对象。

  • 消息的存储:为了存储不同类型的消息,此处使用「函数萃取」技术提取可调用对象的调用形式(Call Signature),然后使用 boost 库中的 any 类来擦除对象的类型。

可调用对象包括函数、函数指针、Lambda 表达式、bind 创建的对象、重载了函数调用运算符的类、function 标准库类型等。

class MessageBus {
private:
    std::multimap<std::string, boost::any> container;
    using Iter = std::multimap<std::string, boost::any>::iterator;

public:
    // use the synthesized default constructor
    MessageBus() = default;

    // no copy
    MessageBus(const MessageBus&) = delete;
    MessageBus(MessageBus&&) = delete;

    // no assignment
    MessageBus& operator= (const MessageBus&) = delete;
    MessageBus& operator= (MessageBus&&) = delete;

    // use the synthesized destructor
    ~MessageBus() = default;

    // create a node with a topic and a callback function
    template <typename F>
    void create(const std::string& topic, F&& func) {
        using FunctionWrapperType = typename FunctionTraits<F>::FunctionWrapperType;
        FunctionWrapperType func_wrapper = func;
        const std::string tag = topic + typeid(func_wrapper).name();
        container.emplace(std::move(tag), std::forward<FunctionWrapperType>(func_wrapper));
    }

    // remove nodes that are listening to the given topic and have the given call signature
    template <typename RetType, typename... Args>
    void remove(const std::string& topic) {
        using FunctionWrapperType = std::function<RetType(Args...)>;
        const std::string remove_tag = topic + typeid(FunctionWrapperType).name();
        auto remove_range = container.equal_range(remove_tag);
        if (remove_range.first == remove_range.second) {
            std::cout << "Node not found." << std::endl;
        }
        else {
            container.erase(remove_range.first, remove_range.second);
        }
    }

    // send messages to nodes without argument
    template <typename RetType>
    void push(const std::string& topic) {
        using FunctionWrapperType = std::function<RetType()>;
        const std::string push_tag = topic + typeid(FunctionWrapperType).name();
        auto send_range = container.equal_range(push_tag);
        if (send_range.first == send_range.second) {
            std::cout << "Node not found." << std::endl;
            return;
        }
        for (Iter it = send_range.first; it != send_range.second; ++it) {
            FunctionWrapperType func_wrapper = boost::any_cast<FunctionWrapperType>(it->second);
            func_wrapper();
        }
    }

    // send messages to nodes with arguments
    template <typename RetType, typename... Args>
    void push(const std::string& topic, Args&&... args) {
        using FunctionWrapperType = std::function<RetType(Args...)>;
        const std::string push_tag = topic + typeid(FunctionWrapperType).name();
        auto send_range = container.equal_range(push_tag);
        if (send_range.first == send_range.second) {
            std::cout << "Node not found." << std::endl;
            return;
        }
        for (Iter it = send_range.first; it != send_range.second; ++it) {
            FunctionWrapperType func_wrapper = boost::any_cast<FunctionWrapperType>(it->second);
            func_wrapper(std::forward<Args>(args)...);
        }
    }
};

函数萃取

该项目使用的函数萃取(Function traits)是一种特殊的 Traits class。

  • 《Effective C++》条款 47:Traits classes 使得「类型相关信息」在编译期可用。它们以 templates 和「templates特化」完成实现。

  • 使用 Traits class 的方法:

    1. 建立一组重载函数(身份像劳工)或函数模版,彼此间的差异只在于各自的 traits 参数。令每个函数实现码与其接受之 traits 信息相应和。

    2. 建立一个控制函数(身份像工头)或函数模版,它调用上述那些「劳工函数」并传递 traits class 所提供的信息。

  • Function traits 通过模版特例化和可变参数模版在编译期间获取函数类型。

  • 本项目中较难实现的是 Lambda 表达式等可调用对象的类型萃取。当我们编写了一个 Lambda 后,编译器将该表达式翻译成一个未命名类的未命名对象,该类中含有一个重载的函数调用运算符。当我们向 FunctionTraits<T> 传入一个 Lambda 表达式时,实际上是传入了一个类对象。然后利用模版特化 &T::operator() 得到指向成员函数的指针,最后使用特化模版 RetType(ClassType::*)(Args...) 进行处理。其它可调用对象的处理方法与此相似。

namespace func_traits {
    // helper
    template <typename RetType, typename... Args>
    struct FunctionTraitsHelper {
        using ReturnType = RetType;
        using FunctionType = RetType(Args...);
        using FunctionWrapperType = std::function<FunctionType>;
    };

    template <typename T>
    struct FunctionTraits;

    //---------------------------------------------------------------------

    // normal function
    template <typename RetType, typename... Args>
    struct FunctionTraits<RetType(Args...)>
    : FunctionTraitsHelper<RetType, Args...> {};

    // function pointer
    template <typename RetType, typename... Args>
    struct FunctionTraits<RetType(*)(Args...)>
    : FunctionTraitsHelper<RetType, Args...> {};

    // function reference
    template <typename RetType, typename... Args>
    struct FunctionTraits<RetType(&)(Args...)>
    : FunctionTraitsHelper<RetType, Args...> {};

    //---------------------------------------------------------------------

    // pointer to member fuction
    template <typename CType, typename RetType, typename... Args>
    struct FunctionTraits<RetType(CType::*)(Args...)>
    : FunctionTraitsHelper<RetType, Args...> {
        using ClassType = CType;
    };

    // (const version) pointer to member fuction
    template <typename CType, typename RetType, typename... Args>
    struct FunctionTraits<RetType(CType::*)(Args...) const>
    : FunctionTraitsHelper<RetType, Args...> {
        using ClassType = CType;
    };

    // callable (lambda or std::function<> or bind())
    template <typename T>
    struct FunctionTraits
    : FunctionTraits<decltype(&T::operator())> {};
}

any 类

any 类 是 boost 库中的一个特殊容器,可以存放任何类型的值,在使用时可通过调用 any_cast<T> 将对象还原为实际类型。

转发和可变参数模版

《C++ Primer》中文版 p.624

实例

// Create the message bus.
MessageBus message_bus;

// Every node has a topic it listen to and its own callback function.

// Create a node with a topic and a callback function.
message_bus.create("topic 1", [](int message) {
    // callback
    std::cout << "Node 1 get message: " << message << std::endl;
});

// Create another node.
message_bus.create("topic 1", [](int message) {
    // callback
    std::cout << "Node 2 get message: " << message << std::endl;
});

// Send message 123 to nodes that are listening to topic 1 and have the call signature void(int).
message_bus.push<void, int>("topic 1", 123);

// Remove nodes that are listening to topic 1 and have the call signature void(int).
message_bus.remove<void, int>("topic 1");

// No node will get this message now.
message_bus.push<void, int>("topic 1", 123);

发布/订阅模式

该消息总线可用于实现发布/订阅模式。

发布/订阅模式(Publish–subscribe pattern)包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber)。发布者发送到某个主题的消息,只有该主题的订阅者才会收到消息。每个系统可以有多个发布者,每个主题也可以有多个即订阅者。发布者和订阅者之间有时间上的依赖性:针对某个主题的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。

参考:

  1. 维基百科的解释

  2. 发布/订阅模式在 Redis 中的运用

  3. 观察者模式 vs 发布/订阅模式

MessageBus MESSAGE_BUS;
const std::string TOPIC_A = "Pink Floyd";
const std::string TOPIC_B = "Gorillaz";
const std::string RESPOND_TOPIC = "Respond";

class Publisher {
public:
    Publisher() {
        MESSAGE_BUS.create(RESPOND_TOPIC, [this] (const std::string& response) {
            getResponse(response);
        });
    }

    void publish(const std::string& topic) {
        if (topic == TOPIC_A) {
            MESSAGE_BUS.push<void, const std::string&>(TOPIC_A, "Another Brick In The Wall");
        }
        else if (topic == TOPIC_B) {
            MESSAGE_BUS.push<void, const std::string&>(TOPIC_B, "Tomorrow Comes Today");
        }
    }

private:
    void getResponse(const std::string& response) {
        std::cout << "Publisher gets response: " << response << std::endl;
    }
};

class Subscriber {
public:
    Subscriber(const std::string& name) : name(name) {}

    void subscribe(const std::string& topic = "") {
        MESSAGE_BUS.create(topic, [this] (const std::string& song) {
            getMessage(song);
        });
    }

private:
    std::string name;

    void getMessage(const std::string& song) {
        std::cout << name << " gets a message: " << song << std::endl;
        MESSAGE_BUS.push<void, const std::string&>(RESPOND_TOPIC, "Nice song!");
    }
};

int main() {
    Publisher publisher;
    Subscriber subscriberA("Subscriber A");
    Subscriber subscriberB("Subscriber B");

    subscriberA.subscribe(TOPIC_A);
    subscriberA.subscribe(TOPIC_B);

    subscriberB.subscribe(TOPIC_A);
    subscriberB.subscribe(TOPIC_B);

    publisher.publish(TOPIC_A);
    publisher.publish(TOPIC_B);
}

延伸阅读

Alan Kay On Messaging

Updated: