Better stream synchronization #6

@MichaelBarz

Currently, stream synchronization relies on every module/node being able to process its input in real time. This needs to be improved to cope with nodes that process more slowly than the sampling rate requires. The goal is to offer two options:

  • Automatic slowdown.
    • Use case: when using a dataset source, i.e., a source with a finite number of samples, we want all samples to be processed correctly. It doesn't matter if the processing is slower than real time.
    • Simple solution: Manually decrease the playback speed of each dataset source (see Create a DatasetSource #22).
    • Advanced solution: The processing speed should adapt to the weakest link, i.e., the module/node that is the bottleneck of the pipeline. For instance, if a classification task in the middle of a pipeline with two source nodes can only sustain 50% of real-time speed, both sources should reduce their speed to 50%. If the system gets faster, e.g., because another process has ended, the processing speed should increase again (related to Pipeline profiling #7).
  • Automatic dropout setting.
    • Use case: if a pipeline is used for real-time HCI, the system should react immediately to active user inputs or passive sensor data. Reacting fast is more important than processing all available samples.
    • Each module (besides sources) has queues for buffering incoming data streams or events. Events older than a given time threshold should be ignored, i.e., these samples should be dropped. The threshold should be an optional parameter; if none is given, it should be estimated automatically, e.g., as 2*1./sampling_rate. Implement this in BaseSink.
    • Measure the sampling rate of the incoming signals (separately for each topic).
    • Measure the outgoing processing rate (processing speed).
    • Set the maximum queue size to n times the sampling rate (e.g., n=.5 [s]). If the queue is full, remove the oldest entry with queue.get() and insert the newer one. Here n is the maximum delay of this node.
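
A minimal sketch of both options, assuming Python's standard `queue.Queue` and samples stored as `(timestamp, value)` tuples. All function names here (`default_max_age`, `max_queue_size`, `put_drop_oldest`, `get_fresh`, `bottleneck_speed_factor`) are hypothetical and not part of the existing code base; how this would hook into BaseSink is left open.

```python
import queue
import time


def default_max_age(sampling_rate_hz):
    """Fallback staleness threshold in seconds: 2 * 1/sampling_rate."""
    return 2 * 1.0 / sampling_rate_hz


def max_queue_size(sampling_rate_hz, max_delay_s=0.5):
    """Capacity of n * sampling_rate samples, where n is the max delay in seconds."""
    return max(1, int(max_delay_s * sampling_rate_hz))


def put_drop_oldest(q, item):
    """Insert item; while the queue is full, discard the oldest entry first.

    Returns the number of samples that were dropped.
    """
    dropped = 0
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                q.get_nowait()  # FIFO queue: this removes the oldest sample
                dropped += 1
            except queue.Empty:
                pass  # a consumer emptied the queue in between; retry the put


def get_fresh(q, max_age_s, now=None):
    """Return the next (timestamp, value) no older than max_age_s, else None.

    Assumption: timestamps come from time.monotonic() on the producer side.
    """
    now = time.monotonic() if now is None else now
    while True:
        try:
            timestamp, value = q.get_nowait()
        except queue.Empty:
            return None
        if now - timestamp <= max_age_s:
            return timestamp, value
        # otherwise the sample is stale: skip it and look at the next one


def bottleneck_speed_factor(sampling_rate_hz, processing_rates_hz):
    """Playback speed in (0, 1] that the slowest node can sustain."""
    slowest = min(processing_rates_hz)
    return min(1.0, slowest / sampling_rate_hz)
```

For example, with a 10 Hz topic and n=.5 the queue holds 5 samples; pushing 8 samples drops the 3 oldest. For the automatic slowdown, a node that measures 50 Hz in a 100 Hz pipeline yields a factor of 0.5, which all dataset sources would apply to their playback speed.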

Labels: enhancement (New feature or request)
