Skip to content

Commit 7facf39

Browse files
committed
Updated for code review comments
1 parent 5ca7674 commit 7facf39

11 files changed

Lines changed: 202 additions & 86 deletions

File tree

core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ temporal operator nexus endpoint create \
3333
--target-task-queue nexus-messaging-handler-task-queue
3434
```
3535

36+
This sample loads connection settings from `ClientConfigProfile`. The
37+
`nexus-messaging-handler` and `nexus-messaging-caller` profiles are defined in
38+
`core/src/main/resources/config.toml`. You can override settings with environment
39+
variables or by editing the TOML file (see the `envconfig` sample for details).
40+
3641
In one terminal, start the handler worker:
3742

3843
```bash

core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerStarter.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,34 @@
11
package io.temporal.samples.nexus_messaging.callerpattern.caller;
22

33
import io.temporal.client.WorkflowClient;
4-
import io.temporal.client.WorkflowClientOptions;
54
import io.temporal.client.WorkflowOptions;
5+
import io.temporal.envconfig.ClientConfigProfile;
6+
import io.temporal.envconfig.LoadClientConfigProfileOptions;
67
import io.temporal.serviceclient.WorkflowServiceStubs;
8+
import java.nio.file.Paths;
79
import java.util.List;
810
import java.util.UUID;
911

1012
public class CallerStarter {
1113

1214
public static void main(String[] args) {
13-
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
14-
WorkflowClient client =
15-
WorkflowClient.newInstance(
16-
service,
17-
WorkflowClientOptions.newBuilder().setNamespace(CallerWorker.NAMESPACE).build());
15+
ClientConfigProfile profile;
16+
try {
17+
String configFilePath =
18+
Paths.get(CallerStarter.class.getResource("/config.toml").toURI()).toString();
19+
profile =
20+
ClientConfigProfile.load(
21+
LoadClientConfigProfileOptions.newBuilder()
22+
.setConfigFilePath(configFilePath)
23+
.setConfigFileProfile(CallerWorker.CONFIG_PROFILE)
24+
.build());
25+
} catch (Exception e) {
26+
throw new RuntimeException("Failed to load client configuration", e);
27+
}
28+
29+
WorkflowServiceStubs service =
30+
WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
31+
WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
1832

1933
CallerWorkflow workflow =
2034
client.newWorkflowStub(

core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/caller/CallerWorker.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,43 @@
11
package io.temporal.samples.nexus_messaging.callerpattern.caller;
22

33
import io.temporal.client.WorkflowClient;
4-
import io.temporal.client.WorkflowClientOptions;
4+
import io.temporal.envconfig.ClientConfigProfile;
5+
import io.temporal.envconfig.LoadClientConfigProfileOptions;
56
import io.temporal.serviceclient.WorkflowServiceStubs;
67
import io.temporal.worker.Worker;
78
import io.temporal.worker.WorkerFactory;
89
import io.temporal.worker.WorkflowImplementationOptions;
910
import io.temporal.workflow.NexusServiceOptions;
11+
import java.nio.file.Paths;
1012
import java.util.Collections;
1113
import org.slf4j.Logger;
1214
import org.slf4j.LoggerFactory;
1315

1416
public class CallerWorker {
1517
private static final Logger logger = LoggerFactory.getLogger(CallerWorker.class);
1618

17-
public static final String NAMESPACE = "nexus-messaging-caller-namespace";
19+
static final String CONFIG_PROFILE = "nexus-messaging-caller";
1820
public static final String TASK_QUEUE = "nexus-messaging-caller-task-queue";
1921
static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint";
2022

2123
public static void main(String[] args) throws InterruptedException {
22-
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
23-
WorkflowClient client =
24-
WorkflowClient.newInstance(
25-
service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build());
24+
ClientConfigProfile profile;
25+
try {
26+
String configFilePath =
27+
Paths.get(CallerWorker.class.getResource("/config.toml").toURI()).toString();
28+
profile =
29+
ClientConfigProfile.load(
30+
LoadClientConfigProfileOptions.newBuilder()
31+
.setConfigFilePath(configFilePath)
32+
.setConfigFileProfile(CONFIG_PROFILE)
33+
.build());
34+
} catch (Exception e) {
35+
throw new RuntimeException("Failed to load client configuration", e);
36+
}
37+
38+
WorkflowServiceStubs service =
39+
WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
40+
WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
2641

2742
WorkerFactory factory = WorkerFactory.newInstance(client);
2843
Worker worker = factory.newWorker(TASK_QUEUE);

core/src/main/java/io/temporal/samples/nexus_messaging/callerpattern/handler/HandlerWorker.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,42 @@
11
package io.temporal.samples.nexus_messaging.callerpattern.handler;
22

33
import io.temporal.client.WorkflowClient;
4-
import io.temporal.client.WorkflowClientOptions;
54
import io.temporal.client.WorkflowExecutionAlreadyStarted;
65
import io.temporal.client.WorkflowOptions;
6+
import io.temporal.envconfig.ClientConfigProfile;
7+
import io.temporal.envconfig.LoadClientConfigProfileOptions;
78
import io.temporal.serviceclient.WorkflowServiceStubs;
89
import io.temporal.worker.Worker;
910
import io.temporal.worker.WorkerFactory;
11+
import java.nio.file.Paths;
1012
import org.slf4j.Logger;
1113
import org.slf4j.LoggerFactory;
1214

1315
public class HandlerWorker {
1416
private static final Logger logger = LoggerFactory.getLogger(HandlerWorker.class);
1517

16-
public static final String NAMESPACE = "nexus-messaging-handler-namespace";
18+
static final String CONFIG_PROFILE = "nexus-messaging-handler";
1719
public static final String TASK_QUEUE = "nexus-messaging-handler-task-queue";
1820
static final String USER_ID = "user-1";
1921

2022
public static void main(String[] args) throws InterruptedException {
21-
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
22-
WorkflowClient client =
23-
WorkflowClient.newInstance(
24-
service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build());
23+
ClientConfigProfile profile;
24+
try {
25+
String configFilePath =
26+
Paths.get(HandlerWorker.class.getResource("/config.toml").toURI()).toString();
27+
profile =
28+
ClientConfigProfile.load(
29+
LoadClientConfigProfileOptions.newBuilder()
30+
.setConfigFilePath(configFilePath)
31+
.setConfigFileProfile(CONFIG_PROFILE)
32+
.build());
33+
} catch (Exception e) {
34+
throw new RuntimeException("Failed to load client configuration", e);
35+
}
36+
37+
WorkflowServiceStubs service =
38+
WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
39+
WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
2540

2641
// Start the long-running entity workflow that backs the Nexus service, if not already running.
2742
// Create a workflow ID derived from the given user ID.

core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ temporal operator nexus endpoint create \
3333
--target-task-queue nexus-messaging-handler-task-queue
3434
```
3535

36+
This sample loads connection settings from `ClientConfigProfile`. The
37+
`nexus-messaging-handler` and `nexus-messaging-caller` profiles are defined in
38+
`core/src/main/resources/config.toml`. You can override settings with environment
39+
variables or by editing the TOML file (see the `envconfig` sample for details).
40+
3641
In one terminal, start the handler worker:
3742

3843
```bash

core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteStarter.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,34 @@
11
package io.temporal.samples.nexus_messaging.ondemandpattern.caller;
22

33
import io.temporal.client.WorkflowClient;
4-
import io.temporal.client.WorkflowClientOptions;
54
import io.temporal.client.WorkflowOptions;
5+
import io.temporal.envconfig.ClientConfigProfile;
6+
import io.temporal.envconfig.LoadClientConfigProfileOptions;
67
import io.temporal.serviceclient.WorkflowServiceStubs;
8+
import java.nio.file.Paths;
79
import java.util.List;
810
import java.util.UUID;
911

1012
public class CallerRemoteStarter {
1113

1214
public static void main(String[] args) {
13-
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
14-
WorkflowClient client =
15-
WorkflowClient.newInstance(
16-
service,
17-
WorkflowClientOptions.newBuilder().setNamespace(CallerRemoteWorker.NAMESPACE).build());
15+
ClientConfigProfile profile;
16+
try {
17+
String configFilePath =
18+
Paths.get(CallerRemoteStarter.class.getResource("/config.toml").toURI()).toString();
19+
profile =
20+
ClientConfigProfile.load(
21+
LoadClientConfigProfileOptions.newBuilder()
22+
.setConfigFilePath(configFilePath)
23+
.setConfigFileProfile(CallerRemoteWorker.CONFIG_PROFILE)
24+
.build());
25+
} catch (Exception e) {
26+
throw new RuntimeException("Failed to load client configuration", e);
27+
}
28+
29+
WorkflowServiceStubs service =
30+
WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
31+
WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
1832

1933
CallerRemoteWorkflow workflow =
2034
client.newWorkflowStub(

core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/caller/CallerRemoteWorker.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,43 @@
11
package io.temporal.samples.nexus_messaging.ondemandpattern.caller;
22

33
import io.temporal.client.WorkflowClient;
4-
import io.temporal.client.WorkflowClientOptions;
4+
import io.temporal.envconfig.ClientConfigProfile;
5+
import io.temporal.envconfig.LoadClientConfigProfileOptions;
56
import io.temporal.serviceclient.WorkflowServiceStubs;
67
import io.temporal.worker.Worker;
78
import io.temporal.worker.WorkerFactory;
89
import io.temporal.worker.WorkflowImplementationOptions;
910
import io.temporal.workflow.NexusServiceOptions;
10-
import io.temporal.workflow.Workflow;
11+
import java.nio.file.Paths;
1112
import java.util.Collections;
1213
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
1315

1416
public class CallerRemoteWorker {
15-
private static final Logger logger = Workflow.getLogger(CallerRemoteWorker.class);
17+
private static final Logger logger = LoggerFactory.getLogger(CallerRemoteWorker.class);
1618

17-
public static final String NAMESPACE = "nexus-messaging-caller-namespace";
19+
static final String CONFIG_PROFILE = "nexus-messaging-caller";
1820
public static final String TASK_QUEUE = "nexus-messaging-caller-remote-task-queue";
1921
static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint";
2022

2123
public static void main(String[] args) throws InterruptedException {
22-
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
23-
WorkflowClient client =
24-
WorkflowClient.newInstance(
25-
service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build());
24+
ClientConfigProfile profile;
25+
try {
26+
String configFilePath =
27+
Paths.get(CallerRemoteWorker.class.getResource("/config.toml").toURI()).toString();
28+
profile =
29+
ClientConfigProfile.load(
30+
LoadClientConfigProfileOptions.newBuilder()
31+
.setConfigFilePath(configFilePath)
32+
.setConfigFileProfile(CONFIG_PROFILE)
33+
.build());
34+
} catch (Exception e) {
35+
throw new RuntimeException("Failed to load client configuration", e);
36+
}
37+
38+
WorkflowServiceStubs service =
39+
WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
40+
WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
2641

2742
WorkerFactory factory = WorkerFactory.newInstance(client);
2843
Worker worker = factory.newWorker(TASK_QUEUE);

core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/HandlerWorker.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,39 @@
11
package io.temporal.samples.nexus_messaging.ondemandpattern.handler;
22

33
import io.temporal.client.WorkflowClient;
4-
import io.temporal.client.WorkflowClientOptions;
4+
import io.temporal.envconfig.ClientConfigProfile;
5+
import io.temporal.envconfig.LoadClientConfigProfileOptions;
56
import io.temporal.serviceclient.WorkflowServiceStubs;
67
import io.temporal.worker.Worker;
78
import io.temporal.worker.WorkerFactory;
9+
import java.nio.file.Paths;
810
import org.slf4j.Logger;
911
import org.slf4j.LoggerFactory;
1012

1113
public class HandlerWorker {
1214
private static final Logger logger = LoggerFactory.getLogger(HandlerWorker.class);
1315

14-
public static final String NAMESPACE = "nexus-messaging-handler-namespace";
16+
static final String CONFIG_PROFILE = "nexus-messaging-handler";
1517
public static final String TASK_QUEUE = "nexus-messaging-handler-task-queue";
1618

1719
public static void main(String[] args) throws InterruptedException {
18-
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
19-
WorkflowClient client =
20-
WorkflowClient.newInstance(
21-
service, WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build());
20+
ClientConfigProfile profile;
21+
try {
22+
String configFilePath =
23+
Paths.get(HandlerWorker.class.getResource("/config.toml").toURI()).toString();
24+
profile =
25+
ClientConfigProfile.load(
26+
LoadClientConfigProfileOptions.newBuilder()
27+
.setConfigFilePath(configFilePath)
28+
.setConfigFileProfile(CONFIG_PROFILE)
29+
.build());
30+
} catch (Exception e) {
31+
throw new RuntimeException("Failed to load client configuration", e);
32+
}
33+
34+
WorkflowServiceStubs service =
35+
WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
36+
WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
2237

2338
WorkerFactory factory = WorkerFactory.newInstance(client);
2439
Worker worker = factory.newWorker(TASK_QUEUE);

core/src/main/java/io/temporal/samples/nexus_messaging/ondemandpattern/handler/NexusRemoteGreetingServiceImpl.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,20 @@ public class NexusRemoteGreetingServiceImpl {
2222
private static final Logger logger =
2323
LoggerFactory.getLogger(NexusRemoteGreetingServiceImpl.class);
2424

25-
private GreetingWorkflow getWorkflowStub(String workflowId) {
25+
static final String WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_";
26+
27+
// This example assumes you might have multiple workflows, one for each user.
28+
// If you had a single workflow for all users, then you could remove the
29+
// getWorkflowId method, remove the user ID from each input, and just
30+
// use the single worflow ID in the getWorkflowStub method below.
31+
public static String getWorkflowId(String userId) {
32+
return WORKFLOW_ID_PREFIX + userId;
33+
}
34+
35+
private GreetingWorkflow getWorkflowStub(String userId) {
2636
return Nexus.getOperationContext()
2737
.getWorkflowClient()
28-
.newWorkflowStub(GreetingWorkflow.class, workflowId);
38+
.newWorkflowStub(GreetingWorkflow.class, getWorkflowId(userId));
2939
}
3040

3141
// Starts a new GreetingWorkflow with the caller-specified workflow ID. This is an async
@@ -34,14 +44,14 @@ private GreetingWorkflow getWorkflowStub(String workflowId) {
3444
public OperationHandler<NexusRemoteGreetingService.RunFromRemoteInput, String> runFromRemote() {
3545
return WorkflowRunOperation.fromWorkflowHandle(
3646
(ctx, details, input) -> {
37-
logger.info("RunFromRemote was received for workflow {}", input.getWorkflowId());
47+
logger.info("RunFromRemote was received for userID {}", input.getUserId());
3848
return WorkflowHandle.fromWorkflowMethod(
3949
Nexus.getOperationContext()
4050
.getWorkflowClient()
4151
.newWorkflowStub(
4252
GreetingWorkflow.class,
4353
WorkflowOptions.newBuilder()
44-
.setWorkflowId(input.getWorkflowId())
54+
.setWorkflowId(getWorkflowId(input.getUserId()))
4555
.setTaskQueue(HandlerWorker.TASK_QUEUE)
4656
.build())
4757
::run);
@@ -55,8 +65,8 @@ public OperationHandler<NexusRemoteGreetingService.RunFromRemoteInput, String> r
5565
getLanguages() {
5666
return OperationHandler.sync(
5767
(ctx, details, input) -> {
58-
logger.info("Query for GetLanguages was received for workflow {}", input.getWorkflowId());
59-
return getWorkflowStub(input.getWorkflowId())
68+
logger.info("Query for GetLanguages was received for userId {}", input.getUserId());
69+
return getWorkflowStub(input.getUserId())
6070
.getLanguages(new GreetingWorkflow.GetLanguagesInput(input.isIncludeUnsupported()));
6171
});
6272
}
@@ -65,8 +75,8 @@ public OperationHandler<NexusRemoteGreetingService.RunFromRemoteInput, String> r
6575
public OperationHandler<NexusRemoteGreetingService.GetLanguageInput, Language> getLanguage() {
6676
return OperationHandler.sync(
6777
(ctx, details, input) -> {
68-
logger.info("Query for GetLanguage was received for workflow {}", input.getWorkflowId());
69-
return getWorkflowStub(input.getWorkflowId()).getLanguage();
78+
logger.info("Query for GetLanguage was received for userId {}", input.getUserId());
79+
return getWorkflowStub(input.getUserId()).getLanguage();
7080
});
7181
}
7282

@@ -75,8 +85,8 @@ public OperationHandler<NexusRemoteGreetingService.GetLanguageInput, Language> g
7585
public OperationHandler<NexusRemoteGreetingService.SetLanguageInput, Language> setLanguage() {
7686
return OperationHandler.sync(
7787
(ctx, details, input) -> {
78-
logger.info("Update for SetLanguage was received for workflow {}", input.getWorkflowId());
79-
return getWorkflowStub(input.getWorkflowId())
88+
logger.info("Update for SetLanguage was received for userId {}", input.getUserId());
89+
return getWorkflowStub(input.getUserId())
8090
.setLanguageUsingActivity(new GreetingWorkflow.SetLanguageInput(input.getLanguage()));
8191
});
8292
}
@@ -87,8 +97,8 @@ public OperationHandler<NexusRemoteGreetingService.SetLanguageInput, Language> s
8797
approve() {
8898
return OperationHandler.sync(
8999
(ctx, details, input) -> {
90-
logger.info("Signal for Approve was received for workflow {}", input.getWorkflowId());
91-
getWorkflowStub(input.getWorkflowId())
100+
logger.info("Signal for Approve was received for userId {}", input.getUserId());
101+
getWorkflowStub(input.getUserId())
92102
.approve(new GreetingWorkflow.ApproveInput(input.getName()));
93103
return new NexusRemoteGreetingService.ApproveOutput();
94104
});

0 commit comments

Comments
 (0)