Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private class OutputStreamCloseListener extends OutputStream {
private final OutputStream entityStream;

public OutputStreamCloseListener(ContainerResponseContext responseContext) {
super();
this.responseContext = responseContext;
this.entityStream = responseContext.getEntityStream();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,11 @@
import it.sauronsoftware.cron4j.TaskExecutor;
import java.text.DecimalFormat;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.extra.AmountFormats;
Expand Down Expand Up @@ -99,155 +88,11 @@ public void deschedule(String jobId) {

@Override
public TaskQueue createQueue(String id, int maxConcurrentTasks) {
return new TaskQueue() {
private final BlockingQueue<Pair<Task, CompletableFuture<TaskStatus>>> queue =
new LinkedBlockingQueue<>();
private final BlockingQueue<TaskStatus> currentTasks =
new LinkedBlockingQueue<>(maxConcurrentTasks);
private final BlockingQueue<Integer> threadNumbers =
new LinkedBlockingQueue<>(
IntStream.rangeClosed(1, maxConcurrentTasks).boxed().collect(Collectors.toList()));

@Override
public synchronized CompletableFuture<TaskStatus> launch(Task task) {
Thread.currentThread().setName("bg-task-0");
task.logContext();
cleanup();

for (TaskStatus runningTask : currentTasks) {
if (Objects.nonNull(runningTask)
&& Objects.equals(runningTask.getId(), task.getId())
&& Objects.equals(runningTask.getLabel(), task.getLabel())) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Ignoring task '{}' for '{}', already running", task.getLabel(), task.getId());
}
return CompletableFuture.failedFuture(new IllegalArgumentException());
}
}

if (getFutureTasks().stream()
.anyMatch(
futureTask ->
Objects.equals(futureTask.getId(), task.getId())
&& Objects.equals(futureTask.getLabel(), task.getLabel()))) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Ignoring task '{}' for '{}', already in queue", task.getLabel(), task.getId());
}
return CompletableFuture.failedFuture(new IllegalArgumentException());
}

// LOGGER.debug("Queuing task {}", task.getLabel());
final CompletableFuture<TaskStatus> taskStatusCompletableFuture = new CompletableFuture<>();

queue.offer(new ImmutablePair<>(task, taskStatusCompletableFuture));

checkQueue();

return taskStatusCompletableFuture;
}

@Override
public CompletableFuture<TaskStatus> launch(Task task, long delay) {
CompletableFuture<TaskStatus> completableFuture = new CompletableFuture<>();
ForkJoinPool.commonPool()
.execute(
() -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
// ignore
}
launch(task).thenAccept(completableFuture::complete);
});
return completableFuture;
}

@Override
public void remove(Task task) {
List<Pair<Task, CompletableFuture<TaskStatus>>> toRemove =
queue.stream()
.filter(entry -> Objects.equals(task, entry.getLeft()))
.collect(Collectors.toList());
toRemove.forEach(
o -> {
boolean removed = queue.remove(o);
if (removed) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(
"REMOVED TASK {} -> {}", o.getLeft().getLabel(), o.getLeft().getId());
}
}
});
}

@Override
public List<Task> getFutureTasks() {
return queue.stream().map(Pair::getLeft).collect(Collectors.toList());
}

// TODO: list of currentTasks
@Override
public Optional<TaskStatus> getCurrentTask() {
return Optional.ofNullable(currentTasks.peek());
}

private synchronized void checkQueue() {
cleanup();

if (currentTasks.remainingCapacity() > 0) {
final Pair<Task, CompletableFuture<TaskStatus>> task = queue.poll();

if (Objects.nonNull(task)) {
if (task.getLeft().getMaxPartials() > 1 && currentTasks.remainingCapacity() > 1) {
int maxPartials =
Math.min(currentTasks.remainingCapacity(), task.getLeft().getMaxPartials());
AtomicInteger activePartials = new AtomicInteger(maxPartials);

for (int i = 1; i <= maxPartials; i++) {
int threadNumber = Objects.requireNonNullElse(threadNumbers.poll(), 1);
TaskCron4j taskCron4j =
new TaskCron4j(task.getLeft(), maxPartials, i, threadNumber, activePartials);
final TaskExecutor taskExecutor = scheduler.launch(taskCron4j);
TaskStatus currentTask = new TaskStatusCron4j(taskCron4j, taskExecutor);

addLogging(taskCron4j, currentTask, threadNumber);
currentTask.onDone(
throwable -> {
activePartials.decrementAndGet();
threadNumbers.offer(threadNumber);
checkQueue();
});
currentTasks.offer(currentTask);

// TODO: currently not used?
// task.getRight().complete(currentTask);
}
} else {
int threadNumber = Objects.requireNonNullElse(threadNumbers.poll(), 1);
TaskCron4j taskCron4j = new TaskCron4j(task.getLeft(), threadNumber);
final TaskExecutor taskExecutor = scheduler.launch(taskCron4j);
TaskStatus currentTask = new TaskStatusCron4j(taskCron4j, taskExecutor);

addLogging(taskCron4j, currentTask, threadNumber);
currentTask.onDone(
throwable -> {
threadNumbers.offer(threadNumber);
checkQueue();
});
currentTasks.offer(currentTask);

task.getRight().complete(currentTask);
}
}
}
}

private synchronized void cleanup() {
currentTasks.removeIf(TaskStatus::isDone);
}
};
return new TaskQueueCron4j(
scheduler,
(taskCron4j, taskStatus) ->
addLogging(taskCron4j, taskStatus, taskCron4j.getThreadNumber()),
maxConcurrentTasks);
}

private void addLogging(TaskCron4j taskCron4j, TaskStatus taskStatus, int threadNum) {
Expand All @@ -268,7 +113,9 @@ private void addLogging(TaskCron4j taskCron4j, TaskStatus taskStatus, int thread
taskCron4j.isPartial()
? String.format(" (part [%d/%d])", taskCron4j.getPartial(), taskCron4j.getMaxPartials())
: "";
LOGGER.info("{}{} started", task.getLabel(), part);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("{}{} started", task.getLabel(), part);
}

taskStatus.onChange(
(progress, message) -> {
Expand All @@ -293,7 +140,9 @@ private void addLogging(TaskCron4j taskCron4j, TaskStatus taskStatus, int thread
} else {
String time = pretty(taskStatus.getEndTime() - taskStatus.getStartTime());

LOGGER.info("{}{} finished in {}", task.getLabel(), part, time);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("{}{} finished in {}", task.getLabel(), part, time);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

@Singleton
@AutoBind
@SuppressWarnings("PMD.TooManyMethods")
public class ServiceBackgroundTasksImpl implements ServiceBackgroundTasks, AppLifeCycle {

private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBackgroundTasksImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public class ServiceContextBinder extends AbstractBinder
implements Binder, ServiceInjectableContext {

@Inject
public ServiceContextBinder() {}
public ServiceContextBinder() {
super();
}

// TODO: bind every subtype
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public URI getUri() {
return appContext.getUri();
}

@Override
public List<String> getPathPrefix() {
return appContext.getPathPrefix();
}
Expand Down
Loading