From 842fbb6961b7e0260103b354b01f91140d8c3358 Mon Sep 17 00:00:00 2001 From: Alan George Date: Sun, 14 Jun 2026 18:05:00 -0400 Subject: [PATCH 1/4] Fix FfiClient listener use-after-free during Room teardown. Track each listener in a ListenerSlot with active-callback draining so removeListener and shutdown block until in-flight FFI events finish before the owning object (e.g. Room) is destroyed. Add unit tests that reproduce the reported event-vs-destruction race. Co-authored-by: Cursor --- src/ffi_client.cpp | 165 +++++++++++++++++--- src/ffi_client.h | 25 ++- src/tests/unit/test_ffi_client.cpp | 243 ++++++++++++++++++++++++++++- 3 files changed, 406 insertions(+), 27 deletions(-) diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 0966816f..94a6af24 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -18,6 +18,7 @@ #include #include +#include #include "data_track.pb.h" #include "e2ee.pb.h" @@ -146,44 +147,134 @@ FfiClient& FfiClient::instance() noexcept { return instance; } -// clang-tidy flags this as a trivial destructor in release mode -// due to the assert being pre-processed out -// NOLINTNEXTLINE(modernize-use-equals-default) FfiClient::~FfiClient() { - assert(!initialized_.load() && - "LiveKit SDK was not shut down before process exit. " - "Call livekit::shutdown()."); + if (lifecycle_state_.load() == LifecycleState::Initialized) { + // Explicitly use this over spdlog + // spdlog can throw, and wrapping in try/catch also flags "empty catch" clang-tidy check + std::fputs( + "LiveKit SDK was not shut down before process exit. " + "Call livekit::shutdown().\n", + stderr); + std::fflush(stderr); + } } void FfiClient::shutdown() noexcept { - if (!isInitialized()) { - return; + bool dispose_ffi = false; + try { + // Atomically claim shutdown ownership; only the caller that transitions + // Initialized -> ShuttingDown may drain callbacks and dispose the FFI. + LifecycleState expected = LifecycleState::Initialized; + // Note: compare_exchange_strong transitions Initialized -> ShuttingDown + if (!lifecycle_state_.compare_exchange_strong(expected, LifecycleState::ShuttingDown, std::memory_order_acq_rel)) { + return; + } + dispose_ffi = true; + + std::vector> listeners_to_drain; + std::vector> pending_to_cancel; + { + const std::scoped_lock guard(lock_); + listeners_to_drain.reserve(listeners_.size()); + for (auto& [id, slot] : listeners_) { + (void)id; + if (slot) { + { + const std::scoped_lock slot_guard(slot->mutex); + slot->removed = true; + } + listeners_to_drain.push_back(std::move(slot)); + } + } + listeners_.clear(); + + pending_to_cancel.reserve(pending_by_id_.size()); + for (auto& [async_id, pending] : pending_by_id_) { + (void)async_id; + if (pending) { + pending_to_cancel.push_back(std::move(pending)); + } + } + pending_by_id_.clear(); + } + + for (auto& pending : pending_to_cancel) { + pending->cancel(); + } + + const auto this_thread = std::this_thread::get_id(); + for (const auto& slot : listeners_to_drain) { + std::unique_lock slot_lock(slot->mutex); + slot->cv.wait(slot_lock, [&slot, this_thread] { + const auto thread_it = slot->active_threads.find(this_thread); + const int self_active = thread_it == slot->active_threads.end() ? 0 : thread_it->second; + return slot->active_callbacks == self_active; + }); + } + + livekit_ffi_dispose(); + dispose_ffi = false; + lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release); + } catch (...) { + if (dispose_ffi) { + livekit_ffi_dispose(); + lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release); + } + (void)std::fputs("LiveKit SDK shutdown failed during local cleanup.\n", stderr); + (void)std::fflush(stderr); } - initialized_.store(false, std::memory_order_release); - livekit_ffi_dispose(); } bool FfiClient::initialize(bool capture_logs) { - if (isInitialized()) { + LifecycleState expected = LifecycleState::Uninitialized; + if (!lifecycle_state_.compare_exchange_strong(expected, LifecycleState::Initializing, std::memory_order_acq_rel)) { return false; } - initialized_.store(true, std::memory_order_release); - livekit_ffi_initialize(&ffiEventCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION); + + try { + livekit_ffi_initialize(&ffiEventCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION); + } catch (...) { + lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release); + throw; + } + + lifecycle_state_.store(LifecycleState::Initialized, std::memory_order_release); return true; } -bool FfiClient::isInitialized() const noexcept { return initialized_.load(std::memory_order_acquire); } +bool FfiClient::isInitialized() const noexcept { + return lifecycle_state_.load(std::memory_order_acquire) == LifecycleState::Initialized; +} FfiClient::ListenerId FfiClient::addListener(const FfiClient::Listener& listener) { const std::scoped_lock guard(lock_); + if (lifecycle_state_.load(std::memory_order_acquire) == LifecycleState::ShuttingDown) { + logAndThrow("FfiClient::addListener failed: LiveKit is shutting down"); + } const FfiClient::ListenerId id = next_listener_id++; - listeners_[id] = listener; + listeners_[id] = std::make_shared(listener); return id; } void FfiClient::removeListener(ListenerId id) { - const std::scoped_lock guard(lock_); - listeners_.erase(id); + std::shared_ptr slot; + { + const std::scoped_lock guard(lock_); + auto it = listeners_.find(id); + if (it == listeners_.end()) { + return; + } + slot = std::move(it->second); + listeners_.erase(it); + } + + const auto this_thread = std::this_thread::get_id(); + std::unique_lock slot_lock(slot->mutex); + slot->removed = true; + slot->cv.wait(slot_lock, [&slot, this_thread] { + const auto self_active = slot->active_threads.count(this_thread) != 0; + return slot->active_callbacks == 0 || (self_active && slot->active_callbacks == 1); + }); } proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) const { @@ -221,9 +312,12 @@ proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) cons void FfiClient::pushEvent(const proto::FfiEvent& event) const { std::unique_ptr to_complete; - std::vector listeners_copy; + std::vector> listeners_copy; { const std::scoped_lock guard(lock_); + if (lifecycle_state_.load(std::memory_order_acquire) != LifecycleState::Initialized) { + return; + } // Complete pending future if this event is a callback with async_id if (auto async_id = ExtractAsyncId(event)) { @@ -246,8 +340,39 @@ void FfiClient::pushEvent(const proto::FfiEvent& event) const { } // Notify listeners outside lock - for (auto& listener : listeners_copy) { - listener(event); + for (const auto& slot : listeners_copy) { + Listener listener; + const auto this_thread = std::this_thread::get_id(); + { + const std::scoped_lock slot_guard(slot->mutex); + if (slot->removed) { + continue; + } + ++slot->active_callbacks; + ++slot->active_threads[this_thread]; + listener = slot->listener; + } + + try { + listener(event); + } catch (const std::exception& e) { + LK_LOG_ERROR("FfiClient listener threw: {}", e.what()); + } catch (...) { + LK_LOG_ERROR("FfiClient listener threw: unknown exception"); + } + + { + const std::scoped_lock slot_guard(slot->mutex); + const auto thread_it = slot->active_threads.find(this_thread); + if (thread_it != slot->active_threads.end()) { + --thread_it->second; + if (thread_it->second == 0) { + slot->active_threads.erase(thread_it); + } + } + --slot->active_callbacks; + } + slot->cv.notify_all(); } } diff --git a/src/ffi_client.h b/src/ffi_client.h index 5ea0a89a..e6f35d83 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -24,7 +25,9 @@ #include #include #include +#include #include +#include #include "data_track.pb.h" #include "livekit/data_track_error.h" @@ -147,6 +150,13 @@ class LIVEKIT_INTERNAL_API FfiClient { private: FfiClient() = default; + enum class LifecycleState : std::uint8_t { + Uninitialized, + Initializing, + Initialized, + ShuttingDown, + }; + // Base class for type-erased pending ops struct PendingBase { AsyncId async_id = 0; // Client-generated async ID for cancellation @@ -176,6 +186,17 @@ class LIVEKIT_INTERNAL_API FfiClient { } }; + struct ListenerSlot { + explicit ListenerSlot(Listener cb) : listener(std::move(cb)) {} + + Listener listener; + std::mutex mutex; + std::condition_variable cv; + std::unordered_map active_threads; + int active_callbacks = 0; + bool removed = false; + }; + template std::future registerAsync(AsyncId async_id, std::function match, std::function&)> handler); @@ -187,7 +208,7 @@ class LIVEKIT_INTERNAL_API FfiClient { // removed. bool cancelPendingByAsyncId(AsyncId async_id); - std::unordered_map listeners_; + std::unordered_map> listeners_; std::atomic next_listener_id{1}; mutable std::mutex lock_; mutable std::unordered_map> pending_by_id_; @@ -195,6 +216,6 @@ class LIVEKIT_INTERNAL_API FfiClient { void pushEvent(const proto::FfiEvent& event) const; friend void ffiEventCallback(const uint8_t* buf, size_t len); - std::atomic initialized_{false}; + std::atomic lifecycle_state_{LifecycleState::Uninitialized}; }; } // namespace livekit diff --git a/src/tests/unit/test_ffi_client.cpp b/src/tests/unit/test_ffi_client.cpp index f6982ebb..882356d7 100644 --- a/src/tests/unit/test_ffi_client.cpp +++ b/src/tests/unit/test_ffi_client.cpp @@ -17,9 +17,13 @@ #include #include +#include +#include #include +#include #include #include +#include #include #include "ffi.pb.h" @@ -38,6 +42,76 @@ void handleSignal(int signal) { } } +void emitLogEvent() { + proto::FfiEvent event; + auto* record = event.mutable_logs()->add_records(); + record->set_level(proto::LOG_INFO); + record->set_target("test"); + record->set_message("listener event"); + + std::string bytes; + ASSERT_TRUE(event.SerializeToString(&bytes)); + ffiEventCallback(reinterpret_cast(bytes.data()), bytes.size()); +} + +// Minimal stand-in for Room that mirrors its relationship to FfiClient: +// - it registers an FFI listener whose callback dereferences `this` +// (like Room's `[this](const FfiEvent& e){ onEvent(e); }`), and +// - it tears that listener down in its destructor +// (like ~Room -> disconnect -> removeListener). +// +// This is the object the user's bug report is about: if the FFI thread is +// dispatching an event into the listener while the object is destroyed, +// the callback must never touch freed memory. `magic_` is a liveness +// sentinel so a use-after-free is observable even without a sanitizer. +class FakeRoom { +public: + static constexpr std::uint32_t kAlive = 0xA11ECAFEU; + static constexpr std::uint32_t kDead = 0xDEADBEEFU; + + FakeRoom() { + listener_id_ = FfiClient::instance().addListener([this](const proto::FfiEvent& e) { onEvent(e); }); + } + + ~FakeRoom() { + // Mirror ~Room: removeListener() blocks until any in-flight callback + // for this listener finishes, so onEvent() below can never run against + // a destroyed FakeRoom. + FfiClient::instance().removeListener(listener_id_); + magic_ = kDead; + } + + FakeRoom(const FakeRoom&) = delete; + FakeRoom& operator=(const FakeRoom&) = delete; + FakeRoom(FakeRoom&&) = delete; + FakeRoom& operator=(FakeRoom&&) = delete; + + void setOnEntered(std::function fn) { on_entered_ = std::move(fn); } + void setReleaseGate(std::shared_future gate) { gate_ = std::move(gate); } + int events() const { return events_.load(); } + +private: + void onEvent(const proto::FfiEvent&) { + // If `this` were freed mid-dispatch, these reads would observe kDead or + // garbage (and trip ASan); the listener handshake must keep us alive. + EXPECT_EQ(magic_, kAlive) << "onEvent ran against a destroyed FakeRoom (use-after-free)"; + if (on_entered_) { + on_entered_(); + } + if (gate_.valid()) { + gate_.wait(); + } + EXPECT_EQ(magic_, kAlive) << "FakeRoom freed while onEvent was still running"; + ++events_; + } + + std::uint32_t magic_ = kAlive; + FfiClient::ListenerId listener_id_ = 0; + std::function on_entered_; + std::shared_future gate_; + std::atomic events_{0}; +}; + } // namespace class FfiClientTest : public ::testing::Test { @@ -144,15 +218,174 @@ TEST_F(FfiClientTest, RemoveListenerIsIdempotent) { EXPECT_NO_THROW(FfiClient::instance().removeListener(id)); } -TEST_F(FfiClientTest, ListenerRegistrationSurvivesShutdownReinitCycle) { +TEST_F(FfiClientTest, ShutdownClearsListenerRegistrations) { FfiClient::instance().initialize(false); - const auto id = FfiClient::instance().addListener([](const proto::FfiEvent&) {}); + std::atomic listener_calls{0}; + const auto id = FfiClient::instance().addListener([&listener_calls](const proto::FfiEvent&) { ++listener_calls; }); EXPECT_NE(id, 0); - // shutdown() does not clear the C++-side listener map today; document that - // contract here so a future refactor that changes it is a deliberate choice. FfiClient::instance().shutdown(); - EXPECT_NO_THROW(FfiClient::instance().removeListener(id)); + ASSERT_FALSE(FfiClient::instance().isInitialized()); + + ASSERT_TRUE(FfiClient::instance().initialize(false)); + emitLogEvent(); + EXPECT_EQ(listener_calls.load(), 0); +} + +TEST_F(FfiClientTest, RemoveListenerWaitsForInFlightCallback) { + ASSERT_TRUE(FfiClient::instance().initialize(false)); + + std::promise callback_entered; + auto callback_entered_future = callback_entered.get_future(); + std::promise release_callback; + auto release_callback_future = release_callback.get_future(); + std::atomic callback_completed{false}; + + const auto id = FfiClient::instance().addListener([&](const proto::FfiEvent&) { + callback_entered.set_value(); + release_callback_future.wait(); + callback_completed.store(true); + }); + + std::thread callback_thread([] { emitLogEvent(); }); + ASSERT_EQ(callback_entered_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + + auto remove_future = std::async(std::launch::async, [&] { FfiClient::instance().removeListener(id); }); + EXPECT_EQ(remove_future.wait_for(std::chrono::milliseconds(50)), std::future_status::timeout); + EXPECT_FALSE(callback_completed.load()); + + release_callback.set_value(); + callback_thread.join(); + + EXPECT_EQ(remove_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + EXPECT_TRUE(callback_completed.load()); +} + +// Reproduces the reported "Room event vs. Room destruction" race: the FFI +// thread is inside the listener callback (dereferencing `this`) at the exact +// moment the owning object is destroyed on another thread. ~FakeRoom() -> +// removeListener() must block until the in-flight callback returns, so the +// callback never touches freed memory. Without the ListenerSlot handshake the +// destroy thread would free the FakeRoom while onEvent() is still running. +TEST_F(FfiClientTest, RoomEventRoomDestructionRace) { + ASSERT_TRUE(FfiClient::instance().initialize(false)); + + std::promise callback_entered; + auto callback_entered_future = callback_entered.get_future(); + std::promise release_callback; + const std::shared_future release_callback_future = release_callback.get_future().share(); + std::atomic entered_once{false}; + + auto room = std::make_unique(); + room->setReleaseGate(release_callback_future); + room->setOnEntered([&] { + if (!entered_once.exchange(true)) { + callback_entered.set_value(); + } + }); + + // FFI thread dispatches an event; FakeRoom::onEvent is now parked inside the + // callback holding `this`, waiting on the release gate. + std::thread ffi_thread([] { emitLogEvent(); }); + ASSERT_EQ(callback_entered_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + + // Destroy the owner on a different thread while the callback is in flight. + std::atomic destroyed{false}; + std::thread destroy_thread([&] { + room.reset(); + destroyed.store(true); + }); + + // The destructor (removeListener) must block while the callback holds the + // slot; the FakeRoom must still be alive. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_FALSE(destroyed.load()) << "destruction completed while a callback was still running"; + + // Let the callback finish; destruction should now unblock and complete. + release_callback.set_value(); + ffi_thread.join(); + destroy_thread.join(); + EXPECT_TRUE(destroyed.load()); +} + +// Same race exercised under contention: repeatedly create a FakeRoom while a +// background thread floods events, then destroy it. The destroy can land +// before, during, or after dispatch, sweeping the (A) copy-pointer / (B) +// invoke-onEvent window the report describes. Any use-after-free trips the +// magic-sentinel assertions in FakeRoom::onEvent (and ASan, if enabled). +TEST_F(FfiClientTest, ConcurrentEventAndOwnerDestructionStressIsSafe) { + ASSERT_TRUE(FfiClient::instance().initialize(false)); + + std::atomic stop{false}; + std::thread emitter([&] { + while (!stop.load(std::memory_order_relaxed)) { + emitLogEvent(); + } + }); + + constexpr int kIterations = 500; + for (int i = 0; i < kIterations; ++i) { + auto room = std::make_unique(); + // Give the emitter a chance to dispatch into this listener before we tear + // it down, so destruction races against an active/just-finishing callback. + std::this_thread::yield(); + room.reset(); // ~FakeRoom -> removeListener must drain safely. + } + + stop.store(true, std::memory_order_relaxed); + emitter.join(); +} + +TEST_F(FfiClientTest, ShutdownFromListenerDoesNotDeadlock) { + ASSERT_TRUE(FfiClient::instance().initialize(false)); + + std::atomic shutdown_returned{false}; + const auto id = FfiClient::instance().addListener([&shutdown_returned](const proto::FfiEvent&) { + FfiClient::instance().shutdown(); + shutdown_returned.store(true); + }); + ASSERT_NE(id, 0); + + auto callback_future = std::async(std::launch::async, [] { emitLogEvent(); }); + EXPECT_EQ(callback_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + EXPECT_TRUE(shutdown_returned.load()); + EXPECT_FALSE(FfiClient::instance().isInitialized()); +} + +TEST_F(FfiClientTest, ShutdownRejectsReinitializeAndDropsNewEventsWhileDraining) { + ASSERT_TRUE(FfiClient::instance().initialize(false)); + + std::promise callback_entered; + auto callback_entered_future = callback_entered.get_future(); + std::promise release_callback; + auto release_callback_future = release_callback.get_future(); + std::atomic listener_calls{0}; + + const auto id = FfiClient::instance().addListener([&](const proto::FfiEvent&) { + ++listener_calls; + callback_entered.set_value(); + release_callback_future.wait(); + }); + ASSERT_NE(id, 0); + + std::thread callback_thread([] { emitLogEvent(); }); + ASSERT_EQ(callback_entered_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + + auto shutdown_future = std::async(std::launch::async, [] { FfiClient::instance().shutdown(); }); + for (int i = 0; i < 5000 && FfiClient::instance().isInitialized(); ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + ASSERT_FALSE(FfiClient::instance().isInitialized()); + EXPECT_EQ(shutdown_future.wait_for(std::chrono::milliseconds(50)), std::future_status::timeout); + EXPECT_FALSE(FfiClient::instance().initialize(false)); + + emitLogEvent(); + EXPECT_EQ(listener_calls.load(), 1); + + release_callback.set_value(); + callback_thread.join(); + EXPECT_EQ(shutdown_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + EXPECT_FALSE(FfiClient::instance().isInitialized()); } TEST_F(FfiClientTest, PanicEvent) { From 6d685dbc640e5a1d994acec6b69545a6ea4c43e5 Mon Sep 17 00:00:00 2001 From: Alan George Date: Sun, 14 Jun 2026 19:40:13 -0400 Subject: [PATCH 2/4] Additional comments from self-review --- src/ffi_client.cpp | 25 ++++++++++++++++++------- src/ffi_client.h | 12 ++++++++++++ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 94a6af24..2ba85ea9 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -151,10 +151,7 @@ FfiClient::~FfiClient() { if (lifecycle_state_.load() == LifecycleState::Initialized) { // Explicitly use this over spdlog // spdlog can throw, and wrapping in try/catch also flags "empty catch" clang-tidy check - std::fputs( - "LiveKit SDK was not shut down before process exit. " - "Call livekit::shutdown().\n", - stderr); + std::fputs("[livekit] [warning] SDK was not shut down before process exit. Use livekit::shutdown()\n", stderr); std::fflush(stderr); } } @@ -165,10 +162,12 @@ void FfiClient::shutdown() noexcept { // Atomically claim shutdown ownership; only the caller that transitions // Initialized -> ShuttingDown may drain callbacks and dispose the FFI. LifecycleState expected = LifecycleState::Initialized; - // Note: compare_exchange_strong transitions Initialized -> ShuttingDown if (!lifecycle_state_.compare_exchange_strong(expected, LifecycleState::ShuttingDown, std::memory_order_acq_rel)) { + // If not Initialized, return early to avoid unnecessary cleanup return; } + + // Initialized, proceed with cleanup dispose_ffi = true; std::vector> listeners_to_drain; @@ -179,15 +178,18 @@ void FfiClient::shutdown() noexcept { for (auto& [id, slot] : listeners_) { (void)id; if (slot) { + // Mark the listener as removed to prevent race conditions { const std::scoped_lock slot_guard(slot->mutex); slot->removed = true; } + // Add the listener to the list of listeners to drain listeners_to_drain.push_back(std::move(slot)); } } listeners_.clear(); + // Add the pending operations to the list of pending operations to cancel pending_to_cancel.reserve(pending_by_id_.size()); for (auto& [async_id, pending] : pending_by_id_) { (void)async_id; @@ -198,13 +200,19 @@ void FfiClient::shutdown() noexcept { pending_by_id_.clear(); } + // Cancel the pending operations for (auto& pending : pending_to_cancel) { pending->cancel(); } const auto this_thread = std::this_thread::get_id(); + // Wait for all in-flight listener callbacks to complete for (const auto& slot : listeners_to_drain) { std::unique_lock slot_lock(slot->mutex); + + // When shutdown() isn't on a listener thread, self_active is 0 and we wait for active_callbacks == 0. When it's + // called from inside a listener (e.g. ShutdownFromListenerDoesNotDeadlock), self_active is 1 and the wait + // succeeds immediately with active_callbacks == 1, so we don't wait on our own in-flight callback. slot->cv.wait(slot_lock, [&slot, this_thread] { const auto thread_it = slot->active_threads.find(this_thread); const int self_active = thread_it == slot->active_threads.end() ? 0 : thread_it->second; @@ -215,12 +223,15 @@ void FfiClient::shutdown() noexcept { livekit_ffi_dispose(); dispose_ffi = false; lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release); - } catch (...) { + } catch (const std::exception& e) { if (dispose_ffi) { livekit_ffi_dispose(); lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release); } - (void)std::fputs("LiveKit SDK shutdown failed during local cleanup.\n", stderr); + // Explicitly use this over spdlog (method is noexcept) + (void)std::fputs("[livekit] [error] SDK shutdown failed during local cleanup: ", stderr); + (void)std::fputs(e.what(), stderr); + (void)std::fputs("\n", stderr); (void)std::fflush(stderr); } } diff --git a/src/ffi_client.h b/src/ffi_client.h index e6f35d83..db252932 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -186,14 +186,22 @@ class LIVEKIT_INTERNAL_API FfiClient { } }; + /// Additional data structure to track listener callbacks and their state. + /// This is used to coordinate the FFI thread and the app thread, and prevent race conditions/use-after-free scenarios struct ListenerSlot { explicit ListenerSlot(Listener cb) : listener(std::move(cb)) {} + /// The user-provided listener callback Listener listener; + /// Mutex to protect the listener slot std::mutex mutex; + /// Condition variable to wait for the listener to finish std::condition_variable cv; + /// Map of thread IDs to the number of active threads std::unordered_map active_threads; + /// Number of active callbacks int active_callbacks = 0; + /// Whether the listener has been removed (used for race mitigation before removal) bool removed = false; }; @@ -208,10 +216,14 @@ class LIVEKIT_INTERNAL_API FfiClient { // removed. bool cancelPendingByAsyncId(AsyncId async_id); + /// Map of listener IDs to listener slots std::unordered_map> listeners_; + /// Next listener ID to generate std::atomic next_listener_id{1}; mutable std::mutex lock_; + /// Map of async IDs to pending operations mutable std::unordered_map> pending_by_id_; + /// Next async ID to generate std::atomic next_async_id_{1}; void pushEvent(const proto::FfiEvent& event) const; From 3d5abab09d1ce6fb93974ad4e813d1db016f51ba Mon Sep 17 00:00:00 2001 From: Alan George Date: Sun, 14 Jun 2026 20:58:20 -0400 Subject: [PATCH 3/4] Additional cleanup --- src/ffi_client.cpp | 30 +++++++++++++++--------------- src/ffi_client.h | 2 ++ src/tests/unit/test_ffi_client.cpp | 4 ++-- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 2ba85ea9..124fe083 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -157,7 +157,9 @@ FfiClient::~FfiClient() { } void FfiClient::shutdown() noexcept { - bool dispose_ffi = false; + // Don't use string to avoid exceptions + // (Also cleaner with exception.what() and printing) + const char* shutdown_error = nullptr; try { // Atomically claim shutdown ownership; only the caller that transitions // Initialized -> ShuttingDown may drain callbacks and dispose the FFI. @@ -167,9 +169,6 @@ void FfiClient::shutdown() noexcept { return; } - // Initialized, proceed with cleanup - dispose_ffi = true; - std::vector> listeners_to_drain; std::vector> pending_to_cancel; { @@ -211,26 +210,27 @@ void FfiClient::shutdown() noexcept { std::unique_lock slot_lock(slot->mutex); // When shutdown() isn't on a listener thread, self_active is 0 and we wait for active_callbacks == 0. When it's - // called from inside a listener (e.g. ShutdownFromListenerDoesNotDeadlock), self_active is 1 and the wait - // succeeds immediately with active_callbacks == 1, so we don't wait on our own in-flight callback. + // called from inside a listener, self_active is 1 and the wait succeeds immediately with active_callbacks == 1, + // so we don't wait on our own in-flight callback slot->cv.wait(slot_lock, [&slot, this_thread] { const auto thread_it = slot->active_threads.find(this_thread); const int self_active = thread_it == slot->active_threads.end() ? 0 : thread_it->second; return slot->active_callbacks == self_active; }); } - - livekit_ffi_dispose(); - dispose_ffi = false; - lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release); } catch (const std::exception& e) { - if (dispose_ffi) { - livekit_ffi_dispose(); - lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release); - } + shutdown_error = e.what(); + } catch (...) { + shutdown_error = "unknown exception"; + } + + livekit_ffi_dispose(); + lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release); + + if (shutdown_error != nullptr) { // Explicitly use this over spdlog (method is noexcept) (void)std::fputs("[livekit] [error] SDK shutdown failed during local cleanup: ", stderr); - (void)std::fputs(e.what(), stderr); + (void)std::fputs(shutdown_error, stderr); (void)std::fputs("\n", stderr); (void)std::fflush(stderr); } diff --git a/src/ffi_client.h b/src/ffi_client.h index db252932..9dc840e4 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -150,6 +150,8 @@ class LIVEKIT_INTERNAL_API FfiClient { private: FfiClient() = default; + /// Lifecycle state of the FfiClient + /// This is used to prevent race conditions/use-after-free scenarios enum class LifecycleState : std::uint8_t { Uninitialized, Initializing, diff --git a/src/tests/unit/test_ffi_client.cpp b/src/tests/unit/test_ffi_client.cpp index 882356d7..8eeba153 100644 --- a/src/tests/unit/test_ffi_client.cpp +++ b/src/tests/unit/test_ffi_client.cpp @@ -267,7 +267,7 @@ TEST_F(FfiClientTest, RemoveListenerWaitsForInFlightCallback) { // removeListener() must block until the in-flight callback returns, so the // callback never touches freed memory. Without the ListenerSlot handshake the // destroy thread would free the FakeRoom while onEvent() is still running. -TEST_F(FfiClientTest, RoomEventRoomDestructionRace) { +TEST_F(FfiClientTest, RoomDestructionRace) { ASSERT_TRUE(FfiClient::instance().initialize(false)); std::promise callback_entered; @@ -313,7 +313,7 @@ TEST_F(FfiClientTest, RoomEventRoomDestructionRace) { // before, during, or after dispatch, sweeping the (A) copy-pointer / (B) // invoke-onEvent window the report describes. Any use-after-free trips the // magic-sentinel assertions in FakeRoom::onEvent (and ASan, if enabled). -TEST_F(FfiClientTest, ConcurrentEventAndOwnerDestructionStressIsSafe) { +TEST_F(FfiClientTest, RoomDestructionRaceFloodEvents) { ASSERT_TRUE(FfiClient::instance().initialize(false)); std::atomic stop{false}; From 4c2a7b044b3eddbe2d41ef14a5ffe5e7cf6368e0 Mon Sep 17 00:00:00 2001 From: Alan George Date: Mon, 15 Jun 2026 13:52:51 -0600 Subject: [PATCH 4/4] Address PR feedback --- src/ffi_client.cpp | 14 ++++++++----- src/tests/unit/test_ffi_client.cpp | 33 +++++++++++++++++------------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 124fe083..1b3aeb58 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -149,8 +149,8 @@ FfiClient& FfiClient::instance() noexcept { FfiClient::~FfiClient() { if (lifecycle_state_.load() == LifecycleState::Initialized) { - // Explicitly use this over spdlog - // spdlog can throw, and wrapping in try/catch also flags "empty catch" clang-tidy check + // Explicitly use this over spdlog/std::cerr which can throw + // Wrapping spdlog try/catch also flags "empty catch" clang-tidy check std::fputs("[livekit] [warning] SDK was not shut down before process exit. Use livekit::shutdown()\n", stderr); std::fflush(stderr); } @@ -161,11 +161,13 @@ void FfiClient::shutdown() noexcept { // (Also cleaner with exception.what() and printing) const char* shutdown_error = nullptr; try { - // Atomically claim shutdown ownership; only the caller that transitions - // Initialized -> ShuttingDown may drain callbacks and dispose the FFI. + // compare_exchange_strong atomically claims Initialized -> ShuttingDown so only one + // concurrent shutdown() drains listeners and disposes the FFI. LifecycleState expected = LifecycleState::Initialized; if (!lifecycle_state_.compare_exchange_strong(expected, LifecycleState::ShuttingDown, std::memory_order_acq_rel)) { // If not Initialized, return early to avoid unnecessary cleanup + std::fputs("[livekit] [warning] SDK was shutdown while not initialized\n", stderr); + std::fflush(stderr); return; } @@ -281,11 +283,11 @@ void FfiClient::removeListener(ListenerId id) { const auto this_thread = std::this_thread::get_id(); std::unique_lock slot_lock(slot->mutex); - slot->removed = true; slot->cv.wait(slot_lock, [&slot, this_thread] { const auto self_active = slot->active_threads.count(this_thread) != 0; return slot->active_callbacks == 0 || (self_active && slot->active_callbacks == 1); }); + slot->removed = true; } proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) const { @@ -383,6 +385,8 @@ void FfiClient::pushEvent(const proto::FfiEvent& event) const { } --slot->active_callbacks; } + + // Notify in case this listener was marked for removal during the callback (will be waiting on this) slot->cv.notify_all(); } } diff --git a/src/tests/unit/test_ffi_client.cpp b/src/tests/unit/test_ffi_client.cpp index 8eeba153..ae17c599 100644 --- a/src/tests/unit/test_ffi_client.cpp +++ b/src/tests/unit/test_ffi_client.cpp @@ -35,6 +35,10 @@ namespace { volatile bool g_sigterm_received = false; +// Waits for listener entry or drain completion should finish in milliseconds +// This is a generous anti-hang bound for CI thread scheduling, not expected latency +constexpr auto kListenerSyncTimeout = std::chrono::seconds(5); + // Has to be registered globally per csignal API void handleSignal(int signal) { if (signal == SIGTERM) { @@ -42,7 +46,8 @@ void handleSignal(int signal) { } } -void emitLogEvent() { +// Simple helper to emit a test event +void emitEvent() { proto::FfiEvent event; auto* record = event.mutable_logs()->add_records(); record->set_level(proto::LOG_INFO); @@ -228,7 +233,7 @@ TEST_F(FfiClientTest, ShutdownClearsListenerRegistrations) { ASSERT_FALSE(FfiClient::instance().isInitialized()); ASSERT_TRUE(FfiClient::instance().initialize(false)); - emitLogEvent(); + emitEvent(); EXPECT_EQ(listener_calls.load(), 0); } @@ -247,8 +252,8 @@ TEST_F(FfiClientTest, RemoveListenerWaitsForInFlightCallback) { callback_completed.store(true); }); - std::thread callback_thread([] { emitLogEvent(); }); - ASSERT_EQ(callback_entered_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + std::thread callback_thread([] { emitEvent(); }); + ASSERT_EQ(callback_entered_future.wait_for(kListenerSyncTimeout), std::future_status::ready); auto remove_future = std::async(std::launch::async, [&] { FfiClient::instance().removeListener(id); }); EXPECT_EQ(remove_future.wait_for(std::chrono::milliseconds(50)), std::future_status::timeout); @@ -257,7 +262,7 @@ TEST_F(FfiClientTest, RemoveListenerWaitsForInFlightCallback) { release_callback.set_value(); callback_thread.join(); - EXPECT_EQ(remove_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + EXPECT_EQ(remove_future.wait_for(kListenerSyncTimeout), std::future_status::ready); EXPECT_TRUE(callback_completed.load()); } @@ -286,8 +291,8 @@ TEST_F(FfiClientTest, RoomDestructionRace) { // FFI thread dispatches an event; FakeRoom::onEvent is now parked inside the // callback holding `this`, waiting on the release gate. - std::thread ffi_thread([] { emitLogEvent(); }); - ASSERT_EQ(callback_entered_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + std::thread ffi_thread([] { emitEvent(); }); + ASSERT_EQ(callback_entered_future.wait_for(kListenerSyncTimeout), std::future_status::ready); // Destroy the owner on a different thread while the callback is in flight. std::atomic destroyed{false}; @@ -319,7 +324,7 @@ TEST_F(FfiClientTest, RoomDestructionRaceFloodEvents) { std::atomic stop{false}; std::thread emitter([&] { while (!stop.load(std::memory_order_relaxed)) { - emitLogEvent(); + emitEvent(); } }); @@ -346,8 +351,8 @@ TEST_F(FfiClientTest, ShutdownFromListenerDoesNotDeadlock) { }); ASSERT_NE(id, 0); - auto callback_future = std::async(std::launch::async, [] { emitLogEvent(); }); - EXPECT_EQ(callback_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + auto callback_future = std::async(std::launch::async, [] { emitEvent(); }); + EXPECT_EQ(callback_future.wait_for(kListenerSyncTimeout), std::future_status::ready); EXPECT_TRUE(shutdown_returned.load()); EXPECT_FALSE(FfiClient::instance().isInitialized()); } @@ -368,8 +373,8 @@ TEST_F(FfiClientTest, ShutdownRejectsReinitializeAndDropsNewEventsWhileDraining) }); ASSERT_NE(id, 0); - std::thread callback_thread([] { emitLogEvent(); }); - ASSERT_EQ(callback_entered_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + std::thread callback_thread([] { emitEvent(); }); + ASSERT_EQ(callback_entered_future.wait_for(kListenerSyncTimeout), std::future_status::ready); auto shutdown_future = std::async(std::launch::async, [] { FfiClient::instance().shutdown(); }); for (int i = 0; i < 5000 && FfiClient::instance().isInitialized(); ++i) { @@ -379,12 +384,12 @@ TEST_F(FfiClientTest, ShutdownRejectsReinitializeAndDropsNewEventsWhileDraining) EXPECT_EQ(shutdown_future.wait_for(std::chrono::milliseconds(50)), std::future_status::timeout); EXPECT_FALSE(FfiClient::instance().initialize(false)); - emitLogEvent(); + emitEvent(); EXPECT_EQ(listener_calls.load(), 1); release_callback.set_value(); callback_thread.join(); - EXPECT_EQ(shutdown_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + EXPECT_EQ(shutdown_future.wait_for(kListenerSyncTimeout), std::future_status::ready); EXPECT_FALSE(FfiClient::instance().isInitialized()); }