diff --git a/Build/libHttpClient.Linux/CMakeLists.txt b/Build/libHttpClient.Linux/CMakeLists.txt index 92b82fca..15105c71 100644 --- a/Build/libHttpClient.Linux/CMakeLists.txt +++ b/Build/libHttpClient.Linux/CMakeLists.txt @@ -313,5 +313,67 @@ add_test( NAME taskqueue-starvation-linux COMMAND "TaskQueueStarvationTests.Linux" ) +# Label so CI can run every TaskQueue regression test by label (-L taskqueue) +# without having to opt each new test in by name. +set_tests_properties( + taskqueue-starvation-linux + PROPERTIES + LABELS "taskqueue" +) + +################################################# +### TaskQueue single-port poll strand test ### +################################################# +# Standalone regression guard for the single-port delayed-callback strand bug. +# Exercises the STL WaitTimer backend that the Windows test lanes cannot reach. +# Uses only the public XTaskQueue API + the standard library, so it just links +# the libHttpClient.Linux target. +add_executable( + "SingleQueuePollStrandTests.Linux" + "${PATH_TO_ROOT}/Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp" +) + +target_include_directories( + "SingleQueuePollStrandTests.Linux" + PRIVATE + "${COMMON_INCLUDE_DIRS}" + "${LINUX_INCLUDE_DIRS}" + "${PATH_TO_ROOT}/Tests/Shared" +) + +target_link_libraries( + "SingleQueuePollStrandTests.Linux" + PRIVATE + "${PROJECT_NAME}" + Threads::Threads + ${CMAKE_DL_LIBS} +) + +if (NOT BUILD_SHARED_LIBS) + target_link_libraries( + "SingleQueuePollStrandTests.Linux" + PRIVATE + "${LIBCURL_BINARY_PATH}" + "${LIBSSL_BINARY_PATH}" + "${LIBCRYPTO_BINARY_PATH}" + ) +endif() + +target_set_flags( + "SingleQueuePollStrandTests.Linux" + "${FLAGS}" + "${FLAGS_DEBUG}" + "${FLAGS_RELEASE}" +) + +add_test( + NAME taskqueue-poll-strand-linux + COMMAND "SingleQueuePollStrandTests.Linux" +) +set_tests_properties( + taskqueue-poll-strand-linux + PROPERTIES + LABELS "taskqueue" +) export(TARGETS ${PROJECT_NAME} FILE ${PROJECT_NAME}Config.cmake) diff --git a/Source/Task/WaitTimer_stl.cpp b/Source/Task/WaitTimer_stl.cpp index 76092dc9..8dfcb72f 100644 --- a/Source/Task/WaitTimer_stl.cpp +++ b/Source/Task/WaitTimer_stl.cpp @@ -1,111 +1,35 @@ #include "pch.h" #include "WaitTimer.h" +// NOTE: This is the proven pre-#975 STL wait-timer backend (libHttpClient +// 2.3.1 / commit 0fa5f24), restored to fix a delayed-callback strand that +// #975's rewrite introduced. The #975 backend added a per-timer "generation" +// scheme where every Start() invalidated the previously pushed heap entry; a +// Start() racing the worker as it was about to dispatch the due entry caused +// the worker to discard that entry as stale and never fire it, permanently +// stranding the callback (observed on-device as an infinite sign-in / inventory +// hang on a single-port manual queue with no independent traffic to rescue it). +// +// This backend uses the original pointer-keyed cancellation and an +// unconditional notify on every Set, which cannot drop a due callback. The +// public WaitTimer API matches the post-#975 surface (GetCurrentTime / +// GetDueTime / Start(dueTime)); both time helpers and the worker share a +// single monotonic clock so the values TaskQueue compares stay consistent and +// are immune to wall-clock adjustments. +// +// The pre-#975 backend keyed this clock off std::high_resolution_clock. That +// alias is steady_clock on some standard libraries (libc++) but system_clock +// on others (libstdc++), which is wall-clock and not monotonic. We pin it to +// steady_clock explicitly so the ordering and "now < dueTime" comparisons in +// TaskQueue are monotonic on every platform that compiles this backend. + using Clock = std::chrono::steady_clock; using Deadline = Clock::time_point; -using TimerDuration = std::chrono::nanoseconds; - -namespace -{ - // Keep the public WaitTimer surface on a plain integer so TaskQueue can use - // atomics without dragging chrono types through its state. The integer still - // represents steady-clock time, not wall-clock time. - Deadline DeadlineFromDueTime(uint64_t dueTime) noexcept - { - return Deadline(std::chrono::duration_cast(TimerDuration(dueTime))); - } - - uint64_t DueTimeFromDeadline(Deadline deadline) noexcept - { - return static_cast( - std::chrono::duration_cast(deadline.time_since_epoch()).count()); - } -} namespace OS { class TimerQueue; - // Keep callback payload and teardown coordination in shared state because the - // Worker can still hold a due heap entry after the owning WaitTimerImpl has - // been canceled or destroyed. - class WaitTimerState - { - public: - WaitTimerState(_In_opt_ void* context, _In_ WaitTimerCallback* callback) noexcept - : m_context(context), m_callback(callback) - {} - - uint64_t NextGeneration() noexcept - { - return ++m_generation; - } - - uint64_t Generation() const noexcept - { - return m_generation.load(std::memory_order_acquire); - } - - void BeginTerminate() noexcept - { - m_terminating.store(true, std::memory_order_release); - } - - bool TryBeginDispatch() noexcept - { - if (m_terminating.load(std::memory_order_acquire)) - { - return false; - } - - std::lock_guard lock{ m_lock }; - // Re-check under the lock so teardown cannot race with the in-flight - // count increment and then wait forever for a dispatch we started. - if (m_terminating.load(std::memory_order_relaxed)) - { - return false; - } - - ++m_inFlightDispatch; - return true; - } - - void EndDispatch() noexcept - { - std::lock_guard lock{ m_lock }; - ASSERT(m_inFlightDispatch != 0); - - --m_inFlightDispatch; - if (m_inFlightDispatch == 0) - { - m_quiesced.notify_all(); - } - } - - void WaitForQuiesce() noexcept - { - std::unique_lock lock{ m_lock }; - m_quiesced.wait(lock, [this]() noexcept - { - return m_inFlightDispatch == 0; - }); - } - - void InvokeCallback() noexcept - { - m_callback(m_context); - } - - private: - void* m_context; - WaitTimerCallback* m_callback; - std::atomic m_generation{ 0 }; - std::atomic m_terminating{ false }; - DefaultUnnamedMutex m_lock; - DefaultUnnamedConditionVariable m_quiesced; - uint32_t m_inFlightDispatch = 0; - }; - class WaitTimerImpl { public: @@ -113,23 +37,20 @@ namespace OS HRESULT Initialize(_In_opt_ void* context, _In_ WaitTimerCallback* callback); void Start(_In_ uint64_t dueTime); void Cancel(); - void Terminate() noexcept; + void InvokeCallback(); private: - std::shared_ptr m_state; + + void* m_context; + WaitTimerCallback* m_callback; std::shared_ptr m_timerQueue; }; struct TimerEntry { Deadline When; - std::shared_ptr State; - // Each Start() pushes a new heap entry instead of searching/removing the - // old one. Generation lets the Worker discard superseded entries cheaply. - uint64_t Generation; - TimerEntry(Deadline d, std::shared_ptr state, uint64_t g) - : When{ d }, State{ std::move(state) }, Generation{ g } - {} + WaitTimerImpl* Timer; + TimerEntry(Deadline d, WaitTimerImpl* t) : When{ d }, Timer{ t } {} }; struct TimerEntryComparator @@ -140,19 +61,14 @@ namespace OS } }; - // The queue is shared across timers, but it should still retire once the - // last timer goes away instead of leaking for process lifetime. - class TimerQueue : public std::enable_shared_from_this + class TimerQueue { public: bool Init() noexcept; ~TimerQueue(); - void AddTimer() noexcept; - void RemoveTimer() noexcept; - void Set(std::shared_ptr const& state, Deadline deadline) noexcept; - void Remove(WaitTimerState const* state) noexcept; - std::thread::id WorkerThreadId() const noexcept; + void Set(WaitTimerImpl* timer, Deadline deadline) noexcept; + void Remove(WaitTimerImpl const* timer) noexcept; private: void Worker() noexcept; @@ -161,10 +77,9 @@ namespace OS TimerEntry Pop() noexcept; DefaultUnnamedMutex m_mutex; - DefaultUnnamedConditionVariable m_cv; + std::condition_variable m_cv; std::vector m_queue; // used as a heap std::thread m_t; - std::atomic m_timerCount{ 0 }; bool m_exitThread = false; bool m_initialized = false; }; @@ -185,16 +100,7 @@ namespace OS m_cv.notify_all(); if (m_t.joinable()) { - if (m_t.get_id() == std::this_thread::get_id()) - { - // Immediate-port callbacks can tear down their own timer from the - // Worker thread. Joining from that path would deadlock. - m_t.detach(); - } - else - { - m_t.join(); - } + m_t.join(); } } @@ -204,11 +110,9 @@ namespace OS try { - // Capture a self-reference so the queue stays alive until the Worker - // exits even if the last timer concurrently clears the global slot. - m_t = std::thread([keepAlive = shared_from_this()]() + m_t = std::thread([this]() { - keepAlive->Worker(); + Worker(); }); m_initialized = true; } @@ -220,81 +124,41 @@ namespace OS return m_initialized; } - void TimerQueue::AddTimer() noexcept - { - m_timerCount.fetch_add(1, std::memory_order_relaxed); - } - - void TimerQueue::RemoveTimer() noexcept + void TimerQueue::Set(WaitTimerImpl* timer, Deadline deadline) noexcept { - if (m_timerCount.fetch_sub(1, std::memory_order_acq_rel) != 1) { - return; - } + std::lock_guard lock{ m_mutex }; - // Last timer out clears the global queue and wakes the Worker so Linux - // teardown actually quiesces instead of depending on process exit. - { - std::lock_guard globalLock{ g_timerQueueMutex }; - if (g_timerQueue.get() == this) + for (auto& entry : m_queue) { - g_timerQueue.reset(); + if (entry.Timer == timer) + { + entry.Timer = nullptr; + } } - } - - { - std::lock_guard lock{ m_mutex }; - m_exitThread = true; - } - - m_cv.notify_one(); - } - - void TimerQueue::Set(std::shared_ptr const& state, Deadline deadline) noexcept - { - bool shouldNotify; - { - std::lock_guard lock{ m_mutex }; - - // Bump the generation so stale heap entries for this timer are - // skipped by the Worker on pop. This replaces the old O(N) - // nullification scan with an O(log N) push. - uint64_t gen = state->NextGeneration(); - - // Only wake the Worker when the new deadline might be earlier - // than the current heap top; otherwise the Worker is already - // sleeping for a deadline that is at least as early. - shouldNotify = m_queue.empty() || deadline < m_queue.front().When; - m_queue.emplace_back(deadline, state, gen); + m_queue.emplace_back(deadline, timer); std::push_heap(m_queue.begin(), m_queue.end(), TimerEntryComparator{}); } - if (shouldNotify) - { - m_cv.notify_one(); - } + m_cv.notify_all(); } - void TimerQueue::Remove(WaitTimerState const* state) noexcept + void TimerQueue::Remove(WaitTimerImpl const* timer) noexcept { std::lock_guard lock{ m_mutex }; - // Remove is only called during cancellation/teardown. - // Reset shared state entries so the Worker never dispatches them. + // since m_queue is a heap, removing elements is non trivial, instead we + // just clean the timer pointer and the entry will be popped eventually + for (auto& entry : m_queue) { - if (entry.State.get() == state) + if (entry.Timer == timer) { - entry.State.reset(); + entry.Timer = nullptr; } } } - std::thread::id TimerQueue::WorkerThreadId() const noexcept - { - return m_t.get_id(); - } - void TimerQueue::Worker() noexcept { std::unique_lock lock{ m_mutex }; @@ -302,52 +166,28 @@ namespace OS { while (!m_queue.empty()) { - auto& top = Peek(); - - // Discard stale/nullified entries without releasing the lock. - if (!top.State || - top.Generation != top.State->Generation()) - { - Pop(); - continue; - } - - if (Clock::now() < top.When) + Deadline next = Peek().When; + if (Clock::now() < next) { break; } TimerEntry entry = Pop(); - if (!entry.State->TryBeginDispatch()) - { - continue; - } - // Release the lock while invoking the callback, just in case - // the timer gets destroyed on this thread or re-adds itself - // in the callback. + // release the lock while invoking the callback, just in case timer + // gets destroyed on this thread or re-adds itself in the callback lock.unlock(); - entry.State->InvokeCallback(); - entry.State->EndDispatch(); - lock.lock(); - } - - // Drain dead entries at the heap top so wait_until targets a - // live deadline rather than sleeping until a stale entry's time. - while (!m_queue.empty()) - { - auto& top = Peek(); - if (top.State && - top.Generation == top.State->Generation()) + if (entry.Timer) // Timer is set to nullptr if the entry is removed { - break; + entry.Timer->InvokeCallback(); } - Pop(); + lock.lock(); } if (!m_queue.empty()) { - m_cv.wait_until(lock, Peek().When); + Deadline next = Peek().When; + m_cv.wait_until(lock, next); } else { @@ -373,19 +213,25 @@ namespace OS WaitTimerImpl::~WaitTimerImpl() { - Terminate(); + std::lock_guard lock{ g_timerQueueMutex }; + + // If we are the last one referencing the global timer the + // shared use count will be two (us + the global). If it is, + // clear out the global. We let our own reference reset + // as the class destructs. This puts it outside the mutex + // lock, which we want since there is some shutdown cost + // associated with shutting the timer down. + + if (g_timerQueue.use_count() == 2) + { + g_timerQueue.reset(); + } } HRESULT WaitTimerImpl::Initialize(_In_opt_ void* context, _In_ WaitTimerCallback* callback) { - try - { - m_state = http_allocate_shared(context, callback); - } - catch (const std::bad_alloc&) - { - return E_OUTOFMEMORY; - } + m_context = context; + m_callback = callback; std::lock_guard lock{ g_timerQueueMutex }; @@ -408,46 +254,23 @@ namespace OS } m_timerQueue = g_timerQueue; - m_timerQueue->AddTimer(); return S_OK; } void WaitTimerImpl::Start(_In_ uint64_t dueTime) { - m_timerQueue->Set(m_state, DeadlineFromDueTime(dueTime)); + m_timerQueue->Set(this, Deadline(Deadline::duration(dueTime))); } void WaitTimerImpl::Cancel() { - if (m_state != nullptr && m_timerQueue != nullptr) - { - m_timerQueue->Remove(m_state.get()); - } + m_timerQueue->Remove(this); } - void WaitTimerImpl::Terminate() noexcept + void WaitTimerImpl::InvokeCallback() { - std::shared_ptr state = std::move(m_state); - std::shared_ptr timerQueue = std::move(m_timerQueue); - if (state == nullptr || timerQueue == nullptr) - { - return; - } - - // Block any new dispatch before removing queued entries so teardown has - // a single publish point that both the Worker and waiter observe. - state->BeginTerminate(); - timerQueue->Remove(state.get()); - - if (std::this_thread::get_id() != timerQueue->WorkerThreadId()) - { - // Delayed callbacks can run on Immediate queues and self-terminate on - // the Worker thread. Waiting there would deadlock on our own dispatch. - state->WaitForQuiesce(); - } - - timerQueue->RemoveTimer(); + m_callback(m_context); } WaitTimer::WaitTimer() noexcept @@ -481,7 +304,7 @@ namespace OS std::unique_ptr timer(m_impl.exchange(nullptr)); if (timer != nullptr) { - timer->Terminate(); + timer->Cancel(); } } @@ -497,12 +320,13 @@ namespace OS uint64_t WaitTimer::GetCurrentTime() noexcept { - return DueTimeFromDeadline(Clock::now()); + Deadline now = Clock::now(); + return now.time_since_epoch().count(); } uint64_t WaitTimer::GetDueTime(_In_ uint32_t msFromNow) noexcept { - Deadline deadline = Clock::now() + std::chrono::milliseconds(msFromNow); - return DueTimeFromDeadline(deadline); + Deadline d = Clock::now() + std::chrono::milliseconds(msFromNow); + return d.time_since_epoch().count(); } } // Namespace diff --git a/Tests/Shared/SingleQueueDelayedPollStrandScenario.h b/Tests/Shared/SingleQueueDelayedPollStrandScenario.h new file mode 100644 index 00000000..d9a16b49 --- /dev/null +++ b/Tests/Shared/SingleQueueDelayedPollStrandScenario.h @@ -0,0 +1,258 @@ +// Copyright (c) Microsoft Corporation +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +// Shared, platform-neutral scenario for the single-port delayed-callback strand +// regression. Exercised by: +// - the Windows unit test VerifySingleQueueDelayedPollStrand +// (TaskQueueTests.cpp, Win32 WaitTimer backend), and +// - the standalone Linux repro binary (CMake target, STL WaitTimer backend). +// +// This complements CompositeQueueStarvationScenario.h. That scenario keeps many +// concurrent submitters and terminate races running for the whole measurement +// window, so there is always independent traffic whose timer fires sweep the +// pending list and rescue a momentarily-stranded entry. That continuous rescue +// traffic masks the strand this scenario targets. +// +// Here we reproduce the field failure shape directly: a single self-resubmitting +// delayed-poll loop on one manual Work port (mirroring a platform HTTP provider +// that re-arms its poll every few ms via XTaskQueueSubmitDelayedCallback). A +// short burst of unrelated one-shot delayed callbacks runs concurrently to +// create timer-arming contention, then STOPS. After the burst drains, the poll +// loop is the only remaining work: if a re-arm was dropped during the burst, +// nothing sweeps the pending list to rescue it, the poll loop stops advancing, +// and the queue is permanently stalled. +// +// The STL timer backend (Linux / non-Microsoft platforms) is where the +// regression manifests, so running this on Linux CI guards the platform the +// Windows lanes cannot reach. +// +// Uses only the public XTaskQueue C API plus the C++ standard library so the +// same translation unit compiles on every platform. + +#pragma once + +// async.h pulls in pal.h (HRESULT, STDAPI, CALLBACK, FAILED, ...) and then +// XTaskQueue.h, so this header is self-contained on every platform. +#include + +#include +#include +#include +#include +#include + +namespace hc_test +{ + +struct SingleQueuePollStrandConfig +{ + int outerIterations = 50; + // Victim poll loop: a single delayed callback that re-arms itself. + uint32_t pollDelayMs = 2; + // Contention burst: unrelated one-shot delayed callbacks submitted onto the + // same Work port from several threads, then stopped. Tiny delays thrash the + // "earliest due" arming the same way concurrent real work does. + int burstThreads = 4; + int burstSubmitsPerThread = 250; + uint32_t burstDelayMs = 1; + // After the burst drains, the poll loop must advance by at least this many + // additional polls within the timeout. A stranded loop advances by zero. + int progressAfterBurstPolls = 30; + int progressTimeoutMs = 5000; + // Bound on how long to wait for the burst callbacks to drain. + int burstDrainTimeoutMs = 5000; +}; + +namespace detail +{ + +struct StrandPollRequest +{ + XTaskQueueHandle queue{ nullptr }; + uint32_t delayMs{ 0 }; + std::atomic* keepGoing{ nullptr }; + std::atomic polls{ 0 }; +}; + +inline void CALLBACK StrandPollCallback(void* context, bool cancelled) +{ + StrandPollRequest* r = static_cast(context); + + if (cancelled) + { + // Dispatched as cancelled during teardown; do not re-arm. + return; + } + + r->polls.fetch_add(1, std::memory_order_release); + + if (r->keepGoing->load(std::memory_order_acquire)) + { + // Re-arm the poll. This is the exact pattern a platform HTTP provider + // uses to schedule its next poll, and the operation whose dropped + // re-arm strands the loop. + (void)XTaskQueueSubmitDelayedCallback(r->queue, XTaskQueuePort::Work, r->delayMs, r, StrandPollCallback); + } +} + +struct BurstRequest +{ + std::atomic* outstanding{ nullptr }; +}; + +inline void CALLBACK BurstCallback(void* context, bool /*cancelled*/) +{ + BurstRequest* r = static_cast(context); + r->outstanding->fetch_sub(1, std::memory_order_acq_rel); + delete r; +} + +} // namespace detail + +// Runs the scenario. Returns true if the poll loop kept advancing after the +// contention burst ended in every iteration (healthy timer arming). Returns +// false if the loop stalled in any iteration (a dropped re-arm was never +// rescued). If failedIteration / pollsAfterStallStart are provided, they +// receive the first failing iteration index and how many polls the loop had +// reached when it stalled. +inline bool RunSingleQueuePollStrandScenario( + const SingleQueuePollStrandConfig& cfg = SingleQueuePollStrandConfig{}, + int* failedIteration = nullptr, + int* pollsAtStall = nullptr) +{ + using detail::StrandPollRequest; + using detail::StrandPollCallback; + using detail::BurstRequest; + using detail::BurstCallback; + + for (int iter = 0; iter < cfg.outerIterations; iter++) + { + XTaskQueueHandle queue{ nullptr }; + if (FAILED(XTaskQueueCreate(XTaskQueueDispatchMode::Manual, XTaskQueueDispatchMode::Manual, &queue))) + { + if (failedIteration) { *failedIteration = iter; } + if (pollsAtStall) { *pollsAtStall = -1; } + return false; + } + + // Single pump thread for the shared Work port. + std::atomic pumping{ true }; + std::thread pump([queue, &pumping]() + { + while (pumping.load(std::memory_order_acquire)) + { + XTaskQueueDispatch(queue, XTaskQueuePort::Work, 50); + } + }); + + std::atomic keepGoing{ true }; + + StrandPollRequest poller{}; + poller.queue = queue; + poller.delayMs = cfg.pollDelayMs; + poller.keepGoing = &keepGoing; + + // Start the self-resubmitting poll loop and let it establish itself. + if (FAILED(XTaskQueueSubmitDelayedCallback(queue, XTaskQueuePort::Work, cfg.pollDelayMs, &poller, StrandPollCallback))) + { + keepGoing.store(false, std::memory_order_release); + pumping.store(false, std::memory_order_release); + pump.join(); + XTaskQueueCloseHandle(queue); + if (failedIteration) { *failedIteration = iter; } + if (pollsAtStall) { *pollsAtStall = -1; } + return false; + } + + { + auto established = std::chrono::steady_clock::now() + std::chrono::milliseconds(1000); + while (poller.polls.load(std::memory_order_acquire) == 0 && + std::chrono::steady_clock::now() < established) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + + // Contention burst: several threads each submit a run of unrelated + // one-shot delayed callbacks onto the same Work port, racing the poll + // loop's re-arms against the timer worker's arming, then stop. + std::atomic outstanding{ 0 }; + std::vector burst; + burst.reserve(cfg.burstThreads); + for (int t = 0; t < cfg.burstThreads; t++) + { + burst.emplace_back([queue, &cfg, &outstanding]() + { + for (int i = 0; i < cfg.burstSubmitsPerThread; i++) + { + BurstRequest* b = new BurstRequest{}; + b->outstanding = &outstanding; + outstanding.fetch_add(1, std::memory_order_acq_rel); + if (FAILED(XTaskQueueSubmitDelayedCallback(queue, XTaskQueuePort::Work, cfg.burstDelayMs, b, BurstCallback))) + { + outstanding.fetch_sub(1, std::memory_order_acq_rel); + delete b; + } + } + }); + } + + for (auto& b : burst) + { + b.join(); + } + + // Let the burst callbacks drain so the poll loop is the only remaining + // work on the queue. + { + auto drainDeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(cfg.burstDrainTimeoutMs); + while (outstanding.load(std::memory_order_acquire) > 0 && + std::chrono::steady_clock::now() < drainDeadline) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + + // With no remaining rescue traffic, the poll loop must keep advancing. + // A strand introduced during the burst shows up here as zero progress. + int pollsAtBurstEnd = poller.polls.load(std::memory_order_acquire); + int target = pollsAtBurstEnd + cfg.progressAfterBurstPolls; + + bool progressed = false; + { + auto progressDeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(cfg.progressTimeoutMs); + while (std::chrono::steady_clock::now() < progressDeadline) + { + if (poller.polls.load(std::memory_order_acquire) >= target) + { + progressed = true; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + } + + // Teardown: stop the loop, stop the pump, then terminate + drain so any + // pending (including a stranded) poll callback is dispatched as + // cancelled before the queue and the stack-allocated poller go away. + keepGoing.store(false, std::memory_order_release); + pumping.store(false, std::memory_order_release); + pump.join(); + + XTaskQueueTerminate(queue, false, nullptr, nullptr); + while (XTaskQueueDispatch(queue, XTaskQueuePort::Work, 0)) {} + while (XTaskQueueDispatch(queue, XTaskQueuePort::Completion, 0)) {} + XTaskQueueCloseHandle(queue); + + if (!progressed) + { + if (failedIteration) { *failedIteration = iter; } + if (pollsAtStall) { *pollsAtStall = pollsAtBurstEnd; } + return false; + } + } + + return true; +} + +} // namespace hc_test diff --git a/Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp b/Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp new file mode 100644 index 00000000..5bc10253 --- /dev/null +++ b/Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +// Standalone Linux repro / regression guard for the single-port delayed-callback +// strand bug. Runs the shared scenario against the STL WaitTimer backend (the +// platform where the regression manifests; the Windows unit-test lanes only +// exercise the Win32 timer backend and cannot catch it). +// +// Exit codes: +// 0 - scenario healthy (the poll loop kept advancing after every burst) +// 1 - strand detected (the poll loop stalled with no rescue traffic) +// +// Built as the CTest target "taskqueue-poll-strand-linux" in +// Build/libHttpClient.Linux/CMakeLists.txt and run by the Linux CI lane. + +#include "SingleQueueDelayedPollStrandScenario.h" + +#include + +int main() +{ + hc_test::SingleQueuePollStrandConfig cfg{}; + + int failedIteration = -1; + int pollsAtStall = -1; + + std::printf( + "[taskqueue-poll-strand] running %d iterations, poll delay %ums, burst %d threads x %d submits...\n", + cfg.outerIterations, + cfg.pollDelayMs, + cfg.burstThreads, + cfg.burstSubmitsPerThread); + std::fflush(stdout); + + bool ok = hc_test::RunSingleQueuePollStrandScenario(cfg, &failedIteration, &pollsAtStall); + + if (!ok) + { + std::printf( + "[taskqueue-poll-strand] FAILED: poll loop stalled at iteration %d (reached %d polls, then no progress)\n", + failedIteration, + pollsAtStall); + std::fflush(stdout); + return 1; + } + + std::printf("[taskqueue-poll-strand] PASSED: no strand detected\n"); + std::fflush(stdout); + return 0; +} diff --git a/Tests/UnitTests/Tests/TaskQueueTests.cpp b/Tests/UnitTests/Tests/TaskQueueTests.cpp index 424fa816..5c2eea18 100644 --- a/Tests/UnitTests/Tests/TaskQueueTests.cpp +++ b/Tests/UnitTests/Tests/TaskQueueTests.cpp @@ -7,6 +7,7 @@ #include "PumpedTaskQueue.h" #include "XTaskQueuePriv.h" #include "CompositeQueueStarvationScenario.h" +#include "SingleQueueDelayedPollStrandScenario.h" #define TEST_CLASS_OWNER L"brianpe" @@ -235,6 +236,37 @@ DEFINE_TEST_CLASS(TaskQueueTests) VERIFY_IS_TRUE(ok); } + DEFINE_TEST_CASE(VerifySingleQueueDelayedPollStrand) + { + // Regression guard for the single-port delayed-callback strand. A single + // self-resubmitting delayed-poll loop runs on one manual Work port + // (mirroring a platform HTTP provider that re-arms its poll every few ms + // via XTaskQueueSubmitDelayedCallback). A short burst of unrelated + // one-shot delayed callbacks runs concurrently to create timer-arming + // contention, then stops. After the burst drains, the poll loop is the + // only remaining work: if a re-arm was dropped during the burst, nothing + // sweeps the pending list to rescue it and the loop stalls. Unlike + // VerifyCompositeQueueDelayedCallbackStarvation, there is no continuous + // rescue traffic to mask the strand once the burst ends. + // + // The scenario itself lives in the shared header so the exact same code + // runs here on the Win32 WaitTimer backend and in the standalone Linux + // repro binary on the STL WaitTimer backend (where the bug manifests). + + int failedIteration = -1; + int pollsAtStall = -1; + bool ok = hc_test::RunSingleQueuePollStrandScenario( + hc_test::SingleQueuePollStrandConfig{}, + &failedIteration, + &pollsAtStall); + + if (!ok) + { + LOG_COMMENT(L"STRAND at iteration %d: poll loop stalled at %d polls", failedIteration, pollsAtStall); + } + VERIFY_IS_TRUE(ok); + } + DEFINE_TEST_CASE(VerifyDuplicateQueueHandle) { const size_t count = 10; diff --git a/Utilities/Pipelines/Tasks/linux-build.yml b/Utilities/Pipelines/Tasks/linux-build.yml index 71e280f8..e6e9abc5 100644 --- a/Utilities/Pipelines/Tasks/linux-build.yml +++ b/Utilities/Pipelines/Tasks/linux-build.yml @@ -35,7 +35,10 @@ steps: script: | set -e cd "$(Build.SourcesDirectory)/Int/CMake/libHttpClient.Linux" - ctest --output-on-failure -R taskqueue-starvation-linux + # Run every test labelled "taskqueue" so new regression scenarios are + # picked up automatically just by adding the label in CMake (no pipeline + # change needed). + ctest --output-on-failure -L taskqueue - task: Bash@3 displayName: 'Build libHttpClient Shared'