diff --git a/src/main/java/com/marklogic/contentpump/LocalJobRunner.java b/src/main/java/com/marklogic/contentpump/LocalJobRunner.java index e735e017d..f2bdef0a5 100644 --- a/src/main/java/com/marklogic/contentpump/LocalJobRunner.java +++ b/src/main/java/com/marklogic/contentpump/LocalJobRunner.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2021 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + * Copyright (c) 2011-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -137,7 +137,6 @@ void run() throws Exception { } // Initialize thread pool pool = threadManager.initThreadPool(); - threadManager.runThreadPoller(); progress = new AtomicInteger[splits.size()]; for (int i = 0; i < splits.size(); i++) { @@ -219,6 +218,12 @@ void run() throws Exception { } } } + // Start thread poller after all tasks are submitted to avoid race + // condition where taskList grows between active count snapshot and + // scale method iteration. + if (pool != null) { + threadManager.runThreadPoller(); + } threadManager.shutdownThreadPool(); job.setJobState(JobStatus.State.SUCCEEDED); monitor.interrupt(); diff --git a/src/main/java/com/marklogic/contentpump/ThreadManager.java b/src/main/java/com/marklogic/contentpump/ThreadManager.java index 632d282d8..9893cca06 100644 --- a/src/main/java/com/marklogic/contentpump/ThreadManager.java +++ b/src/main/java/com/marklogic/contentpump/ThreadManager.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + * Copyright (c) 2011-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -260,6 +260,7 @@ public void scaleOutThreadPool(int activeTaskCounts) { pool.setCorePoolSize(newServerThreads); } // Assign new available threads to each task + int activeIdx = 0; for (int i = 0; i < taskList.size(); i++) { LocalMapTask task = taskList.get(i); if (task.getMapperClass() == (Class)MultithreadedMapper.class) { @@ -272,13 +273,21 @@ public void scaleOutThreadPool(int activeTaskCounts) { } continue; } + if (activeIdx >= randomIndexes.size()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reached end of active task snapshot; " + + "remaining tasks will be considered in the " + + "next polling cycle."); + } + break; + } // In assignThreads, pass down a random index as splitIndex to // make sure threads are more evenly assigned. The total delta // threads in a scale-out event equals to the new available // threads plus the idle threads that finish running other // LocalMapTasks. int deltaTaskThreads = assignThreads( - randomIndexes.get(i), activeTaskCounts, + randomIndexes.get(activeIdx++), activeTaskCounts, (newServerThreads - curServerThreads + idleServerThreads), false); int newTaskThreads = deltaTaskThreads + task.getThreadCount(); @@ -313,6 +322,7 @@ public void scaleInThreadPool(int activeTaskCounts) { LOG.info("Thread pool is scaling-in. New thread pool size: " + newServerThreads); // Deduct runners from each task + int activeIdx = 0; for (int i = 0; i < taskList.size(); i++) { LocalMapTask task = taskList.get(i); if (task.getMapperClass() == (Class)MultithreadedMapper.class) { @@ -325,11 +335,19 @@ public void scaleInThreadPool(int activeTaskCounts) { } continue; } + if (activeIdx >= randomIndexes.size()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reached end of active task snapshot; " + + "remaining tasks will be considered in the " + + "next polling cycle."); + } + break; + } // The total delta threads in a scale-in event equals to the // unavailable threads minus the idle threads that finish // running other LocalMapTasks. int deltaTaskThreads = assignThreads( - randomIndexes.get(i), activeTaskCounts, + randomIndexes.get(activeIdx++), activeTaskCounts, (curServerThreads - newServerThreads - idleServerThreads), false); int newTaskThreads = task.getThreadCount() - deltaTaskThreads; @@ -368,6 +386,7 @@ public void assignIdleThreads(int activeTaskCounts) { LOG.debug("Assigning idle threads to each LocalMapTask. Idle thread" + "counts: " + idleServerThreads); } + int activeIdx = 0; for (int i = 0; i < taskList.size(); i++) { LocalMapTask task = taskList.get(i); if (task.isTaskDone()) { @@ -378,7 +397,15 @@ public void assignIdleThreads(int activeTaskCounts) { } continue; } - int deltaTaskThreads = assignThreads(randomIndexes.get(i), + if (activeIdx >= randomIndexes.size()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reached end of active task snapshot; " + + "remaining tasks will be considered in the " + + "next polling cycle."); + } + break; + } + int deltaTaskThreads = assignThreads(randomIndexes.get(activeIdx++), activeTaskCounts, idleServerThreads, false); if (task.getMapperClass() == (Class)MultithreadedMapper.class) { int newTaskThreads = deltaTaskThreads + task.getThreadCount(); diff --git a/src/main/java/com/marklogic/mapreduce/examples/ContentReader.java b/src/main/java/com/marklogic/mapreduce/examples/ContentReader.java index 332f2fb67..aed2702ca 100644 --- a/src/main/java/com/marklogic/mapreduce/examples/ContentReader.java +++ b/src/main/java/com/marklogic/mapreduce/examples/ContentReader.java @@ -95,9 +95,6 @@ public static void main(String[] args) throws Exception { } static class SslOptions implements SslConfigOptions { - public static final Log LOG = - LogFactory.getLog(SslOptions.class); - @Override public String[] getEnabledCipherSuites() { return null; @@ -109,13 +106,9 @@ public String[] getEnabledProtocols() { } @Override - public SSLContext getSslContext() { - SSLContext sslContext = null; - try { - sslContext = SSLContext.getInstance("TLSv1.3"); - } catch (NoSuchAlgorithmException e) { - LOG.error("Error creating SSLContext", e); - } + public SSLContext getSslContext() + throws NoSuchAlgorithmException, KeyManagementException { + SSLContext sslContext = SSLContext.getInstance("TLSv1.3"); TrustManager[] trustManagers = null; // Trust anyone. trustManagers = new TrustManager[] { new X509TrustManager() { @@ -133,13 +126,9 @@ public X509Certificate[] getAcceptedIssuers() { return null; } } }; - + KeyManager[] keyManagers = null; - try { - sslContext.init(keyManagers, trustManagers, null); - } catch (KeyManagementException e) { - LOG.error("Error initializing SSLContext", e); - } + sslContext.init(keyManagers, trustManagers, null); return sslContext; } }