Azure IoT Edge module for ingesting and processing oceanographic sensor data (CTD, GNSS, ADCP) from USV and ship-based platforms. Outputs GeoParquet files with per-record metadata JSON. Runs on edge devices (Jetson Orin, x86 Ubuntu) and locally in standalone mode.
Sensor data (TCP/UDP stream, file drop, IoT Edge message)
→ ingest/ (parse NMEA, CSV, .hex, .cnv, .raw)
→ process/pipeline.py (DataFrame → enrich → validate)
→ exports/ (GeoParquet + metadata JSON → storage, telemetry → IoT Hub)
| Format | Extension | Parser | Notes |
|---|---|---|---|
| NMEA 0183 | `.txt` | pynmea2 | GGA, RMC, VTG, ZDA; timestamped or raw |
| CSV | `.csv` | pandas | Auto-detect columns; EMSO, R2R, generic |
| Sea-Bird CNV | `.cnv` | pandas | Processed CTD with header metadata |
| Sea-Bird HEX | `.hex` + `.hdr` + `.XMLCON` | seabirdscientific + gsw | Raw CTD frequency → T/C/P/S/depth |
| RDI ADCP | `.raw` | dolfyn | Beam→earth transform, ensemble averaging, u/v/w velocities |
| Nortek AD2CP | `.ad2cp` | oceanstream | Echosounder Sv (volume backscatter) and/or velocity data |
| tar.gz | `.tar.gz`, `.tgz` | tarfile | Extracts and processes contained files |
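The format table amounts to a lookup from file extension to parser. The following is a minimal sketch of that dispatch, not the module's actual routing code in ingest/: `PARSERS` and `parser_for` are hypothetical names, and the values are plain labels rather than real parser callables.

```python
from pathlib import Path
from typing import Optional

# Extension -> parser label, transcribed from the table above.
# The labels are illustrative strings, not the module's real callables.
PARSERS = {
    ".txt": "pynmea2",
    ".csv": "pandas",
    ".cnv": "pandas",
    ".hex": "seabirdscientific + gsw",
    ".raw": "dolfyn",
    ".ad2cp": "oceanstream",
    ".tgz": "tarfile",
}


def parser_for(filename: str) -> Optional[str]:
    """Return the parser label for a file name, or None if unsupported."""
    name = filename.lower()
    # ".tar.gz" is a double suffix, so check it before using Path.suffix.
    if name.endswith(".tar.gz"):
        return "tarfile"
    return PARSERS.get(Path(name).suffix)
```

Lower-casing the name first is a design choice of this sketch so that `CAST.CNV` and `cast.cnv` resolve to the same parser.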
python3.12 -m venv .venv
.venv/bin/pip install -r requirements.txt
# Process files from a directory
.venv/bin/python standalone.py \
--input-dir ./test_data \
--output-dir ./output \
--campaign-id my_cruise
# Watch a directory for new files
.venv/bin/python standalone.py \
--watch /tmp/sensorstream-watch \
--output-dir ./output
# Simulate a CTD hex stream over TCP
.venv/bin/python standalone.py \
--simulate-ctd ./test_data/ctd/hex/11901.hex \
--output-dir ./output
# Process ADCP raw binary files
.venv/bin/python standalone.py \
--input-dir ./test_data/adcp \
--output-dir ./output \
--campaign-id adcp_test
Deploy via the Streambase CLI:
streambase module build -e <env> -m iotedge-sensorstream -o Linux -a amd64 -t latest
streambase device apply -e <env> -d <device-id>
Configuration is driven by IoT Hub module twin desired properties. All EdgeConfig fields can be set via the twin and are reported back as reported properties.
| Module | Purpose |
|---|---|
| `azure_handler/` | IoT Hub client, message sending, twin sync, storage abstraction |
| `ingest/` | TCP/UDP stream listener, file watcher, IoT Edge triggers, hex/NMEA/CSV/ADCP parsing |
| `process/` | Processing pipeline: file → DataFrame → GeoParquet + metadata JSON |
| `exports/` | D2C telemetry, metadata JSON generation, telemetry throttle/downsampling |
| `simulate/` | Built-in simulators: file dropper, NMEA stream replayer, CTD hex stream |
| File | Purpose |
|---|---|
| `main.py` | IoT Edge module; runs under the aziot-edge runtime |
| `standalone.py` | CLI: local processing, file watching, stream listening, simulators |
| `simulate/__main__.py` | `python -m simulate`: run simulators directly |
All config flows through the EdgeConfig dataclass in config.py. In IoT Edge mode, every field is readable/writable via module twin desired properties (changes apply live). In standalone mode, fields come from environment variables and CLI args.
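As a rough illustration of the two configuration paths, the sketch below loads a handful of fields from environment variables and then applies a twin desired-properties patch in place. It is not the real `config.py`: `EdgeConfigSketch`, `from_env`, and `apply_twin_patch` are hypothetical names, only four fields are shown, and the upper-case environment variable naming is an assumption that does not hold for every variable (for example `PROCESSED_CONTAINER_NAME`).

```python
import os
from dataclasses import dataclass, fields


@dataclass
class EdgeConfigSketch:
    """Illustrative subset; field names and defaults come from the tables in this README."""
    input_mode: str = "both"
    stream_port: int = 9100
    watch_polling: bool = False
    log_level: str = "INFO"


def _coerce(value, target_type):
    """Convert a raw environment or twin value to the field's declared type."""
    if target_type is bool:
        return value if isinstance(value, bool) else str(value).strip().lower() in ("1", "true", "yes")
    if target_type is int:
        return int(value)
    return str(value)


def from_env(env=os.environ):
    """Standalone mode: read each field from its upper-cased name (assumed convention)."""
    cfg = EdgeConfigSketch()
    for f in fields(cfg):
        raw = env.get(f.name.upper())
        if raw is not None:
            setattr(cfg, f.name, _coerce(raw, f.type))
    return cfg


def apply_twin_patch(cfg, patch):
    """IoT Edge mode: apply desired properties in place; return the names that changed."""
    known = {f.name: f for f in fields(cfg)}
    changed = []
    for key, value in patch.items():
        # Log_Level is the one twin property whose name differs from its field.
        name = "log_level" if key == "Log_Level" else key
        if name not in known:
            continue  # ignore properties this sketch does not model
        new_value = _coerce(value, known[name].type)
        if getattr(cfg, name) != new_value:
            setattr(cfg, name, new_value)
            changed.append(name)
    return changed
```

Returning the list of changed fields lets a caller decide which components to reconfigure, for example restarting only the stream listener when `stream_port` changes.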
| Field | Type | Default | Description |
|---|---|---|---|
| `input_mode` | `stream` \| `file` \| `both` | `both` | Which ingest sources to activate |
| Field | Type | Default | Description |
|---|---|---|---|
| `stream_protocol` | `tcp` \| `udp` \| `auto` | `auto` | Network protocol for stream listener |
| `stream_host` | string | `0.0.0.0` | Bind address (server mode) or remote host (client mode) |
| `stream_port` | int | `9100` | Listen port (server) or connect port (client) |
| `stream_format` | `nmea` \| `csv` \| `hex` \| `auto` | `auto` | Expected data format on the stream |
| `stream_connect_mode` | `server` \| `client` | `server` | TCP server (listen) or client (connect to remote) |
| Field | Type | Default | Description |
|---|---|---|---|
| `watch_dir` | string | `/data/sensor` | Directory to watch for new files |
| `watch_patterns` | string | `*.csv,*.txt,*.hex,*.cnv,*.raw,*.ad2cp,*.tar.gz` | Comma-separated glob patterns |
| `watch_polling` | bool | `false` | Use polling instead of inotify (required for SMB/NFS mounts) |
| `watch_poll_interval` | int | `2` | Seconds between polls when `watch_polling` is true |
| `backfill_minutes` | int | `0` | On startup, queue files modified within the last N minutes. `0` = skip all existing files (only process new arrivals). Set to e.g. `60` to reprocess the last hour of data after a restart |
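To make the `watch_patterns` format concrete, here is a small sketch that splits the comma-separated string and matches a file name against each glob using the standard-library `fnmatch`. The helper name `matches_watch_patterns` is hypothetical, and whether the module matches on the base name or the full path is an assumption of this example.

```python
from fnmatch import fnmatch
from pathlib import PurePath

# Default value of watch_patterns, copied from the table above.
DEFAULT_PATTERNS = "*.csv,*.txt,*.hex,*.cnv,*.raw,*.ad2cp,*.tar.gz"


def matches_watch_patterns(path, patterns=DEFAULT_PATTERNS):
    """Return True if the file's base name matches any comma-separated glob."""
    name = PurePath(path).name
    # Tolerate spaces after commas and ignore empty entries.
    globs = [p.strip() for p in patterns.split(",") if p.strip()]
    return any(fnmatch(name, g) for g in globs)
```

Note that `fnmatch` follows the platform's case rules, so on Linux `CAST.CNV` would not match `*.cnv` in this sketch.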
| Field | Type | Default | Description |
|---|---|---|---|
| `batch_interval_seconds` | int | `60` | Stream batch flush interval (seconds) |
| `batch_max_records` | int | `1000` | Stream batch flush when this many records buffered |
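The two batching fields describe a flush that fires on whichever limit is reached first. A minimal sketch of that behaviour follows; `BatchBuffer` is a hypothetical class, and checking the time limit only when a record arrives is a simplification (the module may well use a timer instead).

```python
import time


class BatchBuffer:
    """Buffer stream records and flush on a size limit or an age limit."""

    def __init__(self, interval_seconds=60, max_records=1000, clock=time.monotonic):
        self.interval_seconds = interval_seconds
        self.max_records = max_records
        self._clock = clock  # injectable so the behaviour can be tested
        self._records = []
        self._started = clock()

    def add(self, record):
        """Buffer one record; return the flushed batch if a limit was hit, else None."""
        self._records.append(record)
        full = len(self._records) >= self.max_records
        expired = self._clock() - self._started >= self.interval_seconds
        return self.flush() if full or expired else None

    def flush(self):
        """Return all buffered records and start a new batch window."""
        batch, self._records = self._records, []
        self._started = self._clock()
        return batch
```

A caller would hand each returned batch to the processing pipeline and call `flush()` once more on shutdown so that a partial batch is not lost.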
| Field | Type | Default | Description |
|---|---|---|---|
| `campaign_id` | string | `""` | Campaign identifier; used as blob container name and output path prefix |
| `platform_id` | string | `""` | Platform/vessel identifier |
| `platform_name` | string | `""` | Human-readable platform name |
| `provider` | string | `auto` | Oceanstream data provider for enrichment (`auto`, `generic`, or a named provider) |
| Field | Type | Default | Description |
|---|---|---|---|
| `telemetry_interval_seconds` | int | `300` | Periodic telemetry summary interval |
| `telemetry_send_records` | bool | `true` | Send individual sensor records to IoT Hub |
| `telemetry_send_summaries` | bool | `true` | Send periodic summary messages to IoT Hub |
| `telemetry_downsample_seconds` | int | `30` | Minimum interval between D2C telemetry messages |
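`telemetry_downsample_seconds` acts as a minimum spacing between D2C messages. The sketch below shows one way such a throttle could work; `TelemetryThrottle` is a hypothetical class, and tracking the interval per sensor key (rather than globally) is an assumption of this example, not something the README states.

```python
import time


class TelemetryThrottle:
    """Allow at most one message per key every min_interval_seconds."""

    def __init__(self, min_interval_seconds=30, clock=time.monotonic):
        self.min_interval_seconds = min_interval_seconds
        self._clock = clock  # injectable so the behaviour can be tested
        self._last_sent = {}  # key -> time of the last accepted message

    def should_send(self, key="default"):
        """Return True and record the send time if the key is outside its window."""
        now = self._clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.min_interval_seconds:
            return False  # still inside the downsample window: drop
        self._last_sent[key] = now
        return True
```

Records rejected by the throttle are only withheld from telemetry in this sketch; they would still be written to the GeoParquet output.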
| Field | Type | Default | Description |
|---|---|---|---|
| `ctd_enabled` | bool | `false` | Enable polling a CTD file for latest readings |
| `ctd_file_path` | string | `/mnt/ctd/latest_ctd.csv` | Path to the CTD CSV file (updated in place by logger) |
| `ctd_poll_interval_seconds` | int | `30` | How often to read the CTD file |
| `ctd_observatory` | string | `munkholmen` | Observatory name for CTD provider enrichment |
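Polling a file that the logger rewrites in place can be sketched as "re-read only when the modification time changes, then take the last row". The example below is an assumption-laden illustration: `LatestRowPoller` is a hypothetical class, the CSV is assumed to have a header row, and the real column names are not specified in this README.

```python
import csv
import os


class LatestRowPoller:
    """Return the newest row of a CSV file each time the file changes."""

    def __init__(self, path):
        self.path = path
        self._last_mtime = None

    def poll(self):
        """Return the last row as a dict if the file changed since the previous poll."""
        try:
            mtime = os.stat(self.path).st_mtime_ns
        except FileNotFoundError:
            return None  # logger has not written the file yet
        if mtime == self._last_mtime:
            return None  # unchanged since the last poll
        self._last_mtime = mtime
        with open(self.path, newline="") as fh:
            rows = list(csv.DictReader(fh))
        return rows[-1] if rows else None
```

A caller would invoke `poll()` every `ctd_poll_interval_seconds` and forward any non-`None` result to the pipeline.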
| Field | Type | Default | Description |
|---|---|---|---|
| `storage_backend` | `azure-blob-edge` \| `local` | `azure-blob-edge` | Output storage backend |
| `output_base_path` | string | `/app/processed` | Root path for local storage backend |
| `processed_container` | string | `sensordata` | Subfolder within the campaign container for processed output |
| Field | Type | Default | Description |
|---|---|---|---|
| `log_level` | string | `INFO` | Logging level (`DEBUG`, `INFO`, `WARNING`, `ERROR`) |
By default (backfill_minutes: 0), the module does not process existing files when it starts. It only processes files that arrive after startup via the file watcher, stream listener, or IoT Edge messages. This prevents reprocessing the entire dataset on every container restart.
To backfill recent data after a restart, set backfill_minutes to the desired window (e.g. 60 for the last hour). Only files whose modification time falls within that window are queued. This is useful when the module was down briefly and you want to catch up on missed files.
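The backfill rule described above can be sketched as a modification-time filter over the watch directory. This is an illustration under assumptions, not the module's code: `files_to_backfill` is a hypothetical helper, it scans only the top level of the directory, and it queues files oldest first.

```python
import time
from pathlib import Path


def files_to_backfill(watch_dir, backfill_minutes, now=None):
    """Return files modified within the last backfill_minutes, oldest first."""
    if backfill_minutes <= 0:
        return []  # default: skip everything that existed before startup
    now = time.time() if now is None else now
    cutoff = now - backfill_minutes * 60
    recent = [
        p for p in Path(watch_dir).iterdir()
        if p.is_file() and p.stat().st_mtime >= cutoff
    ]
    return sorted(recent, key=lambda p: p.stat().st_mtime)
```

With `backfill_minutes=60`, a file last modified two hours before startup is skipped, while one modified ten minutes before startup is queued.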
See config.py for implementation details.
There are three ingestion paths, depending on how sensors deliver data.
Most oceanographic instruments output serial data. A serial-to-TCP converter (e.g. Moxa NPort, Digi Connect, or ser2net on Linux) bridges the serial port to a TCP socket.
Server mode (the module listens; the sensor or converter connects to it):
Sensor → Serial → ser2net/Moxa → TCP connect to :9100 → sensorstream
Twin config:
{"stream_port": 9100, "stream_connect_mode": "server", "stream_format": "auto"}
Client mode (the module connects to an existing TCP server):
Sensor → Serial → ser2net :4001 ← TCP connect ← sensorstream
Twin config:
{"stream_host": "192.168.0.50", "stream_port": 4001, "stream_connect_mode": "client", "stream_format": "nmea"}
UDP also works; it is common for NMEA multiplexers that broadcast on a UDP port.
| Sensor | Protocol | Format | Notes |
|---|---|---|---|
| Ship GPS (GGA/RMC) | TCP/UDP | `nmea` | NMEA 0183 via serial gateway |
| Sea-Bird SBE 11plus | TCP | `hex` | Deck unit serial output, hex scan lines |
| Generic CTD | TCP | `csv` | Comma-separated T/C/P values |
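When `stream_format` is `auto`, the listener has to infer the format from the incoming lines. The README does not describe how the module does this, so the following is only one plausible heuristic based on the three formats in the table: `guess_stream_format` is a hypothetical function, and it does not handle timestamp-prefixed NMEA lines.

```python
import string

_HEX_DIGITS = set(string.hexdigits)


def guess_stream_format(line):
    """Classify one text line as 'nmea', 'csv', 'hex', or 'unknown'."""
    text = line.strip()
    if not text:
        return "unknown"
    # NMEA 0183 sentences start with '$' (or '!' for encapsulated sentences).
    if text.startswith(("$", "!")):
        return "nmea"
    # Checked after NMEA because NMEA sentences also contain commas.
    if "," in text:
        return "csv"
    # A hex scan line consists solely of hexadecimal digits.
    if all(ch in _HEX_DIGITS for ch in text):
        return "hex"
    return "unknown"
```

In practice a detector like this would probably sample several lines before committing, since a single short line can be ambiguous.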
Sensors or acquisition software write files to a shared volume. The module watches that directory.
EK80/SBE software → writes .cnv/.hex to /data/sensor/ → file watcher → sensorstream
Twin config:
{"input_mode": "file", "watch_dir": "/data/sensor", "watch_patterns": "*.csv,*.txt,*.hex,*.cnv"}
On edge devices, bind-mount the data volume into the container:
"createOptions": {"HostConfig": {"Binds": ["/data/sensor:/data/sensor:ro"]}}
The filenotifier module watches raw data directories and sends messages when new files appear:
"routes": {
"sensorNotifyToStream": "FROM /messages/modules/filenotifier/outputs/sensorfileadded INTO BrokeredEndpoint(\"/modules/iotedge-sensorstream/inputs/sensorfileadded\")"
}
Set input_mode: "both" to run the file watcher and stream listener simultaneously, e.g. GNSS over a TCP stream plus CTD .hex files dropped to disk.
Use the built-in simulators:
# Replay NMEA over TCP
.venv/bin/python standalone.py --simulate-stream ./test_data/gnss/track.txt --output-dir ./output
# Replay CTD hex scans over TCP (emulates SBE 11plus)
.venv/bin/python standalone.py --simulate-ctd ./test_data/ctd/hex/11901.hex --output-dir ./output
# Drop files into a watch directory
.venv/bin/python standalone.py --simulate-files ./test_data --watch /tmp/watch --output-dir ./output
# Process ADCP raw binary directly
.venv/bin/python standalone.py --input-dir ./test_data/adcp --output-dir ./output --campaign-id adcp_test
# Run all tests
.venv/bin/python -m pytest test/ -v --tb=short
# Run Azure E2E test (standalone script)
.venv/bin/python test/test_azure_e2e.py
Test data is stored in Azure Blob Storage (sensorstream-test container) and downloaded automatically on first run. Set AZURE_CONNECTION_STRING in .env or environment. Tests fall back to local test_data/ if no connection is available.
118 tests across 9 files: config, stream parsing, pipeline, file watcher, simulators, hex parsing, ADCP parsing, CTD stream, and telemetry throttling.
Input: Receives sensorfileadded messages from the filenotifier module:
{"event": "fileadd", "path": "/data/raw/ctd/cast.cnv", "size": 17500}
Output: D2C telemetry to IoT Hub (rate-limited by telemetry_downsample_seconds), GeoParquet + metadata JSON to blob storage.
Twin: All config fields are readable/writable via module twin desired properties. Changes are applied live without restart. Twin property names map 1:1 to config field names, except Log_Level → log_level.
Backfill on restart: By default the module skips existing files. Set backfill_minutes in the twin to process recent files after a restart (e.g. 60 for the last hour).
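For the `sensorfileadded` input, the message body shown above can be validated before a file is queued. The sketch below assumes the payload arrives as UTF-8 JSON with exactly the fields in that example; `parse_file_added` is a hypothetical helper, not the module's actual handler.

```python
import json
from pathlib import PurePosixPath


def parse_file_added(payload):
    """Return (path, size) for a 'fileadd' event, or None for anything else."""
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8")
    try:
        msg = json.loads(payload)
    except (TypeError, ValueError):
        return None  # not JSON at all
    if not isinstance(msg, dict) or msg.get("event") != "fileadd":
        return None  # some other event type
    path = msg.get("path")
    if not isinstance(path, str) or not path:
        return None
    size = msg.get("size", 0)
    return PurePosixPath(path), size if isinstance(size, int) else 0
```

Returning `None` instead of raising keeps a malformed message from stopping the input handler; the caller can log it and move on.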
| Variable | Default | Description |
|---|---|---|
| `AZURE_STORAGE_CONNECTION_STRING` | (none) | Azure Blob Storage connection string (edge blob or cloud) |
| `STORAGE_BACKEND` | `azure-blob-edge` | `local` for standalone |
| `OUTPUT_BASE_PATH` | `/app/processed` | Output directory |
| `PROCESSED_CONTAINER_NAME` | `sensordata` | Subfolder name within campaign container |
| `WATCH_DIR` | `/data/sensor` | File watch directory |
| `STREAM_HOST` | `0.0.0.0` | Stream listener bind address |
| `STREAM_PORT` | `9100` | Stream listener port |
| `CAMPAIGN_ID` | (none) | Campaign identifier for output partitioning |
| `PLATFORM_ID` | (none) | Platform/vessel identifier |
| `LOG_LEVEL` | `INFO` | Logging level |
Two Dockerfiles for IoT Edge deployment:
- Dockerfile.Linux.amd64: x86_64
- Dockerfile.Linux.arm64: ARM64 (Jetson Orin)
See LICENSE.