import logging
log = logging.getLogger(__name__)
from atom.api import Atom, Dict
class BaseCallbackMixin(Atom):
    """Mixin that keeps callbacks in a dictionary and fires the "done" ones.

    Callbacks are stored in ``_callbacks``, keyed by a short event name. This
    class only consumes the ``'done'`` entry, which is expected to be a list
    of zero-argument callables.
    """

    #: Dictionary of callbacks
    _callbacks = Dict()

    def complete(self):
        """Call every callback stored under the ``'done'`` key.

        Callbacks are invoked with no arguments, in list order. If there is
        no ``'done'`` entry, nothing is called. An exception raised by a
        callback propagates to the caller and stops the remaining callbacks.
        """
        log.debug('Triggering "done" callbacks')
        # Iterate over a snapshot: a callback that adds or removes "done"
        # callbacks while it runs would otherwise mutate the list being
        # iterated and cause another callback to be skipped.
        for cb in tuple(self._callbacks.get('done', ())):
            cb()
class ChannelSliceCallbackMixin(BaseCallbackMixin):
    """Callback registry that pairs each callback with a channel selection.

    Channel callbacks are stored in ``_callbacks`` under the keys ``'ao'``,
    ``'ai'``, ``'ci'`` and ``'di'`` as ``(channel_name, channel_slice,
    callback)`` tuples, where ``channel_slice`` is computed by
    `_get_channel_slice` from the names in ``_channel_names``. "Done"
    callbacks are stored as bare callables under the ``'done'`` key.

    All ``unregister_*`` methods are best-effort: if the callback (or the
    channel it refers to) cannot be found, a warning is logged instead of
    raising.
    """

    #: Dictionary of callbacks
    _callbacks = Dict()

    #: List of channel names for each task_name
    _channel_names = Dict()

    def _get_channel_slice(self, task_name, channel_names):
        """Translate channel name(s) into an index usable on the channel axis.

        Parameters
        ----------
        task_name
            Key into ``_channel_names`` (e.g. ``'hw_ai'``).
        channel_names
            ``None``, a single channel name, or an iterable of channel names.

        Returns
        -------
        ``Ellipsis`` if `channel_names` is ``None``; otherwise a list with the
        position of each requested name within ``_channel_names[task_name]``.

        Raises
        ------
        KeyError
            If `task_name` has no entry in ``_channel_names``.
        ValueError
            If a requested name is not in the list for `task_name`.
        """
        if channel_names is None:
            return Ellipsis
        names = self._channel_names[task_name]
        if isinstance(channel_names, str):
            # We want the channel slice to preserve dimensionality (i.e., we
            # don't want to drop the channel dimension from the PipelineData
            # object), so a single name is still returned as a list.
            channel_names = [channel_names]
        return [names.index(c) for c in channel_names]

    def _register_channel_callback(self, kind, callback, channel_name):
        """Store `callback` under key `kind` with the slice for `channel_name`.

        `kind` is one of ``'ao'``, ``'ai'``, ``'ci'``, ``'di'``; the channel
        names are looked up under the task name ``'hw_' + kind``.
        """
        s = self._get_channel_slice('hw_' + kind, channel_name)
        self._callbacks.setdefault(kind, []).append((channel_name, s, callback))

    def _unregister_channel_callback(self, kind, callback, channel_name):
        """Remove the entry added by `_register_channel_callback`, if present.

        Logs a warning rather than raising when the entry cannot be found.
        """
        try:
            s = self._get_channel_slice('hw_' + kind, channel_name)
            self._callbacks[kind].remove((channel_name, s, callback))
        except (KeyError, ValueError, AttributeError):
            # KeyError: nothing registered for this kind / unknown task name.
            # ValueError: unknown channel name (list.index) or the callback
            # is not in the list (list.remove).
            log.warning('Callback no longer exists.')

    def register_done_callback(self, callback):
        """Add `callback` to the list stored under the ``'done'`` key."""
        self._callbacks.setdefault('done', []).append(callback)

    def register_ao_callback(self, callback, channel_name=None):
        """Register `callback` for ``'ao'``; `channel_name` of ``None`` selects all channels."""
        self._register_channel_callback('ao', callback, channel_name)

    def register_ai_callback(self, callback, channel_name=None):
        """Register `callback` for ``'ai'``; `channel_name` of ``None`` selects all channels."""
        self._register_channel_callback('ai', callback, channel_name)

    def register_ci_callback(self, callback, channel_name=None):
        """Register `callback` for ``'ci'``; `channel_name` of ``None`` selects all channels."""
        self._register_channel_callback('ci', callback, channel_name)

    def register_di_callback(self, callback, channel_name=None):
        """Register `callback` for ``'di'``; `channel_name` of ``None`` selects all channels."""
        self._register_channel_callback('di', callback, channel_name)

    def unregister_done_callback(self, callback):
        """Remove `callback` from the ``'done'`` list; warn if it is not there."""
        try:
            self._callbacks['done'].remove(callback)
        except (KeyError, ValueError):
            # KeyError: no "done" callbacks were ever registered.
            # ValueError: list.remove did not find this callback.
            log.warning('Callback no longer exists.')

    def unregister_ao_callback(self, callback, channel_name):
        """Remove an ``'ao'`` callback registered with the same `channel_name`."""
        self._unregister_channel_callback('ao', callback, channel_name)

    def unregister_ai_callback(self, callback, channel_name):
        """Remove an ``'ai'`` callback registered with the same `channel_name`."""
        self._unregister_channel_callback('ai', callback, channel_name)

    def unregister_di_callback(self, callback, channel_name):
        """Remove a ``'di'`` callback registered with the same `channel_name`.

        Like the other ``unregister_*`` methods, this logs a warning instead
        of raising when the callback cannot be found.
        """
        self._unregister_channel_callback('di', callback, channel_name)