From 8c56481249ed1bc0b46bca3aec0c7e86495c5d3a Mon Sep 17 00:00:00 2001
From: Morph <39850852+Morph1984@users.noreply.github.com>
Date: Sun, 19 Mar 2023 14:48:01 -0400
Subject: [PATCH] bounded_threadsafe_queue: Add producer cv to avoid busy
 waiting

---
 src/common/bounded_threadsafe_queue.h | 46 +++++++++++++++++----------
 1 file changed, 29 insertions(+), 17 deletions(-)

diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h
index 975215863..0fb2f42d1 100644
--- a/src/common/bounded_threadsafe_queue.h
+++ b/src/common/bounded_threadsafe_queue.h
@@ -45,12 +45,12 @@ public:
     }
 
     void PopWait(T& t, std::stop_token stop_token) {
-        Wait(stop_token);
+        ConsumerWait(stop_token);
         Pop(t);
     }
 
     T PopWait(std::stop_token stop_token) {
-        Wait(stop_token);
+        ConsumerWait(stop_token);
         T t;
         Pop(t);
         return t;
@@ -88,9 +88,10 @@ private:
             }
         } else if constexpr (Mode == PushMode::Wait) {
             // Wait until we have free slots to write to.
-            while ((write_index - m_read_index.load()) == Capacity) {
-                std::this_thread::yield();
-            }
+            std::unique_lock lock{producer_cv_mutex};
+            producer_cv.wait(lock, [this, write_index] {
+                return (write_index - m_read_index.load()) < Capacity;
+            });
         } else {
             static_assert(Mode < PushMode::Count, "Invalid PushMode.");
         }
@@ -105,8 +106,8 @@ private:
         ++m_write_index;
 
         // Notify the consumer that we have pushed into the queue.
-        std::scoped_lock lock{cv_mutex};
-        cv.notify_one();
+        std::scoped_lock lock{consumer_cv_mutex};
+        consumer_cv.notify_one();
 
         return true;
     }
@@ -122,9 +123,10 @@ private:
             }
         } else if constexpr (Mode == PushMode::Wait) {
             // Wait until we have free slots to write to.
-            while ((write_index - m_read_index.load()) == Capacity) {
-                std::this_thread::yield();
-            }
+            std::unique_lock lock{producer_cv_mutex};
+            producer_cv.wait(lock, [this, write_index] {
+                return (write_index - m_read_index.load()) < Capacity;
+            });
         } else {
             static_assert(Mode < PushMode::Count, "Invalid PushMode.");
         }
@@ -139,8 +141,8 @@ private:
         ++m_write_index;
 
         // Notify the consumer that we have pushed into the queue.
-        std::scoped_lock lock{cv_mutex};
-        cv.notify_one();
+        std::scoped_lock lock{consumer_cv_mutex};
+        consumer_cv.notify_one();
 
         return true;
     }
@@ -161,6 +163,10 @@ private:
 
         // Increment the read index.
         ++m_read_index;
+
+        // Notify the producer that we have popped off the queue.
+        std::unique_lock lock{producer_cv_mutex};
+        producer_cv.notify_one();
     }
 
     bool Pop(T& t) {
@@ -180,12 +186,16 @@ private:
         // Increment the read index.
         ++m_read_index;
 
+        // Notify the producer that we have popped off the queue.
+        std::scoped_lock lock{producer_cv_mutex};
+        producer_cv.notify_one();
+
         return true;
     }
 
-    void Wait(std::stop_token stop_token) {
-        std::unique_lock lock{cv_mutex};
-        Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); });
+    void ConsumerWait(std::stop_token stop_token) {
+        std::unique_lock lock{consumer_cv_mutex};
+        Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); });
     }
 
     alignas(128) std::atomic_size_t m_read_index{0};
@@ -193,8 +203,10 @@ private:
 
     std::array<T, Capacity> m_data;
 
-    std::condition_variable_any cv;
-    std::mutex cv_mutex;
+    std::condition_variable_any producer_cv;
+    std::mutex producer_cv_mutex;
+    std::condition_variable_any consumer_cv;
+    std::mutex consumer_cv_mutex;
 };
 
 template <typename T, size_t Capacity = detail::DefaultCapacity>