-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmaster_core.py
More file actions
102 lines (92 loc) · 4.41 KB
/
master_core.py
File metadata and controls
102 lines (92 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# receive request from the client (filename) -> respond with a list of ports: download filename
# upload request from client: upload
# dataKeeper sends upload success upload message: (upload success: filename dataKeeper_id user_id file_path size)
# dataKeeper sends download success message: (download success: ip port)
import parse
import zmq
from functions import *
class Master:
def __init__(self, port, videos, keepers, lv, lk):
self.videos = videos
self.keepers = keepers
self.lv = lv
self.lk = lk
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REP)
self.socket.bind("tcp://{}:{}".format("*", port))
def run(self):
while True:
request = self.socket.recv_string()
# download filename
if request.startswith("download"):
filename = str(parse.parse("download {}", request)[0])
keeper_data = self.videos.get(filename)
if keeper_data is None:
self.socket.send_json(None)
continue
keeper_ips = keeper_data[0]
file_size = self.videos[filename][3]
ips = []
ports = []
for ip in keeper_ips:
if not self.keepers[ip][-1]:
continue
for port, busy in self.keepers[ip][0].items():
if not busy:
ips.append(ip)
ports.append(port)
set_busy(self.keepers, self.lk, ip, port, True)
break
self.socket.send_json((ips, ports, file_size))
elif request.startswith("upload"):
keepers_used_storage = {}
for ip in self.keepers.keys():
if not self.keepers[ip][-1]:
continue
keepers_used_storage[ip] = 0
for file, data in self.videos.items():
for ip in data[0]:
if not self.keepers[ip][-1]:
continue
keepers_used_storage[ip] += data[3]
keepers_ips = [ip for ip, used_storage in
sorted(keepers_used_storage.items(), key=lambda item: item[1])]
chosen_ip = None
chosen_port = None
for ip in keepers_ips:
if not self.keepers[ip][-1]:
continue
for port, busy in self.keepers[ip][0].items():
if not busy:
chosen_ip = ip
chosen_port = port
break
if chosen_ip is not None:
break
if chosen_ip is not None:
self.socket.send_json((chosen_ip, chosen_port))
set_busy(self.keepers, self.lk, chosen_ip, chosen_port, True)
else:
self.socket.send_json(None)
elif request.startswith("successfully_uploaded"):
parsed = parse.parse("successfully_uploaded: {} {} {} {} {} {}", request)
filename = str(parsed[0])
data_keeper_ip = parsed[1]
data_keeper_port = int(parsed[2])
user_id = int(parsed[3])
file_path = str(parsed[4])
size = int(parsed[5])
add_video(self.videos, self.lv, filename, {data_keeper_ip : True}, user_id, file_path, size)
set_busy(self.keepers, self.lk, data_keeper_ip, data_keeper_port, False)
self.socket.send_string("OK")
print('User: {} uploaded "{}" successfully to {}:{}'.format(user_id, filename, data_keeper_ip,
data_keeper_port))
elif request.startswith("successfully_downloaded"):
parsed = parse.parse("successfully_downloaded: {} {}", request)
data_keeper_ip = parsed[0]
data_keeper_port = int(parsed[1])
set_busy(self.keepers, self.lk, data_keeper_ip, data_keeper_port, False)
self.socket.send_string("OK")
def init_master_process(port, videos, keepers, lv, lk):
master = Master(port, videos, keepers, lv, lk)
master.run()