From 9653d124dbf6b0df306ee67f1d9b74145904c32a Mon Sep 17 00:00:00 2001
From: Raul Radu <raul.radu@mail.polimi.it>
Date: Wed, 24 Jan 2024 22:01:20 +0100
Subject: [PATCH] [MessageBroker] Using smart pointers and resolved bugs

- Using smart pointers to store modules (using weak ptr since this must
      not control the life of a module)
- Added functionality to unsubscribe from all filters
- callbacks are now stored in a unique_ptr to prevent memory leaks when
    the pair is removed from the MultiMap
---
 .../Core/MessageBroker/MessageBroker.cpp      | 55 +++++++++++++------
 src/shared/Core/MessageBroker/MessageBroker.h | 17 ++++--
 2 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/src/shared/Core/MessageBroker/MessageBroker.cpp b/src/shared/Core/MessageBroker/MessageBroker.cpp
index ba9f82bf..8409f19c 100644
--- a/src/shared/Core/MessageBroker/MessageBroker.cpp
+++ b/src/shared/Core/MessageBroker/MessageBroker.cpp
@@ -26,10 +26,11 @@ MessageBroker& MessageBroker::getInstance()
     return instance;
 }
 
-void MessageBroker::subscribe(Filter filter, Module* observer,
+void MessageBroker::subscribe(Filter filter, std::shared_ptr<Module> observer,
                               Callback callback)
 {
-    observers.insert(filter, {observer, new Callback(std::move(callback))});
+    observers.insert(
+        filter, {observer, std::make_unique<Callback>(std::move(callback))});
 
     // Register the beforeDelete slot of the module to automatically
     // remove the subscriber
@@ -37,13 +38,13 @@ void MessageBroker::subscribe(Filter filter, Module* observer,
     // &MessageBroker::onModuleDeleted);
 }
 
-void MessageBroker::unsubscribe(Filter filter, Module* module)
+void MessageBroker::unsubscribe(Filter filter, std::shared_ptr<Module> module)
 {
     auto callbackList = observers.values(filter);
 
     for (auto& elem : callbackList)
     {
-        if (elem.first == module)
+        if (elem.first.expired() || elem.first.lock().get() == module.get())
             observers.remove(filter, elem);
     }
 }
@@ -57,27 +58,49 @@ void MessageBroker::publish(const Message& message)
         // subscribers
         if (filter.match(message))
         {
-            for (auto subscriber : observers.values(filter))
+            auto filteredSubscribers = observers.values(filter);
+            for (auto subscriber = filteredSubscribers.begin();
+                 subscriber != filteredSubscribers.end();)
             {
-                auto copy = message;
-                filter.filterMessage(copy);
-                (*subscriber.second)(copy, filter);
+                if (!subscriber->first.expired())
+                {
+                    auto copy = message;
+                    filter.filterMessage(copy);
+                    (*subscriber->second)(copy, filter);
+                }
+
+                subscriber++;
             }
         }
     }
+
+    for (auto it = observers.begin(); it != observers.end();)
+    {
+        if (it.value().first.expired())
+        {
+            it = observers.erase(it);
+        }
+        else
+        {
+            it++;
+        }
+    }
 }
 
-void MessageBroker::onModuleDeleted(Module* module)
+void MessageBroker::unsubscribeFromAll(std::shared_ptr<Module> module)
 {
-    for (auto filter : observers.keys())
+    Module* moduleAddr = module.get();
+
+    for (auto it = observers.begin(); it != observers.end();)
     {
-        for (auto subscriber : observers.values(filter))
+        auto observer = it.value().first;
+        if (!observer.expired() && observer.lock().get() == moduleAddr)
         {
-            if (subscriber.first == module)
-            {
-                observers.remove(filter, subscriber);
-                delete subscriber.second;
-            }
+            it = observers.erase(it);
+        }
+        else
+        {
+            it++;
         }
     }
 }
diff --git a/src/shared/Core/MessageBroker/MessageBroker.h b/src/shared/Core/MessageBroker/MessageBroker.h
index 19471425..050167ac 100644
--- a/src/shared/Core/MessageBroker/MessageBroker.h
+++ b/src/shared/Core/MessageBroker/MessageBroker.h
@@ -34,25 +34,32 @@ public:
 
     static MessageBroker& getInstance();
 
-    void subscribe(Filter filter, Module* observer, Callback callback);
+    void subscribe(Filter filter, std::shared_ptr<Module> observer,
+                   Callback callback);
 
     /**
      * @brief Unsubscribe the module from the filter.
      */
-    void unsubscribe(Filter filter, Module* module);
+    void unsubscribe(Filter filter, std::shared_ptr<Module> module);
 
     /**
      * @brief Publish a message to all its subscribers.
      */
     void publish(const Message& message);
 
+    /**
+     * @brief this method can be used to remove modules indipendent from the
+     * filters
+     */
+    void unsubscribeFromAll(std::shared_ptr<Module> module);
+
 private:
     MessageBroker() {}
 
-    QMultiMap<Filter, QPair<Module*, Callback*>> observers;
+    QMultiMap<Filter, QPair<std::weak_ptr<Module>, std::unique_ptr<Callback>>> observers;
 
-private slots:
-    void onModuleDeleted(Module* module);
+    // private slots:
+    //     void onModuleDeleted(std::shared_ptr<Module> module);
 
     // public:
     //     MessageBroker(const MessageBroker&)            = delete;
-- 
GitLab