Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@

import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;

Expand All @@ -51,6 +54,8 @@ public class IoTDBRegionGroupExpandAndShrinkForIoTV1IT
extends IoTDBRegionOperationReliabilityITFramework {
private static final String EXPAND_FORMAT = "extend region %d to %d";
private static final String SHRINK_FORMAT = "remove region %d from %d";
private static final String MULTI_EXPAND_FORMAT = "extend region %s to %d";
private static final String MULTI_SHRINK_FORMAT = "remove region %s from %d";

private static Logger LOGGER =
LoggerFactory.getLogger(IoTDBRegionGroupExpandAndShrinkForIoTV1IT.class);
Expand All @@ -65,7 +70,7 @@ public class IoTDBRegionGroupExpandAndShrinkForIoTV1IT
* <p>4. Check
*/
@Test
public void normal1C5DTest() throws Exception {
public void singleRegionTest() throws Exception {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
Expand Down Expand Up @@ -181,4 +186,326 @@ private void regionGroupShrink(

LOGGER.info("Region {} has shrunk from DataNode {}", selectedRegion, targetDataNode);
}

/**
* Test multi-region expand and shrink operations with normal flow: 1. Multi-expand: expand
* multiple regions to target DataNode 2. Multi-shrink: shrink multiple regions from target
* DataNode
*/
@Test
public void multiRegionNormalTest() throws Exception {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataReplicationFactor(1)
.setSchemaReplicationFactor(1);

EnvFactory.getEnv().initClusterEnvironment(1, 5);

try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
final Statement statement = makeItCloseQuietly(connection.createStatement());
SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
// prepare data
statement.execute(INSERTION1);
statement.execute(FLUSH_COMMAND);

// collect necessary information
Map<Integer, Set<Integer>> regionMap = getAllRegionMap(statement);
Set<Integer> allDataNodeId = getAllDataNodes(statement);

// expect one data region, one schema region
// plus one system data region, one system schema region
Assert.assertEquals(4, regionMap.size());

// select multiple regions for testing
List<Integer> selectedRegions = new ArrayList<>(regionMap.keySet());
selectedRegions = selectedRegions.subList(0, Math.min(3, selectedRegions.size()));

// find target DataNode that doesn't contain any of the selected regions
int targetDataNode =
findDataNodeNotContainsAnyRegion(allDataNodeId, regionMap, selectedRegions);

LOGGER.info("Selected regions for multi-region test: {}", selectedRegions);
LOGGER.info("Target DataNode: {}", targetDataNode);

// multi-expand: expand all selected regions to target DataNode
multiRegionGroupExpand(statement, client, selectedRegions, targetDataNode);

// verify expand result
regionMap = getAllRegionMap(statement);
for (int regionId : selectedRegions) {
Assert.assertTrue(
"Region " + regionId + " should contain target DataNode " + targetDataNode,
regionMap.get(regionId).contains(targetDataNode));
}
LOGGER.info("Multi-region expand test passed");

// multi-shrink: shrink all selected regions from target DataNode
multiRegionGroupShrink(statement, client, selectedRegions, targetDataNode);

// verify shrink result
regionMap = getAllRegionMap(statement);
for (int regionId : selectedRegions) {
Assert.assertFalse(
"Region " + regionId + " should not contain target DataNode " + targetDataNode,
regionMap.get(regionId).contains(targetDataNode));
}
LOGGER.info("Multi-region shrink test passed");
}
}

/** Test multi-region expand with partial regions already in target DataNode */
@Test
public void multiRegionExpandPartialExistTest() throws Exception {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataReplicationFactor(1)
.setSchemaReplicationFactor(1);

EnvFactory.getEnv().initClusterEnvironment(1, 5);

try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
final Statement statement = makeItCloseQuietly(connection.createStatement());
SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
// prepare data
statement.execute(INSERTION1);
statement.execute(FLUSH_COMMAND);

Map<Integer, Set<Integer>> regionMap = getAllRegionMap(statement);
Set<Integer> allDataNodeId = getAllDataNodes(statement);

List<Integer> allRegions = new ArrayList<>(regionMap.keySet());
List<Integer> selectedRegions = allRegions.subList(0, Math.min(3, allRegions.size()));

int targetDataNode =
findDataNodeNotContainsAnyRegion(allDataNodeId, regionMap, selectedRegions);

// first expand some regions individually
List<Integer> preExpandRegions =
selectedRegions.subList(0, Math.min(2, selectedRegions.size()));
for (int regionId : preExpandRegions) {
regionGroupExpand(statement, client, regionId, targetDataNode);
}

// now try to expand all regions (including already expanded ones)
LOGGER.info(
"Testing multi-expand with regions {} to DataNode {}, where {} already exist",
selectedRegions,
targetDataNode,
preExpandRegions);

multiRegionGroupExpand(statement, client, selectedRegions, targetDataNode);

// verify all regions are in target DataNode
regionMap = getAllRegionMap(statement);
for (int regionId : selectedRegions) {
Assert.assertTrue(
"Region " + regionId + " should contain target DataNode " + targetDataNode,
regionMap.get(regionId).contains(targetDataNode));
}
LOGGER.info("Multi-region expand partial exist test passed");
}
}

/** Test multi-region shrink with partial regions not in target DataNode */
@Test
public void multiRegionShrinkPartialNotExistTest() throws Exception {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataReplicationFactor(1)
.setSchemaReplicationFactor(1);

EnvFactory.getEnv().initClusterEnvironment(1, 5);

try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
final Statement statement = makeItCloseQuietly(connection.createStatement());
SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
// prepare data
statement.execute(INSERTION1);
statement.execute(FLUSH_COMMAND);

Map<Integer, Set<Integer>> regionMap = getAllRegionMap(statement);
Set<Integer> allDataNodeId = getAllDataNodes(statement);

List<Integer> allRegions = new ArrayList<>(regionMap.keySet());
List<Integer> selectedRegions = allRegions.subList(0, Math.min(3, allRegions.size()));

int targetDataNode =
findDataNodeNotContainsAnyRegion(allDataNodeId, regionMap, selectedRegions);

// first expand all regions to target DataNode
multiRegionGroupExpand(statement, client, selectedRegions, targetDataNode);

// then shrink some regions individually
List<Integer> preShrinkRegions =
selectedRegions.subList(0, Math.min(2, selectedRegions.size()));
for (int regionId : preShrinkRegions) {
regionGroupShrink(statement, client, regionId, targetDataNode);
}

// now try to shrink all regions (including already shrunk ones)
LOGGER.info(
"Testing multi-shrink with regions {} from DataNode {}, where {} already removed",
selectedRegions,
targetDataNode,
preShrinkRegions);

multiRegionGroupShrink(statement, client, selectedRegions, targetDataNode);

// verify all regions are not in target DataNode
regionMap = getAllRegionMap(statement);
for (int regionId : selectedRegions) {
Assert.assertFalse(
"Region " + regionId + " should not contain target DataNode " + targetDataNode,
regionMap.get(regionId).contains(targetDataNode));
}
LOGGER.info("Multi-region shrink partial not exist test passed");
}
}

private void multiRegionGroupExpand(
Statement statement,
SyncConfigNodeIServiceClient client,
List<Integer> regionIds,
int targetDataNode)
throws Exception {
String command = buildMultiRegionCommand(MULTI_EXPAND_FORMAT, regionIds, targetDataNode);

Predicate<TShowRegionResp> expandPredicate =
tShowRegionResp -> {
Map<Integer, Set<Integer>> newRegionMap =
getRunningRegionMap(tShowRegionResp.getRegionInfoList());
return regionIds.stream()
.allMatch(
regionId -> {
Set<Integer> dataNodes = newRegionMap.get(regionId);
return dataNodes != null && dataNodes.contains(targetDataNode);
});
};

executeMultiRegionOperation(
statement,
client,
command,
regionIds,
expandPredicate,
Optional.of(targetDataNode),
Optional.empty(),
"expand");
}

private void multiRegionGroupShrink(
Statement statement,
SyncConfigNodeIServiceClient client,
List<Integer> regionIds,
int targetDataNode)
throws Exception {
String command = buildMultiRegionCommand(MULTI_SHRINK_FORMAT, regionIds, targetDataNode);

Predicate<TShowRegionResp> shrinkPredicate =
tShowRegionResp -> {
Map<Integer, Set<Integer>> newRegionMap =
getRegionMap(tShowRegionResp.getRegionInfoList());
return regionIds.stream()
.allMatch(
regionId -> {
Set<Integer> dataNodes = newRegionMap.get(regionId);
return dataNodes == null || !dataNodes.contains(targetDataNode);
});
};

executeMultiRegionOperation(
statement,
client,
command,
regionIds,
shrinkPredicate,
Optional.empty(),
Optional.of(targetDataNode),
"shrink");
}

private String buildMultiRegionCommand(
String format, List<Integer> regionIds, int targetDataNode) {
String regionIdStr = regionIds.stream().map(String::valueOf).collect(Collectors.joining(","));
return String.format(format, regionIdStr, targetDataNode);
}

private void executeMultiRegionOperation(
Statement statement,
SyncConfigNodeIServiceClient client,
String command,
List<Integer> regionIds,
Predicate<TShowRegionResp> predicate,
Optional<Integer> expectedDataNode,
Optional<Integer> notExpectedDataNode,
String operationType) {

LOGGER.info("Executing multi-region {} command: {}", operationType, command);

Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(2, TimeUnit.SECONDS)
.until(
() -> {
try {
statement.execute(command);
return true;
} catch (Exception e) {
String errorMessage = e.getMessage();
// If error message contains both "successfully submitted" and "failed to submit",
// consider it as partial success and continue
if (errorMessage != null
&& errorMessage.contains("successfully submitted")
&& errorMessage.contains("failed to submit")) {
LOGGER.warn(
"Multi-region {} partially succeeded: {}", operationType, errorMessage);
return true;
}
LOGGER.warn(
"Multi-region {} command execution failed, retrying: {}",
operationType,
errorMessage);
return false;
}
});

// Use the first region for awaitUntilSuccess (framework limitation)
awaitUntilSuccess(client, regionIds.get(0), predicate, expectedDataNode, notExpectedDataNode);

String targetDescription =
expectedDataNode.isPresent()
? "to DataNode " + expectedDataNode.get()
: "from DataNode " + notExpectedDataNode.get();
LOGGER.info(
"Regions {} have {} {}",
regionIds,
operationType.equals("expand") ? "expanded" : "shrunk",
targetDescription);
}

private int findDataNodeNotContainsAnyRegion(
Set<Integer> allDataNodeId, Map<Integer, Set<Integer>> regionMap, List<Integer> regionIds) {
return allDataNodeId.stream()
.filter(
dataNodeId ->
regionIds.stream()
.noneMatch(regionId -> regionMap.get(regionId).contains(dataNodeId)))
.findFirst()
.orElseThrow(
() ->
new RuntimeException(
"Cannot find DataNode that doesn't contain any of the regions"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,11 +541,11 @@ reconstructRegion
;

extendRegion
: EXTEND REGION regionId=INTEGER_LITERAL TO targetDataNodeId=INTEGER_LITERAL
: EXTEND REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* TO targetDataNodeId=INTEGER_LITERAL
;

removeRegion
: REMOVE REGION regionId=INTEGER_LITERAL FROM targetDataNodeId=INTEGER_LITERAL
: REMOVE REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* FROM targetDataNodeId=INTEGER_LITERAL
;

verifyConnection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2489,15 +2489,15 @@ public TSStatus reconstructRegion(TReconstructRegionReq req) {
public TSStatus extendRegion(TExtendRegionReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? procedureManager.extendRegion(req)
? procedureManager.extendRegions(req)
: status;
}

@Override
public TSStatus removeRegion(TRemoveRegionReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
? procedureManager.removeRegion(req)
? procedureManager.removeRegions(req)
: status;
}

Expand Down
Loading