-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAPRSFriendAlert.py
More file actions
178 lines (149 loc) · 8.89 KB
/
APRSFriendAlert.py
File metadata and controls
178 lines (149 loc) · 8.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import config as cf
from OpenRouteService import OpenRouteService
from APRS import APRS
from tests.dummyAPRS import dummyAPRS
from TelegramChatManager import TelegramChatManager
import logging
import os
import numpy as np
from time import sleep
class APRSFriendAlert:
"""The main class for the whole application. It provides the requred logic."""
class ErrorHandler(logging.Handler):
""" An inner class which provides the error handler to also send messages to telegram."""
def __init__(self, messageCallback, level = 'ERROR') -> None:
"""Here you define a function to be called with a chat ID and message and a logging level"""
super().__init__()
self.messageCallback = messageCallback # store callback function
try:
self.level = int(logging.getLevelName(level)) # try to get the level, revert to 'ERROR' if it fails
except:
self.level = 40
cf.log.error('[AFA] Could not determine the logging-level for telegram messages. Default: ERROR')
def emit(self, record):
""" The function which is called on each logging event, pushes the errors and worse to telegram"""
if record.levelno >= self.level: # record passt the threshold
try:
self.messageCallback(cf.MASTER_CHATID,'\U0001F6A7 LOGGING EVENT \U0001F6A7\n[' + record.levelname + '] ' + record.message)
except: # this exception is not needed, the error is logged anyways
pass
def __init__(self, dummy=False):
"""Constructor for the basic logic of this bot. This Object holds all subroutines/objects needed to work. """
# Flags
self.following = False
self.dest = None
self.alertees = None
self.ALERT_TIMES = [-1, 60, 30, 10, 5, 1] # the intervals during which the alertees are alerted. -1 means the inital away time, regardless of how far that is
self.alertState = [False for i in range(len(self.ALERT_TIMES))]
self.toStr = 'you'
# Setup the API Bouncers
self.ors = OpenRouteService()
if dummy:
self.aprs = dummyAPRS(self.newAPRSData)
else:
self.aprs = APRS(self.newAPRSData)
self.tcm = TelegramChatManager(self.routeUpdate, self.ors.geocode, self.arrived)
cf.log.addHandler(self.ErrorHandler(self.tcm.sendMessage, os.getenv('TELEGRAM_LOGGING_LEVEL'))) # now add the telegram error handler
def main(self):
"""This function starts the telegram conversation handler. This function will not return, as long as the bot is running."""
self.tcm.execTCM()
@staticmethod
def getTimeStr(time):
"""Basic methond to convert minutes into a string in h and mins."""
hours, minutes = divmod(round(time), 60)
if hours == 0:
output_str = f"{minutes} min"
elif minutes == 0:
output_str = f"{hours} h"
else:
output_str = f"{hours} h and {minutes} min"
return output_str
def newAPRSData(self, coords):
"""This function gets called by the APRS Thread. New data is available and we can check the traveltime to the destionation and notiofy the users."""
if not self.following: # following stopped by someone, stop the aprs thread
self.aprs.stop()
return
cf.log.debug('[AFA] New APRS Data!')
if coords != self.dest:
response = self.ors.getRouteSummary(coords, self.dest) # now check how long it takes from the current position to the destination
else:
response = [0.0, 0.0]
if response != None:
# extract the distance and time
distance = np.round(response[0],1)
time = response[1]
timeStr = APRSFriendAlert.getTimeStr(time)
# now check weather we have to alert somebody
if self.alertState[0] == False: # the initial alert. Notify master an the clients
self.alertState[0] = True # set the flag for inital alert
self.alertState[1:] = [round(time) <= alert_time for alert_time in self.ALERT_TIMES[1:]] # set all flags for longer times. This route will not need every alert
if all(self.alertState): # we've arrived at the desitination already!?
self.tcm.sendMessage(cf.MASTER_CHATID, 'OOPS! \nIt seems you\'re already at your destination \U0001F3C1 \nI won\'t do anything further.')
self.aprs.stop()
self.following = False
return
# Ok now send the messages
self.tcm.sendMessage(cf.MASTER_CHATID, '\U0001F698 EN-ROUTE \U0001F698 \n' + os.getenv('APRS_FOLLOW_CALL') + ' is now beeing followed. Your route is ' + str(distance) + ' km long and will take ' + timeStr + '.')
url = 'https://aprs.fi/#!mt=roadmap&z=11&call=a%2F{}&timerange=3600&tail=3600'.format( os.getenv('APRS_FOLLOW_CALL'))
for alertee in self.alertees:
self.tcm.sendMessage(alertee, 'Great! \U0001F698\n'+ os.getenv('APRS_FOLLOW_CALL') + ' is on its way to '+ self.toStr +'!\n' + os.getenv('APRS_FOLLOW_CALL') + ' is currently ' + str(distance) + ' km and ' + timeStr + ' away.\n\nYou can see ' + os.getenv('APRS_FOLLOW_CALL')+'\'s position here:\n' + url)
cf.log.info('[AFA] Follow Process is started, first packet arrived successfully!')
else:
# now get the next index of the time we have to wait for.
# if the self.alertState is [True, True, False, False, False] nextAlertIx will be 2
nextAlertIx = np.where(self.alertState)[0][-1] + 1
# now check if the next time falls below the alert threshold
if round(time) <= self.ALERT_TIMES[nextAlertIx]:
message = ''
self.alertState[1:] = [round(time) <= alert_time for alert_time in self.ALERT_TIMES[1:]] # set the flag(s) if we skipped a alert
if all(self.alertState): # are we there yet?
message = '\U0001F6A8 ' + os.getenv('APRS_FOLLOW_CALL') + ' arrived! \U0001F3C1'
self.tcm.sendMessage(cf.MASTER_CHATID, 'You\'ve arrived at your destination! \U0001F3C1')
self.aprs.stop()
self.following = False
cf.log.info('[AFA] You have arrived at your desitination. Stopping processes...')
else:
message = '\U0001F697 ' + os.getenv('APRS_FOLLOW_CALL') + ' is currently ' + str(distance) + ' km and ' + timeStr + ' away.'
# message is built, send it
for alertee in self.alertees:
self.tcm.sendMessage(alertee, message)
cf.log.debug('[AFA] Messages have been set. Time to destination ' + timeStr)
else:
cf.log.debug('[AFA] Nobody to notify. Time to destination ' + timeStr)
def routeUpdate(self, dest, alertees, toStr = None):
"""This function is to be called by the TelegramChatManager and tells the main logic where the new route is going. It initiates or terminates the process. The desitination is None if the follow process is to be terminated, otherwise the desitnation coordinates. Alerts is a list of chatIDs which are to be alerted."""
if dest == None:
self.aprs.stop()
self.following = False
self.dest = None
self.alertees = None
self.alertState = [False, False, False, False, False, False]
self.toStr = 'you'
cf.log.info('[AFA] Quitting following process.')
else:
self.dest = dest
self.alertees = alertees
self.following = True
self.alertState = [False, False, False, False, False, False]
if toStr == None or toStr == '':
self.toStr = 'you'
else:
self.toStr = toStr
self.aprs.start()
cf.log.info('[AFA] New following process initiated.')
def arrived(self):
""" This function may be called from the TCM. It is used to manually trigger the arrival. It returns false, if there is no routing active. Otherwise true."""
if self.following == False:
return False
cf.log.info('[AFA] Manual arrival triggered.')
self.newAPRSData(self.dest)
return True
if __name__ == '__main__':
afa = APRSFriendAlert(dummy=False) # change to true for testing
afa.main()
afa.aprs.stop()
cf.log.info('[AFA] System shutting down.')
#tcm = TelegramChatManager(None, None)
#distance = 2300
#timeStr = APRSFriendAlert.getTimeStr(63)
#tcm.sendMessage(cf.MASTER_CHATID, 'OOPS! It seems you\'re already at your destination! I won\'t do anything further.' )