Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/main/java/com/marklogic/contentpump/LocalJobRunner.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2021 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
* Copyright (c) 2011-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -137,7 +137,6 @@ void run() throws Exception {
}
// Initialize thread pool
pool = threadManager.initThreadPool();
threadManager.runThreadPoller();

progress = new AtomicInteger[splits.size()];
for (int i = 0; i < splits.size(); i++) {
Expand Down Expand Up @@ -219,6 +218,12 @@ void run() throws Exception {
}
}
}
// Start thread poller after all tasks are submitted to avoid race
// condition where taskList grows between active count snapshot and
// scale method iteration.
if (pool != null) {
threadManager.runThreadPoller();
}
threadManager.shutdownThreadPool();
job.setJobState(JobStatus.State.SUCCEEDED);
monitor.interrupt();
Expand Down
35 changes: 31 additions & 4 deletions src/main/java/com/marklogic/contentpump/ThreadManager.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
* Copyright (c) 2011-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -260,6 +260,7 @@ public void scaleOutThreadPool(int activeTaskCounts) {
pool.setCorePoolSize(newServerThreads);
}
// Assign new available threads to each task
int activeIdx = 0;
for (int i = 0; i < taskList.size(); i++) {
LocalMapTask task = taskList.get(i);
if (task.getMapperClass() == (Class)MultithreadedMapper.class) {
Expand All @@ -272,13 +273,21 @@ public void scaleOutThreadPool(int activeTaskCounts) {
}
continue;
}
if (activeIdx >= randomIndexes.size()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reached end of active task snapshot; " +
"remaining tasks will be considered in the " +
"next polling cycle.");
}
break;
}
// In assignThreads, pass down a random index as splitIndex to
// make sure threads are more evenly assigned. The total delta
// threads in a scale-out event equals to the new available
// threads plus the idle threads that finish running other
// LocalMapTasks.
int deltaTaskThreads = assignThreads(
randomIndexes.get(i), activeTaskCounts,
randomIndexes.get(activeIdx++), activeTaskCounts,
(newServerThreads - curServerThreads + idleServerThreads),
false);
int newTaskThreads = deltaTaskThreads + task.getThreadCount();
Expand Down Expand Up @@ -313,6 +322,7 @@ public void scaleInThreadPool(int activeTaskCounts) {
LOG.info("Thread pool is scaling-in. New thread pool size: " +
newServerThreads);
// Deduct runners from each task
int activeIdx = 0;
for (int i = 0; i < taskList.size(); i++) {
LocalMapTask task = taskList.get(i);
if (task.getMapperClass() == (Class)MultithreadedMapper.class) {
Expand All @@ -325,11 +335,19 @@ public void scaleInThreadPool(int activeTaskCounts) {
}
continue;
}
if (activeIdx >= randomIndexes.size()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reached end of active task snapshot; " +
"remaining tasks will be considered in the " +
"next polling cycle.");
}
break;
}
// The total delta threads in a scale-in event equals to the
// unavailable threads minus the idle threads that finish
// running other LocalMapTasks.
int deltaTaskThreads = assignThreads(
randomIndexes.get(i), activeTaskCounts,
randomIndexes.get(activeIdx++), activeTaskCounts,
(curServerThreads - newServerThreads - idleServerThreads),
false);
int newTaskThreads = task.getThreadCount() - deltaTaskThreads;
Expand Down Expand Up @@ -368,6 +386,7 @@ public void assignIdleThreads(int activeTaskCounts) {
LOG.debug("Assigning idle threads to each LocalMapTask. Idle thread" +
"counts: " + idleServerThreads);
}
int activeIdx = 0;
for (int i = 0; i < taskList.size(); i++) {
LocalMapTask task = taskList.get(i);
if (task.isTaskDone()) {
Expand All @@ -378,7 +397,15 @@ public void assignIdleThreads(int activeTaskCounts) {
}
continue;
}
int deltaTaskThreads = assignThreads(randomIndexes.get(i),
if (activeIdx >= randomIndexes.size()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reached end of active task snapshot; " +
"remaining tasks will be considered in the " +
"next polling cycle.");
}
break;
}
int deltaTaskThreads = assignThreads(randomIndexes.get(activeIdx++),
activeTaskCounts, idleServerThreads, false);
if (task.getMapperClass() == (Class)MultithreadedMapper.class) {
int newTaskThreads = deltaTaskThreads + task.getThreadCount();
Expand Down
21 changes: 5 additions & 16 deletions src/main/java/com/marklogic/mapreduce/examples/ContentReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ public static void main(String[] args) throws Exception {
}

static class SslOptions implements SslConfigOptions {
public static final Log LOG =
LogFactory.getLog(SslOptions.class);

@Override
public String[] getEnabledCipherSuites() {
return null;
Expand All @@ -109,13 +106,9 @@ public String[] getEnabledProtocols() {
}

@Override
public SSLContext getSslContext() {
SSLContext sslContext = null;
try {
sslContext = SSLContext.getInstance("TLSv1.3");
} catch (NoSuchAlgorithmException e) {
LOG.error("Error creating SSLContext", e);
}
public SSLContext getSslContext()
throws NoSuchAlgorithmException, KeyManagementException {
SSLContext sslContext = SSLContext.getInstance("TLSv1.3");
TrustManager[] trustManagers = null;
// Trust anyone.
trustManagers = new TrustManager[] { new X509TrustManager() {
Expand All @@ -133,13 +126,9 @@ public X509Certificate[] getAcceptedIssuers() {
return null;
}
} };

KeyManager[] keyManagers = null;
try {
sslContext.init(keyManagers, trustManagers, null);
} catch (KeyManagementException e) {
LOG.error("Error initializing SSLContext", e);
}
sslContext.init(keyManagers, trustManagers, null);
return sslContext;
Comment thread
tposham marked this conversation as resolved.
}
}
Expand Down
Loading