Skip to content
Snippets Groups Projects
Commit 2335a8ca authored by whuie2's avatar whuie2
Browse files

preliminary abstraction ideas

parent aca0e827
No related branches found
No related tags found
No related merge requests found
......@@ -8,10 +8,12 @@ import time
class UdpMulticaster:
"""
UDP Multicaster:
This is used to 'multicast' to the network to find the Entangleware Control software.
The Entangleware Control Software, when running, will listen for this multicast event.
The time-to-live (MCAST_TTL) setting specifies how far within the intranet the multicast message will be allowed
to travel. '1' --> only the local subnet; '255' --> the entire intranet (and possibly leak to the internet)
This is used to 'multicast' to the network to find the Entangleware Control
software. The Entangleware Control Software, when running, will listen for
this multicast event. The time-to-live (MCAST_TTL) setting specifies how far
within the intranet the multicast message will be allowed to travel. '1' -->
only the local subnet; '255' --> the entire intranet (and possibly leak to
the internet)
"""
def __init__(self, mcast_group='239.255.45.57', mcast_port=50100, mcast_ttl=1):
self.MCAST_GRP = mcast_group
......@@ -31,11 +33,12 @@ class UdpMulticaster:
class TcpServer:
"""
TCP Server:
This is used by the python code to create a TCP server to which the Entangleware Control software will be a client.
The Entangleware software will receive the location of this server when the python code sends a multicast message
via UPD. After the Entangleware Control software attempts to connect, the connection is used to create the actual
TCP endpoint used for communication to/from the Entangleware Control software.
TCP Server: This is used by the python code to create a TCP server to which
the Entangleware Control software will be a client. The Entangleware
software will receive the location of this server when the python code sends
a multicast message via UPD. After the Entangleware Control software
attempts to connect, the connection is used to create the actual TCP
endpoint used for communication to/from the Entangleware Control software.
"""
def __init__(self, serveraddress=('', 0), timeout=1.0, backlog=1):
# private data ####
......@@ -62,8 +65,9 @@ class TcpServer:
class TcpEndPoint:
"""
TCP Endpoint:
This is the endpoint to which the TCP server passes the connection. The endpoint will handle most communication
to/from the Entangleware Control software.
This is the endpoint to which the TCP server passes the connection. The
endpoint will handle most communication to/from the Entangleware Control
software.
"""
def __init__(self, connection):
if isinstance(connection, socket.socket):
......@@ -101,8 +105,8 @@ class TcpEndPoint:
class ConnectionManager:
"""
Connection Manager:
This is the basic class that handles information about the connection to/from the Entangleware Control client.
Connection Manager: This is the basic class that handles information about
the connection to/from the Entangleware Control client.
"""
def __init__(self):
self.isConnected = False
......@@ -129,7 +133,8 @@ class ConnectionManager:
class DDS:
"""
This is an example of how to use python and Entangleware Control to control a AD9959 DDS. It may not work.
This is an example of how to use python and Entangleware Control to control
a AD9959 DDS. It may not work.
"""
def __init__(self, board, connector, mosi_pin, sclk_pin, cs_pin, reset_pin, ioupdate_pin):
self.board = board
......@@ -224,9 +229,11 @@ class DDS:
class Sequence:
"""
Sequence:
This class can be used to store a local version of the sequence before sending to the Entangleware Control software.
The sequence is initialized to be an empty byte array (with a memoryview object to insert data 'in-place'). As more
data is added, the byte array will automatically grow to accommodate any size (up to memory constraints)
This class can be used to store a local version of the sequence before
sending to the Entangleware Control software. The sequence is initialized to
be an empty byte array (with a memoryview object to insert data 'in-place').
As more data is added, the byte array will automatically grow to accommodate
any size (up to memory constraints)
"""
def __init__(self):
self.building = False
......@@ -269,11 +276,13 @@ class Sequence:
self.seqendindex = 0
def connect(timeout_sec=None):
# Create the Multicaster and the TCP Server. Multicast (i.e. send) the port number of the TCP server to the
# Entanglware Control software so that it can attempt to connect to python and receive data.
# If the Entangleware Control software is running, it has already subscribed to the Multicast location.
connmgr.udp_local = UdpMulticaster()
def connect(timeout_sec=None, address: str="127.0.0.1", port: int=50100, ttl: int=1):
# Create the Multicaster and the TCP Server. Multicast (i.e. send) the port
# number of the TCP server to the # Entanglware Control software so that it
# can attempt to connect to python and receive data. # If the Entangleware
# Control software is running, it has already subscribed to the Multicast
# location.
connmgr.udp_local = UdpMulticaster(address, port, ttl)
connmgr.tcp_server = TcpServer()
connmgr.udp_local.sendmsg(int(connmgr.tcp_server.serverport))
try:
......@@ -376,32 +385,31 @@ def stop_sequence():
def set_digital_state(seqtime, board, connector, channel_mask, output_enable_state, output_state):
"""Sets the digital output state.
If 'build_sequence' hasn't been executed before this method, the method will ignore 'time' and immediately change
the digital state.
If 'build_sequence' has been executed before this method, the method will queue this state into the sequence which
will execute, in time-order, after 'run_sequence' has been executed.
Parameters:
:param seqtime: Absolute time, in seconds, when state will change. (double)
:param board: 7820R board number (unsigned 8-bit integer) -- starts at '0'
:param connector: Connector of the 7820R (unsigned 8-bit integer) -- starts at '0'
:param channel_mask: Mask of the channel(s) to be changed (unsigned 32-bit integer)
:param output_enable_state: State of output enable for the channel(s) to be changed (unsigned 32-bit integer)
:param output_state: State of the channel(s) starting at 'time' (unsigned 32-bit integer)
Returns:
:return:
"""
Sets the digital output state. If 'build_sequence' hasn't been executed
before this method, the method will ignore 'time' and immediately change the
digital state. If 'build_sequence' has been executed before this method,
the method will queue this state into the sequence which will execute, in
time-order, after 'run_sequence' has been executed.
Parameters
----------
seqtime : float
Absolute time, in seconds, when state will change.
board : 8-bit int >= 0
7820R board number -- starts at '0'
connector : 8-bit int >= 0
Connector of the 7820R -- starts at '0'
channel_mask : 32-bit int >= 0
Mask of the channel(s) to be changed
output_enable_state : 32-bit int >= 0
State of output enable for the channel(s) to be changed
output_state : 32-bit int >= 0
State of the channel(s) starting at `time`
Returns
-------
None
"""
connector = (0x01 << 24) | ((board & 0xFF) << 8) | (connector & 0xFF)
lv_instruct = 0x14 # instruction to Entangleware Control software
......@@ -418,10 +426,10 @@ def set_analog_state(seq_time, board, channel, value):
numtype = (int, float)
# print(type(seq_time))
lv_instruct = 0x14 # instruction to Entangleware Control software
if isinstance(seq_time, numtype) and \
isinstance(board, numtype) and \
isinstance(channel, numtype) and \
isinstance(value, numtype):
if isinstance(seq_time, numtype) \
and isinstance(board, numtype) \
and isinstance(channel, numtype) \
and isinstance(value, numtype):
board_in_range = (0 <= board <= 255)
channel_in_range = (0 <= channel <= 255)
if board_in_range and channel_in_range:
......@@ -433,10 +441,10 @@ def set_analog_state(seq_time, board, channel, value):
msgseq.addElement(to_send) # add to queue if building
else:
connmgr.tcp_endpoint.sendmsg(to_send, 0, lv_instruct) # set default if not building
elif isinstance(seq_time, list) and \
isinstance(board, numtype) and \
isinstance(channel, numtype) and \
isinstance(value, list):
elif isinstance(seq_time, list) \
and isinstance(board, numtype) \
and isinstance(channel, numtype) \
and isinstance(value, list):
# print('in list mode')
board_in_range = (0 <= board <= 255)
channel_in_range = (0 <= channel <= 255)
......
import math
def exponential(val_start, val_end, t_start, t_end, decay_rate):
"""
This will generate two lists containing time data and analog data. This data will be an exponential decay function
that will start at (tstart,val_start) and end at (t_end,val_end)
This will generate two lists containing time data and analog data. This data
will be an exponential decay function that will start at
(t_start, val_start) and end at (t_end, val_end)
Parameters
----------
val_start : float
Starting voltage of the ramp
val_end : float
Ending voltage of the ramp
t_start : float
Starting time of the ramp
t_end : float
Ending time of the ramp
decay_rate : float
Decay rate of the ramping exponential function
:param val_start:
:param val_end:
:param t_start:
:param t_end:
:param decay_rate:
:return:
Returns
-------
timesteps : list[float]
List of times
analogsteps : list[float]
List of analog values
"""
# Set start and end values to an integer; The intent is to format as a 16-bit integer for the DAC
# you may wish to warn the user if their initial or final value is outside the range of the DAC
# Set start and end values to an integer; The intent is to format as a
# 16-bit integer for the DAC. You may wish to warn the user if their initial
# or final value is outside the range of the DAC
q_val_start = int((val_start / 20) * (2 ** 16))
q_val_end = int((val_end / 20) * (2 ** 16))
if q_val_end == q_val_start:
......
import lib.entangleware_control_link as ew
#from enum import Enum
from enum import Enum
class Computer:
def __init__(self, address: str):
pass
def __init__(self, address: str, port: int, ttl: int, boards: list):
self.address = address
self.port = port
self.ttl = ttl
self.boards = boards
def __getitem__(self, idx: int, /):
return self.boards[idx]
def connect(timeout_sec=None):
ew.connect(timeout_sec, self.address, self.port, self.ttl)
class Board:
def __init__(self, computer: Computer):
def __init__(self, num_connectors: int, connector_channels: int):
self.num_connectors = num_connectors
self.connector_channels = connector_channels
def __getitem__(self, idxs: tuple, /):
return ch_sel(*idxs)
def run_sequence(self, sequence: Sequencer):
pass
class Sequencer:
def __init__(self, board: Board):
pass
def __init__(self, board: Board, sequence=None):
self.board = board
self.sequence = list()
def __lshift__(self):
def __lshift__(self, event: Event, /):
assert event.time is not None
self.sequence.append(event)
def __getitem__(self, idx: int, /):
return self.sequence[idx]
def __add__(self, other: Sequencer, /):
pass
def
class EventType(Enum):
Digital = 0
Analog = 1
class Event:
def __init__(self):
pass
def __init__(self, kind: EventType, data, time=None):
self.kind = kind
self.data = data
self.time = time
def __matmul__(self):
pass
def ch_sel(*ch_nums: int) -> int:
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment