From 1da87b95fe7069a1d5a45360e3fe0a5c469a6fea Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Thu, 5 Feb 2026 10:13:22 -0500 Subject: [PATCH 1/2] added new test --- .../accumulo/test/functional/BulkNewIT.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index eb52ec2c819..a17fd9297c3 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -39,6 +39,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -64,6 +65,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; @@ -73,6 +75,7 @@ import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.LoadPlan.RangeType; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.constraints.Constraint; @@ -715,6 +718,67 @@ public void testExceptionInMetadataUpdate() throws Exception { } } + @Test + public void testManyTabletAndFiles() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + TreeSet splits = IntStream.range(1, 9000).mapToObj(BulkNewIT::row).map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + c.tableOperations().addSplits(tableName, splits); + + final int numTasks = 16; + var executor = Executors.newFixedThreadPool(numTasks); + var futures = new ArrayList>(numTasks); + // wait for a portion of the tasks to be ready + CountDownLatch startLatch = new CountDownLatch(numTasks); + assertTrue(numTasks >= startLatch.getCount(), + "Not enough tasks/threads to satisfy latch count - deadlock risk"); + + var loadPlanBuilder = LoadPlan.builder(); + var rowsExpected = new HashSet<>(); + var imports = IntStream.range(2, 8999).boxed().collect(Collectors.toList()); + // The order in which imports are added to the load plan should not matter so test that. + Collections.shuffle(imports); + for (var data : imports) { + String filename = "f" + data + "."; + loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, RangeType.TABLE, row(data - 1), + row(data)); + var future = executor.submit(() -> { + startLatch.countDown(); + startLatch.await(); + writeData(fs, dir + "/" + filename, aconf, data, data); + return null; + }); + futures.add(future); + rowsExpected.add(row(data)); + } + assertEquals(imports.size(), futures.size()); + + for (var future : futures) { + future.get(); + } + + executor.shutdown(); + + var loadPlan = loadPlanBuilder.build(); + + c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load(); + + // using a batch scanner can read from lots of tablets w/ less RPCs + try (var scanner = c.createBatchScanner(tableName)) { + // use a scan server so that tablets do not need to be hosted + scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL); + scanner.setRanges(List.of(new Range())); + var rowsSeen = scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet()); + assertEquals(rowsExpected, rowsSeen); + } + } + } + @Test public void testManyTablets() throws Exception { From 943dcffcea34333fa8a0a206af9acdd78e8eb92b Mon Sep 17 00:00:00 2001 From: "Dom G." Date: Mon, 9 Feb 2026 10:46:16 -0500 Subject: [PATCH 2/2] Update test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java --- .../java/org/apache/accumulo/test/functional/BulkNewIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index a17fd9297c3..458ec594c84 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -731,7 +731,7 @@ public void testManyTabletAndFiles() throws Exception { final int numTasks = 16; var executor = Executors.newFixedThreadPool(numTasks); - var futures = new ArrayList>(numTasks); + var futures = new ArrayList>(); // wait for a portion of the tasks to be ready CountDownLatch startLatch = new CountDownLatch(numTasks); assertTrue(numTasks >= startLatch.getCount(),