-
Yb Tweezer authoredYb Tweezer authored
structures.py 50.72 KiB
from __future__ import annotations
from .entangleware_control_link import (
debug_mode,
connect,
set_digital_state,
set_analog_state,
build_sequence,
run_sequence,
rerun_last_sequence,
stop_sequence,
clear_sequence,
disconnect,
)
from .entangleware_math import (
linear_ramp,
exponential_ramp,
sin,
cos,
)
from enum import Enum
from math import sqrt
def _to_bitlist(n: int, B: int=32): # big-endian
return [(n >> k) % 2 for k in range(B)]
UNSHIFT_ANALOG = lambda Vout: \
-1.0060901 / (2 * (-0.0033839)) \
+ (+1 if Vout > -1.0060901 / (2 * (-0.0033839)) else -1) \
* sqrt(
(1.0060901 / (2 * (-0.0033839)))**2
- (0.32121935 - Vout) / (-0.0033839)
)
class EventType(Enum):
Digital = 0
Analog = 1
def __eq__(self, other: EventType | int):
assert isinstance(other, (EventType, int))
return self.value == other
def __ne__(self, other: EventType | int):
assert isinstance(other, (EventType, int))
return self.value != other
def __lt__(self, other: EventType | int):
assert isinstance(other, (EventType, int))
return self.value < other
def __gt__(self, other: EventType | int):
assert isinstance(other, (EventType, int))
return self.value > other
def __le__(self, other: EventType | int):
assert isinstance(other, (EventType, int))
return self.value <= other
def __ge__(self, other: EventType | int):
assert isinstance(other, (EventType, int))
return self.value >= other
def __hash__(self):
return hash(self.value)
def dict_haskey_multi(D: dict, keys: list):
if len(keys) > 0:
return True if keys[0] in D.keys() else dict_haskey_multi(D, keys[1:])
else:
return False
def dict_get_multi(D: dict, keys: list, default=None):
if len(keys) > 0:
try:
return D[keys[0]]
except KeyError:
return dict_get_multi(D, keys[1:], default)
else:
return default
def _to_bitstr(k: int, N: int=None) -> str:
b = str(bin(k))[2:]
n = len(b) if N is None or N < 0 else N
return ((n - len(b)) * "0" + b)[-n:]
EVENTPROPS = {
EventType.Digital: {
"c": ("connector", "conn", "c"),
"m": ("mask", "m"),
"s": ("state", "s"),
},
EventType.Analog: {
"c": ("channel", "ch", "c"),
"s": ("state", "s"),
},
}
IEVENTPROPS = {
EventType.Digital: {
"board": "board",
"b": "board",
"connector": "connector",
"conn": "connector",
"c": "connector",
"mask": "mask",
"m": "mask",
"state": "state",
"s": "state",
},
EventType.Analog: {
"board": "board",
"b": "board",
"channel": "channel",
"ch": "channel",
"c": "channel",
"state": "state",
"s": "state",
},
}
class Event:
"""
Holds the time and state data of an event -- either digital or analog.
Fields
------
kind : EventType
data : dict[str, Number]
Contains state data of the event, dependent on the event's type:
EventType.Digital:
{'board': int, 'connector': int, 'mask': int, 'state': int}
EventType.Analog:
{'board': int, 'channel': int, 'state': float}
time : float
Time of the event in seconds.
"""
def __init__(self, kind: EventType, time: float=None, **data):
"""
Constructor.
Parameters
----------
kind : EventType
time : float (optional)
Time of the event in seconds.
**data
State data of the event, dependent on `kind` as keyword arguments:
EventType.Digital:
{'board': int, 'connector': int, 'mask': int, 'state': int}
EventType.Analog:
{'board': int, 'channel': int, 'state': float}
"""
self.kind = kind
assert all(dict_haskey_multi(data, props)
for props in EVENTPROPS[kind].values())
self.data = data
for key, val in self.data.items():
if key in EVENTPROPS[EventType.Digital]["m"] \
or key in EVENTPROPS[EventType.Analog]["s"]:
self.data[key] \
= ch(*val) if isinstance(val, (list, tuple, set)) \
else val
self.time = time
@staticmethod
def digital(time: float=None, **data):
"""
Construct a digital event.
Parameters
----------
time : float (optional)
Time of the event in seconds.
**data
State data of the event:
{'board': int, 'connector': int, 'mask': int, 'state': int}
"""
_data = data.copy()
if "kind" in _data.keys():
del _data["kind"]
return Event(EventType.Digital, time, **_data)
@staticmethod
def digital1(time: float=None, **data_s):
"""
Construct a digital event on a single channel.
Parameters
----------
time : float (optional)
Time of the event in seconds.
**data_s
State data of the event:
{'board': int, 'connector': int, 'channel': int, 'state': int}
"""
board = dict_get_multi(data_s, ("board", "b"), 0)
connector = dict_get_multi(data_s, EVENTPROPS[EventType.Digital]["c"])
channel = dict_get_multi(data_s, EVENTPROPS[EventType.Analog]["c"])
state = dict_get_multi(data_s, EVENTPROPS[EventType.Digital]["s"])
data = {
"board": board,
"connector": connector,
"mask": 1 << channel,
"state": int(bool(state)) << channel,
}
return Event(EventType.Digital, time, **data)
@staticmethod
def digitalc(conn: DigitalConnection, state: int, time: float,
with_delay: bool=True):
"""
Construct a digital event on a single channel using a DigitalConnection,
optionally accounting for rising/falling delays.
Parameters
----------
conn : DigitalConnection
state : int
HIGH/LOW (1/0) state of the output on the specified connection.
time : float
Time of the event in seconds.
with_delay : bool (optional)
Use the delay_up/delay_down fields of the connection to adjust
`time`.
"""
assert isinstance(conn, DigitalConnection)
board = 0
connector = conn.connector
channel = conn.channel
state = int(bool(state))
data = {
"board": board,
"connector": connector,
"mask": 1 << channel,
"state": int(bool(state)) << channel,
}
_time = (time - (conn.delay_down, conn.delay_up)[state]) if with_delay \
else time
return Event(EventType.Digital, _time, **data)
@staticmethod
def analog(time: float=None, **data):
"""
Construct an analog event.
Parameters
----------
time : float (optional)
Time of the event in seconds.
**data
State data of the event:
{'board': int, 'channel': int, 'state': float}
"""
_data = data.copy()
if "kind" in _data.keys():
del _data["kind"]
return Event(EventType.Analog, time, **_data)
@staticmethod
def analogc(conn: AnalogConnection, state: float, time: float,
with_delay: bool=True):
"""
Construct an analog event on a single channel using an AnalogConnection,
optionally accounting for timing delays.
Parameters
----------
conn : AnalogConnection
state: float
Output voltage of the output on the specified connection.
time : float
Time of the event in seconds.
with_delay : bool (optional)
Use the delay field of the connection to adjust `time`.
"""
assert isinstance(conn, AnalogConnection)
board = 0
channel = conn.channel
state = float(state)
data = {
"board": board,
"channel": channel,
"state": state,
}
_time = (time - conn.delay) if with_delay else time
return Event(EventType.Analog, _time, **data)
def __matmul__(self, time: float) -> Event:
"""
Set self.time and return self, e.g.:
```
E = Event(...) @ time
```
"""
assert isinstance(time, (float, int))
self.time = time
return self
def gen_args(self, **kwargs) -> tuple:
"""
Generate the proper arguments to pass to the backend function.
Parameters
----------
**kwargs
Keyword arguments to ensure that the event is valid for the
computer. Expected keyword arguments depend on self.kind:
EventType.Digital:
{'boards': int, 'connectors': int, 'channels': int}
EventType.Analog:
{'boards': int, 'channels': int, 'Vmin': float, 'Vmax': float}
"""
if self.kind == EventType.Digital:
b = dict_get_multi(self.data, ("board", "b"), 0)
c = dict_get_multi(self.data, EVENTPROPS[self.kind]["c"])
m = dict_get_multi(self.data, EVENTPROPS[self.kind]["m"])
s = dict_get_multi(self.data, EVENTPROPS[self.kind]["s"])
assert b < kwargs["boards"]
assert c < kwargs["connectors"]
assert m < 2 ** kwargs["channels"]
assert s < 2 ** kwargs["channels"]
return (b, c, m, m, s)
elif self.kind == EventType.Analog:
b = dict_get_multi(self.data, ("board", "b"), 0)
c = dict_get_multi(self.data, EVENTPROPS[self.kind]["c"])
s = dict_get_multi(self.data, EVENTPROPS[self.kind]["s"])
s = UNSHIFT_ANALOG(s)
assert b < kwargs["boards"]
assert c < kwargs["channels"]
assert s >= kwargs["Vmin"]
assert s <= kwargs["Vmax"]
return (b, c, s)
else:
raise Exception("Unknown EventType")
def to_primitives(self) -> dict[str, ...]:
"""
Generate a dictionary containing all event data.
"""
alldata = {
"is_digital": True if self.kind == EventType.Digital else False,
"time": float(self.time)
}
alldata.update({
IEVENTPROPS[self.kind][key]: value
for key, value in self.data.items()
})
return alldata
@staticmethod
def from_primitives(alldata: dict[str, ...]):
"""
Construct an Event object from a dict containing event data following
the format output by Event.to_primitives.
"""
assert "is_digital" in alldata.keys(), \
"Event.from_primitives: missing required key 'is_digital'"
assert "time" in alldata.keys(), \
"Event.from_primitives: missing required key 'time'"
data = alldata.copy()
kind = EventType.Digital if data.pop("is_digital") else EventType.Analog
time = float(data.pop("time"))
return Event(kind, data, time)
def domain(self) -> (EventType, int, int):
if self.kind == EventType.Digital:
return (self.kind, self.board, self.connector)
elif self.kind == EventType.Analog:
return (self.kind, self.board, 0)
else:
raise Exception("Unknown EventType")
def __getitem__(self, key: str):
assert isinstance(key, str)
if key == "kind":
return self.kind
elif key == "time":
return self.time
elif key in self.data.keys():
return self.data.get(key)
else:
raise KeyError
def __getattr__(self, key: str):
if key in self.data.keys():
return self.data.get(key)
else:
raise AttributeError
def __str__(self):
kv = [
(s, _to_bitstr(self[s], 32) if s in ["mask", "state"] else self[s])
for s in sorted(self.data.keys())
]
return f"Event({self.kind},\n" \
+ ",\n".join(
f" {IEVENTPROPS[self.kind][k]:>9s} = {v}" for k, v in kv
) + "\n" \
+ f") @ t = {self.time}"
class Sequence:
"""
List-like of Event objects.
Fields
------
events : list[Event]
color: str
stack_idx: int
"""
def __init__(self, events: list[Event]=None, color: str=None,
stack_idx: int=None):
"""
Constructor.
Parameters
----------
events : list[Event] (optional)
color: str (optional)
stack_idx: int (optional)
"""
self.events = list() if events is None else events
self.color = color
self.stack_idx = stack_idx
def with_color(self, color: str):
self.color = color
return self
def with_stack_idx(self, stack_idx: int):
assert isinstance(stack_idx, int), "Stack idx must be an integer"
self.stack_idx = stack_idx
return self
@staticmethod
def from_digital_data(T: list[float], S: list[int], board: int,
connector: int, mask: int) -> Sequence:
"""
Construct a Sequence of digital events on `board`, `connector` with
`mask` from lists of times and states. `mask` is applied globally to all
states.
Parameters
----------
T : list[float]
S : list[int]
board : int
connector : int
mask : int
Returns
-------
seq : Sequence
"""
assert len(T) == len(S)
return Sequence([
Event(EventType.Digital,
board=board,
connector=connector,
mask=mask,
state=s,
time=t
) for t, s in zip(T, S)
])
@staticmethod
def from_digital1_data(T: list[float], S: list[int], board: int,
connector: int, channel: int) -> Sequence:
"""
Construct a Sequence of digital events on `board`, `connector`,
`channel` from lists of times and states.
Parameters
----------
T : list[float]
S : list[int]
board : int
connector : int
channel : int
"""
assert len(T) == len(S)
return Sequence([
Event(EventType.Digital,
board=board,
connector=connector,
mask=1 << channel,
state=int(bool(s)) << channel,
time=t
) for t, s in zip(T, S)
])
@staticmethod
def from_digitalc_data(T: list[float], S: list[int],
conn: DigitalConnection, with_delay: bool=True) -> Sequence:
"""
Construct a Sequence of digital events on `conn` from lists of times and
states, optionally using delay information from `conn`.
Parameters
----------
T : list[float]
S : list[int]
conn : DigitalConnection
with_delay : bool (optional)
"""
assert len(T) == len(S)
return Sequence.from_digital1_data(
[
t - (conn.delay_down, conn.delay_up)[S[i]]
for i, t in enumerate(T)
] if with_delay else T,
S,
*conn
)
@staticmethod
def from_analog_data(T: list[float], V: list[float], board: int,
channel: int) -> Sequence:
"""
Construct a Sequence of analog events on `board`, `channel` from lists
of times and voltages.
Parameters
----------
T, V : list[float]
board : int
channel : int
Returns
-------
seq : Sequence
"""
assert len(T) == len(V)
return Sequence([
Event(EventType.Analog,
board=board,
channel=channel,
state=v,
time=t
) for t, v in zip(T, V)
])
@staticmethod
def from_analogc_data(T: list[float], V: list[float],
conn: AnalogConnection, with_delay: bool=True) -> Sequence:
"""
Construct a Sequence of analog events on `conn` from lists of times and
voltages, optionally using delay information from `conn`.
Parameters
----------
T : list[float]
V : list[float]
with_delay : bool (optional)
"""
assert len(T) == len(V)
return Sequence.from_analog_data(
[t - conn.delay for t in T] if with_delay else T,
V,
*conn
)
def to_primitives(self) -> dict[str, ...]:
"""
Generate a dict containing all sequence data as only primitives.
"""
return {
"events": [event.to_primitives() for event in self.events],
"color": self.color,
"stack_idx": self.stack_idx
}
@staticmethod
def from_primitives(alldata: dict[str, ...]):
"""
Construct an Event object from a dict containing event data following
the format output by Event.to_primitives.
"""
assert "events" in alldata.keys(), \
"Sequence.from_primitives: missing required key 'events'"
return Sequence(
[Event.from_primitives(event_dict)
for event_dict in alldata["events"]],
alldata.get("color"),
alldata.get("stack_idx")
)
@staticmethod
def digital_hilo(connector: int, channel: int, t0: float, t1: float) \
-> Sequence:
"""
Construct a Sequence of events for a single digital HIGH at `t0`,
followed by a digital LOW at `t1` on the specified connector and
channel.
Parameters
----------
connector : int
channel : int
t0 : float
t1 : float
Returns
-------
seq : Sequence
"""
seq = (Sequence()
<< Event.digital(c=connector, m=[channel], s=[channel]) @ t0
<< Event.digital(c=connector, m=[channel], s=0) @ t1
)
return seq
@staticmethod
def digital_hilo_c(conn: DigitalConnection, t0: float, t1: float,
with_delay: bool=True) -> Sequence:
"""
Construct a Sequence of events for a single digital HIGH at `t0`,
followed by a digital LOW at `t1` on the specified connection,
optionally using delay information from the connection.
Parameters
----------
conn : DigitalConnection
t0 : float
t1 : float
with_delay : bool (optional)
Returns
-------
seq : Sequence
"""
seq = (Sequence()
<< Event.digitalc(conn, 1, t0, with_delay)
<< Event.digitalc(conn, 0, t1, with_delay)
)
return seq
@staticmethod
def digital_lohi(connector: int, channel: int, t0: float, t1: float) \
-> Sequence:
"""
Construct a Sequence if events for a single digital LOW at `t0`,
followed by a digital HIGH at `t1` on the specified connector and
channel.
Parameters
----------
connector : int
channel : int
t0 : float
t1 : float
Returns
-------
seq : Sequence
"""
seq = (Sequence()
<< Event.digital(c=connector, m=[channel], s=0) @ t0
<< Event.digital(c=connector, m=[channel], s=[channel]) @ t1
)
return seq
@staticmethod
def digital_lohi_c(conn: DigitalConnection, t0: float, t1: float,
with_delay: bool=True) -> Sequence:
"""
Construct a Sequence of events for a single digital LOW at `t0`,
followed by a digital HIGH at `t1` on the specified connection,
optionally using delay information from the connection.
Parameters
----------
conn : DigitalConnection
t0 : float
t1 : float
with_delay : bool (optional)
Returns
-------
seq : Sequence
"""
seq = (Sequence()
<< Event.digitalc(conn, 0, t0, with_delay)
<< Event.digitalc(conn, 1, t1, with_delay)
)
return seq
@staticmethod
def digital_pulse(connector: int, channel: int, t0: float, dt: float,
invert: bool=False) -> Sequence:
"""
Construct a Sequence of events for a single digital pulse of duration
`dt` starting at `t0` on the specified connector and channel. Pass
`invert=True` to swap HIGH <-> LOW.
Parameters
----------
connector : int
channel : int
t0 : float
dt : float
invert : bool (optional)
Returns
-------
seq : Sequence
"""
if not invert:
return Sequence.digital_hilo(connector, channel, t0, t0 + dt)
else:
return Sequence.digital_lohi(connector, channel, t0, t0 + dt)
@staticmethod
def digital_pulse_c(conn: DigitalConnection, t0: float, dt: float,
invert: bool=False, with_delay: bool=True) -> Sequence:
"""
Construct a Sequence of events for a single digital pulse of duration
`dt` starting at `t0` on the specified connection, optionally using
delays specified in the connection. Pass `invert=True` to swap
HIGH <-> LOW.
Parameters
----------
conn : Digital Connection
t0 : float
dt : float
invert : bool (optional)
with_delay : bool (optional)
Returns
-------
seq : Sequence
"""
if not invert:
return Sequence.digital_hilo_c(conn, t0, t0 + dt, with_delay)
else:
return Sequence.digital_lohi_c(conn, t0, t0 + dt, with_delay)
@staticmethod
def serial_bits(connector: int, channel: int, t0: float, val: int,
length: int, val_reg: int, length_reg: int, connector_clk: int=None,
channel_clk: int=None, connector_sync: int=None,
channel_sync: int=None, clk_freq=1e6) -> Sequence:
"""
Construct a Sequence of events for writing a series of `length` bits
over (`connector`, `channel`) starting at time `t0` and corresponding to
an integer value `val`. The bits will be written in small-endian order
(most-significant digit first) with a register address prepended at a
speed corresponding to a clock frequency `clk_freq` and an optional
clock signal can be generated on (`connector_clk`, `channel_clk`). If a
clock signal is generated, the bit-write timing will be shifted backward
in time by a quarter of a clock cycle such that bit values are taken on
the falling edges of the clock. "Sync" falling/rising edges book-ending
the operation can also be generated on (`connector_sync` and
`channel_sync`) if provided.
Parameters
----------
connector : int
channel : int
t0 : float
val : int
length : int
val_reg : int
length_reg : int
connector_clk : int (optional)
channel_clk : int (optional)
connector_sync : int (optional)
channel_sync : int (optional)
clk_freq : float (optional)
Returns
-------
seq : Sequence
"""
use_clock = (connector_clk is not None) and (channel_clk is not None)
use_sync = (connector_sync is not None) and (channel_sync is not None)
bits = (
_to_bitlist(val_reg, length_reg)[::-1]
+ _to_bitlist(val, length)[::-1]
)
T = 1 / clk_freq
seq = Sequence()
if use_clock:
for k, b in enumerate(bits):
(seq
<< Event.digital1(c=connector, ch=channel, s=b)
@ (t0 + k * T - T / 2)
<< Event.digital1(c=connector, ch=channel, s=0)
@ (t0 + k * T + T / 4)
<< Event.digital1(c=connector_clk, ch=channel_clk, s=1)
@ (t0 + k * T - T / 2)
<< Event.digital1(c=connector_clk, ch=channel_clk, s=0)
@ (t0 + k * T)
)
else:
for k, b in enumerate(bits):
(seq
<< Event.digital1(c=connector, ch=channel, s=b)
@ (t0 + k * T)
<< Event.digital1(c=connector, ch=channel, s=0)
@ (t0 + k * T + T / 2)
)
if use_sync:
(seq
<< Event.digital1(c=connector_sync, ch=channel_sync, s=1)
@ (t0 - T / 2 - use_clock * (T / 2))
<< Event.digital1(c=connector_sync, ch=channel_sync, s=0)
@ (t0 - use_clock * (T / 2))
<< Event.digital1(c=connector_sync, ch=channel_sync, s=1)
@ (t0 + len(bits) * T - use_clock * (T / 2))
)
return seq
@staticmethod
def serial_bits_c(conn: DigitalConnection, t0: float, val: int,
length: int, val_reg: int, length_reg: int,
conn_clk: DigitalConnection=None, conn_sync: DigitalConnection=None,
clk_freq: float=1e6, with_delay: bool=True) -> Sequence:
"""
Construct a Sequence of events for writing a series of `length` bits
`conn` starting at time `t0` and corresponding to an integer value
`val`, optionally using timing delay information from `conn`. The bits
will be written in small-endian order (most-significant digit first)
with a register address prepended at a speed corresponding to a clock
frequency `clk_freq` and an optional clock signal can be generated on
`conn_clk`. If a clock signal is generated, the bit-write timing will be
shifted backward in time by a quarter of a clock cycle such that bit
values are taken on the falling edges of the clock. "Sync"
falling/rising edges book-ending the operation can also be generated on
`conn_sync` if provided. Overall timings are such that the first bit
will always be written at `t0`.
Parameters
----------
conn : DigitalConnection
t0 : float
val : int
length : int
val_reg : int
length_reg : int
conn_clk : DigitalConnection (optional)
conn_sync : DigitalConnection (optional)
clk_freq : float (optional)
with_delay : bool (optional)
"""
use_clock = conn_clk is not None
use_sync = conn_sync is not None
bits = (
_to_bitlist(val_reg, length_reg)[::-1]
+ _to_bitlist(val, length)[::-1]
)
T = 1 / clk_freq
seq = Sequence()
if use_clock:
for k, b in enumerate(bits):
(seq
<< Event.digitalc(conn, b, t0 + k * T - T / 2)
<< Event.digitalc(conn, 0, t0 + k * T + T / 4)
<< Event.digitalc(conn_clk, 1, t0 + k * T - T / 2)
<< Event.digitalc(conn_clk, 0, t0 + k * T)
)
else:
for k, b in enumerate(bits):
(seq
<< Event.digitalc(conn, b, t0 + k * T)
<< Event.digitalc(conn, 0, t0 + k * T + T / 2)
)
if use_sync:
(seq
<< Event.digitalc(
conn_sync, 1, t0 - T / 2 - use_clock * (T / 2))
<< Event.digitalc(
conn_sync, 0, t0 - T / 4 - use_clock * (T / 2))
<< Event.digitalc(
conn_sync, 1, t0 + len(bits) * T - use_clock * (T / 2))
)
return seq
@staticmethod
def analog_ramp_lin(channel: int, t0: float, t1: float, V0: float,
V1: float) -> Sequence:
"""
Construct a Sequence of events for a linear analog ramp beginning at
(t0, V0) and ending at (t1, V1) on the specified channel at maximum
voltage resolution.
Parameters
----------
channel : int
t0, t1 : float
V0, V1 : float
Returns
-------
seq : Sequence
"""
return Sequence.from_analog_data(
*linear_ramp(V0, V1, t0, t1), 0, channel)
@staticmethod
def analog_ramp_lin_c(conn: AnalogConnection, t0: float, t1: float,
V0: float, V1: float, with_delay: bool=True) -> Sequence:
"""
Construct a Sequence of events for a linear analog ramp beginning at
(t0, V0) and ending at (t1, V1) on the specified connection at maximum
resolution, optionally using timing delay information from `conn`.
Parameters
----------
conn : AnalogConnection
t0, t1 : float
V0, V1 : float
with_delay : float (optional)
Returns
-------
seq : Sequence
"""
return Sequence.from_analogc_data(
*linear_ramp(V0, V1, t0, t1), *conn, with_delay)
@staticmethod
def analog_ramp_exp(channel: int, t0: float, t1: float, V0: float,
V1: float, rate: float) -> Sequence:
"""
Construct a Sequence of events for an exponential ramp beginning at
(t0, V0) and ending at (t1, V1) on the specified channel at maximum
voltage resolution.
Parameters
----------
channel: int
V0, V1 : float
t0, t1 : float
rate : float
Returns
-------
seq : Sequence
"""
return Sequence.from_analog_data(
*exponential_ramp(V0, V1, t0, t1, rate), 0, channel)
@staticmethod
def analog_ramp_exp_c(conn: AnalogConnection, t0: float, t1: float,
V0: float, V1: float, rate: float, with_delay: bool=True) \
-> Sequence:
"""
Construct a Sequence of events for an exponential ramp beginning at
(t0, V0) and ending at (t1, V1) on the specified connection at maximum
voltage resolution, optionally using timing delay information from
`conn`.
Parameters
----------
channel: int
V0, V1 : float
t0, t1 : float
rate : float
with_delay : float (optional)
Returns
-------
seq : Sequence
"""
return Sequence.from_analogc_data(
*exponential_ramp(V0, V1, t0, t1, rate), *conn, with_delay)
@staticmethod
def analog_sin(channel: int, t0: float, t1: float, amp: float, dc: float,
freq: float, phase: float) -> Sequence:
"""
Construct a Sequence of events for a a sine wave beginning at `t0` and
ending at `t1` with amplitude `amp`, DC offset `dc`, frequency `freq`,
and phase shift `phase` on the specified channel.
Parameters
----------
channel : int
t0, t1 : float
amp, dc, freq, phase : float
Returns
-------
seq : Sequence
"""
return Sequence.from_analog_data(
*sin(amp, dc, freq, phase, t0, t1), 0, channel)
@staticmethod
def analog_sin_c(conn: AnalogConnection, t0: float, t1: float, amp: float,
dc: float, freq: float, phase: float, with_delay: bool=True) \
-> Sequence:
"""
Construct a Sequence of events for a a sine wave beginning at `t0` and
ending at `t1` with amplitude `amp`, DC offset `dc`, frequency `freq`,
and phase shift `phase` on the specified connection, optionally using
timing delay information from `conn`.
Parameters
----------
channel : int
t0, t1 : float
amp, dc, freq, phase : float
with_delay : bool (optional)
Returns
-------
seq : Sequence
"""
return Sequence.from_analogc_data(
*sin(amp, dc, freq, phase, t0, t1), *conn, with_delay)
@staticmethod
def analog_cos(channel: int, t0: float, t1: float, amp: float, dc: float,
freq: float, phase: float) -> Sequence:
"""
Construct a Sequence of events for a a cosine wave beginning at `t0` and
ending at `t1` with amplitude `amp`, DC offset `dc`, frequency `freq`,
and phase shift `phase` on the specified channel.
Parameters
----------
channel : int
amp, dc, freq, phase : float
t0, t1 : float
Returns
-------
seq : Sequence
"""
return Sequence.from_analog_data(
*cos(amp, dc, freq, phase, t0, t1), 0, channel)
@staticmethod
def analog_cos_c(conn: AnalogConnection, t0: float, t1: float, amp: float,
dc: float, freq: float, phase: float, with_delay: bool=True) \
-> Sequence:
"""
Construct a Sequence of events for a a cosine wave beginning at `t0` and
ending at `t1` with amplitude `amp`, DC offset `dc`, frequency `freq`,
and phase shift `phase` on the specified connection, optionally using
timing delay information from `conn`.
Parameters
----------
channel : int
t0, t1 : float
amp, dc, freq, phase : float
with_delay : bool (optional)
Returns
-------
seq : Sequence
"""
return Sequence.from_analogc_data(
*cos(amp, dc, freq, phase, t0, t1), *conn, with_delay)
def __lshift__(self, event: Event):
"""
Append an Event and return self.
"""
assert isinstance(event, Event)
assert event.time is not None
self.events.append(event)
return self
def __getitem__(self, idx: int):
return self.events[idx]
def __add__(self, other: Sequence | list[Event]):
"""
Concatenate two Sequences.
"""
if isinstance(other, Sequence):
return Sequence(self.events + other.events)
elif isinstance(other, list):
assert all(
isinstance(e, Event) and e.time is not None for e in other)
return Sequence(self.events + other)
else:
return ValueError
return Sequence(self.events + other.events)
def __radd__(self, other):
"""
Mainly so that you can call `sum` on an interator of Sequences.
"""
if isinstance(other, Sequence):
return other.__add__(self)
elif isinstance(other, int) and other == 0:
return self
elif isinstance(other, list):
assert all(
isinstance(e, Event) and e.time is not None for e in other)
return Sequence(other + self.events)
else:
raise NotImplemented
def join(self, other: Sequence):
assert isinstance(other, Sequence)
return self + other
@staticmethod
def joinall(*seqs: Sequence):
assert all(isinstance(o, Sequence) for o in seqs)
return sum(seqs)
def __iter__(self):
return iter(self.events)
def __len__(self):
return len(self.events)
def __str__(self):
return f"Sequence([\n" \
+ ",\n".join(
"\n".join(" " + s for s in str(e).split("\n"))
for e in self.events
) \
+ "\n])"
def pop(self, idx: int=-1) -> Event:
"""
Pop the Event at index `idx` from self.
"""
return self.events.pop(idx)
def insert(self, idx: int, event: Event):
"""
Insert an Event at index `idx` and return self.
"""
assert isinstance(event, Event)
assert event.time is not None
self.events.insert(idx, event)
return self
def append(self, event: Event):
"""
Append an Event and return self.
"""
assert isinstance(event, Event)
assert event.time is not None
self.events.append(event)
return self
def min_time(self) -> float:
"""
Find the time of the earliest event in self.events.
"""
return min(map(lambda e: e.time, self.events)) \
if len(self.events) > 0 else 0.0
def max_time(self) -> float:
"""
Find the time of the latest event in self.events.
"""
return max(map(lambda e: e.time, self.events)) \
if len(self.events) > 0 else 0.0
def when(self) -> (float, float):
"""
Return (self.min_time(), self.max_time()).
"""
return (self.min_time(), self.max_time())
def domains(self) -> set[(EventType, int, int)]:
"""
Return a set of all domains (digital/analog, board/connector/channel)
involved in the sequence.
"""
return set(map(lambda e: e.domain(), self.events))
def _get_states(self, kind: EventType, **location) -> list[tuple]:
ts = list()
if kind == EventType.Digital:
c = dict_get_multi(location, ("connector", "conn", "c"))
ch = dict_get_multi(location, ("channel", "ch", "c"))
for event in self.events:
if event.kind != kind:
continue
_c = dict_get_multi(event, ("connector", "conn", "c"))
_ch = _to_bitlist(dict_get_multi(event, ("mask", "m")))[ch]
if event.kind == kind and _c == c and _ch:
ts.append((
event.time,
_to_bitlist(dict_get_multi(event, ("state", "s")))[ch]
))
elif kind == EventType.Analog:
ch = dict_get_multi(location, ("channel", "ch"))
for event in self.events:
if event.kind != kind:
continue
_ch = dict_get_multi(event, ("channel", "ch", "c"))
if event.kind == kind and _ch == ch:
ts.append((
event.time,
dict_get_multi(event, ("state", "s"))
))
else:
raise Exception("Invalid EventType")
return ts
class Connection:
"""
Template for a connection running from a Computer to the experiment. Not
intended for instantiation on its own.
"""
def __init__(self):
raise NotImplemented
def __iter__(self):
raise NotImplemented
def gen_kwargs(self):
raise NotImplemented
def __getitem__(self, key: str):
raise NotImplemented
def keys(self):
raise NotImplemented
def __hash__(self):
return hash(tuple(self))
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, state):
return self.__dict__.update(state)
class DigitalConnection(Connection):
"""
Digital connection from a Computer to the experiment.
"""
def __init__(self, connector: int, channel: int, default: bool=False,
delay_up: float=0.0, delay_down: float=0.0):
self.connector = int(connector)
self.channel = int(channel)
self.default = bool(default)
self.delay_up = float(delay_up)
self.delay_down = float(delay_down)
def __iter__(self):
return iter([self.connector, self.channel])
def gen_kwargs(self):
return {
"kind": EventType.Digital,
"connector": self.connector,
"mask": [self.channel],
"state": [self.channel] if self.default else 0
}
def __getitem__(self, key: str):
if key == "kind":
return EventType.Digital
elif key == "connector":
return self.connector
elif key == "channel":
return self.channel
elif key == "mask":
return [self.channel]
elif key == "state":
return [self.channel] if self.default else 0
elif key == "default":
return self.default
else:
raise KeyError
def keys(self):
return ["kind", "connector", "channel"]
def __str__(self):
kv = [
("connector", self.connector),
("channel", self.channel),
("default", "HIGH" if self.default else "LOW"),
("delay_up", self.delay_up),
("delay_down", self.delay_down),
]
return (
"DigitalConnection({\n"
+ ",\n".join(f" {k:>10s} = {v}" for k, v in kv)
+ "\n})"
)
class AnalogConnection(Connection):
"""
Analog connection from a Computer to the experiment.
"""
def __init__(self, channel: int, default: float=0.0, delay: float=0.0):
self.channel = int(channel)
self.default = float(default)
self.delay = float(delay)
def __iter__(self):
return iter([self.channel])
def gen_kwargs(self):
return {
"kind": EventType.Analog,
"channel": self.channel,
"state": self.default
}
def __getitem__(self, key: str):
if key == "kind":
return EventType.Analog
elif key == "channel":
return self.channel
elif key == "state":
return self.default
elif key == "default":
return self.default
else:
raise KeyError
def keys(self):
return ["kind", "channel"]
def __str__(self):
kv = [
("channel", self.channel),
("default", self.default),
("delay", self.delay),
]
return (
"AnalogConnection({\n"
+ ",\n".join(f" {k:>10s} = {v}" for k, v in kv)
+ "\n})"
)
class ConnectionLayout:
"""
Class to house a set of Connections (digital or analog) from a Computer to
the experiment.
"""
def __init__(self, **connections: dict[..., Connection]):
self.connections = connections
def __getitem__(self, key):
if isinstance(key, str):
return ConnectionLayout(**{key: self.connections[key]})
else:
return ConnectionLayout(**{k: self.connections[k] for k in key})
def __getattr__(self, attr):
return self.connections[attr]
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, state):
self.__dict__.update(state)
def __iter__(self):
return iter([
connection.gen_kwargs() for connection in self.connections.values()
])
def get_connections(self):
return list(self.connections.items())
def __str__(self):
return (
"ConnectionLayout({\n"
+ ",\n".join(
f" {l}:\n" + "\n".join(" " + s for s in str(c).split("\n"))
for l, c in self.connections.items()
) + "\n})"
)
class Computer:
"""
Main interface to the backend.
Fields
------
address : str
port : int
ttl: int
num_{boards,connectors,channels}_d : int
Number of digital boards, connectors per board, and channels per
connector.
num_{boards,channels}_a : int
Number of analog boards and channels per board.
defaults : ConnectionLayout
Default states of named connections to the experiment.
"""
def __init__(self, address: str, port: int, ttl: int=1,
layout: tuple=(1, 4, 32, 1, 32, -10.0, 10.0),
defaults: ConnectionLayout=None):
"""
Constructor.
Parameters
----------
address : str
port : int
ttl : int (optional)
layout : (int, int, int, int, int, float, float)
Number of digital boards, connectors per board, and channels per
connector; number of analog boards, channels per board, and minimum
and maximum voltage on each channel.
defaults : ConnectionLayout (optional)
Defines (and names) established connections to the experiment.
"""
self.address = address
self.port = port
self.ttl = ttl
self.num_boards_d = layout[0]
self.num_connectors_d = layout[1]
self.num_channels_d = layout[2]
self.num_boards_a = layout[3]
self.num_channels_a = layout[4]
self.Vmin = layout[5]
self.Vmax = layout[6]
self.defaults = ConnectionLayout() if defaults is None else defaults
self._connected = False
self._queueing = False
def debug(self, state: bool=True):
"""
Show (or not) the Entangleware debug window.
"""
assert self._connected, "Must be connected"
debug_mode(state)
return self
def connect(self, timeout_sec=1.0):
"""
Connect to the backend.
"""
connect(timeout_sec, self.address, self.port, self.ttl)
self._connected = True
return self
def default(self, kind: EventType, **data):
"""
Set the default state (time = 0) with Event-like parameters (see Event).
Must be called before queueing any sequences.
"""
assert self._connected, "Must be connected"
assert all(dict_haskey_multi(data, props)
for props in EVENTPROPS[kind].values())
if kind == EventType.Digital:
b = dict_get_multi(data, ("board", "b"), 0)
c = dict_get_multi(data, EVENTPROPS[kind]["c"])
m = dict_get_multi(data, EVENTPROPS[kind]["m"])
s = dict_get_multi(data, EVENTPROPS[kind]["s"])
m = ch(*m) if isinstance(m, (list, tuple, set)) else m
s = ch(*s) if isinstance(s, (list, tuple, set)) else s
set_digital_state(0.0, b, c, m, m, s)
elif kind == EventType.Analog:
b = dict_get_multi(data, ("board", "b"), 0)
c = dict_get_multi(data, EVENTPROPS[kind]["c"])
s = dict_get_multi(data, EVENTPROPS[kind]["s"])
s = UNSHIFT_ANALOG(s)
set_analog_state(0.0, b, c, s)
else:
raise Exception("Unknown EventType")
return self
def def_digital(self, connector: int, channel: int, state: bool,
*args, **kwargs):
"""
Thin wrapper around Computer.default(EventType.Digital, ...) for a
single channel.
"""
return self.default(EventType.Digital,
connector=connector,
mask=[channel],
state=[channel] if state else 0
)
def def_analog(self, channel: int, state: float, *args, **kwargs):
"""
Thin wrapper around Computer.default(EventType.Analog, ...) for a single
channel.
"""
return self.default(EventType.Analog,
channel=channel,
state=state
)
def set_defaults(self):
"""
Set all channel defaults defined in `self.defaults`.
"""
for default_kwargs in self.defaults:
self.default(**default_kwargs)
return self
def zero(self, kind: EventType):
"""
Zero out every available channel of either digital or analog type.
Must be called before queueing any sequences.
"""
assert self._connected, "Must be connected"
if kind == EventType.Digital:
for i in range(self.num_boards_d):
for j in range(self.num_connectors_d):
set_digital_state(0.000,
i, j,
ch(*[k for k in range(self.num_channels_d)]),
ch(*[k for k in range(self.num_channels_d)]),
0
)
elif kind == EventType.Analog:
for i in range(self.num_boards_a):
for j in range(self.num_channels_a):
zero = UNSHIFT_ANALOG(0)
set_analog_state(0.000, i, j, zero)
else:
raise Exception("Unknown EventType")
return self
def enqueue(self, sequence: Sequence):
"""
Enqueue and build the Events in a Sequence.
"""
assert self._connected, "Must be connected"
assert isinstance(sequence, Sequence), "Object must be a Sequence"
if not self._queueing:
build_sequence()
for event in sequence:
if event.kind == EventType.Digital:
set_digital_state(
event.time,
*event.gen_args(
boards=self.num_boards_d,
connectors=self.num_connectors_d,
channels=self.num_channels_d
)
)
elif event.kind == EventType.Analog:
set_analog_state(
event.time,
*event.gen_args(
boards=self.num_boards_a,
channels=self.num_channels_a,
Vmin=self.Vmin,
Vmax=self.Vmax
)
)
else:
raise Exception("Unknown EventType")
return self
def run(self):
"""
Run all built Events.
"""
assert self._connected, "Must be connected"
run_sequence()
return self
def rerun(self):
"""
Re-run the last-loaded sequence.
"""
assert self._connected, "Must be connected"
rerun_last_sequence()
return self
def stop(self):
"""
Halt execution of the current sequence.
"""
assert self._connected, "Must be connected"
stop_sequence()
return self
def clear(self):
"""
Clear all Events from board memory.
"""
assert self._connected, "Must be connected"
clear_sequence()
return self
def disconnect(self):
"""
Disconnect from the backend.
"""
assert self._connected, "Must be connected"
disconnect()
def ch(*ch_nums: int, num_channels: int=32) -> int:
"""
Return an int whose bits correspond to a selection of enumerated channels.
For example, to select channels 0, 3, and 4 -- corresponding to the bit
string 11001 -- the invocation `ch(0, 3, 4)` returns `25`. Repeated
arguments toggle the state of the channel selection.
"""
acc = 0
for ch_num in ch_nums:
if ch_num >= num_channels:
raise Exception("Invalid channel number")
acc ^= 1 << ch_num
return acc
# set_digital_state(
# seqtime: f64, // time of state change
# board: u8, // board number, only have one
# connector: u8, // connector on board, 0-3
# channel_mask: u32, // bitmask for channels to be changed
# output_enable_state: u32, // bitmask for which channels drive their outputs
# output_state: u32 // state of channels
# )
# set_analog_state(
# seqtime: f64, // time of state change
# board: u8, // board number, only have one
# channel: u8, // 0-31
# value: f42 // [-10, +10]
# )