OceanStreamIO/iotedge-sensorstream

iotedge-sensorstream

Azure IoT Edge module for ingesting and processing oceanographic sensor data (CTD, GNSS, ADCP) from USV and ship-based platforms. Outputs GeoParquet files with per-record metadata JSON. Runs on edge devices (Jetson Orin, x86 Ubuntu) and locally in standalone mode.

Data Flow

Sensor data (TCP/UDP stream, file drop, IoT Edge message)
  → ingest/ (parse NMEA, CSV, .hex, .cnv, .raw)
  → process/pipeline.py (DataFrame → enrich → validate)
  → exports/ (GeoParquet + metadata JSON → storage, telemetry → IoT Hub)

Supported Formats

| Format | Extension | Parser | Notes |
| --- | --- | --- | --- |
| NMEA 0183 | .txt | pynmea2 | GGA, RMC, VTG, ZDA; timestamped or raw |
| CSV | .csv | pandas | Auto-detect columns; EMSO, R2R, generic |
| Sea-Bird CNV | .cnv | pandas | Processed CTD with header metadata |
| Sea-Bird HEX | .hex + .hdr + .XMLCON | seabirdscientific + gsw | Raw CTD frequency → T/C/P/S/depth |
| RDI ADCP | .raw | dolfyn | Beam → earth transform, ensemble averaging, u/v/w velocities |
| Nortek AD2CP | .ad2cp | oceanstream | Echosounder Sv (volume backscatter) and/or velocity data |
| tar.gz | .tar.gz, .tgz | tarfile | Extracts and processes contained files |
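
The module delegates NMEA parsing to pynmea2. For readers new to the format, the following is a minimal standard-library sketch of the NMEA 0183 checksum rule (the XOR of every character between `$` and `*`, written as two hex digits). The function names `nmea_checksum` and `is_valid_nmea` are illustrative and are not part of this repository.

```python
from functools import reduce


def nmea_checksum(body: str) -> str:
    """Return the two-digit hex checksum for the text between '$' and '*'."""
    return f"{reduce(lambda acc, ch: acc ^ ord(ch), body, 0):02X}"


def is_valid_nmea(sentence: str) -> bool:
    """Check a full sentence of the form '$<body>*<checksum>'."""
    sentence = sentence.strip()
    if not sentence.startswith("$") or "*" not in sentence:
        return False
    # Split on the last '*': everything before it (minus '$') is the body.
    body, _, received = sentence[1:].rpartition("*")
    return nmea_checksum(body) == received[:2].upper()
```

A check like this can be handy for filtering corrupted lines from a serial gateway before they reach a full parser.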

Quick Start

Standalone (local processing)

python3.12 -m venv .venv
.venv/bin/pip install -r requirements.txt

# Process files from a directory
.venv/bin/python standalone.py \
    --input-dir ./test_data \
    --output-dir ./output \
    --campaign-id my_cruise

# Watch a directory for new files
.venv/bin/python standalone.py \
    --watch /tmp/sensorstream-watch \
    --output-dir ./output

# Simulate a CTD hex stream over TCP
.venv/bin/python standalone.py \
    --simulate-ctd ./test_data/ctd/hex/11901.hex \
    --output-dir ./output

# Process ADCP raw binary files
.venv/bin/python standalone.py \
    --input-dir ./test_data/adcp \
    --output-dir ./output \
    --campaign-id adcp_test

IoT Edge

Deploy via the Streambase CLI:

streambase module build -e <env> -m iotedge-sensorstream -o Linux -a amd64 -t latest
streambase device apply -e <env> -d <device-id>

Configuration is driven by IoT Hub module twin desired properties. All EdgeConfig fields can be set via the twin and are reported back as reported properties.

Architecture

| Module | Purpose |
| --- | --- |
| azure_handler/ | IoT Hub client, message sending, twin sync, storage abstraction |
| ingest/ | TCP/UDP stream listener, file watcher, IoT Edge triggers, hex/NMEA/CSV/ADCP parsing |
| process/ | Processing pipeline: file → DataFrame → GeoParquet + metadata JSON |
| exports/ | D2C telemetry, metadata JSON generation, telemetry throttle/downsampling |
| simulate/ | Built-in simulators: file dropper, NMEA stream replayer, CTD hex stream |

Entry Points

| File | Purpose |
| --- | --- |
| main.py | IoT Edge module; runs under the aziot-edge runtime |
| standalone.py | CLI for local processing, file watching, stream listening, and simulators |
| simulate/__main__.py | Runs simulators directly via python -m simulate |

Configuration

All config flows through the EdgeConfig dataclass in config.py. In IoT Edge mode, every field is readable/writable via module twin desired properties (changes apply live). In standalone mode, fields come from environment variables and CLI args.

Input Mode

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| input_mode | stream, file, or both | both | Which ingest sources to activate |

Network Stream

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| stream_protocol | tcp, udp, or auto | auto | Network protocol for stream listener |
| stream_host | string | 0.0.0.0 | Bind address (server mode) or remote host (client mode) |
| stream_port | int | 9100 | Listen port (server) or connect port (client) |
| stream_format | nmea, csv, hex, or auto | auto | Expected data format on the stream |
| stream_connect_mode | server or client | server | TCP server (listen) or client (connect to remote) |

File Watcher

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| watch_dir | string | /data/sensor | Directory to watch for new files |
| watch_patterns | string | *.csv,*.txt,*.hex,*.cnv,*.raw,*.ad2cp,*.tar.gz | Comma-separated glob patterns |
| watch_polling | bool | false | Use polling instead of inotify (required for SMB/NFS mounts) |
| watch_poll_interval | int | 2 | Seconds between polls when watch_polling is true |
| backfill_minutes | int | 0 | On startup, queue files modified within the last N minutes. 0 skips all existing files (only new arrivals are processed). Set to e.g. 60 to reprocess the last hour of data after a restart. |
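
As an illustration of how a comma-separated watch_patterns value could be applied to incoming file names, here is a minimal sketch using the standard-library fnmatch module. The helper names `parse_patterns` and `matches_any` are hypothetical; the module's actual matching logic may differ (for example in case sensitivity or in how directories are handled).

```python
from fnmatch import fnmatch
from pathlib import PurePath


def parse_patterns(watch_patterns: str) -> list[str]:
    """Split a comma-separated pattern string, dropping blanks and padding."""
    return [p.strip() for p in watch_patterns.split(",") if p.strip()]


def matches_any(path: str, patterns: list[str]) -> bool:
    """Return True if the file name (not the full path) matches any pattern."""
    name = PurePath(path).name
    return any(fnmatch(name, pattern) for pattern in patterns)
```

Matching against the file name alone means a pattern such as `*.tar.gz` works regardless of which subdirectory the file lands in.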

Batching

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| batch_interval_seconds | int | 60 | Stream batch flush interval (seconds) |
| batch_max_records | int | 1000 | Flush the stream batch once this many records are buffered |
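
The two batching fields describe a flush-on-time-or-size policy. The sketch below shows one way such a buffer could behave; `BatchBuffer` is a hypothetical class written for illustration, not the module's implementation. Note that this simplified version only checks the time limit when a record arrives, so a real listener would also need a timer to flush during quiet periods.

```python
import time


class BatchBuffer:
    """Buffer records and flush when a size or age limit is reached."""

    def __init__(self, interval_seconds=60, max_records=1000, clock=time.monotonic):
        self.interval_seconds = interval_seconds
        self.max_records = max_records
        self._clock = clock  # injectable so the policy can be tested without sleeping
        self._records = []
        self._last_flush = clock()

    def add(self, record):
        """Buffer a record; return the flushed batch if a limit was hit, else None."""
        self._records.append(record)
        elapsed = self._clock() - self._last_flush
        if len(self._records) >= self.max_records or elapsed >= self.interval_seconds:
            return self.flush()
        return None

    def flush(self):
        """Return all buffered records and reset the buffer and timer."""
        batch, self._records = self._records, []
        self._last_flush = self._clock()
        return batch
```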

Metadata

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| campaign_id | string | "" | Campaign identifier; used as blob container name and output path prefix |
| platform_id | string | "" | Platform/vessel identifier |
| platform_name | string | "" | Human-readable platform name |
| provider | string | auto | Oceanstream data provider for enrichment (auto, generic, or a named provider) |

Telemetry

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| telemetry_interval_seconds | int | 300 | Periodic telemetry summary interval |
| telemetry_send_records | bool | true | Send individual sensor records to IoT Hub |
| telemetry_send_summaries | bool | true | Send periodic summary messages to IoT Hub |
| telemetry_downsample_seconds | int | 30 | Minimum interval between D2C telemetry messages |
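
To make the meaning of telemetry_downsample_seconds concrete, here is a minimal sketch of a minimum-interval throttle. `TelemetryThrottle` is a hypothetical name, and the sketch assumes records arriving inside the interval are simply dropped from telemetry; the module may aggregate or pick records differently.

```python
import time


class TelemetryThrottle:
    """Allow at most one message per min_interval_seconds."""

    def __init__(self, min_interval_seconds=30, clock=time.monotonic):
        self.min_interval_seconds = min_interval_seconds
        self._clock = clock  # injectable for testing
        self._last_sent = None

    def allow(self) -> bool:
        """Return True if a message may be sent now, and record the send time."""
        now = self._clock()
        if self._last_sent is None or now - self._last_sent >= self.min_interval_seconds:
            self._last_sent = now
            return True
        return False
```

With a 1 Hz sensor and the default of 30 seconds, a throttle like this would pass roughly one record in thirty to IoT Hub, while every record would still reach the GeoParquet output.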

CTD File Monitor

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| ctd_enabled | bool | false | Enable polling a CTD file for latest readings |
| ctd_file_path | string | /mnt/ctd/latest_ctd.csv | Path to the CTD CSV file (updated in place by logger) |
| ctd_poll_interval_seconds | int | 30 | How often to read the CTD file |
| ctd_observatory | string | munkholmen | Observatory name for CTD provider enrichment |

Storage

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| storage_backend | azure-blob-edge or local | azure-blob-edge | Output storage backend |
| output_base_path | string | /app/processed | Root path for local storage backend |
| processed_container | string | sensordata | Subfolder within the campaign container for processed output |

Logging

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| log_level | string | INFO | Logging level (DEBUG, INFO, WARNING, ERROR) |

Startup Behaviour

By default (backfill_minutes: 0), the module does not process existing files when it starts. It only processes files that arrive after startup via the file watcher, stream listener, or IoT Edge messages. This prevents reprocessing the entire dataset on every container restart.

To backfill recent data after a restart, set backfill_minutes to the desired window (e.g. 60 for the last hour). Only files whose modification time falls within that window are queued. This is useful when the module was down briefly and you want to catch up on missed files.
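
The backfill rule can be sketched as a filter on file modification time. The function below is an illustrative assumption, not the code in config.py or the file watcher: the name `files_to_backfill` is hypothetical, and it only scans the top level of the directory.

```python
import time
from pathlib import Path


def files_to_backfill(watch_dir, backfill_minutes, now=None):
    """Return files modified within the last backfill_minutes, oldest first.

    A value of 0 (the default) returns nothing, so existing files are skipped.
    """
    if backfill_minutes <= 0:
        return []
    now = time.time() if now is None else now
    cutoff = now - backfill_minutes * 60
    recent = [
        p for p in Path(watch_dir).iterdir()
        if p.is_file() and p.stat().st_mtime >= cutoff
    ]
    # Oldest first, so files are queued in roughly the order they were written.
    return sorted(recent, key=lambda p: p.stat().st_mtime)
```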

See config.py for implementation details.

Connecting Sensors

There are three ingestion paths, depending on how sensors deliver data.

Network Stream (live sensors)

Most oceanographic instruments output serial data. A serial-to-TCP converter (e.g. Moxa NPort, Digi Connect, or ser2net on Linux) bridges the serial port to a TCP socket.

Server mode — module listens, sensor/converter connects to it:

Sensor → Serial → ser2net/Moxa → TCP connect to :9100 → sensorstream

Twin config:

{"stream_port": 9100, "stream_connect_mode": "server", "stream_format": "auto"}

Client mode — module connects to an existing TCP server:

Sensor → Serial → ser2net :4001 ← TCP connect ← sensorstream

Twin config:

{"stream_host": "192.168.0.50", "stream_port": 4001, "stream_connect_mode": "client", "stream_format": "nmea"}

UDP also works — common for NMEA multiplexers that broadcast on a UDP port.
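
For client mode, the core task is turning a TCP byte stream into complete text lines, since a single recv call may return half a sentence or several at once. The following is a minimal standard-library sketch of that framing step; `iter_lines` is a hypothetical helper, and the address in the usage comment is the example value from the twin config above.

```python
import socket
from collections.abc import Iterator


def iter_lines(sock: socket.socket, bufsize: int = 4096) -> Iterator[str]:
    """Yield complete, non-empty text lines from a TCP socket until it closes."""
    pending = b""
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:  # peer closed the connection
            break
        pending += chunk
        # Everything before the last newline is complete; keep the remainder.
        *lines, pending = pending.split(b"\n")
        for line in lines:
            text = line.decode("ascii", errors="replace").strip()
            if text:
                yield text
    tail = pending.decode("ascii", errors="replace").strip()
    if tail:
        yield tail


# Usage (assumes a ser2net or Moxa gateway is listening at this address):
# with socket.create_connection(("192.168.0.50", 4001)) as sock:
#     for line in iter_lines(sock):
#         print(line)
```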

| Sensor | Protocol | Format | Notes |
| --- | --- | --- | --- |
| Ship GPS (GGA/RMC) | TCP/UDP | nmea | NMEA 0183 via serial gateway |
| Sea-Bird SBE 11plus | TCP | hex | Deck unit serial output, hex scan lines |
| Generic CTD | TCP | csv | Comma-separated T/C/P values |

File Drop (batch or near-real-time)

Sensors or acquisition software write files to a shared volume. The module watches that directory.

EK80/SBE software → writes .cnv/.hex to /data/sensor/ → file watcher → sensorstream

Twin config:

{"input_mode": "file", "watch_dir": "/data/sensor", "watch_patterns": "*.csv,*.txt,*.hex,*.cnv"}

On edge devices, bind-mount the data volume into the container:

"createOptions": {"HostConfig": {"Binds": ["/data/sensor:/data/sensor:ro"]}}

IoT Edge Message Route (from filenotifier)

The filenotifier module watches raw data directories and sends messages when new files appear:

"routes": {
  "sensorNotifyToStream": "FROM /messages/modules/filenotifier/outputs/sensorfileadded INTO BrokeredEndpoint(\"/modules/iotedge-sensorstream/inputs/sensorfileadded\")"
}

Combining Modes

Set input_mode: "both" to run file watcher and stream listener simultaneously — e.g. GNSS over TCP stream + CTD .hex files dropped to disk.

Testing Without Hardware

Use the built-in simulators:

# Replay NMEA over TCP
.venv/bin/python standalone.py --simulate-stream ./test_data/gnss/track.txt --output-dir ./output

# Replay CTD hex scans over TCP (emulates SBE 11plus)
.venv/bin/python standalone.py --simulate-ctd ./test_data/ctd/hex/11901.hex --output-dir ./output

# Drop files into a watch directory
.venv/bin/python standalone.py --simulate-files ./test_data --watch /tmp/watch --output-dir ./output

# Process ADCP raw binary directly
.venv/bin/python standalone.py --input-dir ./test_data/adcp --output-dir ./output --campaign-id adcp_test

Testing

# Run all tests
.venv/bin/python -m pytest test/ -v --tb=short

# Run Azure E2E test (standalone script)
.venv/bin/python test/test_azure_e2e.py

Test data is stored in Azure Blob Storage (sensorstream-test container) and downloaded automatically on first run. Set AZURE_CONNECTION_STRING in .env or environment. Tests fall back to local test_data/ if no connection is available.

118 tests across 9 files: config, stream parsing, pipeline, file watcher, simulators, hex parsing, ADCP parsing, CTD stream, and telemetry throttling.

IoT Edge Integration

Input: Receives sensorfileadded messages from the filenotifier module:

{"event": "fileadd", "path": "/data/raw/ctd/cast.cnv", "size": 17500}
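
A handler for this input could validate the payload before queueing the file. The sketch below parses the message shape shown above using only the standard library; `FileAdded` and `parse_file_added` are hypothetical names, and treating a missing size as 0 is an assumption.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class FileAdded:
    path: str
    size: int


def parse_file_added(payload):
    """Return a FileAdded for a valid 'fileadd' event, or None otherwise."""
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8", errors="replace")
    try:
        message = json.loads(payload)
    except json.JSONDecodeError:
        return None
    if not isinstance(message, dict):
        return None
    if message.get("event") != "fileadd" or "path" not in message:
        return None
    # Assumption: size is optional and defaults to 0 when absent.
    return FileAdded(path=str(message["path"]), size=int(message.get("size", 0)))
```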

Output: D2C telemetry to IoT Hub (rate-limited by telemetry_downsample_seconds), GeoParquet + metadata JSON to blob storage.

Twin: All config fields are readable/writable via module twin desired properties. Changes are applied live without restart. Twin property names map 1:1 to config field names, except Log_Level, which maps to log_level.
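
The name mapping can be sketched as a small alias table applied to a desired-properties patch. `DemoConfig` below is a three-field stand-in for the real EdgeConfig, and `apply_desired` and `TWIN_ALIASES` are hypothetical names; the sketch ignores unknown keys such as the `$version` entry that IoT Hub includes in twin patches.

```python
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class DemoConfig:
    """Stand-in for EdgeConfig with only a few of its fields."""
    log_level: str = "INFO"
    stream_port: int = 9100
    backfill_minutes: int = 0


# Twin property names that differ from the config field name.
TWIN_ALIASES = {"Log_Level": "log_level"}


def apply_desired(config, desired):
    """Return a new config with recognised desired properties applied."""
    known = {f.name for f in fields(config)}
    updates = {}
    for key, value in desired.items():
        name = TWIN_ALIASES.get(key, key)
        if name in known:  # skip "$version" and any unrecognised keys
            updates[name] = value
    return replace(config, **updates)
```

Returning a new frozen instance rather than mutating in place makes it straightforward to compare old and new configs and restart only the components whose settings changed.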

Backfill on restart: By default the module skips existing files. Set backfill_minutes in the twin to process recent files after a restart (e.g. 60 for the last hour).

Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| AZURE_STORAGE_CONNECTION_STRING | | Azure Blob Storage connection string (edge blob or cloud) |
| STORAGE_BACKEND | azure-blob-edge | Storage backend; use local for standalone |
| OUTPUT_BASE_PATH | /app/processed | Output directory |
| PROCESSED_CONTAINER_NAME | sensordata | Subfolder name within campaign container |
| WATCH_DIR | /data/sensor | File watch directory |
| STREAM_HOST | 0.0.0.0 | Stream listener bind address |
| STREAM_PORT | 9100 | Stream listener port |
| CAMPAIGN_ID | | Campaign identifier for output partitioning |
| PLATFORM_ID | | Platform/vessel identifier |
| LOG_LEVEL | INFO | Logging level |
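
As a rough illustration of how these variables and their defaults could be read in standalone mode, the sketch below builds a settings dictionary from the environment. It is an assumption about the pattern rather than the code in config.py; `DEFAULTS` and `load_settings` are hypothetical names, and variables without a documented default are left out.

```python
import os

# Defaults copied from the environment variable table.
DEFAULTS = {
    "STORAGE_BACKEND": "azure-blob-edge",
    "OUTPUT_BASE_PATH": "/app/processed",
    "PROCESSED_CONTAINER_NAME": "sensordata",
    "WATCH_DIR": "/data/sensor",
    "STREAM_HOST": "0.0.0.0",
    "STREAM_PORT": "9100",
    "LOG_LEVEL": "INFO",
}


def load_settings(environ=None):
    """Merge environment values over the defaults; STREAM_PORT becomes an int."""
    environ = os.environ if environ is None else environ
    settings = {name: environ.get(name, default) for name, default in DEFAULTS.items()}
    settings["STREAM_PORT"] = int(settings["STREAM_PORT"])
    return settings
```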

Docker

Two Dockerfiles for IoT Edge deployment:

  • Dockerfile.Linux.amd64 — x86_64
  • Dockerfile.Linux.arm64 — ARM64 (Jetson Orin)

License

See LICENSE.
