From bc0c0878d63306ea88487aee8f828c9afc1feb1d Mon Sep 17 00:00:00 2001 From: "p.zahnen" Date: Tue, 6 Jan 2026 14:01:22 +0100 Subject: [PATCH 1/3] refactor Redis job backend into modular operations --- .../redis/app/JobDetailsMapper.java | 85 +++++ .../redis/app/JobQueueBackendRedis.java | 317 ++++-------------- .../ii/xtraplatform/redis/app/RedisImpl.java | 30 +- .../redis/app/RedisJobLifecycleManager.java | 67 ++++ .../redis/app/RedisJobOperations.java | 66 ++++ .../redis/app/RedisJobSetOperations.java | 119 +++++++ .../redis/app/RedisMetadataOperations.java | 56 ++++ .../redis/app/RedisQueueOperations.java | 66 ++++ 8 files changed, 534 insertions(+), 272 deletions(-) create mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobDetailsMapper.java create mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobLifecycleManager.java create mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobOperations.java create mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobSetOperations.java create mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisMetadataOperations.java create mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisQueueOperations.java diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobDetailsMapper.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobDetailsMapper.java new file mode 100644 index 00000000..2c7de04c --- /dev/null +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobDetailsMapper.java @@ -0,0 +1,85 @@ +/* + * Copyright 2026 interactive instruments GmbH + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package de.ii.xtraplatform.redis.app; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.ii.xtraplatform.jobs.domain.Job; +import de.ii.xtraplatform.jobs.domain.Job.JobDetails; +import de.ii.xtraplatform.jobs.domain.JobSet; +import de.ii.xtraplatform.jobs.domain.JobSet.JobSetDetails; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import javax.inject.Inject; +import javax.inject.Singleton; + +@Singleton +public class JobDetailsMapper { + private final ObjectMapper mapper; + private Function>> jobTypes; + + @Inject + JobDetailsMapper(ObjectMapper mapper) { + this.mapper = mapper; + this.jobTypes = type -> Optional.empty(); + } + + public void setJobTypes(Function>> jobTypesMapper) { + this.jobTypes = jobTypesMapper; + } + + public T getJobDetails(Class detailsType, Job job) { + return detailsType.cast(unpackDetails(job)); + } + + public T getJobSetDetails(Class detailsType, JobSet jobSet) { + return detailsType.cast(unpackSetDetails(jobSet)); + } + + private Object unpackDetails(Job job) { + if (Objects.nonNull(job) + && Objects.nonNull(job.getDetails()) + && job.getDetails() instanceof Map + && !((Map) job.getDetails()).isEmpty()) { + try { + Object details = + mapper.readValue( + mapper.writeValueAsBytes(job.getDetails()), + jobTypes.apply(job.getType()).orElseThrow()); + if (details instanceof JobDetails) { + return details; + } + } catch (IOException e) { + throw new IllegalStateException("Failed to deserialize job details", e); + } + } + return job.getDetails(); + } + + private Object unpackSetDetails(JobSet jobSet) { + if (Objects.nonNull(jobSet) + && Objects.nonNull(jobSet.getDetails()) + && jobSet.getDetails() instanceof Map + && !((Map) jobSet.getDetails()).isEmpty()) { + try { + Object details = + mapper.readValue( + mapper.writeValueAsBytes(jobSet.getDetails()), + jobTypes.apply(jobSet.getType()).orElseThrow()); + if (details instanceof JobSetDetails) { + return details; + } + } catch (IOException e) { + throw new IllegalStateException("Failed to deserialize job set details", e); + } + } + return jobSet.getDetails(); + } +} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java index d66fd414..79c665cd 100644 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java @@ -7,43 +7,27 @@ */ package de.ii.xtraplatform.redis.app; -import static de.ii.xtraplatform.base.domain.util.JacksonModules.DESERIALIZE_IMMUTABLE_BUILDER_NESTED; - -import com.fasterxml.jackson.databind.ObjectMapper; import com.github.azahnen.dagger.annotations.AutoBind; import de.ii.xtraplatform.base.domain.AppContext; -import de.ii.xtraplatform.base.domain.Jackson; import de.ii.xtraplatform.base.domain.JobsConfiguration.QUEUE; import de.ii.xtraplatform.base.domain.resiliency.VolatileRegistry; import de.ii.xtraplatform.jobs.domain.AbstractJobQueueBackend; import de.ii.xtraplatform.jobs.domain.BaseJob; import de.ii.xtraplatform.jobs.domain.Job; -import de.ii.xtraplatform.jobs.domain.Job.JobDetails; import de.ii.xtraplatform.jobs.domain.JobQueueBackend; import de.ii.xtraplatform.jobs.domain.JobSet; -import de.ii.xtraplatform.jobs.domain.JobSet.JobSetDetails; import de.ii.xtraplatform.redis.domain.Redis; -import java.io.IOException; -import java.time.Instant; -import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.IntStream; import javax.inject.Inject; import javax.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import redis.clients.jedis.args.ListDirection; -import redis.clients.jedis.json.Path; -import redis.clients.jedis.json.Path2; @Singleton @AutoBind(interfaces = JobQueueBackend.class) @@ -51,36 +35,41 @@ public class JobQueueBackendRedis extends AbstractJobQueueBackend implements JobQueueBackend { private static final Logger LOGGER = LoggerFactory.getLogger(JobQueueBackendRedis.class); - private static final List INITIAL_LEVELS = - IntStream.range(0, 24).map(i -> -1).boxed().toList(); private final boolean enabled; - private final Redis redis; - private final ObjectMapper mapper; - private Function>> jobTypes; + private final RedisJobOperations jobOps; + private final RedisJobSetOperations jobSetOps; + private final RedisQueueOperations queueOps; + private final RedisMetadataOperations metadataOps; + private final RedisJobLifecycleManager lifecycleManager; + private final JobDetailsMapper detailsMapper; @Inject JobQueueBackendRedis( - AppContext appContext, Jackson jackson, VolatileRegistry volatileRegistry, Redis redis) { + AppContext appContext, + VolatileRegistry volatileRegistry, + Redis redis, + RedisJobOperations jobOps, + RedisJobSetOperations jobSetOps, + RedisQueueOperations queueOps, + RedisMetadataOperations metadataOps, + RedisJobLifecycleManager lifecycleManager, + JobDetailsMapper detailsMapper) { super(volatileRegistry); - // TODO: housekeeping might check taken list using RPOPLPUSH with same source and destination - // this way it can check for timeouts, then use a transaction with LREM, LPUSH and HMSET to - // retry - this.enabled = appContext.getConfiguration().getJobs().getQueue() == QUEUE.REDIS; - this.redis = redis; - this.mapper = - jackson - .getDefaultObjectMapper() - .copy() - .registerModule(DESERIALIZE_IMMUTABLE_BUILDER_NESTED); - this.jobTypes = type -> Optional.empty(); + this.jobOps = jobOps; + this.jobSetOps = jobSetOps; + this.queueOps = queueOps; + this.metadataOps = metadataOps; + this.lifecycleManager = lifecycleManager; + this.detailsMapper = detailsMapper; - onVolatileStart(); + // Initialize cross-dependencies + jobSetOps.setDetailsMapper(detailsMapper); + onVolatileStart(); addSubcomponent(redis); - onVolatileStarted(); } @@ -91,233 +80,128 @@ public boolean isEnabled() { @Override public void setJobTypes(Function>> jobTypesMapper) { - this.jobTypes = jobTypesMapper; + detailsMapper.setJobTypes(jobTypesMapper); } @Override protected String createQueue(String type, int priority) { - redis.cmd().zadd("xtraplatform:jobs:priorities:" + type, priority, String.valueOf(priority)); - - return "xtraplatform:jobs:queue:" + type + ":" + priority; + return metadataOps.createQueue(type, priority); } @Override protected Set getTypes() { - return redis.cmd().keys("xtraplatform:jobs:priorities:*").stream() - .map(key -> key.substring("xtraplatform:jobs:priorities:".length())) - .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll); + return metadataOps.getTypes(); } @Override protected Set getPriorities(String type) { - List priorities = redis.cmd().zrevrange("xtraplatform:jobs:priorities:" + type, 0, -1); - - return new LinkedHashSet<>(priorities.stream().map(Integer::parseInt).toList()); + return metadataOps.getPriorities(type); } @Override protected void updateJob(Job job) { - try { - redis.json().jsonSet("xtraplatform:jobs:job:" + job.getId(), mapper.writeValueAsString(job)); - } catch (Throwable e) { - throw new RuntimeException(e); - } + jobOps.updateJob(job); } @Override public void updateJob(Job job, int progressDelta) { - redis - .json() - .jsonNumIncrBy( - "xtraplatform:jobs:job:" + job.getId(), Path2.of("$.current"), progressDelta); - redis - .json() - .jsonSet( - "xtraplatform:jobs:job:" + job.getId(), - Path2.of("$.updatedAt"), - Instant.now().getEpochSecond()); + jobOps.updateJobProgress(job.getId(), progressDelta); } @Override protected void updateJobSet(JobSet jobSet) { - try { - redis - .json() - .jsonSet("xtraplatform:jobs:set:" + jobSet.getId(), mapper.writeValueAsString(jobSet)); - } catch (Throwable e) { - throw new RuntimeException(e); - } + jobSetOps.updateJobSet(jobSet); } @Override public void startJobSet(JobSet jobSet) { - redis - .json() - .jsonSet( - "xtraplatform:jobs:set:" + jobSet.getId(), - Path2.of("$.startedAt"), - Instant.now().getEpochSecond()); + jobSetOps.startJobSet(jobSet.getId()); } @Override public void initJobSet(JobSet jobSet, int progressDelta, Map detailParameters) { - Map jsonPathUpdates = new LinkedHashMap<>(); - jsonPathUpdates.put("$.total", progressDelta); - - JobSetDetails details = getJobSetDetails(JobSetDetails.class, jobSet); - jsonPathUpdates.putAll(details.initJson(detailParameters)); - - applyJsonPaths(jobSet.getId(), jsonPathUpdates); + jobSetOps.initJobSet(jobSet, progressDelta, detailParameters); } @Override public void updateJobSet(JobSet jobSet, int progressDelta, Map detailParameters) { - Map jsonPathUpdates = new LinkedHashMap<>(); - jsonPathUpdates.put("$.current", progressDelta); - jsonPathUpdates.put("$.updatedAt", Instant.now().getEpochSecond()); - - JobSetDetails details = getJobSetDetails(JobSetDetails.class, jobSet); - jsonPathUpdates.putAll(details.updateJson(detailParameters)); - - applyJsonPaths(jobSet.getId(), jsonPathUpdates); + jobSetOps.updateJobSetWithProgress(jobSet, progressDelta, detailParameters); } @Override protected Optional getJobSet(String setId) { - String jobSetJson = - redis.json().jsonGetAsPlainString("xtraplatform:jobs:set:" + setId, Path.ROOT_PATH); - - if (Objects.isNull(jobSetJson)) { - return Optional.empty(); - } - - try { - JobSet job = mapper.readValue(jobSetJson, JobSet.class); - - return Optional.ofNullable(job); - } catch (Throwable e) { - throw new RuntimeException(e); - } + return jobSetOps.getJobSet(setId); } @Override protected void queueJob(Job job, boolean untake) { String queue = getQueue(job.getType(), job.getPriority()); updateJob(job); - - if (untake) { - // TODO: use a transaction here - redis.cmd().lrem("xtraplatform:jobs:taken", 1, job.getId()); - redis.cmd().rpush(queue, job.getId()); - } else { - redis.cmd().lpush(queue, job.getId()); - } + queueOps.queueJob(queue, job.getId(), untake); } @Override protected Job resetJob(Job job, Optional jobSet) { - if (jobSet.isPresent()) { - jobSet.get().update(-(job.getCurrent().get())); - JobSetDetails details = getJobSetDetails(JobSetDetails.class, jobSet.get()); - details.reset(job); - updateJobSet(jobSet.get().with(details)); - } - - return job.reset(); + return lifecycleManager.resetJob(job, jobSet, jobSetOps); } @Override protected Job startJob(Job job, String executor) { - Job startedJob = job.started(executor); - - updateJob(startedJob); - - return startedJob; + return lifecycleManager.startJob(job, executor); } @Override protected Job failJob(Job job, String error) { - Job failedJob = job.failed(error); - - updateJob(failedJob); - - redis.cmd().rpush("xtraplatform:jobs:failed", job.getId()); - - return failedJob; + return lifecycleManager.failJob(job, error); } @Override protected Job doneJob(Job job) { - Job doneJob = job.done(); - - redis.json().jsonDel("xtraplatform:jobs:job:" + doneJob.getId()); - - return doneJob; + return lifecycleManager.doneJob(job); } @Override protected Optional takeJob(String queue) { - String jobId = - redis - .cmd() - .lmove(queue, "xtraplatform:jobs:taken", ListDirection.RIGHT, ListDirection.LEFT); - - if (Objects.nonNull(jobId)) { - return getJob(jobId); - } - - return Optional.empty(); + Optional jobId = queueOps.takeJob(queue); + return jobId.flatMap(this::getJob); } @Override protected Optional untakeJob(String jobId) { - long count = redis.cmd().lrem("xtraplatform:jobs:taken", 1, jobId); - - if (count > 0) { + if (queueOps.untakeJob(jobId)) { return getJob(jobId); } - return Optional.empty(); } @Override protected List onJobFinished(Job job, JobSet jobSet) { - List followUps = jobSet.done(job); - - redis.json().jsonDel("xtraplatform:jobs:job:" + job.getId()); - - return followUps; + return lifecycleManager.onJobFinished(job, jobSet); } @Override protected List getJobsInQueue(String queue) { - List jobIds = redis.cmd().lrange(queue, 0, -1); - List jobs = new ArrayList<>(); - - for (String jobId : jobIds) { - Optional job = getJob(jobId); - - job.ifPresent(jobs::add); - } - - return jobs; + List jobIds = queueOps.getJobsInQueue(queue); + return jobIds.stream() + .map(this::getJob) + .filter(Optional::isPresent) + .map(Optional::get) + .toList(); } @Override protected void notifyObservers(String type) { - redis.pubsub().publish("xtraplatform:jobs:notifications", type); + queueOps.notifyObservers(type); } @Override public void onPush(Consumer callback) { - redis.pubsub().subscribe("xtraplatform:jobs:notifications", callback); + queueOps.onPush(callback); } @Override public boolean doneSet(String jobSetId) { - long count = redis.json().jsonDel("xtraplatform:jobs:set:" + jobSetId); - - return count > 0; + return jobSetOps.deleteJobSet(jobSetId); } @Override @@ -328,114 +212,31 @@ public boolean error(String jobId, String error, boolean retry) { @Override public Collection getSets() { - Set jobSetIds = redis.cmd().keys("xtraplatform:jobs:set:*"); - - return jobSetIds.stream() - .map(id -> id.substring("xtraplatform:jobs:set:".length())) - .map(this::getJobSet) - .filter(Optional::isPresent) - .map(Optional::get) - .toList(); + return metadataOps.getSets(); } @Override protected List getTakenIds() { - return redis.cmd().lrange("xtraplatform:jobs:taken", 0, -1); + return queueOps.getTakenIds(); } @Override protected List getFailedIds() { - return redis.cmd().lrange("xtraplatform:jobs:failed", 0, -1); + return queueOps.getFailedIds(); } @Override protected Optional getJob(String jobId) { - String jobJson = - redis.json().jsonGetAsPlainString("xtraplatform:jobs:job:" + jobId, Path.ROOT_PATH); - - if (Objects.isNull(jobJson)) { - return Optional.empty(); - } - - try { - Job job = mapper.readValue(jobJson, Job.class); - - return Optional.ofNullable(job); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - private Object unpackDetails(Job job) { - if (Objects.nonNull(job) - && Objects.nonNull(job.getDetails()) - && job.getDetails() instanceof Map - && !((Map) job.getDetails()).isEmpty()) { - try { - Object details = - mapper.readValue( - mapper.writeValueAsBytes(job.getDetails()), - jobTypes.apply(job.getType()).orElseThrow()); - - if (details instanceof JobDetails) { - return details; - } - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - return job.getDetails(); - } - - private Object unpackSetDetails(JobSet jobSet) { - if (Objects.nonNull(jobSet) - && Objects.nonNull(jobSet.getDetails()) - && jobSet.getDetails() instanceof Map - && !((Map) jobSet.getDetails()).isEmpty()) { - try { - Object details = - mapper.readValue( - mapper.writeValueAsBytes(jobSet.getDetails()), - jobTypes.apply(jobSet.getType()).orElseThrow()); - - if (details instanceof JobSetDetails) { - return details; - } - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - return jobSet.getDetails(); + return jobOps.getJob(jobId); } @Override public T getJobDetails(Class detailsType, Job job) { - return detailsType.cast(unpackDetails(job)); + return detailsMapper.getJobDetails(detailsType, job); } @Override public T getJobSetDetails(Class detailsType, JobSet jobSet) { - return detailsType.cast(unpackSetDetails(jobSet)); - } - - private void applyJsonPaths(String jobSetId, Map jsonPathUpdates) { - for (Map.Entry entry : jsonPathUpdates.entrySet()) { - if (entry.getValue() instanceof Integer) { - redis - .json() - .jsonNumIncrBy( - "xtraplatform:jobs:set:" + jobSetId, - Path2.of(entry.getKey()), - (Integer) entry.getValue()); - continue; - } - redis - .json() - .jsonSet("xtraplatform:jobs:set:" + jobSetId, Path2.of(entry.getKey()), entry.getValue()); - } + return detailsMapper.getJobSetDetails(detailsType, jobSet); } } diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java index 4193e147..d98086fb 100644 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java @@ -75,23 +75,25 @@ public void onStop() { } @Override - protected synchronized void onVolatileStart() { + protected void onVolatileStart() { super.onVolatileStart(); - if (asyncStartup) { - if (getState() == State.UNAVAILABLE) { - LOGGER.warn("Could not establish connection to redis"); + synchronized (this) { + if (asyncStartup) { + if (getState() == State.UNAVAILABLE) { + LOGGER.warn("Could not establish connection to redis"); + } + + onStateChange( + (from, to) -> { + if (to == State.AVAILABLE) { + LOGGER.info("Re-established connection to redis"); + } else if (to == State.UNAVAILABLE) { + LOGGER.warn("Lost connection to redis"); + } + }, + false); } - - onStateChange( - (from, to) -> { - if (to == State.AVAILABLE) { - LOGGER.info("Re-established connection to redis"); - } else if (to == State.UNAVAILABLE) { - LOGGER.warn("Lost connection to redis"); - } - }, - false); } } diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobLifecycleManager.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobLifecycleManager.java new file mode 100644 index 00000000..5ea1a381 --- /dev/null +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobLifecycleManager.java @@ -0,0 +1,67 @@ +/* + * Copyright 2026 interactive instruments GmbH + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package de.ii.xtraplatform.redis.app; + +import de.ii.xtraplatform.jobs.domain.BaseJob; +import de.ii.xtraplatform.jobs.domain.Job; +import de.ii.xtraplatform.jobs.domain.JobSet; +import de.ii.xtraplatform.jobs.domain.JobSet.JobSetDetails; +import de.ii.xtraplatform.redis.domain.Redis; +import java.util.List; +import java.util.Optional; +import javax.inject.Inject; +import javax.inject.Singleton; + +@Singleton +public class RedisJobLifecycleManager { + private final Redis redis; + private final RedisJobOperations jobOps; + private final JobDetailsMapper detailsMapper; + + @Inject + RedisJobLifecycleManager(Redis redis, RedisJobOperations jobOps, JobDetailsMapper detailsMapper) { + this.redis = redis; + this.jobOps = jobOps; + this.detailsMapper = detailsMapper; + } + + public Job resetJob(Job job, Optional jobSet, RedisJobSetOperations jobSetOps) { + if (jobSet.isPresent()) { + jobSet.get().update(-(job.getCurrent().get())); + JobSetDetails details = detailsMapper.getJobSetDetails(JobSetDetails.class, jobSet.get()); + details.reset(job); + jobSetOps.updateJobSet(jobSet.get().with(details)); + } + return job.reset(); + } + + public Job startJob(Job job, String executor) { + Job startedJob = job.started(executor); + jobOps.updateJob(startedJob); + return startedJob; + } + + public Job failJob(Job job, String error) { + Job failedJob = job.failed(error); + jobOps.updateJob(failedJob); + redis.cmd().rpush("xtraplatform:jobs:failed", job.getId()); + return failedJob; + } + + public Job doneJob(Job job) { + Job doneJob = job.done(); + jobOps.deleteJob(doneJob.getId()); + return doneJob; + } + + public List onJobFinished(Job job, JobSet jobSet) { + List followUps = jobSet.done(job); + jobOps.deleteJob(job.getId()); + return followUps; + } +} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobOperations.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobOperations.java new file mode 100644 index 00000000..998d1584 --- /dev/null +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobOperations.java @@ -0,0 +1,66 @@ +/* + * Copyright 2026 interactive instruments GmbH + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package de.ii.xtraplatform.redis.app; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.ii.xtraplatform.jobs.domain.Job; +import de.ii.xtraplatform.redis.domain.Redis; +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; +import javax.inject.Inject; +import javax.inject.Singleton; +import redis.clients.jedis.json.Path; +import redis.clients.jedis.json.Path2; + +@Singleton +public class RedisJobOperations { + private final Redis redis; + private final ObjectMapper mapper; + private static final String JOB_KEY_PREFIX = "xtraplatform:jobs:job:"; + + @Inject + RedisJobOperations(Redis redis, ObjectMapper mapper) { + this.redis = redis; + this.mapper = mapper; + } + + public void updateJob(Job job) { + try { + redis.json().jsonSet(JOB_KEY_PREFIX + job.getId(), mapper.writeValueAsString(job)); + } catch (Throwable e) { + throw new IllegalStateException("Failed to update job", e); + } + } + + public void updateJobProgress(String jobId, int progressDelta) { + redis.json().jsonNumIncrBy(JOB_KEY_PREFIX + jobId, Path2.of("$.current"), progressDelta); + redis + .json() + .jsonSet(JOB_KEY_PREFIX + jobId, Path2.of("$.updatedAt"), Instant.now().getEpochSecond()); + } + + public Optional getJob(String jobId) { + String jobJson = redis.json().jsonGetAsPlainString(JOB_KEY_PREFIX + jobId, Path.ROOT_PATH); + + if (Objects.isNull(jobJson)) { + return Optional.empty(); + } + + try { + Job job = mapper.readValue(jobJson, Job.class); + return Optional.ofNullable(job); + } catch (Throwable e) { + throw new IllegalStateException("Failed to deserialize job", e); + } + } + + public void deleteJob(String jobId) { + redis.json().jsonDel(JOB_KEY_PREFIX + jobId); + } +} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobSetOperations.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobSetOperations.java new file mode 100644 index 00000000..b2a7a58e --- /dev/null +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobSetOperations.java @@ -0,0 +1,119 @@ +/* + * Copyright 2026 interactive instruments GmbH + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package de.ii.xtraplatform.redis.app; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.ii.xtraplatform.jobs.domain.JobSet; +import de.ii.xtraplatform.jobs.domain.JobSet.JobSetDetails; +import de.ii.xtraplatform.redis.domain.Redis; +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import javax.inject.Inject; +import javax.inject.Singleton; +import redis.clients.jedis.json.Path; +import redis.clients.jedis.json.Path2; + +@Singleton +public class RedisJobSetOperations { + private static final String JOB_SET_KEY_PREFIX = "xtraplatform:jobs:set:"; + private final Redis redis; + private final ObjectMapper mapper; + private JobDetailsMapper detailsMapper; + + @Inject + RedisJobSetOperations(Redis redis, ObjectMapper mapper) { + this.redis = redis; + this.mapper = mapper; + } + + public void setDetailsMapper(JobDetailsMapper detailsMapper) { + this.detailsMapper = detailsMapper; + } + + public void updateJobSet(JobSet jobSet) { + try { + redis.json().jsonSet(JOB_SET_KEY_PREFIX + jobSet.getId(), mapper.writeValueAsString(jobSet)); + } catch (Throwable e) { + throw new IllegalStateException("Failed to update job set", e); + } + } + + public void startJobSet(String jobSetId) { + redis + .json() + .jsonSet( + JOB_SET_KEY_PREFIX + jobSetId, Path2.of("$.startedAt"), Instant.now().getEpochSecond()); + } + + public void applyJsonPaths(String jobSetId, Map jsonPathUpdates) { + for (Map.Entry entry : jsonPathUpdates.entrySet()) { + if (entry.getValue() instanceof Integer) { + redis + .json() + .jsonNumIncrBy( + JOB_SET_KEY_PREFIX + jobSetId, + Path2.of(entry.getKey()), + (Integer) entry.getValue()); + continue; + } + redis + .json() + .jsonSet(JOB_SET_KEY_PREFIX + jobSetId, Path2.of(entry.getKey()), entry.getValue()); + } + } + + public Optional getJobSet(String setId) { + String jobSetJson = + redis.json().jsonGetAsPlainString(JOB_SET_KEY_PREFIX + setId, Path.ROOT_PATH); + + if (Objects.isNull(jobSetJson)) { + return Optional.empty(); + } + + try { + JobSet jobSet = mapper.readValue(jobSetJson, JobSet.class); + return Optional.ofNullable(jobSet); + } catch (Throwable e) { + throw new IllegalStateException("Failed to deserialize job set", e); + } + } + + public boolean deleteJobSet(String jobSetId) { + long count = redis.json().jsonDel(JOB_SET_KEY_PREFIX + jobSetId); + return count > 0; + } + + public void initJobSet(JobSet jobSet, int progressDelta, Map detailParameters) { + Map jsonPathUpdates = new LinkedHashMap<>(); + jsonPathUpdates.put("$.total", progressDelta); + + if (detailsMapper != null) { + JobSetDetails details = detailsMapper.getJobSetDetails(JobSetDetails.class, jobSet); + jsonPathUpdates.putAll(details.initJson(detailParameters)); + } + + applyJsonPaths(jobSet.getId(), jsonPathUpdates); + } + + public void updateJobSetWithProgress( + JobSet jobSet, int progressDelta, Map detailParameters) { + Map jsonPathUpdates = new LinkedHashMap<>(); + jsonPathUpdates.put("$.current", progressDelta); + jsonPathUpdates.put("$.updatedAt", Instant.now().getEpochSecond()); + + if (detailsMapper != null) { + JobSetDetails details = detailsMapper.getJobSetDetails(JobSetDetails.class, jobSet); + jsonPathUpdates.putAll(details.updateJson(detailParameters)); + } + + applyJsonPaths(jobSet.getId(), jsonPathUpdates); + } +} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisMetadataOperations.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisMetadataOperations.java new file mode 100644 index 00000000..61d14e7c --- /dev/null +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisMetadataOperations.java @@ -0,0 +1,56 @@ +/* + * Copyright 2026 interactive instruments GmbH + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package de.ii.xtraplatform.redis.app; + +import de.ii.xtraplatform.jobs.domain.JobSet; +import de.ii.xtraplatform.redis.domain.Redis; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import javax.inject.Inject; +import javax.inject.Singleton; + +@Singleton +public class RedisMetadataOperations { + private final Redis redis; + private final RedisJobSetOperations jobSetOps; + + @Inject + RedisMetadataOperations(Redis redis, RedisJobSetOperations jobSetOps) { + this.redis = redis; + this.jobSetOps = jobSetOps; + } + + public String createQueue(String type, int priority) { + redis.cmd().zadd("xtraplatform:jobs:priorities:" + type, priority, String.valueOf(priority)); + return "xtraplatform:jobs:queue:" + type + ":" + priority; + } + + public Set getTypes() { + return redis.cmd().keys("xtraplatform:jobs:priorities:*").stream() + .map(key -> key.substring("xtraplatform:jobs:priorities:".length())) + .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll); + } + + public Set getPriorities(String type) { + List priorities = redis.cmd().zrevrange("xtraplatform:jobs:priorities:" + type, 0, -1); + return new LinkedHashSet<>(priorities.stream().map(Integer::parseInt).toList()); + } + + public Collection getSets() { + Set jobSetIds = redis.cmd().keys("xtraplatform:jobs:set:*"); + return jobSetIds.stream() + .map(id -> id.substring("xtraplatform:jobs:set:".length())) + .map(jobSetOps::getJobSet) + .filter(Optional::isPresent) + .map(Optional::get) + .toList(); + } +} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisQueueOperations.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisQueueOperations.java new file mode 100644 index 00000000..3c713a2a --- /dev/null +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisQueueOperations.java @@ -0,0 +1,66 @@ +/* + * Copyright 2026 interactive instruments GmbH + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package de.ii.xtraplatform.redis.app; + +import de.ii.xtraplatform.redis.domain.Redis; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import javax.inject.Inject; +import javax.inject.Singleton; +import redis.clients.jedis.args.ListDirection; + +@Singleton +public class RedisQueueOperations { + private static final String TAKEN_KEY = "xtraplatform:jobs:taken"; + private final Redis redis; + + @Inject + RedisQueueOperations(Redis redis) { + this.redis = redis; + } + + public void notifyObservers(String type) { + redis.pubsub().publish("xtraplatform:jobs:notifications", type); + } + + public void onPush(Consumer callback) { + redis.pubsub().subscribe("xtraplatform:jobs:notifications", callback); + } + + public void queueJob(String queue, String jobId, boolean untake) { + if (untake) { + redis.cmd().lrem(TAKEN_KEY, 1, jobId); + redis.cmd().rpush(queue, jobId); + } else { + redis.cmd().lpush(queue, jobId); + } + } + + public Optional takeJob(String queue) { + String jobId = redis.cmd().lmove(queue, TAKEN_KEY, ListDirection.RIGHT, ListDirection.LEFT); + return Optional.ofNullable(jobId); + } + + public boolean untakeJob(String jobId) { + long count = redis.cmd().lrem(TAKEN_KEY, 1, jobId); + return count > 0; + } + + public List getJobsInQueue(String queue) { + return redis.cmd().lrange(queue, 0, -1); + } + + public List getTakenIds() { + return redis.cmd().lrange(TAKEN_KEY, 0, -1); + } + + public List getFailedIds() { + return redis.cmd().lrange("xtraplatform:jobs:failed", 0, -1); + } +} From df14f499480a957c1b5606041731d6d141772b42 Mon Sep 17 00:00:00 2001 From: "p.zahnen" Date: Fri, 9 Jan 2026 11:06:36 +0100 Subject: [PATCH 2/3] Revert "refactor Redis job backend into modular operations" This reverts commit bc0c0878d63306ea88487aee8f828c9afc1feb1d. --- .../redis/app/JobDetailsMapper.java | 85 ----- .../redis/app/JobQueueBackendRedis.java | 317 ++++++++++++++---- .../ii/xtraplatform/redis/app/RedisImpl.java | 30 +- .../redis/app/RedisJobLifecycleManager.java | 67 ---- .../redis/app/RedisJobOperations.java | 66 ---- .../redis/app/RedisJobSetOperations.java | 119 ------- .../redis/app/RedisMetadataOperations.java | 56 ---- .../redis/app/RedisQueueOperations.java | 66 ---- 8 files changed, 272 insertions(+), 534 deletions(-) delete mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobDetailsMapper.java delete mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobLifecycleManager.java delete mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobOperations.java delete mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobSetOperations.java delete mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisMetadataOperations.java delete mode 100644 xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisQueueOperations.java diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobDetailsMapper.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobDetailsMapper.java deleted file mode 100644 index 2c7de04c..00000000 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobDetailsMapper.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2026 interactive instruments GmbH - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -package de.ii.xtraplatform.redis.app; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.ii.xtraplatform.jobs.domain.Job; -import de.ii.xtraplatform.jobs.domain.Job.JobDetails; -import de.ii.xtraplatform.jobs.domain.JobSet; -import de.ii.xtraplatform.jobs.domain.JobSet.JobSetDetails; -import java.io.IOException; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.function.Function; -import javax.inject.Inject; -import javax.inject.Singleton; - -@Singleton -public class JobDetailsMapper { - private final ObjectMapper mapper; - private Function>> jobTypes; - - @Inject - JobDetailsMapper(ObjectMapper mapper) { - this.mapper = mapper; - this.jobTypes = type -> Optional.empty(); - } - - public void setJobTypes(Function>> jobTypesMapper) { - this.jobTypes = jobTypesMapper; - } - - public T getJobDetails(Class detailsType, Job job) { - return detailsType.cast(unpackDetails(job)); - } - - public T getJobSetDetails(Class detailsType, JobSet jobSet) { - return detailsType.cast(unpackSetDetails(jobSet)); - } - - private Object unpackDetails(Job job) { - if (Objects.nonNull(job) - && Objects.nonNull(job.getDetails()) - && job.getDetails() instanceof Map - && !((Map) job.getDetails()).isEmpty()) { - try { - Object details = - mapper.readValue( - mapper.writeValueAsBytes(job.getDetails()), - jobTypes.apply(job.getType()).orElseThrow()); - if (details instanceof JobDetails) { - return details; - } - } catch (IOException e) { - throw new IllegalStateException("Failed to deserialize job details", e); - } - } - return job.getDetails(); - } - - private Object unpackSetDetails(JobSet jobSet) { - if (Objects.nonNull(jobSet) - && Objects.nonNull(jobSet.getDetails()) - && jobSet.getDetails() instanceof Map - && !((Map) jobSet.getDetails()).isEmpty()) { - try { - Object details = - mapper.readValue( - mapper.writeValueAsBytes(jobSet.getDetails()), - jobTypes.apply(jobSet.getType()).orElseThrow()); - if (details instanceof JobSetDetails) { - return details; - } - } catch (IOException e) { - throw new IllegalStateException("Failed to deserialize job set details", e); - } - } - return jobSet.getDetails(); - } -} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java index 79c665cd..d66fd414 100644 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java @@ -7,27 +7,43 @@ */ package de.ii.xtraplatform.redis.app; +import static de.ii.xtraplatform.base.domain.util.JacksonModules.DESERIALIZE_IMMUTABLE_BUILDER_NESTED; + +import com.fasterxml.jackson.databind.ObjectMapper; import com.github.azahnen.dagger.annotations.AutoBind; import de.ii.xtraplatform.base.domain.AppContext; +import de.ii.xtraplatform.base.domain.Jackson; import de.ii.xtraplatform.base.domain.JobsConfiguration.QUEUE; import de.ii.xtraplatform.base.domain.resiliency.VolatileRegistry; import de.ii.xtraplatform.jobs.domain.AbstractJobQueueBackend; import de.ii.xtraplatform.jobs.domain.BaseJob; import de.ii.xtraplatform.jobs.domain.Job; +import de.ii.xtraplatform.jobs.domain.Job.JobDetails; import de.ii.xtraplatform.jobs.domain.JobQueueBackend; import de.ii.xtraplatform.jobs.domain.JobSet; +import de.ii.xtraplatform.jobs.domain.JobSet.JobSetDetails; import de.ii.xtraplatform.redis.domain.Redis; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.IntStream; import javax.inject.Inject; import javax.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import redis.clients.jedis.args.ListDirection; +import redis.clients.jedis.json.Path; +import redis.clients.jedis.json.Path2; @Singleton @AutoBind(interfaces = JobQueueBackend.class) @@ -35,41 +51,36 @@ public class JobQueueBackendRedis extends AbstractJobQueueBackend implements JobQueueBackend { private static final Logger LOGGER = LoggerFactory.getLogger(JobQueueBackendRedis.class); + private static final List INITIAL_LEVELS = + IntStream.range(0, 24).map(i -> -1).boxed().toList(); private final boolean enabled; - private final RedisJobOperations jobOps; - private final RedisJobSetOperations jobSetOps; - private final RedisQueueOperations queueOps; - private final RedisMetadataOperations metadataOps; - private final RedisJobLifecycleManager lifecycleManager; - private final JobDetailsMapper detailsMapper; + private final Redis redis; + private final ObjectMapper mapper; + private Function>> jobTypes; @Inject JobQueueBackendRedis( - AppContext appContext, - VolatileRegistry volatileRegistry, - Redis redis, - RedisJobOperations jobOps, - RedisJobSetOperations jobSetOps, - RedisQueueOperations queueOps, - RedisMetadataOperations metadataOps, - RedisJobLifecycleManager lifecycleManager, - JobDetailsMapper detailsMapper) { + AppContext appContext, Jackson jackson, VolatileRegistry volatileRegistry, Redis redis) { super(volatileRegistry); - this.enabled = appContext.getConfiguration().getJobs().getQueue() == QUEUE.REDIS; - this.jobOps = jobOps; - this.jobSetOps = jobSetOps; - this.queueOps = queueOps; - this.metadataOps = metadataOps; - this.lifecycleManager = lifecycleManager; - this.detailsMapper = detailsMapper; + // TODO: housekeeping might check taken list using RPOPLPUSH with same source and destination + // this way it can check for timeouts, then use a transaction with LREM, LPUSH and HMSET to + // retry - // Initialize cross-dependencies - jobSetOps.setDetailsMapper(detailsMapper); + this.enabled = appContext.getConfiguration().getJobs().getQueue() == QUEUE.REDIS; + this.redis = redis; + this.mapper = + jackson + .getDefaultObjectMapper() + .copy() + .registerModule(DESERIALIZE_IMMUTABLE_BUILDER_NESTED); + this.jobTypes = type -> Optional.empty(); onVolatileStart(); + addSubcomponent(redis); + onVolatileStarted(); } @@ -80,128 +91,233 @@ public boolean isEnabled() { @Override public void setJobTypes(Function>> jobTypesMapper) { - detailsMapper.setJobTypes(jobTypesMapper); + this.jobTypes = jobTypesMapper; } @Override protected String createQueue(String type, int priority) { - return metadataOps.createQueue(type, priority); + redis.cmd().zadd("xtraplatform:jobs:priorities:" + type, priority, String.valueOf(priority)); + + return "xtraplatform:jobs:queue:" + type + ":" + priority; } @Override protected Set getTypes() { - return metadataOps.getTypes(); + return redis.cmd().keys("xtraplatform:jobs:priorities:*").stream() + .map(key -> key.substring("xtraplatform:jobs:priorities:".length())) + .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll); } @Override protected Set getPriorities(String type) { - return metadataOps.getPriorities(type); + List priorities = redis.cmd().zrevrange("xtraplatform:jobs:priorities:" + type, 0, -1); + + return new LinkedHashSet<>(priorities.stream().map(Integer::parseInt).toList()); } @Override protected void updateJob(Job job) { - jobOps.updateJob(job); + try { + redis.json().jsonSet("xtraplatform:jobs:job:" + job.getId(), mapper.writeValueAsString(job)); + } catch (Throwable e) { + throw new RuntimeException(e); + } } @Override public void updateJob(Job job, int progressDelta) { - jobOps.updateJobProgress(job.getId(), progressDelta); + redis + .json() + .jsonNumIncrBy( + "xtraplatform:jobs:job:" + job.getId(), Path2.of("$.current"), progressDelta); + redis + .json() + .jsonSet( + "xtraplatform:jobs:job:" + job.getId(), + Path2.of("$.updatedAt"), + Instant.now().getEpochSecond()); } @Override protected void updateJobSet(JobSet jobSet) { - jobSetOps.updateJobSet(jobSet); + try { + redis + .json() + .jsonSet("xtraplatform:jobs:set:" + jobSet.getId(), mapper.writeValueAsString(jobSet)); + } catch (Throwable e) { + throw new RuntimeException(e); + } } @Override public void startJobSet(JobSet jobSet) { - jobSetOps.startJobSet(jobSet.getId()); + redis + .json() + .jsonSet( + "xtraplatform:jobs:set:" + jobSet.getId(), + Path2.of("$.startedAt"), + Instant.now().getEpochSecond()); } @Override public void initJobSet(JobSet jobSet, int progressDelta, Map detailParameters) { - jobSetOps.initJobSet(jobSet, progressDelta, detailParameters); + Map jsonPathUpdates = new LinkedHashMap<>(); + jsonPathUpdates.put("$.total", progressDelta); + + JobSetDetails details = getJobSetDetails(JobSetDetails.class, jobSet); + jsonPathUpdates.putAll(details.initJson(detailParameters)); + + applyJsonPaths(jobSet.getId(), jsonPathUpdates); } @Override public void updateJobSet(JobSet jobSet, int progressDelta, Map detailParameters) { - jobSetOps.updateJobSetWithProgress(jobSet, progressDelta, detailParameters); + Map jsonPathUpdates = new LinkedHashMap<>(); + jsonPathUpdates.put("$.current", progressDelta); + jsonPathUpdates.put("$.updatedAt", Instant.now().getEpochSecond()); + + JobSetDetails details = getJobSetDetails(JobSetDetails.class, jobSet); + jsonPathUpdates.putAll(details.updateJson(detailParameters)); + + applyJsonPaths(jobSet.getId(), jsonPathUpdates); } @Override protected Optional getJobSet(String setId) { - return jobSetOps.getJobSet(setId); + String jobSetJson = + redis.json().jsonGetAsPlainString("xtraplatform:jobs:set:" + setId, Path.ROOT_PATH); + + if (Objects.isNull(jobSetJson)) { + return Optional.empty(); + } + + try { + JobSet job = mapper.readValue(jobSetJson, JobSet.class); + + return Optional.ofNullable(job); + } catch (Throwable e) { + throw new RuntimeException(e); + } } @Override protected void queueJob(Job job, boolean untake) { String queue = getQueue(job.getType(), job.getPriority()); updateJob(job); - queueOps.queueJob(queue, job.getId(), untake); + + if (untake) { + // TODO: use a transaction here + redis.cmd().lrem("xtraplatform:jobs:taken", 1, job.getId()); + redis.cmd().rpush(queue, job.getId()); + } else { + redis.cmd().lpush(queue, job.getId()); + } } @Override protected Job resetJob(Job job, Optional jobSet) { - return lifecycleManager.resetJob(job, jobSet, jobSetOps); + if (jobSet.isPresent()) { + jobSet.get().update(-(job.getCurrent().get())); + JobSetDetails details = getJobSetDetails(JobSetDetails.class, jobSet.get()); + details.reset(job); + updateJobSet(jobSet.get().with(details)); + } + + return job.reset(); } @Override protected Job startJob(Job job, String executor) { - return lifecycleManager.startJob(job, executor); + Job startedJob = job.started(executor); + + updateJob(startedJob); + + return startedJob; } @Override protected Job failJob(Job job, String error) { - return lifecycleManager.failJob(job, error); + Job failedJob = job.failed(error); + + updateJob(failedJob); + + redis.cmd().rpush("xtraplatform:jobs:failed", job.getId()); + + return failedJob; } @Override protected Job doneJob(Job job) { - return lifecycleManager.doneJob(job); + Job doneJob = job.done(); + + redis.json().jsonDel("xtraplatform:jobs:job:" + doneJob.getId()); + + return doneJob; } @Override protected Optional takeJob(String queue) { - Optional jobId = queueOps.takeJob(queue); - return jobId.flatMap(this::getJob); + String jobId = + redis + .cmd() + .lmove(queue, "xtraplatform:jobs:taken", ListDirection.RIGHT, ListDirection.LEFT); + + if (Objects.nonNull(jobId)) { + return getJob(jobId); + } + + return Optional.empty(); } @Override protected Optional untakeJob(String jobId) { - if (queueOps.untakeJob(jobId)) { + long count = redis.cmd().lrem("xtraplatform:jobs:taken", 1, jobId); + + if (count > 0) { return getJob(jobId); } + return Optional.empty(); } @Override protected List onJobFinished(Job job, JobSet jobSet) { - return lifecycleManager.onJobFinished(job, jobSet); + List followUps = jobSet.done(job); + + redis.json().jsonDel("xtraplatform:jobs:job:" + job.getId()); + + return followUps; } @Override protected List getJobsInQueue(String queue) { - List jobIds = queueOps.getJobsInQueue(queue); - return jobIds.stream() - .map(this::getJob) - .filter(Optional::isPresent) - .map(Optional::get) - .toList(); + List jobIds = redis.cmd().lrange(queue, 0, -1); + List jobs = new ArrayList<>(); + + for (String jobId : jobIds) { + Optional job = getJob(jobId); + + job.ifPresent(jobs::add); + } + + return jobs; } @Override protected void notifyObservers(String type) { - queueOps.notifyObservers(type); + redis.pubsub().publish("xtraplatform:jobs:notifications", type); } @Override public void onPush(Consumer callback) { - queueOps.onPush(callback); + redis.pubsub().subscribe("xtraplatform:jobs:notifications", callback); } @Override public boolean doneSet(String jobSetId) { - return jobSetOps.deleteJobSet(jobSetId); + long count = redis.json().jsonDel("xtraplatform:jobs:set:" + jobSetId); + + return count > 0; } @Override @@ -212,31 +328,114 @@ public boolean error(String jobId, String error, boolean retry) { @Override public Collection getSets() { - return metadataOps.getSets(); + Set jobSetIds = redis.cmd().keys("xtraplatform:jobs:set:*"); + + return jobSetIds.stream() + .map(id -> id.substring("xtraplatform:jobs:set:".length())) + .map(this::getJobSet) + .filter(Optional::isPresent) + .map(Optional::get) + .toList(); } @Override protected List getTakenIds() { - return queueOps.getTakenIds(); + return redis.cmd().lrange("xtraplatform:jobs:taken", 0, -1); } @Override protected List getFailedIds() { - return queueOps.getFailedIds(); + return redis.cmd().lrange("xtraplatform:jobs:failed", 0, -1); } @Override protected Optional getJob(String jobId) { - return jobOps.getJob(jobId); + String jobJson = + redis.json().jsonGetAsPlainString("xtraplatform:jobs:job:" + jobId, Path.ROOT_PATH); + + if (Objects.isNull(jobJson)) { + return Optional.empty(); + } + + try { + Job job = mapper.readValue(jobJson, Job.class); + + return Optional.ofNullable(job); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + private Object unpackDetails(Job job) { + if (Objects.nonNull(job) + && Objects.nonNull(job.getDetails()) + && job.getDetails() instanceof Map + && !((Map) job.getDetails()).isEmpty()) { + try { + Object details = + mapper.readValue( + mapper.writeValueAsBytes(job.getDetails()), + jobTypes.apply(job.getType()).orElseThrow()); + + if (details instanceof JobDetails) { + return details; + } + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + return job.getDetails(); + } + + private Object unpackSetDetails(JobSet jobSet) { + if (Objects.nonNull(jobSet) + && Objects.nonNull(jobSet.getDetails()) + && jobSet.getDetails() instanceof Map + && !((Map) jobSet.getDetails()).isEmpty()) { + try { + Object details = + mapper.readValue( + mapper.writeValueAsBytes(jobSet.getDetails()), + jobTypes.apply(jobSet.getType()).orElseThrow()); + + if (details instanceof JobSetDetails) { + return details; + } + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + return jobSet.getDetails(); } @Override public T getJobDetails(Class detailsType, Job job) { - return detailsMapper.getJobDetails(detailsType, job); + return detailsType.cast(unpackDetails(job)); } @Override public T getJobSetDetails(Class detailsType, JobSet jobSet) { - return detailsMapper.getJobSetDetails(detailsType, jobSet); + return detailsType.cast(unpackSetDetails(jobSet)); + } + + private void applyJsonPaths(String jobSetId, Map jsonPathUpdates) { + for (Map.Entry entry : jsonPathUpdates.entrySet()) { + if (entry.getValue() instanceof Integer) { + redis + .json() + .jsonNumIncrBy( + "xtraplatform:jobs:set:" + jobSetId, + Path2.of(entry.getKey()), + (Integer) entry.getValue()); + continue; + } + redis + .json() + .jsonSet("xtraplatform:jobs:set:" + jobSetId, Path2.of(entry.getKey()), entry.getValue()); + } } } diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java index d98086fb..4193e147 100644 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java @@ -75,25 +75,23 @@ public void onStop() { } @Override - protected void onVolatileStart() { + protected synchronized void onVolatileStart() { super.onVolatileStart(); - synchronized (this) { - if (asyncStartup) { - if (getState() == State.UNAVAILABLE) { - LOGGER.warn("Could not establish connection to redis"); - } - - onStateChange( - (from, to) -> { - if (to == State.AVAILABLE) { - LOGGER.info("Re-established connection to redis"); - } else if (to == State.UNAVAILABLE) { - LOGGER.warn("Lost connection to redis"); - } - }, - false); + if (asyncStartup) { + if (getState() == State.UNAVAILABLE) { + LOGGER.warn("Could not establish connection to redis"); } + + onStateChange( + (from, to) -> { + if (to == State.AVAILABLE) { + LOGGER.info("Re-established connection to redis"); + } else if (to == State.UNAVAILABLE) { + LOGGER.warn("Lost connection to redis"); + } + }, + false); } } diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobLifecycleManager.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobLifecycleManager.java deleted file mode 100644 index 5ea1a381..00000000 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobLifecycleManager.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2026 interactive instruments GmbH - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -package de.ii.xtraplatform.redis.app; - -import de.ii.xtraplatform.jobs.domain.BaseJob; -import de.ii.xtraplatform.jobs.domain.Job; -import de.ii.xtraplatform.jobs.domain.JobSet; -import de.ii.xtraplatform.jobs.domain.JobSet.JobSetDetails; -import de.ii.xtraplatform.redis.domain.Redis; -import java.util.List; -import java.util.Optional; -import javax.inject.Inject; -import javax.inject.Singleton; - -@Singleton -public class RedisJobLifecycleManager { - private final Redis redis; - private final RedisJobOperations jobOps; - private final JobDetailsMapper detailsMapper; - - @Inject - RedisJobLifecycleManager(Redis redis, RedisJobOperations jobOps, JobDetailsMapper detailsMapper) { - this.redis = redis; - this.jobOps = jobOps; - this.detailsMapper = detailsMapper; - } - - public Job resetJob(Job job, Optional jobSet, RedisJobSetOperations jobSetOps) { - if (jobSet.isPresent()) { - jobSet.get().update(-(job.getCurrent().get())); - JobSetDetails details = detailsMapper.getJobSetDetails(JobSetDetails.class, jobSet.get()); - details.reset(job); - jobSetOps.updateJobSet(jobSet.get().with(details)); - } - return job.reset(); - } - - public Job startJob(Job job, String executor) { - Job startedJob = job.started(executor); - jobOps.updateJob(startedJob); - return startedJob; - } - - public Job failJob(Job job, String error) { - Job failedJob = job.failed(error); - jobOps.updateJob(failedJob); - redis.cmd().rpush("xtraplatform:jobs:failed", job.getId()); - return failedJob; - } - - public Job doneJob(Job job) { - Job doneJob = job.done(); - jobOps.deleteJob(doneJob.getId()); - return doneJob; - } - - public List onJobFinished(Job job, JobSet jobSet) { - List followUps = jobSet.done(job); - jobOps.deleteJob(job.getId()); - return followUps; - } -} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobOperations.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobOperations.java deleted file mode 100644 index 998d1584..00000000 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobOperations.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2026 interactive instruments GmbH - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -package de.ii.xtraplatform.redis.app; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.ii.xtraplatform.jobs.domain.Job; -import de.ii.xtraplatform.redis.domain.Redis; -import java.time.Instant; -import java.util.Objects; -import java.util.Optional; -import javax.inject.Inject; -import javax.inject.Singleton; -import redis.clients.jedis.json.Path; -import redis.clients.jedis.json.Path2; - -@Singleton -public class RedisJobOperations { - private final Redis redis; - private final ObjectMapper mapper; - private static final String JOB_KEY_PREFIX = "xtraplatform:jobs:job:"; - - @Inject - RedisJobOperations(Redis redis, ObjectMapper mapper) { - this.redis = redis; - this.mapper = mapper; - } - - public void updateJob(Job job) { - try { - redis.json().jsonSet(JOB_KEY_PREFIX + job.getId(), mapper.writeValueAsString(job)); - } catch (Throwable e) { - throw new IllegalStateException("Failed to update job", e); - } - } - - public void updateJobProgress(String jobId, int progressDelta) { - redis.json().jsonNumIncrBy(JOB_KEY_PREFIX + jobId, Path2.of("$.current"), progressDelta); - redis - .json() - .jsonSet(JOB_KEY_PREFIX + jobId, Path2.of("$.updatedAt"), Instant.now().getEpochSecond()); - } - - public Optional getJob(String jobId) { - String jobJson = redis.json().jsonGetAsPlainString(JOB_KEY_PREFIX + jobId, Path.ROOT_PATH); - - if (Objects.isNull(jobJson)) { - return Optional.empty(); - } - - try { - Job job = mapper.readValue(jobJson, Job.class); - return Optional.ofNullable(job); - } catch (Throwable e) { - throw new IllegalStateException("Failed to deserialize job", e); - } - } - - public void deleteJob(String jobId) { - redis.json().jsonDel(JOB_KEY_PREFIX + jobId); - } -} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobSetOperations.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobSetOperations.java deleted file mode 100644 index b2a7a58e..00000000 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisJobSetOperations.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2026 interactive instruments GmbH - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -package de.ii.xtraplatform.redis.app; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.ii.xtraplatform.jobs.domain.JobSet; -import de.ii.xtraplatform.jobs.domain.JobSet.JobSetDetails; -import de.ii.xtraplatform.redis.domain.Redis; -import java.time.Instant; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import javax.inject.Inject; -import javax.inject.Singleton; -import redis.clients.jedis.json.Path; -import redis.clients.jedis.json.Path2; - -@Singleton -public class RedisJobSetOperations { - private static final String JOB_SET_KEY_PREFIX = "xtraplatform:jobs:set:"; - private final Redis redis; - private final ObjectMapper mapper; - private JobDetailsMapper detailsMapper; - - @Inject - RedisJobSetOperations(Redis redis, ObjectMapper mapper) { - this.redis = redis; - this.mapper = mapper; - } - - public void setDetailsMapper(JobDetailsMapper detailsMapper) { - this.detailsMapper = detailsMapper; - } - - public void updateJobSet(JobSet jobSet) { - try { - redis.json().jsonSet(JOB_SET_KEY_PREFIX + jobSet.getId(), mapper.writeValueAsString(jobSet)); - } catch (Throwable e) { - throw new IllegalStateException("Failed to update job set", e); - } - } - - public void startJobSet(String jobSetId) { - redis - .json() - .jsonSet( - JOB_SET_KEY_PREFIX + jobSetId, Path2.of("$.startedAt"), Instant.now().getEpochSecond()); - } - - public void applyJsonPaths(String jobSetId, Map jsonPathUpdates) { - for (Map.Entry entry : jsonPathUpdates.entrySet()) { - if (entry.getValue() instanceof Integer) { - redis - .json() - .jsonNumIncrBy( - JOB_SET_KEY_PREFIX + jobSetId, - Path2.of(entry.getKey()), - (Integer) entry.getValue()); - continue; - } - redis - .json() - .jsonSet(JOB_SET_KEY_PREFIX + jobSetId, Path2.of(entry.getKey()), entry.getValue()); - } - } - - public Optional getJobSet(String setId) { - String jobSetJson = - redis.json().jsonGetAsPlainString(JOB_SET_KEY_PREFIX + setId, Path.ROOT_PATH); - - if (Objects.isNull(jobSetJson)) { - return Optional.empty(); - } - - try { - JobSet jobSet = mapper.readValue(jobSetJson, JobSet.class); - return Optional.ofNullable(jobSet); - } catch (Throwable e) { - throw new IllegalStateException("Failed to deserialize job set", e); - } - } - - public boolean deleteJobSet(String jobSetId) { - long count = redis.json().jsonDel(JOB_SET_KEY_PREFIX + jobSetId); - return count > 0; - } - - public void initJobSet(JobSet jobSet, int progressDelta, Map detailParameters) { - Map jsonPathUpdates = new LinkedHashMap<>(); - jsonPathUpdates.put("$.total", progressDelta); - - if (detailsMapper != null) { - JobSetDetails details = detailsMapper.getJobSetDetails(JobSetDetails.class, jobSet); - jsonPathUpdates.putAll(details.initJson(detailParameters)); - } - - applyJsonPaths(jobSet.getId(), jsonPathUpdates); - } - - public void updateJobSetWithProgress( - JobSet jobSet, int progressDelta, Map detailParameters) { - Map jsonPathUpdates = new LinkedHashMap<>(); - jsonPathUpdates.put("$.current", progressDelta); - jsonPathUpdates.put("$.updatedAt", Instant.now().getEpochSecond()); - - if (detailsMapper != null) { - JobSetDetails details = detailsMapper.getJobSetDetails(JobSetDetails.class, jobSet); - jsonPathUpdates.putAll(details.updateJson(detailParameters)); - } - - applyJsonPaths(jobSet.getId(), jsonPathUpdates); - } -} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisMetadataOperations.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisMetadataOperations.java deleted file mode 100644 index 61d14e7c..00000000 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisMetadataOperations.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2026 interactive instruments GmbH - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -package de.ii.xtraplatform.redis.app; - -import de.ii.xtraplatform.jobs.domain.JobSet; -import de.ii.xtraplatform.redis.domain.Redis; -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import javax.inject.Inject; -import javax.inject.Singleton; - -@Singleton -public class RedisMetadataOperations { - private final Redis redis; - private final RedisJobSetOperations jobSetOps; - - @Inject - RedisMetadataOperations(Redis redis, RedisJobSetOperations jobSetOps) { - this.redis = redis; - this.jobSetOps = jobSetOps; - } - - public String createQueue(String type, int priority) { - redis.cmd().zadd("xtraplatform:jobs:priorities:" + type, priority, String.valueOf(priority)); - return "xtraplatform:jobs:queue:" + type + ":" + priority; - } - - public Set getTypes() { - return redis.cmd().keys("xtraplatform:jobs:priorities:*").stream() - .map(key -> key.substring("xtraplatform:jobs:priorities:".length())) - .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll); - } - - public Set getPriorities(String type) { - List priorities = redis.cmd().zrevrange("xtraplatform:jobs:priorities:" + type, 0, -1); - return new LinkedHashSet<>(priorities.stream().map(Integer::parseInt).toList()); - } - - public Collection getSets() { - Set jobSetIds = redis.cmd().keys("xtraplatform:jobs:set:*"); - return jobSetIds.stream() - .map(id -> id.substring("xtraplatform:jobs:set:".length())) - .map(jobSetOps::getJobSet) - .filter(Optional::isPresent) - .map(Optional::get) - .toList(); - } -} diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisQueueOperations.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisQueueOperations.java deleted file mode 100644 index 3c713a2a..00000000 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisQueueOperations.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2026 interactive instruments GmbH - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ -package de.ii.xtraplatform.redis.app; - -import de.ii.xtraplatform.redis.domain.Redis; -import java.util.List; -import java.util.Optional; -import java.util.function.Consumer; -import javax.inject.Inject; -import javax.inject.Singleton; -import redis.clients.jedis.args.ListDirection; - -@Singleton -public class RedisQueueOperations { - private static final String TAKEN_KEY = "xtraplatform:jobs:taken"; - private final Redis redis; - - @Inject - RedisQueueOperations(Redis redis) { - this.redis = redis; - } - - public void notifyObservers(String type) { - redis.pubsub().publish("xtraplatform:jobs:notifications", type); - } - - public void onPush(Consumer callback) { - redis.pubsub().subscribe("xtraplatform:jobs:notifications", callback); - } - - public void queueJob(String queue, String jobId, boolean untake) { - if (untake) { - redis.cmd().lrem(TAKEN_KEY, 1, jobId); - redis.cmd().rpush(queue, jobId); - } else { - redis.cmd().lpush(queue, jobId); - } - } - - public Optional takeJob(String queue) { - String jobId = redis.cmd().lmove(queue, TAKEN_KEY, ListDirection.RIGHT, ListDirection.LEFT); - return Optional.ofNullable(jobId); - } - - public boolean untakeJob(String jobId) { - long count = redis.cmd().lrem(TAKEN_KEY, 1, jobId); - return count > 0; - } - - public List getJobsInQueue(String queue) { - return redis.cmd().lrange(queue, 0, -1); - } - - public List getTakenIds() { - return redis.cmd().lrange(TAKEN_KEY, 0, -1); - } - - public List getFailedIds() { - return redis.cmd().lrange("xtraplatform:jobs:failed", 0, -1); - } -} From 755cb20c512f4bde519142b90b5dbe7a7af5b23a Mon Sep 17 00:00:00 2001 From: "p.zahnen" Date: Tue, 13 Jan 2026 17:31:14 +0100 Subject: [PATCH 3/3] refactor Redis and RunnerRx classes for clarity and maintainability --- .../redis/app/JobQueueBackendRedis.java | 100 +++++++++--------- .../ii/xtraplatform/redis/app/RedisImpl.java | 3 +- .../xtraplatform/streams/app/ReactiveRx.java | 1 + .../ii/xtraplatform/streams/app/RunnerRx.java | 31 +++--- 4 files changed, 64 insertions(+), 71 deletions(-) diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java index d66fd414..351b93b0 100644 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/JobQueueBackendRedis.java @@ -47,6 +47,7 @@ @Singleton @AutoBind(interfaces = JobQueueBackend.class) +@SuppressWarnings("PMD.TooManyMethods") public class JobQueueBackendRedis extends AbstractJobQueueBackend implements JobQueueBackend { @@ -54,6 +55,15 @@ public class JobQueueBackendRedis extends AbstractJobQueueBackend private static final List INITIAL_LEVELS = IntStream.range(0, 24).map(i -> -1).boxed().toList(); + // Redis key constants + private static final String REDIS_KEY_PRIORITIES = "xtraplatform:jobs:priorities:"; + private static final String REDIS_KEY_QUEUE = "xtraplatform:jobs:queue:"; + private static final String REDIS_KEY_JOB = "xtraplatform:jobs:job:"; + private static final String REDIS_KEY_SET = "xtraplatform:jobs:set:"; + private static final String REDIS_KEY_TAKEN = "xtraplatform:jobs:taken"; + private static final String REDIS_KEY_FAILED = "xtraplatform:jobs:failed"; + private static final String REDIS_KEY_NOTIFICATIONS = "xtraplatform:jobs:notifications"; + private final boolean enabled; private final Redis redis; private final ObjectMapper mapper; @@ -64,7 +74,8 @@ public class JobQueueBackendRedis extends AbstractJobQueueBackend AppContext appContext, Jackson jackson, VolatileRegistry volatileRegistry, Redis redis) { super(volatileRegistry); - // TODO: housekeeping might check taken list using RPOPLPUSH with same source and destination + // NOPMD - TODO: housekeeping might check taken list using RPOPLPUSH with same source and + // destination // this way it can check for timeouts, then use a transaction with LREM, LPUSH and HMSET to // retry @@ -96,21 +107,21 @@ public void setJobTypes(Function>> jobTypesM @Override protected String createQueue(String type, int priority) { - redis.cmd().zadd("xtraplatform:jobs:priorities:" + type, priority, String.valueOf(priority)); + redis.cmd().zadd(REDIS_KEY_PRIORITIES + type, priority, String.valueOf(priority)); - return "xtraplatform:jobs:queue:" + type + ":" + priority; + return REDIS_KEY_QUEUE + type + ":" + priority; } @Override protected Set getTypes() { - return redis.cmd().keys("xtraplatform:jobs:priorities:*").stream() - .map(key -> key.substring("xtraplatform:jobs:priorities:".length())) + return redis.cmd().keys(REDIS_KEY_PRIORITIES + "*").stream() + .map(key -> key.substring(REDIS_KEY_PRIORITIES.length())) .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll); } @Override protected Set getPriorities(String type) { - List priorities = redis.cmd().zrevrange("xtraplatform:jobs:priorities:" + type, 0, -1); + List priorities = redis.cmd().zrevrange(REDIS_KEY_PRIORITIES + type, 0, -1); return new LinkedHashSet<>(priorities.stream().map(Integer::parseInt).toList()); } @@ -118,34 +129,27 @@ protected Set getPriorities(String type) { @Override protected void updateJob(Job job) { try { - redis.json().jsonSet("xtraplatform:jobs:job:" + job.getId(), mapper.writeValueAsString(job)); + redis.json().jsonSet(REDIS_KEY_JOB + job.getId(), mapper.writeValueAsString(job)); } catch (Throwable e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to serialize job to JSON: " + job.getId(), e); } } @Override public void updateJob(Job job, int progressDelta) { - redis - .json() - .jsonNumIncrBy( - "xtraplatform:jobs:job:" + job.getId(), Path2.of("$.current"), progressDelta); + redis.json().jsonNumIncrBy(REDIS_KEY_JOB + job.getId(), Path2.of("$.current"), progressDelta); redis .json() .jsonSet( - "xtraplatform:jobs:job:" + job.getId(), - Path2.of("$.updatedAt"), - Instant.now().getEpochSecond()); + REDIS_KEY_JOB + job.getId(), Path2.of("$.updatedAt"), Instant.now().getEpochSecond()); } @Override protected void updateJobSet(JobSet jobSet) { try { - redis - .json() - .jsonSet("xtraplatform:jobs:set:" + jobSet.getId(), mapper.writeValueAsString(jobSet)); + redis.json().jsonSet(REDIS_KEY_SET + jobSet.getId(), mapper.writeValueAsString(jobSet)); } catch (Throwable e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to serialize job set to JSON: " + jobSet.getId(), e); } } @@ -154,7 +158,7 @@ public void startJobSet(JobSet jobSet) { redis .json() .jsonSet( - "xtraplatform:jobs:set:" + jobSet.getId(), + REDIS_KEY_SET + jobSet.getId(), Path2.of("$.startedAt"), Instant.now().getEpochSecond()); } @@ -184,8 +188,7 @@ public void updateJobSet(JobSet jobSet, int progressDelta, Map d @Override protected Optional getJobSet(String setId) { - String jobSetJson = - redis.json().jsonGetAsPlainString("xtraplatform:jobs:set:" + setId, Path.ROOT_PATH); + String jobSetJson = redis.json().jsonGetAsPlainString(REDIS_KEY_SET + setId, Path.ROOT_PATH); if (Objects.isNull(jobSetJson)) { return Optional.empty(); @@ -196,7 +199,7 @@ protected Optional getJobSet(String setId) { return Optional.ofNullable(job); } catch (Throwable e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to deserialize job set from JSON: " + setId, e); } } @@ -206,8 +209,8 @@ protected void queueJob(Job job, boolean untake) { updateJob(job); if (untake) { - // TODO: use a transaction here - redis.cmd().lrem("xtraplatform:jobs:taken", 1, job.getId()); + // NOPMD - TODO: use a transaction here + redis.cmd().lrem(REDIS_KEY_TAKEN, 1, job.getId()); redis.cmd().rpush(queue, job.getId()); } else { redis.cmd().lpush(queue, job.getId()); @@ -241,7 +244,7 @@ protected Job failJob(Job job, String error) { updateJob(failedJob); - redis.cmd().rpush("xtraplatform:jobs:failed", job.getId()); + redis.cmd().rpush(REDIS_KEY_FAILED, job.getId()); return failedJob; } @@ -250,7 +253,7 @@ protected Job failJob(Job job, String error) { protected Job doneJob(Job job) { Job doneJob = job.done(); - redis.json().jsonDel("xtraplatform:jobs:job:" + doneJob.getId()); + redis.json().jsonDel(REDIS_KEY_JOB + doneJob.getId()); return doneJob; } @@ -258,9 +261,7 @@ protected Job doneJob(Job job) { @Override protected Optional takeJob(String queue) { String jobId = - redis - .cmd() - .lmove(queue, "xtraplatform:jobs:taken", ListDirection.RIGHT, ListDirection.LEFT); + redis.cmd().lmove(queue, REDIS_KEY_TAKEN, ListDirection.RIGHT, ListDirection.LEFT); if (Objects.nonNull(jobId)) { return getJob(jobId); @@ -271,7 +272,7 @@ protected Optional takeJob(String queue) { @Override protected Optional untakeJob(String jobId) { - long count = redis.cmd().lrem("xtraplatform:jobs:taken", 1, jobId); + long count = redis.cmd().lrem(REDIS_KEY_TAKEN, 1, jobId); if (count > 0) { return getJob(jobId); @@ -284,7 +285,7 @@ protected Optional untakeJob(String jobId) { protected List onJobFinished(Job job, JobSet jobSet) { List followUps = jobSet.done(job); - redis.json().jsonDel("xtraplatform:jobs:job:" + job.getId()); + redis.json().jsonDel(REDIS_KEY_JOB + job.getId()); return followUps; } @@ -305,33 +306,33 @@ protected List getJobsInQueue(String queue) { @Override protected void notifyObservers(String type) { - redis.pubsub().publish("xtraplatform:jobs:notifications", type); + redis.pubsub().publish(REDIS_KEY_NOTIFICATIONS, type); } @Override public void onPush(Consumer callback) { - redis.pubsub().subscribe("xtraplatform:jobs:notifications", callback); + redis.pubsub().subscribe(REDIS_KEY_NOTIFICATIONS, callback); } @Override public boolean doneSet(String jobSetId) { - long count = redis.json().jsonDel("xtraplatform:jobs:set:" + jobSetId); + long count = redis.json().jsonDel(REDIS_KEY_SET + jobSetId); return count > 0; } @Override public boolean error(String jobId, String error, boolean retry) { - // TODO: retry logic + // NOPMD - TODO: retry logic return false; } @Override public Collection getSets() { - Set jobSetIds = redis.cmd().keys("xtraplatform:jobs:set:*"); + Set jobSetIds = redis.cmd().keys(REDIS_KEY_SET + "*"); return jobSetIds.stream() - .map(id -> id.substring("xtraplatform:jobs:set:".length())) + .map(id -> id.substring(REDIS_KEY_SET.length())) .map(this::getJobSet) .filter(Optional::isPresent) .map(Optional::get) @@ -340,18 +341,17 @@ public Collection getSets() { @Override protected List getTakenIds() { - return redis.cmd().lrange("xtraplatform:jobs:taken", 0, -1); + return redis.cmd().lrange(REDIS_KEY_TAKEN, 0, -1); } @Override protected List getFailedIds() { - return redis.cmd().lrange("xtraplatform:jobs:failed", 0, -1); + return redis.cmd().lrange(REDIS_KEY_FAILED, 0, -1); } @Override protected Optional getJob(String jobId) { - String jobJson = - redis.json().jsonGetAsPlainString("xtraplatform:jobs:job:" + jobId, Path.ROOT_PATH); + String jobJson = redis.json().jsonGetAsPlainString(REDIS_KEY_JOB + jobId, Path.ROOT_PATH); if (Objects.isNull(jobJson)) { return Optional.empty(); @@ -362,7 +362,7 @@ protected Optional getJob(String jobId) { return Optional.ofNullable(job); } catch (Throwable e) { - throw new RuntimeException(e); + throw new IllegalStateException("Failed to deserialize job from JSON: " + jobId, e); } } @@ -382,7 +382,8 @@ private Object unpackDetails(Job job) { } } catch (IOException e) { - throw new RuntimeException(e); + throw new IllegalStateException( + "Failed to convert job details to target type: " + job.getType(), e); } } @@ -405,7 +406,8 @@ private Object unpackSetDetails(JobSet jobSet) { } } catch (IOException e) { - throw new RuntimeException(e); + throw new IllegalStateException( + "Failed to convert job set details to target type: " + jobSet.getType(), e); } } @@ -428,14 +430,10 @@ private void applyJsonPaths(String jobSetId, Map jsonPathUpdates redis .json() .jsonNumIncrBy( - "xtraplatform:jobs:set:" + jobSetId, - Path2.of(entry.getKey()), - (Integer) entry.getValue()); + REDIS_KEY_SET + jobSetId, Path2.of(entry.getKey()), (Integer) entry.getValue()); continue; } - redis - .json() - .jsonSet("xtraplatform:jobs:set:" + jobSetId, Path2.of(entry.getKey()), entry.getValue()); + redis.json().jsonSet(REDIS_KEY_SET + jobSetId, Path2.of(entry.getKey()), entry.getValue()); } } } diff --git a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java index 4193e147..ad0b2e34 100644 --- a/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java +++ b/xtraplatform-redis/src/main/java/de/ii/xtraplatform/redis/app/RedisImpl.java @@ -74,6 +74,7 @@ public void onStop() { AppLifeCycle.super.onStop(); } + @SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel") @Override protected synchronized void onVolatileStart() { super.onVolatileStart(); @@ -145,7 +146,7 @@ public Tuple check() { connect(); if (Objects.isNull(jedis)) { - // TODO: retry + // NOPMD - TODO: retry if (Objects.nonNull(connectionError)) { return Tuple.of(State.UNAVAILABLE, connectionError.getMessage()); } diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java index 0ff50a1f..c3211e16 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java @@ -30,6 +30,7 @@ @Singleton @AutoBind +@SuppressWarnings({"PMD.GodClass", "PMD.TooManyMethods"}) public class ReactiveRx implements Reactive { @Inject diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java index f2c3c888..3a5a064a 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/RunnerRx.java @@ -24,8 +24,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; // TODO: the queue was introduced as a mean to protect the connection pool and prevent deadlocks // because of a bug (running.get() < queueSize instead of running.get() < capacity) it was never @@ -35,8 +33,6 @@ // FeatureStreams public class RunnerRx implements Runner { - private static final Logger LOGGER = LoggerFactory.getLogger(RunnerRx.class); - private final Scheduler scheduler; private final String name; private final int capacity; @@ -45,11 +41,16 @@ public class RunnerRx implements Runner { private final AtomicInteger running; public RunnerRx(String name) { - this(name, Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY); + this( + getConfig(Runner.DYNAMIC_CAPACITY), name, Runner.DYNAMIC_CAPACITY, Runner.DYNAMIC_CAPACITY); } public RunnerRx(String name, int capacity, int queueSize) { - this(getConfig(name, capacity), name, capacity, queueSize); + this(getConfig(capacity), name, capacity, queueSize); + } + + public RunnerRx(int capacity, int queueSize) { + this(getConfig(capacity), "default", capacity, queueSize); } RunnerRx(ExecutorService executorService, String name, int capacity, int queueSize) { @@ -58,7 +59,6 @@ public RunnerRx(String name, int capacity, int queueSize) { } // TODO: thread names - getDispatcherName(name); this.scheduler = Schedulers.from(executorService); scheduler.start(); @@ -153,26 +153,19 @@ public int getActiveStreams() { return running.get(); } - private static ExecutorService getConfig(String name, int capacity) { - return capacity == Runner.DYNAMIC_CAPACITY - ? getDefaultConfig(name) - : getConfig(name, capacity, capacity); + private static ExecutorService getConfig(int capacity) { + return capacity == Runner.DYNAMIC_CAPACITY ? getDefaultConfig() : getConfig(capacity, capacity); } - private static ExecutorService getDefaultConfig(String name) { - return getConfig(name, 8, 64); + private static ExecutorService getDefaultConfig() { + return getConfig(8, 64); } - // TODO - private static ExecutorService getConfig(String name, int parallelismMin, int parallelismMax) { + private static ExecutorService getConfig(int parallelismMin, int parallelismMax) { return Executors.newWorkStealingPool(Math.max(1, parallelismMax)); } - private static String getDispatcherName(String name) { - return String.format("stream.%s", name); - } - @Override public void close() { if (Objects.nonNull(scheduler)) {