-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwatcher.py
More file actions
63 lines (49 loc) · 2.13 KB
/
watcher.py
File metadata and controls
63 lines (49 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from __future__ import annotations
import time
from pathlib import Path
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
class FolderEventHandler(FileSystemEventHandler):
    """Run the processor once for each file that a file-system event refers to.

    A path that was handled within the last ``dedup_ttl`` seconds is skipped,
    so repeated events for the same file do not invoke the processor again.
    Once that window has elapsed the path becomes eligible again, which lets
    a file with the same name be dropped into the folder a second time.
    """

    def __init__(
        self,
        processor,
        logger,
        dry_run: bool = False,
        dedup_ttl: float = 30.0,
    ) -> None:
        """Store the collaborators and create an empty de-duplication cache.

        Args:
            processor: Object whose ``process(path, dry_run=...)`` method is
                called for every detected file. The returned object must
                expose ``status`` and ``destination`` attributes.
            logger: Object providing an ``info(message)`` method.
            dry_run: Passed through unchanged to ``processor.process``.
            dedup_ttl: Number of seconds during which further events for an
                already handled path are ignored.
        """
        super().__init__()
        self.processor = processor
        self.logger = logger
        self.dry_run = dry_run
        self.dedup_ttl = dedup_ttl
        # Resolved path -> time.monotonic() stamp of when it was last handled.
        self._processed: dict[str, float] = {}

    def on_any_event(self, event: FileSystemEvent) -> None:
        """Process the file referenced by *event* unless it was handled recently.

        Directory events, and events whose path is not an existing regular
        file at the time of the call, are ignored.
        """
        if event.is_directory:
            return

        # ``dest_path`` may be missing or empty depending on the event; when
        # it names an existing file, that is the file to process. Otherwise
        # fall back to the source path.
        raw_dest = getattr(event, "dest_path", None)
        dest = Path(raw_dest) if raw_dest else None
        if dest is not None and dest.is_file():
            path = dest
        else:
            path = Path(event.src_path)
        if not path.is_file():
            return

        key = str(path.resolve())
        now = time.monotonic()
        # Drop stale entries on every event so the cache stays small and a
        # re-submitted file is picked up once its window has passed.
        self._purge_expired(now)
        if key in self._processed:
            return
        self._processed[key] = now

        self.logger.info(f"Detected: {path.name}")
        try:
            result = self.processor.process(path, dry_run=self.dry_run)
        finally:
            # Restart the window when processing ends, so that events seen
            # after a slow run are still treated as duplicates.
            self._processed[key] = time.monotonic()
        self.logger.info(f"Result: status={result.status} dst={result.destination}")

    def _purge_expired(self, now: float) -> None:
        """Forget cached paths whose de-duplication window has elapsed."""
        self._processed = {
            key: seen
            for key, seen in self._processed.items()
            if now - seen < self.dedup_ttl
        }
class FolderWatcher:
    """Watch one directory (non-recursively) and pass detected files to a processor."""

    def __init__(self, watch_dir: str | Path, processor, logger, dry_run: bool = False) -> None:
        """Store the configuration and create the observer (not yet started).

        Args:
            watch_dir: Directory to watch.
            processor: Passed to the ``FolderEventHandler`` created in
                ``start``.
            logger: Object providing an ``info(message)`` method.
            dry_run: Passed to the ``FolderEventHandler`` created in
                ``start``.
        """
        self.watch_dir = Path(watch_dir)
        self.processor = processor
        self.logger = logger
        self.dry_run = dry_run
        self.observer = Observer()

    def start(self) -> None:
        """Start watching ``watch_dir`` and block until the watcher stops.

        ``KeyboardInterrupt`` (Ctrl-C) is treated as a normal shutdown
        request: it is logged and not re-raised. The call also returns if
        the observer thread is no longer alive. On every exit path after the
        observer has been started, it is stopped and joined.
        """
        handler = FolderEventHandler(self.processor, self.logger, dry_run=self.dry_run)
        self.observer.schedule(handler, str(self.watch_dir), recursive=False)
        self.observer.start()
        try:
            self.logger.info(f"Watching directory: {self.watch_dir.resolve()}")
            # Poll with a timeout instead of sleeping forever, so the loop
            # ends once the observer thread is gone.
            while self.observer.is_alive():
                self.observer.join(timeout=1)
        except KeyboardInterrupt:
            self.logger.info("Stopping watcher")
        finally:
            # Always release the observer, not only on Ctrl-C.
            self.observer.stop()
            self.observer.join()