Skip to content
Snippets Groups Projects
Commit 857a72c8 authored by xiyehu2's avatar xiyehu2
Browse files

Merge branch 'master' into 'master'

waveform.py implementation

See merge request whuie2/awg-control!2
parents 5492c1bc 03048958
No related branches found
No related tags found
1 merge request!2waveform.py implementation
......@@ -31,6 +31,7 @@ class AWG:
self.mem_size = int64(0) # maximum memory size of the AWG
self.full_scale = int32(0) # full scale scaling of output voltage, used for data calculation
self.channel = [0,0,0,0] # activated channels
self.ch_amp = [0,0,0,0] # channel output amplitude
self.mode = "" # current mode AWG is running on
def open(self, remote=False):
......@@ -69,7 +70,7 @@ class AWG:
"""
flag = 0
msg = "Checking error at " + message + " ... "
if custom_msg != "":
if message != "":
flag = 1
sys.stdout.write(msg)
err_reg = uint32(0)
......@@ -114,7 +115,7 @@ class AWG:
spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_CARD_FORCETRIGGER)
self.check_error()
def set_sampling_rate(self, sr):
def set_sampling_rate(self, sr: int):
"""
set sampling rate
:param sr: 64bit integer between 50MHz and 625MHz
......@@ -137,6 +138,7 @@ class AWG:
if self.channel[ch] == 0:
self.channel[ch] = 1
self.ch_amp = amplitude
spcm_dwSetParam_i64(self.card, SPC_CHENABLE, int64(2**ch)) # see CHANNEL0-3 in regs.py for detail
spcm_dwSetParam_i32(self.card, SPC_ENABLEOUT0 + ch * (SPC_ENABLEOUT1 - SPC_ENABLEOUT0), 1)
spcm_dwSetParam_i32(self.card, SPC_AMP0 + ch * (SPC_AMP1 - SPC_AMP0), amplitude)
......@@ -190,13 +192,13 @@ class AWG:
def get_aligned_array(self, size):
"""
returns an array at a page-aligned memory location, necessary for data buffering
returns a numpy array at a page-aligned memory location
:param size: number of samples used for data calculation
:return: numpy array starting at the correct location
"""
data_length_bytes = uint32(size * 2 * np.sum(self.channel))
buffer = pvAllocMemPageAligned(data_length_bytes) # buffer now holds a page-aligned location
buffer_data = cast(addressof(pvBuffer), ptr16) # cast it to int16 array
buffer_data = cast(addressof(buffer), ptr16) # cast it to int16 array
array = np.frombuffer(buffer_data, dtype=int16)
return array
......@@ -210,7 +212,7 @@ class AWG:
spcm_dwSetParam_i32(self.card, SPC_SEQMODE_MAXSEGMENTS,nseg) # set number of sequences the memory is divided into
self.check_error()
def write_segment(self, data: np.array, segment: int):
def write_segment(self, data: np.ndarray, segment: int):
"""
write data onto a specified segment.
:param data: numpy array containing waveform data
......
File added
File added
File added
File added
File added
File added
from AWG import *
import time
class Waveform:
def __init__(self, cf: int, df: int, n: int, sample_rate: int):
"""
helper class to store basic waveform information.
:param cf: center frequency tone of tweezer array.
:param df: differential frequency between neighboring tweezers.
:param n: number of tweezers to the left/right of center frequency tone, total number of tweezer is 2n+1.
:param sample_rate: sampling rate of the AWG to generate correct number of samples.
"""
# define some useful numbers
scale = 2**11 # Mingkun uses 2^11, caltech uses 2^15, maybe this is scaling up a float to int?
num_tz = 2*n + 1 # total number of tweezers to be generated
max_amp = scale / np.sqrt(num_tz) # again, saw this from multiple sources, not sure why
# self.amplitude = max_amp * np.ones(num_tz)
self.amplitude: np.ndarray = max_amp # uniform amplitude for now, this will eventually be tweaked finely with experiments
self.omega: np.ndarray = 2*np.pi * np.linspace(cf - n*df, cf + n*df, num_tz) # frequency tones
self.phi: np.ndarray = 2*np.pi * np.random.rand(num_tz) # random initial phases from 0-2pi, also will be tweaked finely with experiments
self.sample_rate: int = sample_rate
# self.debug = {
# "mat1": 0,
# "mat2": 0,
# "mat3": 0
# }
def create_static_array(wfm: Waveform, sample_len: int) -> np.ndarray:
"""
create a static-array-generating waveform with user set number of samples
:param wfm: waveform object already initialized with basic parameters.
:param sample_len: total number of samples to generate, must be multiples of 512. Note that more sample != higher resolution.
:return: returns a 1D array with static-array-generating waveform.
"""
# construct time axis, t_total(s) = sample_len / sample_rate, dt = t_total / sample_len
t = np.arange(sample_len) / wfm.sample_rate
# calculate individual sin waves, sig_mat[i] corresponds to data for ith tweezer
sin_mat = wfm.amplitude * np.sin(np.outer(wfm.omega,t) + np.expand_dims(wfm.phi, axis=1)) # shape=(number of tweezers x sample_len)
# sum up all rows to get final signal
return np.sum(sin_mat, axis=0)
def create_path_table(wfm: Waveform) -> np.ndarray:
"""
create a dim-3 look up table where the table[i,j] contains a sine wave to move tweezer i to tweezer j
:param wfm: waveform object already initialized with basic parameters.
:return: dim-3 ndarray
"""
# setup basic variables
twopi = 2*np.pi
vmax = KILO(20) * MEGA(1) # convert units, 20 kHz/us -> 20e3 * 1e6 Hz/s
dw_max = wfm.omega[-1] - wfm.omega[0] # Longest move in frequency
t_max = 2 * dw_max / vmax # Longest move sets the maximum moving time
a_max = -vmax * 2 / t_max # maximum acceleration, negative sign because of magic
sample_len_max = int(np.ceil(t_max * 4/5 * wfm.sample_rate)) # get number of samples required for longest move, this sets the size of lookup table
sample_len_max += (512 - sample_len_max % 512) # make overall length a multiple of 512 so AWG doesn't freak out
# now we calculate all possible trajectories, go to Group Notes/Projects/Rearrangement for detail
n = len(wfm.omega) # total number of tweezers
phi_paths = np.zeros((n, n, sample_len_max)) # lookup table to store all moves
t = np.arange(sample_len_max) / wfm.sample_rate # time series
# iterate! I think this part can be vectorized as well... but unnecessary.
for i, omega_i in enumerate(wfm.omega):
for j, omega_j in enumerate(wfm.omega): # j is the target position, i is starting position
if i == j: continue # skip diagonal entries, duh
dw = omega_j - omega_i # delta omega in the equation
adw = abs(dw)
# I advise reading through the notes page first before going further
phi_j = wfm.phi[j] % twopi # wrap around two pi
phi_i = wfm.phi[i] % twopi
dphi = phi_j - phi_i # delta phi in the equation
if dphi < 0: dphi = abs(dphi) + twopi - phi_i # warp around for negative phase shift
t_tot = np.sqrt(abs(4 * dw / a_max)) # calculate minimum time to complete move
t_tot = ((t_tot - 6*dphi/adw) // (12*np.pi/adw) + 1) * 12*np.pi/adw # extend move time to arrive at the correct phase
a = 4*dw/(t_tot**2) # adjust acceleration accordingly to ensure we still get to omega_j
end = int(np.ceil(t_tot * wfm.sample_rate)) # convert to an index in samples
half = int(end / 2) # index of sample half-way through the move where equation changes
t1 = t[:half] # first half of the move, slicing to make life easier
t2 = t[half:end] - t_tot/2 # time series for second half of the move
# do calculation
phi_paths[i,j, :half] = wfm.phi[i] + omega_i*t1 + a/6*t1**3 # t<=T/2
phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a/2*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2
phi_paths[i,j, end:] = omega_j*t[end:] + (phi_j - omega_j*t_tot) % twopi # fill the rest with parameters of target wave
# now compile everything into sine wave
phi_paths = wfm.amplitude * np.sin(phi_paths)
return phi_paths
def create_moving_array(sin_mat: np.ndarray, path_table: np.ndarray, paths: np.ndarray) -> np.ndarray:
"""
create a rearranging signal that moves tweezers as specified by paths
:param wfm: waveform object already initialized with basic parameters.
:param sin_mat: 2d array where ith entry contains static sin wave for ith tweezer. See create_static_array for detail.
:param path_table: lookup table returned from create_path_table().
:param paths: 1d array filled with tuples indicating moving trajectories. Example: np.array([(1,0),(2,1)]) moves tweezer1->0,tweezer2->1
:return: 1D array with rearrangement-generating waveform.
"""
# for i,j in paths:
# sin_mat[i] = phi_paths[i,j]
# fyi, line below is equivalent to the for loop above
sin_mat[paths[:,0]] = path_table[paths[:,0], paths[:,1]] # copy dynamic trajectories from path_table,
sin_mat[np.setdiff1d(paths[:,1], paths[:,0])] = 0 # turn off tweezers that need to be turned off, moving 3-2, 2-1, 1-0 will turn off 0.
return np.sum(sin_mat, axis=0) # sum up all rows to get final signal
# return np.sum(sin_mat, axis=0), sin_mat/wfm.amplitude
def old_code():
"""
collection of unused code that might be useful later...
"""
# def create_phase_paths_old(wfm):
# # setup basic variables
# vmax = KILO(20) * MEGA(1) # 20 kHz/us -> 20 kHz * 1e6 / s
# dw_max = wfm.omega[-1] - wfm.omega[0] # Longest move in frequency
# t_max = 2 * dw_max / vmax # Longest move sets the maximum moving time
# a = -vmax * 2 / t_max # constant acceleration, negative sign because of magic
# sample_len = int(np.ceil(t_max * wfm.sample_rate)) # get number of samples required for longest move
# sample_len += (512 - sample_len % 512) # make overall length a multiple of 512
# # t = np.zeros(padding+sample_len)
# t = np.arange(sample_len) / wfm.sample_rate # get time series
#
# # generate phi for every possible move
# n = len(wfm.omega) # total number of tweezers
# phi_paths = np.zeros((n, n, sample_len)) # map to store all moves
# for i, omega_i in enumerate(wfm.omega):
# for j, omega_j in enumerate(wfm.omega): # I set j to be the target position, i to be starting position
# if i == j: continue # skil diagonal entries
# dw = omega_j - omega_i # delta omega
# t_tot = np.sqrt(abs(4 * dw / a)) # total time to travel dw, t_tot <= t_max
# end = int(np.ceil(t_tot * wfm.sample_rate)) # total number of samples to move dw
# half = int(end / 2) # index of sample half-way through the move
# t1 = t[:half] # time series for first half of the move
# t2 = t[half:end] - t_tot/2 # time series for second half of the move
# # do calculation
# phi_paths[i,j, :half] = wfm.phi[i] + omega_i*t1 + a/6*t1**3 # t<=T/2 note we are changing the ith tweezer
# phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a/2*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2
# # phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2
# phi_paths[i,j, end:] = omega_j*t[end:] # fill the rest of the array with target frequency wave
# # phi_paths[i,j, end:] = phi_paths[i,j, end-1]*t[end:] # fill the rest of the array with same value
# return phi_paths
# sig_mat = wfm.amplitude * np.sin(np.outer(wfm.omega,t) + wfm.phi[:, np.newaxis]) # shape=(number of tweezers x sample_len)
# self.debug["mat1"] = np.zeros((n, n, 2)) # for phase debugging
# for diagnostic purposes
# path_idx[i, j] = (t_tot, end)
# phi_const[i,j, :half] = wfm.phi[i]
# phi_const[i,j, half:end] = phi_const[i,j,half-1] +
# print(a, t_tot)
# return phi_paths, wfm.amplitude * np.sin(np.outer(wfm.omega,t) + np.expand_dims(wfm.phi, axis=1)), path_idx
pass
def main():
# sample usage
np.random.seed(0)
cf = MEGA(80)
df = MEGA(0.2)
n = 10
# period = 1/cf
sample = 512*1000
rate = MEGA(625)
# rate = sample*cf/1e5
print("Sampling rate: ", rate/1e6)
print("center f: ", cf, "df: ", df, "total n: ", 2*n+1)
wave = Waveform(cf, df, n, rate)
static_sig = create_static_array(wave, sample)
table = create_path_table(wave)
# mat_copy = mat.copy()
# mat /= wave.amplitude
t1 = time.perf_counter()
repath = np.array([(1,0), (2,1), (3,2), (4,3), (5,4), (6,5), (7,6), (8,7), (9,8), (10,9)])
move_sig = create_moving_array(create_static_array(wave, table.shape[2]), table, repath)
print("rearrange signal generation time: ", time.perf_counter() - t1)
np.savetxt("static_signal.txt", static_sig, delimiter=',')
np.savetxt("move_signal.txt", move_sig, delimiter=',')
# np.savetxt("initial_phi.txt", wave.phi, delimiter=',')
# np.savetxt("sig_mat.txt", sig_mat, delimiter=',')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment