Currently, the streams are synchronized by ensuring real-time processing capabilities of each module/node. This needs to be improved to cope with nodes that are slower in processing than the sampling rate would require. The target is to offer two options:
Automatic slowdown.
Use case: when using a dataset source, i.e., a source with a finite number of samples, we want all samples to be processed correctly. It doesn't matter if the processing is slower than real-time.
Simple solution: Manually decrease the playback speed of each dataset source (Create a DatasetSource #22)
Advanced solution: The processing speed should be adapted to the weakest link, i.e., the module/node that is the bottleneck in the pipeline. For instance, if a classification task in the middle of a pipeline with two source nodes allows only 50% speed, both sources should reduce their speed to 50%. If the system gets faster, e.g., because another process has ended, the processing speed should increase again. (related to Pipeline profiling #7)
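A minimal sketch of the weakest-link idea, assuming all sources share one nominal sampling rate and that each node can report its measured throughput in samples per second. The function name `playback_speed` and the dictionary of per-node rates are hypothetical and not part of the existing code base.

```python
def playback_speed(sampling_rate_hz, processing_rates_hz, max_speed=1.0):
    """Return the playback speed factor dictated by the slowest node.

    sampling_rate_hz:    nominal sampling rate of the sources (assumed shared)
    processing_rates_hz: hypothetical mapping node name -> measured throughput
    max_speed:           upper bound, 1.0 means real-time
    """
    if sampling_rate_hz <= 0:
        raise ValueError("sampling_rate_hz must be positive")
    if not processing_rates_hz:
        # Nothing measured yet: run at full speed.
        return max_speed
    slowest = min(processing_rates_hz.values())
    # Never exceed max_speed, never go below zero.
    return max(0.0, min(max_speed, slowest / sampling_rate_hz))


# A classifier that only manages half the sampling rate slows every source.
measured = {"filter": 400.0, "classifier": 50.0, "sink": 1000.0}
speed = playback_speed(100.0, measured)
```

Calling the function again with fresh measurements lets the speed rise once the bottleneck disappears, which covers the "accelerate again" case.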
Automatic dropout setting.
Use case: if a pipeline is used for real-time HCI, the system should react immediately to active user inputs/passive sensor data. It is more important to react fast than to process all available samples.
Each module (except sources) has a queue for buffering incoming data streams or events. Events that are older than a certain threshold should be ignored, i.e., these samples should be dropped. The time threshold should be an optional parameter. If no threshold is given, it should be estimated automatically, e.g., as 2*1./sampling_rate. Implement in BaseSink:
Measure the sampling rate of the incoming signals (for each topic separately)
Measure the outgoing processing rate (processing speed)
-> Set the maximum queue size to n times the sampling rate, where n is the maximum delay of this node in seconds (e.g., n = 0.5 s). If the queue is full, remove the oldest element with queue.get() and insert the newer one.
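The steps above could be sketched as follows, using only the Python standard library. All names (`RateEstimator`, `max_queue_size`, `is_stale`, `put_dropping_oldest`) are hypothetical helpers for illustration, not the existing BaseSink API; one estimator per topic is assumed.

```python
import queue
import time
from collections import deque


class RateEstimator:
    """Estimates an event rate in Hz from the timestamps of recent events."""

    def __init__(self, window=50):
        self.stamps = deque(maxlen=window)

    def tick(self, now=None):
        # Record one event; `now` can be injected for testing.
        self.stamps.append(time.monotonic() if now is None else now)

    def rate(self):
        if len(self.stamps) < 2:
            return None  # not enough data yet
        span = self.stamps[-1] - self.stamps[0]
        return (len(self.stamps) - 1) / span if span > 0 else None


def max_queue_size(sampling_rate_hz, max_delay_s=0.5):
    """Queue capacity that corresponds to max_delay_s of buffered samples."""
    return max(1, int(round(max_delay_s * sampling_rate_hz)))


def is_stale(timestamp, now, sampling_rate_hz, threshold_s=None):
    """True if a sample is older than the threshold (default 2/sampling_rate)."""
    if threshold_s is None:
        threshold_s = 2 * 1.0 / sampling_rate_hz
    return now - timestamp > threshold_s


def put_dropping_oldest(q, item):
    """Insert item; if the bounded queue is full, discard the oldest first.

    Returns the number of dropped items.
    """
    dropped = 0
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                q.get_nowait()  # remove the oldest element
                dropped += 1
            except queue.Empty:
                pass  # a consumer emptied the queue in the meantime; retry


# Simulate a 100 Hz topic and size the queue for a 0.5 s maximum delay.
estimator = RateEstimator()
for i in range(11):
    estimator.tick(now=i * 0.01)
capacity = max_queue_size(estimator.rate(), max_delay_s=0.5)
buffer = queue.Queue(maxsize=capacity)
```

Measuring the outgoing processing rate can reuse the same `RateEstimator`, ticking it each time the node finishes a sample instead of each time one arrives.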