diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/LoggingContextCloser.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/LoggingContextCloser.java index afdf0c47..7c1ddfc6 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/LoggingContextCloser.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/LoggingContextCloser.java @@ -68,6 +68,7 @@ private class OutputStreamCloseListener extends OutputStream { private final OutputStream entityStream; public OutputStreamCloseListener(ContainerResponseContext responseContext) { + super(); this.responseContext = responseContext; this.entityStream = responseContext.getEntityStream(); } diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/SchedulerCron4j.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/SchedulerCron4j.java index f5ae061f..0f1989ac 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/SchedulerCron4j.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/SchedulerCron4j.java @@ -18,22 +18,11 @@ import it.sauronsoftware.cron4j.TaskExecutor; import java.text.DecimalFormat; import java.time.Duration; -import java.util.List; import java.util.Locale; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import javax.inject.Inject; import javax.inject.Singleton; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.threeten.extra.AmountFormats; @@ -99,155 +88,11 @@ public void deschedule(String jobId) { @Override public TaskQueue createQueue(String id, int maxConcurrentTasks) { - return new TaskQueue() { - private final BlockingQueue>> queue = - new LinkedBlockingQueue<>(); - private final BlockingQueue currentTasks = - new LinkedBlockingQueue<>(maxConcurrentTasks); - private final BlockingQueue threadNumbers = - new LinkedBlockingQueue<>( - IntStream.rangeClosed(1, maxConcurrentTasks).boxed().collect(Collectors.toList())); - - @Override - public synchronized CompletableFuture launch(Task task) { - Thread.currentThread().setName("bg-task-0"); - task.logContext(); - cleanup(); - - for (TaskStatus runningTask : currentTasks) { - if (Objects.nonNull(runningTask) - && Objects.equals(runningTask.getId(), task.getId()) - && Objects.equals(runningTask.getLabel(), task.getLabel())) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Ignoring task '{}' for '{}', already running", task.getLabel(), task.getId()); - } - return CompletableFuture.failedFuture(new IllegalArgumentException()); - } - } - - if (getFutureTasks().stream() - .anyMatch( - futureTask -> - Objects.equals(futureTask.getId(), task.getId()) - && Objects.equals(futureTask.getLabel(), task.getLabel()))) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "Ignoring task '{}' for '{}', already in queue", task.getLabel(), task.getId()); - } - return CompletableFuture.failedFuture(new IllegalArgumentException()); - } - - // LOGGER.debug("Queuing task {}", task.getLabel()); - final CompletableFuture taskStatusCompletableFuture = new CompletableFuture<>(); - - queue.offer(new ImmutablePair<>(task, taskStatusCompletableFuture)); - - checkQueue(); - - return taskStatusCompletableFuture; - } - - @Override - public CompletableFuture launch(Task task, long delay) { - CompletableFuture completableFuture = new CompletableFuture<>(); - ForkJoinPool.commonPool() - .execute( - () -> { - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - // ignore - } - launch(task).thenAccept(completableFuture::complete); - }); - return completableFuture; - } - - @Override - public void remove(Task task) { - List>> toRemove = - queue.stream() - .filter(entry -> Objects.equals(task, entry.getLeft())) - .collect(Collectors.toList()); - toRemove.forEach( - o -> { - boolean removed = queue.remove(o); - if (removed) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace( - "REMOVED TASK {} -> {}", o.getLeft().getLabel(), o.getLeft().getId()); - } - } - }); - } - - @Override - public List getFutureTasks() { - return queue.stream().map(Pair::getLeft).collect(Collectors.toList()); - } - - // TODO: list of currentTasks - @Override - public Optional getCurrentTask() { - return Optional.ofNullable(currentTasks.peek()); - } - - private synchronized void checkQueue() { - cleanup(); - - if (currentTasks.remainingCapacity() > 0) { - final Pair> task = queue.poll(); - - if (Objects.nonNull(task)) { - if (task.getLeft().getMaxPartials() > 1 && currentTasks.remainingCapacity() > 1) { - int maxPartials = - Math.min(currentTasks.remainingCapacity(), task.getLeft().getMaxPartials()); - AtomicInteger activePartials = new AtomicInteger(maxPartials); - - for (int i = 1; i <= maxPartials; i++) { - int threadNumber = Objects.requireNonNullElse(threadNumbers.poll(), 1); - TaskCron4j taskCron4j = - new TaskCron4j(task.getLeft(), maxPartials, i, threadNumber, activePartials); - final TaskExecutor taskExecutor = scheduler.launch(taskCron4j); - TaskStatus currentTask = new TaskStatusCron4j(taskCron4j, taskExecutor); - - addLogging(taskCron4j, currentTask, threadNumber); - currentTask.onDone( - throwable -> { - activePartials.decrementAndGet(); - threadNumbers.offer(threadNumber); - checkQueue(); - }); - currentTasks.offer(currentTask); - - // TODO: currently not used? - // task.getRight().complete(currentTask); - } - } else { - int threadNumber = Objects.requireNonNullElse(threadNumbers.poll(), 1); - TaskCron4j taskCron4j = new TaskCron4j(task.getLeft(), threadNumber); - final TaskExecutor taskExecutor = scheduler.launch(taskCron4j); - TaskStatus currentTask = new TaskStatusCron4j(taskCron4j, taskExecutor); - - addLogging(taskCron4j, currentTask, threadNumber); - currentTask.onDone( - throwable -> { - threadNumbers.offer(threadNumber); - checkQueue(); - }); - currentTasks.offer(currentTask); - - task.getRight().complete(currentTask); - } - } - } - } - - private synchronized void cleanup() { - currentTasks.removeIf(TaskStatus::isDone); - } - }; + return new TaskQueueCron4j( + scheduler, + (taskCron4j, taskStatus) -> + addLogging(taskCron4j, taskStatus, taskCron4j.getThreadNumber()), + maxConcurrentTasks); } private void addLogging(TaskCron4j taskCron4j, TaskStatus taskStatus, int threadNum) { @@ -268,7 +113,9 @@ private void addLogging(TaskCron4j taskCron4j, TaskStatus taskStatus, int thread taskCron4j.isPartial() ? String.format(" (part [%d/%d])", taskCron4j.getPartial(), taskCron4j.getMaxPartials()) : ""; - LOGGER.info("{}{} started", task.getLabel(), part); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("{}{} started", task.getLabel(), part); + } taskStatus.onChange( (progress, message) -> { @@ -293,7 +140,9 @@ private void addLogging(TaskCron4j taskCron4j, TaskStatus taskStatus, int thread } else { String time = pretty(taskStatus.getEndTime() - taskStatus.getStartTime()); - LOGGER.info("{}{} finished in {}", task.getLabel(), part, time); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("{}{} finished in {}", task.getLabel(), part, time); + } } }); } diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServiceBackgroundTasksImpl.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServiceBackgroundTasksImpl.java index bd15086d..84431ff8 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServiceBackgroundTasksImpl.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServiceBackgroundTasksImpl.java @@ -36,6 +36,7 @@ @Singleton @AutoBind +@SuppressWarnings("PMD.TooManyMethods") public class ServiceBackgroundTasksImpl implements ServiceBackgroundTasks, AppLifeCycle { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBackgroundTasksImpl.class); diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServiceContextBinder.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServiceContextBinder.java index 202e14a4..5989e546 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServiceContextBinder.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServiceContextBinder.java @@ -29,7 +29,9 @@ public class ServiceContextBinder extends AbstractBinder implements Binder, ServiceInjectableContext { @Inject - public ServiceContextBinder() {} + public ServiceContextBinder() { + super(); + } // TODO: bind every subtype @Override diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServicesContextImpl.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServicesContextImpl.java index 5d73d588..15da191d 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServicesContextImpl.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServicesContextImpl.java @@ -31,6 +31,7 @@ public URI getUri() { return appContext.getUri(); } + @Override public List getPathPrefix() { return appContext.getPathPrefix(); } diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServicesEndpoint.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServicesEndpoint.java index 815d7d93..e76292b6 100755 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServicesEndpoint.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/ServicesEndpoint.java @@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets; import java.security.Principal; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -64,6 +65,7 @@ @Hidden @Path("/") @Produces(MediaTypeCharset.APPLICATION_JSON_UTF8) +@SuppressWarnings({"PMD.GodClass", "PMD.TooManyMethods"}) public class ServicesEndpoint implements Endpoint { private static final Logger LOGGER = LoggerFactory.getLogger(ServicesEndpoint.class); @@ -98,6 +100,7 @@ public ServicesEndpoint( // TODO @GET @Produces(MediaType.WILDCARD) + @SuppressWarnings("PMD.UnusedFormalParameter") // callback parameter part of API contract public Response getServices( @QueryParam("callback") String callback, @QueryParam("f") String f, @@ -111,26 +114,7 @@ public Response getServices( // .filter(serviceData -> !serviceData.hasError()) .collect(Collectors.toList()); - MediaType mediaType = - Objects.equals(f, "json") - ? MediaType.APPLICATION_JSON_TYPE - : Objects.equals(f, "html") - ? MediaType.TEXT_HTML_TYPE - : Objects.nonNull(containerRequestContext.getMediaType()) - ? containerRequestContext.getMediaType() - : (containerRequestContext.getAcceptableMediaTypes().size() > 0 - && !containerRequestContext - .getAcceptableMediaTypes() - .get(0) - .equals(MediaType.WILDCARD_TYPE)) - ? containerRequestContext.getAcceptableMediaTypes().get(0) - : (Objects.nonNull(containerRequestContext.getHeaderString("user-agent")) - && containerRequestContext - .getHeaderString("user-agent") - .toLowerCase() - .contains("google-site-verification") - ? MediaType.TEXT_HTML_TYPE - : MediaType.APPLICATION_JSON_TYPE); + MediaType mediaType = determineMediaType(f, containerRequestContext); Optional provider = serviceListingProviders.get().stream() @@ -149,10 +133,11 @@ public Response getServices( Map queryParameters = containerRequestContext.getUriInfo().getQueryParameters().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get(0))); - Response serviceListing = - provider.get().getServiceListing(services, uriCustomizer, queryParameters, user); - return Response.ok().entity(serviceListing.getEntity()).type(mediaType).build(); + try (Response serviceListing = + provider.get().getServiceListing(services, uriCustomizer, queryParameters, user)) { + return Response.ok().entity(serviceListing.getEntity()).type(mediaType).build(); + } } return Response.ok().entity(services).build(); @@ -251,6 +236,7 @@ public Response getLogout( } @Path("/{service}/") + @SuppressWarnings("PMD.UnusedFormalParameter") // callback parameter part of API contract public ServiceEndpoint getServiceResource( @PathParam("service") String id, @QueryParam("callback") String callback, @@ -259,13 +245,14 @@ public ServiceEndpoint getServiceResource( } @Path("/{service}/v{version}/") + @SuppressWarnings("PMD.UnusedFormalParameter") // callback parameter part of API contract public ServiceEndpoint getVersionedServiceResource( @PathParam("service") String id, @QueryParam("callback") String callback, @Context ContainerRequestContext containerRequestContext, @PathParam("version") Integer version) { - Service service = getService(id, callback); + Service service = getService(id); if (service.getData().getApiVersion().isPresent()) { Integer apiVersion = service.getData().getApiVersion().get(); @@ -312,7 +299,7 @@ private ServiceEndpoint getServiceResource(Service s) { .orElseThrow(); } - private Service getService(String id, String callback) { + private Service getService(String id) { Optional s = entityRegistry.getEntity(Service.class, id); if (s.isEmpty() /*|| s.get().getData().hasError()*/) { @@ -326,18 +313,56 @@ private Optional getExternalUri() { return Optional.of(servicesContext.getUri()); } + private MediaType determineMediaType(String f, ContainerRequestContext containerRequestContext) { + if (Objects.equals(f, "json")) { + return MediaType.APPLICATION_JSON_TYPE; + } + + if (Objects.equals(f, "html")) { + return MediaType.TEXT_HTML_TYPE; + } + + if (Objects.nonNull(containerRequestContext.getMediaType())) { + return containerRequestContext.getMediaType(); + } + + List acceptableTypes = containerRequestContext.getAcceptableMediaTypes(); + if (!acceptableTypes.isEmpty() && !acceptableTypes.get(0).equals(MediaType.WILDCARD_TYPE)) { + return acceptableTypes.get(0); + } + + String userAgent = containerRequestContext.getHeaderString("user-agent"); + if (Objects.nonNull(userAgent) + && userAgent.toLowerCase(Locale.ROOT).contains("google-site-verification")) { + return MediaType.TEXT_HTML_TYPE; + } + + return MediaType.APPLICATION_JSON_TYPE; + } + private void openLoggingContext(ContainerRequestContext containerRequestContext) { openLoggingContext(null, null, containerRequestContext); } private void openLoggingContext( String serviceId, Integer version, ContainerRequestContext containerRequestContext) { + setupServiceContext(serviceId); + setupRequestLogging(serviceId, version, containerRequestContext); + logRequestUser(containerRequestContext); + logRequestHeaders(containerRequestContext); + logRequestBody(containerRequestContext); + } + + private void setupServiceContext(String serviceId) { if (Objects.nonNull(serviceId)) { LogContext.put(LogContext.CONTEXT.SERVICE, serviceId); } else { LogContext.remove(LogContext.CONTEXT.SERVICE); } + } + private void setupRequestLogging( + String serviceId, Integer version, ContainerRequestContext containerRequestContext) { if (LOGGER.isDebugEnabled() || LOGGER.isDebugEnabled(MARKER.REQUEST)) { LogContext.put(LogContext.CONTEXT.REQUEST, LogContext.generateRandomUuid().toString()); @@ -349,7 +374,10 @@ private void openLoggingContext( } else { LogContext.remove(LogContext.CONTEXT.REQUEST); } + } + @SuppressWarnings("PMD.GuardLogStatement") + private void logRequestUser(ContainerRequestContext containerRequestContext) { if (LOGGER.isDebugEnabled(MARKER.REQUEST_USER)) { Principal principal = containerRequestContext.getSecurityContext().getUserPrincipal(); @@ -359,27 +387,29 @@ private void openLoggingContext( LOGGER.debug(MARKER.REQUEST_USER, "Request user: null"); } } + } + private void logRequestHeaders(ContainerRequestContext containerRequestContext) { if (LOGGER.isDebugEnabled(MARKER.REQUEST_HEADER)) { String headers = Joiner.on("\n ").withKeyValueSeparator(": ").join(containerRequestContext.getHeaders()); LOGGER.debug(MARKER.REQUEST_HEADER, "Request headers: \n {}", headers); } + } - if (LOGGER.isDebugEnabled(MARKER.REQUEST_BODY)) { - if (containerRequestContext.hasEntity()) { - try { - containerRequestContext.getEntityStream().mark(Integer.MAX_VALUE); - String body = - new String( - containerRequestContext.getEntityStream().readAllBytes(), StandardCharsets.UTF_8); - containerRequestContext.getEntityStream().reset(); - - LOGGER.debug(MARKER.REQUEST_BODY, "Request body: \n {}", body); - } catch (IOException e) { - // ignore - } + private void logRequestBody(ContainerRequestContext containerRequestContext) { + if (LOGGER.isDebugEnabled(MARKER.REQUEST_BODY) && containerRequestContext.hasEntity()) { + try { + containerRequestContext.getEntityStream().mark(Integer.MAX_VALUE); + String body = + new String( + containerRequestContext.getEntityStream().readAllBytes(), StandardCharsets.UTF_8); + containerRequestContext.getEntityStream().reset(); + + LOGGER.debug(MARKER.REQUEST_BODY, "Request body: \n {}", body); + } catch (IOException e) { + // ignore } } } diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskCron4j.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskCron4j.java index 3a419eca..9db73bc8 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskCron4j.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskCron4j.java @@ -20,6 +20,7 @@ public class TaskCron4j extends Task { private final de.ii.xtraplatform.services.domain.Task task; private final int maxPartials; private final int partial; + private final int threadNumber; private final String threadName; private final AtomicInteger activePartials; @@ -33,9 +34,11 @@ public TaskCron4j( int partial, int threadNumber, AtomicInteger activePartials) { + super(); this.task = task; this.maxPartials = maxPartials; this.partial = partial; + this.threadNumber = threadNumber; this.threadName = "bg-task-" + threadNumber; this.activePartials = activePartials; } @@ -60,8 +63,12 @@ public String getThreadName() { return threadName; } + public int getThreadNumber() { + return threadNumber; + } + @Override - public void execute(TaskExecutionContext taskExecutionContext) throws RuntimeException { + public void execute(TaskExecutionContext taskExecutionContext) { final TaskContext taskContext = new TaskContextCron4j( taskExecutionContext, diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskQueueCron4j.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskQueueCron4j.java new file mode 100644 index 00000000..24affc92 --- /dev/null +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskQueueCron4j.java @@ -0,0 +1,230 @@ +/* + * Copyright 2018-2020 interactive instruments GmbH + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package de.ii.xtraplatform.services.app; + +import de.ii.xtraplatform.services.domain.Task; +import de.ii.xtraplatform.services.domain.TaskQueue; +import de.ii.xtraplatform.services.domain.TaskStatus; +import it.sauronsoftware.cron4j.TaskExecutor; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Cron4j implementation of TaskQueue with support for parallel execution and task queuing. */ +@SuppressWarnings("PMD.TooManyMethods") +public class TaskQueueCron4j implements TaskQueue { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskQueueCron4j.class); + + private final it.sauronsoftware.cron4j.Scheduler scheduler; + private final BiConsumer loggingHandler; + private final BlockingQueue>> queue; + private final BlockingQueue currentTasks; + private final BlockingQueue threadNumbers; + + public TaskQueueCron4j( + it.sauronsoftware.cron4j.Scheduler scheduler, + BiConsumer loggingHandler, + int maxConcurrentTasks) { + this.scheduler = scheduler; + this.loggingHandler = loggingHandler; + this.queue = new LinkedBlockingQueue<>(); + this.currentTasks = new LinkedBlockingQueue<>(maxConcurrentTasks); + this.threadNumbers = + new LinkedBlockingQueue<>( + IntStream.rangeClosed(1, maxConcurrentTasks).boxed().collect(Collectors.toList())); + } + + @Override + public CompletableFuture launch(Task task) { + synchronized (this) { + Thread.currentThread().setName("bg-task-0"); + task.logContext(); + cleanup(); + + if (isTaskAlreadyRunning(task)) { + logTaskIgnored(task, "already running"); + return CompletableFuture.failedFuture(new IllegalArgumentException()); + } + + if (isTaskAlreadyQueued(task)) { + logTaskIgnored(task, "already in queue"); + return CompletableFuture.failedFuture(new IllegalArgumentException()); + } + + return queueTask(task); + } + } + + @Override + public CompletableFuture launch(Task task, long delay) { + CompletableFuture completableFuture = new CompletableFuture<>(); + ForkJoinPool.commonPool() + .execute( + () -> { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + // ignore + } + launch(task).thenAccept(completableFuture::complete); + }); + return completableFuture; + } + + @Override + public void remove(Task task) { + List>> toRemove = + queue.stream() + .filter(entry -> Objects.equals(task, entry.getLeft())) + .collect(Collectors.toList()); + + toRemove.forEach( + entry -> { + boolean removed = queue.remove(entry); + if (removed && LOGGER.isTraceEnabled()) { + LOGGER.trace( + "REMOVED TASK {} -> {}", entry.getLeft().getLabel(), entry.getLeft().getId()); + } + }); + } + + @Override + public List getFutureTasks() { + return queue.stream().map(Pair::getLeft).collect(Collectors.toList()); + } + + @Override + public Optional getCurrentTask() { + return Optional.ofNullable(currentTasks.peek()); + } + + private boolean isTaskAlreadyRunning(Task task) { + return currentTasks.stream() + .filter(Objects::nonNull) + .anyMatch( + runningTask -> + Objects.equals(runningTask.getId(), task.getId()) + && Objects.equals(runningTask.getLabel(), task.getLabel())); + } + + private boolean isTaskAlreadyQueued(Task task) { + return getFutureTasks().stream() + .anyMatch( + futureTask -> + Objects.equals(futureTask.getId(), task.getId()) + && Objects.equals(futureTask.getLabel(), task.getLabel())); + } + + private void logTaskIgnored(Task task, String reason) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Ignoring task '{}' for '{}', {}", task.getLabel(), task.getId(), reason); + } + } + + private CompletableFuture queueTask(Task task) { + final CompletableFuture taskStatusCompletableFuture = new CompletableFuture<>(); + queue.offer(new ImmutablePair<>(task, taskStatusCompletableFuture)); + checkQueue(); + return taskStatusCompletableFuture; + } + + private void checkQueue() { + synchronized (this) { + cleanup(); + + if (currentTasks.remainingCapacity() <= 0) { + return; + } + + final Pair> taskPair = queue.poll(); + if (taskPair == null) { + return; + } + + Task task = taskPair.getLeft(); + if (shouldLaunchAsPartialTasks(task)) { + launchPartialTasks(task); + } else { + launchSingleTask(task, taskPair.getRight()); + } + } + } + + private boolean shouldLaunchAsPartialTasks(Task task) { + return task.getMaxPartials() > 1 && currentTasks.remainingCapacity() > 1; + } + + private void launchPartialTasks(Task task) { + int maxPartials = Math.min(currentTasks.remainingCapacity(), task.getMaxPartials()); + AtomicInteger activePartials = new AtomicInteger(maxPartials); + + for (int i = 1; i <= maxPartials; i++) { + int threadNumber = Objects.requireNonNullElse(threadNumbers.poll(), 1); + @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") + TaskCron4j taskCron4j = new TaskCron4j(task, maxPartials, i, threadNumber, activePartials); + TaskStatus currentTask = createAndLaunchTask(taskCron4j); + + setupPartialTaskCompletion(currentTask, threadNumber, activePartials); + currentTasks.offer(currentTask); + } + } + + private void launchSingleTask(Task task, CompletableFuture future) { + int threadNumber = Objects.requireNonNullElse(threadNumbers.poll(), 1); + TaskCron4j taskCron4j = new TaskCron4j(task, threadNumber); + TaskStatus currentTask = createAndLaunchTask(taskCron4j); + + setupSingleTaskCompletion(currentTask, threadNumber); + currentTasks.offer(currentTask); + future.complete(currentTask); + } + + private TaskStatus createAndLaunchTask(TaskCron4j taskCron4j) { + final TaskExecutor taskExecutor = scheduler.launch(taskCron4j); + TaskStatus currentTask = new TaskStatusCron4j(taskCron4j, taskExecutor); + loggingHandler.accept(taskCron4j, currentTask); + return currentTask; + } + + private void setupPartialTaskCompletion( + TaskStatus currentTask, int threadNumber, AtomicInteger activePartials) { + currentTask.onDone( + throwable -> { + activePartials.decrementAndGet(); + threadNumbers.offer(threadNumber); + checkQueue(); + }); + } + + private void setupSingleTaskCompletion(TaskStatus currentTask, int threadNumber) { + currentTask.onDone( + throwable -> { + threadNumbers.offer(threadNumber); + checkQueue(); + }); + } + + private void cleanup() { + synchronized (this) { + currentTasks.removeIf(TaskStatus::isDone); + } + } +} diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskStatusCron4j.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskStatusCron4j.java index 05049fd0..f9f22d2e 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskStatusCron4j.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/app/TaskStatusCron4j.java @@ -14,14 +14,11 @@ import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author zahnen */ public class TaskStatusCron4j implements TaskStatus { - private static final Logger LOGGER = LoggerFactory.getLogger(TaskStatusCron4j.class); private final String id; private final String label; diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/AbstractService.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/AbstractService.java index 14a2585d..e6330f89 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/AbstractService.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/AbstractService.java @@ -35,21 +35,29 @@ protected void onStarted() { onStateChange( (from, to) -> { - LOGGER.info("Service with id '{}' state changed: {}", getId(), getState()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Service with id '{}' state changed: {}", getId(), getState()); + } }, true); - LOGGER.info("Service with id '{}' started successfully.", getId()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Service with id '{}' started successfully.", getId()); + } } @Override protected void onReloaded(boolean forceReload) { - LOGGER.info("Service with id '{}' reloaded successfully.", getId()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Service with id '{}' reloaded successfully.", getId()); + } } @Override protected void onStopped() { - LOGGER.info("Service with id '{}' stopped.", getId()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Service with id '{}' stopped.", getId()); + } } @Override diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/GenericView.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/GenericView.java index 63f93c3b..984165c7 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/GenericView.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/GenericView.java @@ -9,27 +9,22 @@ import io.dropwizard.views.common.View; import java.net.URI; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author fischer */ public class GenericView extends View { - private static final Logger LOGGER = LoggerFactory.getLogger(GenericView.class); - private final URI uri; - private Object data; + private final Object data; public GenericView(String template, URI uri) { - super(template + ".mustache"); - this.uri = uri; - this.data = null; + this(template, uri, null); } public GenericView(String template, URI uri, Object data) { - this(template, uri); + super(template + ".mustache"); + this.uri = uri; this.data = data; } diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/Notification.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/Notification.java index 947c3c11..d17f01ea 100755 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/Notification.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/Notification.java @@ -31,7 +31,6 @@ public enum LEVEL { public Map getMessages() { return ImmutableMap.of(DEFAULT_LANGUAGE, ""); } - ; @Value.Derived public String getMessage() { diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/SchedulerTask.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/SchedulerTask.java index b3fa3084..d4d1db7e 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/SchedulerTask.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/SchedulerTask.java @@ -12,11 +12,11 @@ */ public interface SchedulerTask { - public String getId(); + String getId(); - public void setId(String id); + void setId(String id); - public String getPattern(); + String getPattern(); - public Runnable getTask(); + Runnable getTask(); } diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/ServiceData.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/ServiceData.java index 3a722ca7..f4ccfe72 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/ServiceData.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/ServiceData.java @@ -14,6 +14,7 @@ import de.ii.xtraplatform.entities.domain.AutoEntity; import de.ii.xtraplatform.entities.domain.EntityData; import java.util.List; +import java.util.Locale; import java.util.Optional; import org.immutables.value.Value; @@ -22,7 +23,7 @@ public interface ServiceData extends EntityData, AutoEntity { @Override default Optional getEntitySubType() { - return Optional.of(getServiceType().toLowerCase()); + return Optional.of(getServiceType().toLowerCase(Locale.ROOT)); } String getServiceType(); @@ -51,6 +52,7 @@ default String getLabel() { * erreichbar ist und Hintergrundprozesse nicht laufen. * @default true */ + @Override @Value.Default default boolean getEnabled() { return true; diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/ServiceDataCommon.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/ServiceDataCommon.java index 83494c17..1ffe416e 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/ServiceDataCommon.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/ServiceDataCommon.java @@ -15,5 +15,5 @@ @JsonDeserialize(builder = ImmutableServiceDataCommon.Builder.class) public interface ServiceDataCommon extends ServiceData { - abstract static class Builder implements EntityDataBuilder {} + abstract class Builder implements EntityDataBuilder {} } diff --git a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/TaskContext.java b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/TaskContext.java index 3060578e..05a5050b 100644 --- a/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/TaskContext.java +++ b/xtraplatform-services/src/main/java/de/ii/xtraplatform/services/domain/TaskContext.java @@ -25,7 +25,7 @@ default boolean isFirstPartial() { } default boolean matchesPartialModulo(int number) { - return (number % getMaxPartials()) == (getCurrentPartial() - 1); + return number % getMaxPartials() == getCurrentPartial() - 1; } int getActivePartials();