-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple_examples.py
More file actions
executable file
·120 lines (104 loc) · 3.66 KB
/
Copy pathsimple_examples.py
File metadata and controls
executable file
·120 lines (104 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import logging
import time
import os
import pandas as pd
from ut61eplus import UT61EPLUS
# --- CONFIGURATION ---
# Configure logging at INFO level with a timestamped
# "time - level - message" line format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Module-level logger used by every example below for status and error reports.
log = logging.getLogger(__name__)
def example_single_measurement():
    """Example: open the meter, take one reading, print it, and close the meter.

    A ``UT61EPLUS`` instance is created, ``take_measurement()`` is called
    once, and the result's ``to_dict()`` output is printed. A falsy result
    prints a failure message instead. Any exception is logged rather than
    propagated, and the meter is closed in all cases if it was created.
    """
    print("\n--- Example 1: Reading a single measurement ---")
    meter = None
    try:
        meter = UT61EPLUS()
        reading = meter.take_measurement()
        if not reading:
            # Nothing usable came back; report it and fall through to cleanup.
            print("Failed to get measurement.")
            return
        as_dict = reading.to_dict()
        print("Data received:")
        print(as_dict)
    except Exception as e:
        log.error(f"An error occurred: {e}")
    finally:
        # Runs on success, failure, and the early return above.
        if meter:
            meter.close()
def example_multiple_measurements(count=5):
    """Example: take ``count`` readings in a row, printing each one.

    Args:
        count: Number of measurements to request from the meter.

    Each iteration prints either the reading's ``to_dict()`` output or a
    failure line, then sleeps for half a second. Any exception is logged
    rather than propagated, and the meter is closed if it was created.
    """
    print(f"\n--- Example 2: Reading {count} measurements ---")
    meter = None
    try:
        meter = UT61EPLUS()
        for i in range(count):
            measurement = meter.take_measurement()
            print(
                f"Measurement {i+1}: {measurement.to_dict()}"
                if measurement
                else f"Measurement {i+1}: Failed to get."
            )
            # Short pause so consecutive requests are spaced out.
            time.sleep(0.5)
    except Exception as e:
        log.error(f"An error occurred: {e}")
    finally:
        if meter:
            meter.close()
def example_log_to_csv(duration_seconds=10, *, csv_filepath="dmm_log.csv", buffer_size=50):
    """Example: continuously log measurements to a CSV file for a fixed duration.

    Readings are collected into an in-memory buffer and appended to the CSV
    file whenever the buffer reaches ``buffer_size`` rows, and once more on
    shutdown so no collected rows are lost.

    Args:
        duration_seconds: How long to keep collecting measurements.
        csv_filepath: Path of the CSV file to append to. The header row is
            written only when the file does not exist yet or is empty.
        buffer_size: Number of buffered rows that triggers a write to disk.
            A value of 1 or less writes every row immediately.

    Raises:
        Exception: Whatever the final buffer flush raises (for example an
            ``OSError`` from the CSV write). The meter is still closed first.
    """
    print(f"\n--- Example 3: Logging to CSV for {duration_seconds} seconds ---")
    dmm = None
    data_buffer = []

    def write_buffer_to_csv():
        """Append all buffered rows to the CSV file, then empty the buffer."""
        if not data_buffer:
            return
        # An existing but empty file still needs a header row.
        needs_header = (
            not os.path.exists(csv_filepath)
            or os.path.getsize(csv_filepath) == 0
        )
        pd.DataFrame(data_buffer).to_csv(
            csv_filepath, mode='a', header=needs_header, index=False
        )
        log.info(f"Wrote {len(data_buffer)} rows to {csv_filepath}")
        # Cleared only after a successful write so a failed write keeps the rows.
        data_buffer.clear()

    try:
        dmm = UT61EPLUS()
        log.info("Starting data collection... Press Ctrl+C to stop.")
        # A monotonic clock is immune to wall-clock adjustments during the run.
        deadline = time.monotonic() + duration_seconds
        try:
            while time.monotonic() < deadline:
                measurement = dmm.take_measurement()
                if measurement:
                    data_buffer.append(measurement.to_dict())
                if len(data_buffer) >= buffer_size:
                    write_buffer_to_csv()
        except KeyboardInterrupt:
            # Ctrl+C ends collection early; remaining rows are flushed below.
            pass
    except Exception as e:
        log.error(f"An error occurred: {e}")
    finally:
        try:
            log.info("Shutting down, saving remaining buffer...")
            write_buffer_to_csv()
        finally:
            # Close the meter even if the final flush fails.
            if dmm:
                dmm.close()
def example_send_command():
    """Example: send the 'lamp' command twice to switch the backlight on, then off.

    The same ``'lamp'`` command is sent both times; according to the printed
    messages the first send turns the backlight on and the second turns it
    off. Any exception is logged rather than propagated, and the meter is
    closed if it was created.
    """
    print("\n--- Example 4: Sending 'lamp' command ---")
    meter = None
    # (message to print, seconds to wait after sending the command)
    steps = (
        ("Turning on the backlight for 3 seconds...", 3),
        # The trailing 1-second wait gives the command time to be sent
        # before the meter is closed.
        ("Turning off the backlight.", 1),
    )
    try:
        meter = UT61EPLUS()
        for announcement, pause in steps:
            print(announcement)
            meter.send_command('lamp')
            time.sleep(pause)
    except Exception as e:
        log.error(f"An error occurred: {e}")
    finally:
        if meter:
            meter.close()
if __name__ == '__main__':
    # Run every example in order, pausing 2 seconds between consecutive ones.
    demos = (
        example_single_measurement,
        example_multiple_measurements,
        lambda: example_log_to_csv(duration_seconds=5),
        example_send_command,
    )
    for position, run_demo in enumerate(demos):
        if position:
            # No pause before the first example, only between examples.
            time.sleep(2)
        run_demo()