-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathChallenge2RouteBuilder.java
More file actions
73 lines (59 loc) · 2.72 KB
/
Challenge2RouteBuilder.java
File metadata and controls
73 lines (59 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package challenge2.camel;
import externalLegacyCodeNotUnderOurControl.PriceService;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.builder.RouteBuilder;
import java.util.List;
/**
 * Camel route builder that asks every configured {@link PriceService} for a price in parallel,
 * sums the answers that get aggregated and logs the resulting average price.
 *
 * <p>Routes defined by {@link #configure()}:
 * <ul>
 *   <li>{@code direct:get-price-<i>} - one route per price service; sets the message body from a
 *       {@code PriceExpression} built around that service</li>
 *   <li>{@code direct:start} - entry point; forwards the message to {@code seda:multicast} and
 *       then logs that the caller thread continues</li>
 *   <li>{@code seda:multicast} - multicasts to all {@code direct:get-price-<i>} routes,
 *       aggregates the prices and logs the average</li>
 * </ul>
 *
 * @author Dmytro Rud
 */
@Slf4j
public class Challenge2RouteBuilder extends RouteBuilder {

    /** Name of the exchange property that counts how many prices have been aggregated so far. */
    public static final String RECEIVED_COUNT = "meetup.received.count";

    /**
     * Value handed to the multicast's {@code timeout(...)} option
     * (milliseconds, per the Camel Multicast EIP documentation).
     */
    private static final int MULTICAST_TIMEOUT_MILLIS = 15_000;

    /** Price assumed for every service whose answer did not make it into the aggregation. */
    private static final int FALLBACK_PRICE = 42;

    // to be injected by Spring; must hold at least one service before configure() runs
    @Getter @Setter private List<PriceService> priceServices;

    /**
     * Builds one {@code direct:get-price-<i>} route per injected price service, the
     * {@code direct:start} entry route and the {@code seda:multicast} route that collects
     * and averages the prices.
     *
     * @throws IllegalStateException if {@code priceServices} is {@code null} or empty; without
     *                               services there is nothing to multicast to and the average
     *                               would be a division by zero
     */
    @Override
    public void configure() throws Exception {
        if (priceServices == null || priceServices.isEmpty()) {
            throw new IllegalStateException(
                    "priceServices must contain at least one PriceService before the routes are built");
        }

        // One endpoint per service; the URIs double as the multicast recipient list below.
        final String[] serverUris = new String[priceServices.size()];
        for (int i = 0; i < serverUris.length; ++i) {
            serverUris[i] = "direct:get-price-" + i;
            // PriceExpression is declared elsewhere; presumably it obtains the price
            // from the wrapped service - verify against its implementation.
            from(serverUris[i]).setBody(new PriceExpression(priceServices.get(i)));
        }

        from("direct:start")
                .to("seda:multicast")
                .process(exchange -> log.debug("Caller thread continues processing"));

        from("seda:multicast")
                .multicast()
                    .to(serverUris)
                    .parallelProcessing()
                    .streaming()
                    .timeout(MULTICAST_TIMEOUT_MILLIS)
                    .aggregationStrategy((oldExchange, newExchange) -> {
                        int newPrice = newExchange.getIn().getBody(int.class);
                        log.debug("Arrived price: {}", newPrice);

                        if (oldExchange == null) {
                            // First aggregated answer: its body already is the running sum.
                            newExchange.setProperty(RECEIVED_COUNT, 1);
                        } else {
                            // Carry running sum and counter over to the exchange that is returned.
                            int oldSum = oldExchange.getIn().getBody(int.class);
                            newExchange.getIn().setBody(oldSum + newPrice);
                            newExchange.setProperty(
                                    RECEIVED_COUNT,
                                    oldExchange.getProperty(RECEIVED_COUNT, int.class) + 1);
                        }
                        return newExchange;
                    })
                .end()
                .process(exchange -> {
                    // The property is absent when nothing was aggregated, hence the default of 0.
                    int receivedCount = exchange.getProperty(RECEIVED_COUNT, 0, int.class);
                    // With no aggregated price the body does not hold a sum, so it is not read.
                    int sum = (receivedCount == 0) ? 0 : exchange.getIn().getBody(int.class);
                    logAverage(sum, receivedCount, serverUris.length);
                });
    }

    /**
     * Logs the collected sum, replaces every missing answer with {@link #FALLBACK_PRICE}
     * and logs the average over all expected answers.
     *
     * @param collectedSum  sum of the prices that were aggregated
     * @param receivedCount number of prices contained in {@code collectedSum}
     * @param expectedCount number of price services that were asked; must be positive
     */
    private static void logAverage(int collectedSum, int receivedCount, int expectedCount) {
        log.debug("Collected sum: {}", collectedSum);

        int correctedSum = collectedSum;
        int missingCount = expectedCount - receivedCount;
        if (missingCount > 0) {
            correctedSum += missingCount * FALLBACK_PRICE;
            log.debug("{} responses are missing, fallback them to {}, corrected sum: {}",
                    missingCount, FALLBACK_PRICE, correctedSum);
        }

        double average = ((double) correctedSum) / expectedCount;
        log.debug("Average: {}", average);
    }
}