-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathatx_data_processor_workers.py
More file actions
95 lines (86 loc) · 2.89 KB
/
atx_data_processor_workers.py
File metadata and controls
95 lines (86 loc) · 2.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import multiprocessing as mpu
import sys
import uuid
import socket
from rq import Worker, Connection
from src.common.logger import Logger
from src.app import server
log = Logger()
redis_conn = server.get_redis().get_redis_conn()
def get_ip_and_host():
ip = "0.0.0.0"
host = ""
try:
host = socket.gethostname()
ip = socket.gethostbyname(socket.gethostname())
except Exception as e:
log.info(f"get_ip_and_host : Exception occurred {e}")
ip = socket.gethostbyname("")
return ip + "_" + host
def start_worker(process_count):
log.info(f"starting worker {process_count}")
with Connection(redis_conn):
if process_count % 4 == 0:
w = Worker(
["atx_dp_high_priority_job_queue", "atx_dp_low_priority_job_queue"],
connection=redis_conn,
name="atx_dp_worker"
+ str(process_count)
+ "_"
+ str(get_ip_and_host())
+ "_"
+ str(uuid.uuid4()),
)
w.work()
if process_count % 4 == 3:
w = Worker(
["atx_dp_medium_priority_job_queue", "atx_dp_low_priority_job_queue"],
connection=redis_conn,
name="atx_dp_worker"
+ str(process_count)
+ "_"
+ str(get_ip_and_host())
+ "_"
+ str(uuid.uuid4()),
)
w.work()
if process_count % 4 == 2:
w = Worker(
["atx_dp_high_priority_job_queue", "atx_dp_medium_priority_job_queue"],
connection=redis_conn,
name="atx_dp_worker"
+ str(process_count)
+ "_"
+ str(get_ip_and_host())
+ "_"
+ str(uuid.uuid4()),
)
w.work()
if process_count % 4 == 1:
w = Worker(
[
"atx_dp_high_priority_job_queue",
"atx_dp_medium_priority_job_queue",
"atx_dp_low_priority_job_queue",
],
connection=redis_conn,
name="atx_dp_worker"
+ str(process_count)
+ "_"
+ str(get_ip_and_host())
+ "_"
+ str(uuid.uuid4()),
)
w.work()
if __name__ == "__main__":
if len(sys.argv) > 1:
worker_id = sys.argv[1]
log.info(f"server core count: {mpu.cpu_count()}; worker id:{worker_id}")
pool = mpu.Pool(processes=mpu.cpu_count())
pool.map(
start_worker, tuple(list(range(int(worker_id), mpu.cpu_count() * 4, 4)))
)
else:
log.info(f"server core count: {mpu.cpu_count()}")
pool = mpu.Pool(processes=mpu.cpu_count())
pool.map(start_worker, tuple(list(range(1, mpu.cpu_count() + 1))))