Pipeline Overview
The psiaudio.pipeline module provides a framework for building real-time
and offline data processing pipelines. It is built around a push-based
architecture using Python coroutines and specialized data structures that
preserve metadata across processing steps.
Architecture
The pipeline follows a “Research -> Strategy -> Execution” philosophy, where
individual processing components are implemented as coroutines decorated with
@coroutine. Data is passed through the pipeline using the .send()
method.
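A push-based coroutine stage can be sketched in plain Python as follows. This is an illustration, not psiaudio's source: the `coroutine` decorator and the `accumulate` stage below are hypothetical stand-ins that only show how a primed generator receives data through `.send()`.

```python
from functools import wraps


def coroutine(func):
    """Hypothetical stand-in for a priming decorator (not psiaudio's code)."""
    @wraps(func)
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # advance to the first yield so .send() works immediately
        return gen
    return start


@coroutine
def accumulate(results):
    """Sink stage: append every chunk it receives to a list."""
    while True:
        chunk = (yield)
        results.append(chunk)


received = []
stage = accumulate(received)
stage.send([1, 2, 3])  # no explicit next() call is needed first
stage.send([4, 5])
```

Without the priming step, the first `.send()` with a non-None value would raise a `TypeError`, because a freshly created generator has not yet reached a `yield`.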
Key Components
Coroutines: Functions decorated with @coroutine that automatically advance to the first yield statement. They typically receive data from an upstream source and push results to one or more downstream targets.
Broadcasting: The broadcast coroutine allows a single data stream to be sent to multiple processing branches.
Transformation: The transform coroutine applies a simple function to data before passing it along.
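The branching and mapping behaviour of these components can be sketched as below. The functions reuse the names `broadcast` and `transform` for readability, but they are local re-implementations written for this example; their signatures, and the convention that targets are plain callables (such as a list's `append` or a coroutine's bound `.send`), are assumptions rather than psiaudio's documented API.

```python
from functools import wraps


def coroutine(func):
    """Hypothetical priming decorator: runs the generator to its first yield."""
    @wraps(func)
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return start


@coroutine
def broadcast(*targets):
    """Forward each incoming item to every target callable."""
    while True:
        data = (yield)
        for target in targets:
            target(data)


@coroutine
def transform(function, target):
    """Apply function to each incoming item, then pass the result on."""
    while True:
        data = (yield)
        target(function(data))


raw, doubled = [], []
# One input stream feeds two branches: an untouched copy and a scaled copy.
scaled_branch = transform(lambda x: 2 * x, doubled.append)
source = broadcast(raw.append, scaled_branch.send)
for value in (1, 2, 3):
    source.send(value)
```

After the loop, `raw` holds the original values and `doubled` holds the transformed ones, so each branch sees every item exactly once.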
Data Structures
PipelineData
The PipelineData class is a subclass of numpy.ndarray designed to carry
essential metadata along with the raw signal data. It supports:
Sampling Rate (fs): Persisted through slicing and many transformations.
Timing (s0, t0): Tracks the starting sample and time relative to the beginning of the stream.
Channels: Maintains channel labels for multichannel data.
Metadata: Dictionary or list of dictionaries (for epoched data) containing arbitrary key-value pairs (e.g., trial parameters).
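One common way to make metadata survive slicing on a numpy.ndarray subclass is the `__array_finalize__` hook. The sketch below is a simplified illustration of that technique, not the PipelineData implementation: the class name `ChunkWithMeta` is hypothetical, it handles only 1-D data, and it tracks only `fs` and `s0`.

```python
import numpy as np


class ChunkWithMeta(np.ndarray):
    """Hypothetical ndarray subclass carrying fs and s0 (illustration only)."""

    def __new__(cls, values, fs, s0=0):
        obj = np.asarray(values).view(cls)
        obj.fs = fs
        obj.s0 = s0
        return obj

    def __array_finalize__(self, obj):
        # Called for views and slices; copy metadata from the parent array.
        if obj is None:
            return
        self.fs = getattr(obj, "fs", None)
        self.s0 = getattr(obj, "s0", 0)

    @property
    def t0(self):
        # Start time in seconds, derived from the starting sample.
        return self.s0 / self.fs

    def __getitem__(self, key):
        result = super().__getitem__(key)
        # Only a unit-step slice with a non-negative start is handled here.
        if isinstance(key, slice) and key.step in (None, 1):
            start = key.start or 0
            if start >= 0:
                result.s0 = self.s0 + start
        return result


chunk = ChunkWithMeta(np.arange(1000.0), fs=1000, s0=5000)
tail = chunk[250:]
```

Here `tail` keeps the sampling rate of `chunk`, while its starting sample is shifted by the slice offset so that its start time still refers to the beginning of the stream.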
Events
The Events class manages time-stamped events occurring within a data
stream. It provides utilities for sub-selecting events by time or sample
range and calculating event rates.
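The two operations mentioned above, sub-selecting by time range and computing a rate, can be sketched with a small container. The class `EventLog` and its method names are hypothetical and chosen for this example; they are not taken from the Events class, whose actual interface may differ.

```python
from dataclasses import dataclass


@dataclass
class EventLog:
    """Hypothetical container for event times in seconds (illustration only)."""
    times: list
    start: float
    end: float

    def get_range(self, lb, ub):
        # Keep events in the half-open interval [lb, ub).
        kept = [t for t in self.times if lb <= t < ub]
        return EventLog(kept, lb, ub)

    def get_rate(self):
        # Events per second over the span this log covers.
        return len(self.times) / (self.end - self.start)


log = EventLog(times=[0.1, 0.4, 1.2, 1.9, 2.5], start=0.0, end=3.0)
window = log.get_range(1.0, 2.0)
```

The sub-selected log records its own bounds, so a rate computed on `window` is relative to the one-second window rather than to the full recording.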
Common Processing Steps
Continuous Data
Filtering: iirfilter provides real-time IIR filtering with state preservation to avoid transients between data chunks.
Resampling: downsample and decimate for changing sampling rates.
RMS Calculation: rms and rms_band for power estimation over sliding windows.
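State preservation between chunks can be demonstrated directly with SciPy. The sketch below does not call psiaudio's iirfilter; it uses `scipy.signal.iirfilter` and `scipy.signal.lfilter` with illustrative parameters (a second-order Butterworth low-pass at 50 Hz, chosen for this example) to show that carrying the filter state across chunks reproduces the result of filtering the whole signal at once.

```python
import numpy as np
from scipy import signal

fs = 1000
b, a = signal.iirfilter(2, 50, btype="lowpass", ftype="butter", fs=fs)

rng = np.random.default_rng(0)
x = rng.standard_normal(300)

# Reference: filter the whole signal in one call, starting from rest.
zi = np.zeros(max(len(a), len(b)) - 1)
expected, _ = signal.lfilter(b, a, x, zi=zi)

# Chunked: carry the final filter state of each chunk into the next one.
state = zi.copy()
pieces = []
for chunk in np.split(x, 3):
    y, state = signal.lfilter(b, a, chunk, zi=state)
    pieces.append(y)
chunked = np.concatenate(pieces)

# Without carried state, each chunk restarts from rest and the output differs.
naive = np.concatenate([signal.lfilter(b, a, c) for c in np.split(x, 3)])
```

The `naive` result matches the reference only within the first chunk; at every later chunk boundary the filter restarts from rest, which is the transient that state preservation avoids.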
Epoching
The extract_epochs coroutine segments continuous data into discrete trials
(epochs) based on trigger times. It includes a look-back buffer so that the
start of an epoch can still be captured when the trigger arrives after the
corresponding data has already been received.
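The role of a look-back buffer can be sketched with a small, self-contained class. This is a simplified illustration under stated assumptions, not the extract_epochs implementation: the class `EpochExtractor`, its parameters, and the `(start, epoch)` callback signature are hypothetical, and only 1-D data is handled.

```python
import numpy as np


class EpochExtractor:
    """Hypothetical epoch extractor with a look-back buffer (illustration only)."""

    def __init__(self, epoch_size, buffer_size, target):
        self.epoch_size = epoch_size    # samples per epoch
        self.buffer_size = buffer_size  # samples of history to retain
        self.target = target            # callable receiving (start, epoch)
        self.buffer = np.empty(0)
        self.buffer_s0 = 0              # stream index of buffer[0]
        self.pending = []               # trigger samples not yet extracted

    def add_trigger(self, sample):
        self.pending.append(sample)
        self._extract()

    def send(self, chunk):
        self.buffer = np.concatenate([self.buffer, chunk])
        self._extract()
        # Trim history only after extraction so late triggers can look back.
        excess = len(self.buffer) - self.buffer_size
        if excess > 0:
            self.buffer = self.buffer[excess:]
            self.buffer_s0 += excess

    def _extract(self):
        buffer_end = self.buffer_s0 + len(self.buffer)
        still_waiting = []
        for start in self.pending:
            lo = start - self.buffer_s0
            if lo < 0:
                continue  # data already discarded; this epoch is lost
            if start + self.epoch_size <= buffer_end:
                self.target(start, self.buffer[lo:lo + self.epoch_size].copy())
            else:
                still_waiting.append(start)  # wait for more data
        self.pending = still_waiting


epochs = []
extractor = EpochExtractor(epoch_size=4, buffer_size=10,
                           target=lambda s, e: epochs.append((s, e)))
extractor.send(np.arange(0.0, 8.0))    # samples 0..7 arrive first
extractor.add_trigger(5)               # late trigger: start already received
extractor.send(np.arange(8.0, 16.0))   # samples 8..15 complete the epoch
```

The trigger for sample 5 arrives after samples 0 to 7 have been pushed, yet the epoch is still recovered because those samples remain in the buffer. In this sketch, a trigger that points further back than `buffer_size` samples is silently dropped.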
Multichannel Support
mc_select: Select individual channels or subsets of channels.
mc_reference: Apply linear referencing matrices (e.g., Common Average Reference).
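The arithmetic behind channel selection and a common average reference can be shown with plain NumPy. The sketch below does not call mc_select or mc_reference; the channel labels, the data values, and the layout (channels on the first axis, samples on the last) are assumptions made for the example.

```python
import numpy as np

labels = ["A", "B", "C"]
# Channels along the first axis, samples along the last axis.
data = np.array([
    [1.0, 2.0, 3.0],
    [2.0, 4.0, 6.0],
    [3.0, 6.0, 9.0],
])

# Channel selection by label: look up row indices, then index the first axis.
wanted = ["A", "C"]
rows = [labels.index(name) for name in wanted]
subset = data[rows]

# Common average reference as a matrix: identity minus the channel mean.
n = len(labels)
car = np.eye(n) - np.full((n, n), 1.0 / n)
referenced = car @ data
```

Multiplying by `car` subtracts the across-channel mean from every channel at each sample, so the referenced channels sum to zero at every time point.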
Example Usage
    import numpy as np
    from psiaudio import pipeline

    @pipeline.coroutine
    def printer():
        # Sink: report the shape of every chunk it receives
        while True:
            d = (yield)
            print(f"Received data with shape {d.shape}")

    # Build a simple pipeline: filter -> print
    target = printer()
    p = pipeline.iirfilter(fs=1000, N=2, Wn=50, rp=0.1, rs=40,
                           btype='lowpass', ftype='cheby1', target=target)

    # Push data into the pipeline
    data = pipeline.PipelineData(np.random.randn(100), fs=1000)
    p.send(data)
API Reference
For detailed information on specific functions and classes, see the psiaudio.pipeline module.