Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 159 additions & 19 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <cassert>
#include <csignal>
#include <cstdio>

#include "data_track.pb.h"
#include "e2ee.pb.h"
Expand Down Expand Up @@ -146,44 +147,147 @@ FfiClient& FfiClient::instance() noexcept {
return instance;
}

// clang-tidy flags this as a trivial destructor in release mode
// due to the assert being pre-processed out
// NOLINTNEXTLINE(modernize-use-equals-default)
FfiClient::~FfiClient() {
assert(!initialized_.load() &&
"LiveKit SDK was not shut down before process exit. "
"Call livekit::shutdown().");
if (lifecycle_state_.load() == LifecycleState::Initialized) {
// Explicitly use this over spdlog/std::cerr which can throw
// Wrapping spdlog try/catch also flags "empty catch" clang-tidy check
std::fputs("[livekit] [warning] SDK was not shut down before process exit. Use livekit::shutdown()\n", stderr);
Comment thread
stephen-derosa marked this conversation as resolved.
std::fflush(stderr);
}
}

void FfiClient::shutdown() noexcept {
if (!isInitialized()) {
return;
// Don't use string to avoid exceptions
// (Also cleaner with exception.what() and printing)
const char* shutdown_error = nullptr;
try {
// compare_exchange_strong atomically claims Initialized -> ShuttingDown so only one
// concurrent shutdown() drains listeners and disposes the FFI.
LifecycleState expected = LifecycleState::Initialized;
if (!lifecycle_state_.compare_exchange_strong(expected, LifecycleState::ShuttingDown, std::memory_order_acq_rel)) {
// If not Initialized, return early to avoid unnecessary cleanup
Comment thread
stephen-derosa marked this conversation as resolved.
std::fputs("[livekit] [warning] SDK was shutdown while not initialized\n", stderr);
std::fflush(stderr);
return;
}

std::vector<std::shared_ptr<ListenerSlot>> listeners_to_drain;
std::vector<std::unique_ptr<PendingBase>> pending_to_cancel;
{
const std::scoped_lock<std::mutex> guard(lock_);
listeners_to_drain.reserve(listeners_.size());
for (auto& [id, slot] : listeners_) {
(void)id;
if (slot) {
// Mark the listener as removed to prevent race conditions
{
const std::scoped_lock<std::mutex> slot_guard(slot->mutex);
slot->removed = true;
}
// Add the listener to the list of listeners to drain
listeners_to_drain.push_back(std::move(slot));
}
}
listeners_.clear();

// Add the pending operations to the list of pending operations to cancel
pending_to_cancel.reserve(pending_by_id_.size());
for (auto& [async_id, pending] : pending_by_id_) {
(void)async_id;
if (pending) {
pending_to_cancel.push_back(std::move(pending));
}
}
pending_by_id_.clear();
}

// Cancel the pending operations
for (auto& pending : pending_to_cancel) {
pending->cancel();
}

const auto this_thread = std::this_thread::get_id();
// Wait for all in-flight listener callbacks to complete
for (const auto& slot : listeners_to_drain) {
std::unique_lock<std::mutex> slot_lock(slot->mutex);

// When shutdown() isn't on a listener thread, self_active is 0 and we wait for active_callbacks == 0. When it's
// called from inside a listener, self_active is 1 and the wait succeeds immediately with active_callbacks == 1,
// so we don't wait on our own in-flight callback
slot->cv.wait(slot_lock, [&slot, this_thread] {
const auto thread_it = slot->active_threads.find(this_thread);
const int self_active = thread_it == slot->active_threads.end() ? 0 : thread_it->second;
return slot->active_callbacks == self_active;
});
}
} catch (const std::exception& e) {
shutdown_error = e.what();
} catch (...) {
shutdown_error = "unknown exception";
}
initialized_.store(false, std::memory_order_release);

livekit_ffi_dispose();
lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release);

if (shutdown_error != nullptr) {
// Explicitly use this over spdlog (method is noexcept)
(void)std::fputs("[livekit] [error] SDK shutdown failed during local cleanup: ", stderr);
(void)std::fputs(shutdown_error, stderr);
(void)std::fputs("\n", stderr);
(void)std::fflush(stderr);
}
}

bool FfiClient::initialize(bool capture_logs) {
if (isInitialized()) {
LifecycleState expected = LifecycleState::Uninitialized;
if (!lifecycle_state_.compare_exchange_strong(expected, LifecycleState::Initializing, std::memory_order_acq_rel)) {
Comment thread
stephen-derosa marked this conversation as resolved.
Comment thread
stephen-derosa marked this conversation as resolved.
return false;
}
initialized_.store(true, std::memory_order_release);
livekit_ffi_initialize(&ffiEventCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION);

try {
livekit_ffi_initialize(&ffiEventCallback, capture_logs, LIVEKIT_BUILD_FLAVOR, LIVEKIT_BUILD_VERSION);
} catch (...) {
lifecycle_state_.store(LifecycleState::Uninitialized, std::memory_order_release);
throw;
}

lifecycle_state_.store(LifecycleState::Initialized, std::memory_order_release);
return true;
}

bool FfiClient::isInitialized() const noexcept { return initialized_.load(std::memory_order_acquire); }
bool FfiClient::isInitialized() const noexcept {
return lifecycle_state_.load(std::memory_order_acquire) == LifecycleState::Initialized;
}

FfiClient::ListenerId FfiClient::addListener(const FfiClient::Listener& listener) {
const std::scoped_lock<std::mutex> guard(lock_);
if (lifecycle_state_.load(std::memory_order_acquire) == LifecycleState::ShuttingDown) {
logAndThrow("FfiClient::addListener failed: LiveKit is shutting down");
}
const FfiClient::ListenerId id = next_listener_id++;
listeners_[id] = listener;
listeners_[id] = std::make_shared<ListenerSlot>(listener);
return id;
}

void FfiClient::removeListener(ListenerId id) {
const std::scoped_lock<std::mutex> guard(lock_);
listeners_.erase(id);
std::shared_ptr<ListenerSlot> slot;
{
const std::scoped_lock<std::mutex> guard(lock_);
auto it = listeners_.find(id);
if (it == listeners_.end()) {
Comment thread
stephen-derosa marked this conversation as resolved.
return;
}
slot = std::move(it->second);
listeners_.erase(it);
}

const auto this_thread = std::this_thread::get_id();
std::unique_lock<std::mutex> slot_lock(slot->mutex);
slot->cv.wait(slot_lock, [&slot, this_thread] {
const auto self_active = slot->active_threads.count(this_thread) != 0;
return slot->active_callbacks == 0 || (self_active && slot->active_callbacks == 1);
});
slot->removed = true;
}

proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) const {
Expand Down Expand Up @@ -221,9 +325,12 @@ proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) cons

void FfiClient::pushEvent(const proto::FfiEvent& event) const {
std::unique_ptr<PendingBase> to_complete;
std::vector<Listener> listeners_copy;
std::vector<std::shared_ptr<ListenerSlot>> listeners_copy;
{
const std::scoped_lock<std::mutex> guard(lock_);
if (lifecycle_state_.load(std::memory_order_acquire) != LifecycleState::Initialized) {
return;
}

// Complete pending future if this event is a callback with async_id
if (auto async_id = ExtractAsyncId(event)) {
Expand All @@ -246,8 +353,41 @@ void FfiClient::pushEvent(const proto::FfiEvent& event) const {
}

// Notify listeners outside lock
for (auto& listener : listeners_copy) {
listener(event);
for (const auto& slot : listeners_copy) {
Listener listener;
const auto this_thread = std::this_thread::get_id();
{
const std::scoped_lock<std::mutex> slot_guard(slot->mutex);
if (slot->removed) {
continue;
}
++slot->active_callbacks;
++slot->active_threads[this_thread];
listener = slot->listener;
}

try {
listener(event);
} catch (const std::exception& e) {
LK_LOG_ERROR("FfiClient listener threw: {}", e.what());
} catch (...) {
LK_LOG_ERROR("FfiClient listener threw: unknown exception");
}

{
const std::scoped_lock<std::mutex> slot_guard(slot->mutex);
const auto thread_it = slot->active_threads.find(this_thread);
if (thread_it != slot->active_threads.end()) {
--thread_it->second;
if (thread_it->second == 0) {
slot->active_threads.erase(thread_it);
}
}
--slot->active_callbacks;
}

// Notify in case this listener was marked for removal during the callback (will be waiting on this)
slot->cv.notify_all();
Comment thread
stephen-derosa marked this conversation as resolved.
}
}

Expand Down
39 changes: 37 additions & 2 deletions src/ffi_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
#pragma once

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <utility>

#include "data_track.pb.h"
#include "livekit/data_track_error.h"
Expand Down Expand Up @@ -147,6 +150,15 @@ class LIVEKIT_INTERNAL_API FfiClient {
private:
FfiClient() = default;

/// Lifecycle state of the FfiClient.
///
/// Held in an atomic member and advanced with compare-exchange so that only one
/// caller at a time can start initialization or shutdown. This is used to
/// prevent race conditions/use-after-free scenarios.
enum class LifecycleState : std::uint8_t {
// Starting state; also restored once shutdown() has disposed the FFI, or when
// initialization throws.
Uninitialized,
// Claimed by initialize() (Uninitialized -> Initializing) while the FFI is
// being initialized.
Initializing,
// Set after FFI initialization returned; the only state in which pushEvent()
// processes events and isInitialized() returns true.
Initialized,
// Claimed by shutdown() (Initialized -> ShuttingDown) while listeners are
// drained and pending operations are cancelled; addListener() rejects new
// listeners in this state.
ShuttingDown,
};

// Base class for type-erased pending ops
struct PendingBase {
AsyncId async_id = 0; // Client-generated async ID for cancellation
Expand Down Expand Up @@ -176,6 +188,25 @@ class LIVEKIT_INTERNAL_API FfiClient {
}
};

/// Per-listener bookkeeping that wraps a registered listener callback together
/// with the state needed to know whether it is currently being invoked.
/// This is used to coordinate the FFI thread and the app thread, and prevent race conditions/use-after-free scenarios:
/// removal and shutdown wait on `cv` until no callback of this slot is still
/// running on another thread.
struct ListenerSlot {
/// Takes ownership of the callback by moving it into the slot.
explicit ListenerSlot(Listener cb) : listener(std::move(cb)) {}

/// The user-provided listener callback (copied out under `mutex` before being invoked)
Listener listener;
/// Mutex protecting `active_threads`, `active_callbacks` and `removed`
std::mutex mutex;
/// Condition variable notified each time an invocation of this listener finishes
std::condition_variable cv;
/// Per-thread count of in-flight invocations of this listener, keyed by thread ID.
/// Entries are erased when their count drops to zero. Lets a thread that removes
/// the listener (or shuts down) from inside its own callback avoid waiting on itself.
std::unordered_map<std::thread::id, int> active_threads;
/// Total number of in-flight invocations of this listener across all threads
int active_callbacks = 0;
/// Whether the listener has been removed; once set, event dispatch skips this slot
bool removed = false;
};

template <typename T>
std::future<T> registerAsync(AsyncId async_id, std::function<bool(const proto::FfiEvent&)> match,
std::function<void(const proto::FfiEvent&, std::promise<T>&)> handler);
Expand All @@ -187,14 +218,18 @@ class LIVEKIT_INTERNAL_API FfiClient {
// removed.
bool cancelPendingByAsyncId(AsyncId async_id);

std::unordered_map<ListenerId, Listener> listeners_;
/// Map of listener IDs to listener slots
std::unordered_map<ListenerId, std::shared_ptr<ListenerSlot>> listeners_;
/// Next listener ID to generate
std::atomic<ListenerId> next_listener_id{1};
mutable std::mutex lock_;
/// Map of async IDs to pending operations
mutable std::unordered_map<AsyncId, std::unique_ptr<PendingBase>> pending_by_id_;
/// Next async ID to generate
std::atomic<AsyncId> next_async_id_{1};

void pushEvent(const proto::FfiEvent& event) const;
friend void ffiEventCallback(const uint8_t* buf, size_t len);
std::atomic<bool> initialized_{false};
std::atomic<LifecycleState> lifecycle_state_{LifecycleState::Uninitialized};
};
} // namespace livekit
Loading
Loading