-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDOSpacesWrapper.py
More file actions
384 lines (319 loc) · 14.1 KB
/
DOSpacesWrapper.py
File metadata and controls
384 lines (319 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
import os
import boto3
from botocore.exceptions import ClientError
from botocore.config import Config
class DOSpacesWrapper:
    """Convenience wrapper around a boto3 S3 client pointed at DigitalOcean Spaces.

    Most methods report failures by printing the ``ClientError`` instead of
    raising it; see each method for what it returns in that case.
    """

    # Endpoint used when the caller does not pass one to ``__init__``.
    DEFAULT_ENDPOINT_URL = "https://nyc3.digitaloceanspaces.com"

    def __init__(self, endpoint_url=None):
        """
        Initializes the DOSpacesWrapper object with the necessary configurations for interacting with DigitalOcean Spaces.

        Credentials, region and bucket name are read from the environment
        variables ``DO_SPACES_KEY_ID``, ``DO_SPACES_SECRET_KEY``,
        ``DO_SPACES_REGION`` and ``DO_SPACES_BUCKET_NAME``.

        Args:
            endpoint_url (str, optional): Spaces endpoint to talk to.
                Defaults to ``DEFAULT_ENDPOINT_URL``.
        """
        self.session = boto3.session.Session()
        self.client = self.session.client(
            's3',
            region_name=os.getenv('DO_SPACES_REGION'),
            endpoint_url=endpoint_url or self.DEFAULT_ENDPOINT_URL,
            aws_access_key_id=os.getenv('DO_SPACES_KEY_ID'),
            aws_secret_access_key=os.getenv('DO_SPACES_SECRET_KEY'),
            config=Config(s3={'addressing_style': 'virtual'}),
        )
        # NOTE: apart from ``bucket_name``, the settings below are only stored
        # on the instance; no method of this class passes them to the client.
        self.signature_version = 's3v4'
        self.use_ssl = True
        self.file_overwrite = True
        self.aws_media_location = 'storage'
        self.object_parameters = {'CacheControl': 'max-age=86400'}
        self.default_acl = 'public-read-write'
        self.bucket_name = os.getenv('DO_SPACES_BUCKET_NAME')
        self.querystring_expire = 3600
        self.querystring_auth = True
        self.max_memory_size = 0  # don't roll over
        self.retries = {
            'max_attempts': 5,
            'mode': 'standard'
        }
        self.encryption_scheme = {'ServerSideEncryption': 'AES256'}

    @staticmethod
    def _folderKey(folderPath):
        """Return the key used for *folderPath*: the path with exactly one trailing '/'."""
        return f"{folderPath.rstrip('/')}/"

    def connectToBucket(self, bucketName=None):
        """Checks that a DO Spaces bucket is reachable.

        Args:
            bucketName: The name of the bucket to connect to (optional).
                If not provided, uses the bucket name from the class initialization.

        Returns:
            The bucket name when the ``head_bucket`` call succeeds, otherwise
            ``None`` (the error is printed, not raised).
        """
        if bucketName is None:
            bucketName = self.bucket_name
        try:
            self.client.head_bucket(Bucket=bucketName)
            print(f"Successfully connected to bucket: {bucketName}")
            return bucketName
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == '404':
                print(f"Bucket '{bucketName}' does not exist.")
            else:
                print(f"An error occurred: {e}")
            return None

    def listBuckets(self):
        """
        Prints the names of all DigitalOcean Spaces buckets returned by ``list_buckets``.
        """
        try:
            response = self.client.list_buckets()
            print("DigitalOcean Spaces buckets:")
            for bucket in response['Buckets']:
                print(f" {bucket['Name']}")
        except ClientError as e:
            print(f"An error occurred: {e}")

    def createBucket(self):
        """
        Creates the bucket named by ``self.bucket_name``.

        A ``BucketAlreadyOwnedByYou`` error is reported as "already exists";
        any other ``ClientError`` is printed.
        """
        try:
            self.client.create_bucket(Bucket=self.bucket_name)
            print(f"Successfully created bucket: {self.bucket_name}")
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'BucketAlreadyOwnedByYou':
                print(f"Bucket '{self.bucket_name}' already exists.")
            else:
                print(f"An error occurred: {e}")

    def createFolder(self, folderPath):
        """
        Creates a folder (an empty object whose key ends in '/') within the bucket.

        Args:
            folderPath (str): The path of the folder to be created. A trailing
                '/' is optional.
        """
        try:
            self.client.put_object(Bucket=self.bucket_name, Key=self._folderKey(folderPath), Body=b'')
            print(f"Successfully created folder: {folderPath}")
        except ClientError as e:
            print(f"An error occurred while creating the folder: {e}")

    def folderExists(self, folderPath):
        """
        Checks if a folder exists within the DigitalOcean Spaces bucket.

        A folder counts as existing when at least one key starts with
        ``<folderPath>/``. That covers both an explicit placeholder object and
        a folder that exists only because objects were uploaded beneath it.

        Args:
            folderPath (str): The path of the folder to check.

        Returns:
            bool: True if the folder exists, False otherwise (including when
            the lookup itself fails; the error is printed).
        """
        try:
            response = self.client.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=self._folderKey(folderPath),
                MaxKeys=1,  # one hit is enough to prove existence
            )
            return bool(response.get('Contents'))
        except ClientError as e:
            print(f"An error occurred while checking folder existence: {e}")
            return False

    def fileExists(self, filePath):
        """
        Checks if a file exists within the DigitalOcean Spaces bucket.

        Args:
            filePath (str): The path of the file to check.

        Returns:
            bool: True if the file exists, False otherwise (non-404 errors are
            printed and also reported as False).
        """
        try:
            self.client.head_object(Bucket=self.bucket_name, Key=filePath)
            return True
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code != '404':
                print(f"An error occurred while checking file existence: {e}")
            return False

    def uploadFile(self, filePath, fileData):
        """
        Uploads a file to the DigitalOcean Spaces bucket with a single ``put_object``.

        Args:
            filePath (str): The path where the file should be uploaded.
            fileData (bytes or file-like object): The file data to be uploaded.
        """
        try:
            self.client.put_object(Bucket=self.bucket_name, Key=filePath, Body=fileData)
            print(f"Successfully uploaded file: {filePath}")
        except ClientError as e:
            print(f"An error occurred while uploading the file: {e}")

    def updateFile(self, filePath, fileData):
        """
        Updates a file in the DigitalOcean Spaces bucket by overwriting its key.

        Args:
            filePath (str): The path of the file to be updated.
            fileData (bytes or file-like object): The new file data.
        """
        try:
            self.client.put_object(Bucket=self.bucket_name, Key=filePath, Body=fileData)
            print(f"Successfully updated file: {filePath}")
        except ClientError as e:
            print(f"An error occurred while updating the file: {e}")

    def deleteFile(self, filePath):
        """
        Deletes a file from the DigitalOcean Spaces bucket.

        Args:
            filePath (str): The path of the file to be deleted.
        """
        try:
            self.client.delete_object(Bucket=self.bucket_name, Key=filePath)
            print(f"Successfully deleted file: {filePath}")
        except ClientError as e:
            print(f"An error occurred while deleting the file: {e}")

    def deleteFolder(self, folderPath):
        """
        Deletes a folder and its contents from the DigitalOcean Spaces bucket.

        Only keys beneath ``<folderPath>/`` are removed, so deleting ``photos``
        does not touch ``photos-backup/...`` or ``photos.txt``.

        Args:
            folderPath (str): The path of the folder to be deleted. An empty
                string matches every key in the bucket.
        """
        # Terminate the prefix with '/' so sibling keys sharing the same
        # leading characters are not matched.
        prefix = self._folderKey(folderPath) if folderPath else ''
        try:
            paginator = self.client.get_paginator('list_objects_v2')
            failures = []
            for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix):
                contents = page.get('Contents')
                if not contents:
                    continue
                delete_keys = [{'Key': obj['Key']} for obj in contents]
                response = self.client.delete_objects(Bucket=self.bucket_name, Delete={'Objects': delete_keys})
                # delete_objects reports per-key failures in the response body
                # rather than by raising.
                failures.extend(response.get('Errors', []))
            if failures:
                print(f"Failed to delete {len(failures)} object(s) from folder: {folderPath}")
            else:
                print(f"Successfully deleted folder: {folderPath}")
        except ClientError as e:
            print(f"An error occurred while deleting the folder: {e}")

    def listFolders(self, prefix=''):
        """
        Lists the folders directly beneath a prefix in the DigitalOcean Spaces bucket.

        Args:
            prefix (str, optional): The prefix to list folders under. Defaults to ''
                (the bucket root).

        Returns:
            list: Folder paths with the leading ``prefix`` removed; each keeps
            its trailing '/'. An empty list is returned on error.
        """
        try:
            paginator = self.client.get_paginator('list_objects_v2')
            folders = []
            for page in paginator.paginate(Bucket=self.bucket_name, Prefix=prefix, Delimiter='/'):
                for entry in page.get('CommonPrefixes', []):
                    folder = entry['Prefix']
                    if prefix and folder.startswith(prefix):
                        folder = folder[len(prefix):]
                    folders.append(folder)
            return folders
        except ClientError as e:
            print(f"An error occurred while listing folders: {e}")
            return []

    def listFolderContents(self, folderPath):
        """
        Lists the contents (files and folders) within a specific folder in the DigitalOcean Spaces bucket.

        Args:
            folderPath (str): The path of the folder to list contents for.

        Returns:
            list: Full keys of the files and sub-folder prefixes directly
            inside the folder; an empty list on a listing error.

        Raises:
            FileNotFoundError: If ``folderExists`` reports the folder as missing.
        """
        print("Listing folder contents of ", folderPath)
        # check if the folder exists first
        if not self.folderExists(folderPath):
            raise FileNotFoundError(f"Folder {folderPath} does not exist")
        folder_key = self._folderKey(folderPath)
        try:
            paginator = self.client.get_paginator('list_objects_v2')
            contents = []
            for page in paginator.paginate(Bucket=self.bucket_name, Prefix=folder_key, Delimiter='/'):
                for obj in page.get('Contents', []):
                    if obj['Key'] != folder_key:  # Exclude the folder path itself
                        contents.append(obj['Key'])
                for common in page.get('CommonPrefixes', []):
                    if common['Prefix'] != folder_key:  # Exclude the folder path itself
                        contents.append(common['Prefix'])
            return contents
        except ClientError as e:
            print(f"An error occurred while listing folder contents: {e}")
            return []

    def _iterObjectChunks(self, filePath, chunkSize):
        """Yield the object's body in pieces of at most *chunkSize* bytes, closing it afterwards."""
        response = self.client.get_object(Bucket=self.bucket_name, Key=filePath)
        body = response['Body']
        try:
            while True:
                chunk = body.read(chunkSize)
                if not chunk:
                    break
                yield chunk
        finally:
            # Runs on exhaustion, on error, and when the consumer stops early.
            body.close()

    def streamFileContent(self, filePath, chunkSize=8192):
        """
        Streams the content of a file from the DigitalOcean Spaces bucket.

        This is a generator: the request is only sent once iteration starts.

        Args:
            filePath (str): The path of the file to stream.
            chunkSize (int, optional): The size of each chunk in bytes. Defaults to 8192.

        Yields:
            bytes: The next chunk of the file content.
        """
        try:
            yield from self._iterObjectChunks(filePath, chunkSize)
        except ClientError as e:
            print(f"An error occurred while streaming the file content: {e}")

    def readFile(self, filePath, chunkSize=8388608):
        """
        Reads data from a file in the DigitalOcean Spaces bucket.

        This is a generator: the request is only sent once iteration starts.

        Args:
            filePath (str): The path of the file to read from.
            chunkSize (int, optional): The size of each chunk in bytes. Defaults to 8388608 (8MB).

        Yields:
            bytes: The next chunk of the file content.
        """
        try:
            yield from self._iterObjectChunks(filePath, chunkSize)
        except ClientError as e:
            print(f"An error occurred while reading from the file: {e}")

    def multipartUpload(self, filePath):
        """
        Initiates a multipart upload and returns an upload ID.

        Args:
            filePath (str): The path of the file for which the multipart upload will be initiated.

        Returns:
            str: The upload ID for the initiated multipart upload, or None if
            the request failed (the error is printed).
        """
        try:
            response = self.client.create_multipart_upload(Bucket=self.bucket_name, Key=filePath)
            return response['UploadId']
        except ClientError as e:
            print(f"An error occurred while initiating the multipart upload: {e}")
            return None

    def _abortMultipartUpload(self, filePath, uploadId):
        """Best-effort abort of a multipart upload so already-sent parts are discarded."""
        try:
            self.client.abort_multipart_upload(Bucket=self.bucket_name, Key=filePath, UploadId=uploadId)
        except ClientError as e:
            print(f"An error occurred while aborting the multipart upload: {e}")

    def _uploadStreamInParts(self, filePath, fileData, chunkSize):
        """Upload a readable stream as a multipart upload, aborting it if any step fails."""
        chunk = fileData.read(chunkSize)
        if not chunk:
            # A multipart upload cannot be completed with zero parts, so an
            # empty stream is stored as an ordinary empty object.
            self.client.put_object(Bucket=self.bucket_name, Key=filePath, Body=b'')
            return
        upload_id = self.client.create_multipart_upload(Bucket=self.bucket_name, Key=filePath)['UploadId']
        try:
            parts = []
            part_number = 1
            while chunk:
                part = self.client.upload_part(
                    Body=chunk,
                    Bucket=self.bucket_name,
                    Key=filePath,
                    PartNumber=part_number,
                    UploadId=upload_id,
                )
                parts.append({'PartNumber': part_number, 'ETag': part['ETag']})
                part_number += 1
                chunk = fileData.read(chunkSize)
            self.client.complete_multipart_upload(
                Bucket=self.bucket_name,
                Key=filePath,
                MultipartUpload={'Parts': parts},
                UploadId=upload_id,
            )
        except Exception:
            # Do not leave an unfinished upload (and its parts) behind.
            self._abortMultipartUpload(filePath, upload_id)
            raise

    def uploadFileChunked(self, filePath, fileData, chunkSize=8388608):  # 8MB chunks
        """
        Uploads a file to the DigitalOcean Spaces bucket in chunks.

        Objects with a ``read`` method are sent as a multipart upload, one part
        per ``chunkSize`` read; anything else is sent with a single
        ``put_object``. A failed multipart upload is aborted before the error
        is reported.

        Args:
            filePath (str): The path where the file should be uploaded.
            fileData (bytes or file-like object): The file data to be uploaded.
            chunkSize (int, optional): The size of each chunk in bytes. Defaults to 8388608 (8MB).
        """
        try:
            if hasattr(fileData, 'read'):  # If fileData is a file-like object
                self._uploadStreamInParts(filePath, fileData, chunkSize)
            else:  # If fileData is bytes
                self.client.put_object(Bucket=self.bucket_name, Key=filePath, Body=fileData)
            print(f"Successfully uploaded file: {filePath}")
        except ClientError as e:
            print(f"An error occurred while uploading the file: {e}")
def getActualFileNames(filePaths):
    """Extracts filenames from a list of file paths.

    Args:
        filePaths (iterable of str): '/'-separated file paths.

    Returns:
        list: The text after the last '/' of each path, in input order. A
        path without '/' is returned unchanged; a path ending in '/' yields
        an empty string.
    """
    # Keep only the last '/'-separated component (the filename).
    return [path.rsplit("/", 1)[-1] for path in filePaths]