-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscript.py
More file actions
292 lines (249 loc) · 9.38 KB
/
script.py
File metadata and controls
292 lines (249 loc) · 9.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#Python3
from subprocess import *
import os
import sys
import shutil
import re
import concurrent.futures
FORENSIC_READ_ONLY_FILE = "./udev/01-forensic-readonly.rules"
TOOLS_DIRECTORY = "./tools"
RULES_DST_PATH = "/etc/udev/rules.d"
TOOLS_DST_PATH = "/usr/sbin"
"""
Module to perform actions on a given service using systemctl
Usage: service_ops(<Action(Eg: "start", "stop")>, <Service Name>)
"""
def service_ops(action, service_name):
base = ["systemctl"]
implementCommand = base + [action, service_name]
# implementCommand = implementCommand.append(action)
# implementCommand = implementCommand.append(service_name)
try:
check_output(implementCommand)
except CalledProcessError as cpe:
if (cpe.returncode == 5):
print(f"ERROR: Service name {service_name} does not exist!")
elif (cpe.returncode == 4):
print(f"ERROR: Execute script with sudo privileges!")
else:
statusCheckCommand = base + ["status"]
statusCheckCommand = statusCheckCommand + [service_name]
proc = Popen(statusCheckCommand, stdout=PIPE, stderr=PIPE)
serviceStatus = proc.communicate()[0].decode("utf-8")
# print(serviceStatus)
if action == "stop":
stopCheck(service_name, serviceStatus)
if action == "start":
startCheck(service_name, serviceStatus)
def startCheck(service_name, serviceStatus):
if "Active: active" in serviceStatus:
print("Service %s started." % (service_name))
else:
print("Unable to start %s service." % service_name)
def stopCheck(service_name, serviceStatus):
if "Active: inactive" in serviceStatus:
print("Service %s stopped." % (service_name))
else:
print("Unable to stop %s service." % service_name)
def implementPatch():
try:
shutil.copy(FORENSIC_READ_ONLY_FILE, RULES_DST_PATH)
except Exception as e:
print(e)
sys.exit()
try:
files = os.listdir(TOOLS_DIRECTORY)
if not os.path.exists(TOOLS_DST_PATH):
os.mkdir(TOOLS_DST_PATH)
for f in files:
shutil.copy2(os.path.join(TOOLS_DIRECTORY,f), TOOLS_DST_PATH)
except Exception as e:
print (e)
sys.exit()
def deviceDetection():
proc = Popen("blkid", stdout=PIPE, shell=True)
beforeInsertion = proc.communicate()[0].decode('utf-8')
count = 1
devices = []
# devices = {}
input("Insert your device now.\nPress any key after insertion.")
proc = Popen("blkid", stdout=PIPE, shell=True)
afterInsertion = proc.communicate()[0].decode('utf-8')
beforeInsertion = beforeInsertion.splitlines()
afterInsertion = afterInsertion.splitlines()
for line in afterInsertion:
if line not in beforeInsertion:
if "LABEL" in line:
path = re.findall('(.*?)\:', line)[0]
label = re.findall('LABEL=\"(.*?)\"', line)[0]
devices.append([path, label])
else:
path = re.findall('(.*?)\:', line)[0]
# label = "NO LABEL"
devices.append([path, "No Label"])
if path:
# devices.append(label)
print("{}: {} -> {}".format(count, path, label))
count += 1
if len(devices) == 0:
print ("""
No new device detected!
Please follow the steps mentioned below:
1. Remove the evidence device
2. Start the script again
3. Reinsert the evidence device only afteer prompted""")
else:
confirmation = input("Enter the index number of your device.")
confirmation = int(confirmation)
if confirmation <= count:
return devices[confirmation-1]
else:
return False
# for line in beforeInsertion:
# if "LABEL" in line:
# label = re.findall('LABEL=\"(.*?)\"', line)
# else:
# label = re.findall('(.*?)\:', line)
# if label:
# devices.append(label[0])
# print("{}: {}".format(count, label[0]))
# count += 1
# confirmation = input("Enter the number of your device:\nIf your device is not listed then enter any other key and reinsert your device after starting the script.")
# confirmation = int(confirmation)
# if confirmation <= count:
# return devices[confirmation-1]
# else:
# return False
def deviceBlock(path):
print("Enabling read only for block: {} ".format(path), end="")
try:
run(["blockdev", "--setro", path])
print("--> Enabled!")
return True
except Exception as e:
print ("\n", e)
sys.exit()
def deviceMount(path):
print("Mounting the device at {} to /mnt/evidence ".format(path), end="")
try:
if not os.path.exists("/mnt/evidence"):
os.makedirs("/mnt/evidence")
run(["mount", "-o", "ro", path, "/mnt/evidence"])
print("--> Mounted!")
return True
except Exception as e:
print ("\n", e)
sys.exit()
def deviceImageCreation(path, name):
print("Creating image for {}".format(path))
print("Image will be stored at: ./{}.dd".format(name))
try:
run(["dd", "if={}".format(path), "of=./{}.dd".format(name), "conv=noerror,sync", "status=progress"])
print("Image created and stored at: ./{}.dd".format(name))
return True
except Exception as e:
print ("\n", e)
sys.exit()
def hashCalc(path):
print("Generating Hash for {}".format(path))
proc = Popen(["md5sum", path], stdout=PIPE, stderr=PIPE)
serviceStatus = proc.communicate()[0].decode("utf-8")
serviceStatus = serviceStatus.split()
return serviceStatus[0]
def hashMatch(path, name):
paths = [path, "./{}.dd".format(name)]
hashes = []
print ("Hashes will be tested for\n1. {}\n2. {}".format(path, name))
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for p in paths:
futures.append(executor.submit(hashCalc, p))
for future in concurrent.futures.as_completed(futures):
hashes.append(future.result())
if hashes[0] == hashes[1]:
print("Hash is: {}".format(hashes[0]))
return True
else:
print("Hash Mismatch!\n{} -> {}\n{}.dd -> {}".format(path, hashes[0], name, hashes[1]))
return False
def deviceUnmount(path):
print("Unmounting device {} from /mnt/evidence.".format(path), end="")
try:
run(["umount", "/mnt/evidence"])
print("--> Unmounted")
return True
except Exception as e:
print("\n", e)
sys.exit()
def deviceUnplug(path):
input("Remove the device and press any key to continue.")
proc = Popen("blkid", stdout=PIPE, shell=True)
devices = proc.communicate()[0].decode('utf-8')
if path not in devices:
return True
else:
return False
if __name__ == "__main__":
if not os.geteuid() == 0:
print("Run this script with sudo.")
sys.exit()
print("### Write Blocker Script ###")
print("P.S. Run this script only on Linux Machines")
print("Do not insert the evidence device until instructed to do so!")
print("If already inserted then remove it!")
input("Press enter key when you are ready!")
print("\n", end="")
service_ops("stop", "udisks2.service")
print("\n", end="")
deviceDetected = deviceDetection()
if deviceDetected:
print ("You have selected {} as your target device.\n".format(deviceDetected[1]))
else:
print("Exiting!\n")
sys.exit()
blockBlockStatus = deviceBlock(deviceDetected[0])
if not blockBlockStatus:
print("Issue with Blocking {}".format(deviceDetected[0]))
sys.exit()
print("\n", end="")
deviceMountStatus = deviceMount(deviceDetected[0])
if not deviceMountStatus:
print("Issue with Mounting {}".format(deviceDetected[0]))
sys.exit()
print("\n", end="")
createImage = input("Do you want to create an image? y/n: ")
if createImage == "y" or createImage == "yes" or createImage == "Y":
deviceImageCreationStatus = deviceImageCreation(deviceDetected[0], deviceDetected[1])
if not deviceImageCreationStatus:
print("Issue with Image Creation for {}".format(deviceDetected[0]))
sys.exit()
print("\n", end="")
hashMatchResult = hashMatch(deviceDetected[0], deviceDetected[1])
if hashMatchResult:
print("Hashes matched!")
else:
print("Hashes do not match!")
print("\n", end="")
else:
print("Image creation and hash calculation skipped!")
input("Press any key when you want to unmount the device and enable automounting.")
print("\n", end="")
deviceUnmountStatus = deviceUnmount(deviceDetected[0])
if not deviceUnmountStatus:
print("Device Unmount Failed!")
sys.exit()
print("\n", end="")
deviceUnplugStatus = deviceUnplug(deviceDetected[0])
if deviceUnplugStatus:
print("Device Removed Successfully!")
else:
print("Issue with device removal!")
sys.exit()
print("\n", end="")
service_ops("start", "udisks2.service")
'''
REFERENCES
1. https://www.reddit.com/r/learnpython/comments/4ijwob/python_code_to_detect_connected_devices/
2. https://github.com/msuhanov/Linux-write-blocker
3. https://linuxhint.com/list-usb-devices-linux/
'''