from __future__ import annotations from .entangleware_control_link import ( debug_mode, connect, set_digital_state, set_analog_state, build_sequence, run_sequence, rerun_last_sequence, stop_sequence, clear_sequence, disconnect, ) from .entangleware_math import ( linear_ramp, exponential_ramp, sin, cos, ) from enum import Enum from math import sqrt def _to_bitlist(n: int, B: int=32): # big-endian return [(n >> k) % 2 for k in range(B)] UNSHIFT_ANALOG = lambda Vout: \ -1.0060901 / (2 * (-0.0033839)) \ + (+1 if Vout > -1.0060901 / (2 * (-0.0033839)) else -1) \ * sqrt( (1.0060901 / (2 * (-0.0033839)))**2 - (0.32121935 - Vout) / (-0.0033839) ) class EventType(Enum): Digital = 0 Analog = 1 def __eq__(self, other: EventType | int): assert isinstance(other, (EventType, int)) return self.value == other def __ne__(self, other: EventType | int): assert isinstance(other, (EventType, int)) return self.value != other def __lt__(self, other: EventType | int): assert isinstance(other, (EventType, int)) return self.value < other def __gt__(self, other: EventType | int): assert isinstance(other, (EventType, int)) return self.value > other def __le__(self, other: EventType | int): assert isinstance(other, (EventType, int)) return self.value <= other def __ge__(self, other: EventType | int): assert isinstance(other, (EventType, int)) return self.value >= other def __hash__(self): return hash(self.value) def dict_haskey_multi(D: dict, keys: list): if len(keys) > 0: return True if keys[0] in D.keys() else dict_haskey_multi(D, keys[1:]) else: return False def dict_get_multi(D: dict, keys: list, default=None): if len(keys) > 0: try: return D[keys[0]] except KeyError: return dict_get_multi(D, keys[1:], default) else: return default def _to_bitstr(k: int, N: int=None) -> str: b = str(bin(k))[2:] n = len(b) if N is None or N < 0 else N return ((n - len(b)) * "0" + b)[-n:] EVENTPROPS = { EventType.Digital: { "c": ("connector", "conn", "c"), "m": ("mask", "m"), "s": ("state", "s"), }, EventType.Analog: { "c": ("channel", "ch", "c"), "s": ("state", "s"), }, } IEVENTPROPS = { EventType.Digital: { "board": "board", "b": "board", "connector": "connector", "conn": "connector", "c": "connector", "mask": "mask", "m": "mask", "state": "state", "s": "state", }, EventType.Analog: { "board": "board", "b": "board", "channel": "channel", "ch": "channel", "c": "channel", "state": "state", "s": "state", }, } class Event: """ Holds the time and state data of an event -- either digital or analog. Fields ------ kind : EventType data : dict[str, Number] Contains state data of the event, dependent on the event's type: EventType.Digital: {'board': int, 'connector': int, 'mask': int, 'state': int} EventType.Analog: {'board': int, 'channel': int, 'state': float} time : float Time of the event in seconds. """ def __init__(self, kind: EventType, time: float=None, **data): """ Constructor. Parameters ---------- kind : EventType time : float (optional) Time of the event in seconds. **data State data of the event, dependent on `kind` as keyword arguments: EventType.Digital: {'board': int, 'connector': int, 'mask': int, 'state': int} EventType.Analog: {'board': int, 'channel': int, 'state': float} """ self.kind = kind assert all(dict_haskey_multi(data, props) for props in EVENTPROPS[kind].values()), "missing parameters" self.data = data for key, val in self.data.items(): if key in EVENTPROPS[EventType.Digital]["m"] \ or key in EVENTPROPS[EventType.Analog]["s"]: self.data[key] \ = ch(*val) if isinstance(val, (list, tuple, set)) \ else val self.time = time @staticmethod def digital(time: float=None, **data): """ Construct a digital event. Parameters ---------- time : float (optional) Time of the event in seconds. **data State data of the event: {'board': int, 'connector': int, 'mask': int, 'state': int} """ _data = data.copy() if "kind" in _data.keys(): del _data["kind"] return Event(EventType.Digital, time, **_data) @staticmethod def digital1(time: float=None, **data_s): """ Construct a digital event on a single channel. Parameters ---------- time : float (optional) Time of the event in seconds. **data_s State data of the event: {'board': int, 'connector': int, 'channel': int, 'state': int} """ board = dict_get_multi(data_s, ("board", "b"), 0) connector = dict_get_multi(data_s, EVENTPROPS[EventType.Digital]["c"]) channel = dict_get_multi(data_s, EVENTPROPS[EventType.Analog]["c"]) state = dict_get_multi(data_s, EVENTPROPS[EventType.Digital]["s"]) data = { "board": board, "connector": connector, "mask": 1 << channel, "state": int(bool(state)) << channel, } return Event(EventType.Digital, time, **data) @staticmethod def digitalc(conn: DigitalConnection, state: int, time: float, with_delay: bool=True): """ Construct a digital event on a single channel using a DigitalConnection, optionally accounting for rising/falling delays. Parameters ---------- conn : DigitalConnection state : int HIGH/LOW (1/0) state of the output on the specified connection. time : float Time of the event in seconds. with_delay : bool (optional) Use the delay_up/delay_down fields of the connection to adjust `time`. """ assert isinstance(conn, DigitalConnection), "digitalc: object must be DigitalConnection" board = 0 connector = conn.connector channel = conn.channel state = int(bool(state)) data = { "board": board, "connector": connector, "mask": 1 << channel, "state": int(bool(state)) << channel, } _time = (time - (conn.delay_down, conn.delay_up)[state]) if with_delay \ else time return Event(EventType.Digital, _time, **data) @staticmethod def analog(time: float=None, **data): """ Construct an analog event. Parameters ---------- time : float (optional) Time of the event in seconds. **data State data of the event: {'board': int, 'channel': int, 'state': float} """ _data = data.copy() if "kind" in _data.keys(): del _data["kind"] return Event(EventType.Analog, time, **_data) @staticmethod def analogc(conn: AnalogConnection, state: float, time: float, with_delay: bool=True): """ Construct an analog event on a single channel using an AnalogConnection, optionally accounting for timing delays. Parameters ---------- conn : AnalogConnection state: float Output voltage of the output on the specified connection. time : float Time of the event in seconds. with_delay : bool (optional) Use the delay field of the connection to adjust `time`. """ assert isinstance(conn, AnalogConnection), "analogc: object must be AnalogConnection" board = 0 channel = conn.channel state = float(state) data = { "board": board, "channel": channel, "state": state, } _time = (time - conn.delay) if with_delay else time return Event(EventType.Analog, _time, **data) def __matmul__(self, time: float) -> Event: """ Set self.time and return self, e.g.: ``` E = Event(...) @ time ``` """ assert isinstance(time, (float, int)) self.time = time return self def gen_args(self, **kwargs) -> tuple: """ Generate the proper arguments to pass to the backend function. Parameters ---------- **kwargs Keyword arguments to ensure that the event is valid for the computer. Expected keyword arguments depend on self.kind: EventType.Digital: {'boards': int, 'connectors': int, 'channels': int} EventType.Analog: {'boards': int, 'channels': int, 'Vmin': float, 'Vmax': float} """ if self.kind == EventType.Digital: b = dict_get_multi(self.data, ("board", "b"), 0) c = dict_get_multi(self.data, EVENTPROPS[self.kind]["c"]) m = dict_get_multi(self.data, EVENTPROPS[self.kind]["m"]) s = dict_get_multi(self.data, EVENTPROPS[self.kind]["s"]) assert b < kwargs["boards"], "invalid board" assert c < kwargs["connectors"], "invalid connector" assert m < 2 ** kwargs["channels"], "invalid mask" assert s < 2 ** kwargs["channels"], "invalid state" return (b, c, m, m, s) elif self.kind == EventType.Analog: b = dict_get_multi(self.data, ("board", "b"), 0) c = dict_get_multi(self.data, EVENTPROPS[self.kind]["c"]) s = dict_get_multi(self.data, EVENTPROPS[self.kind]["s"]) s = UNSHIFT_ANALOG(s) assert b < kwargs["boards"], "invalid board" assert c < kwargs["channels"], "invalid channel" assert s >= kwargs["Vmin"], "value < Vmin" assert s <= kwargs["Vmax"], "value > Vmax" return (b, c, s) else: raise Exception("Unknown EventType") def to_primitives(self) -> dict[str, ...]: """ Generate a dictionary containing all event data. """ alldata = { "is_digital": True if self.kind == EventType.Digital else False, "time": float(self.time) } alldata.update({ IEVENTPROPS[self.kind][key]: value for key, value in self.data.items() }) return alldata @staticmethod def from_primitives(alldata: dict[str, ...]): """ Construct an Event object from a dict containing event data following the format output by Event.to_primitives. """ assert "is_digital" in alldata.keys(), \ "Event.from_primitives: missing required key 'is_digital'" assert "time" in alldata.keys(), \ "Event.from_primitives: missing required key 'time'" data = alldata.copy() kind = EventType.Digital if data.pop("is_digital") else EventType.Analog time = float(data.pop("time")) return Event(kind, data, time) def domain(self) -> (EventType, int, int): if self.kind == EventType.Digital: return (self.kind, self.board, self.connector) elif self.kind == EventType.Analog: return (self.kind, self.board, 0) else: raise Exception("Unknown EventType") def __getitem__(self, key: str): assert isinstance(key, str), "__getitem__: key must be str" if key == "kind": return self.kind elif key == "time": return self.time elif key in self.data.keys(): return self.data.get(key) else: raise KeyError def __getattr__(self, key: str): if key in self.data.keys(): return self.data.get(key) else: raise AttributeError def __str__(self): kv = [ ( s, _to_bitstr(self[s], 32) if s in ["mask", "state"] and self.kind == EventType.Digital else self[s] ) for s in sorted(self.data.keys()) ] return f"Event({self.kind},\n" \ + ",\n".join( f" {IEVENTPROPS[self.kind][k]:>9s} = {v}" for k, v in kv ) + "\n" \ + f") @ t = {self.time}" class Sequence: """ List-like of Event objects. Fields ------ events : list[Event] color: str stack_idx: int """ def __init__(self, events: list[Event]=None, color: str=None, stack_idx: int=None): """ Constructor. Parameters ---------- events : list[Event] (optional) color: str (optional) stack_idx: int (optional) """ self.events = list() if events is None else events self.color = color self.stack_idx = stack_idx def set_color(self, color: str): self.color = color def with_color(self, color: str): self.set_color(color) return self def set_stack_idx(self, stack_idx: int): assert isinstance(stack_idx, int), "Stack idx must be an integer" self.stack_idx = stack_idx def with_stack_idx(self, stack_idx: int): self.set_stack_idx(stack_idx) return self @staticmethod def from_digital_data(board: int, connector: int, mask: int, T: list[float], S: list[int]) -> Sequence: """ Construct a Sequence of digital events on `board`, `connector` with `mask` from lists of times and states. `mask` is applied globally to all states. Parameters ---------- T : list[float] S : list[int] board : int connector : int mask : int Returns ------- seq : Sequence """ assert len(T) == len(S), "from_digital_data: data lists must be equal length" return Sequence([ Event(EventType.Digital, board=board, connector=connector, mask=mask, state=s, time=t ) for t, s in zip(T, S) ]) @staticmethod def from_digital1_data(board: int, connector: int, channel: int, T: list[float], S: list[int]) -> Sequence: """ Construct a Sequence of digital events on `board`, `connector`, `channel` from lists of times and states. Parameters ---------- T : list[float] S : list[int] board : int connector : int channel : int """ assert len(T) == len(S), "from_digital1_data: data lists must be equal length" return Sequence([ Event(EventType.Digital, board=board, connector=connector, mask=1 << channel, state=int(bool(s)) << channel, time=t ) for t, s in zip(T, S) ]) @staticmethod def from_digitalc_data(conn: DigitalConnection, T: list[float], S: list[int], with_delay: bool=True) -> Sequence: """ Construct a Sequence of digital events on `conn` from lists of times and states, optionally using delay information from `conn`. Parameters ---------- T : list[float] S : list[int] conn : DigitalConnection with_delay : bool (optional) """ assert len(T) == len(S), "from_digitalc_data: data lists must be equal length" return Sequence.from_digital1_data( 0, *conn, [ t - (conn.delay_down, conn.delay_up)[S[i]] for i, t in enumerate(T) ] if with_delay else T, S ) @staticmethod def from_analog_data(board: int, channel: int, T: list[float], V: list[float]) -> Sequence: """ Construct a Sequence of analog events on `board`, `channel` from lists of times and voltages. Parameters ---------- T, V : list[float] board : int channel : int Returns ------- seq : Sequence """ assert len(T) == len(V), "from_analog_data: data lists must be equal length" return Sequence([ Event(EventType.Analog, board=board, channel=channel, state=v, time=t ) for t, v in zip(T, V) ]) @staticmethod def from_analogc_data(conn: AnalogConnection, T: list[float], V: list[float], with_delay: bool=True) -> Sequence: """ Construct a Sequence of analog events on `conn` from lists of times and voltages, optionally using delay information from `conn`. Parameters ---------- T : list[float] V : list[float] with_delay : bool (optional) """ assert len(T) == len(V), "from_analogc_data: data lists must be equal length" return Sequence.from_analog_data( 0, *conn, [t - conn.delay for t in T] if with_delay else T, V ) def to_primitives(self) -> dict[str, ...]: """ Generate a dict containing all sequence data as only primitives. """ return { "events": [event.to_primitives() for event in self.events], "color": self.color, "stack_idx": self.stack_idx } @staticmethod def from_primitives(alldata: dict[str, ...]): """ Construct a Sequence object from a dict containing data following the format output by Event.to_primitives. """ assert "events" in alldata.keys(), \ "Sequence.from_primitives: missing required key 'events'" return Sequence( [Event.from_primitives(event_dict) for event_dict in alldata["events"]], alldata.get("color"), alldata.get("stack_idx") ) @staticmethod def digital_hilo(connector: int, channel: int, t0: float, t1: float) \ -> Sequence: """ Construct a Sequence of events for a single digital HIGH at `t0`, followed by a digital LOW at `t1` on the specified connector and channel. Parameters ---------- connector : int channel : int t0 : float t1 : float Returns ------- seq : Sequence """ seq = (Sequence() << Event.digital(c=connector, m=[channel], s=[channel]) @ t0 << Event.digital(c=connector, m=[channel], s=0) @ t1 ) return seq @staticmethod def digital_hilo_c(conn: DigitalConnection, t0: float, t1: float, with_delay: bool=True) -> Sequence: """ Construct a Sequence of events for a single digital HIGH at `t0`, followed by a digital LOW at `t1` on the specified connection, optionally using delay information from the connection. Parameters ---------- conn : DigitalConnection t0 : float t1 : float with_delay : bool (optional) Returns ------- seq : Sequence """ seq = (Sequence() << Event.digitalc(conn, 1, t0, with_delay) << Event.digitalc(conn, 0, t1, with_delay) ) return seq @staticmethod def digital_lohi(connector: int, channel: int, t0: float, t1: float) \ -> Sequence: """ Construct a Sequence if events for a single digital LOW at `t0`, followed by a digital HIGH at `t1` on the specified connector and channel. Parameters ---------- connector : int channel : int t0 : float t1 : float Returns ------- seq : Sequence """ seq = (Sequence() << Event.digital(c=connector, m=[channel], s=0) @ t0 << Event.digital(c=connector, m=[channel], s=[channel]) @ t1 ) return seq @staticmethod def digital_lohi_c(conn: DigitalConnection, t0: float, t1: float, with_delay: bool=True) -> Sequence: """ Construct a Sequence of events for a single digital LOW at `t0`, followed by a digital HIGH at `t1` on the specified connection, optionally using delay information from the connection. Parameters ---------- conn : DigitalConnection t0 : float t1 : float with_delay : bool (optional) Returns ------- seq : Sequence """ seq = (Sequence() << Event.digitalc(conn, 0, t0, with_delay) << Event.digitalc(conn, 1, t1, with_delay) ) return seq @staticmethod def digital_pulse(connector: int, channel: int, t0: float, dt: float, invert: bool=False) -> Sequence: """ Construct a Sequence of events for a single digital pulse of duration `dt` starting at `t0` on the specified connector and channel. Pass `invert=True` to swap HIGH <-> LOW. Parameters ---------- connector : int channel : int t0 : float dt : float invert : bool (optional) Returns ------- seq : Sequence """ if not invert: return Sequence.digital_hilo(connector, channel, t0, t0 + dt) else: return Sequence.digital_lohi(connector, channel, t0, t0 + dt) @staticmethod def digital_pulse_c(conn: DigitalConnection, t0: float, dt: float, invert: bool=False, with_delay: bool=True) -> Sequence: """ Construct a Sequence of events for a single digital pulse of duration `dt` starting at `t0` on the specified connection, optionally using delays specified in the connection. Pass `invert=True` to swap HIGH <-> LOW. Parameters ---------- conn : Digital Connection t0 : float dt : float invert : bool (optional) with_delay : bool (optional) Returns ------- seq : Sequence """ if not invert: return Sequence.digital_hilo_c(conn, t0, t0 + dt, with_delay) else: return Sequence.digital_lohi_c(conn, t0, t0 + dt, with_delay) @staticmethod def serial_bits(connector: int, channel: int, t0: float, val: int, length: int, val_reg: int, length_reg: int, connector_clk: int=None, channel_clk: int=None, connector_sync: int=None, channel_sync: int=None, clk_freq=1e6) -> Sequence: """ Construct a Sequence of events for writing a series of `length` bits over (`connector`, `channel`) starting at time `t0` and corresponding to an integer value `val`. The bits will be written in small-endian order (most-significant digit first) with a register address prepended at a speed corresponding to a clock frequency `clk_freq` and an optional clock signal can be generated on (`connector_clk`, `channel_clk`). If a clock signal is generated, the bit-write timing will be shifted backward in time by a quarter of a clock cycle such that bit values are taken on the falling edges of the clock. "Sync" falling/rising edges book-ending the operation can also be generated on (`connector_sync` and `channel_sync`) if provided. Parameters ---------- connector : int channel : int t0 : float val : int length : int val_reg : int length_reg : int connector_clk : int (optional) channel_clk : int (optional) connector_sync : int (optional) channel_sync : int (optional) clk_freq : float (optional) Returns ------- seq : Sequence """ use_clock = (connector_clk is not None) and (channel_clk is not None) use_sync = (connector_sync is not None) and (channel_sync is not None) bits = ( _to_bitlist(val_reg, length_reg)[::-1] + _to_bitlist(val, length)[::-1] ) T = 1 / clk_freq seq = Sequence() if use_clock: for k, b in enumerate(bits): (seq << Event.digital1(c=connector, ch=channel, s=b) @ (t0 + k * T - T / 2) << Event.digital1(c=connector, ch=channel, s=0) @ (t0 + k * T + T / 4) << Event.digital1(c=connector_clk, ch=channel_clk, s=1) @ (t0 + k * T - T / 2) << Event.digital1(c=connector_clk, ch=channel_clk, s=0) @ (t0 + k * T) ) else: for k, b in enumerate(bits): (seq << Event.digital1(c=connector, ch=channel, s=b) @ (t0 + k * T) << Event.digital1(c=connector, ch=channel, s=0) @ (t0 + k * T + T / 2) ) if use_sync: (seq << Event.digital1(c=connector_sync, ch=channel_sync, s=1) @ (t0 - T / 2 - use_clock * (T / 2)) << Event.digital1(c=connector_sync, ch=channel_sync, s=0) @ (t0 - use_clock * (T / 2)) << Event.digital1(c=connector_sync, ch=channel_sync, s=1) @ (t0 + len(bits) * T - use_clock * (T / 2)) ) return seq @staticmethod def serial_bits_c(conn: DigitalConnection, t0: float, val: int, length: int, val_reg: int, length_reg: int, conn_clk: DigitalConnection=None, conn_sync: DigitalConnection=None, clk_freq: float=1e6, with_delay: bool=True) -> Sequence: """ Construct a Sequence of events for writing a series of `length` bits `conn` starting at time `t0` and corresponding to an integer value `val`, optionally using timing delay information from `conn`. The bits will be written in small-endian order (most-significant digit first) with a register address prepended at a speed corresponding to a clock frequency `clk_freq` and an optional clock signal can be generated on `conn_clk`. If a clock signal is generated, the bit-write timing will be shifted backward in time by a quarter of a clock cycle such that bit values are taken on the falling edges of the clock. "Sync" falling/rising edges book-ending the operation can also be generated on `conn_sync` if provided. Overall timings are such that the first bit will always be written at `t0`. Parameters ---------- conn : DigitalConnection t0 : float val : int length : int val_reg : int length_reg : int conn_clk : DigitalConnection (optional) conn_sync : DigitalConnection (optional) clk_freq : float (optional) with_delay : bool (optional) """ use_clock = conn_clk is not None use_sync = conn_sync is not None bits = ( _to_bitlist(val_reg, length_reg)[::-1] + _to_bitlist(val, length)[::-1] ) T = 1 / clk_freq seq = Sequence() if use_clock: for k, b in enumerate(bits): (seq << Event.digitalc(conn, b, t0 + k * T - T / 2) << Event.digitalc(conn, 0, t0 + k * T + T / 4) << Event.digitalc(conn_clk, 1, t0 + k * T - T / 2) << Event.digitalc(conn_clk, 0, t0 + k * T) ) else: for k, b in enumerate(bits): (seq << Event.digitalc(conn, b, t0 + k * T) << Event.digitalc(conn, 0, t0 + k * T + T / 2) ) if use_sync: (seq << Event.digitalc( conn_sync, 1, t0 - T / 2 - use_clock * (T / 2)) << Event.digitalc( conn_sync, 0, t0 - T / 4 - use_clock * (T / 2)) << Event.digitalc( conn_sync, 1, t0 + len(bits) * T - use_clock * (T / 2)) ) return seq @staticmethod def analog_ramp_lin(channel: int, t0: float, t1: float, V0: float, V1: float) -> Sequence: """ Construct a Sequence of events for a linear analog ramp beginning at (t0, V0) and ending at (t1, V1) on the specified channel at maximum voltage resolution. Parameters ---------- channel : int t0, t1 : float V0, V1 : float Returns ------- seq : Sequence """ return Sequence.from_analog_data( 0, channel, *linear_ramp(V0, V1, t0, t1)) @staticmethod def analog_ramp_lin_c(conn: AnalogConnection, t0: float, t1: float, V0: float, V1: float, with_delay: bool=True) -> Sequence: """ Construct a Sequence of events for a linear analog ramp beginning at (t0, V0) and ending at (t1, V1) on the specified connection at maximum resolution, optionally using timing delay information from `conn`. Parameters ---------- conn : AnalogConnection t0, t1 : float V0, V1 : float with_delay : float (optional) Returns ------- seq : Sequence """ return Sequence.from_analogc_data( conn, *linear_ramp(V0, V1, t0, t1), with_delay) @staticmethod def analog_ramp_exp(channel: int, t0: float, t1: float, V0: float, V1: float, rate: float) -> Sequence: """ Construct a Sequence of events for an exponential ramp beginning at (t0, V0) and ending at (t1, V1) on the specified channel at maximum voltage resolution. Parameters ---------- channel: int V0, V1 : float t0, t1 : float rate : float Returns ------- seq : Sequence """ return Sequence.from_analog_data( 0, channel, *exponential_ramp(V0, V1, t0, t1, rate)) @staticmethod def analog_ramp_exp_c(conn: AnalogConnection, t0: float, t1: float, V0: float, V1: float, rate: float, with_delay: bool=True) \ -> Sequence: """ Construct a Sequence of events for an exponential ramp beginning at (t0, V0) and ending at (t1, V1) on the specified connection at maximum voltage resolution, optionally using timing delay information from `conn`. Parameters ---------- channel: int V0, V1 : float t0, t1 : float rate : float with_delay : float (optional) Returns ------- seq : Sequence """ return Sequence.from_analogc_data( conn, *exponential_ramp(V0, V1, t0, t1, rate), with_delay) @staticmethod def analog_sin(channel: int, t0: float, t1: float, amp: float, dc: float, freq: float, phase: float) -> Sequence: """ Construct a Sequence of events for a a sine wave beginning at `t0` and ending at `t1` with amplitude `amp`, DC offset `dc`, frequency `freq`, and phase shift `phase` on the specified channel. Parameters ---------- channel : int t0, t1 : float amp, dc, freq, phase : float Returns ------- seq : Sequence """ return Sequence.from_analog_data( 0, channel, *sin(amp, dc, freq, phase, t0, t1)) @staticmethod def analog_sin_c(conn: AnalogConnection, t0: float, t1: float, amp: float, dc: float, freq: float, phase: float, with_delay: bool=True) \ -> Sequence: """ Construct a Sequence of events for a a sine wave beginning at `t0` and ending at `t1` with amplitude `amp`, DC offset `dc`, frequency `freq`, and phase shift `phase` on the specified connection, optionally using timing delay information from `conn`. Parameters ---------- channel : int t0, t1 : float amp, dc, freq, phase : float with_delay : bool (optional) Returns ------- seq : Sequence """ return Sequence.from_analogc_data( conn, *sin(amp, dc, freq, phase, t0, t1), with_delay) @staticmethod def analog_cos(channel: int, t0: float, t1: float, amp: float, dc: float, freq: float, phase: float) -> Sequence: """ Construct a Sequence of events for a a cosine wave beginning at `t0` and ending at `t1` with amplitude `amp`, DC offset `dc`, frequency `freq`, and phase shift `phase` on the specified channel. Parameters ---------- channel : int amp, dc, freq, phase : float t0, t1 : float Returns ------- seq : Sequence """ return Sequence.from_analog_data( 0, channel, *cos(amp, dc, freq, phase, t0, t1)) @staticmethod def analog_cos_c(conn: AnalogConnection, t0: float, t1: float, amp: float, dc: float, freq: float, phase: float, with_delay: bool=True) \ -> Sequence: """ Construct a Sequence of events for a a cosine wave beginning at `t0` and ending at `t1` with amplitude `amp`, DC offset `dc`, frequency `freq`, and phase shift `phase` on the specified connection, optionally using timing delay information from `conn`. Parameters ---------- channel : int t0, t1 : float amp, dc, freq, phase : float with_delay : bool (optional) Returns ------- seq : Sequence """ return Sequence.from_analogc_data( conn, *cos(amp, dc, freq, phase, t0, t1), with_delay) def __lshift__(self, event: Event): """ Append an Event and return self. """ assert isinstance(event, Event), "__lshift__: object must be Event" assert event.time is not None, "__lshift__: Event must have time" self.events.append(event) return self def __getitem__(self, idx: int): return self.events[idx] def __add__(self, other: Sequence | list[Event]): """ Concatenate two Sequences. """ if isinstance(other, Sequence): return Sequence(self.events + other.events) elif isinstance(other, list): assert all( isinstance(e, Event) and e.time is not None for e in other), \ "__add__: all Events must have times" return Sequence(self.events + other) else: return ValueError return Sequence(self.events + other.events) def __radd__(self, other): """ Mainly so that you can call `sum` on an interator of Sequences. """ if isinstance(other, Sequence): return other.__add__(self) elif isinstance(other, int) and other == 0: return self elif isinstance(other, list): assert all( isinstance(e, Event) and e.time is not None for e in other), \ "__radd__: all Events must have times" return Sequence(other + self.events) else: raise NotImplementedError def __iadd__(self, other): if isinstance(other, Sequence): self.events += other.events elif isinstance(other, list): assert all( isinstance(e, Event) and e.time is not None for e in other), \ "__iadd__: all Events must have times" self.events += other else: raise NotImplementedError return self def join(self, other: Sequence): assert isinstance(other, Sequence), "join: can only join Sequences" return self + other @staticmethod def joinall(*seqs: Sequence): assert all(isinstance(o, Sequence) for o in seqs), "joinall: can only join Sequences" return sum(seqs) def __iter__(self): return iter(self.events) def __len__(self): return len(self.events) def __str__(self): return f"Sequence([\n" \ + ",\n".join( "\n".join(" " + s for s in str(e).split("\n")) for e in self.events ) \ + "\n])" def pop(self, idx: int=-1) -> Event: """ Pop the Event at index `idx` from self. """ return self.events.pop(idx) def insert(self, idx: int, event: Event): """ Insert an Event at index `idx` and return self. """ assert isinstance(event, Event), "insert: object must be Event" assert event.time is not None, "insert: Event must have a time" self.events.insert(idx, event) return self def append(self, event: Event): """ Append an Event and return self. """ assert isinstance(event, Event), "append: object must be Event" assert event.time is not None, "append: Event must have a time" self.events.append(event) return self def min_time(self) -> float: """ Find the time of the earliest event in self.events. """ return min(map(lambda e: e.time, self.events)) \ if len(self.events) > 0 else 0.0 def max_time(self) -> float: """ Find the time of the latest event in self.events. """ return max(map(lambda e: e.time, self.events)) \ if len(self.events) > 0 else 0.0 def when(self) -> (float, float): """ Return (self.min_time(), self.max_time()). """ return (self.min_time(), self.max_time()) def domains(self) -> set[(EventType, int, int)]: """ Return a set of all domains (digital/analog, board/connector/channel) involved in the sequence. """ return set(map(lambda e: e.domain(), self.events)) def _get_states(self, kind: EventType, **location) -> list[tuple]: ts = list() if kind == EventType.Digital: c = dict_get_multi(location, ("connector", "conn", "c")) ch = dict_get_multi(location, ("channel", "ch", "c")) for event in self.events: if event.kind != kind: continue _c = dict_get_multi(event, ("connector", "conn", "c")) _ch = _to_bitlist(dict_get_multi(event, ("mask", "m")))[ch] if event.kind == kind and _c == c and _ch: ts.append(( event.time, _to_bitlist(dict_get_multi(event, ("state", "s")))[ch] )) elif kind == EventType.Analog: ch = dict_get_multi(location, ("channel", "ch")) for event in self.events: if event.kind != kind: continue _ch = dict_get_multi(event, ("channel", "ch", "c")) if event.kind == kind and _ch == ch: ts.append(( event.time, dict_get_multi(event, ("state", "s")) )) else: raise Exception("Invalid EventType") return ts class Connection: """ Template for a connection running from a Computer to the experiment. Not intended for instantiation on its own. """ def __init__(self): raise NotImplementedError def __iter__(self): raise NotImplementedError def gen_kwargs(self): raise NotImplementedError def __getitem__(self, key: str): raise NotImplementedError def keys(self): raise NotImplementedError def __hash__(self): return hash(tuple(self)) def __getstate__(self): return self.__dict__.copy() def __setstate__(self, state): return self.__dict__.update(state) class DigitalConnection(Connection): """ Digital connection from a Computer to the experiment. """ def __init__(self, connector: int, channel: int, default: bool=False, delay_up: float=0.0, delay_down: float=0.0): self.connector = int(connector) self.channel = int(channel) self.default = bool(default) self.delay_up = float(delay_up) self.delay_down = float(delay_down) def __iter__(self): return iter([self.connector, self.channel]) def gen_kwargs(self): return { "kind": EventType.Digital, "connector": self.connector, "mask": [self.channel], "state": [self.channel] if self.default else 0 } def __getitem__(self, key: str): if key == "kind": return EventType.Digital elif key == "connector": return self.connector elif key == "channel": return self.channel elif key == "mask": return [self.channel] elif key == "state": return [self.channel] if self.default else 0 elif key == "default": return self.default else: raise KeyError def keys(self): return ["kind", "connector", "channel"] def __str__(self): kv = [ ("connector", self.connector), ("channel", self.channel), ("default", "HIGH" if self.default else "LOW"), ("delay_up", self.delay_up), ("delay_down", self.delay_down), ] return ( "DigitalConnection({\n" + ",\n".join(f" {k:>10s} = {v}" for k, v in kv) + "\n})" ) class AnalogConnection(Connection): """ Analog connection from a Computer to the experiment. """ def __init__(self, channel: int, default: float=0.0, delay: float=0.0): self.channel = int(channel) self.default = float(default) self.delay = float(delay) def __iter__(self): return iter([self.channel]) def gen_kwargs(self): return { "kind": EventType.Analog, "channel": self.channel, "state": self.default } def __getitem__(self, key: str): if key == "kind": return EventType.Analog elif key == "channel": return self.channel elif key == "state": return self.default elif key == "default": return self.default else: raise KeyError def keys(self): return ["kind", "channel"] def __str__(self): kv = [ ("channel", self.channel), ("default", self.default), ("delay", self.delay), ] return ( "AnalogConnection({\n" + ",\n".join(f" {k:>10s} = {v}" for k, v in kv) + "\n})" ) class ConnectionLayout: """ Class to house a set of Connections (digital or analog) from a Computer to the experiment. """ def __init__(self, **connections: dict[..., Connection]): self.connections = connections def __getitem__(self, key): if isinstance(key, str): return ConnectionLayout(**{key: self.connections[key]}) else: return ConnectionLayout(**{k: self.connections[k] for k in key}) def __getattr__(self, attr): return self.connections[attr] def __getstate__(self): return self.__dict__.copy() def __setstate__(self, state): self.__dict__.update(state) def __iter__(self): return iter([ connection.gen_kwargs() for connection in self.connections.values() ]) def get_connections(self): return list(self.connections.items()) def __str__(self): return ( "ConnectionLayout({\n" + ",\n".join( f" {l}:\n" + "\n".join(" " + s for s in str(c).split("\n")) for l, c in self.connections.items() ) + "\n})" ) class Computer: """ Main interface to the backend. Fields ------ address : str port : int ttl: int num_{boards,connectors,channels}_d : int Number of digital boards, connectors per board, and channels per connector. num_{boards,channels}_a : int Number of analog boards and channels per board. defaults : ConnectionLayout Default states of named connections to the experiment. """ def __init__(self, address: str, port: int, ttl: int=1, layout: tuple=(1, 4, 32, 1, 32, -10.0, 10.0), defaults: ConnectionLayout=None): """ Constructor. Parameters ---------- address : str port : int ttl : int (optional) layout : (int, int, int, int, int, float, float) Number of digital boards, connectors per board, and channels per connector; number of analog boards, channels per board, and minimum and maximum voltage on each channel. defaults : ConnectionLayout (optional) Defines (and names) established connections to the experiment. """ self.address = address self.port = port self.ttl = ttl self.num_boards_d = layout[0] self.num_connectors_d = layout[1] self.num_channels_d = layout[2] self.num_boards_a = layout[3] self.num_channels_a = layout[4] self.Vmin = layout[5] self.Vmax = layout[6] self.defaults = ConnectionLayout() if defaults is None else defaults self._connected = False self._queueing = False def debug(self, state: bool=True): """ Show (or not) the Entangleware debug window. """ assert self._connected, "Must be connected" debug_mode(state) return self def connect(self, timeout_sec=1.0): """ Connect to the backend. """ connect(timeout_sec, self.address, self.port, self.ttl) self._connected = True return self def default(self, kind: EventType, **data): """ Set the default state (time = 0) with Event-like parameters (see Event). Must be called before queueing any sequences. """ assert self._connected, "Must be connected" assert all(dict_haskey_multi(data, props) for props in EVENTPROPS[kind].values()), "missing information" if kind == EventType.Digital: b = dict_get_multi(data, ("board", "b"), 0) c = dict_get_multi(data, EVENTPROPS[kind]["c"]) m = dict_get_multi(data, EVENTPROPS[kind]["m"]) s = dict_get_multi(data, EVENTPROPS[kind]["s"]) m = ch(*m) if isinstance(m, (list, tuple, set)) else m s = ch(*s) if isinstance(s, (list, tuple, set)) else s set_digital_state(0.0, b, c, m, m, s) elif kind == EventType.Analog: b = dict_get_multi(data, ("board", "b"), 0) c = dict_get_multi(data, EVENTPROPS[kind]["c"]) s = dict_get_multi(data, EVENTPROPS[kind]["s"]) s = UNSHIFT_ANALOG(s) set_analog_state(0.0, b, c, s) else: raise Exception("Unknown EventType") return self def def_digital(self, connector: int, channel: int, state: bool, *args, **kwargs): """ Thin wrapper around Computer.default(EventType.Digital, ...) for a single channel. """ return self.default(EventType.Digital, connector=connector, mask=[channel], state=[channel] if state else 0 ) def def_analog(self, channel: int, state: float, *args, **kwargs): """ Thin wrapper around Computer.default(EventType.Analog, ...) for a single channel. """ return self.default(EventType.Analog, channel=channel, state=state ) def set_defaults(self): """ Set all channel defaults defined in `self.defaults`. """ for default_kwargs in self.defaults: self.default(**default_kwargs) return self def zero(self, kind: EventType): """ Zero out every available channel of either digital or analog type. Must be called before queueing any sequences. """ assert self._connected, "Must be connected" if kind == EventType.Digital: for i in range(self.num_boards_d): for j in range(self.num_connectors_d): set_digital_state(0.000, i, j, ch(*[k for k in range(self.num_channels_d)]), ch(*[k for k in range(self.num_channels_d)]), 0 ) elif kind == EventType.Analog: for i in range(self.num_boards_a): for j in range(self.num_channels_a): zero = UNSHIFT_ANALOG(0) set_analog_state(0.000, i, j, zero) else: raise Exception("Unknown EventType") return self def enqueue(self, sequence: Sequence): """ Enqueue and build the Events in a Sequence. """ assert self._connected, "Must be connected" assert isinstance(sequence, Sequence), "Object must be a Sequence" if not self._queueing: build_sequence() self._queueing = True for k, event in enumerate(sequence): if event.kind == EventType.Digital: set_digital_state( event.time, *event.gen_args( boards=self.num_boards_d, connectors=self.num_connectors_d, channels=self.num_channels_d ) ) elif event.kind == EventType.Analog: set_analog_state( event.time, *event.gen_args( boards=self.num_boards_a, channels=self.num_channels_a, Vmin=self.Vmin, Vmax=self.Vmax ) ) else: raise Exception("Unknown EventType") return self def run(self, printflag: bool=True): """ Run all built Events. """ assert self._connected, "Must be connected" self._queueing = False run_sequence(printflag) return self def rerun(self, printflag: bool=True): """ Re-run the last-loaded sequence. """ assert self._connected, "Must be connected" rerun_last_sequence(printflag) return self def rrun(self, printflag: bool=True): """ Call self.run if currently queueing (i.e. self.enqueue has been called, but neither self.run nor self.rrun yet), otherwise self.rerun. """ assert self._connected, "Must be connected" if self._queueing: self._queueing = False run_sequence(printflag) else: rerun_last_sequence(printflag) return self def stop(self): """ Halt execution of the current sequence. """ assert self._connected, "Must be connected" stop_sequence() return self def clear(self): """ Clear all Events from board memory. """ assert self._connected, "Must be connected" clear_sequence() return self def disconnect(self): """ Disconnect from the backend. """ assert self._connected, "Must be connected" disconnect() def ch(*ch_nums: int, num_channels: int=32) -> int: """ Return an int whose bits correspond to a selection of enumerated channels. For example, to select channels 0, 3, and 4 -- corresponding to the bit string 11001 -- the invocation `ch(0, 3, 4)` returns `25`. Repeated arguments toggle the state of the channel selection. """ acc = 0 for ch_num in ch_nums: if ch_num >= num_channels: raise Exception("Invalid channel number") acc ^= 1 << ch_num return acc # set_digital_state( # seqtime: f64, // time of state change # board: u8, // board number, only have one # connector: u8, // connector on board, 0-3 # channel_mask: u32, // bitmask for channels to be changed # output_enable_state: u32, // bitmask for which channels drive their outputs # output_state: u32 // state of channels # ) # set_analog_state( # seqtime: f64, // time of state change # board: u8, // board number, only have one # channel: u8, // 0-31 # value: f42 // [-10, +10] # )