C++ 实现消息总线
消息总线
消息总线(Message Bus)提供了一种对象之间的交流方式。在消息总线中,对象只通过消息联系,而不必有直接的依赖或关联。这种统一管理的方式能极大地降低对象之间的耦合性,同时提高程序的可维护性。
参考:
实现原理
-
节点的表示:每个节点由其监听的主题组成和一个可调用对象(回调函数)组成。当某个节点接收到消息时,调用对应的可调用对象。
-
消息的表示:每种消息的类型由其主题以及泛型函数类型(
std::function<RetType(Args ...)>
)共同确定。 -
消息的发送:为了在消息总线中发送消息,我们需要传入指定的主题,泛型函数类型(
std::function<RetType(Args ...)>
),以及泛型函数对应的实参(如果有的话)。主题和泛型函数类型用于确定消息类型,实参是消息的具体内容。当某个消息在消息总线上传输时,需要查找监听了对应主题并具有对应的泛型函数类型的节点,调用它们的可调用对象。 -
消息的存储:为了存储不同类型的消息,此处使用「函数萃取」技术提取可调用对象的调用形式(Call Signature),然后使用 boost 库中的 any 类来擦除对象的类型。
可调用对象包括函数、函数指针、Lambda 表达式、bind 创建的对象、重载了函数调用运算符的类、function 标准库类型等。
class MessageBus {
private:
std::multimap<std::string, boost::any> container;
using Iter = std::multimap<std::string, boost::any>::iterator;
public:
// use the synthesized default constructor
MessageBus() = default;
// no copy
MessageBus(const MessageBus&) = delete;
MessageBus(MessageBus&&) = delete;
// no assignment
MessageBus& operator= (const MessageBus&) = delete;
MessageBus& operator= (MessageBus&&) = delete;
// use the synthesized destructor
~MessageBus() = default;
// create a node with a topic and a callback function
template <typename F>
void create(const std::string& topic, F&& func) {
using FunctionWrapperType = typename FunctionTraits<F>::FunctionWrapperType;
FunctionWrapperType func_wrapper = func;
const std::string tag = topic + typeid(func_wrapper).name();
container.emplace(std::move(tag), std::forward<FunctionWrapperType>(func_wrapper));
}
// remove nodes that are listening to the given topic and have the given call signature
template <typename RetType, typename... Args>
void remove(const std::string& topic) {
using FunctionWrapperType = std::function<RetType(Args...)>;
const std::string remove_tag = topic + typeid(FunctionWrapperType).name();
auto remove_range = container.equal_range(remove_tag);
if (remove_range.first == remove_range.second) {
std::cout << "Node not found." << std::endl;
}
else {
container.erase(remove_range.first, remove_range.second);
}
}
// send messages to nodes without argument
template <typename RetType>
void push(const std::string& topic) {
using FunctionWrapperType = std::function<RetType()>;
const std::string push_tag = topic + typeid(FunctionWrapperType).name();
auto send_range = container.equal_range(push_tag);
if (send_range.first == send_range.second) {
std::cout << "Node not found." << std::endl;
return;
}
for (Iter it = send_range.first; it != send_range.second; ++it) {
FunctionWrapperType func_wrapper = boost::any_cast<FunctionWrapperType>(it->second);
func_wrapper();
}
}
// send messages to nodes with arguments
template <typename RetType, typename... Args>
void push(const std::string& topic, Args&&... args) {
using FunctionWrapperType = std::function<RetType(Args...)>;
const std::string push_tag = topic + typeid(FunctionWrapperType).name();
auto send_range = container.equal_range(push_tag);
if (send_range.first == send_range.second) {
std::cout << "Node not found." << std::endl;
return;
}
for (Iter it = send_range.first; it != send_range.second; ++it) {
FunctionWrapperType func_wrapper = boost::any_cast<FunctionWrapperType>(it->second);
func_wrapper(std::forward<Args>(args)...);
}
}
};
函数萃取
该项目使用的函数萃取(Function traits)是一种特殊的 Traits class。
-
《Effective C++》条款 47:Traits classes 使得「类型相关信息」在编译期可用。它们以 templates 和「templates特化」完成实现。
-
使用 Traits class 的方法:
-
建立一组重载函数(身份像劳工)或函数模版,彼此间的差异只在于各自的 traits 参数。令每个函数实现码与其接受之 traits 信息相应和。
-
建立一个控制函数(身份像工头)或函数模版,它调用上述那些「劳工函数」并传递 traits class 所提供的信息。
-
-
Function traits 通过模版特例化和可变参数模版在编译期间获取函数类型。
-
本项目中较难实现的是 Lambda 表达式等可调用对象的类型萃取。当我们编写了一个 Lambda 后,编译器将该表达式翻译成一个未命名类的未命名对象,该类中含有一个重载的函数调用运算符。当我们向
FunctionTraits<T>
传入一个 Lambda 表达式时,实际上是传入了一个类对象。然后利用模版特化 &T::operator() 得到指向成员函数的指针,最后使用特化模版RetType(ClassType::*)(Args...)
进行处理。其它可调用对象的处理方法与此相似。
namespace func_traits {
// helper
template <typename RetType, typename... Args>
struct FunctionTraitsHelper {
using ReturnType = RetType;
using FunctionType = RetType(Args...);
using FunctionWrapperType = std::function<FunctionType>;
};
template <typename T>
struct FunctionTraits;
//---------------------------------------------------------------------
// normal function
template <typename RetType, typename... Args>
struct FunctionTraits<RetType(Args...)>
: FunctionTraitsHelper<RetType, Args...> {};
// function pointer
template <typename RetType, typename... Args>
struct FunctionTraits<RetType(*)(Args...)>
: FunctionTraitsHelper<RetType, Args...> {};
// function reference
template <typename RetType, typename... Args>
struct FunctionTraits<RetType(&)(Args...)>
: FunctionTraitsHelper<RetType, Args...> {};
//---------------------------------------------------------------------
// pointer to member fuction
template <typename CType, typename RetType, typename... Args>
struct FunctionTraits<RetType(CType::*)(Args...)>
: FunctionTraitsHelper<RetType, Args...> {
using ClassType = CType;
};
// (const version) pointer to member fuction
template <typename CType, typename RetType, typename... Args>
struct FunctionTraits<RetType(CType::*)(Args...) const>
: FunctionTraitsHelper<RetType, Args...> {
using ClassType = CType;
};
// callable (lambda or std::function<> or bind())
template <typename T>
struct FunctionTraits
: FunctionTraits<decltype(&T::operator())> {};
}
any 类
any 类 是 boost 库中的一个特殊容器,可以存放任何类型的值,在使用时可通过调用 any_cast<T> 将对象还原为实际类型。
转发和可变参数模版
《C++ Primer》中文版 p.624
实例
// Create the message bus.
MessageBus message_bus;
// Every node has a topic it listen to and its own callback function.
// Create a node with a topic and a callback function.
message_bus.create("topic 1", [](int message) {
// callback
std::cout << "Node 1 get message: " << message << std::endl;
});
// Create another node.
message_bus.create("topic 1", [](int message) {
// callback
std::cout << "Node 2 get message: " << message << std::endl;
});
// Send message 123 to nodes that are listening to topic 1 and have the call signature void(int).
message_bus.push<void, int>("topic 1", 123);
// Remove nodes that are listening to topic 1 and have the call signature void(int).
message_bus.remove<void, int>("topic 1");
// No node will get this message now.
message_bus.push<void, int>("topic 1", 123);
发布/订阅模式
该消息总线可用于实现发布/订阅模式。
发布/订阅模式(Publish–subscribe pattern)包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber)。发布者发送到某个主题的消息,只有该主题的订阅者才会收到消息。每个系统可以有多个发布者,每个主题也可以有多个即订阅者。发布者和订阅者之间有时间上的依赖性:针对某个主题的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
参考:
MessageBus MESSAGE_BUS;
const std::string TOPIC_A = "Pink Floyd";
const std::string TOPIC_B = "Gorillaz";
const std::string RESPOND_TOPIC = "Respond";
class Publisher {
public:
Publisher() {
MESSAGE_BUS.create(RESPOND_TOPIC, [this] (const std::string& response) {
getResponse(response);
});
}
void publish(const std::string& topic) {
if (topic == TOPIC_A) {
MESSAGE_BUS.push<void, const std::string&>(TOPIC_A, "Another Brick In The Wall");
}
else if (topic == TOPIC_B) {
MESSAGE_BUS.push<void, const std::string&>(TOPIC_B, "Tomorrow Comes Today");
}
}
private:
void getResponse(const std::string& response) {
std::cout << "Publisher gets response: " << response << std::endl;
}
};
class Subscriber {
public:
Subscriber(const std::string& name) : name(name) {}
void subscribe(const std::string& topic = "") {
MESSAGE_BUS.create(topic, [this] (const std::string& song) {
getMessage(song);
});
}
private:
std::string name;
void getMessage(const std::string& song) {
std::cout << name << " gets a message: " << song << std::endl;
MESSAGE_BUS.push<void, const std::string&>(RESPOND_TOPIC, "Nice song!");
}
};
int main() {
Publisher publisher;
Subscriber subscriberA("Subscriber A");
Subscriber subscriberB("Subscriber B");
subscriberA.subscribe(TOPIC_A);
subscriberA.subscribe(TOPIC_B);
subscriberB.subscribe(TOPIC_A);
subscriberB.subscribe(TOPIC_B);
publisher.publish(TOPIC_A);
publisher.publish(TOPIC_B);
}