From 3f22889c59a5fedfd80b3c5f6c9dd70c9acee888 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Mon, 24 Nov 2025 12:20:12 +0800 Subject: [PATCH 1/7] move FolderManager to node-common module --- .../pipeconsensus/PipeConsensusReceiver.java | 6 +++--- .../protocol/thrift/IoTDBDataNodeReceiver.java | 6 +++--- .../sink/util/builder/PipeTsFileBuilder.java | 6 +++--- .../db/storageengine/dataregion/DataRegion.java | 2 +- .../execute/task/InnerSpaceCompactionTask.java | 2 +- .../impl/SizeTieredCompactionSelector.java | 2 +- .../dataregion/snapshot/SnapshotLoader.java | 6 +++--- .../tsfile/generator/TsFileNameGenerator.java | 4 ++-- .../AbstractNodeAllocationStrategy.java | 6 +++--- .../db/storageengine/load/LoadTsFileManager.java | 6 +++--- .../load/active/ActiveLoadUtil.java | 6 +++--- .../load/disk/ILoadDiskSelector.java | 2 +- .../InheritSystemMultiDisksStrategySelector.java | 2 +- .../storageengine/load/disk/MinIOSelector.java | 2 +- .../storageengine/rescon/disk/TierManager.java | 11 ++++++----- .../strategy/DirectoryStrategyTest.java | 10 +++++----- .../utils/CompactionFileGeneratorUtils.java | 2 +- .../rescon/disk/FolderManagerTest.java | 5 +++-- .../iotdb/commons}/disk/FolderManager.java | 16 ++++++++-------- .../disk/strategy/DirectoryStrategy.java | 8 ++++---- .../disk/strategy/DirectoryStrategyType.java | 2 +- .../MaxDiskUsableSpaceFirstStrategy.java | 4 ++-- .../MinFolderOccupiedSpaceFirstStrategy.java | 4 ++-- .../RandomOnDiskUsableSpaceStrategy.java | 4 ++-- .../commons}/disk/strategy/SequenceStrategy.java | 4 ++-- .../DiskSpaceInsufficientException.java | 4 ++-- 26 files changed, 67 insertions(+), 65 deletions(-) rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon => node-commons/src/main/java/org/apache/iotdb/commons}/disk/FolderManager.java (88%) rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon => node-commons/src/main/java/org/apache/iotdb/commons}/disk/strategy/DirectoryStrategy.java (92%) rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon => node-commons/src/main/java/org/apache/iotdb/commons}/disk/strategy/DirectoryStrategyType.java (93%) rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon => node-commons/src/main/java/org/apache/iotdb/commons}/disk/strategy/MaxDiskUsableSpaceFirstStrategy.java (92%) rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon => node-commons/src/main/java/org/apache/iotdb/commons}/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java (93%) rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon => node-commons/src/main/java/org/apache/iotdb/commons}/disk/strategy/RandomOnDiskUsableSpaceStrategy.java (94%) rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon => node-commons/src/main/java/org/apache/iotdb/commons}/disk/strategy/SequenceStrategy.java (95%) rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/exception/DiskSpaceInsufficientException.java (91%) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index b250046d5c04..578ddb86ae11 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -27,6 +27,9 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent; import org.apache.iotdb.commons.pipe.sink.payload.pipeconsensus.request.PipeConsensusRequestType; import org.apache.iotdb.commons.pipe.sink.payload.pipeconsensus.request.PipeConsensusRequestVersion; @@ -44,7 +47,6 @@ import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusReceiverMetrics; import org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPointCounter; @@ -62,8 +64,6 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 652530d11b48..0d36ea5e9ad5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -23,6 +23,9 @@ import org.apache.iotdb.commons.audit.IAuditEntity; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.path.PartialPath; @@ -46,7 +49,6 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.metric.receiver.PipeDataNodeReceiverMetrics; @@ -102,8 +104,6 @@ import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.db.tools.schema.SRStatementGenerator; import org.apache.iotdb.db.tools.schema.SchemaRegionSnapshotParser; import org.apache.iotdb.db.utils.DataNodeAuthUtils; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java index 815579638229..c5eec5bbec90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileBuilder.java @@ -19,10 +19,10 @@ package org.apache.iotdb.db.pipe.sink.util.builder; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.common.constant.TsFileConstant; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 02aaaed91934..5efff3a05cf2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.path.IFullPath; @@ -50,7 +51,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.DataRegionException; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.WriteProcessRejectException; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index ef4d1d850fdb..731cd04f7e60 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java index 314ae8349e90..1ed8572e2a97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java @@ -20,9 +20,9 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.RepairUnsortedFileCompactionTask; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java index 9ac4956724c8..9f977199f89b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java @@ -20,13 +20,13 @@ package org.apache.iotdb.db.storageengine.dataregion.snapshot; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.tsfile.external.commons.io.FileUtils; import org.slf4j.Logger; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java index 16be82188e9c..280307362fdc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java @@ -20,11 +20,11 @@ package org.apache.iotdb.db.storageengine.dataregion.tsfile.generator; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.tsfile.common.constant.TsFileConstant; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java index 119f262ea660..15cd670aff43 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java @@ -21,12 +21,12 @@ import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALFakeNode; import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 30bb64cbd316..fa45435fd0e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -25,6 +25,9 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.service.metric.MetricService; @@ -35,7 +38,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; @@ -53,8 +55,6 @@ import org.apache.iotdb.db.storageengine.load.splitter.ChunkData; import org.apache.iotdb.db.storageengine.load.splitter.DeletionData; import org.apache.iotdb.db.storageengine.load.splitter.TsFileData; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.metrics.utils.MetricLevel; import com.github.benmanes.caffeine.cache.Cache; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index e3dbe43507d8..7b9c856c065f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -19,12 +19,12 @@ package org.apache.iotdb.db.storageengine.load.active; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java index e9216e9b5551..b013c7e9ad47 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.storageengine.load.disk; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.load.LoadFileException; import java.io.File; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java index c56addc4bf90..c937f8fa966a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.storageengine.load.disk; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.tsfile.fileSystem.FSFactoryProducer; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java index 9956148b0835..e5b1836a90b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.storageengine.load.disk; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.load.LoadFileException; import org.apache.iotdb.metrics.utils.FileStoreUtils; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java index d929f7656077..f0579d77e1ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java @@ -19,13 +19,14 @@ package org.apache.iotdb.db.storageengine.rescon.disk; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.disk.strategy.MaxDiskUsableSpaceFirstStrategy; +import org.apache.iotdb.commons.disk.strategy.MinFolderOccupiedSpaceFirstStrategy; +import org.apache.iotdb.commons.disk.strategy.RandomOnDiskUsableSpaceStrategy; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.MaxDiskUsableSpaceFirstStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.MinFolderOccupiedSpaceFirstStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.RandomOnDiskUsableSpaceStrategy; import org.apache.iotdb.metrics.utils.FileStoreUtils; import org.apache.tsfile.fileSystem.FSFactoryProducer; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java index 1a95ca327686..6343dcc16994 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java @@ -18,12 +18,12 @@ */ package org.apache.iotdb.db.conf.directories.strategy; +import org.apache.iotdb.commons.disk.strategy.MaxDiskUsableSpaceFirstStrategy; +import org.apache.iotdb.commons.disk.strategy.MinFolderOccupiedSpaceFirstStrategy; +import org.apache.iotdb.commons.disk.strategy.RandomOnDiskUsableSpaceStrategy; +import org.apache.iotdb.commons.disk.strategy.SequenceStrategy; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.JVMCommonUtils; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.MaxDiskUsableSpaceFirstStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.MinFolderOccupiedSpaceFirstStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.RandomOnDiskUsableSpaceStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.SequenceStrategy; import org.apache.iotdb.db.utils.constant.TestConstant; import org.junit.After; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFileGeneratorUtils.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFileGeneratorUtils.java index 2573a95d1dad..fff273adf768 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFileGeneratorUtils.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionFileGeneratorUtils.java @@ -20,10 +20,10 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.utils; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManagerTest.java index c8ad437a8c34..52f13c16e649 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManagerTest.java @@ -19,8 +19,9 @@ package org.apache.iotdb.db.storageengine.rescon.disk; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.fsFactory.FSFactory; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java similarity index 88% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java index e90292853f14..95cb24fa8664 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java @@ -17,17 +17,17 @@ * under the License. */ -package org.apache.iotdb.db.storageengine.rescon.disk; +package org.apache.iotdb.commons.disk; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.MaxDiskUsableSpaceFirstStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.MinFolderOccupiedSpaceFirstStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.RandomOnDiskUsableSpaceStrategy; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.SequenceStrategy; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategy; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.disk.strategy.MaxDiskUsableSpaceFirstStrategy; +import org.apache.iotdb.commons.disk.strategy.MinFolderOccupiedSpaceFirstStrategy; +import org.apache.iotdb.commons.disk.strategy.RandomOnDiskUsableSpaceStrategy; +import org.apache.iotdb.commons.disk.strategy.SequenceStrategy; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/DirectoryStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategy.java similarity index 92% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/DirectoryStrategy.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategy.java index bac8d8e47a82..626bb6af29f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/DirectoryStrategy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategy.java @@ -16,13 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.storageengine.rescon.disk.strategy; +package org.apache.iotdb.commons.disk.strategy; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.JVMCommonUtils; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +32,7 @@ import java.util.List; import java.util.Map; -import static org.apache.iotdb.db.storageengine.rescon.disk.FolderManager.FolderState.HEALTHY; +import static org.apache.iotdb.commons.disk.FolderManager.FolderState.HEALTHY; /** * The basic class of all the strategies of multiple directories. If a user wants to define his own diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/DirectoryStrategyType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java similarity index 93% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/DirectoryStrategyType.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java index 195ad9c8e72f..2d081dd87f51 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/DirectoryStrategyType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/DirectoryStrategyType.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.storageengine.rescon.disk.strategy; +package org.apache.iotdb.commons.disk.strategy; public enum DirectoryStrategyType { SEQUENCE_STRATEGY, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/MaxDiskUsableSpaceFirstStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MaxDiskUsableSpaceFirstStrategy.java similarity index 92% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/MaxDiskUsableSpaceFirstStrategy.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MaxDiskUsableSpaceFirstStrategy.java index dccedaad207f..9158027b8a67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/MaxDiskUsableSpaceFirstStrategy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MaxDiskUsableSpaceFirstStrategy.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.storageengine.rescon.disk.strategy; +package org.apache.iotdb.commons.disk.strategy; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.JVMCommonUtils; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; public class MaxDiskUsableSpaceFirstStrategy extends DirectoryStrategy { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java similarity index 93% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java index 70ceb70573e8..7b71bf8726f1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.storageengine.rescon.disk.strategy; +package org.apache.iotdb.commons.disk.strategy; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.JVMCommonUtils; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import java.io.IOException; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/RandomOnDiskUsableSpaceStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/RandomOnDiskUsableSpaceStrategy.java similarity index 94% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/RandomOnDiskUsableSpaceStrategy.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/RandomOnDiskUsableSpaceStrategy.java index 30c5000396a1..28264f142a13 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/RandomOnDiskUsableSpaceStrategy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/RandomOnDiskUsableSpaceStrategy.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.storageengine.rescon.disk.strategy; +package org.apache.iotdb.commons.disk.strategy; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.JVMCommonUtils; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import java.security.SecureRandom; import java.util.ArrayList; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/SequenceStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/SequenceStrategy.java similarity index 95% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/SequenceStrategy.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/SequenceStrategy.java index 881623a1882f..6b0394e0768d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/strategy/SequenceStrategy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/SequenceStrategy.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.storageengine.rescon.disk.strategy; +package org.apache.iotdb.commons.disk.strategy; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.JVMCommonUtils; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.tsfile.fileSystem.FSFactoryProducer; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DiskSpaceInsufficientException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/DiskSpaceInsufficientException.java similarity index 91% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DiskSpaceInsufficientException.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/DiskSpaceInsufficientException.java index 8d3c7a9ef30a..0a1863759054 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DiskSpaceInsufficientException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/DiskSpaceInsufficientException.java @@ -16,13 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.exception; +package org.apache.iotdb.commons.exception; import org.apache.iotdb.rpc.TSStatusCode; import java.util.List; -public class DiskSpaceInsufficientException extends StorageEngineException { +public class DiskSpaceInsufficientException extends IoTDBException { private static final long serialVersionUID = 9001643829368311032L; From 002bb00957bc1d32457f6c7400cd4bb6c37e5f35 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Tue, 25 Nov 2025 11:58:24 +0800 Subject: [PATCH 2/7] move FolderManager to node-common module --- .../db/storageengine/dataregion/TTLTest.java | 4 +++- .../BatchedCompactionWithTsFileSplitterTest.java | 16 +++++++++++----- ...tionWithReadPointPerformerValidationTest.java | 4 +++- .../utils/MultiTsFileDeviceIteratorTest.java | 7 +++++-- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLTest.java index afd7ed608ead..6833dcbee5cf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TTLTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.NonAlignedFullPath; @@ -327,7 +328,8 @@ public void testTTLRemoval() throws StorageEngineException, WriteProcessException, IllegalPathException, - InterruptedException { + InterruptedException, + DiskSpaceInsufficientException { boolean isEnableCrossCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction(); IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java index 272d9e6ae5ca..d6efab2a940f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -89,7 +90,8 @@ public void testCompactionFlushChunk() InterruptedException, MetadataException, PageException, - LoadFileException { + LoadFileException, + DiskSpaceInsufficientException { TsFileResource seqResource1 = generateSingleAlignedSeriesFile( "d0", @@ -124,7 +126,8 @@ public void testCompactionFlushChunkAndSplitByTimePartition() InterruptedException, MetadataException, PageException, - LoadFileException { + LoadFileException, + DiskSpaceInsufficientException { TsFileResource seqResource1 = generateSingleAlignedSeriesFile( "d0", @@ -159,7 +162,8 @@ public void testCompactionFlushPage() InterruptedException, MetadataException, PageException, - LoadFileException { + LoadFileException, + DiskSpaceInsufficientException { TsFileResource seqResource1 = generateSingleAlignedSeriesFile( "d0", @@ -198,7 +202,8 @@ public void testCompactionFlushPageAndSplitByTimePartition() InterruptedException, MetadataException, PageException, - LoadFileException { + LoadFileException, + DiskSpaceInsufficientException { TsFileResource seqResource1 = generateSingleAlignedSeriesFile( "d0", @@ -237,7 +242,8 @@ private TsFileResource performCompaction() IOException, PageException, InterruptedException, - MetadataException { + MetadataException, + DiskSpaceInsufficientException { TsFileResource targetResource = TsFileNameGenerator.getInnerCompactionTargetFileResource(seqResources, true); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerValidationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerValidationTest.java index b05ab1e633e3..7b0c88079d75 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerValidationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerValidationTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.cross; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedFullPath; @@ -2101,7 +2102,8 @@ public void testSelectingFilesWhenSomeFilesBeingDeleted() WriteProcessException, StorageEngineException, InterruptedException, - MergeException { + MergeException, + DiskSpaceInsufficientException { registerTimeseriesInMManger(5, 10, true); createFiles(5, 10, 5, 1000, 0, 0, 100, 100, false, true); createFiles(1, 5, 10, 4500, 500, 500, 0, 100, false, false); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java index 1180247a2d3f..b5010e60449d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/MultiTsFileDeviceIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.utils; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.IFullPath; @@ -412,7 +413,8 @@ public void getDeletedDevicesWithSameNameFromSeqFilesByReadChunkPerformer() IOException, WriteProcessException, StorageEngineException, - InterruptedException { + InterruptedException, + DiskSpaceInsufficientException { TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3); int oldAlignedDeviceOffset = TsFileGeneratorUtils.alignDeviceOffset; TsFileGeneratorUtils.alignDeviceOffset = 0; @@ -566,7 +568,8 @@ public void getDeletedDevicesWithSameNameFromSeqFilesByReadChunkPerformer2() IOException, WriteProcessException, StorageEngineException, - InterruptedException { + InterruptedException, + DiskSpaceInsufficientException { TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3); int oldAlignedDeviceOffset = TsFileGeneratorUtils.alignDeviceOffset; TsFileGeneratorUtils.alignDeviceOffset = 0; From 1fef8e49063f027d7f3c08a3d0689c20523191b0 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Tue, 25 Nov 2025 17:00:18 +0800 Subject: [PATCH 3/7] Multi-directory snapshot support for IoTConsensus receiver --- .../consensus/config/ConsensusConfig.java | 14 ++++ .../iotdb/consensus/iot/IoTConsensus.java | 81 ++++++++++++------- .../iotdb/consensus/iot/ReplicateTest.java | 16 ++-- .../iotdb/consensus/iot/StabilityTest.java | 71 ++++++++++++++-- .../db/consensus/DataRegionConsensusImpl.java | 1 + 5 files changed, 137 insertions(+), 46 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java index dba53214abd7..b0ad71b8192c 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java @@ -29,6 +29,7 @@ public class ConsensusConfig { private final TEndPoint thisNodeEndPoint; private final int thisNodeId; private final String storageDir; + private final String[] storageDirs; private final TConsensusGroupType consensusGroupType; private final RatisConfig ratisConfig; private final IoTConsensusConfig iotConsensusConfig; @@ -38,6 +39,7 @@ private ConsensusConfig( TEndPoint thisNode, int thisNodeId, String storageDir, + String[] storageDirs, TConsensusGroupType consensusGroupType, RatisConfig ratisConfig, IoTConsensusConfig iotConsensusConfig, @@ -45,6 +47,7 @@ private ConsensusConfig( this.thisNodeEndPoint = thisNode; this.thisNodeId = thisNodeId; this.storageDir = storageDir; + this.storageDirs = storageDirs; this.consensusGroupType = consensusGroupType; this.ratisConfig = ratisConfig; this.iotConsensusConfig = iotConsensusConfig; @@ -63,6 +66,10 @@ public String getStorageDir() { return storageDir; } + public String[] getStorageDirs() { + return storageDirs; + } + public TConsensusGroupType getConsensusGroupType() { return consensusGroupType; } @@ -88,6 +95,7 @@ public static class Builder { private TEndPoint thisNode; private int thisNodeId; private String storageDir; + private String[] storageDirs; private TConsensusGroupType consensusGroupType; private RatisConfig ratisConfig; private IoTConsensusConfig iotConsensusConfig; @@ -98,6 +106,7 @@ public ConsensusConfig build() { thisNode, thisNodeId, storageDir, + storageDirs, consensusGroupType, Optional.ofNullable(ratisConfig).orElseGet(() -> RatisConfig.newBuilder().build()), Optional.ofNullable(iotConsensusConfig) @@ -121,6 +130,11 @@ public Builder setStorageDir(String storageDir) { return this; } + public Builder setStorageDirs(String[] storageDirs) { + this.storageDirs = storageDirs; + return this; + } + public Builder setConsensusGroupType(TConsensusGroupType groupType) { this.consensusGroupType = groupType; return this; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 959191ca2d6d..9570dde1d7e3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -26,6 +26,9 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.utils.FileUtils; @@ -71,6 +74,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -91,7 +95,7 @@ public class IoTConsensus implements IConsensus { private final TEndPoint thisNode; private final int thisNodeId; - private final File storageDir; + FolderManager folderManager = null; private final IStateMachine.Registry registry; private final Map stateMachineMap = new ConcurrentHashMap<>(); @@ -104,10 +108,13 @@ public class IoTConsensus implements IConsensus { private Future updateReaderFuture; private Map> correctPeerListBeforeStart = null; - public IoTConsensus(ConsensusConfig config, Registry registry) { + public IoTConsensus(ConsensusConfig config, Registry registry) + throws DiskSpaceInsufficientException { this.thisNode = config.getThisNodeEndPoint(); this.thisNodeId = config.getThisNodeId(); - this.storageDir = new File(config.getStorageDir()); + this.folderManager = + new FolderManager( + Arrays.asList(config.getStorageDirs()), DirectoryStrategyType.SEQUENCE_STRATEGY); this.config = config.getIotConsensusConfig(); this.registry = registry; this.service = new IoTConsensusRPCService(thisNode, config.getIotConsensusConfig()); @@ -160,31 +167,35 @@ public synchronized void start() throws IOException { } private void initAndRecover() throws IOException { - if (!storageDir.exists()) { - if (!storageDir.mkdirs()) { - throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); - } - } else { - try (DirectoryStream stream = Files.newDirectoryStream(storageDir.toPath())) { - for (Path path : stream) { - String[] items = path.getFileName().toString().split("_"); - ConsensusGroupId consensusGroupId = - ConsensusGroupId.Factory.create( - Integer.parseInt(items[0]), Integer.parseInt(items[1])); - IoTConsensusServerImpl consensus = - new IoTConsensusServerImpl( - path.toString(), - new Peer(consensusGroupId, thisNodeId, thisNode), - new TreeSet<>(), - registry.apply(consensusGroupId), - backgroundTaskService, - clientManager, - syncClientManager, - config); - stateMachineMap.put(consensusGroupId, consensus); + for (String folder : folderManager.getFolders()) { + File storageDir = new File(folder); + if (!storageDir.exists()) { + if (!storageDir.mkdirs()) { + throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); + } + } else { + try (DirectoryStream stream = Files.newDirectoryStream(storageDir.toPath())) { + for (Path path : stream) { + String[] items = path.getFileName().toString().split("_"); + ConsensusGroupId consensusGroupId = + ConsensusGroupId.Factory.create( + Integer.parseInt(items[0]), Integer.parseInt(items[1])); + IoTConsensusServerImpl consensus = + new IoTConsensusServerImpl( + path.toString(), + new Peer(consensusGroupId, thisNodeId, thisNode), + new TreeSet<>(), + registry.apply(consensusGroupId), + backgroundTaskService, + clientManager, + syncClientManager, + config); + stateMachineMap.put(consensusGroupId, consensus); + } } } } + if (correctPeerListBeforeStart != null) { BiConsumer> resetPeerListWithoutThrow = (consensusGroupId, peers) -> { @@ -271,8 +282,18 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) k -> { exist.set(false); - String path = buildPeerDir(storageDir, groupId); + String path = null; + try { + path = buildPeerDir(folderManager.getNextFolder(), groupId); + } catch (DiskSpaceInsufficientException e) { + logger.warn( + "Failed to create consensus directory for group {} due to disk space insufficiency: {}", + groupId, + e.getMessage()); + return null; + } File file = new File(path); + System.out.println(file.getAbsolutePath()); if (!file.mkdirs()) { logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); return null; @@ -315,7 +336,9 @@ public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException if (!exist.get()) { throw new ConsensusGroupNotExistException(groupId); } - FileUtils.deleteFileOrDirectory(new File(buildPeerDir(storageDir, groupId))); + for (String folder : folderManager.getFolders()) { + FileUtils.deleteFileOrDirectory(new File(buildPeerDir(folder, groupId))); + } KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE); } @@ -465,7 +488,7 @@ public List getAllConsensusGroupIds() { @Override public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) { - return buildPeerDir(storageDir, groupId); + return null; } @Override @@ -552,7 +575,7 @@ public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) { return stateMachineMap.get(groupId); } - public static String buildPeerDir(File storageDir, ConsensusGroupId groupId) { + public static String buildPeerDir(String storageDir, ConsensusGroupId groupId) { return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId(); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index f22d3fdadcd9..40669a361920 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -71,6 +71,12 @@ public class ReplicateTest { new File("target" + File.separator + "2"), new File("target" + File.separator + "3")); + private final String[][] storageDirs = { + {"target" + File.separator + "1"}, + {"target" + File.separator + "2"}, + {"target" + File.separator + "3"} + }; + private final ConsensusGroup group = new ConsensusGroup(gid, peers); private final List servers = new ArrayList<>(); private final List stateMachines = new ArrayList<>(); @@ -104,7 +110,7 @@ private void initServer() throws IOException { ConsensusConfig.newBuilder() .setThisNodeId(peers.get(i).getNodeId()) .setThisNode(peers.get(i).getEndpoint()) - .setStorageDir(peersStorage.get(i).getAbsolutePath()) + .setStorageDirs(storageDirs[i]) .setConsensusGroupType(TConsensusGroupType.DataRegion) .build(), groupId -> stateMachines.get(finalI)) @@ -293,14 +299,6 @@ public void parsingAndConstructIDTest() throws Exception { for (int i = 0; i < CHECK_POINT_GAP; i++) { servers.get(0).write(gid, new TestEntry(i, peers.get(0))); } - - String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid); - try { - File regionDirFile = new File(regionDir); - Assert.assertTrue(regionDirFile.exists()); - } catch (Exception e) { - Assert.fail(); - } } private boolean checkPortAvailable() { diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index 5147632431f2..7c79ab33b4a0 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.common.Peer; @@ -58,8 +61,11 @@ public class StabilityTest { private final ConsensusGroupId dataRegionId = new DataRegionId(1); - private final File storageDir = new File("target" + java.io.File.separator + "stability"); - + private final String[] folders = { + "target" + File.separator + "stability1", + "target" + File.separator + "stability2", + "target" + File.separator + "stability3" + }; private IoTConsensus consensusImpl; private final int basePort = 6667; @@ -72,7 +78,7 @@ public void constructConsensus() throws IOException { ConsensusConfig.newBuilder() .setThisNodeId(1) .setThisNode(new TEndPoint("0.0.0.0", basePort)) - .setStorageDir(storageDir.getAbsolutePath()) + .setStorageDirs(folders) .setConsensusGroupType(TConsensusGroupType.DataRegion) .build(), gid -> new TestStateMachine()) @@ -87,14 +93,24 @@ public void constructConsensus() throws IOException { @Before public void setUp() throws Exception { - FileUtils.deleteFully(storageDir); + for (String folder : folders) { + File dir = new File(folder); + if (dir.exists()) { + FileUtils.deleteFully(dir); + } + } constructConsensus(); } @After public void tearDown() throws IOException { consensusImpl.stop(); - FileUtils.deleteFully(storageDir); + for (String folder : folders) { + File dir = new File(folder); + if (dir.exists()) { + FileUtils.deleteFully(dir); + } + } } @Test @@ -104,6 +120,7 @@ public void allTest() throws Exception { peerTest(); transferLeader(); snapshotTest(); + multiRegionSnapshotTest(); } public void addConsensusGroup() { @@ -202,10 +219,19 @@ public void snapshotTest() throws ConsensusException { Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); consensusImpl.triggerSnapshot(dataRegionId, false); - File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId)); + File dataDir = null; + File[] versionFiles1 = null; + for (String folder : folders) { + dataDir = new File(IoTConsensus.buildPeerDir(folder, dataRegionId)); + if ((versionFiles1 = + dataDir.listFiles( + (dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME))) + != null + && versionFiles1.length > 0) { + break; + } + } - File[] versionFiles1 = - dataDir.listFiles((dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME)); Assert.assertNotNull(versionFiles1); Assert.assertEquals(1, versionFiles1.length); @@ -221,6 +247,35 @@ public void snapshotTest() throws ConsensusException { consensusImpl.deleteLocalPeer(dataRegionId); } + public void multiRegionSnapshotTest() throws ConsensusException, DiskSpaceInsufficientException { + consensusImpl.folderManager = + new FolderManager( + consensusImpl.folderManager.getFolders(), DirectoryStrategyType.SEQUENCE_STRATEGY); + ConsensusGroupId[] dataRegionIds = new ConsensusGroupId[folders.length]; + for (int i = 0; i < folders.length; i++) { + dataRegionIds[i] = new DataRegionId(i + 100); + consensusImpl.createLocalPeer( + dataRegionIds[i], + Collections.singletonList( + new Peer(dataRegionIds[i], 1, new TEndPoint("0.0.0.0", basePort)))); + consensusImpl.triggerSnapshot(dataRegionIds[i], true); + } + + for (int i = 0; i < folders.length; i++) { + File dataDir = new File(IoTConsensus.buildPeerDir(folders[i], dataRegionIds[i])); + System.out.println(dataDir.getAbsolutePath()); + File[] versionFiles1 = + dataDir.listFiles( + (dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME)); + Assert.assertNotNull(versionFiles1); + Assert.assertEquals(1, versionFiles1.length); + } + + for (int i = 0; i < folders.length; i++) { + consensusImpl.deleteLocalPeer(dataRegionIds[i]); + } + } + @Test public void recordAndResetPeerListTest() throws Exception { try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 2649f8de7af4..193702193c65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -125,6 +125,7 @@ private static ConsensusConfig buildConsensusConfig() { .setThisNodeId(CONF.getDataNodeId()) .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort())) .setStorageDir(CONF.getDataRegionConsensusDir()) + .setStorageDirs(CONF.getDataDirs()) .setConsensusGroupType(TConsensusGroupType.DataRegion) .setIoTConsensusConfig( IoTConsensusConfig.newBuilder() From f65cd2a200d37442b0ecf7ff11e559837284e8d8 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Thu, 4 Dec 2025 16:03:00 +0800 Subject: [PATCH 4/7] Revert "Multi-directory snapshot support for IoTConsensus receiver" This reverts commit 1fef8e49063f027d7f3c08a3d0689c20523191b0. --- .../consensus/config/ConsensusConfig.java | 14 ---- .../iotdb/consensus/iot/IoTConsensus.java | 81 +++++++------------ .../iotdb/consensus/iot/ReplicateTest.java | 16 ++-- .../iotdb/consensus/iot/StabilityTest.java | 71 ++-------------- .../db/consensus/DataRegionConsensusImpl.java | 1 - 5 files changed, 46 insertions(+), 137 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java index b0ad71b8192c..dba53214abd7 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java @@ -29,7 +29,6 @@ public class ConsensusConfig { private final TEndPoint thisNodeEndPoint; private final int thisNodeId; private final String storageDir; - private final String[] storageDirs; private final TConsensusGroupType consensusGroupType; private final RatisConfig ratisConfig; private final IoTConsensusConfig iotConsensusConfig; @@ -39,7 +38,6 @@ private ConsensusConfig( TEndPoint thisNode, int thisNodeId, String storageDir, - String[] storageDirs, TConsensusGroupType consensusGroupType, RatisConfig ratisConfig, IoTConsensusConfig iotConsensusConfig, @@ -47,7 +45,6 @@ private ConsensusConfig( this.thisNodeEndPoint = thisNode; this.thisNodeId = thisNodeId; this.storageDir = storageDir; - this.storageDirs = storageDirs; this.consensusGroupType = consensusGroupType; this.ratisConfig = ratisConfig; this.iotConsensusConfig = iotConsensusConfig; @@ -66,10 +63,6 @@ public String getStorageDir() { return storageDir; } - public String[] getStorageDirs() { - return storageDirs; - } - public TConsensusGroupType getConsensusGroupType() { return consensusGroupType; } @@ -95,7 +88,6 @@ public static class Builder { private TEndPoint thisNode; private int thisNodeId; private String storageDir; - private String[] storageDirs; private TConsensusGroupType consensusGroupType; private RatisConfig ratisConfig; private IoTConsensusConfig iotConsensusConfig; @@ -106,7 +98,6 @@ public ConsensusConfig build() { thisNode, thisNodeId, storageDir, - storageDirs, consensusGroupType, Optional.ofNullable(ratisConfig).orElseGet(() -> RatisConfig.newBuilder().build()), Optional.ofNullable(iotConsensusConfig) @@ -130,11 +121,6 @@ public Builder setStorageDir(String storageDir) { return this; } - public Builder setStorageDirs(String[] storageDirs) { - this.storageDirs = storageDirs; - return this; - } - public Builder setConsensusGroupType(TConsensusGroupType groupType) { this.consensusGroupType = groupType; return this; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 9570dde1d7e3..959191ca2d6d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -26,9 +26,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.consensus.ConsensusGroupId; -import org.apache.iotdb.commons.disk.FolderManager; -import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; -import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.utils.FileUtils; @@ -74,7 +71,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -95,7 +91,7 @@ public class IoTConsensus implements IConsensus { private final TEndPoint thisNode; private final int thisNodeId; - FolderManager folderManager = null; + private final File storageDir; private final IStateMachine.Registry registry; private final Map stateMachineMap = new ConcurrentHashMap<>(); @@ -108,13 +104,10 @@ public class IoTConsensus implements IConsensus { private Future updateReaderFuture; private Map> correctPeerListBeforeStart = null; - public IoTConsensus(ConsensusConfig config, Registry registry) - throws DiskSpaceInsufficientException { + public IoTConsensus(ConsensusConfig config, Registry registry) { this.thisNode = config.getThisNodeEndPoint(); this.thisNodeId = config.getThisNodeId(); - this.folderManager = - new FolderManager( - Arrays.asList(config.getStorageDirs()), DirectoryStrategyType.SEQUENCE_STRATEGY); + this.storageDir = new File(config.getStorageDir()); this.config = config.getIotConsensusConfig(); this.registry = registry; this.service = new IoTConsensusRPCService(thisNode, config.getIotConsensusConfig()); @@ -167,35 +160,31 @@ public synchronized void start() throws IOException { } private void initAndRecover() throws IOException { - for (String folder : folderManager.getFolders()) { - File storageDir = new File(folder); - if (!storageDir.exists()) { - if (!storageDir.mkdirs()) { - throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); - } - } else { - try (DirectoryStream stream = Files.newDirectoryStream(storageDir.toPath())) { - for (Path path : stream) { - String[] items = path.getFileName().toString().split("_"); - ConsensusGroupId consensusGroupId = - ConsensusGroupId.Factory.create( - Integer.parseInt(items[0]), Integer.parseInt(items[1])); - IoTConsensusServerImpl consensus = - new IoTConsensusServerImpl( - path.toString(), - new Peer(consensusGroupId, thisNodeId, thisNode), - new TreeSet<>(), - registry.apply(consensusGroupId), - backgroundTaskService, - clientManager, - syncClientManager, - config); - stateMachineMap.put(consensusGroupId, consensus); - } + if (!storageDir.exists()) { + if (!storageDir.mkdirs()) { + throw new IOException(String.format("Unable to create consensus dir at %s", storageDir)); + } + } else { + try (DirectoryStream stream = Files.newDirectoryStream(storageDir.toPath())) { + for (Path path : stream) { + String[] items = path.getFileName().toString().split("_"); + ConsensusGroupId consensusGroupId = + ConsensusGroupId.Factory.create( + Integer.parseInt(items[0]), Integer.parseInt(items[1])); + IoTConsensusServerImpl consensus = + new IoTConsensusServerImpl( + path.toString(), + new Peer(consensusGroupId, thisNodeId, thisNode), + new TreeSet<>(), + registry.apply(consensusGroupId), + backgroundTaskService, + clientManager, + syncClientManager, + config); + stateMachineMap.put(consensusGroupId, consensus); } } } - if (correctPeerListBeforeStart != null) { BiConsumer> resetPeerListWithoutThrow = (consensusGroupId, peers) -> { @@ -282,18 +271,8 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) k -> { exist.set(false); - String path = null; - try { - path = buildPeerDir(folderManager.getNextFolder(), groupId); - } catch (DiskSpaceInsufficientException e) { - logger.warn( - "Failed to create consensus directory for group {} due to disk space insufficiency: {}", - groupId, - e.getMessage()); - return null; - } + String path = buildPeerDir(storageDir, groupId); File file = new File(path); - System.out.println(file.getAbsolutePath()); if (!file.mkdirs()) { logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); return null; @@ -336,9 +315,7 @@ public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException if (!exist.get()) { throw new ConsensusGroupNotExistException(groupId); } - for (String folder : folderManager.getFolders()) { - FileUtils.deleteFileOrDirectory(new File(buildPeerDir(folder, groupId))); - } + FileUtils.deleteFileOrDirectory(new File(buildPeerDir(storageDir, groupId))); KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE); } @@ -488,7 +465,7 @@ public List getAllConsensusGroupIds() { @Override public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) { - return null; + return buildPeerDir(storageDir, groupId); } @Override @@ -575,7 +552,7 @@ public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) { return stateMachineMap.get(groupId); } - public static String buildPeerDir(String storageDir, ConsensusGroupId groupId) { + public static String buildPeerDir(File storageDir, ConsensusGroupId groupId) { return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId(); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index 40669a361920..f22d3fdadcd9 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -71,12 +71,6 @@ public class ReplicateTest { new File("target" + File.separator + "2"), new File("target" + File.separator + "3")); - private final String[][] storageDirs = { - {"target" + File.separator + "1"}, - {"target" + File.separator + "2"}, - {"target" + File.separator + "3"} - }; - private final ConsensusGroup group = new ConsensusGroup(gid, peers); private final List servers = new ArrayList<>(); private final List stateMachines = new ArrayList<>(); @@ -110,7 +104,7 @@ private void initServer() throws IOException { ConsensusConfig.newBuilder() .setThisNodeId(peers.get(i).getNodeId()) .setThisNode(peers.get(i).getEndpoint()) - .setStorageDirs(storageDirs[i]) + .setStorageDir(peersStorage.get(i).getAbsolutePath()) .setConsensusGroupType(TConsensusGroupType.DataRegion) .build(), groupId -> stateMachines.get(finalI)) @@ -299,6 +293,14 @@ public void parsingAndConstructIDTest() throws Exception { for (int i = 0; i < CHECK_POINT_GAP; i++) { servers.get(0).write(gid, new TestEntry(i, peers.get(0))); } + + String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid); + try { + File regionDirFile = new File(regionDir); + Assert.assertTrue(regionDirFile.exists()); + } catch (Exception e) { + Assert.fail(); + } } private boolean checkPortAvailable() { diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index 7c79ab33b4a0..5147632431f2 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -23,9 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; -import org.apache.iotdb.commons.disk.FolderManager; -import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; -import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.common.Peer; @@ -61,11 +58,8 @@ public class StabilityTest { private final ConsensusGroupId dataRegionId = new DataRegionId(1); - private final String[] folders = { - "target" + File.separator + "stability1", - "target" + File.separator + "stability2", - "target" + File.separator + "stability3" - }; + private final File storageDir = new File("target" + java.io.File.separator + "stability"); + private IoTConsensus consensusImpl; private final int basePort = 6667; @@ -78,7 +72,7 @@ public void constructConsensus() throws IOException { ConsensusConfig.newBuilder() .setThisNodeId(1) .setThisNode(new TEndPoint("0.0.0.0", basePort)) - .setStorageDirs(folders) + .setStorageDir(storageDir.getAbsolutePath()) .setConsensusGroupType(TConsensusGroupType.DataRegion) .build(), gid -> new TestStateMachine()) @@ -93,24 +87,14 @@ public void constructConsensus() throws IOException { @Before public void setUp() throws Exception { - for (String folder : folders) { - File dir = new File(folder); - if (dir.exists()) { - FileUtils.deleteFully(dir); - } - } + FileUtils.deleteFully(storageDir); constructConsensus(); } @After public void tearDown() throws IOException { consensusImpl.stop(); - for (String folder : folders) { - File dir = new File(folder); - if (dir.exists()) { - FileUtils.deleteFully(dir); - } - } + FileUtils.deleteFully(storageDir); } @Test @@ -120,7 +104,6 @@ public void allTest() throws Exception { peerTest(); transferLeader(); snapshotTest(); - multiRegionSnapshotTest(); } public void addConsensusGroup() { @@ -219,19 +202,10 @@ public void snapshotTest() throws ConsensusException { Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); consensusImpl.triggerSnapshot(dataRegionId, false); - File dataDir = null; - File[] versionFiles1 = null; - for (String folder : folders) { - dataDir = new File(IoTConsensus.buildPeerDir(folder, dataRegionId)); - if ((versionFiles1 = - dataDir.listFiles( - (dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME))) - != null - && versionFiles1.length > 0) { - break; - } - } + File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId)); + File[] versionFiles1 = + dataDir.listFiles((dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME)); Assert.assertNotNull(versionFiles1); Assert.assertEquals(1, versionFiles1.length); @@ -247,35 +221,6 @@ public void snapshotTest() throws ConsensusException { consensusImpl.deleteLocalPeer(dataRegionId); } - public void multiRegionSnapshotTest() throws ConsensusException, DiskSpaceInsufficientException { - consensusImpl.folderManager = - new FolderManager( - consensusImpl.folderManager.getFolders(), DirectoryStrategyType.SEQUENCE_STRATEGY); - ConsensusGroupId[] dataRegionIds = new ConsensusGroupId[folders.length]; - for (int i = 0; i < folders.length; i++) { - dataRegionIds[i] = new DataRegionId(i + 100); - consensusImpl.createLocalPeer( - dataRegionIds[i], - Collections.singletonList( - new Peer(dataRegionIds[i], 1, new TEndPoint("0.0.0.0", basePort)))); - consensusImpl.triggerSnapshot(dataRegionIds[i], true); - } - - for (int i = 0; i < folders.length; i++) { - File dataDir = new File(IoTConsensus.buildPeerDir(folders[i], dataRegionIds[i])); - System.out.println(dataDir.getAbsolutePath()); - File[] versionFiles1 = - dataDir.listFiles( - (dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME)); - Assert.assertNotNull(versionFiles1); - Assert.assertEquals(1, versionFiles1.length); - } - - for (int i = 0; i < folders.length; i++) { - consensusImpl.deleteLocalPeer(dataRegionIds[i]); - } - } - @Test public void recordAndResetPeerListTest() throws Exception { try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 193702193c65..2649f8de7af4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -125,7 +125,6 @@ private static ConsensusConfig buildConsensusConfig() { .setThisNodeId(CONF.getDataNodeId()) .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort())) .setStorageDir(CONF.getDataRegionConsensusDir()) - .setStorageDirs(CONF.getDataDirs()) .setConsensusGroupType(TConsensusGroupType.DataRegion) .setIoTConsensusConfig( IoTConsensusConfig.newBuilder() From b779a1ed252778fdd2bb03471a956efa08d248ee Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Thu, 4 Dec 2025 17:47:55 +0800 Subject: [PATCH 5/7] IoTConsensus receiver snapshot multi dirs --- .../consensus/config/ConsensusConfig.java | 15 ++++ .../iotdb/consensus/iot/IoTConsensus.java | 32 +++++--- .../consensus/iot/IoTConsensusServerImpl.java | 79 +++++++++++++------ .../iotdb/consensus/iot/ReplicateTest.java | 27 +++++++ .../iotdb/consensus/iot/StabilityTest.java | 8 ++ .../db/consensus/DataRegionConsensusImpl.java | 2 + 6 files changed, 129 insertions(+), 34 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java index dba53214abd7..0e230d172bdb 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import java.util.List; import java.util.Optional; public class ConsensusConfig { @@ -29,6 +30,7 @@ public class ConsensusConfig { private final TEndPoint thisNodeEndPoint; private final int thisNodeId; private final String storageDir; + private final List recvSnapshotDirs; private final TConsensusGroupType consensusGroupType; private final RatisConfig ratisConfig; private final IoTConsensusConfig iotConsensusConfig; @@ -38,6 +40,7 @@ private ConsensusConfig( TEndPoint thisNode, int thisNodeId, String storageDir, + List recvSnapshotDirs, TConsensusGroupType consensusGroupType, RatisConfig ratisConfig, IoTConsensusConfig iotConsensusConfig, @@ -45,6 +48,7 @@ private ConsensusConfig( this.thisNodeEndPoint = thisNode; this.thisNodeId = thisNodeId; this.storageDir = storageDir; + this.recvSnapshotDirs = recvSnapshotDirs; this.consensusGroupType = consensusGroupType; this.ratisConfig = ratisConfig; this.iotConsensusConfig = iotConsensusConfig; @@ -63,6 +67,10 @@ public String getStorageDir() { return storageDir; } + public List getRecvSnapshotDirs() { + return recvSnapshotDirs; + } + public TConsensusGroupType getConsensusGroupType() { return consensusGroupType; } @@ -88,6 +96,7 @@ public static class Builder { private TEndPoint thisNode; private int thisNodeId; private String storageDir; + private List recvSnapshotDirs; private TConsensusGroupType consensusGroupType; private RatisConfig ratisConfig; private IoTConsensusConfig iotConsensusConfig; @@ -98,6 +107,7 @@ public ConsensusConfig build() { thisNode, thisNodeId, storageDir, + recvSnapshotDirs, consensusGroupType, Optional.ofNullable(ratisConfig).orElseGet(() -> RatisConfig.newBuilder().build()), Optional.ofNullable(iotConsensusConfig) @@ -121,6 +131,11 @@ public Builder setStorageDir(String storageDir) { return this; } + public Builder setRecvSnapshotDirs(List recvSnapshotDirs) { + this.recvSnapshotDirs = recvSnapshotDirs; + return this; + } + public Builder setConsensusGroupType(TConsensusGroupType groupType) { this.consensusGroupType = groupType; return this; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 959191ca2d6d..7bf7fdd93a14 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.utils.FileUtils; @@ -92,6 +93,7 @@ public class IoTConsensus implements IConsensus { private final TEndPoint thisNode; private final int thisNodeId; private final File storageDir; + private final List recvSnapshotDirs; private final IStateMachine.Registry registry; private final Map stateMachineMap = new ConcurrentHashMap<>(); @@ -108,6 +110,7 @@ public IoTConsensus(ConsensusConfig config, Registry registry) { this.thisNode = config.getThisNodeEndPoint(); this.thisNodeId = config.getThisNodeId(); this.storageDir = new File(config.getStorageDir()); + this.recvSnapshotDirs = config.getRecvSnapshotDirs(); this.config = config.getIotConsensusConfig(); this.registry = registry; this.service = new IoTConsensusRPCService(thisNode, config.getIotConsensusConfig()); @@ -174,6 +177,7 @@ private void initAndRecover() throws IOException { IoTConsensusServerImpl consensus = new IoTConsensusServerImpl( path.toString(), + recvSnapshotDirs, new Peer(consensusGroupId, thisNodeId, thisNode), new TreeSet<>(), registry.apply(consensusGroupId), @@ -183,6 +187,8 @@ private void initAndRecover() throws IOException { config); stateMachineMap.put(consensusGroupId, consensus); } + } catch (DiskSpaceInsufficientException e) { + throw new IOException(e); } } if (correctPeerListBeforeStart != null) { @@ -278,16 +284,22 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) return null; } - IoTConsensusServerImpl impl = - new IoTConsensusServerImpl( - path, - new Peer(groupId, thisNodeId, thisNode), - new TreeSet<>(peers), - registry.apply(groupId), - backgroundTaskService, - clientManager, - syncClientManager, - config); + IoTConsensusServerImpl impl = null; + try { + impl = + new IoTConsensusServerImpl( + path, + recvSnapshotDirs, + new Peer(groupId, thisNodeId, thisNode), + new TreeSet<>(peers), + registry.apply(groupId), + backgroundTaskService, + clientManager, + syncClientManager, + config); + } catch (DiskSpaceInsufficientException e) { + throw new RuntimeException(e); + } impl.start(); return impl; })) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 567261efffff..f9b9ab17edf6 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -26,6 +26,9 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest; import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; @@ -113,6 +116,7 @@ public class IoTConsensusServerImpl { private final Lock stateMachineLock = new ReentrantLock(); private final Condition stateMachineCondition = stateMachineLock.newCondition(); private final String storageDir; + private FolderManager recvFolderManager = null; private final TreeSet configuration; private final AtomicLong searchIndex; private final LogDispatcher logDispatcher; @@ -130,15 +134,29 @@ public class IoTConsensusServerImpl { public IoTConsensusServerImpl( String storageDir, + List recvSnapshotDirs, Peer thisNode, TreeSet configuration, IStateMachine stateMachine, ScheduledExecutorService backgroundTaskService, IClientManager clientManager, IClientManager syncClientManager, - IoTConsensusConfig config) { + IoTConsensusConfig config) + throws DiskSpaceInsufficientException { this.active = true; this.storageDir = storageDir; + List snapshotDirs = new ArrayList<>(); + if (recvSnapshotDirs != null) { + for (String dir : recvSnapshotDirs) { + snapshotDirs.add(dir + File.separator + SNAPSHOT_DIR_NAME); + } + } else { + snapshotDirs.add(storageDir); + } + + this.recvFolderManager = + new FolderManager( + snapshotDirs, DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY); this.thisNode = thisNode; this.stateMachine = stateMachine; this.cacheQueueMap = new ConcurrentHashMap<>(); @@ -361,18 +379,22 @@ public void receiveSnapshotFragment( throws ConsensusGroupModifyPeerException { try { String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath); - File targetFile = getSnapshotPath(targetFilePath); - Path parentDir = Paths.get(targetFile.getParent()); - if (!Files.exists(parentDir)) { - Files.createDirectories(parentDir); - } - try (FileOutputStream fos = new FileOutputStream(targetFile.getAbsolutePath(), true); - FileChannel channel = fos.getChannel()) { - channel.write(fileChunk.slice(), fileOffset); - } - } catch (IOException e) { + recvFolderManager.getNextWithRetry( + folder -> { + File targetFile = getSnapshotPath(folder, targetFilePath); + Path parentDir = Paths.get(targetFile.getParent()); + if (!Files.exists(parentDir)) { + Files.createDirectories(parentDir); + } + try (FileOutputStream fos = new FileOutputStream(targetFile.getAbsolutePath(), true); + FileChannel channel = fos.getChannel()) { + channel.write(fileChunk.slice(), fileOffset); + } + return null; + }); + } catch (DiskSpaceInsufficientException e) { throw new ConsensusGroupModifyPeerException( - String.format("error when receiving snapshot %s", snapshotId), e); + String.format("Error when receiving snapshot %s", snapshotId), e); } } @@ -408,12 +430,17 @@ private void clearOldSnapshot() { public void loadSnapshot(String snapshotId) { // TODO: (xingtanzjr) throw exception if the snapshot load failed - stateMachine.loadSnapshot(getSnapshotPath(snapshotId)); + recvFolderManager + .getFolders() + .forEach( + dir -> { + stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId)); + }); } - private File getSnapshotPath(String snapshotRelativePath) { - File storageDirFile = new File(storageDir); - File snapshotDir = new File(storageDir, snapshotRelativePath); + private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) { + File storageDirFile = new File(curStorageDir); + File snapshotDir = new File(curStorageDir, snapshotRelativePath); try { if (!snapshotDir .getCanonicalFile() @@ -839,15 +866,19 @@ public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPe } public void cleanupSnapshot(String snapshotId) throws ConsensusGroupModifyPeerException { - File snapshotDir = getSnapshotPath(snapshotId); - if (snapshotDir.exists()) { - try { - FileUtils.deleteDirectory(snapshotDir); - } catch (IOException e) { - throw new ConsensusGroupModifyPeerException(e); + List allDirs = new ArrayList<>(Collections.singletonList(storageDir)); + allDirs.addAll(recvFolderManager.getFolders()); + for (String dir : allDirs) { + File snapshotDir = getSnapshotPath(dir, snapshotId); + if (snapshotDir.exists()) { + try { + FileUtils.deleteDirectory(snapshotDir); + } catch (IOException e) { + throw new ConsensusGroupModifyPeerException(e); + } + } else { + logger.info("File not exist: {}", snapshotDir); } - } else { - logger.info("File not exist: {}", snapshotDir); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index f22d3fdadcd9..ca96ffec4859 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -71,6 +71,21 @@ public class ReplicateTest { new File("target" + File.separator + "2"), new File("target" + File.separator + "3")); + private final List> peersRecvSnapshotDirs = + Arrays.asList( + Arrays.asList( + "target" + File.separator + "1-1", + "target" + File.separator + "1-2", + "target" + File.separator + "1-3"), + Arrays.asList( + "target" + File.separator + "2-1", + "target" + File.separator + "2-2", + "target" + File.separator + "2-3"), + Arrays.asList( + "target" + File.separator + "3-1", + "target" + File.separator + "3-2", + "target" + File.separator + "3-3")); + private final ConsensusGroup group = new ConsensusGroup(gid, peers); private final List servers = new ArrayList<>(); private final List stateMachines = new ArrayList<>(); @@ -81,6 +96,7 @@ public void setUp() throws Exception { file.mkdirs(); stateMachines.add(new TestStateMachine()); } + peersRecvSnapshotDirs.forEach(innerList -> innerList.forEach(dir -> new File(dir).mkdirs())); initServer(); } @@ -90,6 +106,16 @@ public void tearDown() throws Exception { for (File file : peersStorage) { FileUtils.deleteFully(file); } + peersRecvSnapshotDirs.forEach( + innerList -> + innerList.forEach( + dir -> { + try { + FileUtils.deleteFully(new File(dir)); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); } private void initServer() throws IOException { @@ -105,6 +131,7 @@ private void initServer() throws IOException { .setThisNodeId(peers.get(i).getNodeId()) .setThisNode(peers.get(i).getEndpoint()) .setStorageDir(peersStorage.get(i).getAbsolutePath()) + .setRecvSnapshotDirs(peersRecvSnapshotDirs.get(i)) .setConsensusGroupType(TConsensusGroupType.DataRegion) .build(), groupId -> stateMachines.get(finalI)) diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index 5147632431f2..f456b070124c 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -45,6 +45,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -60,6 +61,12 @@ public class StabilityTest { private final File storageDir = new File("target" + java.io.File.separator + "stability"); + private final List recvSnapshotDirs = + Arrays.asList( + "target" + File.separator + "1-1", + "target" + File.separator + "1-2", + "target" + File.separator + "1-3"); + private IoTConsensus consensusImpl; private final int basePort = 6667; @@ -73,6 +80,7 @@ public void constructConsensus() throws IOException { .setThisNodeId(1) .setThisNode(new TEndPoint("0.0.0.0", basePort)) .setStorageDir(storageDir.getAbsolutePath()) + .setRecvSnapshotDirs(recvSnapshotDirs) .setConsensusGroupType(TConsensusGroupType.DataRegion) .build(), gid -> new TestStateMachine()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 2649f8de7af4..85ffd32c10f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -53,6 +53,7 @@ import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; +import java.util.Arrays; import java.util.concurrent.TimeUnit; /** @@ -125,6 +126,7 @@ private static ConsensusConfig buildConsensusConfig() { .setThisNodeId(CONF.getDataNodeId()) .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort())) .setStorageDir(CONF.getDataRegionConsensusDir()) + .setRecvSnapshotDirs(Arrays.asList(CONF.getDataDirs())) .setConsensusGroupType(TConsensusGroupType.DataRegion) .setIoTConsensusConfig( IoTConsensusConfig.newBuilder() From 7f86ba647a18442af1d678f748bd64b7af6fbfd5 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Thu, 4 Dec 2025 17:56:03 +0800 Subject: [PATCH 6/7] fix imports --- .../receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 6 +++--- .../apache/iotdb/db/storageengine/load/util/LoadUtil.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index bedd7fbe96a5..bcbce833e0f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -23,6 +23,9 @@ import org.apache.iotdb.commons.audit.IAuditEntity; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.path.PartialPath; @@ -46,7 +49,6 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.metric.receiver.PipeDataNodeReceiverMetrics; @@ -102,8 +104,6 @@ import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.util.LoadUtil; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.db.tools.schema.SRStatementGenerator; import org.apache.iotdb.db.tools.schema.SchemaRegionSnapshotParser; import org.apache.iotdb.db.utils.DataNodeAuthUtils; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java index a3d29337b865..0eddfa08e977 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java @@ -19,16 +19,16 @@ package org.apache.iotdb.db.storageengine.load.util; +import org.apache.iotdb.commons.disk.FolderManager; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From de6b65d04b07b7b08340fd4df1c26fecb5584bd8 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Thu, 4 Dec 2025 18:32:52 +0800 Subject: [PATCH 7/7] Add keep same disk when loading snapshot --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 6 + .../dataregion/snapshot/SnapshotLoader.java | 99 +++++++++------- .../snapshot/IoTDBSnapshotTest.java | 111 ++++++++++++++++-- .../conf/iotdb-system.properties.template | 6 + .../iotdb/commons/disk/FolderManager.java | 22 ++++ 6 files changed, 199 insertions(+), 55 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 81a8aa4f50a3..67498253d655 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1185,6 +1185,8 @@ public class IoTDBConfig { private boolean includeNullValueInWriteThroughputMetric = false; + private boolean keepSameDiskWhenLoadingSnapshot = false; + private ConcurrentHashMap tsFileDBToEncryptMap = new ConcurrentHashMap<>( Collections.singletonMap("root.__audit", new EncryptParameter("UNENCRYPTED", null))); @@ -4257,6 +4259,14 @@ public void setPasswordLockTimeMinutes(int passwordLockTimeMinutes) { this.passwordLockTimeMinutes = passwordLockTimeMinutes; } + public boolean isKeepSameDiskWhenLoadingSnapshot() { + return keepSameDiskWhenLoadingSnapshot; + } + + public void setKeepSameDiskWhenLoadingSnapshot(boolean keepSameDiskWhenLoadingSnapshot) { + this.keepSameDiskWhenLoadingSnapshot = keepSameDiskWhenLoadingSnapshot; + } + public ConcurrentHashMap getTSFileDBToEncryptMap() { return tsFileDBToEncryptMap; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 3c9e8159d0ff..41131b5a6c40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1173,6 +1173,12 @@ private void loadIoTConsensusProps(TrimProperties properties) throws IOException "region_migration_speed_limit_bytes_per_second", ConfigurationFileUtils.getConfigurationDefaultValue( "region_migration_speed_limit_bytes_per_second")))); + conf.setKeepSameDiskWhenLoadingSnapshot( + Boolean.parseBoolean( + properties.getProperty( + "keep_same_disk_when_loading_snapshot", + ConfigurationFileUtils.getConfigurationDefaultValue( + "keep_same_disk_when_loading_snapshot")))); } private void loadIoTConsensusV2Props(TrimProperties properties) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java index 9f977199f89b..f486479a1288 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java @@ -314,56 +314,69 @@ private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir) } } + private File createLinksFromSnapshotToSourceDir( + String targetSuffix, + File file, + Map fileTarget, + String fileKey, + String finalDir) + throws IOException { + File targetFile = + new File(finalDir + File.separator + targetSuffix + File.separator + file.getName()); + + try { + if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) { + throw new IOException( + String.format( + "Cannot create directory %s", targetFile.getParentFile().getAbsolutePath())); + } + + try { + Files.createLink(targetFile.toPath(), file.toPath()); + LOGGER.debug("Created hard link from {} to {}", file, targetFile); + fileTarget.put(fileKey, finalDir); + return targetFile; + } catch (IOException e) { + LOGGER.info("Cannot create link from {} to {}, fallback to copy", file, targetFile); + } + + Files.copy(file.toPath(), targetFile.toPath()); + fileTarget.put(fileKey, finalDir); + return targetFile; + } catch (Exception e) { + LOGGER.warn( + "Failed to process file {} in dir {}: {}", file.getName(), finalDir, e.getMessage(), e); + throw e; + } + } + private void createLinksFromSnapshotToSourceDir( - String targetSuffix, File[] files, FolderManager folderManager) - throws DiskSpaceInsufficientException, IOException { + String targetSuffix, File[] files, FolderManager folderManager) throws IOException { Map fileTarget = new HashMap<>(); for (File file : files) { String fileKey = file.getName().split("\\.")[0]; String dataDir = fileTarget.get(fileKey); + if (dataDir != null) { + createLinksFromSnapshotToSourceDir(targetSuffix, file, fileTarget, fileKey, dataDir); + continue; + } + try { - folderManager.getNextWithRetry( - currentDataDir -> { - String effectiveDir = (dataDir != null) ? dataDir : currentDataDir; - File targetFile = - new File( - effectiveDir - + File.separator - + targetSuffix - + File.separator - + file.getName()); - - try { - if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) { - throw new IOException( - String.format( - "Cannot create directory %s", - targetFile.getParentFile().getAbsolutePath())); - } - - try { - Files.createLink(targetFile.toPath(), file.toPath()); - LOGGER.debug("Created hard link from {} to {}", file, targetFile); - return targetFile; - } catch (IOException e) { - LOGGER.info( - "Cannot create link from {} to {}, fallback to copy", file, targetFile); - } - - Files.copy(file.toPath(), targetFile.toPath()); - fileTarget.put(fileKey, effectiveDir); - return targetFile; - } catch (Exception e) { - LOGGER.warn( - "Failed to process file {} in dir {}: {}", - file.getName(), - effectiveDir, - e.getMessage(), - e); - throw e; - } - }); + String firstFolderOfSameDisk = + IoTDBDescriptor.getInstance().getConfig().isKeepSameDiskWhenLoadingSnapshot() + ? folderManager.getFirstFolderOfSameDisk(file.getAbsolutePath()) + : null; + + if (firstFolderOfSameDisk != null) { + createLinksFromSnapshotToSourceDir( + targetSuffix, file, fileTarget, fileKey, firstFolderOfSameDisk); + } else { + folderManager.getNextWithRetry( + currentDataDir -> + createLinksFromSnapshotToSourceDir( + targetSuffix, file, fileTarget, fileKey, currentDataDir)); + } } catch (Exception e) { throw new IOException( String.format( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java index 9314fc2fa0aa..6b67499c2e18 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java @@ -38,15 +38,21 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import java.io.File; import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import static org.apache.iotdb.consensus.iot.IoTConsensusServerImpl.SNAPSHOT_DIR_NAME; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; +import static org.junit.Assert.assertEquals; public class IoTDBSnapshotTest { private String[][] testDataDirs = @@ -65,11 +71,12 @@ public void tearDown() throws IOException, StorageEngineException { FileUtils.recursivelyDeleteFolder("target" + File.separator + "tmp"); } - private List writeTsFiles() throws IOException, WriteProcessException { + private List writeTsFiles(String[] dataDirs) + throws IOException, WriteProcessException { List resources = new ArrayList<>(); for (int i = 0; i < 100; i++) { String filePath = - testDataDirs[0][i % 3] + dataDirs[i % dataDirs.length] + File.separator + "sequence" + File.separator @@ -108,7 +115,7 @@ public void testCreateSnapshot() IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs); TierManager.getInstance().resetFolders(); try { - List resources = writeTsFiles(); + List resources = writeTsFiles(testDataDirs[0]); DataRegion region = new DataRegion(testSgName, "0"); region.getTsFileManager().addAll(resources, true); File snapshotDir = new File("target" + File.separator + "snapshot"); @@ -117,12 +124,12 @@ public void testCreateSnapshot() new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); File[] files = snapshotDir.listFiles((dir, name) -> name.equals(SnapshotLogger.SNAPSHOT_LOG_NAME)); - Assert.assertEquals(1, files.length); + assertEquals(1, files.length); SnapshotLogAnalyzer analyzer = new SnapshotLogAnalyzer(files[0]); Assert.assertTrue(analyzer.isSnapshotComplete()); int cnt = analyzer.getTotalFileCountInSnapshot(); analyzer.close(); - Assert.assertEquals(200, cnt); + assertEquals(200, cnt); for (TsFileResource resource : resources) { Assert.assertTrue(resource.tryWriteLock()); } @@ -142,7 +149,7 @@ public void testCreateSnapshotWithUnclosedTsFile() IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs); TierManager.getInstance().resetFolders(); try { - List resources = writeTsFiles(); + List resources = writeTsFiles(testDataDirs[0]); resources.subList(50, 100).forEach(x -> x.setStatusForTest(TsFileResourceStatus.UNCLOSED)); DataRegion region = new DataRegion(testSgName, "0"); region.setAllowCompaction(false); @@ -153,13 +160,13 @@ public void testCreateSnapshotWithUnclosedTsFile() new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true); File[] files = snapshotDir.listFiles((dir, name) -> name.equals(SnapshotLogger.SNAPSHOT_LOG_NAME)); - Assert.assertEquals(1, files.length); + assertEquals(1, files.length); SnapshotLogAnalyzer analyzer = new SnapshotLogAnalyzer(files[0]); int cnt = 0; Assert.assertTrue(analyzer.isSnapshotComplete()); cnt = analyzer.getTotalFileCountInSnapshot(); analyzer.close(); - Assert.assertEquals(100, cnt); + assertEquals(100, cnt); for (TsFileResource resource : resources) { Assert.assertTrue(resource.tryWriteLock()); } @@ -179,7 +186,7 @@ public void testLoadSnapshot() IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs); TierManager.getInstance().resetFolders(); try { - List resources = writeTsFiles(); + List resources = writeTsFiles(testDataDirs[0]); DataRegion region = new DataRegion(testSgName, "0"); CompressionRatio.getInstance().updateRatio(100, 100, "0"); region.getTsFileManager().addAll(resources, true); @@ -195,8 +202,8 @@ public void testLoadSnapshot() .loadSnapshotForStateMachine(); Assert.assertNotNull(dataRegion); List resource = dataRegion.getTsFileManager().getTsFileList(true); - Assert.assertEquals(100, resource.size()); - Assert.assertEquals( + assertEquals(100, resource.size()); + assertEquals( new Pair<>(100L, 100L), CompressionRatio.getInstance().getDataRegionRatioMap().get("0")); } finally { @@ -208,6 +215,86 @@ public void testLoadSnapshot() } } + @Ignore("Need manual execution to specify different disks") + @Test + public void testLoadSnapshotNoHardLink() + throws IOException, WriteProcessException, DirectoryNotLegalException { + IoTDBDescriptor.getInstance().getConfig().setKeepSameDiskWhenLoadingSnapshot(true); + // initialize dirs + String[][] dataDirsForDB = new String[][] {{"C://snapshot_test", "D://snapshot_test"}}; + File snapshotDir = new File("D://snapshot_store//"); + if (snapshotDir.exists()) { + FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath()); + } + for (String[] dirs : dataDirsForDB) { + for (String dir : dirs) { + if (new File(dir).exists()) { + FileUtils.recursivelyDeleteFolder(dir); + } + } + } + IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(dataDirsForDB); + TierManager.getInstance().resetFolders(); + + // prepare files, files should be written into two folders + List resources = writeTsFiles(dataDirsForDB[0]); + DataRegion region = new DataRegion(testSgName, "0"); + region.getTsFileManager().addAll(resources, true); + + // take a snapshot into one disk + Assert.assertTrue(snapshotDir.exists() || snapshotDir.mkdirs()); + try { + Assert.assertTrue( + new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true)); + File[] files = + snapshotDir.listFiles((dir, name) -> name.equals(SnapshotLogger.SNAPSHOT_LOG_NAME)); + // use loadWithoutLog + if (files != null && files.length > 0) { + files[0].delete(); + } + // move files to snapshot store (simulate snapshot transfer) + for (String dir : dataDirsForDB[0]) { + File internalSnapshotDir = new File(dir, SNAPSHOT_DIR_NAME); + if (internalSnapshotDir.exists()) { + for (File file : FileUtils.listFilesRecursively(internalSnapshotDir, f -> true)) { + if (file.isFile()) { + String absolutePath = file.getAbsolutePath(); + int snapshotIdIndex = absolutePath.indexOf("snapshot_store"); + int suffixIndex = snapshotIdIndex + "snapshot_store".length(); + String suffix = absolutePath.substring(suffixIndex); + File snapshotFile = new File(snapshotDir, suffix); + FileUtils.copyFile(file, snapshotFile); + } + } + } + } + + // load the snapshot + DataRegion dataRegion = + new SnapshotLoader(snapshotDir.getAbsolutePath(), testSgName, "0") + .loadSnapshotForStateMachine(); + Assert.assertNotNull(dataRegion); + resources = dataRegion.getTsFileManager().getTsFileList(true); + assertEquals(100, resources.size()); + + // files should not be moved to another disk + Path snapshotDirPath = snapshotDir.toPath(); + FileStore snapshotFileStore = Files.getFileStore(snapshotDirPath); + for (TsFileResource tsFileResource : resources) { + Path tsfilePath = tsFileResource.getTsFile().toPath(); + FileStore tsFileFileStore = Files.getFileStore(tsfilePath); + assertEquals(snapshotFileStore, tsFileFileStore); + } + } finally { + FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath()); + for (String[] dirs : dataDirsForDB) { + for (String dir : dirs) { + FileUtils.recursivelyDeleteFolder(dir); + } + } + } + } + @Test public void testGetSnapshotFile() throws IOException { File tsFile = @@ -228,7 +315,7 @@ public void testGetSnapshotFile() throws IOException { Mockito.when(region.getDataRegionIdString()).thenReturn("0"); File snapshotFile = new SnapshotTaker(region).getSnapshotFilePathForTsFile(tsFile, "test-snapshotId"); - Assert.assertEquals( + assertEquals( new File( IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs()[0] + File.separator diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 4b31bf9a286c..580eca54988b 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1607,6 +1607,12 @@ data_region_iot_max_memory_ratio_for_queue = 0.6 # Datatype: long region_migration_speed_limit_bytes_per_second = 50331648 +# When loading snapshot, try keeping TsFiles in the same disk as the snapshot dir. +# This may reduce file copies but may also result in a worse disk load-balance +# effectiveMode: hot_reload +# Datatype: boolean +keep_same_disk_when_loading_snapshot=false + #################### ### Blob Allocator Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java index 95cb24fa8664..e2316885cd0f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java @@ -32,6 +32,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -146,4 +151,21 @@ public T getNextWithRetry(ThrowingFunction getFolders() { return folders; } + + public String getFirstFolderOfSameDisk(String pathStr) { + Path path = Paths.get(pathStr); + try { + FileStore fileStore = Files.getFileStore(path); + for (String folder : folders) { + Path folderPath = Paths.get(folder); + FileStore folderFileStore = Files.getFileStore(folderPath); + if (folderFileStore.equals(fileStore)) { + return folder; + } + } + } catch (IOException e) { + logger.warn("Failed to read file store path '" + pathStr + "'", e); + } + return null; + } }