Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • whuie2/awg-control
  • xiyehu2/awg-control
2 results
Show changes
Commits on Source (3)
......@@ -31,6 +31,7 @@ class AWG:
self.mem_size = int64(0) # maximum memory size of the AWG
self.full_scale = int32(0) # full scale scaling of output voltage, used for data calculation
self.channel = [0,0,0,0] # activated channels
self.ch_amp = [0,0,0,0] # channel output amplitude
self.mode = "" # current mode AWG is running on
def open(self, remote=False):
......@@ -69,7 +70,7 @@ class AWG:
"""
flag = 0
msg = "Checking error at " + message + " ... "
if custom_msg != "":
if message != "":
flag = 1
sys.stdout.write(msg)
err_reg = uint32(0)
......@@ -114,7 +115,7 @@ class AWG:
spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_CARD_FORCETRIGGER)
self.check_error()
def set_sampling_rate(self, sr):
def set_sampling_rate(self, sr: int):
"""
set sampling rate
:param sr: 64bit integer between 50MHz and 625MHz
......@@ -137,6 +138,7 @@ class AWG:
if self.channel[ch] == 0:
self.channel[ch] = 1
self.ch_amp = amplitude
spcm_dwSetParam_i64(self.card, SPC_CHENABLE, int64(2**ch)) # see CHANNEL0-3 in regs.py for detail
spcm_dwSetParam_i32(self.card, SPC_ENABLEOUT0 + ch * (SPC_ENABLEOUT1 - SPC_ENABLEOUT0), 1)
spcm_dwSetParam_i32(self.card, SPC_AMP0 + ch * (SPC_AMP1 - SPC_AMP0), amplitude)
......@@ -190,13 +192,13 @@ class AWG:
def get_aligned_array(self, size):
"""
returns an array at a page-aligned memory location, necessary for data buffering
returns a numpy array at a page-aligned memory location
:param size: number of samples used for data calculation
:return: numpy array starting at the correct location
"""
data_length_bytes = uint32(size * 2 * np.sum(self.channel))
buffer = pvAllocMemPageAligned(data_length_bytes) # buffer now holds a page-aligned location
buffer_data = cast(addressof(pvBuffer), ptr16) # cast it to int16 array
buffer_data = cast(addressof(buffer), ptr16) # cast it to int16 array
array = np.frombuffer(buffer_data, dtype=int16)
return array
......@@ -210,7 +212,7 @@ class AWG:
spcm_dwSetParam_i32(self.card, SPC_SEQMODE_MAXSEGMENTS,nseg) # set number of sequences the memory is divided into
self.check_error()
def write_segment(self, data: np.array, segment: int):
def write_segment(self, data: np.ndarray, segment: int):
"""
write data onto a specified segment.
:param data: numpy array containing waveform data
......
File added
File added
File added
File added
File added
File added
File added
File added
File added
from AWG import *
import time
class Waveform:
def __init__(self, cf: int, df: int, n: int, sample_rate: int):
"""
helper class to store basic waveform information.
:param cf: center frequency tone of tweezer array.
:param df: differential frequency between neighboring tweezers.
:param n: number of tweezers to the left/right of center frequency tone, total number of tweezer is 2n+1.
:param sample_rate: sampling rate of the AWG to generate correct number of samples.
"""
# define some useful numbers
scale = 2**11 # Mingkun uses 2^11, caltech uses 2^15, maybe this is scaling up a float to int?
num_tz = 2*n + 1 # total number of tweezers to be generated
max_amp = scale / np.sqrt(num_tz) # again, saw this from multiple sources, not sure why
# self.amplitude = max_amp * np.ones(num_tz)
self.amplitude: np.ndarray = max_amp # uniform amplitude for now, this will eventually be tweaked finely with experiments
self.omega: np.ndarray = 2*np.pi * np.linspace(cf - n*df, cf + n*df, num_tz) # frequency tones
self.phi: np.ndarray = 2*np.pi * np.random.rand(num_tz) # random initial phases from 0-2pi, also will be tweaked finely with experiments
self.sample_rate: int = sample_rate
# self.debug = {
# "mat1": 0,
# "mat2": 0,
# "mat3": 0
# }
def create_static_array(wfm: Waveform, sample_len: int) -> np.ndarray:
"""
create a static-array-generating waveform with user set number of samples
:param wfm: waveform object already initialized with basic parameters.
:param sample_len: total number of samples to generate, must be multiples of 512. Note that more sample != higher resolution.
:return: returns a 1D array with static-array-generating waveform.
"""
# construct time axis, t_total(s) = sample_len / sample_rate, dt = t_total / sample_len
t = np.arange(sample_len) / wfm.sample_rate
# calculate individual sin waves, sig_mat[i] corresponds to data for ith tweezer
sin_mat = wfm.amplitude * np.sin(np.outer(wfm.omega,t) + np.expand_dims(wfm.phi, axis=1)) # shape=(number of tweezers x sample_len)
# sum up all rows to get final signal
return np.sum(sin_mat, axis=0)
def create_path_table(wfm: Waveform) -> np.ndarray:
"""
create a dim-3 look up table where the table[i,j] contains a sine wave to move tweezer i to tweezer j
:param wfm: waveform object already initialized with basic parameters.
:return: dim-3 ndarray
"""
# setup basic variables
twopi = 2*np.pi
vmax = KILO(20) * MEGA(1) # convert units, 20 kHz/us -> 20e3 * 1e6 Hz/s
dw_max = wfm.omega[-1] - wfm.omega[0] # Longest move in frequency
t_max = 2 * dw_max / vmax # Longest move sets the maximum moving time
a_max = -vmax * 2 / t_max # maximum acceleration, negative sign because of magic
sample_len_max = int(np.ceil(t_max * 4/5 * wfm.sample_rate)) # get number of samples required for longest move, this sets the size of lookup table
sample_len_max += (512 - sample_len_max % 512) # make overall length a multiple of 512 so AWG doesn't freak out
# now we calculate all possible trajectories, go to Group Notes/Projects/Rearrangement for detail
n = len(wfm.omega) # total number of tweezers
phi_paths = np.zeros((n, n, sample_len_max)) # lookup table to store all moves
t = np.arange(sample_len_max) / wfm.sample_rate # time series
# iterate! I think this part can be vectorized as well... but unnecessary.
for i, omega_i in enumerate(wfm.omega):
for j, omega_j in enumerate(wfm.omega): # j is the target position, i is starting position
if i == j: continue # skip diagonal entries, duh
dw = omega_j - omega_i # delta omega in the equation
adw = abs(dw)
# I advise reading through the notes page first before going further
phi_j = wfm.phi[j] % twopi # wrap around two pi
phi_i = wfm.phi[i] % twopi
dphi = phi_j - phi_i # delta phi in the equation
if dphi < 0: dphi = abs(dphi) + twopi - phi_i # warp around for negative phase shift
t_tot = np.sqrt(abs(4 * dw / a_max)) # calculate minimum time to complete move
t_tot = ((t_tot - 6*dphi/adw) // (12*np.pi/adw) + 1) * 12*np.pi/adw # extend move time to arrive at the correct phase
a = 4*dw/(t_tot**2) # adjust acceleration accordingly to ensure we still get to omega_j
end = int(np.ceil(t_tot * wfm.sample_rate)) # convert to an index in samples
half = int(end / 2) # index of sample half-way through the move where equation changes
t1 = t[:half] # first half of the move, slicing to make life easier
t2 = t[half:end] - t_tot/2 # time series for second half of the move
# do calculation
phi_paths[i,j, :half] = wfm.phi[i] + omega_i*t1 + a/6*t1**3 # t<=T/2
phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a/2*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2
phi_paths[i,j, end:] = omega_j*t[end:] + (phi_j - omega_j*t_tot) % twopi # fill the rest with parameters of target wave
# now compile everything into sine wave
phi_paths = wfm.amplitude * np.sin(phi_paths)
return phi_paths
def create_moving_array(sin_mat: np.ndarray, path_table: np.ndarray, paths: np.ndarray) -> np.ndarray:
"""
create a rearranging signal that moves tweezers as specified by paths
:param wfm: waveform object already initialized with basic parameters.
:param sin_mat: 2d array where ith entry contains static sin wave for ith tweezer. See create_static_array for detail.
:param path_table: lookup table returned from create_path_table().
:param paths: 1d array filled with tuples indicating moving trajectories. Example: np.array([(1,0),(2,1)]) moves tweezer1->0,tweezer2->1
:return: 1D array with rearrangement-generating waveform.
"""
# for i,j in paths:
# sin_mat[i] = phi_paths[i,j]
# fyi, line below is equivalent to the for loop above
sin_mat[paths[:,0]] = path_table[paths[:,0], paths[:,1]] # copy dynamic trajectories from path_table,
sin_mat[np.setdiff1d(paths[:,1], paths[:,0])] = 0 # turn off tweezers that need to be turned off, moving 3-2, 2-1, 1-0 will turn off 0.
return np.sum(sin_mat, axis=0) # sum up all rows to get final signal
# return np.sum(sin_mat, axis=0), sin_mat/wfm.amplitude
def old_code():
"""
collection of unused code that might be useful later...
"""
# def create_phase_paths_old(wfm):
# # setup basic variables
# vmax = KILO(20) * MEGA(1) # 20 kHz/us -> 20 kHz * 1e6 / s
# dw_max = wfm.omega[-1] - wfm.omega[0] # Longest move in frequency
# t_max = 2 * dw_max / vmax # Longest move sets the maximum moving time
# a = -vmax * 2 / t_max # constant acceleration, negative sign because of magic
# sample_len = int(np.ceil(t_max * wfm.sample_rate)) # get number of samples required for longest move
# sample_len += (512 - sample_len % 512) # make overall length a multiple of 512
# # t = np.zeros(padding+sample_len)
# t = np.arange(sample_len) / wfm.sample_rate # get time series
#
# # generate phi for every possible move
# n = len(wfm.omega) # total number of tweezers
# phi_paths = np.zeros((n, n, sample_len)) # map to store all moves
# for i, omega_i in enumerate(wfm.omega):
# for j, omega_j in enumerate(wfm.omega): # I set j to be the target position, i to be starting position
# if i == j: continue # skil diagonal entries
# dw = omega_j - omega_i # delta omega
# t_tot = np.sqrt(abs(4 * dw / a)) # total time to travel dw, t_tot <= t_max
# end = int(np.ceil(t_tot * wfm.sample_rate)) # total number of samples to move dw
# half = int(end / 2) # index of sample half-way through the move
# t1 = t[:half] # time series for first half of the move
# t2 = t[half:end] - t_tot/2 # time series for second half of the move
# # do calculation
# phi_paths[i,j, :half] = wfm.phi[i] + omega_i*t1 + a/6*t1**3 # t<=T/2 note we are changing the ith tweezer
# phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a/2*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2
# # phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2
# phi_paths[i,j, end:] = omega_j*t[end:] # fill the rest of the array with target frequency wave
# # phi_paths[i,j, end:] = phi_paths[i,j, end-1]*t[end:] # fill the rest of the array with same value
# return phi_paths
# sig_mat = wfm.amplitude * np.sin(np.outer(wfm.omega,t) + wfm.phi[:, np.newaxis]) # shape=(number of tweezers x sample_len)
# self.debug["mat1"] = np.zeros((n, n, 2)) # for phase debugging
# for diagnostic purposes
# path_idx[i, j] = (t_tot, end)
# phi_const[i,j, :half] = wfm.phi[i]
# phi_const[i,j, half:end] = phi_const[i,j,half-1] +
# print(a, t_tot)
# return phi_paths, wfm.amplitude * np.sin(np.outer(wfm.omega,t) + np.expand_dims(wfm.phi, axis=1)), path_idx
pass
def main():
# sample usage
np.random.seed(0)
cf = MEGA(80)
df = MEGA(0.2)
n = 10
# period = 1/cf
sample = 512*1000
rate = MEGA(625)
# rate = sample*cf/1e5
print("Sampling rate: ", rate/1e6)
print("center f: ", cf, "df: ", df, "total n: ", 2*n+1)
wave = Waveform(cf, df, n, rate)
static_sig = create_static_array(wave, sample)
table = create_path_table(wave)
# mat_copy = mat.copy()
# mat /= wave.amplitude
t1 = time.perf_counter()
repath = np.array([(1,0), (2,1), (3,2), (4,3), (5,4), (6,5), (7,6), (8,7), (9,8), (10,9)])
move_sig = create_moving_array(create_static_array(wave, table.shape[2]), table, repath)
print("rearrange signal generation time: ", time.perf_counter() - t1)
np.savetxt("static_signal.txt", static_sig, delimiter=',')
np.savetxt("move_signal.txt", move_sig, delimiter=',')
# np.savetxt("initial_phi.txt", wave.phi, delimiter=',')
# np.savetxt("sig_mat.txt", sig_mat, delimiter=',')