psiaudio.pipeline module
- class Events(events, start, end, fs)
Bases:
objectCollection of events occurring over a given time span.
Maintains a DataFrame of events with timestamps and provides methods for sub-selecting events based on time or sample ranges.
- events
DataFrame with columns ‘event’, ‘sample’, and ‘ts’.
- Type:
pd.DataFrame
- start
Start sample of the collection range.
- Type:
int
- end
End sample of the collection range.
- Type:
int
- fs
Sampling rate.
- Type:
float
- t_start
Start time in seconds.
- Type:
float
- t_end
End time in seconds.
- Type:
float
- get_latest(lb, ub=0)
Return events relative to the end of the collection range (in seconds).
- Parameters:
lb (float) – Offset from end (seconds).
ub (float, optional) – Offset from end (seconds).
- get_latest_samples(lb, ub=0)
Return events relative to the end of the collection range.
- Parameters:
lb (int) – Offset from end for lower bound.
ub (int, optional) – Offset from end for upper bound.
- get_range(lb, ub)
Return a new Events instance for the specified time range (in seconds).
- Parameters:
lb (float) – Lower bound (seconds).
ub (float) – Upper bound (seconds).
- get_range_samples(start, end)
Return a new Events instance for the specified sample range.
- Parameters:
start (int) – Starting sample.
end (int) – Ending sample.
- property range_samples
Number of samples in the range.
- rate()
Calculate event rate (events per second).
- property t0
Starting time of the range.
- class PipelineData(arr, fs, s0=0, channel=None, metadata=None)
Bases:
ndarrayA specialized ndarray subclass for audio and physiological data.
PipelineData maintains metadata such as sampling rate, starting sample, channel labels, and epoch-specific metadata across common NumPy operations.
- fs
Sampling rate in Hz.
- Type:
float
- s0
Starting sample number of this segment.
- Type:
int
- t0
Starting time in seconds.
- Type:
float
- s_end
Ending sample number (exclusive).
- Type:
int
- t_end
Ending time in seconds (exclusive).
- Type:
float
- channel
Channel labels or identifiers.
- Type:
list or None
- metadata
Metadata associated with the data. If 3D (epochs), metadata is a list of dictionaries, one per epoch.
- Type:
dict or list of dicts
- add_metadata(key, value)
Add a key-value pair to metadata.
- Parameters:
key (str) – Metadata key.
value (object) – Metadata value.
- property duration
Duration of segment in seconds.
- mean(axis=None, *args, **kwargs)
Calculate mean and update metadata.
- Parameters:
axis ({None, int, str}) – Axis along which to compute the mean. Can be ‘time’, ‘channel’, or ‘epoch’.
- property n_channels
Number of channels.
- property n_epochs
Number of epochs (None if 1D or 2D).
- property n_time
Number of time samples.
- property t
Array of timestamps for each sample.
- accumulate(n, axis, newaxis, status_cb, target)
Accumulate data segments and concatenate once N segments are received.
- Parameters:
n (int) – Number of segments to accumulate.
axis (int) – Axis to concatenate along.
newaxis (bool) – If True, add a new axis before concatenation.
status_cb (callable, optional) – Called with the current count of accumulated segments.
target (callable) – Recipient of concatenated data.
- auto_scale(scale, baseline, target)
Automatically scale data based on a baseline period.
- Parameters:
scale (float) – Target maximum value.
baseline (float) – Duration in seconds for baseline calculation.
target (callable)
- auto_th(n, baseline, target, fs='auto', mode='positive', auto_th_cb=None, current_th_cb=None)
Set threshold based on standard deviation of baseline.
- Parameters:
n (float) – Standard deviations for threshold.
baseline (float) – Duration of baseline in seconds.
target (callable) – Recipient of boolean thresholded data.
fs (float or 'auto', optional)
mode ({'positive', 'negative', 'both'})
auto_th_cb (callable, optional) – Callback for calculated threshold.
current_th_cb (callable, optional) – Callback to override calculated threshold.
- average(n, target)
Running average of incoming segments.
- Parameters:
n (int) – Number of segments to average.
target (callable)
- blocked(block_size, target)
Emit data in fixed-size blocks.
- Parameters:
block_size (int) – Number of samples per block.
target (callable) – Recipient of blocked data.
- broadcast(*targets)
Send incoming data to multiple targets.
- Parameters:
*targets (callables) – Targets to receive data.
- capture(fs, queue, target)
Capture segments of data on command.
- Parameters:
fs (float) – Sampling rate.
queue (deque) – Queue of capture commands (t0).
target (callable) – Recipient of captured segments.
- capture_epoch(epoch_s0, epoch_samples, info, target, fs=None, auto_send=False)
Coroutine to facilitate capture of a single epoch.
- Parameters:
epoch_s0 (int) – Starting sample of epoch.
epoch_samples (int) – Number of samples to capture.
info (dict) – Metadata to be attached to the epoch.
target (callable) – Recipient of the captured epoch (PipelineData).
fs (float, optional) – Sampling rate.
auto_send (bool, optional) – If True, send samples as they are acquired (partial epochs).
- combine_events(events)
Combine multiple Events collections into one.
- Parameters:
events (list of Events) – Collections must be contiguous in time.
- concat(arrays, axis=-1)
Concatenate PipelineData objects while preserving metadata.
- Parameters:
arrays (list of PipelineData) – List of data segments to concatenate.
axis ({int, str}) – Axis along which to concatenate.
- Returns:
concatenated
- Return type:
- continuous_auto_scale(scale, target)
Update scaling factor continuously based on global max seen so far.
- coroutine(func)
Decorator to auto-start a coroutine.
- decimate(q, target)
Decimate data by factor q (low-pass filtering followed by downsampling).
- Parameters:
q (int) – Decimation factor.
target (callable) – Recipient of decimated data.
- delay(n, target)
Introduce a delay of n samples by emitting NaNs first.
- Parameters:
n (int) – Delay in samples.
target (callable) – Recipient of delayed data.
- derivative(initial_state, target)
Time derivative of input signal.
- Parameters:
initial_state (float) – Initial value for calculation.
target (callable)
- detrend(mode, target)
Detrend epoched data.
- Parameters:
mode ({None, 'constant', 'linear'}) – Detrending mode.
- dim_axis(axis)
Map semantic axis names to integer indices.
- Parameters:
axis ({int, str}) – Axis identifier (‘time’, ‘channel’, ‘epoch’ or -1, -2, -3).
- Returns:
dim (str) – Semantic name (‘time’, ‘channel’, ‘epoch’).
axis (int) – Integer index.
- discard(discard_samples, cb)
Discard a fixed number of samples from the start of the stream.
- Parameters:
discard_samples (int) – Number of samples to discard.
cb (callable) – Target for samples after discarding.
- downsample(q, target)
Downsample data by integer factor q (decimation without filtering).
- Parameters:
q (int) – Downsampling factor.
target (callable) – Recipient of downsampled data.
- edges(min_samples, target, initial_state=False, fs='auto', detect='both', min_events=0)
Detect rising and falling edges in a boolean signal.
- Parameters:
min_samples (int) – Minimum samples for debouncing.
target (callable) – Recipient of Events instance.
initial_state (bool, optional) – Initial state of the signal.
fs (float or 'auto', optional) – Sampling rate.
detect ({'both', 'rising', 'falling'}) – Type of edges to detect.
min_events (int, optional) – Minimum number of events to accumulate before emitting.
- ensure_dim(arrays, dim)
Ensure that arrays have the required dimensionality for a given operation.
- Parameters:
arrays (list of np.ndarray) – Arrays to check and reshape.
dim ({'channel', 'epoch'}) – Target dimension type.
- Returns:
reshaped_arrays
- Return type:
list of np.ndarray
- event_rate(block_size, block_step, target, s0_mode='center')
Calculate sliding-window event rate.
- Parameters:
block_size (int) – Window size in samples.
block_step (int) – Step size in samples.
target (callable)
s0_mode ({'center', 'left', 'right'})
- events_to_info(trigger_edge, base_info, target)
Convert events to epoch info dictionaries.
- Parameters:
trigger_edge (str) – Event name that triggers epoching.
base_info (dict) – Base metadata for the epoch.
- extract_epochs(fs, queue, epoch_size, target, buffer_size=0, empty_queue_cb=None, removed_queue=None, prestim_time=0, poststim_time=0, source_complete=None)
Extract epochs from an incoming stream of data based on trigger times.
- Parameters:
fs (float) – Sampling rate.
queue (deque) – Queue of epoch requests (dictionaries with ‘t0’ and optional ‘metadata’).
epoch_size (float or None) – Size of epoch in seconds. If None, requests must have ‘duration’.
target (callable) – Recipient of extracted epochs (as PipelineData).
buffer_size (float) – Look-back buffer size in seconds.
empty_queue_cb (callable, optional) – Called when queue is empty and all epochs are captured.
removed_queue (deque, optional) – Queue of requests to cancel.
prestim_time (float) – Time before t0 to include in epoch.
poststim_time (float) – Time after epoch_size to include.
source_complete (Event, optional) – If set, signals that no more stimuli will be added to the queue.
- extract_power(frequency, window, target)
Quadrature demodulation to extract power at a specific frequency.
- Parameters:
frequency (float) – Target frequency.
window (str) – Scipy window name.
target (callable)
- iirfilter(fs, N, Wn, rp, rs, btype, ftype, target)
Apply an IIR filter to a continuous stream of data.
Maintains filter state between chunks to avoid transients.
- Parameters:
fs (float) – Sampling rate.
N (objects) – Arguments passed to scipy.signal.iirfilter.
Wn (objects) – Arguments passed to scipy.signal.iirfilter.
rp (objects) – Arguments passed to scipy.signal.iirfilter.
rs (objects) – Arguments passed to scipy.signal.iirfilter.
btype (objects) – Arguments passed to scipy.signal.iirfilter.
ftype (objects) – Arguments passed to scipy.signal.iirfilter.
target (callable) – Recipient of filtered data.
- mc_reference(matrix, target)
Apply a referencing matrix to multichannel data.
- Parameters:
matrix (np.ndarray) – Referencing matrix.
- mc_select(channel, labels, target)
Select a specific channel from multichannel data.
- Parameters:
channel (int or str) – Channel index or label.
labels (list of str) – List of channel labels.
- normalize_index(index, ndim)
Expands an index into the same dimensionality as the array.
- Parameters:
index ({Ellipsis, None, slice, tuple, int, list, np.ndarray}) – Index to normalize.
ndim (int) – The dimension of the object that is being indexed.
- Returns:
norm_index – The expanded index.
- Return type:
tuple
- Raises:
IndexError – If 2D indexing is attempted or more than one Ellipsis is provided.
ValueError – If an unrecognized index type is encountered.
- reject_epochs(reject_threshold, mode, status_cb, valid_target)
Discard epochs containing artifacts based on threshold.
- Parameters:
reject_threshold (float or callable) – Rejection threshold.
mode ({'absolute value', 'amplitude'}) – Rejection logic.
status_cb (callable) – Callback with boolean mask of accepted epochs.
valid_target (callable) – Recipient of accepted epochs.
- rms(fs, duration, target)
Calculate RMS over fixed duration blocks.
- Parameters:
fs (float) – Sampling rate.
duration (float) – Duration of window for RMS calculation.
target (callable) – Target to receive RMS values.
- rms_band(fs, fl, fh, duration, target)
Calculate RMS within a specific frequency band.
- Parameters:
fs (float) – Sampling rate.
fl (float) – Lower frequency bound.
fh (float) – Upper frequency bound.
duration (float) – Duration for calculation.
target (callable) – Recipient of RMS results.
- transform(function, target)
Apply a function to incoming data and send result to target.
- Parameters:
function (callable) – Transformation to apply.
target (callable) – Recipient of transformed data.