Skip to content

Commit 2ee761b

Browse files
committed
Changes
java 17 -> 21 Update notification scheduler service - Using PriorityQueue and timer Added notification sender
1 parent a7eea20 commit 2ee761b

11 files changed

Lines changed: 173 additions & 236 deletions

File tree

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
build:
44
@echo "🛠️ Билдим backend..."
5-
cd task-tracker-backend && mvn clean package -DskipTests
5+
mvn clean package -pl task-tracker-backend -DskipTests
66
@echo "🛠️ Билдим email-сервис..."
7-
cd task-tracker-email-sender && mvn clean package -DskipTests
7+
mvn clean package -pl task-tracker-email-sender -DskipTests
88
@echo "🛠️ Билдим scheduler..."
9-
cd task-tracker-scheduler && mvn clean package -DskipTests
9+
mvn clean package -pl task-tracker-scheduler -DskipTests
1010
@echo "🚀 Запускаем сервисы..."
1111
docker-compose up -d --build
1212

@@ -20,7 +20,7 @@ stop:
2020

2121
backend:
2222
docker-compose down backend
23-
cd task-tracker-backend && mvn clean package -DskipTests && cd ..
23+
mvn clean package -pl task-tracker-backend -am -DskipTests
2424
docker-compose build backend
2525

2626
scheduler:

task-tracker-backend/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<url/>
2828
</scm>
2929
<properties>
30-
<java.version>17</java.version>
30+
<java.version>21</java.version>
3131
</properties>
3232
<dependencies>
3333
<!-- common -->

task-tracker-backend/src/main/java/com/metarash/backend/service/impl/TaskServiceImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public Slice<TaskDto> getTasksByUsername(String username, TaskFilter filter) {
4444
Specification<Task> spec = TaskSpecifications.build(filter, username);
4545

4646
Slice<TaskDto> tasks = taskRepository.findAll(spec, pageable).map(taskMapper::toDto);
47-
log.debug("Found tasks: {}", tasks.getContent());
4847
return tasks;
4948
}
5049

task-tracker-backend/src/main/java/com/metarash/backend/specification/TaskSpecifications.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,32 +4,35 @@
44
import com.metarash.backend.model.entity.Task;
55
import org.springframework.data.jpa.domain.Specification;
66

7+
import java.util.Optional;
8+
79
public class TaskSpecifications {
810

9-
public static Specification<Task> hasUsername(String username) {
11+
public static Specification<Task> hasField(String fieldName, Object value) {
1012
return (root, query, criteriaBuilder) ->
11-
criteriaBuilder.equal(root.get("user").get("username"), username);
13+
criteriaBuilder.equal(root.get(fieldName), value);
14+
}
15+
16+
public static Specification<Task> hasUsername(String username) {
17+
return hasField("username", username);
1218
}
1319

1420
public static Specification<Task> hasPriority(String priority) {
15-
return (root, query, criteriaBuilder) ->
16-
criteriaBuilder.equal(root.get("priority"), priority);
21+
return hasField("priority", priority);
1722
}
1823

1924
public static Specification<Task> hasStatus(String status) {
20-
return (root, query, criteriaBuilder) ->
21-
criteriaBuilder.equal(root.get("status"), status);
25+
return hasField("status", status);
2226
}
2327

2428
public static Specification<Task> build(TaskFilter filter, String username) {
2529
Specification<Task> spec = Specification.where(hasUsername(username));
2630

27-
if (filter.getPriority() != null) {
28-
spec = spec.and(hasPriority(filter.getPriority()));
29-
}
30-
if (filter.getStatus() != null) {
31-
spec = spec.and(hasStatus(filter.getStatus()));
32-
}
31+
Optional.ofNullable(filter.getPriority())
32+
.ifPresent(priority -> spec.and(hasPriority(priority)));
33+
34+
Optional.ofNullable(filter.getStatus())
35+
.ifPresent(status -> spec.and(hasStatus(status)));
3336

3437
return spec;
3538
}

task-tracker-email-sender/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<url/>
2828
</scm>
2929
<properties>
30-
<java.version>17</java.version>
30+
<java.version>21</java.version>
3131
</properties>
3232
<dependencies>
3333
<!-- common -->

task-tracker-scheduler/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<name>task-tracker-scheduler</name>
1717
<description>Task Tracker Scheduler</description>
1818
<properties>
19-
<java.version>17</java.version>
19+
<java.version>21</java.version>
2020
</properties>
2121
<dependencies>
2222
<!-- common -->

task-tracker-scheduler/pom.xml~

Lines changed: 0 additions & 105 deletions
This file was deleted.
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,26 @@
11
package com.metarash.tasktrackerscheduler.kafka.consumer;
22

33
import com.metarash.dto.TaskEvent;
4-
import com.metarash.tasktrackerscheduler.service.OverdueNotificationScheduler;
4+
import com.metarash.tasktrackerscheduler.service.NotificationScheduler;
55
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
67
import org.springframework.kafka.annotation.KafkaListener;
78
import org.springframework.stereotype.Component;
89

10+
@Slf4j
911
@Component
1012
@RequiredArgsConstructor
1113
public class TaskEventListener {
1214

13-
private final OverdueNotificationScheduler scheduler;
15+
private final NotificationScheduler notificationScheduler;
1416

1517
@KafkaListener(
1618
topics = "task-events",
1719
groupId = "scheduler-group",
1820
containerFactory = "kafkaListenerContainerFactory"
1921
)
2022
public void onTaskEvent(TaskEvent event) {
21-
scheduler.updateDueDateIfEarlier(event.getDueDate());
23+
log.info("Received task event: {}", event);
24+
notificationScheduler.scheduleNotification(event);
2225
}
2326
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package com.metarash.tasktrackerscheduler.service;
2+
3+
import com.metarash.dto.TaskEvent;
4+
import lombok.RequiredArgsConstructor;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.stereotype.Component;
7+
8+
import java.time.Duration;
9+
import java.time.LocalDateTime;
10+
import java.util.Comparator;
11+
import java.util.PriorityQueue;
12+
import java.util.Timer;
13+
import java.util.TimerTask;
14+
import java.util.concurrent.locks.ReentrantLock;
15+
16+
@Slf4j
17+
@Component
18+
@RequiredArgsConstructor
19+
public class NotificationScheduler {
20+
private final NotificationSender notificationSender;
21+
22+
23+
private final PriorityQueue<ScheduledTask> queue =
24+
new PriorityQueue<>(Comparator.comparing(ScheduledTask::dueDate));
25+
26+
private final ReentrantLock lock = new ReentrantLock();
27+
28+
private final Timer timer = new Timer();
29+
30+
public void scheduleNotification(TaskEvent event) {
31+
lock.lock();
32+
try {
33+
queue.add(new ScheduledTask(event.getTaskId(), event.getDueDate()));
34+
ScheduledTask peek = queue.peek();
35+
if (peek == null) {
36+
log.info("No tasks in queue - nothing to schedule");
37+
return;
38+
}
39+
if (peek.dueDate().equals(event.getDueDate())) {
40+
reschedule();
41+
}
42+
} finally {
43+
lock.unlock();
44+
}
45+
}
46+
47+
private void reschedule() {
48+
lock.lock();
49+
try {
50+
timer.cancel();
51+
52+
ScheduledTask nextTask = queue.peek();
53+
if (nextTask == null) {
54+
log.info("No tasks in queue - nothing to schedule");
55+
return;
56+
}
57+
58+
long delay = Duration.between(LocalDateTime.now(), nextTask.dueDate())
59+
.toMillis();
60+
61+
if (delay < 0) {
62+
log.warn("Task is already overdue: {}", nextTask.taskId());
63+
delay = 0;
64+
}
65+
66+
timer.schedule(new TimerTask() {
67+
@Override
68+
public void run() {
69+
handleDueTask();
70+
}
71+
}, delay);
72+
73+
log.info("Scheduled task {} for execution in {} ms", nextTask.taskId(), delay);
74+
} finally {
75+
lock.unlock();
76+
}
77+
}
78+
79+
private void handleDueTask() {
80+
lock.lock();
81+
try {
82+
ScheduledTask task = queue.poll();
83+
if (task == null) {
84+
log.warn("No task found in queue - possible race condition");
85+
return;
86+
}
87+
notificationSender.sendNotification(task.taskId());
88+
89+
if (!queue.isEmpty()) {
90+
reschedule();
91+
}
92+
} finally {
93+
lock.unlock();
94+
}
95+
}
96+
97+
private record ScheduledTask(Long taskId, LocalDateTime dueDate) {}
98+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.metarash.tasktrackerscheduler.service;
2+
3+
import com.metarash.dto.NotificationMessage;
4+
import com.metarash.tasktrackerscheduler.entity.Task;
5+
import com.metarash.tasktrackerscheduler.kafka.producer.ReportProducer;
6+
import com.metarash.tasktrackerscheduler.repository.TaskRepository;
7+
import lombok.RequiredArgsConstructor;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.springframework.stereotype.Component;
10+
11+
import java.util.concurrent.ExecutorService;
12+
import java.util.concurrent.Executors;
13+
14+
@Slf4j
15+
@Component
16+
@RequiredArgsConstructor
17+
public class NotificationSender {
18+
private final ReportProducer reportProducer;
19+
private final TaskRepository taskRepository;
20+
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
21+
22+
public void sendNotification(Long taskId) {
23+
executor.submit(() -> {
24+
try {
25+
Task task = taskRepository.findById(taskId)
26+
.orElseThrow(() -> new Exception("task not found with id: " + taskId));
27+
28+
NotificationMessage message = buildMessage(task);
29+
reportProducer.sendOverdueTasks(message);
30+
log.info("Sent notification for task {}", taskId);
31+
} catch (Exception ex) {
32+
log.error("Failed to send notification for task {}", taskId, ex);
33+
// Здесь можно добавить повторную попытку
34+
}
35+
});
36+
}
37+
38+
private NotificationMessage buildMessage(Task task) {
39+
return new NotificationMessage(
40+
task.getId(),
41+
task.getTitle(),
42+
String.format("Задача просрочена: '%s'", task.getTitle()),
43+
task.getUser().getUsername(),
44+
task.getDueDate()
45+
);
46+
}
47+
}

0 commit comments

Comments
 (0)