Skip to content

Commit 1509780

Browse files
authored
Merge branch 'master' into dplyukhin/improve-contributing-guide
2 parents 19a7fe4 + 1386d4b commit 1509780

17 files changed

Lines changed: 215 additions & 216 deletions

File tree

.github/workflows/nightly-throughput-stress.yml

Lines changed: 0 additions & 202 deletions
This file was deleted.

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
![Temporal Java SDK](https://raw.githubusercontent.com/temporalio/assets/main/files/w/java.png)
2+
13
# Temporal Java SDK [![Build status](https://github.com/temporalio/sdk-java/actions/workflows/ci.yml/badge.svg?event=push)](https://github.com/temporalio/sdk-java/actions/workflows/ci.yml) [![Coverage Status](https://coveralls.io/repos/github/temporalio/sdk-java/badge.svg?branch=master)](https://coveralls.io/github/temporalio/sdk-java?branch=master)
24

35
[Temporal](https://github.com/temporalio/temporal) is a Workflow-as-Code platform for building and operating
@@ -95,4 +97,4 @@ Unless required by applicable law or agreed to in writing, software
9597
distributed under the License is distributed on an "AS IS" BASIS,
9698
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9799
See the License for the specific language governing permissions and
98-
limitations under the License.
100+
limitations under the License.

temporal-sdk/src/main/java/io/temporal/internal/common/LinkConverter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,19 @@ public class LinkConverter {
3232

3333
public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) {
3434
try {
35+
3536
String url =
3637
String.format(
3738
linkPathFormat,
3839
URLEncoder.encode(we.getNamespace(), StandardCharsets.UTF_8.toString()),
39-
URLEncoder.encode(we.getWorkflowId(), StandardCharsets.UTF_8.toString()),
40+
// The 'replace' below handles spaces - the encoder will convert them to a plus,
41+
// which the UI then handles as a plus, thus breaking the link as the
42+
// space is lost.
43+
// It's a known quirk with the URLEncoder as it encodes for forms, not general URIs.
44+
// Only done for the WorkflowId as the other two are values we control,
45+
// and will never have spaces.
46+
URLEncoder.encode(we.getWorkflowId(), StandardCharsets.UTF_8.toString())
47+
.replace("+", "%20"),
4048
URLEncoder.encode(we.getRunId(), StandardCharsets.UTF_8.toString()));
4149

4250
List<Map.Entry<String, String>> queryParams = new ArrayList<>();

temporal-sdk/src/main/java/io/temporal/internal/nexus/InternalNexusOperationContext.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,21 @@
1010
public class InternalNexusOperationContext {
1111
private final String namespace;
1212
private final String taskQueue;
13+
private final String endpoint;
1314
private final Scope metricScope;
1415
private final WorkflowClient client;
1516
NexusOperationOutboundCallsInterceptor outboundCalls;
1617
Link startWorkflowResponseLink;
1718

1819
public InternalNexusOperationContext(
19-
String namespace, String taskQueue, Scope metricScope, WorkflowClient client) {
20+
String namespace,
21+
String taskQueue,
22+
String endpoint,
23+
Scope metricScope,
24+
WorkflowClient client) {
2025
this.namespace = namespace;
2126
this.taskQueue = taskQueue;
27+
this.endpoint = endpoint;
2228
this.metricScope = metricScope;
2329
this.client = client;
2430
}
@@ -39,6 +45,10 @@ public String getNamespace() {
3945
return namespace;
4046
}
4147

48+
public String getEndpoint() {
49+
return endpoint;
50+
}
51+
4252
public void setOutboundInterceptor(NexusOperationOutboundCallsInterceptor outboundCalls) {
4353
this.outboundCalls = outboundCalls;
4454
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusInfoImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
class NexusInfoImpl implements NexusOperationInfo {
66
private final String namespace;
77
private final String taskQueue;
8+
private final String endpoint;
89

9-
NexusInfoImpl(String namespace, String taskQueue) {
10+
NexusInfoImpl(String namespace, String taskQueue, String endpoint) {
1011
this.namespace = namespace;
1112
this.taskQueue = taskQueue;
13+
this.endpoint = endpoint;
1214
}
1315

1416
@Override
@@ -20,4 +22,9 @@ public String getNamespace() {
2022
public String getTaskQueue() {
2123
return taskQueue;
2224
}
25+
26+
@Override
27+
public String getEndpoint() {
28+
return endpoint;
29+
}
2330
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
116116
}
117117

118118
CurrentNexusOperationContext.set(
119-
new InternalNexusOperationContext(namespace, taskQueue, metricsScope, client));
119+
new InternalNexusOperationContext(
120+
namespace, taskQueue, request.getEndpoint(), metricsScope, client));
120121

121122
switch (request.getVariantCase()) {
122123
case START_OPERATION:

temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ public OperationHandler<Object, Object> intercept(
2929
temporalNexusContext.getMetricsScope(),
3030
temporalNexusContext.getWorkflowClient(),
3131
new NexusInfoImpl(
32-
temporalNexusContext.getNamespace(), temporalNexusContext.getTaskQueue())));
32+
temporalNexusContext.getNamespace(),
33+
temporalNexusContext.getTaskQueue(),
34+
temporalNexusContext.getEndpoint())));
3335
return new OperationInterceptorConverter(inboundCallsInterceptor);
3436
}
3537

temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,23 @@ public class ShutdownManager implements Closeable {
2121
new ExecutorThreadFactory(
2222
WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null));
2323

24-
private static final int CHECK_PERIOD_MS = 250;
24+
private static final int DEFAULT_CHECK_PERIOD_MS = 250;
25+
private final int checkPeriodMs;
26+
27+
public ShutdownManager() {
28+
this(DEFAULT_CHECK_PERIOD_MS);
29+
}
30+
31+
/**
32+
* @param checkPeriodMs interval in milliseconds between shutdown-completion polls. Lower values
33+
* speed up teardown at the cost of more frequent polling. Must be positive.
34+
*/
35+
public ShutdownManager(int checkPeriodMs) {
36+
if (checkPeriodMs <= 0) {
37+
throw new IllegalArgumentException("checkPeriodMs must be positive, was: " + checkPeriodMs);
38+
}
39+
this.checkPeriodMs = checkPeriodMs;
40+
}
2541

2642
/** executorToShutdown.shutdownNow() -&gt; timed wait for a graceful termination */
2743
public CompletableFuture<Void> shutdownExecutorNow(
@@ -97,7 +113,7 @@ private CompletableFuture<Void> untimedWait(
97113
*/
98114
private CompletableFuture<Void> limitedWait(
99115
ExecutorService executorToShutdown, String executorName, Duration timeout) {
100-
int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS);
116+
int attempts = (int) Math.ceil((double) timeout.toMillis() / checkPeriodMs);
101117

102118
CompletableFuture<Void> future = new CompletableFuture<>();
103119
scheduledExecutorService.submit(
@@ -167,7 +183,7 @@ public void run() {
167183
promise.complete(null);
168184
return;
169185
}
170-
scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
186+
scheduledExecutorService.schedule(this, checkPeriodMs, TimeUnit.MILLISECONDS);
171187
}
172188

173189
abstract boolean isTerminated();
@@ -238,7 +254,7 @@ public void run() {
238254
onSlowTermination();
239255
}
240256
}
241-
scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
257+
scheduledExecutorService.schedule(this, checkPeriodMs, TimeUnit.MILLISECONDS);
242258
}
243259

244260
abstract boolean isTerminated();

temporal-sdk/src/main/java/io/temporal/nexus/NexusOperationInfo.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,10 @@ public interface NexusOperationInfo {
1414
* @return Nexus Task Queue of the worker that is executing the Nexus Operation
1515
*/
1616
String getTaskQueue();
17+
18+
/**
19+
* @return Endpoint that the Nexus request was addressed to before being forwarded to this worker.
20+
* Supported from server v1.30.0.
21+
*/
22+
String getEndpoint();
1723
}

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,8 @@ private void shutdownInternal(boolean interruptUserTasks) {
411411

412412
/** Internal method that actually shuts down workers. Called from the plugin chain. */
413413
private void doShutdown(boolean interruptUserTasks) {
414-
ShutdownManager shutdownManager = new ShutdownManager();
414+
ShutdownManager shutdownManager =
415+
new ShutdownManager((int) factoryOptions.getShutdownCheckInterval().toMillis());
415416

416417
// Shutdown each worker with plugin hooks
417418
List<CompletableFuture<Void>> shutdownFutures = new ArrayList<>();

0 commit comments

Comments
 (0)