diff --git a/src/shared/Core/MessageBroker/MessageBroker.cpp b/src/shared/Core/MessageBroker/MessageBroker.cpp index ba9f82bf411c7562e71621e6b7057eb6604785f1..8409f19ca9e44d57af65d7d1eeee85e6ecd7d23e 100644 --- a/src/shared/Core/MessageBroker/MessageBroker.cpp +++ b/src/shared/Core/MessageBroker/MessageBroker.cpp @@ -26,10 +26,11 @@ MessageBroker& MessageBroker::getInstance() return instance; } -void MessageBroker::subscribe(Filter filter, Module* observer, +void MessageBroker::subscribe(Filter filter, std::shared_ptr<Module> observer, Callback callback) { - observers.insert(filter, {observer, new Callback(std::move(callback))}); + observers.insert( + filter, {observer, std::make_unique<Callback>(std::move(callback))}); // Register the beforeDelete slot of the module to automatically // remove the subscriber @@ -37,13 +38,13 @@ void MessageBroker::subscribe(Filter filter, Module* observer, // &MessageBroker::onModuleDeleted); } -void MessageBroker::unsubscribe(Filter filter, Module* module) +void MessageBroker::unsubscribe(Filter filter, std::shared_ptr<Module> module) { auto callbackList = observers.values(filter); for (auto& elem : callbackList) { - if (elem.first == module) + if (elem.first.expired() || elem.first.lock().get() == module.get()) observers.remove(filter, elem); } } @@ -57,27 +58,49 @@ void MessageBroker::publish(const Message& message) // subscribers if (filter.match(message)) { - for (auto subscriber : observers.values(filter)) + auto filteredSubscribers = observers.values(filter); + for (auto subscriber = filteredSubscribers.begin(); + subscriber != filteredSubscribers.end();) { - auto copy = message; - filter.filterMessage(copy); - (*subscriber.second)(copy, filter); + if (!subscriber->first.expired()) + { + auto copy = message; + filter.filterMessage(copy); + (*subscriber->second)(copy, filter); + } + + subscriber++; } } } + + for (auto it = observers.begin(); it != observers.end();) + { + if (it.value().first.expired()) + { + it = observers.erase(it); + } + else + { + it++; + } + } } -void MessageBroker::onModuleDeleted(Module* module) +void MessageBroker::unsubscribeFromAll(std::shared_ptr<Module> module) { - for (auto filter : observers.keys()) + Module* moduleAddr = module.get(); + + for (auto it = observers.begin(); it != observers.end();) { - for (auto subscriber : observers.values(filter)) + auto observer = it.value().first; + if (!observer.expired() && observer.lock().get() == moduleAddr) { - if (subscriber.first == module) - { - observers.remove(filter, subscriber); - delete subscriber.second; - } + it = observers.erase(it); + } + else + { + it++; } } } diff --git a/src/shared/Core/MessageBroker/MessageBroker.h b/src/shared/Core/MessageBroker/MessageBroker.h index 194714253860319020b60c893bba5627cc9c22bf..050167ac34e7d0bd121ca1b2a43f6120ade7c4af 100644 --- a/src/shared/Core/MessageBroker/MessageBroker.h +++ b/src/shared/Core/MessageBroker/MessageBroker.h @@ -34,25 +34,32 @@ public: static MessageBroker& getInstance(); - void subscribe(Filter filter, Module* observer, Callback callback); + void subscribe(Filter filter, std::shared_ptr<Module> observer, + Callback callback); /** * @brief Unsubscribe the module from the filter. */ - void unsubscribe(Filter filter, Module* module); + void unsubscribe(Filter filter, std::shared_ptr<Module> module); /** * @brief Publish a message to all its subscribers. */ void publish(const Message& message); + /** + * @brief this method can be used to remove modules indipendent from the + * filters + */ + void unsubscribeFromAll(std::shared_ptr<Module> module); + private: MessageBroker() {} - QMultiMap<Filter, QPair<Module*, Callback*>> observers; + QMultiMap<Filter, QPair<std::weak_ptr<Module>, std::unique_ptr<Callback>>> observers; -private slots: - void onModuleDeleted(Module* module); + // private slots: + // void onModuleDeleted(std::shared_ptr<Module> module); // public: // MessageBroker(const MessageBroker&) = delete;