-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
265 lines (188 loc) · 8.7 KB
/
main.py
File metadata and controls
265 lines (188 loc) · 8.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# Standard library imports
import os
import time
import json
from datetime import datetime
from threading import Thread
from subprocess import PIPE
# External library imports for Raspberry Pi hardware control
import board
import digitalio
import RPi.GPIO as GPIO
import adafruit_character_lcd.character_lcd as characterlcd
# Imports for camera and image processing
from picamera2 import Picamera2
from picamera2.encoders import JpegEncoder
from PIL import Image
# Imports for sound and voice synthesis
from gtts import gTTS
import pygame
# Imports for machine learning and model processing
from transformers import BlipProcessor, BlipForConditionalGeneration
# Module for handling warnings
import warnings
# Additional helper functions from time module already imported
from time import sleep
# Module-level configuration and one-time hardware initialisation.
# Everything below runs at import time and talks to the GPIO pins, the camera
# and the character LCD.
#
# Silence warnings for the whole process. NOTE: with no category or module
# filter, "ignore" suppresses every warning, not only the less critical ones.
warnings.filterwarnings("ignore")
# Constants for the push button (pin number uses BCM numbering, see GPIO.setmode below)
BUTTON_PIN = 16
SHORT_PRESS_TIME = 0.5 # Presses shorter than this many seconds (500 ms) count as a "short press"
DEBOUNCE_TIME = 0.1 # Pause in seconds (100 ms) that main() sleeps after each button read to ride out contact bounce
# Directory containing this script; sound and data paths are built from it
base_dir = os.path.dirname(os.path.abspath(__file__))
# Sound effects, looked up by short name in play_sound()
sounds = dict(
start = os.path.join(base_dir, "sounds", "pi-start.mp3"),
camera = os.path.join(base_dir, "sounds", "camera-shutter.mp3"),
)
# GPIO setup for button input
GPIO.setmode(GPIO.BCM) # Use Broadcom (BCM) pin numbering
GPIO.setup(BUTTON_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP) # Input with pull-up: main() treats HIGH -> LOW as "pressed"
# Create and configure the camera
picam2 = Picamera2()
picam2.configure(picam2.create_preview_configuration(main={"size": (1920, 1080)})) # Main stream at 1920x1080
# LCD screen setup parameters
lcd_columns = 16 # Number of columns in the LCD display
lcd_rows = 2 # Number of rows in the LCD display
# Pins wired to the LCD (register select, enable and the four data lines)
lcd_rs = digitalio.DigitalInOut(board.D25)
lcd_en = digitalio.DigitalInOut(board.D24)
lcd_d4 = digitalio.DigitalInOut(board.D23)
lcd_d5 = digitalio.DigitalInOut(board.D17)
lcd_d6 = digitalio.DigitalInOut(board.D18)
lcd_d7 = digitalio.DigitalInOut(board.D22)
# Initialize the LCD display
lcd = characterlcd.Character_LCD_Mono(
lcd_rs, lcd_en, lcd_d4, lcd_d5, lcd_d6, lcd_d7, lcd_columns, lcd_rows
)
# Button-tracking state shared across calls to main().
# prev_button_state starts LOW, so if the first read is HIGH main() sees a
# "release" edge; with press_time_start still 0 the computed duration is far
# longer than SHORT_PRESS_TIME, so nothing is triggered.
prev_button_state = GPIO.LOW # Button level seen on the previous poll
button_state = None # NOTE: main() assigns a local of the same name (it is not declared global there), so this value is never updated
press_time_start = 0 # Timestamp recorded when a press (HIGH -> LOW) is detected
press_time_end = 0 # Timestamp recorded when a release (LOW -> HIGH) is detected
def capture_image(filename):
    """Capture one image from the camera and save it to a file.

    Args:
        filename: Destination path of the image. A relative path is resolved
            against ``base_dir`` (the directory of this script), which is the
            same place ``analyse_image`` reads from, so the result does not
            depend on the process's current working directory. An absolute
            path is used unchanged.

    Raises:
        OSError: If the destination directory cannot be created.
        Exception: Any error raised by the camera is propagated after the
            camera has been stopped.
    """
    # os.path.join drops base_dir when filename is already absolute.
    target_path = os.path.join(base_dir, filename)
    # The capture fails if the destination folder is missing, so create it.
    os.makedirs(os.path.dirname(target_path), exist_ok=True)

    picam2.start()
    try:
        # Allow some time for the camera to adjust its settings
        time.sleep(1)
        picam2.capture_file(target_path)
    finally:
        # Always stop the camera, even when the capture failed, so the next
        # call can start it again.
        picam2.stop()
# Loaded (processor, model) pairs keyed by checkpoint name. Loading the
# weights is by far the slowest step, so it is done once per process instead
# of on every button press. Trade-off: the model stays resident in memory.
_caption_models = {}


def _load_caption_model(checkpoint="Salesforce/blip-image-captioning-base"):
    """Return the (processor, model) pair for ``checkpoint``, loading it on first use."""
    if checkpoint not in _caption_models:
        _caption_models[checkpoint] = (
            BlipProcessor.from_pretrained(checkpoint),
            BlipForConditionalGeneration.from_pretrained(checkpoint),
        )
    return _caption_models[checkpoint]


def analyse_image(filename):
    """Generate a caption for an image file with the pre-trained BLIP model.

    Args:
        filename: Path of the image, resolved against ``base_dir``.

    Returns:
        The generated caption, or the string ``"Error processing the image."``
        when the model cannot be loaded or the image cannot be processed (the
        underlying error is printed).
    """
    try:
        llm_processor, llm_model = _load_caption_model()
        # convert() returns a new in-memory image, so the file handle owned by
        # the opened image can be released as soon as the copy exists.
        with Image.open(os.path.join(base_dir, filename)) as source_image:
            raw_image = source_image.convert('RGB')
        inputs = llm_processor(raw_image, return_tensors="pt")
        outputs = llm_model.generate(**inputs)
        return llm_processor.decode(outputs[0], skip_special_tokens=True)
    except Exception as e:
        print(f"An error occurred: {e}")
        return "Error processing the image."
def play_sound(musicName):
    """Play the sound registered under ``musicName`` and block until it ends.

    Args:
        musicName: Key into the module-level ``sounds`` dictionary.
    """
    mixer = pygame.mixer
    mixer.init()

    track_path = sounds.get(musicName)
    player = mixer.music
    player.load(track_path)
    player.play()

    # Poll the mixer until playback has finished
    while player.get_busy():
        pygame.time.Clock().tick(10)
def convert_text_to_speech(speech_text):
    """Synthesise ``speech_text`` as English speech and play it back.

    The audio is written to a temporary MP3 file that is removed afterwards,
    including when synthesis or playback fails. The call blocks until
    playback has finished.

    Args:
        speech_text: Text to speak. Empty or whitespace-only text (or
            ``None``) is skipped silently, since there is nothing to say.
    """
    # Local import keeps this block self-contained (tempfile is stdlib).
    import tempfile

    if not speech_text or not speech_text.strip():
        return

    tts = gTTS(text=speech_text, lang='en')

    # A unique temporary file avoids depending on a writable working
    # directory and on a fixed "speech.mp3" name.
    handle, speech_path = tempfile.mkstemp(suffix=".mp3")
    os.close(handle)
    try:
        tts.save(speech_path)
        # Play the audio file
        pygame.mixer.init()
        pygame.mixer.music.load(speech_path)
        pygame.mixer.music.play()
        clock = pygame.time.Clock()
        while pygame.mixer.music.get_busy():
            clock.tick(10)  # Wait for the audio to finish playing
    finally:
        # Delete the audio file even if synthesis or playback raised
        os.remove(speech_path)
def display_message(txt, sleep_time=0):
    """Show ``txt`` on the LCD, scrolling it to the left when it is too wide.

    Args:
        txt: Message to display; surrounding whitespace is stripped and a
            single trailing space is appended.
        sleep_time: When greater than zero, keep the last frame on screen for
            this many seconds and then clear the display.
    """
    lcd.clear()
    padded = txt.strip() + ' '

    # One frame per start offset. Text that already fits yields exactly one
    # frame; longer text advances one character at a time until its end
    # reaches the right edge of the display.
    last_offset = max(0, len(padded) - lcd_columns)
    for offset in range(last_offset + 1):
        lcd.clear()
        sleep(0.1)
        lcd.message = padded[offset:offset + lcd_columns]
        sleep(0.2)  # Controls the scroll speed

    if sleep_time > 0:
        sleep(sleep_time)
        lcd.clear()
def save_user_interaction(current_time, caption, filename, history_path=None):
    """Append one interaction record to the JSON history file.

    The file holds a JSON list of objects with the keys ``createdAt``
    (ISO-8601 timestamp), ``caption`` and ``filename``.

    Args:
        current_time: ``datetime`` of the interaction.
        caption: Caption generated for the photo.
        filename: Path of the photo as it should be recorded.
        history_path: Location of the history file. Defaults to
            ``<base_dir>/data/history.json``.

    Raises:
        json.JSONDecodeError: If the existing history file is not valid JSON
            (it is left untouched rather than overwritten).
        OSError: If the file or its directory cannot be written.
    """
    if history_path is None:
        history_path = os.path.join(base_dir, "data", "history.json")

    # A missing file simply means there is no history yet.
    try:
        with open(history_path, "r", encoding="utf-8") as file:
            data = json.load(file)
    except FileNotFoundError:
        data = []

    # Append the new data to the existing list
    data.append(dict(
        createdAt=current_time.isoformat(),
        caption=caption,
        filename=filename,
    ))

    history_dir = os.path.dirname(history_path)
    if history_dir:
        os.makedirs(history_dir, exist_ok=True)

    # Write to a sibling file and swap it in, so an interrupted write cannot
    # leave a truncated history file behind.
    temp_path = history_path + ".tmp"
    with open(temp_path, "w", encoding="utf-8") as file:
        json.dump(data, file)
    os.replace(temp_path, history_path)
def process_two_functions_with_threading(func1, args1, func2, args2):
    """Run ``func1(*args1)`` and ``func2(*args2)`` in two threads and wait for both.

    Args:
        func1: First callable.
        args1: Tuple of positional arguments for ``func1``.
        func2: Second callable.
        args2: Tuple of positional arguments for ``func2``.
    """
    jobs = ((func1, args1), (func2, args2))
    workers = [Thread(target=job, args=job_args) for job, job_args in jobs]

    # Start both before joining either, so the two calls overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
def _run_capture_cycle():
    """Take a photo, caption it, announce the caption and log the interaction."""
    # Record the current time when the button press was registered
    current_time = datetime.now()
    # Construct a filename for saving the photo with a timestamp
    filename = os.path.join("data", f"photo_{current_time.strftime('%Y%m%d_%H%M%S')}.png")
    # Capture an image using the constructed filename
    capture_image(filename=filename)
    # Simultaneously display a message on the LCD and play a sound
    process_two_functions_with_threading(display_message, ("Smile for the camera!",), play_sound, ("camera",))
    # Display a processing message and speak it concurrently
    process_two_functions_with_threading(convert_text_to_speech, ("Processing image...",), display_message, ("Processing image...",))
    # Analyze the captured image and retrieve a caption
    caption = analyse_image(filename=filename)
    # Log this interaction for future reference or analysis
    save_user_interaction(current_time, caption, filename)
    # Display the image caption and speak it concurrently
    process_two_functions_with_threading(convert_text_to_speech, (caption,), display_message, (caption,))
    # Clear any previous messages from the LCD
    lcd.clear()
    # Indicate readiness for the next interaction
    process_two_functions_with_threading(display_message, ("Ready...",), play_sound, ("start",))


def main():
    """Poll the button once and run a capture cycle after a short press.

    Intended to be called in a loop. Each call reads the button, sleeps for
    ``DEBOUNCE_TIME`` and compares the reading with the previous one: a
    HIGH -> LOW edge marks the start of a press, a LOW -> HIGH edge marks its
    end. A press shorter than ``SHORT_PRESS_TIME`` triggers the capture cycle;
    longer presses are ignored.
    """
    global prev_button_state, press_time_start, press_time_end

    # Read the state of the switch/button
    button_state = GPIO.input(BUTTON_PIN)
    time.sleep(DEBOUNCE_TIME)  # Sleep to debounce the button

    if prev_button_state == GPIO.HIGH and button_state == GPIO.LOW:  # Button is pressed
        # Use the monotonic clock: the wall clock can jump (e.g. when the
        # system time is synchronised), which would distort the duration.
        press_time_start = time.monotonic()
    elif prev_button_state == GPIO.LOW and button_state == GPIO.HIGH:  # Button is released
        press_time_end = time.monotonic()
        press_duration = press_time_end - press_time_start
        # Only a short press starts a capture
        if press_duration < SHORT_PRESS_TIME:
            _run_capture_cycle()

    prev_button_state = button_state
if __name__ == "__main__":
    # Script entry point: announce readiness, then poll the button until the
    # user interrupts the program with Ctrl+C.
    try:
        lcd.clear()
        process_two_functions_with_threading(display_message, ("Ready...",), play_sound, ("start",))
        while True:
            main()
    except KeyboardInterrupt:
        # Show the farewell message while the pins are still configured.
        display_message("Exiting...", 5)
    finally:
        # Release the GPIO pins last, and on every exit path (including
        # unexpected errors), so they are not left configured.
        GPIO.cleanup()