-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer2.py
More file actions
58 lines (45 loc) · 1.83 KB
/
consumer2.py
File metadata and controls
58 lines (45 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from kafka import KafkaConsumer
from kafka import TopicPartition
import json
import sys
import base64
import os
# Kafka consumer script: reads JSON messages carrying a base64-encoded image
# from manually assigned partitions, writes each image into ./data and appends
# a metadata record per message to ./data/metadata.json.

# Kafka broker to connect to, given as "host:port".
bootstrap_server = '{provide kafka broker address and port number}'

# Partition numbers this consumer instance reads from.
# Change per instance, e.g. [0, 1], [1, 2], [2, 3].
partition_numbers = [0, 1]

# Consumer group name. Defined once so the consumer configuration and the
# recorded metadata can never disagree. Change per instance.
group_id = "Consumer_Group_0"

# Initialize the consumer; partitions are assigned manually below rather than
# via topic subscription.
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_server,
    # Message values are expected to be UTF-8 encoded JSON documents.
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    group_id=group_id,
)

# Manually assign the partitions to consume from based on partition_numbers.
topic = 'hello_world.'  # Replace with your Kafka topic name
partitions_to_assign = [
    TopicPartition(topic, partition_number)
    for partition_number in partition_numbers
]
consumer.assign(partitions_to_assign)

# Output directory; created up front so the first write cannot fail with
# FileNotFoundError on a fresh checkout.
data_dir = os.path.join(os.getcwd(), "data")
os.makedirs(data_dir, exist_ok=True)

# Loop-invariant path of the metadata log.
metadata_file_path = os.path.join(data_dir, 'metadata.json')

metadata_list = []  # List to store metadata entries (currently not populated)

try:
    # Read messages from the assigned partitions and persist each one.
    for msg in consumer:
        # The name comes from the message payload, so keep only its final path
        # component: a value such as "../../x" or "/etc/x" must not be able to
        # write outside data_dir.
        file_name = os.path.basename(str(msg.value['name']))
        if file_name in ('', '.', '..'):
            print('Skipping message with unusable file name:', msg.value['name'])
            continue

        img_data = base64.b64decode(msg.value['img'])

        # Metadata describing where this file came from.
        metadata = {
            'filename': file_name,
            'topic': topic,
            'consumer_group': group_id,
            'partition_number': msg.partition,
        }

        with open(os.path.join(data_dir, file_name), mode='wb') as file:
            file.write(img_data)

        print('Metadata:', metadata)

        # Append this record to the metadata log. The trailing newline keeps
        # consecutive records from running together as "}{".
        with open(metadata_file_path, 'a') as metadata_file:
            json.dump(metadata, metadata_file, indent=4)
            metadata_file.write('\n')
finally:
    # Release broker connections even when the loop ends through an error or
    # Ctrl+C.
    consumer.close()

# Terminate the script. Reached only if iteration over the consumer stops.
sys.exit()