-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsender_base.py
More file actions
36 lines (29 loc) · 1.02 KB
/
sender_base.py
File metadata and controls
36 lines (29 loc) · 1.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import threading
from typing import Callable, List, Optional
LogCallback = Callable[[str], None]
LineCallback = Callable[[int], None]
ErrorCallback = Callable[[str], None]
class GcodeSenderBase(threading.Thread):
def __init__(
self,
gcode_lines: List[str],
on_log: Optional[LogCallback] = None,
on_line_start: Optional[LineCallback] = None,
on_error: Optional[ErrorCallback] = None,
on_done: Optional[Callable[[], None]] = None,
):
super().__init__(daemon=True)
self.gcode_lines = gcode_lines
self.on_log = on_log
self.on_line_start = on_line_start
self.on_error = on_error
self.on_done = on_done
self._running = True
def stop(self):
self._running = False
def _emit_log(self, msg: str):
if self.on_log: self.on_log(msg)
def _emit_error(self, msg: str):
if self.on_error: self.on_error(msg)
def _emit_line_start(self, idx: int):
if self.on_line_start: self.on_line_start(idx)