Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,46 +55,46 @@ public void testCompressionRatioFile() throws SQLException {
Statement statement = connection.createStatement()) {
statement.execute("insert into root.comprssion_ratio_file.d1(timestamp,s1) values(1,1.0)");
statement.execute("flush");
// one global file and two data region file (including one AUDIT region)
// one global file and one data region file
assertEquals(2, collectCompressionRatioFiles(nodeWrapper).size());

statement.execute("drop database root.comprssion_ratio_file");
// one global file and system region file
// one global file
// deleting a file may not be sensed by other processes instantly
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 2);
.until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 1);

statement.execute("insert into root.comprssion_ratio_file.d1(timestamp,s1) values(1,1.0)");
statement.execute("flush");
assertEquals(3, collectCompressionRatioFiles(nodeWrapper).size());
assertEquals(2, collectCompressionRatioFiles(nodeWrapper).size());

statement.execute("drop database root.comprssion_ratio_file");
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 2);
.until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 1);

statement.execute("insert into root.comprssion_ratio_file.d1(timestamp,s1) values(1,1.0)");
statement.execute("flush");
assertEquals(3, collectCompressionRatioFiles(nodeWrapper).size());
assertEquals(2, collectCompressionRatioFiles(nodeWrapper).size());

statement.execute("insert into root.comprssion_ratio_file_2.d1(timestamp,s1) values(1,1.0)");
statement.execute("flush");
assertEquals(4, collectCompressionRatioFiles(nodeWrapper).size());
assertEquals(3, collectCompressionRatioFiles(nodeWrapper).size());

statement.execute("drop database root.comprssion_ratio_file");
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 3);
.until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 2);

statement.execute("drop database root.comprssion_ratio_file_2");
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 2);
.until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 1);
} finally {
simpleEnv.cleanClusterEnvironment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,26 @@ private StorageEngineMessages() {}
// ======================== Snapshot ========================

public static final String EXCEPTION_LOAD_SNAPSHOT = "Exception occurs while load snapshot from {}";
public static final String LOADING_SNAPSHOT_FOR = "Loading snapshot for {}-{}, source directory is {}";
public static final String EXCEPTION_LOADING_SNAPSHOT_FOR = "Exception occurs when loading snapshot for {}-{}";
public static final String READING_SNAPSHOT_LOG_FILE = "Reading snapshot log file {}";
public static final String REMOVE_ALL_DATA_FILES_IN_ORIGINAL_DIR = "Remove all data files in original data dir";
public static final String FAILED_TO_REMOVE_ORIGIN_DATA_FILES = "Failed to remove origin data files";
public static final String MOVING_SNAPSHOT_FILE_TO_DATA_DIRS = "Moving snapshot file to data dirs";
public static final String CANNOT_FIND_SNAPSHOT_DIRECTORY = "Cannot find snapshot directory %s";
public static final String NO_SEQ_OR_UNSEQ_FILES_IN_SNAPSHOT =
"No seq or unseq files in snapshot {}, skip creating file links";
public static final String EXCEPTION_DELETING_TIME_PARTITION_DIR =
"Exception occurs when deleting time partition directory for {}-{}";
public static final String CANNOT_CREATE_LINK_FALLBACK_COPY =
"Cannot create link from {} to {}, fallback to copy";
public static final String FAILED_TO_PROCESS_SNAPSHOT_FILE =
"Failed to process file {} in dir {}: {}";
public static final String FAILED_TO_PROCESS_SNAPSHOT_FILE_AFTER_RETRIES =
"Failed to process file after retries. Source: %s, Target suffix: %s";
public static final String SNAPSHOT_FILE_NUM_MISMATCH =
"The file num in log is %d, while file num in disk is %d";
public static final String SNAPSHOT_FILE_NOT_IN_LOG = "File %s is not in the log file list";
public static final String NO_COMPRESSION_RATIO_FILE_IN_DIR = "No compression ratio file in dir {}";
public static final String CANNOT_LOAD_COMPRESSION_RATIO = "Cannot load compression ratio from {}";
public static final String LOADED_COMPRESSION_RATIO = "Loaded compression ratio from {}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,26 @@ private StorageEngineMessages() {}
// ======================== Snapshot ========================

public static final String EXCEPTION_LOAD_SNAPSHOT = "从 {} 加载快照时发生异常";
public static final String LOADING_SNAPSHOT_FOR = "正在为 {}-{} 加载快照,源目录为 {}";
public static final String EXCEPTION_LOADING_SNAPSHOT_FOR = "为 {}-{} 加载快照时发生异常";
public static final String READING_SNAPSHOT_LOG_FILE = "正在读取快照日志文件 {}";
public static final String REMOVE_ALL_DATA_FILES_IN_ORIGINAL_DIR = "移除原始数据目录中的所有数据文件";
public static final String FAILED_TO_REMOVE_ORIGIN_DATA_FILES = "移除原始数据文件失败";
public static final String MOVING_SNAPSHOT_FILE_TO_DATA_DIRS = "正在将快照文件移动到数据目录";
public static final String CANNOT_FIND_SNAPSHOT_DIRECTORY = "找不到快照目录 %s";
public static final String NO_SEQ_OR_UNSEQ_FILES_IN_SNAPSHOT =
"快照 {} 中没有顺序或乱序文件,跳过创建文件链接";
public static final String EXCEPTION_DELETING_TIME_PARTITION_DIR =
"删除 {}-{} 的时间分区目录时发生异常";
public static final String CANNOT_CREATE_LINK_FALLBACK_COPY =
"无法创建从 {} 到 {} 的链接,回退为复制";
public static final String FAILED_TO_PROCESS_SNAPSHOT_FILE =
"处理文件 {} 失败,所在目录为 {}: {}";
public static final String FAILED_TO_PROCESS_SNAPSHOT_FILE_AFTER_RETRIES =
"重试后仍无法处理文件。源文件: %s,目标后缀: %s";
public static final String SNAPSHOT_FILE_NUM_MISMATCH =
"日志中的文件数为 %d,但磁盘中的文件数为 %d";
public static final String SNAPSHOT_FILE_NOT_IN_LOG = "文件 %s 不在日志文件列表中";
public static final String NO_COMPRESSION_RATIO_FILE_IN_DIR = "目录 {} 中没有压缩率文件";
public static final String CANNOT_LOAD_COMPRESSION_RATIO = "无法从 {} 加载压缩率";
public static final String LOADED_COMPRESSION_RATIO = "已从 {} 加载压缩率";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2201,7 +2201,9 @@
databaseName + "-" + dataRegionIdString,
systemDir);
int regionId = dataRegionId.getId();
TableDiskUsageIndex.getInstance().remove(databaseName, regionId);
if (isTableModel) {
TableDiskUsageIndex.getInstance().remove(databaseName, regionId);
}
FileTimeIndexCacheRecorder.getInstance().removeFileTimeIndexCache(regionId);
writeLock("deleteFolder");
try {
Expand Down Expand Up @@ -2635,7 +2637,7 @@
if (tsFileProcessor == null) {
// tsFileProcessor == null means this tsfile is being closed, here we try to busy loop
// until status in TsFileResource has been changed which is supposed to be the last step
// of closing

Check warning on line 2640 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 81 to 64, Complexity from 19 to 14, Nesting Level from 5 to 2, Number of Variables from 17 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6rcLubpgT3RC78Ivsd&open=AZ6rcLubpgT3RC78Ivsd&pullRequest=17880
while (!tsFileResource.isClosed() && waitTimeInMs > 0) {
TimeUnit.MILLISECONDS.sleep(5);
waitTimeInMs -= 5;
Expand Down Expand Up @@ -3073,7 +3075,7 @@
writeLock("delete");
boolean hasReleasedLock = false;
try {
if (deleted) {

Check warning on line 3078 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 97 to 64, Complexity from 17 to 14, Nesting Level from 7 to 2, Number of Variables from 24 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6rcLubpgT3RC78Ivse&open=AZ6rcLubpgT3RC78Ivse&pullRequest=17880
return;
}
String tableName = modEntries.get(0).getTableName();
Expand Down Expand Up @@ -3486,7 +3488,7 @@
long matchSize =
devicesInFile.stream()
.filter(
device -> {

Check warning on line 3491 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 149 to 64, Complexity from 29 to 14, Nesting Level from 5 to 2, Number of Variables from 26 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6rcLubpgT3RC78Ivsf&open=AZ6rcLubpgT3RC78Ivsf&pullRequest=17880
if (logger.isDebugEnabled()) {
logger.debug(
"device is {}, deviceTable is {}, tableDeletionEntry.getPredicate().matches(device) is {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ private File getSnapshotLogFile() {
*/
public DataRegion loadSnapshotForStateMachine() {
LOGGER.info(
"Loading snapshot for {}-{}, source directory is {}",
storageGroupName,
dataRegionId,
snapshotPath);
StorageEngineMessages.LOADING_SNAPSHOT_FOR, storageGroupName, dataRegionId, snapshotPath);

File snapshotLogFile = getSnapshotLogFile();

Expand All @@ -129,7 +126,7 @@ private DataRegion loadSnapshotWithoutLog() {
return loadSnapshot();
} catch (IOException | DiskSpaceInsufficientException e) {
LOGGER.error(
"Exception occurs when loading snapshot for {}-{}", storageGroupName, dataRegionId, e);
StorageEngineMessages.EXCEPTION_LOADING_SNAPSHOT_FOR, storageGroupName, dataRegionId, e);
return null;
}
}
Expand Down Expand Up @@ -240,7 +237,7 @@ private void deleteAllFilesInDataDirs() throws IOException {
}
} catch (IOException e) {
LOGGER.error(
"Exception occurs when deleting time partition directory for {}-{}",
StorageEngineMessages.EXCEPTION_DELETING_TIME_PARTITION_DIR,
storageGroupName,
dataRegionId,
e);
Expand All @@ -250,6 +247,11 @@ private void deleteAllFilesInDataDirs() throws IOException {

private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir)
throws IOException, DiskSpaceInsufficientException {
if (!sourceDir.exists()) {
throw new IOException(
String.format(
StorageEngineMessages.CANNOT_FIND_SNAPSHOT_DIRECTORY, sourceDir.getAbsolutePath()));
}
File seqFileDir =
new File(
sourceDir,
Expand All @@ -267,10 +269,8 @@ private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir)
+ File.separator
+ dataRegionId);
if (!seqFileDir.exists() && !unseqFileDir.exists()) {
throw new IOException(
String.format(
"Cannot find %s or %s",
seqFileDir.getAbsolutePath(), unseqFileDir.getAbsolutePath()));
LOGGER.warn(StorageEngineMessages.NO_SEQ_OR_UNSEQ_FILES_IN_SNAPSHOT, sourceDir);
return;
}
FolderManager folderManager =
new FolderManager(
Expand Down Expand Up @@ -329,24 +329,29 @@ private File createLinksFromSnapshotToSourceDir(
if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
throw new IOException(
String.format(
"Cannot create directory %s", targetFile.getParentFile().getAbsolutePath()));
StorageEngineMessages.FAILED_TO_CREATE_DIR,
targetFile.getParentFile().getAbsolutePath()));
}

try {
Files.createLink(targetFile.toPath(), file.toPath());
LOGGER.debug("Created hard link from {} to {}", file, targetFile);
LOGGER.debug(StorageEngineMessages.CREATED_HARD_LINK, file, targetFile);
fileTarget.put(fileKey, finalDir);
return targetFile;
} catch (IOException e) {
LOGGER.info("Cannot create link from {} to {}, fallback to copy", file, targetFile);
LOGGER.info(StorageEngineMessages.CANNOT_CREATE_LINK_FALLBACK_COPY, file, targetFile);
}

Files.copy(file.toPath(), targetFile.toPath());
fileTarget.put(fileKey, finalDir);
return targetFile;
} catch (Exception e) {
LOGGER.warn(
"Failed to process file {} in dir {}: {}", file.getName(), finalDir, e.getMessage(), e);
StorageEngineMessages.FAILED_TO_PROCESS_SNAPSHOT_FILE,
file.getName(),
finalDir,
e.getMessage(),
e);
throw e;
}
}
Expand Down Expand Up @@ -381,8 +386,9 @@ private void createLinksFromSnapshotToSourceDir(
} catch (Exception e) {
throw new IOException(
String.format(
"Failed to process file after retries. Source: %s, Target suffix: %s",
file.getAbsolutePath(), targetSuffix),
StorageEngineMessages.FAILED_TO_PROCESS_SNAPSHOT_FILE_AFTER_RETRIES,
file.getAbsolutePath(),
targetSuffix),
e);
}
}
Expand All @@ -409,8 +415,7 @@ private void createLinksFromSnapshotDirToDataDirWithLog() throws IOException {
}
if (fileCnt != loggedFileNum) {
throw new IOException(
String.format(
"The file num in log is %d, while file num in disk is %d", loggedFileNum, fileCnt));
String.format(StorageEngineMessages.SNAPSHOT_FILE_NUM_MISMATCH, loggedFileNum, fileCnt));
}
}

Expand Down Expand Up @@ -492,13 +497,14 @@ private void createLinksFromSourceToTarget(File targetDir, File[] files, Set<Str
String infoStr = getFileInfoString(file);
if (!fileInfoSet.contains(infoStr)) {
throw new IOException(
String.format("File %s is not in the log file list", file.getAbsolutePath()));
String.format(StorageEngineMessages.SNAPSHOT_FILE_NOT_IN_LOG, file.getAbsolutePath()));
}
File targetFile = new File(targetDir, file.getName());
if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
throw new IOException(
String.format(
"Cannot create directory %s", targetFile.getParentFile().getAbsolutePath()));
StorageEngineMessages.FAILED_TO_CREATE_DIR,
targetFile.getParentFile().getAbsolutePath()));
}
Files.createLink(targetFile.toPath(), file.toPath());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,18 +443,25 @@ private RemoveRegionOperation(String database, int regionId) {

@Override
public void apply(TableDiskUsageIndex tableDiskUsageIndex) {
tableDiskUsageIndex.writerMap.computeIfPresent(
regionId,
(k, writer) -> {
if (writer.getActiveReaderNum() > 0) {
// If there are active readers, defer removal until all readers finish
writer.setRemovedFuture(future);
return writer;
}
writer.close();
future.complete(null);
return null;
});
DataRegionTableSizeIndexWriter removedWriter = null;
try {
removedWriter =
tableDiskUsageIndex.writerMap.computeIfPresent(
regionId,
(k, writer) -> {
if (writer.getActiveReaderNum() > 0) {
// If there are active readers, defer removal until all readers finish
writer.setRemovedFuture(future);
return writer;
}
writer.close();
return null;
});
} finally {
if (removedWriter == null) {
future.complete(null);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,32 @@ public void testCreateSnapshotWithUnclosedTsFile()
}
}

@Test
public void testLoadEmptySnapshotWithoutLog() throws IOException {
String[][] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs();
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
TierManager.getInstance().resetFolders();
File snapshotDir = new File("target" + File.separator + "empty-snapshot");
try {
if (snapshotDir.exists()) {
FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath());
}
Assert.assertTrue(snapshotDir.mkdirs());

DataRegion dataRegion =
new SnapshotLoader(snapshotDir.getAbsolutePath(), testSgName, "0")
.loadSnapshotForStateMachine();

Assert.assertNotNull(dataRegion);
assertEquals(0, dataRegion.getTsFileManager().getTsFileList(true).size());
assertEquals(0, dataRegion.getTsFileManager().getTsFileList(false).size());
} finally {
FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath());
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs);
TierManager.getInstance().resetFolders();
}
}

@Test
public void testLoadSnapshot()
throws IOException, WriteProcessException, DataRegionException, DirectoryNotLegalException {
Expand Down
Loading