From 5e163028ed6565b819b28cbddbe7f79077c7aebc Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Thu, 24 Oct 2019 13:23:49 +0300 Subject: [PATCH 1/3] Update common version to 1.3.1 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7176a1d..cd63eec 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ UTF-8 1.8 1.8 - 1.2.0 + 1.3.1 From b13751abe31f90d54b06070c92e307cb26bc467a Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Thu, 24 Oct 2019 13:24:07 +0300 Subject: [PATCH 2/3] Only produce vehicle positions from the first metro unit --- .../vehicleposition/application/VehiclePositionHandler.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java index 1863bc8..5f13b02 100644 --- a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java +++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java @@ -71,6 +71,12 @@ public void handleMessage(Message message) { return; } + //Ignore data from metro units other than the first + //Note that using tripVehicleCache would also produce one GTFS-RT vehicle position from multiple metro units, but we want to produce positions from the first unit + if (data.getTopic().getTransportMode() == Hfp.Topic.TransportMode.metro && data.getPayload().hasSeq() && data.getPayload().getSeq() != 1) { + return; + } + //If some other vehicle was registered for the trip, do not produce vehicle position if (!tripVehicleCache.registerVehicleForTrip(data.getTopic().getUniqueVehicleId(), data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir())) { log.debug("There was already a vehicle registered for trip {} / {} / {} / {} - not producing vehicle position message for {}", data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir(), data.getTopic().getUniqueVehicleId()); From 8b8f5bd062200bd3edb6d15b53715202c1f3f672 Mon Sep 17 00:00:00 2001 From: Jaakko Malkki Date: Wed, 19 Oct 2022 12:40:45 +0300 Subject: [PATCH 3/3] Create vehicle positions only for the vehicle that has smallest seq --- .../application/VehiclePositionHandler.java | 8 ++++--- .../application/utils/SeqCache.java | 24 +++++++++++++++++++ .../application/utils/SeqCacheTest.java | 18 ++++++++++++++ 3 files changed, 47 insertions(+), 3 deletions(-) create mode 100644 src/main/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCache.java create mode 100644 src/test/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCacheTest.java diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java index 9866e4e..e5722a4 100644 --- a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java +++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java @@ -13,6 +13,7 @@ import fi.hsl.transitdata.vehicleposition.application.gtfsrt.GtfsRtGenerator; import fi.hsl.transitdata.vehicleposition.application.gtfsrt.GtfsRtOccupancyStatusHelper; import fi.hsl.transitdata.vehicleposition.application.utils.PassengerCountCache; +import fi.hsl.transitdata.vehicleposition.application.utils.SeqCache; import fi.hsl.transitdata.vehicleposition.application.utils.TripVehicleCache; import fi.hsl.transitdata.vehicleposition.application.utils.VehicleTimestampValidator; import org.apache.pulsar.client.api.*; @@ -37,6 +38,7 @@ public class VehiclePositionHandler implements IMessageHandler { private final Config config; private final TripVehicleCache tripVehicleCache; + private final SeqCache seqCache; private final StopStatusProcessor stopStatusProcessor; private final VehicleTimestampValidator vehicleTimestampValidator; @@ -55,6 +57,7 @@ public VehiclePositionHandler(final PulsarApplicationContext context) { config = context.getConfig(); tripVehicleCache = new TripVehicleCache(); + seqCache = new SeqCache(); stopStatusProcessor = new StopStatusProcessor(); vehicleTimestampValidator = new VehicleTimestampValidator(config.getDuration("processor.vehicleposition.maxTimeDifference", TimeUnit.SECONDS)); @@ -117,9 +120,8 @@ public void handleMessage(Message message) { return; } - //Ignore data from metro units other than the first - //Note that using tripVehicleCache would also produce one GTFS-RT vehicle position from multiple metro units, but we want to produce positions from the first unit - if (data.getTopic().getTransportMode() == Hfp.Topic.TransportMode.metro && data.getPayload().hasSeq() && data.getPayload().getSeq() != 1) { + //Produce vehicle positions only for the vehicle that has smallest seq + if (data.getPayload().hasSeq() && !seqCache.isSmallestSeq(data.getTopic().getUniqueVehicleId(), data.getPayload().getSeq())) { return; } diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCache.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCache.java new file mode 100644 index 0000000..817bda2 --- /dev/null +++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCache.java @@ -0,0 +1,24 @@ +package fi.hsl.transitdata.vehicleposition.application.utils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; + +import java.util.concurrent.TimeUnit; + +/** + * Cache for saving smallest sequence number seen from a certain vehicle. + * This is needed for producing vehicle positions only from the first vehicle. + */ +public class SeqCache { + private final Cache smallestSeqCache = Caffeine + .newBuilder() + .expireAfterWrite(15, TimeUnit.MINUTES).scheduler(Scheduler.systemScheduler()) + .build(); + + public boolean isSmallestSeq(String uniqueVehicleId, int seq) { + final int smallestSeq = smallestSeqCache.asMap().compute(uniqueVehicleId, (key, prev) -> (prev == null || seq <= prev) ? seq : prev); + + return seq == smallestSeq; + } +} diff --git a/src/test/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCacheTest.java b/src/test/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCacheTest.java new file mode 100644 index 0000000..9a2dc31 --- /dev/null +++ b/src/test/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCacheTest.java @@ -0,0 +1,18 @@ +package fi.hsl.transitdata.vehicleposition.application.utils; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SeqCacheTest { + @Test + public void testSeqCache() { + SeqCache seqCache = new SeqCache(); + + assertTrue(seqCache.isSmallestSeq("1", 7)); + assertFalse(seqCache.isSmallestSeq("1", 9)); + assertTrue(seqCache.isSmallestSeq("1", 7)); + assertTrue(seqCache.isSmallestSeq("1", 1)); + } +}