# Source code for psi.util

import logging
log = logging.getLogger(__name__)

import ast
import inspect
import json
from pathlib import Path
import threading
import textwrap
import uuid

import numpy as np
import pandas as pd
from atom.api import Atom, Property

from psiaudio.calibration import BaseCalibration


def as_numeric(x):
    '''
    Coerce ``x`` to a numpy array unless it is already an array-like
    container (numpy array, pandas DataFrame or Series), in which case it is
    returned unchanged.
    '''
    if isinstance(x, (np.ndarray, pd.DataFrame, pd.Series)):
        return x
    return np.asanyarray(x)
def rpc(plugin_name, method_name):
    '''Decorator to map an Enaml command handler to the plugin method

    Parameters
    ----------
    plugin_name : str
        Name of the workbench plugin to look up via the event's workbench.
    method_name : str
        Name of the method to invoke on the plugin.

    Returns
    -------
    callable
        Handler accepting an Enaml command event. Event parameters whose
        names do not match the method's argument names are silently dropped.
    '''
    def wrapper(event):
        plugin = event.workbench.get_plugin(plugin_name)
        f = getattr(plugin, method_name)
        argnames = inspect.getfullargspec(f).args
        # Forward only the event parameters the method actually accepts.
        parameters = {k: v for k, v in event.parameters.items()
                      if k in argnames}
        # Reuse the already-resolved bound method instead of performing a
        # second, redundant getattr lookup.
        return f(**parameters)
    return wrapper
def get_tagged_members(obj, tag_name, tag_value=True, exclude_properties=False):
    '''
    Return the Atom members of ``obj`` whose metadata tag matches.

    Parameters
    ----------
    obj : Atom
        Object whose ``members()`` mapping is scanned.
    tag_name : str
        Metadata key to look up on each member.
    tag_value : object
        Value the metadata key must equal (default True).
    exclude_properties : bool
        If True, computed ``Property`` members are excluded from the result.

    Returns
    -------
    dict
        Mapping of member name to member object.
    '''
    # The original defined two nearly-identical lambdas (and a dead
    # ``result = {}`` local); a single predicate covers both cases.
    def match(member):
        if not (member.metadata and member.metadata.get(tag_name) == tag_value):
            return False
        if exclude_properties and isinstance(member, Property):
            return False
        return True

    return {n: m for n, m in obj.members().items() if match(m)}
def get_tagged_values(obj, tag_name, tag_value=True, exclude_properties=False):
    '''
    Return a mapping of member name to current value for all members of
    ``obj`` tagged with ``tag_name == tag_value``.

    See :func:`get_tagged_members` for parameter details.
    '''
    tagged = get_tagged_members(obj, tag_name, tag_value, exclude_properties)
    return {name: getattr(obj, name) for name in tagged}
def declarative_to_dict(value, tag_name, tag_value=True, include_dunder=True,
                        seen_objects=None):
    '''
    Recursively convert ``value`` into a JSON-serializable dict/scalar tree.

    Atom objects contribute the members tagged with ``tag_name ==
    tag_value``; containers are converted element-wise. Objects already
    visited are replaced with a ``'__obj__::<id>'`` marker string to break
    reference cycles.

    Parameters
    ----------
    value : object
        Object to convert.
    tag_name : str
        Metadata key used to select Atom members.
    tag_value : object
        Required value of the metadata key (default True).
    include_dunder : bool
        If True, dicts produced for objects include ``__type__`` (class
        name) and ``__id__`` (``id()`` of the object).
    seen_objects : list or None
        List of ``id()`` values of objects already converted. Shared (and
        mutated) across the whole recursion; a fresh list is created at the
        top-level call when None.
    '''
    if seen_objects is None:
        seen_objects = []
    # Bundled so every recursive call threads the same settings and the
    # same (shared, mutable) seen_objects list.
    args = (tag_name, tag_value, include_dunder, seen_objects)
    # Scalars pass through unchanged. NOTE: bool is a subclass of int, so
    # booleans also take this branch.
    if isinstance(value, int) \
            or isinstance(value, float) \
            or isinstance(value, str) \
            or value is None:
        return value
    if isinstance(value, Path):
        return str(value)
    # Cycle/duplicate detection: emit a marker instead of recursing again.
    if id(value) in seen_objects:
        return f'__obj__::{id(value)}'
    else:
        seen_objects.append(id(value))
    if isinstance(value, Atom):
        if include_dunder:
            result = {
                '__type__': value.__class__.__name__,
                '__id__': id(value),
            }
        else:
            result = {}
        # Only members explicitly tagged with tag_name == tag_value are
        # serialized.
        for name, member in value.members().items():
            if member.metadata and member.metadata.get(tag_name) == tag_value:
                v = getattr(value, name)
                result[name] = declarative_to_dict(v, *args)
        return result
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, list):
        return [declarative_to_dict(v, *args) for v in value]
    if isinstance(value, dict):
        return {k: declarative_to_dict(v, *args) for k, v in value.items()}
    if isinstance(value, BaseCalibration):
        # Special case for the Calibration data since it's from psiaudio (and
        # we do not wish to introduce extra dependencies in psiaudio).
        # Introspect all public, non-callable attributes.
        attrs = [a for a in dir(value) if not
                 (a.startswith('_') or callable(getattr(value, a)))]
        result = {a: declarative_to_dict(getattr(value, a), *args)
                  for a in attrs}
        if include_dunder:
            result.update({
                '__type__': value.__class__.__name__,
                '__id__': id(value)
            })
        return result
    # Any other object with instance state is reduced to its type/id stamp
    # (or an empty dict) rather than being introspected further.
    if hasattr(value, '__dict__') or hasattr(value, '__slots__'):
        if include_dunder:
            return {
                '__type__': value.__class__.__name__,
                '__id__': id(value),
            }
        else:
            return {}
    # Fallback: return the value as-is (e.g. tuples, bytes).
    return value
def declarative_to_json(filename, value, tag_name, tag_value=True,
                        include_dunder=True, seen_objects=None):
    '''
    Serialize ``value`` via :func:`declarative_to_dict` and write it to
    ``filename`` as indented JSON.

    Parameters
    ----------
    filename : str or Path
        Destination file.
    value, tag_name, tag_value, include_dunder, seen_objects
        Passed through to :func:`declarative_to_dict`.
    '''
    filename = Path(filename)
    # Fix: seen_objects was previously accepted but never forwarded, so
    # callers could not share a cycle-tracking list across calls.
    info = declarative_to_dict(value, tag_name, tag_value,
                               include_dunder=include_dunder,
                               seen_objects=seen_objects)
    filename.write_text(json.dumps(info, indent=2))
def dict_to_declarative(obj, info, skip_errors=False):
    '''
    Apply the key/value pairs in ``info`` to ``obj`` via ``setattr``.

    Sub-dicts containing a ``'type'`` or ``'__type__'`` key are treated as
    nested declarative objects and applied recursively to the corresponding
    attribute of ``obj``. The bookkeeping keys ``'type'``, ``'__type__'``
    and ``'__id__'`` are skipped. When ``skip_errors`` is True, failures to
    set an attribute are silently ignored.
    '''
    skipped = ('type', '__type__', '__id__')
    for key, value in info.items():
        if key in skipped:
            continue
        nested = isinstance(value, dict) and \
            ('type' in value or '__type__' in value)
        if nested:
            dict_to_declarative(getattr(obj, key), value, skip_errors)
            continue
        try:
            setattr(obj, key, value)
        except Exception:
            if not skip_errors:
                raise
def copy_declarative(old, exclude=None, **kw):
    '''
    Create a new instance of ``old``'s class, initialized from ``old``'s
    metadata-tagged attribute values.

    Parameters
    ----------
    old : Atom
        Instance to copy.
    exclude : iterable of str or None
        Attribute names to drop before constructing the copy.
    **kw
        Overrides applied on top of the copied attributes.
    '''
    attrs = get_tagged_values(old, 'metadata', exclude_properties=True)
    for key in (exclude or ()):
        attrs.pop(key, None)
    attrs.update(kw)
    return old.__class__(**attrs)
class _FullNameGetter(ast.NodeVisitor): def __init__(self, *args, **kwargs): self.names = [] super(_FullNameGetter, self).__init__(*args, **kwargs) def visit_Name(self, node): self.names.append(node.id) def visit_Attribute(self, node): names = [] while isinstance(node, ast.Attribute): names.append(node.attr) node = node.value names.append(node.id) name = '.'.join(names[::-1]) self.names.append(name)
def get_dependencies(expression):
    '''
    Return the list of names (including dotted attribute chains) referenced
    by a Python expression string.
    '''
    visitor = _FullNameGetter()
    visitor.visit(ast.parse(expression))
    return visitor.names
class SignalBuffer:
    '''
    Fixed-duration, thread-safe buffer for a continuously-acquired signal.

    Data is appended at the right edge of an internal numpy array; older
    samples shift left and eventually fall off. All public accessors
    acquire a reentrant lock, so methods may call each other safely.
    '''

    def __init__(self, fs, size, fill_value=np.nan, dtype=np.double,
                 n_channels=None):
        '''
        Parameters
        ----------
        fs : float
            Sampling rate of buffered signal
        size : float
            Duration of buffer in sec
        fill_value
            Value used to initialize the buffer (and to pad when filling
            requested ranges).
        dtype
            Numpy dtype of the underlying array.
        n_channels : int or None
            If not None, buffer is 2D with shape (n_channels, samples);
            otherwise it is 1D.
        '''
        log.debug('Creating signal buffer with fs=%f and size=%f', fs, size)
        # RLock (not Lock) because public methods call one another while
        # already holding the lock.
        self._lock = threading.RLock()
        self._buffer_fs = fs
        self._buffer_samples = int(np.ceil(fs*size))
        self._n_channels = n_channels
        self._size = size
        if n_channels is not None:
            shape = (self._n_channels, self._buffer_samples)
        else:
            shape = self._buffer_samples
        self._buffer = np.full(shape, fill_value, dtype=dtype)
        self._fill_value = fill_value
        # Total number of samples appended since creation (re acquisition
        # start); never reduced by buffer wraparound, only by invalidation.
        self._samples = 0
        # This is an attribute that represents the current "start" of the
        # buffered data. We always "push" data into the array from the right,
        # so the newest samples will appear at the end of the array. In the
        # beginning, the "start" of the buffered data is at the very end of
        # the array, but as we fill up the array, `_ilb` will eventually
        # always be 0.
        self._ilb = self._buffer_samples

    def resize(self, size):
        '''
        Resize buffer to hold the specified number of time samples

        Notes
        -----
        * This does not allow you to add/remove channels.
        * A request to decrease buffer size is ignored.
        '''
        # Don't shrink buffer size. Not worth the effort.
        # NOTE(review): get_latest(-size, ...) returns exactly `size`
        # seconds (padded with fill_value), so the buffer appears to become
        # exactly the requested size even when smaller — confirm against the
        # "decrease is ignored" claim above.
        with self._lock:
            old_samples = self._buffer_samples
            self._buffer = self.get_latest(-size, fill_value=self._fill_value)
            self._buffer_samples = self._buffer.shape[-1]
            new_samples = self._buffer_samples
            # This corrects the lower bound index to point to the oldest data
            # in the buffer.
            self._ilb = max(0, self._ilb + (new_samples - old_samples))

    def time_to_samples(self, t):
        '''
        Convert time to samples (re acquisition start)
        '''
        return round(t*self._buffer_fs)

    def time_to_index(self, t):
        '''
        Convert time to index in buffer. Note that the index may fall out of
        the buffered range.
        '''
        i = self.time_to_samples(t)
        return self.samples_to_index(i)

    def samples_to_index(self, i):
        # Convert index to the index in the buffer. Note that the index can
        # fall outside the buffered range.
        return i - self._samples + self._buffer_samples

    def get_range_filled(self, lb, ub, fill_value):
        '''
        Return data for the time range [lb, ub), padding any portion that
        falls outside the buffered range with fill_value.
        '''
        # Index of requested range
        with self._lock:
            ilb = self.time_to_samples(lb)
            iub = self.time_to_samples(ub)
            # Index of buffered range
            slb = self.get_samples_lb()
            sub = self.get_samples_ub()
            # Amount of padding needed on each side, and the effective
            # (clipped) range actually available in the buffer.
            lpadding = max(slb-ilb, 0)
            elb = max(slb, ilb)
            rpadding = max(iub-sub, 0)
            eub = min(sub, iub)
            data = self.get_range_samples(elb, eub)
            padding = (lpadding, rpadding)
            # For 2D data, pad only along the time (last) axis.
            if data.ndim == 2:
                padding = ((0, 0), (lpadding, rpadding))
            else:
                padding = (lpadding, rpadding)
            return np.pad(data, padding, 'constant',
                          constant_values=fill_value)

    def get_range(self, lb=None, ub=None):
        '''
        Return buffered data for the time range [lb, ub), defaulting to the
        full buffered range. Raises IndexError if outside buffered data.
        '''
        with self._lock:
            if lb is None:
                lb = self.get_time_lb()
            if ub is None:
                ub = self.get_time_ub()
            ilb = None if lb is None else self.time_to_samples(lb)
            iub = None if ub is None else self.time_to_samples(ub)
            return self.get_range_samples(ilb, iub)

    def get_range_samples(self, lb=None, ub=None):
        '''
        Return buffered data for the sample range [lb, ub) (re acquisition
        start). Raises IndexError if the range is outside buffered data.
        '''
        with self._lock:
            if lb is None:
                lb = self.get_samples_lb()
            if ub is None:
                ub = self.get_samples_ub()
            ilb = self.samples_to_index(lb)
            iub = self.samples_to_index(ub)
            # Requested range must lie entirely within valid buffered data.
            if ilb < self._ilb:
                raise IndexError
            elif iub > self._buffer_samples:
                raise IndexError
            # Returns a view, not a copy, of the underlying array.
            return self._buffer[..., ilb:iub]

    def append_data(self, data):
        '''
        Append new samples to the right edge of the buffer, shifting
        existing data left. data must match the buffer's dimensionality.
        '''
        if self._n_channels is not None:
            if data.ndim != 2:
                raise ValueError('Appended data must be two-dimensional')
            if data.shape[0] != self._n_channels:
                raise ValueError(f'Appended data must have {self._n_channels} channels.')
        else:
            if data.ndim != 1:
                raise ValueError('Appended data must be one-dimensional')
        with self._lock:
            samples = data.shape[-1]
            if samples > self._buffer_samples:
                # More data than the buffer can hold; keep only the tail.
                self._buffer[..., :] = data[..., -self._buffer_samples:]
                self._ilb = 0
            else:
                # Shift existing data left, then write new data at the end.
                self._buffer[..., :-samples] = self._buffer[..., samples:]
                self._buffer[..., -samples:] = data
                self._ilb = max(0, self._ilb - samples)
            self._samples += samples

    def _invalidate(self, i):
        # This is only called by invalidate or invalidate_samples, which are
        # already wrapped inside a lock block.
        # `i` is a buffer index; everything at and beyond it is discarded by
        # moving the surviving prefix to the right edge of the array.
        if i <= 0:
            # Nothing survives; reset to the empty state.
            self._buffer[:] = self._fill_value
            self._ilb = self._buffer_samples
        else:
            self._buffer[..., -i:] = self._buffer[..., :i]
            # NOTE(review): this pads with np.nan rather than
            # self._fill_value — looks inconsistent with the branch above
            # for integer dtypes or non-NaN fill values; confirm intent.
            self._buffer[..., :-i] = np.nan
            self._ilb = self._ilb + self._buffer_samples - i

    def invalidate(self, t):
        '''
        Discard all buffered data at and after time t (in seconds).
        '''
        with self._lock:
            self.invalidate_samples(self.time_to_samples(t))

    def invalidate_samples(self, i):
        '''
        Discard all buffered data at and after sample i (re acquisition
        start). A no-op if i is beyond the data acquired so far.
        '''
        with self._lock:
            if i >= self._samples:
                return
            bi = self.samples_to_index(i)
            self._invalidate(bi)
            # Roll back the total sample count by the number discarded.
            di = self.get_samples_ub() - i
            self._samples -= di

    def get_latest(self, lb, ub=0, fill_value=None):
        '''
        Returns buffered data relative to the most recently-buffered sample

        Parameters
        ----------
        lb : float
            Time in seconds relative to the most recently-buffered sample.
            Usually will be a negative value.
        ub : float
            Time in seconds relative to the most recently-buffered sample.
            Usually will be 0 or a negative value.
        fill_value : object or None
            If None, out-of-range requests raise IndexError; otherwise the
            out-of-range portion is padded with this value.

        Examples
        --------
        Get the most recent 1 second of buffered data

        >>> buffer.get_latest(-1)

        Get the buffered data from -2 to -1 relative to current time.

        >>> buffer.get_latest(-2, -1)
        '''
        with self._lock:
            # NOTE(review): log.trace is not part of the stdlib logging
            # API — presumably added by a project-level logging extension;
            # confirm.
            log.trace('Converting latest %f to %f to absolute time', lb, ub)
            lb = lb + self.get_time_ub()
            ub = ub + self.get_time_ub()
            log.trace('Absolute time is %f to %f', lb, ub)
            if fill_value is None:
                return self.get_range(lb, ub)
            else:
                return self.get_range_filled(lb, ub, fill_value)

    def get_time_lb(self):
        # Earliest buffered time (sec, re acquisition start).
        return self.get_samples_lb()/self._buffer_fs

    def get_time_ub(self):
        # Most recent buffered time (sec, re acquisition start).
        with self._lock:
            return self.get_samples_ub()/self._buffer_fs

    def get_samples_lb(self):
        # Earliest valid buffered sample (re acquisition start).
        with self._lock:
            return self._samples - self._buffer_samples + self._ilb

    def get_samples_ub(self):
        # Total samples acquired so far (exclusive upper bound).
        with self._lock:
            return self._samples
class ConfigurationException(Exception):
    '''Raised when the psi configuration is invalid or incomplete.'''
def psi_json_decoder_hook(obj):
    '''
    This adds support for loading legacy files generated using json-tricks
    '''
    # Legacy arrays are stored as {'__ndarray__': <nested list>,
    # 'dtype': <dtype string>}; everything else passes through untouched.
    if not isinstance(obj, dict):
        return obj
    if '__ndarray__' not in obj:
        return obj
    return np.asarray(obj['__ndarray__'], dtype=obj['dtype'])
class PSIJsonEncoder(json.JSONEncoder):
    '''
    JSON encoder supporting numpy scalars and arrays, pathlib paths, UUIDs
    and slices in addition to the standard JSON types.
    '''

    # Checked in order; the first matching type wins. Note that np.bool_
    # deliberately serializes as an int (0/1), matching legacy behavior.
    _CONVERTERS = (
        (np.integer, int),
        (np.bool_, int),
        (np.floating, float),
        (np.ndarray, lambda v: v.tolist()),
        (Path, str),
        (uuid.UUID, str),
        (slice, str),
    )

    def default(self, obj):
        for cls, convert in self._CONVERTERS:
            if isinstance(obj, cls):
                return convert(obj)
        return super().default(obj)
def log_with_header(header, info):
    '''
    Utility function to log multiple lines with a header with a single
    logging command.

    Parameters
    ----------
    header : str
        Title to show in header
    info : sequence
        List or tuple of strings (one string per line) to put in the logging
        message.
    '''
    border = '*' * 80
    # Pad the title to a fixed width so the banner box lines up.
    banner = '\n'.join([border, f'* {header.ljust(76)} *', border])
    body = '\n'.join(info)
    log.info(f'\n{banner}\n{body}')
def wrap_text(text):
    '''
    Format message for display in the console

    This strips leading whitespace and wraps to a uniform width while
    preserving double line-breaks that indicate paragraphs.
    '''
    # Blank paragraphs (runs of 3+ newlines) are dropped entirely.
    paragraphs = (p for p in text.split('\n\n') if p.strip() != '')
    wrapped = [textwrap.fill(textwrap.dedent(p)) for p in paragraphs]
    return '\n\n'.join(wrapped).strip()