psiaudio.pipeline module

class Events(events, start, end, fs)

Bases: object

Collection of events occurring over a given time span.

Maintains a DataFrame of events with timestamps and provides methods for sub-selecting events based on time or sample ranges.

events

DataFrame with columns ‘event’, ‘sample’, and ‘ts’.

Type:

pd.DataFrame

start

Start sample of the collection range.

Type:

int

end

End sample of the collection range.

Type:

int

fs

Sampling rate.

Type:

float

t_start

Start time in seconds.

Type:

float

t_end

End time in seconds.

Type:

float

get_latest(lb, ub=0)

Return events relative to the end of the collection range (in seconds).

Parameters:
  • lb (float) – Lower-bound offset from the end of the range (seconds).

  • ub (float, optional) – Upper-bound offset from the end of the range (seconds).

get_latest_samples(lb, ub=0)

Return events relative to the end of the collection range (in samples).

Parameters:
  • lb (int) – Offset from end for lower bound.

  • ub (int, optional) – Offset from end for upper bound.

get_range(lb, ub)

Return a new Events instance for the specified time range (in seconds).

Parameters:
  • lb (float) – Lower bound (seconds).

  • ub (float) – Upper bound (seconds).

get_range_samples(start, end)

Return a new Events instance for the specified sample range.

Parameters:
  • start (int) – Starting sample.

  • end (int) – Ending sample.

property range_samples

Number of samples in the range.

rate()

Calculate event rate (events per second).

property t0

Starting time of the range.
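
The relationship between the sample-based and time-based selectors, and the event rate, can be illustrated without pandas. This is a minimal sketch rather than the class's implementation: events are plain (sample, name) tuples, the helper names select_range_samples, select_range and event_rate are hypothetical, and the half-open interval [start, end) is an assumption.

```python
def select_range_samples(events, start, end):
    """Keep events whose sample index lies in [start, end) (assumed half-open)."""
    return [(s, name) for s, name in events if start <= s < end]


def select_range(events, lb, ub, fs):
    """Time-based selection: convert seconds to samples, then reuse the sample version."""
    return select_range_samples(events, round(lb * fs), round(ub * fs))


def event_rate(events, start, end, fs):
    """Events per second over the collection range [start, end)."""
    return len(events) / ((end - start) / fs)


fs = 1000.0
events = [(100, 'rising'), (250, 'falling'), (900, 'rising')]
subset = select_range(events, 0.25, 1.0, fs)   # 0.25 s to 1.0 s
rate = event_rate(events, 0, 1000, fs)         # 3 events in 1 second
```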

class PipelineData(arr, fs, s0=0, channel=None, metadata=None)

Bases: ndarray

A specialized ndarray subclass for audio and physiological data.

PipelineData maintains metadata such as sampling rate, starting sample, channel labels, and epoch-specific metadata across common NumPy operations.

fs

Sampling rate in Hz.

Type:

float

s0

Starting sample number of this segment.

Type:

int

t0

Starting time in seconds.

Type:

float

s_end

Ending sample number (exclusive).

Type:

int

t_end

Ending time in seconds (exclusive).

Type:

float

channel

Channel labels or identifiers.

Type:

list or None

metadata

Metadata associated with the data. For 3D (epoched) data, metadata is a list of dictionaries, one per epoch.

Type:

dict or list of dicts

add_metadata(key, value)

Add a key-value pair to metadata.

Parameters:
  • key (str) – Metadata key.

  • value (object) – Metadata value.

property duration

Duration of segment in seconds.

mean(axis=None, *args, **kwargs)

Calculate mean and update metadata.

Parameters:

axis ({None, int, str}) – Axis along which to compute the mean. Can be ‘time’, ‘channel’, or ‘epoch’.

property n_channels

Number of channels.

property n_epochs

Number of epochs (None if 1D or 2D).

property n_time

Number of time samples.

property t

Array of timestamps for each sample.
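
The timing attributes above are related through the sampling rate. The sketch below shows one consistent reading of those descriptions using a hypothetical SegmentInfo dataclass (not part of the module); the formulas, in particular the exclusive end, are assumptions inferred from the attribute descriptions.

```python
from dataclasses import dataclass


@dataclass
class SegmentInfo:
    """Hypothetical stand-in for the timing attributes of a data segment."""
    fs: float     # sampling rate in Hz
    s0: int       # starting sample of the segment
    n_time: int   # number of time samples in the segment

    @property
    def t0(self):
        return self.s0 / self.fs

    @property
    def s_end(self):
        # Exclusive end: index of the first sample after the segment.
        return self.s0 + self.n_time

    @property
    def t_end(self):
        return self.s_end / self.fs

    @property
    def duration(self):
        return self.n_time / self.fs

    @property
    def t(self):
        # One timestamp per sample, offset by the starting sample.
        return [(self.s0 + i) / self.fs for i in range(self.n_time)]


seg = SegmentInfo(fs=100.0, s0=200, n_time=4)
```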

accumulate(n, axis, newaxis, status_cb, target)

Accumulate data segments and concatenate them once n segments have been received.

Parameters:
  • n (int) – Number of segments to accumulate.

  • axis (int) – Axis to concatenate along.

  • newaxis (bool) – If True, add a new axis before concatenation.

  • status_cb (callable, optional) – Called with the current count of accumulated segments.

  • target (callable) – Recipient of concatenated data.
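
The accumulate-then-concatenate behaviour can be sketched with a plain generator. This is an illustration only: accumulate_sketch is a hypothetical name, segments are 1D lists, and the axis, newaxis and status_cb arguments are left out.

```python
def accumulate_sketch(n, target):
    """Collect n segments, then pass them on joined into a single list."""
    segments = []
    while True:
        segments.append((yield))
        if len(segments) == n:
            # Concatenate along the only axis of these 1D lists.
            target([x for seg in segments for x in seg])
            segments = []


results = []
acc = accumulate_sketch(3, results.append)
next(acc)  # prime the generator so it can receive data
for chunk in ([1, 2], [3], [4, 5], [6]):
    acc.send(chunk)
# The fourth chunk is held back until two more segments arrive.
```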

auto_scale(scale, baseline, target)

Automatically scale data based on a baseline period.

Parameters:
  • scale (float) – Target maximum value.

  • baseline (float) – Duration in seconds for baseline calculation.

  • target (callable) – Recipient of scaled data.

auto_th(n, baseline, target, fs='auto', mode='positive', auto_th_cb=None, current_th_cb=None)

Set threshold based on standard deviation of baseline.

Parameters:
  • n (float) – Standard deviations for threshold.

  • baseline (float) – Duration of baseline in seconds.

  • target (callable) – Recipient of boolean thresholded data.

  • fs (float or 'auto', optional) – Sampling rate.

  • mode ({'positive', 'negative', 'both'})

  • auto_th_cb (callable, optional) – Callback for calculated threshold.

  • current_th_cb (callable, optional) – Callback to override calculated threshold.

average(n, target)

Running average of incoming segments.

Parameters:
  • n (int) – Number of segments to average.

  • target (callable) – Recipient of averaged data.

blocked(block_size, target)

Emit data in fixed-size blocks.

Parameters:
  • block_size (int) – Number of samples per block.

  • target (callable) – Recipient of blocked data.
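
Re-chunking a stream into fixed-size blocks requires holding back any leftover samples until the next chunk arrives. A minimal sketch, using a hypothetical blocked_sketch generator and lists in place of arrays:

```python
def blocked_sketch(block_size, target):
    """Re-chunk an incoming stream into blocks of exactly block_size samples."""
    pending = []
    while True:
        pending.extend((yield))
        # Emit as many full blocks as are available and keep the remainder.
        while len(pending) >= block_size:
            target(pending[:block_size])
            pending = pending[block_size:]


blocks = []
pipe = blocked_sketch(4, blocks.append)
next(pipe)  # prime the generator
pipe.send([0, 1, 2])            # not enough for a block yet
pipe.send([3, 4, 5, 6, 7, 8])   # completes two blocks; sample 8 stays pending
```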

broadcast(*targets)

Send incoming data to multiple targets.

Parameters:

*targets (callables) – Targets to receive data.

capture(fs, queue, target)

Capture segments of data on command.

Parameters:
  • fs (float) – Sampling rate.

  • queue (deque) – Queue of capture commands (t0).

  • target (callable) – Recipient of captured segments.

capture_epoch(epoch_s0, epoch_samples, info, target, fs=None, auto_send=False)

Coroutine to facilitate capture of a single epoch.

Parameters:
  • epoch_s0 (int) – Starting sample of epoch.

  • epoch_samples (int) – Number of samples to capture.

  • info (dict) – Metadata to be attached to the epoch.

  • target (callable) – Recipient of the captured epoch (PipelineData).

  • fs (float, optional) – Sampling rate.

  • auto_send (bool, optional) – If True, send samples as they are acquired (partial epochs).

combine_events(events)

Combine multiple Events collections into one.

Parameters:

events (list of Events) – Collections to combine; they must be contiguous in time.

concat(arrays, axis=-1)

Concatenate PipelineData objects while preserving metadata.

Parameters:
  • arrays (list of PipelineData) – List of data segments to concatenate.

  • axis ({int, str}) – Axis along which to concatenate.

Returns:

concatenated

Return type:

PipelineData

continuous_auto_scale(scale, target)

Update scaling factor continuously based on global max seen so far.

coroutine(func)

Decorator to auto-start a coroutine.
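
A common form of such a decorator advances the generator to its first yield so that callers can use send() immediately. The sketch below shows that general pattern under the hypothetical name autostart; the module's own decorator may differ in detail.

```python
import functools


def autostart(func):
    """Prime a generator so it is ready to receive values via send()."""
    @functools.wraps(func)
    def start(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # advance to the first `yield`
        return gen
    return start


@autostart
def collector(out):
    """Append every value sent in to the list `out`."""
    while True:
        value = (yield)
        out.append(value)


received = []
sink = collector(received)
sink.send(1)  # no explicit next() call was needed
sink.send(2)
```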

decimate(q, target)

Decimate data by factor q (low-pass filtering followed by downsampling).

Parameters:
  • q (int) – Decimation factor.

  • target (callable) – Recipient of decimated data.

delay(n, target)

Introduce a delay of n samples by emitting NaNs first.

Parameters:
  • n (int) – Delay in samples.

  • target (callable) – Recipient of delayed data.
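
One way to realise a NaN-padded delay is to emit the filler once, just before the first real chunk. The sketch below assumes that timing (the module may emit the filler at a different point) and uses the hypothetical name delay_sketch with lists in place of arrays.

```python
import math


def delay_sketch(n, target):
    """Shift the stream by n samples by sending n NaNs ahead of the data."""
    first = (yield)
    target([math.nan] * n)   # filler that pushes everything back by n samples
    target(first)
    while True:
        target((yield))


out = []
d = delay_sketch(2, out.extend)
next(d)  # prime the generator
d.send([1.0, 2.0])
d.send([3.0])
```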

derivative(initial_state, target)

Time derivative of input signal.

Parameters:
  • initial_state (float) – Initial value for calculation.

  • target (callable) – Recipient of the differentiated data.

detrend(mode, target)

Detrend epoched data.

Parameters:

mode ({None, 'constant', 'linear'}) – Detrending mode.

dim_axis(axis)

Map semantic axis names to integer indices.

Parameters:

axis ({int, str}) – Axis identifier (‘time’, ‘channel’, ‘epoch’ or -1, -2, -3).

Returns:

  • dim (str) – Semantic name (‘time’, ‘channel’, ‘epoch’).

  • axis (int) – Integer index.
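
The mapping between semantic names and integer indices can be sketched as a pair of lookup tables. The pairing used here (time = -1, channel = -2, epoch = -3) is an assumption based on the order in which the names and indices are listed above, and dim_axis_sketch is a hypothetical name.

```python
# Assumed pairing of semantic names with negative axis indices.
_AXES = {'time': -1, 'channel': -2, 'epoch': -3}
_NAMES = {index: name for name, index in _AXES.items()}


def dim_axis_sketch(axis):
    """Return (name, index) given either a semantic name or a negative index."""
    if isinstance(axis, str):
        return axis, _AXES[axis]
    return _NAMES[axis], axis


by_name = dim_axis_sketch('channel')
by_index = dim_axis_sketch(-1)
```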

discard(discard_samples, cb)

Discard a fixed number of samples from the start of the stream.

Parameters:
  • discard_samples (int) – Number of samples to discard.

  • cb (callable) – Recipient of the samples that follow the discarded ones.

downsample(q, target)

Downsample data by integer factor q (keeps every q-th sample, with no low-pass filtering).

Parameters:
  • q (int) – Downsampling factor.

  • target (callable) – Recipient of downsampled data.
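
When downsampling a chunked stream, the position of the next sample to keep has to carry over from one chunk to the next. The sketch below is one way to track that phase; downsample_sketch is a hypothetical name and lists stand in for arrays.

```python
def downsample_sketch(q, target):
    """Keep every q-th sample of the stream, with no anti-alias filtering."""
    offset = 0  # index within the next chunk of the next sample to keep
    while True:
        chunk = (yield)
        kept = chunk[offset::q]
        # Work out where the next kept sample falls in the following chunk.
        offset = (offset - len(chunk)) % q
        if kept:
            target(kept)


out = []
ds = downsample_sketch(3, out.extend)
next(ds)  # prime the generator
ds.send([0, 1, 2, 3])
ds.send([4, 5, 6, 7, 8, 9])
```

Because the phase is tracked across chunks, the result matches slicing the whole stream with [::3] regardless of how the data is chunked.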

edges(min_samples, target, initial_state=False, fs='auto', detect='both', min_events=0)

Detect rising and falling edges in a boolean signal.

Parameters:
  • min_samples (int) – Minimum samples for debouncing.

  • target (callable) – Recipient of Events instance.

  • initial_state (bool, optional) – Initial state of the signal.

  • fs (float or 'auto', optional) – Sampling rate.

  • detect ({'both', 'rising', 'falling'}) – Type of edges to detect.

  • min_events (int, optional) – Minimum number of events to accumulate before emitting.
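
The core of edge detection is comparing each sample with the previous one. The sketch below illustrates only that step: find_edges is a hypothetical helper, the 'rising' and 'falling' labels are assumed event names, and debouncing (min_samples) and batching (min_events) are left out.

```python
def find_edges(signal, initial_state=False, detect='both'):
    """Return (sample_index, 'rising' | 'falling') pairs for a boolean sequence."""
    events = []
    previous = initial_state
    for i, current in enumerate(signal):
        if current and not previous and detect in ('both', 'rising'):
            events.append((i, 'rising'))
        elif previous and not current and detect in ('both', 'falling'):
            events.append((i, 'falling'))
        previous = current
    return events


signal = [False, True, True, False, False, True]
both = find_edges(signal)
rising = find_edges(signal, detect='rising')
# With initial_state=True, the leading False counts as a falling edge.
from_high = find_edges(signal, initial_state=True, detect='falling')
```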

ensure_dim(arrays, dim)

Ensure that arrays have the required dimensionality for a given operation.

Parameters:
  • arrays (list of np.ndarray) – Arrays to check and reshape.

  • dim ({'channel', 'epoch'}) – Target dimension type.

Returns:

reshaped_arrays

Return type:

list of np.ndarray

event_rate(block_size, block_step, target, s0_mode='center')

Calculate sliding-window event rate.

Parameters:
  • block_size (int) – Window size in samples.

  • block_step (int) – Step size in samples.

  • target (callable) – Recipient of event rate values.

  • s0_mode ({'center', 'left', 'right'})

events_to_info(trigger_edge, base_info, target)

Convert events to epoch info dictionaries.

Parameters:
  • trigger_edge (str) – Event name that triggers epoching.

  • base_info (dict) – Base metadata for the epoch.

extract_epochs(fs, queue, epoch_size, target, buffer_size=0, empty_queue_cb=None, removed_queue=None, prestim_time=0, poststim_time=0, source_complete=None)

Extract epochs from an incoming stream of data based on trigger times.

Parameters:
  • fs (float) – Sampling rate.

  • queue (deque) – Queue of epoch requests (dictionaries with ‘t0’ and optional ‘metadata’).

  • epoch_size (float or None) – Size of epoch in seconds. If None, requests must have ‘duration’.

  • target (callable) – Recipient of extracted epochs (as PipelineData).

  • buffer_size (float) – Look-back buffer size in seconds.

  • empty_queue_cb (callable, optional) – Called when queue is empty and all epochs are captured.

  • removed_queue (deque, optional) – Queue of requests to cancel.

  • prestim_time (float) – Time before t0 to include in epoch.

  • poststim_time (float) – Time after epoch_size to include.

  • source_complete (Event, optional) – If set, signals that no more stimuli will be added to the queue.
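
The way t0, epoch_size, prestim_time and poststim_time combine into a sample window can be sketched as follows. The rounding convention and the helper name epoch_bounds are assumptions made for illustration; queue handling, buffering and cancellation are not shown.

```python
def epoch_bounds(t0, fs, epoch_size, prestim_time=0.0, poststim_time=0.0):
    """Return (first_sample, n_samples) for a single epoch request."""
    first = round((t0 - prestim_time) * fs)
    n_samples = round((prestim_time + epoch_size + poststim_time) * fs)
    return first, n_samples


fs = 1000.0
stream = list(range(2000))  # stand-in for two seconds of samples
first, n_samples = epoch_bounds(0.5, fs, epoch_size=0.25,
                                prestim_time=0.125, poststim_time=0.125)
epoch = stream[first:first + n_samples]
```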

extract_power(frequency, window, target)

Quadrature demodulation to extract power at a specific frequency.

Parameters:
  • frequency (float) – Target frequency.

  • window (str) – Scipy window name.

  • target (callable) – Recipient of the extracted power.
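
Quadrature demodulation mixes the signal with cosine and sine references at the target frequency and averages the products; the two averages give the amplitude of that component independent of its phase. The sketch below is a batch illustration with a rectangular window. The name tone_power, the explicit fs argument and the choice of mean-square power as the output are assumptions, not the module's behaviour.

```python
import math


def tone_power(samples, fs, frequency):
    """Estimate the mean-square power of the component at `frequency`."""
    n = len(samples)
    # Mix with in-phase (cos) and quadrature (sin) references, then average.
    i = sum(x * math.cos(2 * math.pi * frequency * k / fs)
            for k, x in enumerate(samples)) / n
    q = sum(x * math.sin(2 * math.pi * frequency * k / fs)
            for k, x in enumerate(samples)) / n
    amplitude = 2 * math.hypot(i, q)
    return amplitude ** 2 / 2  # power of a sinusoid with that amplitude


fs = 1000.0
# One second of a 50 Hz tone with amplitude 3 and an arbitrary phase.
tone = [3.0 * math.sin(2 * math.pi * 50.0 * k / fs + 0.4) for k in range(1000)]
power_at_50 = tone_power(tone, fs, 50.0)
power_at_120 = tone_power(tone, fs, 120.0)
```

For a sinusoid of amplitude 3 the mean-square power is 3² / 2 = 4.5, and the estimate at an unrelated frequency is close to zero because the window spans a whole number of cycles of both frequencies.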

iirfilter(fs, N, Wn, rp, rs, btype, ftype, target)

Apply an IIR filter to a continuous stream of data.

Maintains filter state between chunks to avoid transients.

Parameters:
  • fs (float) – Sampling rate.

  • N, Wn, rp, rs, btype, ftype (objects) – Arguments passed to scipy.signal.iirfilter.

  • target (callable) – Recipient of filtered data.
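
Carrying filter state across chunk boundaries is what makes chunked filtering equivalent to filtering the whole signal at once. The sketch below demonstrates the idea with a hand-written one-pole low-pass filter in place of a scipy-designed IIR filter; one_pole_stream and run are hypothetical names.

```python
def one_pole_stream(alpha, target):
    """Stream y[n] = alpha * x[n] + (1 - alpha) * y[n - 1] chunk by chunk."""
    state = 0.0  # y[n - 1], carried across chunk boundaries
    while True:
        chunk = (yield)
        out = []
        for x in chunk:
            state = alpha * x + (1 - alpha) * state
            out.append(state)
        target(out)


def run(chunks, alpha=0.25):
    """Filter a sequence of chunks and return the concatenated output."""
    collected = []
    filt = one_pole_stream(alpha, collected.extend)
    next(filt)  # prime the generator
    for chunk in chunks:
        filt.send(chunk)
    return collected


signal = [1.0, 1.0, 1.0, 1.0, 0.0, 0.0]
whole = run([signal])
chunked = run([signal[:2], signal[2:5], signal[5:]])
```

If the state were reset at the start of each chunk, every chunk would begin with a fresh start-up transient and the two outputs would no longer match.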

mc_reference(matrix, target)

Apply a referencing matrix to multichannel data.

Parameters:

matrix (np.ndarray) – Referencing matrix.

mc_select(channel, labels, target)

Select a specific channel from multichannel data.

Parameters:
  • channel (int or str) – Channel index or label.

  • labels (list of str) – List of channel labels.

normalize_index(index, ndim)

Expand an index to the same dimensionality as the array.

Parameters:
  • index ({Ellipsis, None, slice, tuple, int, list, np.ndarray}) – Index to normalize.

  • ndim (int) – The dimension of the object that is being indexed.

Returns:

norm_index – The expanded index.

Return type:

tuple

Raises:
  • IndexError – If 2D indexing is attempted or more than one Ellipsis is provided.

  • ValueError – If an unrecognized index type is encountered.
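
Index expansion can be sketched for the simplest cases: integers, slices and a single Ellipsis. The helper expand_index below is hypothetical and deliberately limited; it does not handle None, lists or arrays, and its error conditions only approximate those listed above.

```python
def expand_index(index, ndim):
    """Pad `index` with full slices so it has one entry per array dimension."""
    if not isinstance(index, tuple):
        index = (index,)
    ellipses = [i for i, item in enumerate(index) if item is Ellipsis]
    if len(ellipses) > 1:
        raise IndexError('only one Ellipsis is allowed')
    n_missing = ndim - (len(index) - len(ellipses))
    if n_missing < 0:
        raise IndexError('too many indices')
    fill = (slice(None),) * n_missing
    if ellipses:
        # Replace the Ellipsis with as many full slices as are needed.
        pos = ellipses[0]
        return index[:pos] + fill + index[pos + 1:]
    # No Ellipsis: trailing dimensions are taken in full.
    return index + fill


leading = expand_index(0, 3)
trailing = expand_index((Ellipsis, 2), 3)
middle = expand_index((1, Ellipsis, slice(0, 5)), 3)
```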

reject_epochs(reject_threshold, mode, status_cb, valid_target)

Discard epochs containing artifacts based on threshold.

Parameters:
  • reject_threshold (float or callable) – Rejection threshold.

  • mode ({'absolute value', 'amplitude'}) – Rejection logic.

  • status_cb (callable) – Callback with boolean mask of accepted epochs.

  • valid_target (callable) – Recipient of accepted epochs.
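
The two rejection modes can be illustrated on a small batch of epochs. In the sketch below, 'absolute value' compares the largest absolute sample with the threshold, and 'amplitude' is assumed to mean the peak-to-peak range; reject_sketch is a hypothetical helper that returns the accepted epochs and the mask instead of calling callbacks, and callable thresholds are not covered.

```python
def reject_sketch(epochs, threshold, mode='absolute value'):
    """Return (accepted_epochs, mask) where mask[i] is True if epoch i passes."""
    if mode == 'absolute value':
        metric = lambda epoch: max(abs(x) for x in epoch)
    elif mode == 'amplitude':
        # Assumption: 'amplitude' refers to the peak-to-peak range.
        metric = lambda epoch: max(epoch) - min(epoch)
    else:
        raise ValueError(f'unknown mode: {mode}')
    mask = [metric(epoch) < threshold for epoch in epochs]
    accepted = [epoch for epoch, ok in zip(epochs, mask) if ok]
    return accepted, mask


epochs = [[0.1, -0.2, 0.3], [0.1, -1.5, 0.2], [0.6, -0.6, 0.0]]
accepted_abs, mask_abs = reject_sketch(epochs, 1.0, 'absolute value')
accepted_amp, mask_amp = reject_sketch(epochs, 1.0, 'amplitude')
```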

rms(fs, duration, target)

Calculate RMS over fixed duration blocks.

Parameters:
  • fs (float) – Sampling rate.

  • duration (float) – Duration of window for RMS calculation.

  • target (callable) – Target to receive RMS values.
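
The RMS over fixed-duration blocks follows directly from the definition: the square root of the mean of the squared samples in each block. The sketch below is a batch version under the hypothetical name block_rms; dropping a trailing partial block is an assumption.

```python
import math


def block_rms(samples, fs, duration):
    """Return one RMS value per complete block of `duration` seconds."""
    n = round(fs * duration)  # samples per block
    values = []
    for start in range(0, len(samples) - n + 1, n):
        block = samples[start:start + n]
        values.append(math.sqrt(sum(x * x for x in block) / n))
    return values


samples = [3.0, -3.0, 3.0, -3.0, 1.0, 1.0, 1.0, 1.0, 5.0]
values = block_rms(samples, fs=4.0, duration=1.0)  # trailing 5.0 is incomplete
```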

rms_band(fs, fl, fh, duration, target)

Calculate RMS within a specific frequency band.

Parameters:
  • fs (float) – Sampling rate.

  • fl (float) – Lower frequency bound.

  • fh (float) – Upper frequency bound.

  • duration (float) – Duration for calculation.

  • target (callable) – Recipient of RMS results.

transform(function, target)

Apply a function to incoming data and send result to target.

Parameters:
  • function (callable) – Transformation to apply.

  • target (callable) – Recipient of transformed data.