diff --git a/impl/grpc/pom.xml b/impl/grpc/pom.xml
new file mode 100644
index 000000000..620584030
--- /dev/null
+++ b/impl/grpc/pom.xml
@@ -0,0 +1,48 @@
+
+ 4.0.0
+
+ io.serverlessworkflow
+ serverlessworkflow-impl
+ 8.0.0-SNAPSHOT
+
+ serverlessworkflow-impl-grpc
+ Serverless Workflow :: Impl :: gRPC
+
+
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-core
+
+
+ io.serverlessworkflow
+ serverlessworkflow-api
+
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-jackson
+
+
+ io.grpc
+ grpc-stub
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.25.8
+
+
+ com.google.protobuf
+ protobuf-java-util
+ 3.25.8
+
+
+ com.github.os72
+ protoc-jar
+ ${version.com.github.os72.protoc.jar}
+
+
+ io.grpc
+ grpc-protobuf
+
+
+
\ No newline at end of file
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java
new file mode 100644
index 000000000..a111e9dd0
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import com.google.protobuf.DescriptorProtos;
+
+public record FileDescriptorContext(
+ DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java
new file mode 100644
index 000000000..1f416a9a5
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+
+@FunctionalInterface
+public interface FileDescriptorContextSupplier {
+
+ FileDescriptorContext get(
+ WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input);
+}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java
new file mode 100644
index 000000000..377ec0ac7
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import com.github.os72.protocjar.Protoc;
+import com.google.protobuf.DescriptorProtos;
+import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+public class FileDescriptorReader {
+
+ public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) {
+ Path grpcDir =
+ tryCreateTempGrpcDir()
+ .orElseThrow(
+ () -> new IllegalStateException("Could not create temporary gRPC directory"));
+
+ try (InputStream inputStream = externalResourceHandler.open()) {
+
+ Path protoFile = grpcDir.resolve(externalResourceHandler.name());
+
+ Files.copy(inputStream, protoFile);
+
+ Path descriptorOutput = grpcDir.resolve("descriptor.protobin");
+
+ try {
+
+ generateFileDescriptor(grpcDir, protoFile, descriptorOutput);
+
+ DescriptorProtos.FileDescriptorSet fileDescriptorSet =
+ DescriptorProtos.FileDescriptorSet.newBuilder()
+ .mergeFrom(Files.readAllBytes(descriptorOutput))
+ .build();
+
+ return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name());
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ "Unable to read external resource handler: " + externalResourceHandler.name(), e);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Optional tryCreateTempGrpcDir() {
+ try {
+ return Optional.of(Files.createTempDirectory("serverless-workflow-"));
+ } catch (IOException e) {
+ throw new UncheckedIOException("Error while creating temporary gRPC directory", e);
+ }
+ }
+
+ /**
+ * Calls protoc binary with --descriptor_set_out= option set.
+ *
+ * @param grpcDir a temporary directory
+ * @param protoFile the .proto file used by protoc to generate the file descriptor
+ * @param descriptorOutput the output directory where the descriptor file will be generated
+ */
+ private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) {
+ String[] protocArgs =
+ new String[] {
+ "--include_imports",
+ "--descriptor_set_out=" + descriptorOutput.toAbsolutePath(),
+ "-I",
+ grpcDir.toAbsolutePath().toString(),
+ protoFile.toAbsolutePath().toString()
+ };
+
+ try {
+
+ int status = Protoc.runProtoc(protocArgs);
+
+ // TODO: I need to resolve the protoc file from PATH or from JARs, supporting all OS
+ // systems.
+ // ProcessBuilder processBuilder = new ProcessBuilder(protocArgs);
+ // int status = ScriptUtils.uncheckedStart(processBuilder).waitFor();
+
+ if (status != 0) {
+ throw new RuntimeException(
+ "Unable to generate file descriptor, 'protoc' execution failed with status " + status);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Unable to generate file descriptor", e);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Unable to generate file descriptor", e);
+ }
+ }
+}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java
new file mode 100644
index 000000000..465d9374b
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+
+@FunctionalInterface
+public interface GrpcCallExecutor {
+
+ WorkflowModel apply(
+ FileDescriptorContext fileDescriptorContext,
+ GrpcRequestContext requestContext,
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ WorkflowModel model);
+}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java
new file mode 100644
index 000000000..c13f5de5a
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import io.grpc.Channel;
+import io.grpc.ManagedChannelBuilder;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowApplication;
+import io.serverlessworkflow.impl.WorkflowContext;
+
+public class GrpcChannelResolver {
+
+ public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider";
+
+ public static Channel channel(
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ GrpcRequestContext grpcRequestContext) {
+ WorkflowApplication appl = workflowContext.definition().application();
+ return appl.additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext)
+ .orElseGet(
+ () ->
+ ManagedChannelBuilder.forAddress(
+ grpcRequestContext.address(), grpcRequestContext.port())
+ .usePlaintext()
+ .build());
+ }
+}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java
new file mode 100644
index 000000000..11605b498
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.executors.CallableTask;
+import java.util.concurrent.CompletableFuture;
+
+public class GrpcExecutor implements CallableTask {
+
+ private final GrpcRequestContext requestContext;
+ private final GrpcCallExecutor grpcCallExecutor;
+ private final FileDescriptorContextSupplier fileDescriptorContextSupplier;
+
+ public GrpcExecutor(
+ GrpcRequestContext builder,
+ GrpcCallExecutor grpcCallExecutor,
+ FileDescriptorContextSupplier fileDescriptorContextSupplier) {
+ this.requestContext = builder;
+ this.grpcCallExecutor = grpcCallExecutor;
+ this.fileDescriptorContextSupplier = fileDescriptorContextSupplier;
+ }
+
+ @Override
+ public CompletableFuture apply(
+ WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
+
+ FileDescriptorContext fileDescriptorContext =
+ this.fileDescriptorContextSupplier.get(workflowContext, taskContext, input);
+
+ return CompletableFuture.supplyAsync(
+ () ->
+ this.grpcCallExecutor.apply(
+ fileDescriptorContext, requestContext, workflowContext, taskContext, input));
+ }
+}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java
new file mode 100644
index 000000000..c58ca0d0d
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.MethodDescriptor;
+import io.grpc.protobuf.ProtoUtils;
+import io.grpc.stub.ClientCalls;
+import io.serverlessworkflow.api.types.CallGRPC;
+import io.serverlessworkflow.api.types.ExternalResource;
+import io.serverlessworkflow.api.types.GRPCArguments;
+import io.serverlessworkflow.api.types.TaskBase;
+import io.serverlessworkflow.api.types.WithGRPCService;
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowMutablePosition;
+import io.serverlessworkflow.impl.WorkflowValueResolver;
+import io.serverlessworkflow.impl.executors.CallableTask;
+import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
+import io.serverlessworkflow.impl.jackson.JsonUtils;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GrpcExecutorBuilder implements CallableTaskBuilder {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GrpcExecutorBuilder.class);
+
+ private ExternalResource proto;
+ private GrpcRequestContext grpcRequestContext;
+ private Map arguments;
+ private GrpcCallExecutor callExecutor;
+ private FileDescriptorContextSupplier fileDescriptorContextSupplier;
+ private WorkflowValueResolver protoUriSupplier;
+
+ @Override
+ public boolean accept(Class extends TaskBase> clazz) {
+ return clazz.equals(CallGRPC.class);
+ }
+
+ @Override
+ public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePosition position) {
+
+ GRPCArguments with = task.getWith();
+ WithGRPCService service = with.getService();
+ this.proto = with.getProto();
+
+ this.arguments =
+ with.getArguments() != null && with.getArguments().getAdditionalProperties() != null
+ ? with.getArguments().getAdditionalProperties()
+ : Map.of();
+
+ this.grpcRequestContext =
+ new GrpcRequestContext(
+ service.getHost(), service.getPort(), with.getMethod(), service.getName(), arguments);
+
+ FileDescriptorReader fileDescriptorReader = new FileDescriptorReader();
+
+ this.fileDescriptorContextSupplier =
+ (WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel workflowModel) ->
+ definition
+ .resourceLoader()
+ .load(
+ with.getProto().getEndpoint(),
+ fileDescriptorReader::readDescriptor,
+ workflowContext,
+ taskContext,
+ workflowModel);
+ this.callExecutor =
+ (fileDescriptorContext, requestContext, workflowContext, taskContext, model) -> {
+ Channel channel =
+ GrpcChannelResolver.channel(workflowContext, taskContext, this.grpcRequestContext);
+ String protoName = fileDescriptorContext.inputProto();
+
+ DescriptorProtos.FileDescriptorProto fileDescriptorProto =
+ fileDescriptorContext.fileDescriptorSet().getFileList().stream()
+ .filter(
+ file ->
+ file.getName()
+ .equals(
+ this.proto.getName() != null ? this.proto.getName() : protoName))
+ .findFirst()
+ .orElseThrow(
+ () -> new IllegalStateException("Proto file not found in descriptor set"));
+
+ try {
+ Descriptors.FileDescriptor fileDescriptor =
+ Descriptors.FileDescriptor.buildFrom(
+ fileDescriptorProto, new Descriptors.FileDescriptor[] {});
+ Descriptors.ServiceDescriptor serviceDescriptor =
+ fileDescriptor.findServiceByName(this.grpcRequestContext.service());
+
+ Objects.requireNonNull(
+ serviceDescriptor, "Service not found: " + this.grpcRequestContext.service());
+
+ Descriptors.MethodDescriptor methodDescriptor =
+ serviceDescriptor.findMethodByName(this.grpcRequestContext.method());
+
+ Objects.requireNonNull(
+ methodDescriptor, "Method not found: " + this.grpcRequestContext.method());
+
+ MethodDescriptor.MethodType methodType =
+ ProtobufMessageUtils.getMethodType(methodDescriptor);
+
+ ClientCall call =
+ channel.newCall(
+ io.grpc.MethodDescriptor.newBuilder()
+ .setType(methodType)
+ .setFullMethodName(
+ io.grpc.MethodDescriptor.generateFullMethodName(
+ serviceDescriptor.getFullName(), methodDescriptor.getName()))
+ .setRequestMarshaller(
+ ProtoUtils.marshaller(
+ DynamicMessage.newBuilder(methodDescriptor.getInputType())
+ .buildPartial()))
+ .setResponseMarshaller(
+ ProtoUtils.marshaller(
+ DynamicMessage.newBuilder(methodDescriptor.getOutputType())
+ .buildPartial()))
+ .build(),
+ CallOptions.DEFAULT.withWaitForReady());
+
+ return switch (methodType) {
+ case CLIENT_STREAMING ->
+ handleClientStreaming(workflowContext, arguments, methodDescriptor, call);
+ case BIDI_STREAMING ->
+ handleBidiStreaming(workflowContext, arguments, methodDescriptor, call);
+ case SERVER_STREAMING ->
+ handleServerStreaming(workflowContext, methodDescriptor, arguments, call);
+ case UNARY, UNKNOWN ->
+ handleBlockingUnary(workflowContext, methodDescriptor, arguments, call);
+ };
+
+ } catch (Descriptors.DescriptorValidationException
+ | InvalidProtocolBufferException
+ | JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+
+ private static WorkflowModel handleClientStreaming(
+ WorkflowContext workflowContext,
+ Map parameters,
+ Descriptors.MethodDescriptor methodDescriptor,
+ ClientCall call) {
+ JsonNode jsonNode =
+ ProtobufMessageUtils.asyncStreamingCall(
+ parameters,
+ methodDescriptor,
+ responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver),
+ nodes -> nodes.isEmpty() ? NullNode.instance : nodes.get(0));
+ return workflowContext.definition().application().modelFactory().fromAny(jsonNode);
+ }
+
+ private static WorkflowModel handleServerStreaming(
+ WorkflowContext workflowContext,
+ Descriptors.MethodDescriptor methodDescriptor,
+ Map parameters,
+ ClientCall call)
+ throws InvalidProtocolBufferException, JsonProcessingException {
+ Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters);
+ List nodes = new ArrayList<>();
+ ClientCalls.blockingServerStreamingCall(call, builder.build())
+ .forEachRemaining(message -> nodes.add(ProtobufMessageUtils.convert(message)));
+ return workflowContext.definition().application().modelFactory().fromAny(nodes);
+ }
+
+ private static WorkflowModel handleBlockingUnary(
+ WorkflowContext workflowContext,
+ Descriptors.MethodDescriptor methodDescriptor,
+ Map parameters,
+ ClientCall call)
+ throws InvalidProtocolBufferException, JsonProcessingException {
+ Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters);
+
+ Message message = ClientCalls.blockingUnaryCall(call, builder.build());
+ return workflowContext
+ .definition()
+ .application()
+ .modelFactory()
+ .fromAny(ProtobufMessageUtils.convert(message));
+ }
+
+ private static WorkflowModel handleBidiStreaming(
+ WorkflowContext workflowContext,
+ Map parameters,
+ Descriptors.MethodDescriptor methodDescriptor,
+ ClientCall call) {
+ return workflowContext
+ .definition()
+ .application()
+ .modelFactory()
+ .fromAny(
+ ProtobufMessageUtils.asyncStreamingCall(
+ parameters,
+ methodDescriptor,
+ responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver),
+ v -> {
+ Collection nodes = v;
+ List list = new ArrayList<>(nodes);
+ return JsonUtils.fromValue(list);
+ }));
+ }
+
+ @Override
+ public CallableTask build() {
+ return new GrpcExecutor(
+ this.grpcRequestContext, this.callExecutor, this.fileDescriptorContextSupplier);
+ }
+}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java
new file mode 100644
index 000000000..84ff555d9
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import java.util.Map;
+
+public record GrpcRequestContext(
+ String address, int port, String method, String service, Map parameters) {}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java
new file mode 100644
index 000000000..384b449c9
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.MethodDescriptor;
+import io.grpc.stub.StreamObserver;
+import io.serverlessworkflow.api.WorkflowFormat;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+public interface ProtobufMessageUtils {
+
+ static JsonNode convert(Message message) {
+ StringBuilder str = new StringBuilder();
+ try {
+ JsonFormat.printer().appendTo(message, str);
+ return WorkflowFormat.JSON.mapper().readTree(str.toString());
+ } catch (IOException e) {
+ throw new UncheckedIOException("Error converting protobuf message to JSON", e);
+ }
+ }
+
+ static MethodDescriptor.MethodType getMethodType(
+ com.google.protobuf.Descriptors.MethodDescriptor methodDesc) {
+ DescriptorProtos.MethodDescriptorProto methodDescProto = methodDesc.toProto();
+ if (methodDescProto.getClientStreaming()) {
+ if (methodDescProto.getServerStreaming()) {
+ return MethodDescriptor.MethodType.BIDI_STREAMING;
+ }
+ return MethodDescriptor.MethodType.CLIENT_STREAMING;
+ } else if (methodDescProto.getServerStreaming()) {
+ return MethodDescriptor.MethodType.SERVER_STREAMING;
+ } else {
+ return MethodDescriptor.MethodType.UNARY;
+ }
+ }
+
+ static JsonNode asyncStreamingCall(
+ Map parameters,
+ com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor,
+ UnaryOperator> streamObserverFunction,
+ Function, JsonNode> nodesFunction) {
+ WaitingStreamObserver responseObserver = new WaitingStreamObserver();
+ StreamObserver requestObserver = streamObserverFunction.apply(responseObserver);
+
+ for (var entry : parameters.entrySet()) {
+ try {
+ Message message =
+ buildMessage(entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())).build();
+ requestObserver.onNext(message);
+ } catch (Exception e) {
+ requestObserver.onError(e);
+ throw new RuntimeException(e);
+ }
+ responseObserver.checkForServerStreamErrors();
+ }
+ requestObserver.onCompleted();
+
+ return nodesFunction.apply(
+ responseObserver.get().stream()
+ .map(ProtobufMessageUtils::convert)
+ .collect(Collectors.toList()));
+ }
+
+ static Message.Builder buildMessage(Object object, Message.Builder builder)
+ throws InvalidProtocolBufferException, JsonProcessingException {
+ JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(object), builder);
+ return builder;
+ }
+
+ static Message.Builder buildMessage(
+ Descriptors.MethodDescriptor methodDescriptor, Map parameters)
+ throws InvalidProtocolBufferException, JsonProcessingException {
+ DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType());
+ JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(parameters), builder);
+ return builder;
+ }
+}
diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java
new file mode 100644
index 000000000..fe73d5cd3
--- /dev/null
+++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.grpc;
+
+import com.google.protobuf.Message;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class WaitingStreamObserver implements StreamObserver {
+
+ List responses = new ArrayList<>();
+ CompletableFuture> responsesFuture = new CompletableFuture<>();
+
+ @Override
+ public void onNext(Message messageReply) {
+ responses.add(messageReply);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ responsesFuture.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ responsesFuture.complete(responses);
+ }
+
+ public List get() {
+ int defaultTimeout = 10000;
+
+ try {
+ return responsesFuture.get(defaultTimeout, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ } catch (TimeoutException e) {
+ throw new IllegalStateException(
+ String.format("gRPC call timed out after %d seconds", defaultTimeout), e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause());
+ }
+ }
+
+ public void checkForServerStreamErrors() {
+ if (responsesFuture.isCompletedExceptionally()) {
+ try {
+ responsesFuture.join();
+ } catch (CompletionException e) {
+ throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause());
+ }
+ }
+ }
+
+ private String getServerStreamErrorMessage(Throwable throwable) {
+ return String.format(
+ "Received an error through gRPC server stream with status: %s",
+ Status.fromThrowable(throwable));
+ }
+}
diff --git a/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder
new file mode 100644
index 000000000..6acd9ba63
--- /dev/null
+++ b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder
@@ -0,0 +1 @@
+io.serverlessworkflow.impl.executors.grpc.GrpcExecutorBuilder
\ No newline at end of file
diff --git a/impl/pom.xml b/impl/pom.xml
index 7c4f17133..166f96f0f 100644
--- a/impl/pom.xml
+++ b/impl/pom.xml
@@ -17,6 +17,7 @@
9.2.1
3.7.0
25.0.1
+ 3.11.4
@@ -105,6 +106,11 @@
serverlessworkflow-impl-container
${project.version}
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-grpc
+ ${project.version}
+
net.thisptr
jackson-jq
@@ -193,5 +199,6 @@
test
javascript
python
+ grpc
diff --git a/impl/test/pom.xml b/impl/test/pom.xml
index fd47aae9c..2363c8d72 100644
--- a/impl/test/pom.xml
+++ b/impl/test/pom.xml
@@ -53,6 +53,10 @@
io.serverlessworkflow
serverlessworkflow-impl-container
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-grpc
+
org.glassfish.jersey.core
jersey-client
@@ -85,6 +89,12 @@
org.awaitility
awaitility
+
+
+ io.grpc
+ grpc-netty
+ test
+
@@ -95,7 +105,32 @@
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.7.1
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+ com.google.protobuf:protoc:3.25.8:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${version.io.grpc.java}:exe:${os.detected.classifier}
+
+
+
+
+ test-compile
+ test-compile-custom
+
+
+
+
maven-jar-plugin
diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java
new file mode 100644
index 000000000..175c6cc93
--- /dev/null
+++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.test.grpc;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.serverlessworkflow.api.WorkflowReader;
+import io.serverlessworkflow.api.types.Workflow;
+import io.serverlessworkflow.impl.WorkflowApplication;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors;
+import io.serverlessworkflow.impl.executors.grpc.contributors.ContributorsAPIGrpc;
+import io.serverlessworkflow.impl.test.grpc.handlers.ContributorHandler;
+import java.io.IOException;
+import java.util.Map;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import person.Person;
+import person.PersonAPIGrpc;
+
+public class GrpcClientStreamingTest {
+
+ private static final int PORT_FOR_EXAMPLES = 5011;
+ private static WorkflowApplication app;
+ private static Server server;
+
+ @BeforeAll
+ static void setUpApp() throws IOException {
+ server = ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new ContributorHandler()).build();
+ server.start();
+
+ app = WorkflowApplication.builder().build();
+ }
+
+ @AfterEach
+ void cleanup() throws InterruptedException {
+ server.shutdown().awaitTermination();
+ }
+
+ @Test
+ void grpcPerson() throws IOException {
+
+ MethodDescriptor
+ createContributorMethod = ContributorsAPIGrpc.getCreateContributorMethod();
+
+ MethodDescriptor getPersonMethod =
+ PersonAPIGrpc.getGetPersonMethod();
+
+ Workflow workflow =
+ WorkflowReader.readWorkflowFromClasspath(
+ "workflows-samples/grpc/contributors-client-stream-call.yaml");
+
+ WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow);
+
+ String filename =
+ getClass()
+ .getClassLoader()
+ .getResource("workflows-samples/grpc/proto/contributors.proto")
+ .getFile();
+
+ Map output =
+ workflowDefinition
+ .instance(Map.of("protoFilePath", "file://" + filename))
+ .start()
+ .join()
+ .asMap()
+ .orElseThrow();
+
+ Assertions.assertThat(output)
+ .contains(Map.entry("message", "dependabot[bot] has 1 contributions"));
+ }
+}
diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcTestUnaryCallTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcTestUnaryCallTest.java
new file mode 100644
index 000000000..365c98178
--- /dev/null
+++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcTestUnaryCallTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.test.grpc;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.serverlessworkflow.api.WorkflowReader;
+import io.serverlessworkflow.api.types.Workflow;
+import io.serverlessworkflow.impl.WorkflowApplication;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import java.io.IOException;
+import java.util.Map;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import person.Person;
+import person.PersonAPIGrpc;
+
+public class GrpcTestUnaryCallTest {
+
+ private static final int PORT_FOR_EXAMPLES = 5011;
+ private static WorkflowApplication app;
+ private static Server server;
+
+ @BeforeAll
+ static void setUpApp() throws IOException {
+ server =
+ ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new PersonServiceHandler()).build();
+ server.start();
+
+ app = WorkflowApplication.builder().build();
+ }
+
+ @AfterEach
+ void cleanup() throws InterruptedException {
+ server.shutdown().awaitTermination();
+ }
+
+ @Test
+ void grpcPerson() throws IOException {
+ MethodDescriptor getPersonMethod =
+ PersonAPIGrpc.getGetPersonMethod();
+
+ Workflow workflow =
+ WorkflowReader.readWorkflowFromClasspath("workflows-samples/grpc/get-user-call.yaml");
+
+ WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow);
+
+ String filename =
+ getClass()
+ .getClassLoader()
+ .getResource("workflows-samples/grpc/proto/person.proto")
+ .getFile();
+
+ Map output =
+ workflowDefinition
+ .instance(Map.of("protoFilePath", "file://" + filename))
+ .start()
+ .join()
+ .asMap()
+ .orElseThrow();
+
+ Assertions.assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182));
+ }
+}
diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/PersonServiceHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/PersonServiceHandler.java
new file mode 100644
index 000000000..6404805d3
--- /dev/null
+++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/PersonServiceHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.test.grpc;
+
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import person.Person;
+import person.PersonAPIGrpc;
+
+public class PersonServiceHandler extends PersonAPIGrpc.PersonAPIImplBase {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PersonServiceHandler.class);
+
+ @Override
+ public void getPerson(
+ Person.GetPersonRequest request, StreamObserver responseObserver) {
+ LOGGER.info("Receiving GetPersonRequest request: {}", request.toByteString());
+
+ responseObserver.onNext(
+ Person.GetPersonResponse.newBuilder().setId(891182).setName("John Doe").build());
+ responseObserver.onCompleted();
+ }
+}
diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorHandler.java
new file mode 100644
index 000000000..d27523152
--- /dev/null
+++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.test.grpc.handlers;
+
+import io.grpc.stub.StreamObserver;
+import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors;
+import io.serverlessworkflow.impl.executors.grpc.contributors.ContributorsAPIGrpc;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ContributorHandler extends ContributorsAPIGrpc.ContributorsAPIImplBase {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ContributorHandler.class);
+
+ private static final Map githubs = new ConcurrentHashMap<>();
+
+ @Override
+ public StreamObserver createContributor(
+ StreamObserver responseObserver) {
+
+ return new StreamObserver<>() {
+ @Override
+ public void onNext(Contributors.AddContributionRequest value) {
+ String github = value.getGithub();
+ LOGGER.info("Receiving a new contribution for: {}", value.getGithub());
+ githubs.compute(
+ github,
+ (key, counter) -> {
+ if (counter == null) {
+ LongAdder longAdder = new LongAdder();
+ longAdder.increment();
+ return longAdder;
+ }
+ counter.increment();
+ return counter;
+ });
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOGGER.error("Ok, ignore it", t);
+ }
+
+ @Override
+ public void onCompleted() {
+ StringBuilder stringBuilder = new StringBuilder();
+ Set> entries = githubs.entrySet();
+ for (Map.Entry entry : entries) {
+ stringBuilder
+ .append(entry.getKey())
+ .append(" has ")
+ .append(entry.getValue())
+ .append(" contributions");
+ }
+ responseObserver.onNext(
+ Contributors.AddContributionResponse.newBuilder()
+ .setMessage(stringBuilder.toString())
+ .build());
+ responseObserver.onCompleted();
+ }
+ };
+ }
+}
diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonHandler.java
new file mode 100644
index 000000000..dbe655e42
--- /dev/null
+++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.test.grpc.handlers;
+
+import io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import person.Person;
+import person.PersonAPIGrpc;
+
+public class PersonHandler extends PersonAPIGrpc.PersonAPIImplBase {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PersonHandler.class);
+
+ @Override
+ public void getPerson(
+ Person.GetPersonRequest request, StreamObserver responseObserver) {
+ LOGGER.info("Receiving GetPersonRequest request: {}", request.toByteString());
+
+ responseObserver.onNext(
+ Person.GetPersonResponse.newBuilder().setId(891182).setName("John Doe").build());
+ responseObserver.onCompleted();
+ }
+}
diff --git a/impl/test/src/test/proto/contributors.proto b/impl/test/src/test/proto/contributors.proto
new file mode 100644
index 000000000..1e8f5f642
--- /dev/null
+++ b/impl/test/src/test/proto/contributors.proto
@@ -0,0 +1,17 @@
+syntax = "proto2";
+
+package io.serverlessworkflow.impl.executors.grpc.contributors;
+
+message AddContributionRequest {
+ required string github = 1;
+}
+
+message AddContributionResponse {
+ required string message = 1;
+}
+
+// client stream
+service ContributorsAPI {
+ rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {}
+}
+
diff --git a/impl/test/src/test/proto/person.proto b/impl/test/src/test/proto/person.proto
new file mode 100644
index 000000000..b84644d2b
--- /dev/null
+++ b/impl/test/src/test/proto/person.proto
@@ -0,0 +1,14 @@
+syntax = "proto2";
+
+package person;
+
+message GetPersonRequest {}
+
+message GetPersonResponse {
+ required string name = 1;
+ required int32 id = 2;
+}
+
+service PersonAPI {
+ rpc GetPerson(GetPersonRequest) returns (GetPersonResponse);
+}
\ No newline at end of file
diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml
new file mode 100644
index 000000000..7d12ef105
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml
@@ -0,0 +1,18 @@
+document:
+ dsl: '1.0.2'
+ namespace: test
+ name: grpc-example
+ version: '0.1.0'
+do:
+ - greet:
+ call: grpc
+ with:
+ proto:
+ endpoint: ${ .protoFilePath }
+ service:
+ name: ContributorsAPI
+ host: localhost
+ port: 5011
+ method: CreateContributor
+ arguments:
+ github: dependabot[bot]
\ No newline at end of file
diff --git a/impl/test/src/test/resources/workflows-samples/grpc/get-user-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/get-user-call.yaml
new file mode 100644
index 000000000..39742806a
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/grpc/get-user-call.yaml
@@ -0,0 +1,16 @@
+document:
+ dsl: '1.0.2'
+ namespace: test
+ name: grpc-example
+ version: '0.1.0'
+do:
+ - greet:
+ call: grpc
+ with:
+ proto:
+ endpoint: ${ .protoFilePath }
+ service:
+ name: PersonAPI
+ host: localhost
+ port: 5011
+ method: GetPerson
\ No newline at end of file
diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto
new file mode 100644
index 000000000..1e8f5f642
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto
@@ -0,0 +1,17 @@
+syntax = "proto2";
+
+package io.serverlessworkflow.impl.executors.grpc.contributors;
+
+message AddContributionRequest {
+ required string github = 1;
+}
+
+message AddContributionResponse {
+ required string message = 1;
+}
+
+// client stream
+service ContributorsAPI {
+ rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {}
+}
+
diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto
new file mode 100644
index 000000000..3fde53e06
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto
@@ -0,0 +1,19 @@
+// add a service name called OpenAPI.syntax = "proto2";
+
+syntax = "proto2";
+
+package person;
+
+message GetPersonRequest {
+
+}
+
+message GetPersonResponse {
+ required string name = 1;
+ required int32 id = 2;
+}
+// create a service called PersonAPI.Person
+
+service PersonAPI {
+ rpc GetPerson(GetPersonRequest) returns (GetPersonResponse);
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index cf156e592..997808529 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
3.6.2
3.5.4
3.2.8
+ 1.78.0
3.5.0
${java.version}
1.2.2
@@ -83,6 +84,7 @@
1.5.23
2.20.1
2.20
+ 4.32.1
2.0.0
5.3.2
4.0.1
@@ -203,6 +205,27 @@
jakarta.validation-api
${version.jakarta.validation}
+
+ io.grpc
+ grpc-netty
+ ${version.io.grpc.java}
+ runtime
+
+
+ io.grpc
+ grpc-protobuf
+ ${version.io.grpc.java}
+
+
+ io.grpc
+ grpc-stub
+ ${version.io.grpc.java}
+
+
+ com.google.protobuf
+ protobuf-java
+ ${version.com.google.protobuf.java}
+