Skip to content

Commit 7d46ac0

Browse files
feat: Implement correlation on event filter (#1386)
* feat: Implement correlation on event filter - Add CorrelationPredicate for evaluating correlation expressions - Add correlate support in AbstractEventFilterBuilder and AbstractEventFilterSpec - Update TypeEventRegistration and TypeEventRegistrationBuilder with correlation predicates - Implement correlation matching in AbstractTypeConsumer - Add CorrelationTest and listen-correlate.yaml - Add correlate tests in WorkflowBuilderTest and DSLTest Signed-off-by: Matheus André <matheusandr2@gmail.com> * Review comments Signed-off-by: Matheus André <matheusandr2@gmail.com> --------- Signed-off-by: Matheus André <matheusandr2@gmail.com> Co-authored-by: fjtirado <ftirados@redhat.com>
1 parent 565e15d commit 7d46ac0

18 files changed

Lines changed: 643 additions & 23 deletions

File tree

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,13 @@ public SELF with(Consumer<P> c) {
3636
return self();
3737
}
3838

39-
public SELF correlate(String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
40-
throw new UnsupportedOperationException(
41-
"correlate is not supported in the engine level: https://github.com/serverlessworkflow/sdk-java/issues/1206");
39+
public SELF correlate(
40+
String key, Consumer<? super AbstractListenTaskBuilder.CorrelatePropertyBuilder> c) {
41+
AbstractListenTaskBuilder.CorrelatePropertyBuilder cb =
42+
new AbstractListenTaskBuilder.CorrelatePropertyBuilder();
43+
c.accept(cb);
44+
correlate.setAdditionalProperty(key, cb.build());
45+
return self();
4246
}
4347

4448
public EventFilter build() {

fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder;
1919
import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder;
20+
import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder;
2021
import java.util.ArrayList;
2122
import java.util.List;
2223
import java.util.function.Consumer;
@@ -41,13 +42,11 @@ protected List<Consumer<EVENT_FILTER>> getFilterSteps() {
4142
return filterSteps;
4243
}
4344

44-
// TODO: "correlate is not supported in the engine level:
45-
// https://github.com/serverlessworkflow/sdk-java/issues/1206". Keeping the code for a future
46-
// reference.
47-
// public SELF correlate(String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
48-
// filterSteps.add(f -> f.correlate(key, c));
49-
// return self();
50-
// }
45+
public SELF correlate(
46+
String key, Consumer<? super AbstractListenTaskBuilder.CorrelatePropertyBuilder> c) {
47+
addFilterStep(f -> f.correlate(key, c));
48+
return self();
49+
}
5150

5251
@Override
5352
public void accept(EVENT_FILTER filterBuilder) {

fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.serverlessworkflow.api.types.AuthenticationPolicyUnion;
3838
import io.serverlessworkflow.api.types.CallHTTP;
3939
import io.serverlessworkflow.api.types.CatchErrors;
40+
import io.serverlessworkflow.api.types.CorrelateProperty;
4041
import io.serverlessworkflow.api.types.Document;
4142
import io.serverlessworkflow.api.types.EmitEventDefinition;
4243
import io.serverlessworkflow.api.types.EmitTask;
@@ -310,8 +311,12 @@ void testDoTaskListenOne() {
310311
to ->
311312
to.one(
312313
f ->
313-
f.with(
314-
p -> p.type("com.fake.pet").source("mySource"))))))
314+
f.with(p -> p.type("com.fake.pet").source("mySource"))
315+
.correlate(
316+
"orderId",
317+
c ->
318+
c.from("$.data.orderId")
319+
.expect("$.input.orderId"))))))
315320
.build();
316321

317322
List<TaskItem> items = wf.getDo();
@@ -327,6 +332,10 @@ void testDoTaskListenOne() {
327332
EventFilter filter = one.getOne();
328333
assertNotNull(filter, "EventFilter should be present");
329334
assertEquals("com.fake.pet", filter.getWith().getType(), "Filter type should match");
335+
CorrelateProperty correlate = filter.getCorrelate().getAdditionalProperties().get("orderId");
336+
assertNotNull(correlate, "Correlate property should be present");
337+
assertEquals("$.data.orderId", correlate.getFrom(), "Correlate from should match");
338+
assertEquals("$.input.orderId", correlate.getExpect(), "Correlate expect should match");
330339
}
331340

332341
@Test

fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static io.serverlessworkflow.fluent.spec.dsl.DSL.workflow;
3131
import static org.assertj.core.api.Assertions.assertThat;
3232

33+
import io.serverlessworkflow.api.types.CorrelateProperty;
3334
import io.serverlessworkflow.api.types.HTTPArguments;
3435
import io.serverlessworkflow.api.types.ListenTaskConfiguration;
3536
import io.serverlessworkflow.api.types.RunTaskConfiguration;
@@ -166,7 +167,15 @@ public void when_listen_any_with_until() {
166167
public void when_listen_one() {
167168
Workflow wf =
168169
WorkflowBuilder.workflow("f", "ns", "1")
169-
.tasks(t -> t.listen(to().one(event().type("only-once"))))
170+
.tasks(
171+
t ->
172+
t.listen(
173+
to().one(
174+
event()
175+
.type("only-once")
176+
.correlate(
177+
"workflowInstanceId",
178+
c -> c.from("$.metadata.instanceId")))))
170179
.build();
171180

172181
var to = wf.getDo().get(0).getTask().getListenTask().getListen().getTo();
@@ -178,6 +187,10 @@ public void when_listen_one() {
178187
var one = to.getOneEventConsumptionStrategy().getOne();
179188
assertThat(one.getWith()).isNotNull();
180189
assertThat(one.getWith().getType()).isEqualTo("only-once");
190+
CorrelateProperty correlate =
191+
one.getCorrelate().getAdditionalProperties().get("workflowInstanceId");
192+
assertThat(correlate).isNotNull();
193+
assertThat(correlate.getFrom()).isEqualTo("$.metadata.instanceId");
181194
}
182195

183196
@Test

impl/core/src/main/java/io/serverlessworkflow/impl/CompositeExpressionFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public WorkflowValueResolver<String> resolveString(ExpressionDescriptor desc) {
4949
return processFactories(desc, f -> f.resolveString(desc));
5050
}
5151

52+
@Override
53+
public WorkflowValueResolver<Object> resolveValue(ExpressionDescriptor desc) {
54+
return processFactories(desc, f -> f.resolveValue(desc));
55+
}
56+
5257
@Override
5358
public WorkflowValueResolver<OffsetDateTime> resolveDate(ExpressionDescriptor desc) {
5459
return processFactories(desc, f -> f.resolveDate(desc));

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,6 @@ public interface WorkflowInstance extends WorkflowInstanceData {
5252
boolean resume();
5353

5454
<T> T addMetadataIfAbsent(String key, Supplier<T> supplier);
55+
56+
void removeMetadata(String key);
5557
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,11 @@ public <T> T addMetadataIfAbsent(String key, Supplier<T> supplier) {
355355
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
356356
}
357357

358+
@Override
359+
public void removeMetadata(String key) {
360+
additionalObjects.remove(key);
361+
}
362+
358363
@Override
359364
public <T> Optional<T> findMetadata(String key, Class<T> objectClass) {
360365
Object value = additionalObjects.get(key);

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.api.types.CorrelateProperty;
1920
import io.serverlessworkflow.api.types.EventFilter;
21+
import io.serverlessworkflow.api.types.EventFilterCorrelate;
2022
import io.serverlessworkflow.api.types.EventProperties;
2123
import io.serverlessworkflow.impl.TaskContext;
2224
import io.serverlessworkflow.impl.WorkflowApplication;
2325
import io.serverlessworkflow.impl.WorkflowContext;
26+
import io.serverlessworkflow.impl.WorkflowModel;
2427
import java.util.AbstractCollection;
28+
import java.util.ArrayList;
2529
import java.util.Collection;
2630
import java.util.Iterator;
2731
import java.util.List;
@@ -37,6 +41,8 @@ public abstract class AbstractTypeConsumer
3741

3842
private static final Logger logger = LoggerFactory.getLogger(AbstractTypeConsumer.class);
3943

44+
private static final CloudEventPredicate ALWAYS_TRUE = (ce, wf, t) -> true;
45+
4046
protected abstract void registerToAll(Consumer<CloudEvent> consumer);
4147

4248
protected abstract void unregisterFromAll();
@@ -52,13 +58,32 @@ public TypeEventRegistrationBuilder listen(
5258
EventFilter register, WorkflowApplication application) {
5359
EventProperties properties = register.getWith();
5460
String type = properties.getType();
55-
return new TypeEventRegistrationBuilder(
56-
type, application.cloudEventPredicateFactory().build(application, properties));
61+
CloudEventPredicate cePredicate =
62+
application.cloudEventPredicateFactory().build(application, properties);
63+
Collection<CloudEventPredicate> correlationPredicates =
64+
buildCorrelationPredicates(register.getCorrelate(), application);
65+
return new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
66+
}
67+
68+
private Collection<CloudEventPredicate> buildCorrelationPredicates(
69+
EventFilterCorrelate correlate, WorkflowApplication application) {
70+
if (correlate == null) {
71+
return List.of();
72+
}
73+
Map<String, CorrelateProperty> additionalProperties = correlate.getAdditionalProperties();
74+
if (additionalProperties == null || additionalProperties.isEmpty()) {
75+
return List.of();
76+
}
77+
Collection<CloudEventPredicate> predicates = new ArrayList<>();
78+
for (Map.Entry<String, CorrelateProperty> entry : additionalProperties.entrySet()) {
79+
predicates.add(CorrelationPredicate.from(entry.getKey(), entry.getValue(), application));
80+
}
81+
return predicates;
5782
}
5883

5984
@Override
6085
public Collection<TypeEventRegistrationBuilder> listenToAll(WorkflowApplication application) {
61-
return List.of(new TypeEventRegistrationBuilder(null, null));
86+
return List.of(new TypeEventRegistrationBuilder(null, ALWAYS_TRUE, List.of()));
6287
}
6388

6489
private static class CloudEventConsumer extends AbstractCollection<TypeEventRegistration>
@@ -68,13 +93,42 @@ private static class CloudEventConsumer extends AbstractCollection<TypeEventRegi
6893
@Override
6994
public void accept(CloudEvent ce) {
7095
logger.debug("Received cloud event {}", ce);
96+
WorkflowModel eventModel = null;
7197
for (TypeEventRegistration registration : registrations) {
72-
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
73-
registration.consumer().accept(ce);
98+
if (!registration.predicate().test(ce, registration.workflow(), registration.task())) {
99+
continue;
74100
}
101+
Collection<CloudEventPredicate> correlationPredicates =
102+
registration.correlationPredicates();
103+
if (!correlationPredicates.isEmpty()) {
104+
if (eventModel == null && registration.hasModelAwareCorrelation()) {
105+
eventModel = registration.workflow().definition().application().modelFactory().from(ce);
106+
}
107+
if (!testCorrelation(ce, registration, eventModel)) {
108+
continue;
109+
}
110+
}
111+
registration.consumer().accept(ce);
75112
}
76113
}
77114

115+
private boolean testCorrelation(
116+
CloudEvent ce, TypeEventRegistration registration, WorkflowModel eventModel) {
117+
Collection<CloudEventPredicate> predicates = registration.correlationPredicates();
118+
for (CloudEventPredicate pred : predicates) {
119+
if (pred instanceof ModelAwareCloudEventPredicate ma) {
120+
if (!ma.test(eventModel, registration.workflow(), registration.task())) {
121+
return false;
122+
}
123+
} else {
124+
if (!pred.test(ce, registration.workflow(), registration.task())) {
125+
return false;
126+
}
127+
}
128+
}
129+
return true;
130+
}
131+
78132
@Override
79133
public boolean add(TypeEventRegistration registration) {
80134
return registrations.add(registration);
@@ -104,10 +158,16 @@ public TypeEventRegistration register(
104158
TaskContext task) {
105159
if (builder.type() == null) {
106160
registerToAll(ce);
107-
return new TypeEventRegistration(null, ce, null, workflow, task);
161+
return new TypeEventRegistration(null, ce, ALWAYS_TRUE, workflow, task);
108162
} else {
109163
TypeEventRegistration registration =
110-
new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, task);
164+
new TypeEventRegistration(
165+
builder.type(),
166+
ce,
167+
builder.cePredicate(),
168+
builder.correlationPredicates(),
169+
workflow,
170+
task);
111171
registrations
112172
.computeIfAbsent(
113173
registration.type(),
@@ -138,5 +198,15 @@ public void unregister(TypeEventRegistration registration) {
138198
}
139199
});
140200
}
201+
cleanupCorrelationState(registration);
202+
}
203+
204+
private void cleanupCorrelationState(TypeEventRegistration registration) {
205+
for (CloudEventPredicate pred : registration.correlationPredicates()) {
206+
if (pred instanceof CorrelationPredicate cp) {
207+
cp.stateKey(registration.task())
208+
.ifPresent(key -> registration.workflow().instance().removeMetadata(key));
209+
}
210+
}
141211
}
142212
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.events;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.api.types.CorrelateProperty;
20+
import io.serverlessworkflow.impl.TaskContext;
21+
import io.serverlessworkflow.impl.WorkflowApplication;
22+
import io.serverlessworkflow.impl.WorkflowContext;
23+
import io.serverlessworkflow.impl.WorkflowModel;
24+
import io.serverlessworkflow.impl.WorkflowValueResolver;
25+
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
26+
import java.util.Objects;
27+
import java.util.Optional;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
class CorrelationPredicate implements ModelAwareCloudEventPredicate {
32+
33+
private static final Logger logger = LoggerFactory.getLogger(CorrelationPredicate.class);
34+
35+
private final String correlationKey;
36+
private final WorkflowValueResolver<Object> fromResolver;
37+
private final WorkflowValueResolver<Object> expectResolver;
38+
39+
private CorrelationPredicate(
40+
String correlationKey,
41+
WorkflowValueResolver<Object> fromResolver,
42+
WorkflowValueResolver<Object> expectResolver) {
43+
this.correlationKey = correlationKey;
44+
this.fromResolver = fromResolver;
45+
this.expectResolver = expectResolver;
46+
}
47+
48+
public static CorrelationPredicate from(
49+
String key, CorrelateProperty prop, WorkflowApplication app) {
50+
WorkflowValueResolver<Object> fromResolver =
51+
app.expressionFactory().resolveValue(ExpressionDescriptor.from(prop.getFrom()));
52+
WorkflowValueResolver<Object> expectResolver =
53+
prop.getExpect() != null
54+
? app.expressionFactory().resolveValue(ExpressionDescriptor.from(prop.getExpect()))
55+
: null;
56+
return new CorrelationPredicate(key, fromResolver, expectResolver);
57+
}
58+
59+
private String correlationStateKey(TaskContext task) {
60+
return "correlation:"
61+
+ task.position().jsonPointer()
62+
+ ":"
63+
+ task.iteration()
64+
+ ":"
65+
+ correlationKey;
66+
}
67+
68+
Optional<String> stateKey(TaskContext task) {
69+
return expectResolver == null ? Optional.of(correlationStateKey(task)) : Optional.empty();
70+
}
71+
72+
@Override
73+
public boolean test(CloudEvent cloudEvent, WorkflowContext workflow, TaskContext task) {
74+
WorkflowModel eventModel = workflow.definition().application().modelFactory().from(cloudEvent);
75+
return test(eventModel, workflow, task);
76+
}
77+
78+
@Override
79+
public boolean test(WorkflowModel eventModel, WorkflowContext workflow, TaskContext task) {
80+
Object eventValue = fromResolver.apply(workflow, task, eventModel);
81+
if (eventValue == null) {
82+
logger.debug("Correlation from expression returned null");
83+
return false;
84+
}
85+
86+
if (expectResolver == null) {
87+
String stateKey = correlationStateKey(task);
88+
Object firstValue = workflow.instance().addMetadataIfAbsent(stateKey, () -> eventValue);
89+
boolean result = Objects.equals(eventValue, firstValue);
90+
logger.debug(
91+
"Correlation no expect, eventValue='{}', firstValue='{}', match={}",
92+
eventValue,
93+
firstValue,
94+
result);
95+
return result;
96+
}
97+
98+
Object expectedValue = expectResolver.apply(workflow, task, task.input());
99+
boolean result = Objects.equals(eventValue, expectedValue);
100+
logger.debug(
101+
"Correlation eventValue='{}', expectedValue='{}', match={}",
102+
eventValue,
103+
expectedValue,
104+
result);
105+
return result;
106+
}
107+
}

0 commit comments

Comments
 (0)