Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions impl/grpc/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl</artifactId>
<version>8.0.0-SNAPSHOT</version>
</parent>
<artifactId>serverlessworkflow-impl-grpc</artifactId>
<name>Serverless Workflow :: Impl :: gRPC</name>

<dependencies>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl-core</artifactId>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-api</artifactId>
</dependency>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.8</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.25.8</version>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar</artifactId>
<version>${version.com.github.os72.protoc.jar}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.grpc;

import com.google.protobuf.DescriptorProtos;

public record FileDescriptorContext(
DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.grpc;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModel;

@FunctionalInterface
public interface FileDescriptorContextSupplier {

FileDescriptorContext get(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.grpc;

import com.github.os72.protocjar.Protoc;
import com.google.protobuf.DescriptorProtos;
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

public class FileDescriptorReader {

public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) {
Path grpcDir =
tryCreateTempGrpcDir()
.orElseThrow(
() -> new IllegalStateException("Could not create temporary gRPC directory"));

try (InputStream inputStream = externalResourceHandler.open()) {

Path protoFile = grpcDir.resolve(externalResourceHandler.name());

Files.copy(inputStream, protoFile);

Path descriptorOutput = grpcDir.resolve("descriptor.protobin");

try {

generateFileDescriptor(grpcDir, protoFile, descriptorOutput);

DescriptorProtos.FileDescriptorSet fileDescriptorSet =
DescriptorProtos.FileDescriptorSet.newBuilder()
.mergeFrom(Files.readAllBytes(descriptorOutput))
.build();

return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name());

} catch (IOException e) {
throw new UncheckedIOException(
"Unable to read external resource handler: " + externalResourceHandler.name(), e);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private Optional<Path> tryCreateTempGrpcDir() {
try {
return Optional.of(Files.createTempDirectory("serverless-workflow-"));
} catch (IOException e) {
throw new UncheckedIOException("Error while creating temporary gRPC directory", e);
}
}

/**
* Calls protoc binary with <code>--descriptor_set_out=</code> option set.
*
* @param grpcDir a temporary directory
* @param protoFile the .proto file used by <code>protoc</code> to generate the file descriptor
* @param descriptorOutput the output directory where the descriptor file will be generated
*/
private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) {
String[] protocArgs =
new String[] {
"--include_imports",
"--descriptor_set_out=" + descriptorOutput.toAbsolutePath(),
"-I",
grpcDir.toAbsolutePath().toString(),
protoFile.toAbsolutePath().toString()
};

try {

int status = Protoc.runProtoc(protocArgs);

// TODO: I need to resolve the protoc file from PATH or from JARs, supporting all OS
// systems.
// ProcessBuilder processBuilder = new ProcessBuilder(protocArgs);
// int status = ScriptUtils.uncheckedStart(processBuilder).waitFor();

if (status != 0) {
throw new RuntimeException(
"Unable to generate file descriptor, 'protoc' execution failed with status " + status);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to generate file descriptor", e);
} catch (IOException e) {
throw new UncheckedIOException("Unable to generate file descriptor", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.grpc;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModel;

@FunctionalInterface
public interface GrpcCallExecutor {

WorkflowModel apply(
FileDescriptorContext fileDescriptorContext,
GrpcRequestContext requestContext,
WorkflowContext workflowContext,
TaskContext taskContext,
WorkflowModel model);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.grpc;

import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;

public class GrpcChannelResolver {

public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider";

public static Channel channel(
WorkflowContext workflowContext,
TaskContext taskContext,
GrpcRequestContext grpcRequestContext) {
WorkflowApplication appl = workflowContext.definition().application();
return appl.<Channel>additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext)
.orElseGet(
() ->
ManagedChannelBuilder.forAddress(
grpcRequestContext.address(), grpcRequestContext.port())
.usePlaintext()
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.grpc;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.executors.CallableTask;
import java.util.concurrent.CompletableFuture;

public class GrpcExecutor implements CallableTask {

private final GrpcRequestContext requestContext;
private final GrpcCallExecutor grpcCallExecutor;
private final FileDescriptorContextSupplier fileDescriptorContextSupplier;

public GrpcExecutor(
GrpcRequestContext builder,
GrpcCallExecutor grpcCallExecutor,
FileDescriptorContextSupplier fileDescriptorContextSupplier) {
this.requestContext = builder;
this.grpcCallExecutor = grpcCallExecutor;
this.fileDescriptorContextSupplier = fileDescriptorContextSupplier;
}

@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {

FileDescriptorContext fileDescriptorContext =
this.fileDescriptorContextSupplier.get(workflowContext, taskContext, input);

return CompletableFuture.supplyAsync(
() ->
this.grpcCallExecutor.apply(
fileDescriptorContext, requestContext, workflowContext, taskContext, input));
}
}
Loading