From 83ffb4ca7cb2eb17e1381de064a470e1de046872 Mon Sep 17 00:00:00 2001 From: Raul Gomez Rodriguez Date: Mon, 22 Jun 2026 19:37:40 -0700 Subject: [PATCH 1/4] Fix STL wait-timer strand: restore pre-#975 backend (close #973) #975 rewrote the shared STL wait-timer backend (WaitTimer_stl.cpp) to replace the O(N) pointer-keyed cancellation scan with an O(log N) per-timer generation scheme: every Start() bumps a generation counter and pushes a fresh heap entry, and the worker discards any popped entry whose generation no longer matches the timer's current generation. That scheme can drop a due callback. A Start() that races the worker between its Peek() of the earliest entry and its dispatch of that entry bumps the generation, so when the worker pops the entry it now reads a stale generation and discards it instead of firing it. The re-pushed entry carries a new deadline, so the original due callback is never delivered. With the entry gone, the worker sleeps on the next (later or absent) deadline and goes idle. On a single-port manual XTaskQueue this is unrecoverable: the platform HTTP provider re-arms its poll via XTaskQueueSubmitDelayedCallback, and once that re-poll is dropped there is no independent traffic to wake the queue and sweep the pending list again. The symptom is an infinite sign-in / inventory hang (the delayed re-poll sits in the pending list while the timer thread waits in _Cnd_timedwait, never re-armed). Why the arming-layer fixes did not catch or fix it: - #975, #980 and #983 (#983 = "Centralize timer-arming to close TOCTOU race") all hardened the CAS->Start bookkeeping in TaskQueuePortImpl (m_timerDue) in TaskQueue.cpp. Those fixes are correct and are retained here. But they assume Start() reliably arms the backend for the published deadline. The generation backend violates that assumption by dropping the entry after Start(), so no amount of arming-layer verification can recover it. - The defect only affects WaitTimer_stl.cpp. Win32/UWP use WaitTimer_win32.cpp (OS CreateThreadpoolTimer, no generation scheme) and are unaffected. The libHttpClient unit-test project compiles the Win32 backend, so the Windows suite stayed green and none of #975/#980/#983 exercised the STL backend. Fix: restore the proven pre-#975 STL backend algorithm (libHttpClient 2.3.1, commit 0fa5f24) - pointer-keyed cancellation and an unconditional notify on every Set, which cannot discard a due entry - bridged to the post-#975 public WaitTimer API (GetCurrentTime / GetDueTime / Start(dueTime)). The clock is pinned to steady_clock (the pre-#975 code used high_resolution_clock, which is steady_clock on libc++ but wall-clock system_clock on libstdc++); pinning keeps the delayed-callback ordering monotonic on every platform that compiles this backend, preserving #975's monotonic-time intent. The arming-layer improvements in TaskQueue.cpp from #975/#980/#983 are kept unchanged; only the STL timer backend is reverted. Validated on-device in a shipping title: the reproducing infinite sign-in / inventory hang no longer occurs with this backend and the current TaskQueue arming logic. Follow-up: the STL backend has no unit-test coverage (the test project compiles only the Win32 backend). A dedicated STL-backend test lane is needed so a future re-land of the generation optimization cannot reintroduce this class of strand. --- Source/Task/WaitTimer_stl.cpp | 342 +++++++++------------------------- 1 file changed, 83 insertions(+), 259 deletions(-) diff --git a/Source/Task/WaitTimer_stl.cpp b/Source/Task/WaitTimer_stl.cpp index 76092dc9..8dfcb72f 100644 --- a/Source/Task/WaitTimer_stl.cpp +++ b/Source/Task/WaitTimer_stl.cpp @@ -1,111 +1,35 @@ #include "pch.h" #include "WaitTimer.h" +// NOTE: This is the proven pre-#975 STL wait-timer backend (libHttpClient +// 2.3.1 / commit 0fa5f24), restored to fix a delayed-callback strand that +// #975's rewrite introduced. The #975 backend added a per-timer "generation" +// scheme where every Start() invalidated the previously pushed heap entry; a +// Start() racing the worker as it was about to dispatch the due entry caused +// the worker to discard that entry as stale and never fire it, permanently +// stranding the callback (observed on-device as an infinite sign-in / inventory +// hang on a single-port manual queue with no independent traffic to rescue it). +// +// This backend uses the original pointer-keyed cancellation and an +// unconditional notify on every Set, which cannot drop a due callback. The +// public WaitTimer API matches the post-#975 surface (GetCurrentTime / +// GetDueTime / Start(dueTime)); both time helpers and the worker share a +// single monotonic clock so the values TaskQueue compares stay consistent and +// are immune to wall-clock adjustments. +// +// The pre-#975 backend keyed this clock off std::high_resolution_clock. That +// alias is steady_clock on some standard libraries (libc++) but system_clock +// on others (libstdc++), which is wall-clock and not monotonic. We pin it to +// steady_clock explicitly so the ordering and "now < dueTime" comparisons in +// TaskQueue are monotonic on every platform that compiles this backend. + using Clock = std::chrono::steady_clock; using Deadline = Clock::time_point; -using TimerDuration = std::chrono::nanoseconds; - -namespace -{ - // Keep the public WaitTimer surface on a plain integer so TaskQueue can use - // atomics without dragging chrono types through its state. The integer still - // represents steady-clock time, not wall-clock time. - Deadline DeadlineFromDueTime(uint64_t dueTime) noexcept - { - return Deadline(std::chrono::duration_cast(TimerDuration(dueTime))); - } - - uint64_t DueTimeFromDeadline(Deadline deadline) noexcept - { - return static_cast( - std::chrono::duration_cast(deadline.time_since_epoch()).count()); - } -} namespace OS { class TimerQueue; - // Keep callback payload and teardown coordination in shared state because the - // Worker can still hold a due heap entry after the owning WaitTimerImpl has - // been canceled or destroyed. - class WaitTimerState - { - public: - WaitTimerState(_In_opt_ void* context, _In_ WaitTimerCallback* callback) noexcept - : m_context(context), m_callback(callback) - {} - - uint64_t NextGeneration() noexcept - { - return ++m_generation; - } - - uint64_t Generation() const noexcept - { - return m_generation.load(std::memory_order_acquire); - } - - void BeginTerminate() noexcept - { - m_terminating.store(true, std::memory_order_release); - } - - bool TryBeginDispatch() noexcept - { - if (m_terminating.load(std::memory_order_acquire)) - { - return false; - } - - std::lock_guard lock{ m_lock }; - // Re-check under the lock so teardown cannot race with the in-flight - // count increment and then wait forever for a dispatch we started. - if (m_terminating.load(std::memory_order_relaxed)) - { - return false; - } - - ++m_inFlightDispatch; - return true; - } - - void EndDispatch() noexcept - { - std::lock_guard lock{ m_lock }; - ASSERT(m_inFlightDispatch != 0); - - --m_inFlightDispatch; - if (m_inFlightDispatch == 0) - { - m_quiesced.notify_all(); - } - } - - void WaitForQuiesce() noexcept - { - std::unique_lock lock{ m_lock }; - m_quiesced.wait(lock, [this]() noexcept - { - return m_inFlightDispatch == 0; - }); - } - - void InvokeCallback() noexcept - { - m_callback(m_context); - } - - private: - void* m_context; - WaitTimerCallback* m_callback; - std::atomic m_generation{ 0 }; - std::atomic m_terminating{ false }; - DefaultUnnamedMutex m_lock; - DefaultUnnamedConditionVariable m_quiesced; - uint32_t m_inFlightDispatch = 0; - }; - class WaitTimerImpl { public: @@ -113,23 +37,20 @@ namespace OS HRESULT Initialize(_In_opt_ void* context, _In_ WaitTimerCallback* callback); void Start(_In_ uint64_t dueTime); void Cancel(); - void Terminate() noexcept; + void InvokeCallback(); private: - std::shared_ptr m_state; + + void* m_context; + WaitTimerCallback* m_callback; std::shared_ptr m_timerQueue; }; struct TimerEntry { Deadline When; - std::shared_ptr State; - // Each Start() pushes a new heap entry instead of searching/removing the - // old one. Generation lets the Worker discard superseded entries cheaply. - uint64_t Generation; - TimerEntry(Deadline d, std::shared_ptr state, uint64_t g) - : When{ d }, State{ std::move(state) }, Generation{ g } - {} + WaitTimerImpl* Timer; + TimerEntry(Deadline d, WaitTimerImpl* t) : When{ d }, Timer{ t } {} }; struct TimerEntryComparator @@ -140,19 +61,14 @@ namespace OS } }; - // The queue is shared across timers, but it should still retire once the - // last timer goes away instead of leaking for process lifetime. - class TimerQueue : public std::enable_shared_from_this + class TimerQueue { public: bool Init() noexcept; ~TimerQueue(); - void AddTimer() noexcept; - void RemoveTimer() noexcept; - void Set(std::shared_ptr const& state, Deadline deadline) noexcept; - void Remove(WaitTimerState const* state) noexcept; - std::thread::id WorkerThreadId() const noexcept; + void Set(WaitTimerImpl* timer, Deadline deadline) noexcept; + void Remove(WaitTimerImpl const* timer) noexcept; private: void Worker() noexcept; @@ -161,10 +77,9 @@ namespace OS TimerEntry Pop() noexcept; DefaultUnnamedMutex m_mutex; - DefaultUnnamedConditionVariable m_cv; + std::condition_variable m_cv; std::vector m_queue; // used as a heap std::thread m_t; - std::atomic m_timerCount{ 0 }; bool m_exitThread = false; bool m_initialized = false; }; @@ -185,16 +100,7 @@ namespace OS m_cv.notify_all(); if (m_t.joinable()) { - if (m_t.get_id() == std::this_thread::get_id()) - { - // Immediate-port callbacks can tear down their own timer from the - // Worker thread. Joining from that path would deadlock. - m_t.detach(); - } - else - { - m_t.join(); - } + m_t.join(); } } @@ -204,11 +110,9 @@ namespace OS try { - // Capture a self-reference so the queue stays alive until the Worker - // exits even if the last timer concurrently clears the global slot. - m_t = std::thread([keepAlive = shared_from_this()]() + m_t = std::thread([this]() { - keepAlive->Worker(); + Worker(); }); m_initialized = true; } @@ -220,81 +124,41 @@ namespace OS return m_initialized; } - void TimerQueue::AddTimer() noexcept - { - m_timerCount.fetch_add(1, std::memory_order_relaxed); - } - - void TimerQueue::RemoveTimer() noexcept + void TimerQueue::Set(WaitTimerImpl* timer, Deadline deadline) noexcept { - if (m_timerCount.fetch_sub(1, std::memory_order_acq_rel) != 1) { - return; - } + std::lock_guard lock{ m_mutex }; - // Last timer out clears the global queue and wakes the Worker so Linux - // teardown actually quiesces instead of depending on process exit. - { - std::lock_guard globalLock{ g_timerQueueMutex }; - if (g_timerQueue.get() == this) + for (auto& entry : m_queue) { - g_timerQueue.reset(); + if (entry.Timer == timer) + { + entry.Timer = nullptr; + } } - } - - { - std::lock_guard lock{ m_mutex }; - m_exitThread = true; - } - - m_cv.notify_one(); - } - - void TimerQueue::Set(std::shared_ptr const& state, Deadline deadline) noexcept - { - bool shouldNotify; - { - std::lock_guard lock{ m_mutex }; - - // Bump the generation so stale heap entries for this timer are - // skipped by the Worker on pop. This replaces the old O(N) - // nullification scan with an O(log N) push. - uint64_t gen = state->NextGeneration(); - - // Only wake the Worker when the new deadline might be earlier - // than the current heap top; otherwise the Worker is already - // sleeping for a deadline that is at least as early. - shouldNotify = m_queue.empty() || deadline < m_queue.front().When; - m_queue.emplace_back(deadline, state, gen); + m_queue.emplace_back(deadline, timer); std::push_heap(m_queue.begin(), m_queue.end(), TimerEntryComparator{}); } - if (shouldNotify) - { - m_cv.notify_one(); - } + m_cv.notify_all(); } - void TimerQueue::Remove(WaitTimerState const* state) noexcept + void TimerQueue::Remove(WaitTimerImpl const* timer) noexcept { std::lock_guard lock{ m_mutex }; - // Remove is only called during cancellation/teardown. - // Reset shared state entries so the Worker never dispatches them. + // since m_queue is a heap, removing elements is non trivial, instead we + // just clean the timer pointer and the entry will be popped eventually + for (auto& entry : m_queue) { - if (entry.State.get() == state) + if (entry.Timer == timer) { - entry.State.reset(); + entry.Timer = nullptr; } } } - std::thread::id TimerQueue::WorkerThreadId() const noexcept - { - return m_t.get_id(); - } - void TimerQueue::Worker() noexcept { std::unique_lock lock{ m_mutex }; @@ -302,52 +166,28 @@ namespace OS { while (!m_queue.empty()) { - auto& top = Peek(); - - // Discard stale/nullified entries without releasing the lock. - if (!top.State || - top.Generation != top.State->Generation()) - { - Pop(); - continue; - } - - if (Clock::now() < top.When) + Deadline next = Peek().When; + if (Clock::now() < next) { break; } TimerEntry entry = Pop(); - if (!entry.State->TryBeginDispatch()) - { - continue; - } - // Release the lock while invoking the callback, just in case - // the timer gets destroyed on this thread or re-adds itself - // in the callback. + // release the lock while invoking the callback, just in case timer + // gets destroyed on this thread or re-adds itself in the callback lock.unlock(); - entry.State->InvokeCallback(); - entry.State->EndDispatch(); - lock.lock(); - } - - // Drain dead entries at the heap top so wait_until targets a - // live deadline rather than sleeping until a stale entry's time. - while (!m_queue.empty()) - { - auto& top = Peek(); - if (top.State && - top.Generation == top.State->Generation()) + if (entry.Timer) // Timer is set to nullptr if the entry is removed { - break; + entry.Timer->InvokeCallback(); } - Pop(); + lock.lock(); } if (!m_queue.empty()) { - m_cv.wait_until(lock, Peek().When); + Deadline next = Peek().When; + m_cv.wait_until(lock, next); } else { @@ -373,19 +213,25 @@ namespace OS WaitTimerImpl::~WaitTimerImpl() { - Terminate(); + std::lock_guard lock{ g_timerQueueMutex }; + + // If we are the last one referencing the global timer the + // shared use count will be two (us + the global). If it is, + // clear out the global. We let our own reference reset + // as the class destructs. This puts it outside the mutex + // lock, which we want since there is some shutdown cost + // associated with shutting the timer down. + + if (g_timerQueue.use_count() == 2) + { + g_timerQueue.reset(); + } } HRESULT WaitTimerImpl::Initialize(_In_opt_ void* context, _In_ WaitTimerCallback* callback) { - try - { - m_state = http_allocate_shared(context, callback); - } - catch (const std::bad_alloc&) - { - return E_OUTOFMEMORY; - } + m_context = context; + m_callback = callback; std::lock_guard lock{ g_timerQueueMutex }; @@ -408,46 +254,23 @@ namespace OS } m_timerQueue = g_timerQueue; - m_timerQueue->AddTimer(); return S_OK; } void WaitTimerImpl::Start(_In_ uint64_t dueTime) { - m_timerQueue->Set(m_state, DeadlineFromDueTime(dueTime)); + m_timerQueue->Set(this, Deadline(Deadline::duration(dueTime))); } void WaitTimerImpl::Cancel() { - if (m_state != nullptr && m_timerQueue != nullptr) - { - m_timerQueue->Remove(m_state.get()); - } + m_timerQueue->Remove(this); } - void WaitTimerImpl::Terminate() noexcept + void WaitTimerImpl::InvokeCallback() { - std::shared_ptr state = std::move(m_state); - std::shared_ptr timerQueue = std::move(m_timerQueue); - if (state == nullptr || timerQueue == nullptr) - { - return; - } - - // Block any new dispatch before removing queued entries so teardown has - // a single publish point that both the Worker and waiter observe. - state->BeginTerminate(); - timerQueue->Remove(state.get()); - - if (std::this_thread::get_id() != timerQueue->WorkerThreadId()) - { - // Delayed callbacks can run on Immediate queues and self-terminate on - // the Worker thread. Waiting there would deadlock on our own dispatch. - state->WaitForQuiesce(); - } - - timerQueue->RemoveTimer(); + m_callback(m_context); } WaitTimer::WaitTimer() noexcept @@ -481,7 +304,7 @@ namespace OS std::unique_ptr timer(m_impl.exchange(nullptr)); if (timer != nullptr) { - timer->Terminate(); + timer->Cancel(); } } @@ -497,12 +320,13 @@ namespace OS uint64_t WaitTimer::GetCurrentTime() noexcept { - return DueTimeFromDeadline(Clock::now()); + Deadline now = Clock::now(); + return now.time_since_epoch().count(); } uint64_t WaitTimer::GetDueTime(_In_ uint32_t msFromNow) noexcept { - Deadline deadline = Clock::now() + std::chrono::milliseconds(msFromNow); - return DueTimeFromDeadline(deadline); + Deadline d = Clock::now() + std::chrono::milliseconds(msFromNow); + return d.time_since_epoch().count(); } } // Namespace From 77e25d851f43efc5c729437b60c54932ddb909cf Mon Sep 17 00:00:00 2001 From: Raul Gomez Rodriguez Date: Mon, 22 Jun 2026 19:56:18 -0700 Subject: [PATCH 2/4] Add single-port delayed-callback strand regression test Adds a regression guard for the strand fixed by the previous commit and wires it into both timer backends, complementing the existing composite-queue starvation guard. The existing CompositeQueueStarvationScenario keeps many concurrent submitters and terminate races running for the whole measurement window, so there is always independent traffic whose timer fires sweep the pending list and rescue a momentarily-stranded entry. That continuous rescue traffic masks the single-port strand, which is why the existing lane did not catch it. The new scenario reproduces the field failure shape directly: a single self-resubmitting delayed-poll loop on one manual Work port (mirroring a platform HTTP provider that re-arms its poll every few ms via XTaskQueueSubmitDelayedCallback). A short burst of unrelated one-shot delayed callbacks runs concurrently to create timer-arming contention, then stops. After the burst drains, the poll loop is the only remaining work: if a re-arm was dropped during the burst, nothing sweeps the pending list to rescue it, the poll loop stops advancing, and the test fails deterministically (it requires the loop to make measurable progress after the burst ends). Wiring (same pattern as CompositeQueueStarvationScenario): - Tests/Shared/SingleQueueDelayedPollStrandScenario.h - shared, platform-neutral scenario using only the public XTaskQueue API + the standard library. - Tests/UnitTests/Tests/TaskQueueTests.cpp - VerifySingleQueueDelayedPollStrand exercises it on the Win32 WaitTimer backend. - Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp + the taskqueue-poll-strand-linux CTest target in Build/libHttpClient.Linux exercise it on the STL WaitTimer backend (where the regression manifests and the Windows lanes cannot reach). --- Build/libHttpClient.Linux/CMakeLists.txt | 50 ++++ .../SingleQueueDelayedPollStrandScenario.h | 258 ++++++++++++++++++ .../SingleQueuePollStrandRepro.cpp | 50 ++++ Tests/UnitTests/Tests/TaskQueueTests.cpp | 32 +++ 4 files changed, 390 insertions(+) create mode 100644 Tests/Shared/SingleQueueDelayedPollStrandScenario.h create mode 100644 Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp diff --git a/Build/libHttpClient.Linux/CMakeLists.txt b/Build/libHttpClient.Linux/CMakeLists.txt index 92b82fca..a6de675a 100644 --- a/Build/libHttpClient.Linux/CMakeLists.txt +++ b/Build/libHttpClient.Linux/CMakeLists.txt @@ -314,4 +314,54 @@ add_test( COMMAND "TaskQueueStarvationTests.Linux" ) +################################################# +### TaskQueue single-port poll strand test ### +################################################# +# Standalone regression guard for the single-port delayed-callback strand bug. +# Exercises the STL WaitTimer backend that the Windows test lanes cannot reach. +# Uses only the public XTaskQueue API + the standard library, so it just links +# the libHttpClient.Linux target. +add_executable( + "SingleQueuePollStrandTests.Linux" + "${PATH_TO_ROOT}/Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp" +) + +target_include_directories( + "SingleQueuePollStrandTests.Linux" + PRIVATE + "${COMMON_INCLUDE_DIRS}" + "${LINUX_INCLUDE_DIRS}" + "${PATH_TO_ROOT}/Tests/Shared" +) + +target_link_libraries( + "SingleQueuePollStrandTests.Linux" + PRIVATE + "${PROJECT_NAME}" + Threads::Threads + ${CMAKE_DL_LIBS} +) + +if (NOT BUILD_SHARED_LIBS) + target_link_libraries( + "SingleQueuePollStrandTests.Linux" + PRIVATE + "${LIBCURL_BINARY_PATH}" + "${LIBSSL_BINARY_PATH}" + "${LIBCRYPTO_BINARY_PATH}" + ) +endif() + +target_set_flags( + "SingleQueuePollStrandTests.Linux" + "${FLAGS}" + "${FLAGS_DEBUG}" + "${FLAGS_RELEASE}" +) + +add_test( + NAME taskqueue-poll-strand-linux + COMMAND "SingleQueuePollStrandTests.Linux" +) + export(TARGETS ${PROJECT_NAME} FILE ${PROJECT_NAME}Config.cmake) diff --git a/Tests/Shared/SingleQueueDelayedPollStrandScenario.h b/Tests/Shared/SingleQueueDelayedPollStrandScenario.h new file mode 100644 index 00000000..d9a16b49 --- /dev/null +++ b/Tests/Shared/SingleQueueDelayedPollStrandScenario.h @@ -0,0 +1,258 @@ +// Copyright (c) Microsoft Corporation +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +// Shared, platform-neutral scenario for the single-port delayed-callback strand +// regression. Exercised by: +// - the Windows unit test VerifySingleQueueDelayedPollStrand +// (TaskQueueTests.cpp, Win32 WaitTimer backend), and +// - the standalone Linux repro binary (CMake target, STL WaitTimer backend). +// +// This complements CompositeQueueStarvationScenario.h. That scenario keeps many +// concurrent submitters and terminate races running for the whole measurement +// window, so there is always independent traffic whose timer fires sweep the +// pending list and rescue a momentarily-stranded entry. That continuous rescue +// traffic masks the strand this scenario targets. +// +// Here we reproduce the field failure shape directly: a single self-resubmitting +// delayed-poll loop on one manual Work port (mirroring a platform HTTP provider +// that re-arms its poll every few ms via XTaskQueueSubmitDelayedCallback). A +// short burst of unrelated one-shot delayed callbacks runs concurrently to +// create timer-arming contention, then STOPS. After the burst drains, the poll +// loop is the only remaining work: if a re-arm was dropped during the burst, +// nothing sweeps the pending list to rescue it, the poll loop stops advancing, +// and the queue is permanently stalled. +// +// The STL timer backend (Linux / non-Microsoft platforms) is where the +// regression manifests, so running this on Linux CI guards the platform the +// Windows lanes cannot reach. +// +// Uses only the public XTaskQueue C API plus the C++ standard library so the +// same translation unit compiles on every platform. + +#pragma once + +// async.h pulls in pal.h (HRESULT, STDAPI, CALLBACK, FAILED, ...) and then +// XTaskQueue.h, so this header is self-contained on every platform. +#include + +#include +#include +#include +#include +#include + +namespace hc_test +{ + +struct SingleQueuePollStrandConfig +{ + int outerIterations = 50; + // Victim poll loop: a single delayed callback that re-arms itself. + uint32_t pollDelayMs = 2; + // Contention burst: unrelated one-shot delayed callbacks submitted onto the + // same Work port from several threads, then stopped. Tiny delays thrash the + // "earliest due" arming the same way concurrent real work does. + int burstThreads = 4; + int burstSubmitsPerThread = 250; + uint32_t burstDelayMs = 1; + // After the burst drains, the poll loop must advance by at least this many + // additional polls within the timeout. A stranded loop advances by zero. + int progressAfterBurstPolls = 30; + int progressTimeoutMs = 5000; + // Bound on how long to wait for the burst callbacks to drain. + int burstDrainTimeoutMs = 5000; +}; + +namespace detail +{ + +struct StrandPollRequest +{ + XTaskQueueHandle queue{ nullptr }; + uint32_t delayMs{ 0 }; + std::atomic* keepGoing{ nullptr }; + std::atomic polls{ 0 }; +}; + +inline void CALLBACK StrandPollCallback(void* context, bool cancelled) +{ + StrandPollRequest* r = static_cast(context); + + if (cancelled) + { + // Dispatched as cancelled during teardown; do not re-arm. + return; + } + + r->polls.fetch_add(1, std::memory_order_release); + + if (r->keepGoing->load(std::memory_order_acquire)) + { + // Re-arm the poll. This is the exact pattern a platform HTTP provider + // uses to schedule its next poll, and the operation whose dropped + // re-arm strands the loop. + (void)XTaskQueueSubmitDelayedCallback(r->queue, XTaskQueuePort::Work, r->delayMs, r, StrandPollCallback); + } +} + +struct BurstRequest +{ + std::atomic* outstanding{ nullptr }; +}; + +inline void CALLBACK BurstCallback(void* context, bool /*cancelled*/) +{ + BurstRequest* r = static_cast(context); + r->outstanding->fetch_sub(1, std::memory_order_acq_rel); + delete r; +} + +} // namespace detail + +// Runs the scenario. Returns true if the poll loop kept advancing after the +// contention burst ended in every iteration (healthy timer arming). Returns +// false if the loop stalled in any iteration (a dropped re-arm was never +// rescued). If failedIteration / pollsAfterStallStart are provided, they +// receive the first failing iteration index and how many polls the loop had +// reached when it stalled. +inline bool RunSingleQueuePollStrandScenario( + const SingleQueuePollStrandConfig& cfg = SingleQueuePollStrandConfig{}, + int* failedIteration = nullptr, + int* pollsAtStall = nullptr) +{ + using detail::StrandPollRequest; + using detail::StrandPollCallback; + using detail::BurstRequest; + using detail::BurstCallback; + + for (int iter = 0; iter < cfg.outerIterations; iter++) + { + XTaskQueueHandle queue{ nullptr }; + if (FAILED(XTaskQueueCreate(XTaskQueueDispatchMode::Manual, XTaskQueueDispatchMode::Manual, &queue))) + { + if (failedIteration) { *failedIteration = iter; } + if (pollsAtStall) { *pollsAtStall = -1; } + return false; + } + + // Single pump thread for the shared Work port. + std::atomic pumping{ true }; + std::thread pump([queue, &pumping]() + { + while (pumping.load(std::memory_order_acquire)) + { + XTaskQueueDispatch(queue, XTaskQueuePort::Work, 50); + } + }); + + std::atomic keepGoing{ true }; + + StrandPollRequest poller{}; + poller.queue = queue; + poller.delayMs = cfg.pollDelayMs; + poller.keepGoing = &keepGoing; + + // Start the self-resubmitting poll loop and let it establish itself. + if (FAILED(XTaskQueueSubmitDelayedCallback(queue, XTaskQueuePort::Work, cfg.pollDelayMs, &poller, StrandPollCallback))) + { + keepGoing.store(false, std::memory_order_release); + pumping.store(false, std::memory_order_release); + pump.join(); + XTaskQueueCloseHandle(queue); + if (failedIteration) { *failedIteration = iter; } + if (pollsAtStall) { *pollsAtStall = -1; } + return false; + } + + { + auto established = std::chrono::steady_clock::now() + std::chrono::milliseconds(1000); + while (poller.polls.load(std::memory_order_acquire) == 0 && + std::chrono::steady_clock::now() < established) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + + // Contention burst: several threads each submit a run of unrelated + // one-shot delayed callbacks onto the same Work port, racing the poll + // loop's re-arms against the timer worker's arming, then stop. + std::atomic outstanding{ 0 }; + std::vector burst; + burst.reserve(cfg.burstThreads); + for (int t = 0; t < cfg.burstThreads; t++) + { + burst.emplace_back([queue, &cfg, &outstanding]() + { + for (int i = 0; i < cfg.burstSubmitsPerThread; i++) + { + BurstRequest* b = new BurstRequest{}; + b->outstanding = &outstanding; + outstanding.fetch_add(1, std::memory_order_acq_rel); + if (FAILED(XTaskQueueSubmitDelayedCallback(queue, XTaskQueuePort::Work, cfg.burstDelayMs, b, BurstCallback))) + { + outstanding.fetch_sub(1, std::memory_order_acq_rel); + delete b; + } + } + }); + } + + for (auto& b : burst) + { + b.join(); + } + + // Let the burst callbacks drain so the poll loop is the only remaining + // work on the queue. + { + auto drainDeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(cfg.burstDrainTimeoutMs); + while (outstanding.load(std::memory_order_acquire) > 0 && + std::chrono::steady_clock::now() < drainDeadline) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + + // With no remaining rescue traffic, the poll loop must keep advancing. + // A strand introduced during the burst shows up here as zero progress. + int pollsAtBurstEnd = poller.polls.load(std::memory_order_acquire); + int target = pollsAtBurstEnd + cfg.progressAfterBurstPolls; + + bool progressed = false; + { + auto progressDeadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(cfg.progressTimeoutMs); + while (std::chrono::steady_clock::now() < progressDeadline) + { + if (poller.polls.load(std::memory_order_acquire) >= target) + { + progressed = true; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + } + + // Teardown: stop the loop, stop the pump, then terminate + drain so any + // pending (including a stranded) poll callback is dispatched as + // cancelled before the queue and the stack-allocated poller go away. + keepGoing.store(false, std::memory_order_release); + pumping.store(false, std::memory_order_release); + pump.join(); + + XTaskQueueTerminate(queue, false, nullptr, nullptr); + while (XTaskQueueDispatch(queue, XTaskQueuePort::Work, 0)) {} + while (XTaskQueueDispatch(queue, XTaskQueuePort::Completion, 0)) {} + XTaskQueueCloseHandle(queue); + + if (!progressed) + { + if (failedIteration) { *failedIteration = iter; } + if (pollsAtStall) { *pollsAtStall = pollsAtBurstEnd; } + return false; + } + } + + return true; +} + +} // namespace hc_test diff --git a/Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp b/Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp new file mode 100644 index 00000000..5bc10253 --- /dev/null +++ b/Tests/TaskQueueStarvation/SingleQueuePollStrandRepro.cpp @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +// Standalone Linux repro / regression guard for the single-port delayed-callback +// strand bug. Runs the shared scenario against the STL WaitTimer backend (the +// platform where the regression manifests; the Windows unit-test lanes only +// exercise the Win32 timer backend and cannot catch it). +// +// Exit codes: +// 0 - scenario healthy (the poll loop kept advancing after every burst) +// 1 - strand detected (the poll loop stalled with no rescue traffic) +// +// Built as the CTest target "taskqueue-poll-strand-linux" in +// Build/libHttpClient.Linux/CMakeLists.txt and run by the Linux CI lane. + +#include "SingleQueueDelayedPollStrandScenario.h" + +#include + +int main() +{ + hc_test::SingleQueuePollStrandConfig cfg{}; + + int failedIteration = -1; + int pollsAtStall = -1; + + std::printf( + "[taskqueue-poll-strand] running %d iterations, poll delay %ums, burst %d threads x %d submits...\n", + cfg.outerIterations, + cfg.pollDelayMs, + cfg.burstThreads, + cfg.burstSubmitsPerThread); + std::fflush(stdout); + + bool ok = hc_test::RunSingleQueuePollStrandScenario(cfg, &failedIteration, &pollsAtStall); + + if (!ok) + { + std::printf( + "[taskqueue-poll-strand] FAILED: poll loop stalled at iteration %d (reached %d polls, then no progress)\n", + failedIteration, + pollsAtStall); + std::fflush(stdout); + return 1; + } + + std::printf("[taskqueue-poll-strand] PASSED: no strand detected\n"); + std::fflush(stdout); + return 0; +} diff --git a/Tests/UnitTests/Tests/TaskQueueTests.cpp b/Tests/UnitTests/Tests/TaskQueueTests.cpp index 424fa816..5c2eea18 100644 --- a/Tests/UnitTests/Tests/TaskQueueTests.cpp +++ b/Tests/UnitTests/Tests/TaskQueueTests.cpp @@ -7,6 +7,7 @@ #include "PumpedTaskQueue.h" #include "XTaskQueuePriv.h" #include "CompositeQueueStarvationScenario.h" +#include "SingleQueueDelayedPollStrandScenario.h" #define TEST_CLASS_OWNER L"brianpe" @@ -235,6 +236,37 @@ DEFINE_TEST_CLASS(TaskQueueTests) VERIFY_IS_TRUE(ok); } + DEFINE_TEST_CASE(VerifySingleQueueDelayedPollStrand) + { + // Regression guard for the single-port delayed-callback strand. A single + // self-resubmitting delayed-poll loop runs on one manual Work port + // (mirroring a platform HTTP provider that re-arms its poll every few ms + // via XTaskQueueSubmitDelayedCallback). A short burst of unrelated + // one-shot delayed callbacks runs concurrently to create timer-arming + // contention, then stops. After the burst drains, the poll loop is the + // only remaining work: if a re-arm was dropped during the burst, nothing + // sweeps the pending list to rescue it and the loop stalls. Unlike + // VerifyCompositeQueueDelayedCallbackStarvation, there is no continuous + // rescue traffic to mask the strand once the burst ends. + // + // The scenario itself lives in the shared header so the exact same code + // runs here on the Win32 WaitTimer backend and in the standalone Linux + // repro binary on the STL WaitTimer backend (where the bug manifests). + + int failedIteration = -1; + int pollsAtStall = -1; + bool ok = hc_test::RunSingleQueuePollStrandScenario( + hc_test::SingleQueuePollStrandConfig{}, + &failedIteration, + &pollsAtStall); + + if (!ok) + { + LOG_COMMENT(L"STRAND at iteration %d: poll loop stalled at %d polls", failedIteration, pollsAtStall); + } + VERIFY_IS_TRUE(ok); + } + DEFINE_TEST_CASE(VerifyDuplicateQueueHandle) { const size_t count = 10; From 8c802e3e5d209d7252f0f222cdd573227df5ae34 Mon Sep 17 00:00:00 2001 From: Raul Gomez Rodriguez Date: Tue, 23 Jun 2026 08:32:35 -0700 Subject: [PATCH 3/4] CI: run the single-port poll-strand test in the Linux ctest lane The Linux test step filtered ctest to -R taskqueue-starvation-linux, so the new taskqueue-poll-strand-linux target was built but never executed. Widen the filter to run both taskqueue regression tests. --- Utilities/Pipelines/Tasks/linux-build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Utilities/Pipelines/Tasks/linux-build.yml b/Utilities/Pipelines/Tasks/linux-build.yml index 71e280f8..4ac566b2 100644 --- a/Utilities/Pipelines/Tasks/linux-build.yml +++ b/Utilities/Pipelines/Tasks/linux-build.yml @@ -35,7 +35,7 @@ steps: script: | set -e cd "$(Build.SourcesDirectory)/Int/CMake/libHttpClient.Linux" - ctest --output-on-failure -R taskqueue-starvation-linux + ctest --output-on-failure -R "taskqueue-starvation-linux|taskqueue-poll-strand-linux" - task: Bash@3 displayName: 'Build libHttpClient Shared' From 3024234db98bfccd8a0774cd1a2887b5528815c0 Mon Sep 17 00:00:00 2001 From: Raul Gomez Rodriguez Date: Tue, 23 Jun 2026 08:33:51 -0700 Subject: [PATCH 4/4] CI: select TaskQueue regression tests by ctest label, not by name Label both TaskQueue regression ctests with LABELS taskqueue and run them via 'ctest -L taskqueue' instead of an explicit -R name list. Future regression scenarios run automatically by adding the label in CMake, with no pipeline change required. --- Build/libHttpClient.Linux/CMakeLists.txt | 12 ++++++++++++ Utilities/Pipelines/Tasks/linux-build.yml | 5 ++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/Build/libHttpClient.Linux/CMakeLists.txt b/Build/libHttpClient.Linux/CMakeLists.txt index a6de675a..15105c71 100644 --- a/Build/libHttpClient.Linux/CMakeLists.txt +++ b/Build/libHttpClient.Linux/CMakeLists.txt @@ -313,6 +313,13 @@ add_test( NAME taskqueue-starvation-linux COMMAND "TaskQueueStarvationTests.Linux" ) +# Label so CI can run every TaskQueue regression test by label (-L taskqueue) +# without having to opt each new test in by name. +set_tests_properties( + taskqueue-starvation-linux + PROPERTIES + LABELS "taskqueue" +) ################################################# ### TaskQueue single-port poll strand test ### @@ -363,5 +370,10 @@ add_test( NAME taskqueue-poll-strand-linux COMMAND "SingleQueuePollStrandTests.Linux" ) +set_tests_properties( + taskqueue-poll-strand-linux + PROPERTIES + LABELS "taskqueue" +) export(TARGETS ${PROJECT_NAME} FILE ${PROJECT_NAME}Config.cmake) diff --git a/Utilities/Pipelines/Tasks/linux-build.yml b/Utilities/Pipelines/Tasks/linux-build.yml index 4ac566b2..e6e9abc5 100644 --- a/Utilities/Pipelines/Tasks/linux-build.yml +++ b/Utilities/Pipelines/Tasks/linux-build.yml @@ -35,7 +35,10 @@ steps: script: | set -e cd "$(Build.SourcesDirectory)/Int/CMake/libHttpClient.Linux" - ctest --output-on-failure -R "taskqueue-starvation-linux|taskqueue-poll-strand-linux" + # Run every test labelled "taskqueue" so new regression scenarios are + # picked up automatically just by adding the label in CMake (no pipeline + # change needed). + ctest --output-on-failure -L taskqueue - task: Bash@3 displayName: 'Build libHttpClient Shared'