diff --git a/impl/grpc/pom.xml b/impl/grpc/pom.xml new file mode 100644 index 000000000..620584030 --- /dev/null +++ b/impl/grpc/pom.xml @@ -0,0 +1,48 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-grpc + Serverless Workflow :: Impl :: gRPC + + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + io.serverlessworkflow + serverlessworkflow-api + + + io.serverlessworkflow + serverlessworkflow-impl-jackson + + + io.grpc + grpc-stub + + + com.google.protobuf + protobuf-java + 3.25.8 + + + com.google.protobuf + protobuf-java-util + 3.25.8 + + + com.github.os72 + protoc-jar + ${version.com.github.os72.protoc.jar} + + + io.grpc + grpc-protobuf + + + \ No newline at end of file diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java new file mode 100644 index 000000000..a111e9dd0 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContext.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.DescriptorProtos; + +public record FileDescriptorContext( + DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java new file mode 100644 index 000000000..1f416a9a5 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorContextSupplier.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; + +@FunctionalInterface +public interface FileDescriptorContextSupplier { + + FileDescriptorContext get( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input); +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java new file mode 100644 index 000000000..377ec0ac7 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java @@ -0,0 +1,109 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.github.os72.protocjar.Protoc; +import com.google.protobuf.DescriptorProtos; +import io.serverlessworkflow.impl.resources.ExternalResourceHandler; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + +public class FileDescriptorReader { + + public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) { + Path grpcDir = + tryCreateTempGrpcDir() + .orElseThrow( + () -> new IllegalStateException("Could not create temporary gRPC directory")); + + try (InputStream inputStream = externalResourceHandler.open()) { + + Path protoFile = grpcDir.resolve(externalResourceHandler.name()); + + Files.copy(inputStream, protoFile); + + Path descriptorOutput = grpcDir.resolve("descriptor.protobin"); + + try { + + generateFileDescriptor(grpcDir, protoFile, descriptorOutput); + + DescriptorProtos.FileDescriptorSet fileDescriptorSet = + DescriptorProtos.FileDescriptorSet.newBuilder() + .mergeFrom(Files.readAllBytes(descriptorOutput)) + .build(); + + return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name()); + + } catch (IOException e) { + throw new UncheckedIOException( + "Unable to read external resource handler: " + externalResourceHandler.name(), e); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private Optional tryCreateTempGrpcDir() { + try { + return Optional.of(Files.createTempDirectory("serverless-workflow-")); + } catch (IOException e) { + throw new UncheckedIOException("Error while creating temporary gRPC directory", e); + } + } + + /** + * Calls protoc binary with --descriptor_set_out= option set. + * + * @param grpcDir a temporary directory + * @param protoFile the .proto file used by protoc to generate the file descriptor + * @param descriptorOutput the output directory where the descriptor file will be generated + */ + private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) { + String[] protocArgs = + new String[] { + "--include_imports", + "--descriptor_set_out=" + descriptorOutput.toAbsolutePath(), + "-I", + grpcDir.toAbsolutePath().toString(), + protoFile.toAbsolutePath().toString() + }; + + try { + + int status = Protoc.runProtoc(protocArgs); + + // TODO: I need to resolve the protoc file from PATH or from JARs, supporting all OS + // systems. + // ProcessBuilder processBuilder = new ProcessBuilder(protocArgs); + // int status = ScriptUtils.uncheckedStart(processBuilder).waitFor(); + + if (status != 0) { + throw new RuntimeException( + "Unable to generate file descriptor, 'protoc' execution failed with status " + status); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to generate file descriptor", e); + } catch (IOException e) { + throw new UncheckedIOException("Unable to generate file descriptor", e); + } + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java new file mode 100644 index 000000000..465d9374b --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcCallExecutor.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; + +@FunctionalInterface +public interface GrpcCallExecutor { + + WorkflowModel apply( + FileDescriptorContext fileDescriptorContext, + GrpcRequestContext requestContext, + WorkflowContext workflowContext, + TaskContext taskContext, + WorkflowModel model); +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java new file mode 100644 index 000000000..c13f5de5a --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcChannelResolver.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.grpc.Channel; +import io.grpc.ManagedChannelBuilder; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; + +public class GrpcChannelResolver { + + public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider"; + + public static Channel channel( + WorkflowContext workflowContext, + TaskContext taskContext, + GrpcRequestContext grpcRequestContext) { + WorkflowApplication appl = workflowContext.definition().application(); + return appl.additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext) + .orElseGet( + () -> + ManagedChannelBuilder.forAddress( + grpcRequestContext.address(), grpcRequestContext.port()) + .usePlaintext() + .build()); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java new file mode 100644 index 000000000..11605b498 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.executors.CallableTask; +import java.util.concurrent.CompletableFuture; + +public class GrpcExecutor implements CallableTask { + + private final GrpcRequestContext requestContext; + private final GrpcCallExecutor grpcCallExecutor; + private final FileDescriptorContextSupplier fileDescriptorContextSupplier; + + public GrpcExecutor( + GrpcRequestContext builder, + GrpcCallExecutor grpcCallExecutor, + FileDescriptorContextSupplier fileDescriptorContextSupplier) { + this.requestContext = builder; + this.grpcCallExecutor = grpcCallExecutor; + this.fileDescriptorContextSupplier = fileDescriptorContextSupplier; + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + + FileDescriptorContext fileDescriptorContext = + this.fileDescriptorContextSupplier.get(workflowContext, taskContext, input); + + return CompletableFuture.supplyAsync( + () -> + this.grpcCallExecutor.apply( + fileDescriptorContext, requestContext, workflowContext, taskContext, input)); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java new file mode 100644 index 000000000..c58ca0d0d --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java @@ -0,0 +1,241 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.MethodDescriptor; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.ClientCalls; +import io.serverlessworkflow.api.types.CallGRPC; +import io.serverlessworkflow.api.types.ExternalResource; +import io.serverlessworkflow.api.types.GRPCArguments; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.WithGRPCService; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcExecutorBuilder implements CallableTaskBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(GrpcExecutorBuilder.class); + + private ExternalResource proto; + private GrpcRequestContext grpcRequestContext; + private Map arguments; + private GrpcCallExecutor callExecutor; + private FileDescriptorContextSupplier fileDescriptorContextSupplier; + private WorkflowValueResolver protoUriSupplier; + + @Override + public boolean accept(Class clazz) { + return clazz.equals(CallGRPC.class); + } + + @Override + public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePosition position) { + + GRPCArguments with = task.getWith(); + WithGRPCService service = with.getService(); + this.proto = with.getProto(); + + this.arguments = + with.getArguments() != null && with.getArguments().getAdditionalProperties() != null + ? with.getArguments().getAdditionalProperties() + : Map.of(); + + this.grpcRequestContext = + new GrpcRequestContext( + service.getHost(), service.getPort(), with.getMethod(), service.getName(), arguments); + + FileDescriptorReader fileDescriptorReader = new FileDescriptorReader(); + + this.fileDescriptorContextSupplier = + (WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel workflowModel) -> + definition + .resourceLoader() + .load( + with.getProto().getEndpoint(), + fileDescriptorReader::readDescriptor, + workflowContext, + taskContext, + workflowModel); + this.callExecutor = + (fileDescriptorContext, requestContext, workflowContext, taskContext, model) -> { + Channel channel = + GrpcChannelResolver.channel(workflowContext, taskContext, this.grpcRequestContext); + String protoName = fileDescriptorContext.inputProto(); + + DescriptorProtos.FileDescriptorProto fileDescriptorProto = + fileDescriptorContext.fileDescriptorSet().getFileList().stream() + .filter( + file -> + file.getName() + .equals( + this.proto.getName() != null ? this.proto.getName() : protoName)) + .findFirst() + .orElseThrow( + () -> new IllegalStateException("Proto file not found in descriptor set")); + + try { + Descriptors.FileDescriptor fileDescriptor = + Descriptors.FileDescriptor.buildFrom( + fileDescriptorProto, new Descriptors.FileDescriptor[] {}); + Descriptors.ServiceDescriptor serviceDescriptor = + fileDescriptor.findServiceByName(this.grpcRequestContext.service()); + + Objects.requireNonNull( + serviceDescriptor, "Service not found: " + this.grpcRequestContext.service()); + + Descriptors.MethodDescriptor methodDescriptor = + serviceDescriptor.findMethodByName(this.grpcRequestContext.method()); + + Objects.requireNonNull( + methodDescriptor, "Method not found: " + this.grpcRequestContext.method()); + + MethodDescriptor.MethodType methodType = + ProtobufMessageUtils.getMethodType(methodDescriptor); + + ClientCall call = + channel.newCall( + io.grpc.MethodDescriptor.newBuilder() + .setType(methodType) + .setFullMethodName( + io.grpc.MethodDescriptor.generateFullMethodName( + serviceDescriptor.getFullName(), methodDescriptor.getName())) + .setRequestMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getInputType()) + .buildPartial())) + .setResponseMarshaller( + ProtoUtils.marshaller( + DynamicMessage.newBuilder(methodDescriptor.getOutputType()) + .buildPartial())) + .build(), + CallOptions.DEFAULT.withWaitForReady()); + + return switch (methodType) { + case CLIENT_STREAMING -> + handleClientStreaming(workflowContext, arguments, methodDescriptor, call); + case BIDI_STREAMING -> + handleBidiStreaming(workflowContext, arguments, methodDescriptor, call); + case SERVER_STREAMING -> + handleServerStreaming(workflowContext, methodDescriptor, arguments, call); + case UNARY, UNKNOWN -> + handleBlockingUnary(workflowContext, methodDescriptor, arguments, call); + }; + + } catch (Descriptors.DescriptorValidationException + | InvalidProtocolBufferException + | JsonProcessingException e) { + throw new RuntimeException(e); + } + }; + } + + private static WorkflowModel handleClientStreaming( + WorkflowContext workflowContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + JsonNode jsonNode = + ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver), + nodes -> nodes.isEmpty() ? NullNode.instance : nodes.get(0)); + return workflowContext.definition().application().modelFactory().fromAny(jsonNode); + } + + private static WorkflowModel handleServerStreaming( + WorkflowContext workflowContext, + Descriptors.MethodDescriptor methodDescriptor, + Map parameters, + ClientCall call) + throws InvalidProtocolBufferException, JsonProcessingException { + Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); + List nodes = new ArrayList<>(); + ClientCalls.blockingServerStreamingCall(call, builder.build()) + .forEachRemaining(message -> nodes.add(ProtobufMessageUtils.convert(message))); + return workflowContext.definition().application().modelFactory().fromAny(nodes); + } + + private static WorkflowModel handleBlockingUnary( + WorkflowContext workflowContext, + Descriptors.MethodDescriptor methodDescriptor, + Map parameters, + ClientCall call) + throws InvalidProtocolBufferException, JsonProcessingException { + Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters); + + Message message = ClientCalls.blockingUnaryCall(call, builder.build()); + return workflowContext + .definition() + .application() + .modelFactory() + .fromAny(ProtobufMessageUtils.convert(message)); + } + + private static WorkflowModel handleBidiStreaming( + WorkflowContext workflowContext, + Map parameters, + Descriptors.MethodDescriptor methodDescriptor, + ClientCall call) { + return workflowContext + .definition() + .application() + .modelFactory() + .fromAny( + ProtobufMessageUtils.asyncStreamingCall( + parameters, + methodDescriptor, + responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver), + v -> { + Collection nodes = v; + List list = new ArrayList<>(nodes); + return JsonUtils.fromValue(list); + })); + } + + @Override + public CallableTask build() { + return new GrpcExecutor( + this.grpcRequestContext, this.callExecutor, this.fileDescriptorContextSupplier); + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java new file mode 100644 index 000000000..84ff555d9 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcRequestContext.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import java.util.Map; + +public record GrpcRequestContext( + String address, int port, String method, String service, Map parameters) {} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java new file mode 100644 index 000000000..384b449c9 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/ProtobufMessageUtils.java @@ -0,0 +1,104 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import io.grpc.MethodDescriptor; +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.api.WorkflowFormat; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +public interface ProtobufMessageUtils { + + static JsonNode convert(Message message) { + StringBuilder str = new StringBuilder(); + try { + JsonFormat.printer().appendTo(message, str); + return WorkflowFormat.JSON.mapper().readTree(str.toString()); + } catch (IOException e) { + throw new UncheckedIOException("Error converting protobuf message to JSON", e); + } + } + + static MethodDescriptor.MethodType getMethodType( + com.google.protobuf.Descriptors.MethodDescriptor methodDesc) { + DescriptorProtos.MethodDescriptorProto methodDescProto = methodDesc.toProto(); + if (methodDescProto.getClientStreaming()) { + if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.BIDI_STREAMING; + } + return MethodDescriptor.MethodType.CLIENT_STREAMING; + } else if (methodDescProto.getServerStreaming()) { + return MethodDescriptor.MethodType.SERVER_STREAMING; + } else { + return MethodDescriptor.MethodType.UNARY; + } + } + + static JsonNode asyncStreamingCall( + Map parameters, + com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, + UnaryOperator> streamObserverFunction, + Function, JsonNode> nodesFunction) { + WaitingStreamObserver responseObserver = new WaitingStreamObserver(); + StreamObserver requestObserver = streamObserverFunction.apply(responseObserver); + + for (var entry : parameters.entrySet()) { + try { + Message message = + buildMessage(entry, DynamicMessage.newBuilder(methodDescriptor.getInputType())).build(); + requestObserver.onNext(message); + } catch (Exception e) { + requestObserver.onError(e); + throw new RuntimeException(e); + } + responseObserver.checkForServerStreamErrors(); + } + requestObserver.onCompleted(); + + return nodesFunction.apply( + responseObserver.get().stream() + .map(ProtobufMessageUtils::convert) + .collect(Collectors.toList())); + } + + static Message.Builder buildMessage(Object object, Message.Builder builder) + throws InvalidProtocolBufferException, JsonProcessingException { + JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(object), builder); + return builder; + } + + static Message.Builder buildMessage( + Descriptors.MethodDescriptor methodDescriptor, Map parameters) + throws InvalidProtocolBufferException, JsonProcessingException { + DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType()); + JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(parameters), builder); + return builder; + } +} diff --git a/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java new file mode 100644 index 000000000..fe73d5cd3 --- /dev/null +++ b/impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/WaitingStreamObserver.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.grpc; + +import com.google.protobuf.Message; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class WaitingStreamObserver implements StreamObserver { + + List responses = new ArrayList<>(); + CompletableFuture> responsesFuture = new CompletableFuture<>(); + + @Override + public void onNext(Message messageReply) { + responses.add(messageReply); + } + + @Override + public void onError(Throwable throwable) { + responsesFuture.completeExceptionally(throwable); + } + + @Override + public void onCompleted() { + responsesFuture.complete(responses); + } + + public List get() { + int defaultTimeout = 10000; + + try { + return responsesFuture.get(defaultTimeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (TimeoutException e) { + throw new IllegalStateException( + String.format("gRPC call timed out after %d seconds", defaultTimeout), e); + } catch (ExecutionException e) { + throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause()); + } + } + + public void checkForServerStreamErrors() { + if (responsesFuture.isCompletedExceptionally()) { + try { + responsesFuture.join(); + } catch (CompletionException e) { + throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause()); + } + } + } + + private String getServerStreamErrorMessage(Throwable throwable) { + return String.format( + "Received an error through gRPC server stream with status: %s", + Status.fromThrowable(throwable)); + } +} diff --git a/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder new file mode 100644 index 000000000..6acd9ba63 --- /dev/null +++ b/impl/grpc/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.grpc.GrpcExecutorBuilder \ No newline at end of file diff --git a/impl/pom.xml b/impl/pom.xml index 7c4f17133..166f96f0f 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,6 +17,7 @@ 9.2.1 3.7.0 25.0.1 + 3.11.4 @@ -105,6 +106,11 @@ serverlessworkflow-impl-container ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-grpc + ${project.version} + net.thisptr jackson-jq @@ -193,5 +199,6 @@ test javascript python + grpc diff --git a/impl/test/pom.xml b/impl/test/pom.xml index fd47aae9c..2363c8d72 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -53,6 +53,10 @@ io.serverlessworkflow serverlessworkflow-impl-container + + io.serverlessworkflow + serverlessworkflow-impl-grpc + org.glassfish.jersey.core jersey-client @@ -85,6 +89,12 @@ org.awaitility awaitility + + + io.grpc + grpc-netty + test + @@ -95,7 +105,32 @@ + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.25.8:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${version.io.grpc.java}:exe:${os.detected.classifier} + + + + + test-compile + test-compile-custom + + + + maven-jar-plugin diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java new file mode 100644 index 000000000..175c6cc93 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcClientStreamingTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; +import io.serverlessworkflow.impl.executors.grpc.contributors.ContributorsAPIGrpc; +import io.serverlessworkflow.impl.test.grpc.handlers.ContributorHandler; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import person.Person; +import person.PersonAPIGrpc; + +public class GrpcClientStreamingTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new ContributorHandler()).build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + + MethodDescriptor + createContributorMethod = ContributorsAPIGrpc.getCreateContributorMethod(); + + MethodDescriptor getPersonMethod = + PersonAPIGrpc.getGetPersonMethod(); + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath( + "workflows-samples/grpc/contributors-client-stream-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/contributors.proto") + .getFile(); + + Map output = + workflowDefinition + .instance(Map.of("protoFilePath", "file://" + filename)) + .start() + .join() + .asMap() + .orElseThrow(); + + Assertions.assertThat(output) + .contains(Map.entry("message", "dependabot[bot] has 1 contributions")); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcTestUnaryCallTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcTestUnaryCallTest.java new file mode 100644 index 000000000..365c98178 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/GrpcTestUnaryCallTest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import java.io.IOException; +import java.util.Map; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import person.Person; +import person.PersonAPIGrpc; + +public class GrpcTestUnaryCallTest { + + private static final int PORT_FOR_EXAMPLES = 5011; + private static WorkflowApplication app; + private static Server server; + + @BeforeAll + static void setUpApp() throws IOException { + server = + ServerBuilder.forPort(PORT_FOR_EXAMPLES).addService(new PersonServiceHandler()).build(); + server.start(); + + app = WorkflowApplication.builder().build(); + } + + @AfterEach + void cleanup() throws InterruptedException { + server.shutdown().awaitTermination(); + } + + @Test + void grpcPerson() throws IOException { + MethodDescriptor getPersonMethod = + PersonAPIGrpc.getGetPersonMethod(); + + Workflow workflow = + WorkflowReader.readWorkflowFromClasspath("workflows-samples/grpc/get-user-call.yaml"); + + WorkflowDefinition workflowDefinition = app.workflowDefinition(workflow); + + String filename = + getClass() + .getClassLoader() + .getResource("workflows-samples/grpc/proto/person.proto") + .getFile(); + + Map output = + workflowDefinition + .instance(Map.of("protoFilePath", "file://" + filename)) + .start() + .join() + .asMap() + .orElseThrow(); + + Assertions.assertThat(output).contains(Map.entry("name", "John Doe"), Map.entry("id", 891182)); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/PersonServiceHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/PersonServiceHandler.java new file mode 100644 index 000000000..6404805d3 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/PersonServiceHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc; + +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import person.Person; +import person.PersonAPIGrpc; + +public class PersonServiceHandler extends PersonAPIGrpc.PersonAPIImplBase { + + private static final Logger LOGGER = LoggerFactory.getLogger(PersonServiceHandler.class); + + @Override + public void getPerson( + Person.GetPersonRequest request, StreamObserver responseObserver) { + LOGGER.info("Receiving GetPersonRequest request: {}", request.toByteString()); + + responseObserver.onNext( + Person.GetPersonResponse.newBuilder().setId(891182).setName("John Doe").build()); + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorHandler.java new file mode 100644 index 000000000..d27523152 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/ContributorHandler.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import io.serverlessworkflow.impl.executors.grpc.contributors.Contributors; +import io.serverlessworkflow.impl.executors.grpc.contributors.ContributorsAPIGrpc; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContributorHandler extends ContributorsAPIGrpc.ContributorsAPIImplBase { + + private static final Logger LOGGER = LoggerFactory.getLogger(ContributorHandler.class); + + private static final Map githubs = new ConcurrentHashMap<>(); + + @Override + public StreamObserver createContributor( + StreamObserver responseObserver) { + + return new StreamObserver<>() { + @Override + public void onNext(Contributors.AddContributionRequest value) { + String github = value.getGithub(); + LOGGER.info("Receiving a new contribution for: {}", value.getGithub()); + githubs.compute( + github, + (key, counter) -> { + if (counter == null) { + LongAdder longAdder = new LongAdder(); + longAdder.increment(); + return longAdder; + } + counter.increment(); + return counter; + }); + } + + @Override + public void onError(Throwable t) { + LOGGER.error("Ok, ignore it", t); + } + + @Override + public void onCompleted() { + StringBuilder stringBuilder = new StringBuilder(); + Set> entries = githubs.entrySet(); + for (Map.Entry entry : entries) { + stringBuilder + .append(entry.getKey()) + .append(" has ") + .append(entry.getValue()) + .append(" contributions"); + } + responseObserver.onNext( + Contributors.AddContributionResponse.newBuilder() + .setMessage(stringBuilder.toString()) + .build()); + responseObserver.onCompleted(); + } + }; + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonHandler.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonHandler.java new file mode 100644 index 000000000..dbe655e42 --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/grpc/handlers/PersonHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test.grpc.handlers; + +import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import person.Person; +import person.PersonAPIGrpc; + +public class PersonHandler extends PersonAPIGrpc.PersonAPIImplBase { + + private static final Logger LOGGER = LoggerFactory.getLogger(PersonHandler.class); + + @Override + public void getPerson( + Person.GetPersonRequest request, StreamObserver responseObserver) { + LOGGER.info("Receiving GetPersonRequest request: {}", request.toByteString()); + + responseObserver.onNext( + Person.GetPersonResponse.newBuilder().setId(891182).setName("John Doe").build()); + responseObserver.onCompleted(); + } +} diff --git a/impl/test/src/test/proto/contributors.proto b/impl/test/src/test/proto/contributors.proto new file mode 100644 index 000000000..1e8f5f642 --- /dev/null +++ b/impl/test/src/test/proto/contributors.proto @@ -0,0 +1,17 @@ +syntax = "proto2"; + +package io.serverlessworkflow.impl.executors.grpc.contributors; + +message AddContributionRequest { + required string github = 1; +} + +message AddContributionResponse { + required string message = 1; +} + +// client stream +service ContributorsAPI { + rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {} +} + diff --git a/impl/test/src/test/proto/person.proto b/impl/test/src/test/proto/person.proto new file mode 100644 index 000000000..b84644d2b --- /dev/null +++ b/impl/test/src/test/proto/person.proto @@ -0,0 +1,14 @@ +syntax = "proto2"; + +package person; + +message GetPersonRequest {} + +message GetPersonResponse { + required string name = 1; + required int32 id = 2; +} + +service PersonAPI { + rpc GetPerson(GetPersonRequest) returns (GetPersonResponse); +} \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml new file mode 100644 index 000000000..7d12ef105 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/contributors-client-stream-call.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: ${ .protoFilePath } + service: + name: ContributorsAPI + host: localhost + port: 5011 + method: CreateContributor + arguments: + github: dependabot[bot] \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/get-user-call.yaml b/impl/test/src/test/resources/workflows-samples/grpc/get-user-call.yaml new file mode 100644 index 000000000..39742806a --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/get-user-call.yaml @@ -0,0 +1,16 @@ +document: + dsl: '1.0.2' + namespace: test + name: grpc-example + version: '0.1.0' +do: + - greet: + call: grpc + with: + proto: + endpoint: ${ .protoFilePath } + service: + name: PersonAPI + host: localhost + port: 5011 + method: GetPerson \ No newline at end of file diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto new file mode 100644 index 000000000..1e8f5f642 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/contributors.proto @@ -0,0 +1,17 @@ +syntax = "proto2"; + +package io.serverlessworkflow.impl.executors.grpc.contributors; + +message AddContributionRequest { + required string github = 1; +} + +message AddContributionResponse { + required string message = 1; +} + +// client stream +service ContributorsAPI { + rpc CreateContributor(stream AddContributionRequest) returns (AddContributionResponse) {} +} + diff --git a/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto new file mode 100644 index 000000000..3fde53e06 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/grpc/proto/person.proto @@ -0,0 +1,19 @@ +// add a service name called OpenAPI.syntax = "proto2"; + +syntax = "proto2"; + +package person; + +message GetPersonRequest { + +} + +message GetPersonResponse { + required string name = 1; + required int32 id = 2; +} +// create a service called PersonAPI.Person + +service PersonAPI { + rpc GetPerson(GetPersonRequest) returns (GetPersonResponse); +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index cf156e592..997808529 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ 3.6.2 3.5.4 3.2.8 + 1.78.0 3.5.0 ${java.version} 1.2.2 @@ -83,6 +84,7 @@ 1.5.23 2.20.1 2.20 + 4.32.1 2.0.0 5.3.2 4.0.1 @@ -203,6 +205,27 @@ jakarta.validation-api ${version.jakarta.validation} + + io.grpc + grpc-netty + ${version.io.grpc.java} + runtime + + + io.grpc + grpc-protobuf + ${version.io.grpc.java} + + + io.grpc + grpc-stub + ${version.io.grpc.java} + + + com.google.protobuf + protobuf-java + ${version.com.google.protobuf.java} +