diff --git a/src/shared/Core/MessageBroker/MessageBroker.cpp b/src/shared/Core/MessageBroker/MessageBroker.cpp index 8409f19ca9e44d57af65d7d1eeee85e6ecd7d23e..5dfd504ff78f2c37fa7c34a7e6f196eeda6f5c3d 100644 --- a/src/shared/Core/MessageBroker/MessageBroker.cpp +++ b/src/shared/Core/MessageBroker/MessageBroker.cpp @@ -26,25 +26,25 @@ MessageBroker& MessageBroker::getInstance() return instance; } -void MessageBroker::subscribe(Filter filter, std::shared_ptr<Module> observer, - Callback callback) +std::unique_ptr<Subscription> MessageBroker::subscribe( + Filter filter, Subscription::SubscriberFn callback) { - observers.insert( - filter, {observer, std::make_unique<Callback>(std::move(callback))}); + auto subscription = + std::make_unique<Subscription>(std::move(callback), filter); - // Register the beforeDelete slot of the module to automatically - // remove the subscriber - // connect(observer, &Module::closeMe, this, - // &MessageBroker::onModuleDeleted); + observers.insert(filter, subscription.get()); + + return std::move(subscription); } -void MessageBroker::unsubscribe(Filter filter, std::shared_ptr<Module> module) +void MessageBroker::unsubscribe(Subscription* sub) { + auto filter = sub->filter; auto callbackList = observers.values(filter); for (auto& elem : callbackList) { - if (elem.first.expired() || elem.first.lock().get() == module.get()) + if (elem == sub) observers.remove(filter, elem); } } @@ -62,45 +62,13 @@ void MessageBroker::publish(const Message& message) for (auto subscriber = filteredSubscribers.begin(); subscriber != filteredSubscribers.end();) { - if (!subscriber->first.expired()) - { - auto copy = message; - filter.filterMessage(copy); - (*subscriber->second)(copy, filter); - } + + auto copy = message; + filter.filterMessage(copy); + (*subscriber)->callback(copy, filter); subscriber++; } } } - - for (auto it = observers.begin(); it != observers.end();) - { - if (it.value().first.expired()) - { - it = observers.erase(it); - } - else - { - it++; - } - } -} - -void MessageBroker::unsubscribeFromAll(std::shared_ptr<Module> module) -{ - Module* moduleAddr = module.get(); - - for (auto it = observers.begin(); it != observers.end();) - { - auto observer = it.value().first; - if (!observer.expired() && observer.lock().get() == moduleAddr) - { - it = observers.erase(it); - } - else - { - it++; - } - } } diff --git a/src/shared/Core/MessageBroker/MessageBroker.h b/src/shared/Core/MessageBroker/MessageBroker.h index 050167ac34e7d0bd121ca1b2a43f6120ade7c4af..d30f280ba504a6c3fe0e14ad56120a21b5dbd1d1 100644 --- a/src/shared/Core/MessageBroker/MessageBroker.h +++ b/src/shared/Core/MessageBroker/MessageBroker.h @@ -23,6 +23,8 @@ #include <QObject> #include <functional> +#include "Subscription.h" + class Module; class MessageBroker : public QObject @@ -30,33 +32,28 @@ class MessageBroker : public QObject Q_OBJECT public: - using Callback = std::function<void(const Message&, const Filter&)>; - static MessageBroker& getInstance(); - void subscribe(Filter filter, std::shared_ptr<Module> observer, - Callback callback); - - /** - * @brief Unsubscribe the module from the filter. - */ - void unsubscribe(Filter filter, std::shared_ptr<Module> module); + [[nodiscard( + "IF NOT SAVED SUBSCRIPTION WILL BE REMOVED AUTOMATICALLY")]] std:: + unique_ptr<Subscription> + subscribe(Filter filter, Subscription::SubscriberFn callback); /** * @brief Publish a message to all its subscribers. */ void publish(const Message& message); +private: + friend Subscription; /** - * @brief this method can be used to remove modules indipendent from the - * filters + * @brief Removes the subscription from the filter. */ - void unsubscribeFromAll(std::shared_ptr<Module> module); + void unsubscribe(Subscription* sub); -private: - MessageBroker() {} + MessageBroker(){}; - QMultiMap<Filter, QPair<std::weak_ptr<Module>, std::unique_ptr<Callback>>> observers; + QMultiMap<Filter, Subscription*> observers; // private slots: // void onModuleDeleted(std::shared_ptr<Module> module); diff --git a/src/shared/Core/MessageBroker/Subscription.h b/src/shared/Core/MessageBroker/Subscription.h new file mode 100644 index 0000000000000000000000000000000000000000..e428b916fcc3981e171db848e704b11ca221909d --- /dev/null +++ b/src/shared/Core/MessageBroker/Subscription.h @@ -0,0 +1,30 @@ +#pragma once + +#include <Core/Message/Filter.h> + +#include <functional> + +#include "MessageBroker.h" + +class Subscription +{ +public: + using SubscriberFn = std::function<void(const Message&, const Filter&)>; + + void unsubscribe() { MessageBroker::getInstance().unsubscribe(this); } + + ~Subscription() { unsubscribe(); } + +private: + friend MessageBroker; + + Subscription(SubscriberFn callback, Filter filter) + : callback(std::move(callback)), filter(filter){}; + + Subscription() = delete; + Subscription(Subscription&) = delete; + Subscription(Subscription&& other) = delete; + + Filter filter; + SubscriberFn callback; +}; \ No newline at end of file