From 8cb406742daf4122830f809e6e0945145cb24f12 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 9 Jun 2026 10:21:50 +0800 Subject: [PATCH 1/3] Fix queue invariant checks --- iotdb-core/calc-commons/pom.xml | 5 + .../schedule/queue/IndexedBlockingQueue.java | 4 + .../queue/IndexedBlockingReserveQueue.java | 4 + .../queue/IndexedBlockingQueueTest.java | 226 ++++++++++++++++++ .../db/utils/concurrent/FiniteSemaphore.java | 3 + .../FixedPriorityBlockingQueue.java | 3 + .../schedule/queue/L1PriorityQueueTest.java | 24 ++ .../schedule/queue/L2PriorityQueueTest.java | 24 ++ .../queue/MultilevelPriorityQueueTest.java | 36 +++ .../utils/concurrent/FiniteSemaphoreTest.java | 50 ++++ .../FixedPriorityBlockingQueueTest.java | 20 ++ 11 files changed, 399 insertions(+) create mode 100644 iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java diff --git a/iotdb-core/calc-commons/pom.xml b/iotdb-core/calc-commons/pom.xml index d854f29cce77e..fbe69d074c839 100644 --- a/iotdb-core/calc-commons/pom.xml +++ b/iotdb-core/calc-commons/pom.xml @@ -99,6 +99,11 @@ at.yawk.lz4 lz4-java + + junit + junit + test + diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueue.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueue.java index 1aec747685841..7bde713bbd2aa 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueue.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueue.java @@ -42,6 +42,8 @@ public abstract class IndexedBlockingQueue { public static final String TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG = "The system can't allow more queries."; + public static final String SAME_ID_ELEMENT_ALREADY_EXISTS_ERROR_MSG = + "The queue has already contained the same ID element."; protected final int capacity; protected final E queryHolder; @@ -57,6 +59,7 @@ public abstract class IndexedBlockingQueue { * @throws IllegalArgumentException if maxCapacity <= 0. */ protected IndexedBlockingQueue(int maxCapacity, E queryHolder) { + Preconditions.checkArgument(maxCapacity > 0, "maxCapacity must be greater than 0."); this.capacity = maxCapacity; this.queryHolder = queryHolder; } @@ -93,6 +96,7 @@ public synchronized void push(E element) { throw new NullPointerException(CalcMessages.PUSHED_ELEMENT_IS_NULL); } Preconditions.checkState(size < capacity, TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG); + Preconditions.checkState(!contains(element), SAME_ID_ELEMENT_ALREADY_EXISTS_ERROR_MSG); pushToQueue(element); size++; this.notifyAll(); diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java index ce18e18b28a04..685286270516f 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java @@ -57,6 +57,7 @@ public synchronized void push(E element) { throw new NullPointerException(CalcMessages.PUSHED_ELEMENT_IS_NULL); } Preconditions.checkState(size + reservedSize < capacity, TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG); + Preconditions.checkState(!contains(element), SAME_ID_ELEMENT_ALREADY_EXISTS_ERROR_MSG); pushToQueue(element); size++; this.notifyAll(); @@ -67,6 +68,8 @@ public synchronized void repush(E element) { if (element == null) { throw new NullPointerException(CalcMessages.PUSHED_ELEMENT_IS_NULL); } + Preconditions.checkState(reservedSize > 0, "No reserved space is available."); + Preconditions.checkState(!contains(element), SAME_ID_ELEMENT_ALREADY_EXISTS_ERROR_MSG); pushToQueue(element); reservedSize--; size++; @@ -77,6 +80,7 @@ public synchronized void repush(E element) { * For task that is not in readyQueue when it's cleared, it won't be added into the queue again. */ public synchronized void decreaseReservedSize() { + Preconditions.checkState(reservedSize > 0, "No reserved space is available."); this.reservedSize--; } } diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java new file mode 100644 index 0000000000000..278e88a7be7eb --- /dev/null +++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.calc.execution.schedule.queue; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayDeque; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +public class IndexedBlockingQueueTest { + + @Test + public void testRejectNonPositiveCapacity() { + Assert.assertThrows(IllegalArgumentException.class, () -> new SimpleQueue(0)); + Assert.assertThrows(IllegalArgumentException.class, () -> new ReserveQueue(-1)); + } + + @Test + public void testRejectDuplicateIdPush() throws InterruptedException { + SimpleQueue queue = new SimpleQueue(2); + Element first = new Element(1); + Element duplicate = new Element(1); + + queue.push(first); + Assert.assertThrows(IllegalStateException.class, () -> queue.push(duplicate)); + Assert.assertEquals(1, queue.size()); + Assert.assertSame(first, queue.poll()); + Assert.assertTrue(queue.isEmpty()); + } + + @Test + public void testRejectRepushWithoutReservedSpace() { + ReserveQueue queue = new ReserveQueue(1); + + Assert.assertThrows(IllegalStateException.class, () -> queue.repush(new Element(1))); + } + + @Test + public void testRejectDecreaseWithoutReservedSpace() { + ReserveQueue queue = new ReserveQueue(1); + + Assert.assertThrows(IllegalStateException.class, queue::decreaseReservedSize); + } + + @Test + public void testReservedSpaceCanOnlyBeReleasedOnce() throws InterruptedException { + ReserveQueue queue = new ReserveQueue(1); + Element element = new Element(1); + queue.push(element); + + Assert.assertSame(element, queue.poll()); + queue.decreaseReservedSize(); + + Assert.assertThrows(IllegalStateException.class, queue::decreaseReservedSize); + } + + private static class SimpleQueue extends IndexedBlockingQueue { + private final Queue elements = new ArrayDeque<>(); + private final Map keyedElements = new HashMap<>(); + + private SimpleQueue(int capacity) { + super(capacity, new Element(0)); + } + + @Override + protected Element remove(Element element) { + Element removed = keyedElements.remove(element.getDriverTaskId()); + if (removed != null) { + elements.remove(removed); + } + return removed; + } + + @Override + protected Element get(Element element) { + return keyedElements.get(element.getDriverTaskId()); + } + + @Override + public boolean isEmpty() { + return elements.isEmpty(); + } + + @Override + protected Element pollFirst() { + Element first = elements.remove(); + keyedElements.remove(first.getDriverTaskId()); + return first; + } + + @Override + protected void pushToQueue(Element element) { + elements.add(element); + keyedElements.put(element.getDriverTaskId(), element); + } + + @Override + protected boolean contains(Element element) { + return keyedElements.containsKey(element.getDriverTaskId()); + } + + @Override + protected void clearAllElements() { + elements.clear(); + keyedElements.clear(); + } + } + + private static class ReserveQueue extends IndexedBlockingReserveQueue { + private final Queue elements = new ArrayDeque<>(); + private final Map keyedElements = new HashMap<>(); + + private ReserveQueue(int capacity) { + super(capacity, new Element(0)); + } + + @Override + protected Element remove(Element element) { + Element removed = keyedElements.remove(element.getDriverTaskId()); + if (removed != null) { + elements.remove(removed); + } + return removed; + } + + @Override + protected Element get(Element element) { + return keyedElements.get(element.getDriverTaskId()); + } + + @Override + public boolean isEmpty() { + return elements.isEmpty(); + } + + @Override + protected Element pollFirst() { + Element first = elements.remove(); + keyedElements.remove(first.getDriverTaskId()); + return first; + } + + @Override + protected void pushToQueue(Element element) { + elements.add(element); + keyedElements.put(element.getDriverTaskId(), element); + } + + @Override + protected boolean contains(Element element) { + return keyedElements.containsKey(element.getDriverTaskId()); + } + + @Override + protected void clearAllElements() { + elements.clear(); + keyedElements.clear(); + } + } + + private static class Element implements IDIndexedAccessible { + private ElementId id; + + private Element(int id) { + this.id = new ElementId(id); + } + + @Override + public ID getDriverTaskId() { + return id; + } + + @Override + public void setId(ID id) { + this.id = (ElementId) id; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof Element && ((Element) obj).id.equals(id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + } + + private static class ElementId implements ID { + private final int id; + + private ElementId(int id) { + this.id = id; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof ElementId && ((ElementId) obj).id == id; + } + + @Override + public int hashCode() { + return Integer.hashCode(id); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java index 4f0cf9eb85fd9..c09a94016b9cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java @@ -32,6 +32,9 @@ public class FiniteSemaphore { private int permit; public FiniteSemaphore(int capacity, int permit) { + if (capacity < 0 || permit < 0) { + throw new IllegalArgumentException("Capacity and initial permits should be non-negative."); + } if (capacity < permit) { throw new IllegalArgumentException(DataNodeMiscMessages.CAPACITY_LARGER_THAN_INITIAL_PERMITS); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java index 382650b2b35b6..7dbf0ec13b036 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java @@ -43,6 +43,9 @@ public class FixedPriorityBlockingQueue { protected List pollLastHookList = new CopyOnWriteArrayList<>(); public FixedPriorityBlockingQueue(int maxSize, Comparator comparator) { + if (maxSize <= 0) { + throw new IllegalArgumentException("maxSize must be greater than 0."); + } this.maxSize = maxSize; this.comparator = comparator; this.queue = MinMaxPriorityQueue.orderedBy(comparator).maximumSize(maxSize + 1).create(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/L1PriorityQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/L1PriorityQueueTest.java index 4038bd51c2809..e6d60e982fa8f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/L1PriorityQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/L1PriorityQueueTest.java @@ -86,6 +86,30 @@ public void testPushExceedCapacity() { } } + @Test + public void testPushDuplicateId() throws InterruptedException { + IndexedBlockingQueue queue = + new L1PriorityQueue<>( + 10, + (o1, o2) -> { + if (o1.equals(o2)) { + return 0; + } + return Integer.compare(o1.getValue(), o2.getValue()); + }, + new QueueElement(new QueueElement.QueueElementID(0), 0)); + QueueElement.QueueElementID id = new QueueElement.QueueElementID(1); + QueueElement e1 = new QueueElement(id, 1); + QueueElement e2 = new QueueElement(id, 2); + queue.push(e1); + + Assert.assertThrows(IllegalStateException.class, () -> queue.push(e2)); + Assert.assertEquals(1, queue.size()); + Assert.assertEquals(e1, queue.poll()); + Assert.assertEquals(0, queue.size()); + Assert.assertTrue(queue.isEmpty()); + } + @Test public void testPushAndPoll() throws InterruptedException { IndexedBlockingQueue queue = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/L2PriorityQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/L2PriorityQueueTest.java index 537358d9a49e8..3b79b9b5209c9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/L2PriorityQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/L2PriorityQueueTest.java @@ -91,6 +91,30 @@ public void testPushExceedCapacity() { } } + @Test + public void testPushDuplicateId() throws InterruptedException { + IndexedBlockingQueue queue = + new L2PriorityQueue<>( + 10, + (o1, o2) -> { + if (o1.equals(o2)) { + return 0; + } + return Integer.compare(o1.getValue(), o2.getValue()); + }, + new QueueElement(new QueueElement.QueueElementID(0), 0)); + QueueElement.QueueElementID id = new QueueElement.QueueElementID(1); + QueueElement e1 = new QueueElement(id, 1); + QueueElement e2 = new QueueElement(id, 2); + queue.push(e1); + + Assert.assertThrows(IllegalStateException.class, () -> queue.push(e2)); + Assert.assertEquals(1, queue.size()); + Assert.assertEquals(e1, queue.poll()); + Assert.assertEquals(0, queue.size()); + Assert.assertTrue(queue.isEmpty()); + } + @Test public void testPushAndPoll() throws InterruptedException { IndexedBlockingQueue queue = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java index 0ee68774b126e..81a403af67f52 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/MultilevelPriorityQueueTest.java @@ -84,6 +84,42 @@ public void testPushExceedCapacity() { }); } + @Test + public void testPushDuplicateId() throws InterruptedException { + IndexedBlockingQueue queue = new MultilevelPriorityQueue(2, 1000, new DriverTask()); + DriverTask e1 = mockDriverTask(mockDriverTaskId(), false); + DriverTask e2 = mockDriverTask(mockDriverTaskId(), false); + queue.push(e1); + + Assert.assertThrows(IllegalStateException.class, () -> queue.push(e2)); + Assert.assertEquals(1, queue.size()); + Assert.assertEquals(e1.getDriverTaskId().toString(), queue.poll().getDriverTaskId().toString()); + Assert.assertEquals(0, queue.size()); + Assert.assertTrue(queue.isEmpty()); + } + + @Test + public void testRepushDuplicateId() throws InterruptedException { + MultilevelPriorityQueue queue = new MultilevelPriorityQueue(2, 1000, new DriverTask()); + DriverTask e1 = mockDriverTask(mockDriverTaskId(), false); + DriverTask e2 = + mockDriverTask( + new DriverTaskId( + new FragmentInstanceId(new PlanFragmentId(new QueryId("test"), 0), "inst-1"), 0), + false); + queue.push(e1); + queue.push(e2); + + DriverTask polledTask = queue.poll(); + DriverTask queuedTask = polledTask.equals(e1) ? e2 : e1; + DriverTask duplicateQueuedTask = mockDriverTask(queuedTask.getDriverTaskId(), false); + + Assert.assertThrows(IllegalStateException.class, () -> queue.repush(duplicateQueuedTask)); + Assert.assertEquals(1, queue.size()); + queue.repush(polledTask); + Assert.assertEquals(2, queue.size()); + } + @Test public void testPushAndPoll() { try { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java new file mode 100644 index 0000000000000..24ce666dbe763 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.utils.concurrent; + +import org.junit.Assert; +import org.junit.Test; + +public class FiniteSemaphoreTest { + + @Test + public void testRejectNegativeCapacity() { + assertIllegalArgument(() -> new FiniteSemaphore(-1, -2)); + } + + @Test + public void testRejectNegativeInitialPermits() { + assertIllegalArgument(() -> new FiniteSemaphore(1, -1)); + } + + @Test + public void testRejectInitialPermitsLargerThanCapacity() { + assertIllegalArgument(() -> new FiniteSemaphore(1, 2)); + } + + private void assertIllegalArgument(Runnable runnable) { + try { + runnable.run(); + Assert.fail(); + } catch (IllegalArgumentException ignored) { + // expected exception + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueueTest.java index f7fd723fc0138..dfd3d696bed8a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueueTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueueTest.java @@ -43,6 +43,26 @@ public void tearDown() { queue.clear(); } + @Test + public void testRejectZeroMaxSize() { + try { + new FixedPriorityBlockingQueue<>(0, Integer::compare); + Assert.fail(); + } catch (IllegalArgumentException ignored) { + // expected exception + } + } + + @Test + public void testRejectNegativeMaxSize() { + try { + new FixedPriorityBlockingQueue<>(-1, Integer::compare); + Assert.fail(); + } catch (IllegalArgumentException ignored) { + // expected exception + } + } + @Test public void testBlockingTake() throws InterruptedException { final AtomicBoolean hasTaken = new AtomicBoolean(false); From 07026d7466fa85d10c6c548a9be4cf9136401e49 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 9 Jun 2026 12:16:13 +0800 Subject: [PATCH 2/3] Fix queue capacity invariants --- .../runtime/PipeConfigRegionListener.java | 3 + .../task/PipeTaskCoordinatorLock.java | 101 ++++++++++------ .../runtime/PipeConfigRegionListenerTest.java | 55 +++++++++ .../task/PipeTaskCoordinatorLockTest.java | 108 ++++++++++++++++++ .../table/DataNodeTableCache.java | 10 +- .../db/utils/concurrent/FiniteSemaphore.java | 22 ++-- .../table/DataNodeTableCacheTest.java | 61 ++++++++++ .../utils/concurrent/FiniteSemaphoreTest.java | 63 ++++++++++ .../plugin/service/PipePluginClassLoader.java | 2 +- .../service/PipePluginClassLoaderManager.java | 18 ++- .../commons/udf/service/UDFClassLoader.java | 2 +- .../udf/service/UDFClassLoaderManager.java | 32 ++++-- .../PipePluginClassLoaderManagerTest.java | 98 ++++++++++++++++ .../service/UDFClassLoaderManagerTest.java | 93 +++++++++++++++ 14 files changed, 604 insertions(+), 64 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListenerTest.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManagerTest.java create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManagerTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java index 278e133494b50..1c0292537d56c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java @@ -50,6 +50,9 @@ public synchronized void increaseReference(final PipeParameters parameters) public synchronized void decreaseReference(final PipeParameters parameters) throws IllegalPathException { if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) { + if (listeningQueueReferenceCount == 0) { + return; + } listeningQueueReferenceCount--; if (listeningQueueReferenceCount == 0) { listeningQueue.close(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index 0f788435394f2..b102bcbb07c56 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -24,74 +24,105 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * {@link PipeTaskCoordinatorLock} is a cross-thread lock for pipe task coordinator. It is used to * ensure that only one thread can execute the pipe task coordinator at the same time. * - *

Uses {@link Semaphore} instead of {@link java.util.concurrent.locks.ReentrantLock} to support - * cross-thread acquire/release, which is required by the procedure recovery mechanism: locks may be - * acquired on the StateMachineUpdater thread during {@code restoreLock()} and released on a - * ProcedureCoreWorker thread after execution. + *

Supports cross-thread acquire/release, which is required by the procedure recovery mechanism: + * locks may be acquired on the StateMachineUpdater thread during {@code restoreLock()} and released + * on a ProcedureCoreWorker thread after execution. */ public class PipeTaskCoordinatorLock { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskCoordinatorLock.class); - private final Semaphore semaphore = new Semaphore(1); + private boolean locked = false; public void lock() { LOGGER.debug( ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD, Thread.currentThread().getName()); - try { - semaphore.acquire(); - LOGGER.debug( - ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD, - Thread.currentThread().getName()); - } catch (final InterruptedException e) { + + boolean interrupted = false; + synchronized (this) { + while (locked) { + try { + wait(); + } catch (final InterruptedException e) { + interrupted = true; + LOGGER.error( + ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD, + Thread.currentThread().getName()); + } + } + locked = true; + } + + if (interrupted) { Thread.currentThread().interrupt(); - LOGGER.error( - ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD, - Thread.currentThread().getName()); } + LOGGER.debug( + ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD, + Thread.currentThread().getName()); } public boolean tryLock() { - try { - LOGGER.debug( - ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD, - Thread.currentThread().getName()); - if (semaphore.tryAcquire(10, TimeUnit.SECONDS)) { - LOGGER.debug( - ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD, - Thread.currentThread().getName()); - return true; - } else { - LOGGER.info( - ManagerMessages.PIPETASKCOORDINATOR_LOCK_FAILED_TO_ACQUIRE_BY_THREAD_BECAUSE_OF_TIMEOUT, - Thread.currentThread().getName()); - return false; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + LOGGER.debug( + ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD, + Thread.currentThread().getName()); + if (Thread.currentThread().isInterrupted()) { LOGGER.error( ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD, Thread.currentThread().getName()); return false; } + + final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + synchronized (this) { + while (locked) { + final long remainingNanos = deadlineNanos - System.nanoTime(); + if (remainingNanos <= 0) { + LOGGER.info( + ManagerMessages + .PIPETASKCOORDINATOR_LOCK_FAILED_TO_ACQUIRE_BY_THREAD_BECAUSE_OF_TIMEOUT, + Thread.currentThread().getName()); + return false; + } + try { + TimeUnit.NANOSECONDS.timedWait(this, remainingNanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error( + ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD, + Thread.currentThread().getName()); + return false; + } + } + locked = true; + } + + LOGGER.debug( + ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD, + Thread.currentThread().getName()); + return true; } public void unlock() { - semaphore.release(); + synchronized (this) { + if (!locked) { + return; + } + locked = false; + notifyAll(); + } LOGGER.debug( ManagerMessages.PIPETASKCOORDINATOR_LOCK_RELEASED_BY_THREAD, Thread.currentThread().getName()); } - public boolean isLocked() { - return semaphore.availablePermits() == 0; + public synchronized boolean isLocked() { + return locked; } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListenerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListenerTest.java new file mode 100644 index 0000000000000..adaf4d8a73b59 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListenerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.agent.runtime; + +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +public class PipeConfigRegionListenerTest { + + @Test + public void testRepeatedDecreaseDoesNotBreakFutureIncrease() throws Exception { + PipeConfigRegionListener listener = new PipeConfigRegionListener(); + PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put(PipeSourceConstant.EXTRACTOR_INCLUSION_KEY, "schema.database.create"); + } + }); + + listener.increaseReference(parameters); + Assert.assertTrue(listener.listener().isOpened()); + + listener.decreaseReference(parameters); + Assert.assertFalse(listener.listener().isOpened()); + + listener.decreaseReference(parameters); + Assert.assertFalse(listener.listener().isOpened()); + + listener.increaseReference(parameters); + Assert.assertTrue(listener.listener().isOpened()); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java new file mode 100644 index 0000000000000..f10d929663374 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.coordinator.task; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class PipeTaskCoordinatorLockTest { + + @Test + public void testRepeatedUnlockDoesNotIncreaseCapacity() throws Exception { + PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock(); + + lock.lock(); + lock.unlock(); + lock.unlock(); + Assert.assertTrue(lock.tryLock()); + + AtomicBoolean acquired = new AtomicBoolean(false); + Thread waiter = + new Thread( + () -> { + lock.lock(); + acquired.set(true); + }); + waiter.start(); + waitUntilState(waiter, Thread.State.WAITING); + Assert.assertFalse(acquired.get()); + + lock.unlock(); + waiter.join(TimeUnit.SECONDS.toMillis(5)); + Assert.assertFalse(waiter.isAlive()); + Assert.assertTrue(acquired.get()); + lock.unlock(); + } + + @Test + public void testInterruptedLockDoesNotReturnBeforeAcquired() throws Exception { + PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock(); + AtomicBoolean acquired = new AtomicBoolean(false); + AtomicBoolean interruptedAfterLock = new AtomicBoolean(false); + + lock.lock(); + Thread waiter = + new Thread( + () -> { + Thread.currentThread().interrupt(); + lock.lock(); + acquired.set(true); + interruptedAfterLock.set(Thread.currentThread().isInterrupted()); + }); + waiter.start(); + waitUntilState(waiter, Thread.State.WAITING); + Assert.assertFalse(acquired.get()); + + lock.unlock(); + waiter.join(TimeUnit.SECONDS.toMillis(5)); + Assert.assertFalse(waiter.isAlive()); + Assert.assertTrue(acquired.get()); + Assert.assertTrue(interruptedAfterLock.get()); + lock.unlock(); + } + + @Test + public void testInterruptedTryLockDoesNotAcquire() { + PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock(); + + Thread.currentThread().interrupt(); + try { + Assert.assertFalse(lock.tryLock()); + Assert.assertFalse(lock.isLocked()); + Assert.assertTrue(Thread.currentThread().isInterrupted()); + } finally { + Thread.interrupted(); + } + } + + private void waitUntilState(Thread thread, Thread.State expectedState) throws Exception { + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < deadline) { + if (thread.getState() == expectedState) { + return; + } + Thread.sleep(10); + } + Assert.assertEquals(expectedState, thread.getState()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java index f545a9bda237d..d8d8f4fe19b95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java @@ -374,8 +374,10 @@ private Map> mayGetTableInPreUpdateMap( private Map> getTablesInConfigNode( final Map> tableInput) { Map> result = Collections.emptyMap(); + boolean acquired = false; try { fetchTableSemaphore.acquire(); + acquired = true; final TFetchTableResp resp = ClusterConfigTaskExecutor.getInstance() .fetchTables( @@ -388,11 +390,11 @@ private Map> getTablesInConfigNode( } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.warn(DataNodeSchemaMessages.INTERRUPTED_ACQUIRE_SEMAPHORE_GET_TABLES); - } catch (final Exception e) { - fetchTableSemaphore.release(); - throw e; + } finally { + if (acquired) { + fetchTableSemaphore.release(); + } } - fetchTableSemaphore.release(); return result; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java index c09a94016b9cd..b0d70528c0d30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java @@ -20,15 +20,12 @@ import org.apache.iotdb.db.i18n.DataNodeMiscMessages; -import java.util.concurrent.Semaphore; - /** * FiniteSemaphore defines a special Semaphore that the upper limit of permit is capacity. If * permits exceed the capacity, the release request will be ignored. */ public class FiniteSemaphore { private final int capacity; - private final Semaphore semaphore; private int permit; public FiniteSemaphore(int capacity, int permit) { @@ -39,23 +36,20 @@ public FiniteSemaphore(int capacity, int permit) { throw new IllegalArgumentException(DataNodeMiscMessages.CAPACITY_LARGER_THAN_INITIAL_PERMITS); } this.capacity = capacity; - this.semaphore = new Semaphore(permit); this.permit = permit; } - public void release() { - synchronized (this) { - if (permit < capacity) { - permit++; - semaphore.release(); - } + public synchronized void release() { + if (permit < capacity) { + permit++; + notifyAll(); } } - public void acquire() throws InterruptedException { - semaphore.acquire(); - synchronized (this) { - permit--; + public synchronized void acquire() throws InterruptedException { + while (permit == 0) { + wait(); } + permit--; } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java new file mode 100644 index 0000000000000..e12a06ff12631 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.schemaengine.table; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Semaphore; + +public class DataNodeTableCacheTest { + + @Test + public void testInterruptedFetchDoesNotReleaseUnacquiredPermit() throws Exception { + DataNodeTableCache cache = DataNodeTableCache.getInstance(); + Semaphore semaphore = getFetchTableSemaphore(cache); + int permitsBeforeFetch = semaphore.availablePermits(); + + Thread.currentThread().interrupt(); + try { + getTablesInConfigNode(cache); + + Assert.assertEquals(permitsBeforeFetch, semaphore.availablePermits()); + Assert.assertTrue(Thread.currentThread().isInterrupted()); + } finally { + Thread.interrupted(); + } + } + + private Semaphore getFetchTableSemaphore(DataNodeTableCache cache) throws Exception { + Field semaphoreField = DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore"); + semaphoreField.setAccessible(true); + return (Semaphore) semaphoreField.get(cache); + } + + private void getTablesInConfigNode(DataNodeTableCache cache) throws Exception { + Method method = DataNodeTableCache.class.getDeclaredMethod("getTablesInConfigNode", Map.class); + method.setAccessible(true); + method.invoke(cache, Collections.emptyMap()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java index 24ce666dbe763..93fdd06a290f2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphoreTest.java @@ -22,6 +22,10 @@ import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + public class FiniteSemaphoreTest { @Test @@ -39,6 +43,65 @@ public void testRejectInitialPermitsLargerThanCapacity() { assertIllegalArgument(() -> new FiniteSemaphore(1, 2)); } + @Test + public void testReleaseDoesNotExceedCapacity() throws InterruptedException { + FiniteSemaphore semaphore = new FiniteSemaphore(1, 0); + + semaphore.release(); + semaphore.release(); + + semaphore.acquire(); + Thread waiter = + new Thread( + () -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + waiter.start(); + await() + .atMost(1, TimeUnit.MINUTES) + .untilAsserted(() -> Assert.assertEquals(Thread.State.WAITING, waiter.getState())); + + waiter.interrupt(); + waiter.join(); + } + + @Test + public void testAcquireWaitsUntilRelease() throws InterruptedException { + FiniteSemaphore semaphore = new FiniteSemaphore(1, 0); + Thread waiter = + new Thread( + () -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + waiter.start(); + await() + .atMost(1, TimeUnit.MINUTES) + .untilAsserted(() -> Assert.assertEquals(Thread.State.WAITING, waiter.getState())); + + semaphore.release(); + await() + .atMost(1, TimeUnit.MINUTES) + .untilAsserted(() -> Assert.assertEquals(Thread.State.TERMINATED, waiter.getState())); + waiter.join(); + } + + @Test + public void testPermitCanBeReused() throws InterruptedException { + FiniteSemaphore semaphore = new FiniteSemaphore(1, 1); + + semaphore.acquire(); + semaphore.release(); + semaphore.acquire(); + } + private void assertIllegalArgument(Runnable runnable) { try { runnable.run(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoader.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoader.java index 745d93566fc40..499778b3b5b15 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoader.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoader.java @@ -81,7 +81,7 @@ public synchronized void acquire() { } public synchronized void release() throws IOException { - activeInstanceCount.decrementAndGet(); + activeInstanceCount.updateAndGet(count -> count > 0 ? count - 1 : 0); closeIfPossible(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManager.java index a6bda955311c9..9837319a58cb5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManager.java @@ -25,6 +25,9 @@ import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.pipe.api.PipePlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; @@ -34,6 +37,8 @@ @NotThreadSafe public class PipePluginClassLoaderManager implements IService { + private static final Logger LOGGER = LoggerFactory.getLogger(PipePluginClassLoaderManager.class); + private final String libRoot; /** @@ -64,7 +69,18 @@ public PipePluginClassLoader getPluginClassLoader(String pluginName) { } public void addPluginAndClassLoader(String pluginName, PipePluginClassLoader classLoader) { - pipePluginNameToClassLoaderMap.put(pluginName.toUpperCase(), classLoader); + PipePluginClassLoader oldClassLoader = + pipePluginNameToClassLoaderMap.put(pluginName.toUpperCase(), classLoader); + if (oldClassLoader != null && oldClassLoader != classLoader) { + try { + oldClassLoader.markAsDeprecated(); + } catch (IOException e) { + LOGGER.warn( + "Failed to close old PipePluginClassLoader when replacing plugin {}, because {}", + pluginName, + e.toString()); + } + } } public PipePluginClassLoader createPipePluginClassLoader(String pluginDirPath) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoader.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoader.java index df5dd977b5dd5..f47f74d44699e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoader.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoader.java @@ -70,7 +70,7 @@ public void acquire() { } public void release() throws IOException { - activeQueriesCount.decrementAndGet(); + activeQueriesCount.updateAndGet(count -> count > 0 ? count - 1 : 0); closeIfPossible(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java index 138e7e284b90c..7b3280c89cbb0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManager.java @@ -43,6 +43,8 @@ public class UDFClassLoaderManager implements IService { /** The keys in the map are the query IDs of the UDF queries being executed. */ private final Map queryIdToUDFClassLoaderMap; + private final Object activeClassLoaderLock = new Object(); + /** * activeClassLoader is used to load all classes under libRoot. libRoot may be updated before the * user executes CREATE FUNCTION or after the user executes DROP FUNCTION. Therefore, we need to @@ -64,12 +66,24 @@ private UDFClassLoaderManager() throws IOException { } public void initializeUDFQuery(String queryId) { - activeClassLoader.get().acquire(); - queryIdToUDFClassLoaderMap.put(queryId, activeClassLoader.get()); + final UDFClassLoader previousClassLoader; + synchronized (activeClassLoaderLock) { + final UDFClassLoader currentClassLoader = activeClassLoader.get(); + currentClassLoader.acquire(); + previousClassLoader = queryIdToUDFClassLoaderMap.put(queryId, currentClassLoader); + } + releaseClassLoader(queryId, previousClassLoader); } public void finalizeUDFQuery(String queryId) { - UDFClassLoader classLoader = queryIdToUDFClassLoaderMap.remove(queryId); + final UDFClassLoader classLoader; + synchronized (activeClassLoaderLock) { + classLoader = queryIdToUDFClassLoaderMap.remove(queryId); + } + releaseClassLoader(queryId, classLoader); + } + + private void releaseClassLoader(String queryId, UDFClassLoader classLoader) { try { if (classLoader != null) { classLoader.release(); @@ -81,12 +95,14 @@ public void finalizeUDFQuery(String queryId) { } public UDFClassLoader updateAndGetActiveClassLoader() throws IOException { - UDFClassLoader deprecatedClassLoader = activeClassLoader.get(); - activeClassLoader.set(new UDFClassLoader(libRoot)); - if (deprecatedClassLoader != null) { - deprecatedClassLoader.markAsDeprecated(); + synchronized (activeClassLoaderLock) { + UDFClassLoader deprecatedClassLoader = activeClassLoader.get(); + activeClassLoader.set(new UDFClassLoader(libRoot)); + if (deprecatedClassLoader != null) { + deprecatedClassLoader.markAsDeprecated(); + } + return activeClassLoader.get(); } - return activeClassLoader.get(); } ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManagerTest.java new file mode 100644 index 0000000000000..5c8776e7c7065 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/plugin/service/PipePluginClassLoaderManagerTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.agent.plugin.service; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicLong; + +public class PipePluginClassLoaderManagerTest { + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testDuplicatePluginClassLoaderRegistrationClosesPreviousClassLoader() + throws Exception { + PipePluginClassLoaderManager manager = newManager(); + TestingPipePluginClassLoader oldClassLoader = newClassLoader(); + TestingPipePluginClassLoader newClassLoader = newClassLoader(); + + manager.addPluginAndClassLoader("plugin", oldClassLoader); + manager.addPluginAndClassLoader("plugin", newClassLoader); + + Assert.assertTrue(oldClassLoader.isClosed()); + Assert.assertSame(newClassLoader, manager.getPluginClassLoader("plugin")); + } + + @Test + public void testClassLoaderReleaseDoesNotUnderflow() throws Exception { + PipePluginClassLoader classLoader = newClassLoader(); + + classLoader.release(); + Assert.assertEquals(0, getActiveInstanceCount(classLoader)); + + classLoader.acquire(); + classLoader.release(); + classLoader.release(); + Assert.assertEquals(0, getActiveInstanceCount(classLoader)); + } + + private PipePluginClassLoaderManager newManager() throws Exception { + Constructor constructor = + PipePluginClassLoaderManager.class.getDeclaredConstructor(String.class); + constructor.setAccessible(true); + return constructor.newInstance(temporaryFolder.newFolder("pipe-lib").getAbsolutePath()); + } + + private TestingPipePluginClassLoader newClassLoader() throws Exception { + return new TestingPipePluginClassLoader(temporaryFolder.newFolder().getAbsolutePath()); + } + + private long getActiveInstanceCount(PipePluginClassLoader classLoader) throws Exception { + Field activeInstanceCountField = + PipePluginClassLoader.class.getDeclaredField("activeInstanceCount"); + activeInstanceCountField.setAccessible(true); + return ((AtomicLong) activeInstanceCountField.get(classLoader)).get(); + } + + private static class TestingPipePluginClassLoader extends PipePluginClassLoader { + + private boolean closed; + + private TestingPipePluginClassLoader(String libRoot) throws Exception { + super(libRoot); + } + + @Override + public void close() throws java.io.IOException { + closed = true; + super.close(); + } + + private boolean isClosed() { + return closed; + } + } +} diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManagerTest.java new file mode 100644 index 0000000000000..5a358dca97f7e --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/service/UDFClassLoaderManagerTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.service; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +public class UDFClassLoaderManagerTest { + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testDuplicateQueryInitializationReleasesPreviousClassLoader() throws Exception { + UDFClassLoaderManager manager = newManager(); + + manager.initializeUDFQuery("query"); + UDFClassLoader deprecatedClassLoader = getActiveClassLoader(manager); + + manager.updateAndGetActiveClassLoader(); + UDFClassLoader currentClassLoader = getActiveClassLoader(manager); + manager.initializeUDFQuery("query"); + + Assert.assertEquals(0, getActiveQueriesCount(deprecatedClassLoader)); + Assert.assertEquals(1, getActiveQueriesCount(currentClassLoader)); + + manager.finalizeUDFQuery("query"); + + Assert.assertEquals(0, getActiveQueriesCount(currentClassLoader)); + } + + @Test + public void testClassLoaderReleaseDoesNotUnderflow() throws Exception { + UDFClassLoader classLoader = + new UDFClassLoader(temporaryFolder.newFolder("udf-lib").getAbsolutePath()); + + classLoader.release(); + Assert.assertEquals(0, getActiveQueriesCount(classLoader)); + + classLoader.acquire(); + classLoader.release(); + classLoader.release(); + Assert.assertEquals(0, getActiveQueriesCount(classLoader)); + } + + private UDFClassLoaderManager newManager() throws Exception { + Constructor constructor = + UDFClassLoaderManager.class.getDeclaredConstructor(String.class); + constructor.setAccessible(true); + UDFClassLoaderManager manager = + constructor.newInstance(temporaryFolder.newFolder("udf-lib").getAbsolutePath()); + manager.start(); + return manager; + } + + private UDFClassLoader getActiveClassLoader(UDFClassLoaderManager manager) throws Exception { + Field activeClassLoaderField = + UDFClassLoaderManager.class.getDeclaredField("activeClassLoader"); + activeClassLoaderField.setAccessible(true); + AtomicReference activeClassLoader = + (AtomicReference) activeClassLoaderField.get(manager); + return activeClassLoader.get(); + } + + private long getActiveQueriesCount(UDFClassLoader classLoader) throws Exception { + Field activeQueriesCountField = UDFClassLoader.class.getDeclaredField("activeQueriesCount"); + activeQueriesCountField.setAccessible(true); + return ((AtomicLong) activeQueriesCountField.get(classLoader)).get(); + } +} From 57c443cb12b2ca5336e00ab53810064a280d1048 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 9 Jun 2026 14:09:23 +0800 Subject: [PATCH 3/3] Fix driver scheduler reserved queue tests --- .../queue/IndexedBlockingReserveQueue.java | 6 ++++ .../queue/IndexedBlockingQueueTest.java | 15 ++++++++++ .../schedule/DefaultDriverSchedulerTest.java | 29 +++++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java index 685286270516f..16ce9034010d2 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingReserveQueue.java @@ -83,4 +83,10 @@ public synchronized void decreaseReservedSize() { Preconditions.checkState(reservedSize > 0, "No reserved space is available."); this.reservedSize--; } + + @Override + public synchronized void clear() { + super.clear(); + reservedSize = 0; + } } diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java index 278e88a7be7eb..094c7a10f8d2d 100644 --- a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java +++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/schedule/queue/IndexedBlockingQueueTest.java @@ -74,6 +74,21 @@ public void testReservedSpaceCanOnlyBeReleasedOnce() throws InterruptedException Assert.assertThrows(IllegalStateException.class, queue::decreaseReservedSize); } + @Test + public void testClearReleasesReservedSpace() throws InterruptedException { + ReserveQueue queue = new ReserveQueue(1); + Element first = new Element(1); + Element second = new Element(2); + + queue.push(first); + Assert.assertSame(first, queue.poll()); + queue.clear(); + + queue.push(second); + Assert.assertEquals(1, queue.size()); + Assert.assertSame(second, queue.poll()); + } + private static class SimpleQueue extends IndexedBlockingQueue { private final Queue elements = new ArrayDeque<>(); private final Map keyedElements = new HashMap<>(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java index 0cc0e5edd71d5..f5c5cda1db4e0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DefaultDriverSchedulerTest.java @@ -121,6 +121,7 @@ public void testBlockedToReady() { fragmentRelatedTask.put(instanceId, taskSet); manager.getQueryMap().put(queryId, fragmentRelatedTask); manager.getTimeoutQueue().push(testTask); + reserveReadyQueueSlotForPolledTask(testTask); defaultScheduler.blockedToReady(testTask); Assert.assertEquals(DriverTaskStatus.READY, testTask.getStatus()); Assert.assertFalse(manager.getBlockedTasks().contains(testTask)); @@ -245,6 +246,7 @@ public void testRunningToReady() { fragmentRelatedTask.put(instanceId, taskSet); manager.getQueryMap().put(queryId, fragmentRelatedTask); manager.getTimeoutQueue().push(testTask); + reserveReadyQueueSlotForPolledTask(testTask); ExecutionContext context = new ExecutionContext(); context.setTimeSlice(new Duration(1, TimeUnit.SECONDS)); context.setCpuDuration(new CpuTimer.CpuDuration()); @@ -307,6 +309,7 @@ public void testRunningToBlocked() { fragmentRelatedTask.put(instanceId, taskSet); manager.getQueryMap().put(queryId, fragmentRelatedTask); manager.getTimeoutQueue().push(testTask); + reserveReadyQueueSlotForPolledTask(testTask); ExecutionContext context = new ExecutionContext(); context.setTimeSlice(new Duration(1, TimeUnit.SECONDS)); context.setCpuDuration(new CpuTimer.CpuDuration()); @@ -369,6 +372,7 @@ public void testRunningToFinished() { fragmentRelatedTask.put(instanceId, taskSet); manager.getQueryMap().put(queryId, fragmentRelatedTask); manager.getTimeoutQueue().push(testTask); + reserveReadyQueueSlotForPolledTask(testTask); ExecutionContext context = new ExecutionContext(); context.setTimeSlice(new Duration(1, TimeUnit.SECONDS)); context.setCpuDuration(new CpuTimer.CpuDuration()); @@ -465,6 +469,17 @@ public void testToAbort() { fragmentRelatedTask.put(instanceId2, taskSet2); manager.getQueryMap().put(queryId, fragmentRelatedTask); manager.getTimeoutQueue().push(testTask1); + manager.getTimeoutQueue().push(testTask2); + reserveReadyQueueSlotForPolledTask(testTask2); + manager.getBlockedTasks().add(testTask2); + if (status == DriverTaskStatus.READY) { + manager.getReadyQueue().push(testTask1); + } else { + reserveReadyQueueSlotForPolledTask(testTask1); + if (status == DriverTaskStatus.BLOCKED) { + manager.getBlockedTasks().add(testTask1); + } + } defaultScheduler.toAborted(testTask1); Mockito.reset(mockMppServiceClient); @@ -495,4 +510,18 @@ private void clear() { manager.getReadyQueue().clear(); manager.getTimeoutQueue().clear(); } + + private void reserveReadyQueueSlotForPolledTask(DriverTask task) { + DriverTaskStatus status = task.getStatus(); + try { + task.setStatus(DriverTaskStatus.READY); + manager.getReadyQueue().push(task); + Assert.assertSame(task, manager.getReadyQueue().poll()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + Assert.fail("Interrupted while preparing ready queue reservation"); + } finally { + task.setStatus(status); + } + } }