diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index c7fc22b29f..fc6d47f885 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -410,9 +410,14 @@ void setReleaser(ByteBufferReleaser releaser) { @Override public void close() { - for (ColumnChunkPageReader reader : readers.values()) { - reader.releaseBuffers(); + try { + for (ColumnChunkPageReader reader : readers.values()) { + reader.releaseBuffers(); + } + } finally { + if (releaser != null) { + releaser.close(); + } } - releaser.close(); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java new file mode 100644 index 0000000000..00d205e136 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.column.Encoding.PLAIN; +import static org.apache.parquet.column.Encoding.RLE; +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.Collections; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferReleaser; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.junit.Test; + +public class TestColumnChunkPageReadStore { + + private static final ColumnDescriptor COLUMN = new ColumnDescriptor( + new String[] {"x"}, new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "x"), 0, 0); + + private static final BytesInputDecompressor NOOP_DECOMPRESSOR = new BytesInputDecompressor() { + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) { + return bytes; + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) {} + + @Override + public void release() {} + }; + + @Test + public void closeWithoutSetReleaserDoesNotThrow() { + try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + ParquetReadOptions options = + ParquetReadOptions.builder().withAllocator(allocator).build(); + + ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(0L); + store.addColumn(COLUMN, newReaderWithoutPages(options)); + + // setReleaser() is intentionally NOT called here. + store.close(); + } + } + + @Test + public void closeReleasesReleaserEvenWhenReaderThrows() throws Exception { + RuntimeException releaseFailure = new RuntimeException("release boom"); + + ByteBufferAllocator throwingAllocator = new ByteBufferAllocator() { + @Override + public ByteBuffer allocate(int size) { + return ByteBuffer.allocateDirect(size); + } + + @Override + public void release(ByteBuffer b) { + throw releaseFailure; + } + + @Override + public boolean isDirect() { + return true; + } + }; + + try (TrackingByteBufferAllocator storeAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(1L); + store.addColumn(COLUMN, newReaderWithQueuedBuffer(throwingAllocator)); + + // The store-level releaser holds a tracked buffer that must be released by close()'s finally block. + ByteBufferReleaser storeReleaser = new ByteBufferReleaser(storeAllocator); + storeReleaser.releaseLater(storeAllocator.allocate(8)); + store.setReleaser(storeReleaser); + + try { + store.close(); + throw new AssertionError("Expected close() to propagate the reader failure"); + } catch (RuntimeException e) { + assertEquals(releaseFailure, e); + } + } + } + + private static ColumnChunkPageReader newReaderWithoutPages(ParquetReadOptions options) { + return new ColumnChunkPageReader( + NOOP_DECOMPRESSOR, Collections.emptyList(), null, null, 0L, null, null, 0, 0, options); + } + + private static ColumnChunkPageReader newReaderWithQueuedBuffer(ByteBufferAllocator allocator) { + ParquetReadOptions options = ParquetReadOptions.builder() + .withAllocator(allocator) + .useOffHeapDecryptBuffer(true) + .build(); + + ByteBuffer pageBytes = ByteBuffer.allocateDirect(4); + pageBytes.putInt(0, 42); + DataPageV1 page = new DataPageV1(BytesInput.from(pageBytes), 1, 4, null, RLE, RLE, PLAIN); + + ColumnChunkPageReader reader = new ColumnChunkPageReader( + NOOP_DECOMPRESSOR, + Collections.singletonList(page), + null, + null, + 1L, + null, + null, + 0, + 0, + options); + + // Reading the page through the off-heap path queues a buffer into the reader's internal releaser, so + // releaseBuffers() will later invoke the throwing allocator's release(). + if (reader.readPage() == null) { + throw new IllegalStateException("Expected a page to be read"); + } + return reader; + } +}