queue = new MultilevelPriorityQueue(2, 1000, new DriverTask());
+ DriverTask e1 = mockDriverTask(mockDriverTaskId(), false);
+ DriverTask e2 = mockDriverTask(mockDriverTaskId(), false);
+ queue.push(e1);
+
+ Assert.assertThrows(IllegalStateException.class, () -> queue.push(e2));
+ Assert.assertEquals(1, queue.size());
+ Assert.assertEquals(e1.getDriverTaskId().toString(), queue.poll().getDriverTaskId().toString());
+ Assert.assertEquals(0, queue.size());
+ Assert.assertTrue(queue.isEmpty());
+ }
+
+ @Test
+ public void testRepushDuplicateId() throws InterruptedException {
+ MultilevelPriorityQueue queue = new MultilevelPriorityQueue(2, 1000, new DriverTask());
+ DriverTask e1 = mockDriverTask(mockDriverTaskId(), false);
+ DriverTask e2 =
+ mockDriverTask(
+ new DriverTaskId(
+ new FragmentInstanceId(new PlanFragmentId(new QueryId("test"), 0), "inst-1"), 0),
+ false);
+ queue.push(e1);
+ queue.push(e2);
+
+ DriverTask polledTask = queue.poll();
+ DriverTask queuedTask = polledTask.equals(e1) ? e2 : e1;
+ DriverTask duplicateQueuedTask = mockDriverTask(queuedTask.getDriverTaskId(), false);
+
+ Assert.assertThrows(IllegalStateException.class, () -> queue.repush(duplicateQueuedTask));
+ Assert.assertEquals(1, queue.size());
+ queue.repush(polledTask);
+ Assert.assertEquals(2, queue.size());
+ }
+
@Test
public void testPushAndPoll() {
try {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java
new file mode 100644
index 0000000000000..24ce666dbe763
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.concurrent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FiniteSemaphoreTest {
+
+ @Test
+ public void testRejectNegativeCapacity() {
+ assertIllegalArgument(() -> new FiniteSemaphore(-1, -2));
+ }
+
+ @Test
+ public void testRejectNegativeInitialPermits() {
+ assertIllegalArgument(() -> new FiniteSemaphore(1, -1));
+ }
+
+ @Test
+ public void testRejectInitialPermitsLargerThanCapacity() {
+ assertIllegalArgument(() -> new FiniteSemaphore(1, 2));
+ }
+
+ private void assertIllegalArgument(Runnable runnable) {
+ try {
+ runnable.run();
+ Assert.fail();
+ } catch (IllegalArgumentException ignored) {
+ // expected exception
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueueTest.java
index f7fd723fc0138..dfd3d696bed8a 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueueTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueueTest.java
@@ -43,6 +43,26 @@ public void tearDown() {
queue.clear();
}
+ @Test
+ public void testRejectZeroMaxSize() {
+ try {
+ new FixedPriorityBlockingQueue<>(0, Integer::compare);
+ Assert.fail();
+ } catch (IllegalArgumentException ignored) {
+ // expected exception
+ }
+ }
+
+ @Test
+ public void testRejectNegativeMaxSize() {
+ try {
+ new FixedPriorityBlockingQueue<>(-1, Integer::compare);
+ Assert.fail();
+ } catch (IllegalArgumentException ignored) {
+ // expected exception
+ }
+ }
+
@Test
public void testBlockingTake() throws InterruptedException {
final AtomicBoolean hasTaken = new AtomicBoolean(false);
From 07026d7466fa85d10c6c548a9be4cf9136401e49 Mon Sep 17 00:00:00 2001
From: Caideyipi <87789683+Caideyipi@users.noreply.github.com>
Date: Tue, 9 Jun 2026 12:16:13 +0800
Subject: [PATCH 2/3] Fix queue capacity invariants
---
.../runtime/PipeConfigRegionListener.java | 3 +
.../task/PipeTaskCoordinatorLock.java | 101 ++++++++++------
.../runtime/PipeConfigRegionListenerTest.java | 55 +++++++++
.../task/PipeTaskCoordinatorLockTest.java | 108 ++++++++++++++++++
.../table/DataNodeTableCache.java | 10 +-
.../db/utils/concurrent/FiniteSemaphore.java | 22 ++--
.../table/DataNodeTableCacheTest.java | 61 ++++++++++
.../utils/concurrent/FiniteSemaphoreTest.java | 63 ++++++++++
.../plugin/service/PipePluginClassLoader.java | 2 +-
.../service/PipePluginClassLoaderManager.java | 18 ++-
.../commons/udf/service/UDFClassLoader.java | 2 +-
.../udf/service/UDFClassLoaderManager.java | 32 ++++--
.../PipePluginClassLoaderManagerTest.java | 98 ++++++++++++++++
.../service/UDFClassLoaderManagerTest.java | 93 +++++++++++++++
14 files changed, 604 insertions(+), 64 deletions(-)
create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListenerTest.java
create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManagerTest.java
create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManagerTest.java
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java
index 278e133494b50..1c0292537d56c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java
@@ -50,6 +50,9 @@ public synchronized void increaseReference(final PipeParameters parameters)
public synchronized void decreaseReference(final PipeParameters parameters)
throws IllegalPathException {
if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) {
+ if (listeningQueueReferenceCount == 0) {
+ return;
+ }
listeningQueueReferenceCount--;
if (listeningQueueReferenceCount == 0) {
listeningQueue.close();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 0f788435394f2..b102bcbb07c56 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -24,74 +24,105 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task coordinator. It is used to
* ensure that only one thread can execute the pipe task coordinator at the same time.
*
- * Uses {@link Semaphore} instead of {@link java.util.concurrent.locks.ReentrantLock} to support
- * cross-thread acquire/release, which is required by the procedure recovery mechanism: locks may be
- * acquired on the StateMachineUpdater thread during {@code restoreLock()} and released on a
- * ProcedureCoreWorker thread after execution.
+ *
Supports cross-thread acquire/release, which is required by the procedure recovery mechanism:
+ * locks may be acquired on the StateMachineUpdater thread during {@code restoreLock()} and released
+ * on a ProcedureCoreWorker thread after execution.
*/
public class PipeTaskCoordinatorLock {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class);
- private final Semaphore semaphore = new Semaphore(1);
+ private boolean locked = false;
public void lock() {
LOGGER.debug(
ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD,
Thread.currentThread().getName());
- try {
- semaphore.acquire();
- LOGGER.debug(
- ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
- Thread.currentThread().getName());
- } catch (final InterruptedException e) {
+
+ boolean interrupted = false;
+ synchronized (this) {
+ while (locked) {
+ try {
+ wait();
+ } catch (final InterruptedException e) {
+ interrupted = true;
+ LOGGER.error(
+ ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
+ Thread.currentThread().getName());
+ }
+ }
+ locked = true;
+ }
+
+ if (interrupted) {
Thread.currentThread().interrupt();
- LOGGER.error(
- ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
- Thread.currentThread().getName());
}
+ LOGGER.debug(
+ ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
+ Thread.currentThread().getName());
}
public boolean tryLock() {
- try {
- LOGGER.debug(
- ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD,
- Thread.currentThread().getName());
- if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
- LOGGER.debug(
- ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
- Thread.currentThread().getName());
- return true;
- } else {
- LOGGER.info(
- ManagerMessages.PIPETASKCOORDINATOR_LOCK_FAILED_TO_ACQUIRE_BY_THREAD_BECAUSE_OF_TIMEOUT,
- Thread.currentThread().getName());
- return false;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ LOGGER.debug(
+ ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD,
+ Thread.currentThread().getName());
+ if (Thread.currentThread().isInterrupted()) {
LOGGER.error(
ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
Thread.currentThread().getName());
return false;
}
+
+ final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+ synchronized (this) {
+ while (locked) {
+ final long remainingNanos = deadlineNanos - System.nanoTime();
+ if (remainingNanos <= 0) {
+ LOGGER.info(
+ ManagerMessages
+ .PIPETASKCOORDINATOR_LOCK_FAILED_TO_ACQUIRE_BY_THREAD_BECAUSE_OF_TIMEOUT,
+ Thread.currentThread().getName());
+ return false;
+ }
+ try {
+ TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error(
+ ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
+ Thread.currentThread().getName());
+ return false;
+ }
+ }
+ locked = true;
+ }
+
+ LOGGER.debug(
+ ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
+ Thread.currentThread().getName());
+ return true;
}
public void unlock() {
- semaphore.release();
+ synchronized (this) {
+ if (!locked) {
+ return;
+ }
+ locked = false;
+ notifyAll();
+ }
LOGGER.debug(
ManagerMessages.PIPETASKCOORDINATOR_LOCK_RELEASED_BY_THREAD,
Thread.currentThread().getName());
}
- public boolean isLocked() {
- return semaphore.availablePermits() == 0;
+ public synchronized boolean isLocked() {
+ return locked;
}
}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListenerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListenerTest.java
new file mode 100644
index 0000000000000..adaf4d8a73b59
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListenerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.agent.runtime;
+
+import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class PipeConfigRegionListenerTest {
+
+ @Test
+ public void testRepeatedDecreaseDoesNotBreakFutureIncrease() throws Exception {
+ PipeConfigRegionListener listener = new PipeConfigRegionListener();
+ PipeParameters parameters =
+ new PipeParameters(
+ new HashMap() {
+ {
+ put(PipeSourceConstant.EXTRACTOR_INCLUSION_KEY, "schema.database.create");
+ }
+ });
+
+ listener.increaseReference(parameters);
+ Assert.assertTrue(listener.listener().isOpened());
+
+ listener.decreaseReference(parameters);
+ Assert.assertFalse(listener.listener().isOpened());
+
+ listener.decreaseReference(parameters);
+ Assert.assertFalse(listener.listener().isOpened());
+
+ listener.increaseReference(parameters);
+ Assert.assertTrue(listener.listener().isOpened());
+ }
+}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
new file mode 100644
index 0000000000000..f10d929663374
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class PipeTaskCoordinatorLockTest {
+
+ @Test
+ public void testRepeatedUnlockDoesNotIncreaseCapacity() throws Exception {
+ PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock();
+
+ lock.lock();
+ lock.unlock();
+ lock.unlock();
+ Assert.assertTrue(lock.tryLock());
+
+ AtomicBoolean acquired = new AtomicBoolean(false);
+ Thread waiter =
+ new Thread(
+ () -> {
+ lock.lock();
+ acquired.set(true);
+ });
+ waiter.start();
+ waitUntilState(waiter, Thread.State.WAITING);
+ Assert.assertFalse(acquired.get());
+
+ lock.unlock();
+ waiter.join(TimeUnit.SECONDS.toMillis(5));
+ Assert.assertFalse(waiter.isAlive());
+ Assert.assertTrue(acquired.get());
+ lock.unlock();
+ }
+
+ @Test
+ public void testInterruptedLockDoesNotReturnBeforeAcquired() throws Exception {
+ PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock();
+ AtomicBoolean acquired = new AtomicBoolean(false);
+ AtomicBoolean interruptedAfterLock = new AtomicBoolean(false);
+
+ lock.lock();
+ Thread waiter =
+ new Thread(
+ () -> {
+ Thread.currentThread().interrupt();
+ lock.lock();
+ acquired.set(true);
+ interruptedAfterLock.set(Thread.currentThread().isInterrupted());
+ });
+ waiter.start();
+ waitUntilState(waiter, Thread.State.WAITING);
+ Assert.assertFalse(acquired.get());
+
+ lock.unlock();
+ waiter.join(TimeUnit.SECONDS.toMillis(5));
+ Assert.assertFalse(waiter.isAlive());
+ Assert.assertTrue(acquired.get());
+ Assert.assertTrue(interruptedAfterLock.get());
+ lock.unlock();
+ }
+
+ @Test
+ public void testInterruptedTryLockDoesNotAcquire() {
+ PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock();
+
+ Thread.currentThread().interrupt();
+ try {
+ Assert.assertFalse(lock.tryLock());
+ Assert.assertFalse(lock.isLocked());
+ Assert.assertTrue(Thread.currentThread().isInterrupted());
+ } finally {
+ Thread.interrupted();
+ }
+ }
+
+ private void waitUntilState(Thread thread, Thread.State expectedState) throws Exception {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (System.nanoTime() < deadline) {
+ if (thread.getState() == expectedState) {
+ return;
+ }
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(expectedState, thread.getState());
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
index f545a9bda237d..d8d8f4fe19b95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
@@ -374,8 +374,10 @@ private Map> mayGetTableInPreUpdateMap(
private Map> getTablesInConfigNode(
final Map> tableInput) {
Map> result = Collections.emptyMap();
+ boolean acquired = false;
try {
fetchTableSemaphore.acquire();
+ acquired = true;
final TFetchTableResp resp =
ClusterConfigTaskExecutor.getInstance()
.fetchTables(
@@ -388,11 +390,11 @@ private Map> getTablesInConfigNode(
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn(DataNodeSchemaMessages.INTERRUPTED_ACQUIRE_SEMAPHORE_GET_TABLES);
- } catch (final Exception e) {
- fetchTableSemaphore.release();
- throw e;
+ } finally {
+ if (acquired) {
+ fetchTableSemaphore.release();
+ }
}
- fetchTableSemaphore.release();
return result;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java
index c09a94016b9cd..b0d70528c0d30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java
@@ -20,15 +20,12 @@
import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
-import java.util.concurrent.Semaphore;
-
/**
* FiniteSemaphore defines a special Semaphore that the upper limit of permit is capacity. If
* permits exceed the capacity, the release request will be ignored.
*/
public class FiniteSemaphore {
private final int capacity;
- private final Semaphore semaphore;
private int permit;
public FiniteSemaphore(int capacity, int permit) {
@@ -39,23 +36,20 @@ public FiniteSemaphore(int capacity, int permit) {
throw new IllegalArgumentException(DataNodeMiscMessages.CAPACITY_LARGER_THAN_INITIAL_PERMITS);
}
this.capacity = capacity;
- this.semaphore = new Semaphore(permit);
this.permit = permit;
}
- public void release() {
- synchronized (this) {
- if (permit < capacity) {
- permit++;
- semaphore.release();
- }
+ public synchronized void release() {
+ if (permit < capacity) {
+ permit++;
+ notifyAll();
}
}
- public void acquire() throws InterruptedException {
- semaphore.acquire();
- synchronized (this) {
- permit--;
+ public synchronized void acquire() throws InterruptedException {
+ while (permit == 0) {
+ wait();
}
+ permit--;
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
new file mode 100644
index 0000000000000..e12a06ff12631
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.schemaengine.table;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+public class DataNodeTableCacheTest {
+
+ @Test
+ public void testInterruptedFetchDoesNotReleaseUnacquiredPermit() throws Exception {
+ DataNodeTableCache cache = DataNodeTableCache.getInstance();
+ Semaphore semaphore = getFetchTableSemaphore(cache);
+ int permitsBeforeFetch = semaphore.availablePermits();
+
+ Thread.currentThread().interrupt();
+ try {
+ getTablesInConfigNode(cache);
+
+ Assert.assertEquals(permitsBeforeFetch, semaphore.availablePermits());
+ Assert.assertTrue(Thread.currentThread().isInterrupted());
+ } finally {
+ Thread.interrupted();
+ }
+ }
+
+ private Semaphore getFetchTableSemaphore(DataNodeTableCache cache) throws Exception {
+ Field semaphoreField = DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore");
+ semaphoreField.setAccessible(true);
+ return (Semaphore) semaphoreField.get(cache);
+ }
+
+ private void getTablesInConfigNode(DataNodeTableCache cache) throws Exception {
+ Method method = DataNodeTableCache.class.getDeclaredMethod("getTablesInConfigNode", Map.class);
+ method.setAccessible(true);
+ method.invoke(cache, Collections.emptyMap());
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java
index 24ce666dbe763..93fdd06a290f2 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java
@@ -22,6 +22,10 @@
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
public class FiniteSemaphoreTest {
@Test
@@ -39,6 +43,65 @@ public void testRejectInitialPermitsLargerThanCapacity() {
assertIllegalArgument(() -> new FiniteSemaphore(1, 2));
}
+ @Test
+ public void testReleaseDoesNotExceedCapacity() throws InterruptedException {
+ FiniteSemaphore semaphore = new FiniteSemaphore(1, 0);
+
+ semaphore.release();
+ semaphore.release();
+
+ semaphore.acquire();
+ Thread waiter =
+ new Thread(
+ () -> {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ waiter.start();
+ await()
+ .atMost(1, TimeUnit.MINUTES)
+ .untilAsserted(() -> Assert.assertEquals(Thread.State.WAITING, waiter.getState()));
+
+ waiter.interrupt();
+ waiter.join();
+ }
+
+ @Test
+ public void testAcquireWaitsUntilRelease() throws InterruptedException {
+ FiniteSemaphore semaphore = new FiniteSemaphore(1, 0);
+ Thread waiter =
+ new Thread(
+ () -> {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ waiter.start();
+ await()
+ .atMost(1, TimeUnit.MINUTES)
+ .untilAsserted(() -> Assert.assertEquals(Thread.State.WAITING, waiter.getState()));
+
+ semaphore.release();
+ await()
+ .atMost(1, TimeUnit.MINUTES)
+ .untilAsserted(() -> Assert.assertEquals(Thread.State.TERMINATED, waiter.getState()));
+ waiter.join();
+ }
+
+ @Test
+ public void testPermitCanBeReused() throws InterruptedException {
+ FiniteSemaphore semaphore = new FiniteSemaphore(1, 1);
+
+ semaphore.acquire();
+ semaphore.release();
+ semaphore.acquire();
+ }
+
private void assertIllegalArgument(Runnable runnable) {
try {
runnable.run();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoader.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoader.java
index 745d93566fc40..499778b3b5b15 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoader.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoader.java
@@ -81,7 +81,7 @@ public synchronized void acquire() {
}
public synchronized void release() throws IOException {
- activeInstanceCount.decrementAndGet();
+ activeInstanceCount.updateAndGet(count -> count > 0 ? count - 1 : 0);
closeIfPossible();
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManager.java
index a6bda955311c9..9837319a58cb5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManager.java
@@ -25,6 +25,9 @@
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.pipe.api.PipePlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
@@ -34,6 +37,8 @@
@NotThreadSafe
public class PipePluginClassLoaderManager implements IService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipePluginClassLoaderManager.class);
+
private final String libRoot;
/**
@@ -64,7 +69,18 @@ public PipePluginClassLoader getPluginClassLoader(String pluginName) {
}
public void addPluginAndClassLoader(String pluginName, PipePluginClassLoader classLoader) {
- pipePluginNameToClassLoaderMap.put(pluginName.toUpperCase(), classLoader);
+ PipePluginClassLoader oldClassLoader =
+ pipePluginNameToClassLoaderMap.put(pluginName.toUpperCase(), classLoader);
+ if (oldClassLoader != null && oldClassLoader != classLoader) {
+ try {
+ oldClassLoader.markAsDeprecated();
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Failed to close old PipePluginClassLoader when replacing plugin {}, because {}",
+ pluginName,
+ e.toString());
+ }
+ }
}
public PipePluginClassLoader createPipePluginClassLoader(String pluginDirPath)
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoader.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoader.java
index df5dd977b5dd5..f47f74d44699e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoader.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoader.java
@@ -70,7 +70,7 @@ public void acquire() {
}
public void release() throws IOException {
- activeQueriesCount.decrementAndGet();
+ activeQueriesCount.updateAndGet(count -> count > 0 ? count - 1 : 0);
closeIfPossible();
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
index 138e7e284b90c..7b3280c89cbb0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java
@@ -43,6 +43,8 @@ public class UDFClassLoaderManager implements IService {
/** The keys in the map are the query IDs of the UDF queries being executed. */
private final Map queryIdToUDFClassLoaderMap;
+ private final Object activeClassLoaderLock = new Object();
+
/**
* activeClassLoader is used to load all classes under libRoot. libRoot may be updated before the
* user executes CREATE FUNCTION or after the user executes DROP FUNCTION. Therefore, we need to
@@ -64,12 +66,24 @@ private UDFClassLoaderManager() throws IOException {
}
public void initializeUDFQuery(String queryId) {
- activeClassLoader.get().acquire();
- queryIdToUDFClassLoaderMap.put(queryId, activeClassLoader.get());
+ final UDFClassLoader previousClassLoader;
+ synchronized (activeClassLoaderLock) {
+ final UDFClassLoader currentClassLoader = activeClassLoader.get();
+ currentClassLoader.acquire();
+ previousClassLoader = queryIdToUDFClassLoaderMap.put(queryId, currentClassLoader);
+ }
+ releaseClassLoader(queryId, previousClassLoader);
}
public void finalizeUDFQuery(String queryId) {
- UDFClassLoader classLoader = queryIdToUDFClassLoaderMap.remove(queryId);
+ final UDFClassLoader classLoader;
+ synchronized (activeClassLoaderLock) {
+ classLoader = queryIdToUDFClassLoaderMap.remove(queryId);
+ }
+ releaseClassLoader(queryId, classLoader);
+ }
+
+ private void releaseClassLoader(String queryId, UDFClassLoader classLoader) {
try {
if (classLoader != null) {
classLoader.release();
@@ -81,12 +95,14 @@ public void finalizeUDFQuery(String queryId) {
}
public UDFClassLoader updateAndGetActiveClassLoader() throws IOException {
- UDFClassLoader deprecatedClassLoader = activeClassLoader.get();
- activeClassLoader.set(new UDFClassLoader(libRoot));
- if (deprecatedClassLoader != null) {
- deprecatedClassLoader.markAsDeprecated();
+ synchronized (activeClassLoaderLock) {
+ UDFClassLoader deprecatedClassLoader = activeClassLoader.get();
+ activeClassLoader.set(new UDFClassLoader(libRoot));
+ if (deprecatedClassLoader != null) {
+ deprecatedClassLoader.markAsDeprecated();
+ }
+ return activeClassLoader.get();
}
- return activeClassLoader.get();
}
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManagerTest.java
new file mode 100644
index 0000000000000..5c8776e7c7065
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManagerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.plugin.service;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipePluginClassLoaderManagerTest {
+
+ @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testDuplicatePluginClassLoaderRegistrationClosesPreviousClassLoader()
+ throws Exception {
+ PipePluginClassLoaderManager manager = newManager();
+ TestingPipePluginClassLoader oldClassLoader = newClassLoader();
+ TestingPipePluginClassLoader newClassLoader = newClassLoader();
+
+ manager.addPluginAndClassLoader("plugin", oldClassLoader);
+ manager.addPluginAndClassLoader("plugin", newClassLoader);
+
+ Assert.assertTrue(oldClassLoader.isClosed());
+ Assert.assertSame(newClassLoader, manager.getPluginClassLoader("plugin"));
+ }
+
+ @Test
+ public void testClassLoaderReleaseDoesNotUnderflow() throws Exception {
+ PipePluginClassLoader classLoader = newClassLoader();
+
+ classLoader.release();
+ Assert.assertEquals(0, getActiveInstanceCount(classLoader));
+
+ classLoader.acquire();
+ classLoader.release();
+ classLoader.release();
+ Assert.assertEquals(0, getActiveInstanceCount(classLoader));
+ }
+
+ private PipePluginClassLoaderManager newManager() throws Exception {
+ Constructor constructor =
+ PipePluginClassLoaderManager.class.getDeclaredConstructor(String.class);
+ constructor.setAccessible(true);
+ return constructor.newInstance(temporaryFolder.newFolder("pipe-lib").getAbsolutePath());
+ }
+
+ private TestingPipePluginClassLoader newClassLoader() throws Exception {
+ return new TestingPipePluginClassLoader(temporaryFolder.newFolder().getAbsolutePath());
+ }
+
+ private long getActiveInstanceCount(PipePluginClassLoader classLoader) throws Exception {
+ Field activeInstanceCountField =
+ PipePluginClassLoader.class.getDeclaredField("activeInstanceCount");
+ activeInstanceCountField.setAccessible(true);
+ return ((AtomicLong) activeInstanceCountField.get(classLoader)).get();
+ }
+
+ private static class TestingPipePluginClassLoader extends PipePluginClassLoader {
+
+ private boolean closed;
+
+ private TestingPipePluginClassLoader(String libRoot) throws Exception {
+ super(libRoot);
+ }
+
+ @Override
+ public void close() throws java.io.IOException {
+ closed = true;
+ super.close();
+ }
+
+ private boolean isClosed() {
+ return closed;
+ }
+ }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManagerTest.java
new file mode 100644
index 0000000000000..5a358dca97f7e
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManagerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.service;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class UDFClassLoaderManagerTest {
+
+ @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testDuplicateQueryInitializationReleasesPreviousClassLoader() throws Exception {
+ UDFClassLoaderManager manager = newManager();
+
+ manager.initializeUDFQuery("query");
+ UDFClassLoader deprecatedClassLoader = getActiveClassLoader(manager);
+
+ manager.updateAndGetActiveClassLoader();
+ UDFClassLoader currentClassLoader = getActiveClassLoader(manager);
+ manager.initializeUDFQuery("query");
+
+ Assert.assertEquals(0, getActiveQueriesCount(deprecatedClassLoader));
+ Assert.assertEquals(1, getActiveQueriesCount(currentClassLoader));
+
+ manager.finalizeUDFQuery("query");
+
+ Assert.assertEquals(0, getActiveQueriesCount(currentClassLoader));
+ }
+
+ @Test
+ public void testClassLoaderReleaseDoesNotUnderflow() throws Exception {
+ UDFClassLoader classLoader =
+ new UDFClassLoader(temporaryFolder.newFolder("udf-lib").getAbsolutePath());
+
+ classLoader.release();
+ Assert.assertEquals(0, getActiveQueriesCount(classLoader));
+
+ classLoader.acquire();
+ classLoader.release();
+ classLoader.release();
+ Assert.assertEquals(0, getActiveQueriesCount(classLoader));
+ }
+
+ private UDFClassLoaderManager newManager() throws Exception {
+ Constructor constructor =
+ UDFClassLoaderManager.class.getDeclaredConstructor(String.class);
+ constructor.setAccessible(true);
+ UDFClassLoaderManager manager =
+ constructor.newInstance(temporaryFolder.newFolder("udf-lib").getAbsolutePath());
+ manager.start();
+ return manager;
+ }
+
+ private UDFClassLoader getActiveClassLoader(UDFClassLoaderManager manager) throws Exception {
+ Field activeClassLoaderField =
+ UDFClassLoaderManager.class.getDeclaredField("activeClassLoader");
+ activeClassLoaderField.setAccessible(true);
+ AtomicReference activeClassLoader =
+ (AtomicReference) activeClassLoaderField.get(manager);
+ return activeClassLoader.get();
+ }
+
+ private long getActiveQueriesCount(UDFClassLoader classLoader) throws Exception {
+ Field activeQueriesCountField = UDFClassLoader.class.getDeclaredField("activeQueriesCount");
+ activeQueriesCountField.setAccessible(true);
+ return ((AtomicLong) activeQueriesCountField.get(classLoader)).get();
+ }
+}
From 57c443cb12b2ca5336e00ab53810064a280d1048 Mon Sep 17 00:00:00 2001
From: Caideyipi <87789683+Caideyipi@users.noreply.github.com>
Date: Tue, 9 Jun 2026 14:09:23 +0800
Subject: [PATCH 3/3] Fix driver scheduler reserved queue tests
---
.../queue/IndexedBlockingReserveQueue.java | 6 ++++
.../queue/IndexedBlockingQueueTest.java | 15 ++++++++++
.../schedule/DefaultDriverSchedulerTest.java | 29 +++++++++++++++++++
3 files changed, 50 insertions(+)
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java
index 685286270516f..16ce9034010d2 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java
@@ -83,4 +83,10 @@ public synchronized void decreaseReservedSize() {
Preconditions.checkState(reservedSize > 0, "No reserved space is available.");
this.reservedSize--;
}
+
+ @Override
+ public synchronized void clear() {
+ super.clear();
+ reservedSize = 0;
+ }
}
diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java
index 278e88a7be7eb..094c7a10f8d2d 100644
--- a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java
+++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java
@@ -74,6 +74,21 @@ public void testReservedSpaceCanOnlyBeReleasedOnce() throws InterruptedException
Assert.assertThrows(IllegalStateException.class, queue::decreaseReservedSize);
}
+ @Test
+ public void testClearReleasesReservedSpace() throws InterruptedException {
+ ReserveQueue queue = new ReserveQueue(1);
+ Element first = new Element(1);
+ Element second = new Element(2);
+
+ queue.push(first);
+ Assert.assertSame(first, queue.poll());
+ queue.clear();
+
+ queue.push(second);
+ Assert.assertEquals(1, queue.size());
+ Assert.assertSame(second, queue.poll());
+ }
+
private static class SimpleQueue extends IndexedBlockingQueue {
private final Queue elements = new ArrayDeque<>();
private final Map keyedElements = new HashMap<>();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
index 0cc0e5edd71d5..f5c5cda1db4e0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java
@@ -121,6 +121,7 @@ public void testBlockedToReady() {
fragmentRelatedTask.put(instanceId, taskSet);
manager.getQueryMap().put(queryId, fragmentRelatedTask);
manager.getTimeoutQueue().push(testTask);
+ reserveReadyQueueSlotForPolledTask(testTask);
defaultScheduler.blockedToReady(testTask);
Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus());
Assert.assertFalse(manager.getBlockedTasks().contains(testTask));
@@ -245,6 +246,7 @@ public void testRunningToReady() {
fragmentRelatedTask.put(instanceId, taskSet);
manager.getQueryMap().put(queryId, fragmentRelatedTask);
manager.getTimeoutQueue().push(testTask);
+ reserveReadyQueueSlotForPolledTask(testTask);
ExecutionContext context = new ExecutionContext();
context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
context.setCpuDuration(new CpuTimer.CpuDuration());
@@ -307,6 +309,7 @@ public void testRunningToBlocked() {
fragmentRelatedTask.put(instanceId, taskSet);
manager.getQueryMap().put(queryId, fragmentRelatedTask);
manager.getTimeoutQueue().push(testTask);
+ reserveReadyQueueSlotForPolledTask(testTask);
ExecutionContext context = new ExecutionContext();
context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
context.setCpuDuration(new CpuTimer.CpuDuration());
@@ -369,6 +372,7 @@ public void testRunningToFinished() {
fragmentRelatedTask.put(instanceId, taskSet);
manager.getQueryMap().put(queryId, fragmentRelatedTask);
manager.getTimeoutQueue().push(testTask);
+ reserveReadyQueueSlotForPolledTask(testTask);
ExecutionContext context = new ExecutionContext();
context.setTimeSlice(new Duration(1, TimeUnit.SECONDS));
context.setCpuDuration(new CpuTimer.CpuDuration());
@@ -465,6 +469,17 @@ public void testToAbort() {
fragmentRelatedTask.put(instanceId2, taskSet2);
manager.getQueryMap().put(queryId, fragmentRelatedTask);
manager.getTimeoutQueue().push(testTask1);
+ manager.getTimeoutQueue().push(testTask2);
+ reserveReadyQueueSlotForPolledTask(testTask2);
+ manager.getBlockedTasks().add(testTask2);
+ if (status == DriverTaskStatus.READY) {
+ manager.getReadyQueue().push(testTask1);
+ } else {
+ reserveReadyQueueSlotForPolledTask(testTask1);
+ if (status == DriverTaskStatus.BLOCKED) {
+ manager.getBlockedTasks().add(testTask1);
+ }
+ }
defaultScheduler.toAborted(testTask1);
Mockito.reset(mockMppServiceClient);
@@ -495,4 +510,18 @@ private void clear() {
manager.getReadyQueue().clear();
manager.getTimeoutQueue().clear();
}
+
+ private void reserveReadyQueueSlotForPolledTask(DriverTask task) {
+ DriverTaskStatus status = task.getStatus();
+ try {
+ task.setStatus(DriverTaskStatus.READY);
+ manager.getReadyQueue().push(task);
+ Assert.assertSame(task, manager.getReadyQueue().poll());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ Assert.fail("Interrupted while preparing ready queue reservation");
+ } finally {
+ task.setStatus(status);
+ }
+ }
}