Skip to content
Snippets Groups Projects
Commit 13d07cd1 authored by xiyehu2's avatar xiyehu2
Browse files

Reduced path table generation merge

parent 31b2ca37
No related branches found
No related tags found
1 merge request!4Reduced path table generation merge
...@@ -34,7 +34,7 @@ class AWG: ...@@ -34,7 +34,7 @@ class AWG:
self.ch_amp = [0,0,0,0] # channel output amplitude self.ch_amp = [0,0,0,0] # channel output amplitude
self.mode = "" # current mode AWG is running on self.mode = "" # current mode AWG is running on
def open(self, remote=False): def open(self, remote=False, id=b'/dev/spcm0'):
""" """
opens and initializes instance variables opens and initializes instance variables
:param remote: flag to determine remote connection, default is False :param remote: flag to determine remote connection, default is False
...@@ -42,7 +42,7 @@ class AWG: ...@@ -42,7 +42,7 @@ class AWG:
if remote: if remote:
self.card = spcm_hOpen(create_string_buffer(b'TCPIP::192.168.1.10::inst0::INSTR')) self.card = spcm_hOpen(create_string_buffer(b'TCPIP::192.168.1.10::inst0::INSTR'))
else: else:
self.card = spcm_hOpen(create_string_buffer(b'/dev/spcm0')) self.card = spcm_hOpen(create_string_buffer(id))
if self.card is None: if self.card is None:
sys.stdout.write("no card found...\n") sys.stdout.write("no card found...\n")
else: else:
...@@ -54,13 +54,14 @@ class AWG: ...@@ -54,13 +54,14 @@ class AWG:
spcm_dwGetParam_i32(self.card, SPC_MIINST_MAXADCVALUE, spcm_dwGetParam_i32(self.card, SPC_MIINST_MAXADCVALUE,
byref(self.full_scale)) # full scale value for data generation purpose byref(self.full_scale)) # full scale value for data generation purpose
name = szTypeToName(self.card_type.value) name = szTypeToName(self.card_type.value)
sys.stdout.write("Found: {0} sn {1:05d}\n".format(name, self.serial_number.value)) sys.stdout.write("Card: {0} sn {1:05d}\n".format(name, self.serial_number.value))
sys.stdout.write("Sample Rate: {:.1f} MHz\n".format(self.sample_rate.value / 1000000)) sys.stdout.write("Max sample Rate: {:.1f} MHz\n".format(self.sample_rate.value / 1000000))
sys.stdout.write("Memory size: {:.0f} MBytes\n".format(self.mem_size.value / 1024 / 1024)) sys.stdout.write("Memory size: {:.0f} MBytes\n\n".format(self.mem_size.value / 1024 / 1024))
self.check_error() # self.check_error()
def close(self): def close(self):
spcm_vClose(self.card) spcm_vClose(self.card)
print("AWG is closed")
def check_error(self, message="") -> bool: def check_error(self, message="") -> bool:
""" """
...@@ -68,23 +69,21 @@ class AWG: ...@@ -68,23 +69,21 @@ class AWG:
:param message: caller defined string for debugging purposes :param message: caller defined string for debugging purposes
:return: 1 if error is found, 0 otherwise :return: 1 if error is found, 0 otherwise
""" """
flag = 0
msg = "Checking error at " + message + " ... "
if message != "":
flag = 1
sys.stdout.write(msg)
err_reg = uint32(0) err_reg = uint32(0)
err_val = int32(0) err_val = int32(0)
err_text = '' err_text = ""
err_code = spcm_dwGetErrorInfo_i32(self.card, byref(err_reg), byref(err_val), err_text) err_code = spcm_dwGetErrorInfo_i32(self.card, byref(err_reg), byref(err_val), err_text)
if err_code: if err_code:
if flag: print(
sys.stdout.write(err_text) f"{message}\n"
else: f"error code (see spcerr.py): {hex(err_code)}\n"
sys.stdout.write(msg + err_text) # f"error text: {err_text}"
f"error register: {err_reg.value}\n"
f"error val: {err_val.value}\n"
)
self.close()
exit(0)
return True return True
elif flag:
sys.stdout.write("no error\n")
return False return False
def run(self): def run(self):
...@@ -93,27 +92,28 @@ class AWG: ...@@ -93,27 +92,28 @@ class AWG:
""" """
spcm_dwSetParam_i32(self.card, SPC_M2CMD, spcm_dwSetParam_i32(self.card, SPC_M2CMD,
M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER) M2CMD_CARD_START | M2CMD_CARD_ENABLETRIGGER)
self.check_error() # self.check_error("Checking error at run")
def stop(self): def stop(self):
""" """
stop the card, this is different from closing the card stop the card, this is different from closing the card
""" """
spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_CARD_STOP) spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_CARD_STOP)
self.check_error() # self.check_error("Checking error at stop")
def reset(self): def reset(self):
""" """
resets the board, this clears all onboard memory and settings resets the board, this clears all onboard memory and settings
""" """
spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_CARD_RESET) spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_CARD_RESET)
# self.check_error("Checking error at reset")
def force_trigger(self): def force_trigger(self):
""" """
force a trigger event, this completely mimics an actual trigger event force a trigger event, this completely mimics an actual trigger event
""" """
spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_CARD_FORCETRIGGER) spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_CARD_FORCETRIGGER)
self.check_error() # self.check_error("Checking error at force_trigger")
def set_sampling_rate(self, sr: int): def set_sampling_rate(self, sr: int):
""" """
...@@ -122,9 +122,10 @@ class AWG: ...@@ -122,9 +122,10 @@ class AWG:
""" """
self.sample_rate = int64(sr) self.sample_rate = int64(sr)
spcm_dwSetParam_i64(self.card, SPC_SAMPLERATE, sr) spcm_dwSetParam_i64(self.card, SPC_SAMPLERATE, sr)
self.check_error() # self.check_error("Checking error at set_sampling_rate")
print(f"Setting sampling rate to {self.sample_rate.value / 1e6} MHz")
def enable_channel(self, ch: int, amplitude: int=1000, stoplvl: str="ZERO"): def toggle_channel(self, ch: int, amplitude: int=1000, stoplvl: str="ZERO"):
""" """
enable/disable individual channel and set its parameters enable/disable individual channel and set its parameters
:param ch: enables channel 0-3. :param ch: enables channel 0-3.
...@@ -142,37 +143,12 @@ class AWG: ...@@ -142,37 +143,12 @@ class AWG:
spcm_dwSetParam_i64(self.card, SPC_CHENABLE, int64(2**ch)) # see CHANNEL0-3 in regs.py for detail spcm_dwSetParam_i64(self.card, SPC_CHENABLE, int64(2**ch)) # see CHANNEL0-3 in regs.py for detail
spcm_dwSetParam_i32(self.card, SPC_ENABLEOUT0 + ch * (SPC_ENABLEOUT1 - SPC_ENABLEOUT0), 1) spcm_dwSetParam_i32(self.card, SPC_ENABLEOUT0 + ch * (SPC_ENABLEOUT1 - SPC_ENABLEOUT0), 1)
spcm_dwSetParam_i32(self.card, SPC_AMP0 + ch * (SPC_AMP1 - SPC_AMP0), amplitude) spcm_dwSetParam_i32(self.card, SPC_AMP0 + ch * (SPC_AMP1 - SPC_AMP0), amplitude)
spcm_dwSetParam_i32(self.card, SPC_CH0_STOPLEVEL + ch * (SPC_CH1_STOPLEVEL - SPC_CH0_STOPLEVEL), spcm_dwSetParam_i32(self.card, SPC_CH0_STOPLEVEL + ch * (SPC_CH1_STOPLEVEL - SPC_CH0_STOPLEVEL), stopmask[stoplvl])
stopmask[stoplvl])
else: else:
self.channel[ch] = 0 self.channel[ch] = 0
spcm_dwSetParam_i32(self.card, SPC_ENABLEOUT0 + ch * (SPC_ENABLEOUT1 - SPC_ENABLEOUT0), 0) spcm_dwSetParam_i32(self.card, SPC_ENABLEOUT0 + ch * (SPC_ENABLEOUT1 - SPC_ENABLEOUT0), 0)
self.check_error() # self.check_error("Checking error at enable_channel")
print("Channels enabled: ", self.channel)
# Deprecated
# def set_channel(self, ch: list, amplitude: list=(1000,1000,1000,1000), stoplvl: list=(16,16,16,16)):
# """
# set and enable channel outputs, max number of channel is 4
# :param ch: enables channel 0-3.
# Example: [0,1,0,1] enables channel 1 and 3.
# :param amplitude: sets output amplitude of each channel between 80-2500mV, default level is 1000mV.
# Example: [80,1000,0,0] sets output level for channel 0 and 1 to be 80 and 1000 mV.
# :param stoplvl: sets channels pause behavior, ZERO = 16, LOW = 2, HIGH = 4, HOLDLAST = 8.
# Example: [16, 2, 8, 8]
# """
# self.channel = ch
# mask = (CHANNEL0 & 1 * ch[0]) | \
# (CHANNEL1 & 2 * ch[1]) | \
# (CHANNEL2 & 4 * ch[2]) | \
# (CHANNEL3 & 8 * ch[3]) # create channel mask
# spcm_dwSetParam_i64(self.card, SPC_CHENABLE, int64(mask)) # activate channels
# for i in range(4):
# # enable channel outputs, set output amplitudes and pause behavior
# if ch[i] == 0: continue
# spcm_dwSetParam_i32(self.card, SPC_ENABLEOUT0 + i * (SPC_ENABLEOUT1 - SPC_ENABLEOUT0), 1)
# spcm_dwSetParam_i32(self.card, SPC_AMP0 + i * (SPC_AMP1 - SPC_AMP0), amplitude[i])
# spcm_dwSetParam_i32(self.card, SPC_CH0_STOPLEVEL + i * (SPC_CH1_STOPLEVEL - SPC_CH0_STOPLEVEL), stoplvl[i])
# self.check_error()
def set_trigger(self, **kwargs): def set_trigger(self, **kwargs):
""" """
...@@ -188,19 +164,19 @@ class AWG: ...@@ -188,19 +164,19 @@ class AWG:
if key == "EXT1": if key == "EXT1":
spcm_dwSetParam_i32(self.card, SPC_TRIG_ORMASK, SPC_TMASK_EXT1) # using external channel 1 as trigger spcm_dwSetParam_i32(self.card, SPC_TRIG_ORMASK, SPC_TMASK_EXT1) # using external channel 1 as trigger
spcm_dwSetParam_i32(self.card, SPC_TRIG_EXT1_MODE, value) spcm_dwSetParam_i32(self.card, SPC_TRIG_EXT1_MODE, value)
self.check_error() # self.check_error("Checking error at set_trigger")
def get_aligned_array(self, size): def get_aligned_buf(self, size):
""" """
returns a numpy array at a page-aligned memory location returns a numpy array at a page-aligned memory location
:param size: number of samples used for data calculation :param size: number of samples used for data calculation
:return: numpy array starting at the correct location :return:
""" """
data_length_bytes = uint32(size * 2 * np.sum(self.channel)) data_length_bytes = int(size * 2 * np.sum(self.channel))
buffer = pvAllocMemPageAligned(data_length_bytes) # buffer now holds a page-aligned location buffer = pvAllocMemPageAligned(data_length_bytes) # buffer now holds a page-aligned location
buffer_data = cast(addressof(buffer), ptr16) # cast it to int16 array buffer_data = cast(addressof(buffer), ptr16) # cast it to int16 array
array = np.frombuffer(buffer_data, dtype=int16) # array = np.frombuffer(buffer_data, dtype=int16)
return array return buffer, buffer_data
def set_sequence_mode(self, nseg: int): def set_sequence_mode(self, nseg: int):
""" """
...@@ -208,35 +184,42 @@ class AWG: ...@@ -208,35 +184,42 @@ class AWG:
:param nseg: number of segments the memory is divided into, must be powers of 2. :param nseg: number of segments the memory is divided into, must be powers of 2.
""" """
self.mode = "Sequence Replay" self.mode = "Sequence Replay"
spcm_dwSetParam_i32(self.card, SPC_CARDMODE, SPC_REP_STD_SEQUENCE) # Sequence replay mode spcm_dwSetParam_i32(self.card, SPC_CARDMODE, SPC_REP_STD_SEQUENCE) # Sequence replay mode
spcm_dwSetParam_i32(self.card, SPC_SEQMODE_MAXSEGMENTS,nseg) # set number of sequences the memory is divided into spcm_dwSetParam_i32(self.card, SPC_SEQMODE_MAXSEGMENTS, nseg) # set number of sequences the memory is divided into
self.check_error() spcm_dwSetParam_i32(self.card, SPC_SEQMODE_STARTSTEP, 0) # set starting step to be 0
# self.check_error("Checking error at set_sequence_mode")
def write_segment(self, data: np.ndarray, segment: int): def write_segment(self, data: np.ndarray, segment: int):
""" """
write data onto a specified segment. write data onto a specified segment in sequence replay mode
:param data: numpy array containing waveform data :param data: numpy array containing waveform data
:param segment: the segment to write on :param segment: the segment to write on
:return: :return:
""" """
if self.mode != "Sequence Replay": if self.mode != "Sequence Replay":
print("Wrong method, current mode is: " + self.mode) print("Wrong method, current mode is: " + self.mode)
return return
nch = np.sum(self.channel) # number of activated channels nch = np.sum(self.channel) # number of activated channels
if data.dtype != int: # if data.dtype != int:
sys.stdout.write("data must be in int type\n") # sys.stdout.write("data must be in int type\n")
return # return
if data.size > self.mem_size.value / np.sum(self.channel) / SPC_SEQMODE_MAXSEGMENTS: if data.size > self.mem_size.value / np.sum(self.channel) / 2:
sys.stdout.write("data is big") sys.stdout.write("data is too big")
return return
spcm_dwSetParam_i32(self.card, SPC_SEQMODE_WRITESEGMENT, segment) # set current segment to write on spcm_dwSetParam_i32(self.card, SPC_SEQMODE_WRITESEGMENT, segment) # set current segment to write on
spcm_dwSetParam_i32(self.card, SPC_SEQMODE_SEGMENTSIZE, data.size) # set size of segment in unit of samples spcm_dwSetParam_i32(self.card, SPC_SEQMODE_SEGMENTSIZE, data.size) # set size of segment in unit of samples
# data transfer # data transfer
# ptr = data.ctypes.data_as(POINTER(int16)) sample_len = data.size
buflength = data.size * 2 * nch # samples * (2 bytes/sample) * number of activated channels buflength = uint32(sample_len * 2 * nch) # samples * (2 bytes/sample) * number of activated channels
spcm_dwDefTransfer_i64(self.card, SPCM_BUF_DATA, SPCM_DIR_PCTOCARD, int32(0), data, int64(0), buflength) data_ptr = data.ctypes.data_as(ptr16) # cast data array into a c-like array
buffer = pvAllocMemPageAligned(sample_len * 2) # buffer now holds a page-aligned location
buffer_data = cast(addressof(buffer), ptr16) # cast it to int16 array
memmove(buffer_data, data_ptr, sample_len * 2) # moving data into the page-aligned block
spcm_dwDefTransfer_i64(self.card, SPCM_BUF_DATA, SPCM_DIR_PCTOCARD, int32(0), buffer, int64(0), buflength)
spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_DATA_STARTDMA | M2CMD_DATA_WAITDMA) spcm_dwSetParam_i32(self.card, SPC_M2CMD, M2CMD_DATA_STARTDMA | M2CMD_DATA_WAITDMA)
self.check_error() # self.check_error("Checking error at write_segment")
def configure_step(self, step: int, segment: int, nextstep: int, loop: int, condition: int): def configure_step(self, step: int, segment: int, nextstep: int, loop: int, condition: int):
""" """
...@@ -253,5 +236,5 @@ class AWG: ...@@ -253,5 +236,5 @@ class AWG:
return return
mask = (condition << 32) | (loop << 32) | (nextstep << 16) | segment # 64-bit mask mask = (condition << 32) | (loop << 32) | (nextstep << 16) | segment # 64-bit mask
spcm_dwSetParam_i64(self.card, SPC_SEQMODE_STEPMEM0 + step, mask) spcm_dwSetParam_i64(self.card, SPC_SEQMODE_STEPMEM0 + step, mask)
self.check_error() # self.check_error("Checking error at configure_step")
import numpy as np
from waveform import *
from cupyx.profiler import benchmark
import cupy as cp
def test(f_idx, t_idx):
paths, off = get_rearrange_paths(f_idx, t_idx)
create_moving_array_reduced(_sig, _table_cp, paths, off)
def count(paths, path_table, off):
counter = 0
for i, j in paths:
if i == j:
continue
if (i, j) in path_table:
counter += 1
# else:
# print((i, j), "not in path table")
for i in off:
counter += 1
return counter
data = np.load("data/table-half_31.npz", allow_pickle=True)
table = data['path_table'].item()
twzr = data['wfm'].item()
static_sig = data['static_sig']
target = data['target']
t_idx = np.nonzero(target)[0]
nt = twzr.omega.size
_table_cp = {}
for key in table:
_table_cp[key] = cp.array(table[key])
n_repeat = 500
_sig = cp.array(static_sig)
times = np.zeros((nt+1,2))
filling_ratio = np.zeros((nt+1,2))
n_move = np.zeros((nt+1,n_repeat))
for i in range(nt+1):
f_prob = i/nt
calc_t = np.zeros(n_repeat)
ratio = np.zeros(n_repeat)
nm = np.zeros(n_repeat)
print(i, f_prob)
for j in range(n_repeat):
filled = np.random.rand(nt)
tr = filled < f_prob
fa = filled >= f_prob
filled[tr] = 1
filled[fa] = 0
f_idx = np.nonzero(filled)[0]
b = benchmark(
test,
(f_idx, t_idx),
n_repeat=1
)
# stuff to save
ratio[j] = f_idx.size / nt
calc_t[j] = b.gpu_times + b.cpu_times
paths, off = get_rearrange_paths(f_idx, t_idx)
nm[j] = count(paths, _table_cp, off)
# n_move[i,0] = np.mean(nm)
# n_move[i,1] = np.var(nm)
n_move[i] = nm
times[i,0] = np.mean(calc_t)
times[i,1] = np.var(calc_t)
filling_ratio[i,0] = np.mean(ratio)
filling_ratio[i,1] = np.var(ratio)
np.savez(
f"data/reduced-benchmark_{nt}-half.npz",
wfm=twzr,
target=target,
filling_ratio=filling_ratio,
times=times,
n_move=n_move
)
from AWG import *
import code
import readline
import rlcompleter
def trigger():
global awg
awg.force_trigger()
print("forcing a trigger...")
def stop():
global awg
awg.stop()
print("stopping awg output...")
def start():
global awg
awg.run()
awg.force_trigger()
print("starting awg output...")
def close():
global awg
awg.close()
print("closing awg and quitting...")
exit(0)
# load waveform data
wfm_data = np.load("data/single.npz", allow_pickle=True)
static_sig = wfm_data['signal']
empty = np.zeros(static_sig.shape)
wfm = wfm_data['wfm'].item()
sampling_rate = wfm.sample_rate
# AWG stuff
awg = AWG()
awg.open(id=b'/dev/spcm1') # change this to b'/dev/spcm0' for top AWG
awg.set_sampling_rate(sampling_rate)
awg.toggle_channel(0, amplitude=2500)
awg.set_trigger(EXT0=SPC_TM_POS)
awg.set_sequence_mode(2)
awg.write_segment(static_sig, segment=0)
awg.write_segment(empty, segment=1)
awg.configure_step(step=0, segment=0, nextstep=1, loop=1, condition=SPCSEQ_ENDLOOPONTRIG)
awg.configure_step(step=1, segment=1, nextstep=0, loop=1, condition=SPCSEQ_ENDLOOPONTRIG)
# console
vars = globals()
vars.update(locals())
readline.set_completer(rlcompleter.Completer(vars).complete)
readline.parse_and_bind("tab: complete")
code.InteractiveConsole(vars).interact()
from waveform import *
import os
cf = MEGA(105) # center frequency
df = MEGA(2.5) # delta frequency
n = 2 # number of tones to generate in either direction of cf, total number of tweezer is 2*n+1
nt = 2*n+1 # total number of tweezers
# sampling_rate and sample_len must follow some relationship explained in the wiki page.
sampling_rate = int(614.4e6) # sampling rate
sample_len = 512 * 600 # sample_len
# amplitude uniformity adjustments, just make sure its called amps at the end
scale = 2**11
ampMax = scale/np.sqrt(nt)
amps = ampMax * np.ones(nt)
corr = np.array([1.53312825, 1.35073669, 1.252971, 1.06263741, 1.])
amps *= np.sqrt(corr)
# phase adjustments
# phase = np.load("array_phase.npz")['phase'][:nt] # phase table from Caltech
twzr = Waveform(cf, df, n, sampling_rate)
twzr.amplitude = amps
# twzr.phi = phase
static_sig = create_static_array(twzr, sample_len)
empty = np.zeros(sample_len)
path = "data"
if not os.path.exists(path): os.mkdir(path)
fname = os.path.join(path, "waveform_data.npz")
np.savez(fname, signal=static_sig, wfm=twzr)
from AWG import *
from waveform import * from waveform import *
sampling_rate = MEGA(625) np.random.seed(0)
# parameters to play around
# ----------------------------------------------------------------------------
cf = MEGA(105) # center frequency
df = MEGA(2.5) # delta frequency
n = 2 # number of tones to generate in either direction of cf, total number of tweezer is 2*n+1
nt = 2*n+1 # total number of tweezers
# sampling_rate and sample_len must follow some relationship explained in the wiki page.
sampling_rate = int(614.4e6) # sampling rate
sample_len = 512 * 600 # sample_len
# amplitude uniformity adjustments, just make sure its called amps at the end
scale = 2**11
ampMax = scale/np.sqrt(nt)
amps = ampMax * np.ones(nt)
corr = np.array([1.53312825, 1.35073669, 1.252971, 1.06263741, 1.])
amps *= np.sqrt(corr)
# phase adjustments
phase = np.load("array_phase.npz")['phase'][:nt] # phase table from Caltech
# ----------------------------------------------------------------------------
# MAGIC DO NOT CHANGE. one day i will understand, maybe
# ----------------------------------------------------------------------------
pvAllocMemPageAligned(sample_len * 2)
# ----------------------------------------------------------------------------
# setting up awg sequence, explained in wiki
# ----------------------------------------------------------------------------
awg = AWG() awg = AWG()
awg.open() awg.open(id=b'/dev/spcm1') # change this to b'/dev/spcm0' for top AWG
awg.set_sampling_rate(sampling_rate) awg.set_sampling_rate(sampling_rate)
awg.enable_channel(0) awg.toggle_channel(0, amplitude=2500)
awg.set_trigger(EXT0=SPC_TM_POS) awg.set_trigger(EXT0=SPC_TM_POS)
awg.set_sequence_mode(2) awg.set_sequence_mode(2)
cf = MEGA(105) twzr = Waveform(cf, df, n, sampling_rate)
df = MEGA(2.5) twzr.amplitude = amps
n = 2 twzr.phi = phase
# period = 1/cf static_sig = create_static_array(twzr, sample_len)
sample_len = 512 * 100 empty = np.zeros(sample_len)
wave = Waveform(cf, df, n, sampling_rate)
static_sig = create_static_array(wave, sample_len)
aligned_arr = awg.get_aligned_array(static_sig.size) awg.write_segment(static_sig, segment=0)
adr = aligned_arr.ctypes.data awg.write_segment(empty, segment=1)
aligned_arr[:] = static_sig.ctypes.data_as(POINTER(int16))
adr = aligned_arr.ctypes.data
awg.write_segment(aligned_arr, segment=0)
awg.configure_step(step=0, segment=0, nextstep=1, loop=1, condition=SPCSEQ_ENDLOOPONTRIG) awg.configure_step(step=0, segment=0, nextstep=1, loop=1, condition=SPCSEQ_ENDLOOPONTRIG)
awg.configure_step(step=1, segment=1, nextstep=0, loop=1, condition=SPCSEQ_ENDLOOPONTRIG) awg.configure_step(step=1, segment=1, nextstep=0, loop=1, condition=SPCSEQ_ENDLOOPONTRIG)
# ----------------------------------------------------------------------------
awg.force_trigger() # running the awg
# ----------------------------------------------------------------------------
awg.run()
awg.force_trigger() # this is equivalement to an actual hardware trigger
print("Outputing...")
while True: while True:
command = input("enter s to stop, e to output nothing: ") command = input("enter c to stop, s to switch sequence step: ")
if command == 's': if command == 'c':
awg.stop() awg.stop()
print("stopping s=card") print("stopping")
break break
elif command == 'e': elif command == 's':
awg.force_trigger() awg.force_trigger()
print("switching to empty output") print("switching sequence step")
awg.stop()
# ----------------------------------------------------------------------------
awg.close() awg.close()
from typing import Dict, Tuple, Any
from scipy.interpolate import interp1d
from AWG import * from AWG import *
import time
class Waveform: class Waveform:
def __init__(self, cf: int, df: int, n: int, sample_rate: int): def __init__(self, cf: int, df: int, n: int, sample_rate: int, freq_res):
""" """
helper class to store basic waveform information. helper class to store basic waveform information.
:param cf: center frequency tone of tweezer array. :param cf: center frequency tone of tweezer array.
...@@ -12,37 +13,44 @@ class Waveform: ...@@ -12,37 +13,44 @@ class Waveform:
:param sample_rate: sampling rate of the AWG to generate correct number of samples. :param sample_rate: sampling rate of the AWG to generate correct number of samples.
""" """
# define some useful numbers # define some useful numbers
scale = 2**11 # Mingkun uses 2^11, caltech uses 2^15, maybe this is scaling up a float to int? scale = 2 ** 12 # sets the amplitude scale, max must not exceed 2**15-1
num_tz = 2*n + 1 # total number of tweezers to be generated num_tz = 2 * n + 1 # total number of tweezers to be generated
max_amp = scale / np.sqrt(num_tz) # again, saw this from multiple sources, not sure why max_amp = scale / np.sqrt(num_tz) # scale down by number of tweezers
# self.amplitude = max_amp * np.ones(num_tz) self.amplitude = max_amp * np.ones(num_tz)
self.amplitude: np.ndarray = max_amp # uniform amplitude for now, this will eventually be tweaked finely with experiments # self.amplitude: np.ndarray = max_amp # uniform amplitude
self.omega: np.ndarray = 2*np.pi * np.linspace(cf - n*df, cf + n*df, num_tz) # frequency tones self.omega = 2 * np.pi * np.linspace(cf - n * df, cf + n * df, num_tz) # frequency tones
self.phi: np.ndarray = 2*np.pi * np.random.rand(num_tz) # random initial phases from 0-2pi, also will be tweaked finely with experiments self.phi = 2 * np.pi * np.random.rand(num_tz) # random initial phases from 0-2pi
self.sample_rate: int = sample_rate self.sample_rate: int = sample_rate
# self.debug = { self.freq_res = freq_res
# "mat1": 0, # formula for minimum sample length
# "mat2": 0, # sample_len_min = 512 * m
# "mat3": 0 # m * 512 * freq_resolution / sampling_rate = k, k % 2 == 0
# } self.sample_len_min = 2 * sample_rate / freq_res # !!! this only works for sample_rate = 614.4e6 !!!
def create_static_array(wfm: Waveform, sample_len: int) -> np.ndarray: def create_static_array(wfm: Waveform) -> np.ndarray:
""" """
create a static-array-generating waveform with user set number of samples create a static-array-generating waveform with user set number of samples
:param wfm: waveform object already initialized with basic parameters. :param wfm: waveform object already initialized with basic parameters.
:param sample_len: total number of samples to generate, must be multiples of 512. Note that more sample != higher resolution.
:return: returns a 1D array with static-array-generating waveform. :return: returns a 1D array with static-array-generating waveform.
""" """
# construct time axis, t_total(s) = sample_len / sample_rate, dt = t_total / sample_len # construct time axis, t_total(s) = sample_len / sample_rate, dt = t_total / sample_len
t = np.arange(sample_len) / wfm.sample_rate t = np.arange(wfm.sample_len_min) / wfm.sample_rate
# calculate individual sin waves, sig_mat[i] corresponds to data for ith tweezer # calculate individual sin waves, sig_mat[i] corresponds to data for ith tweezer
sin_mat = wfm.amplitude * np.sin(np.outer(wfm.omega,t) + np.expand_dims(wfm.phi, axis=1)) # shape=(number of tweezers x sample_len) # sin_mat = wfm.amplitude * np.sin(np.outer(wfm.omega,t) + np.expand_dims(wfm.phi, axis=1)) # shape=(number of tweezers x sample_len)
sin_mat = np.sin(
np.outer(wfm.omega, t) + np.expand_dims(wfm.phi, axis=1)
# shape=(number of tweezers x sample_len)
)
sin_mat = (wfm.amplitude * sin_mat.T).T # this works, trust me
# sum up all rows to get final signal # sum up all rows to get final signal
return np.sum(sin_mat, axis=0) sig = np.sum(sin_mat, axis=0)
if np.max(sig) >= 2 ** 15 - 1:
print("Signal amp exceeds maximum")
return sig.astype(np.int16)
def create_path_table(wfm: Waveform) -> any: def create_path_table(wfm: Waveform) -> any:
...@@ -51,24 +59,33 @@ def create_path_table(wfm: Waveform) -> any: ...@@ -51,24 +59,33 @@ def create_path_table(wfm: Waveform) -> any:
:param wfm: waveform object already initialized with basic parameters. :param wfm: waveform object already initialized with basic parameters.
:return: dim-3 ndarray :return: dim-3 ndarray
""" """
# interpolate optimal amplitudes
# data = np.load("data/optimal_amps.npz")
w = wfm.omega
a = wfm.amplitude
omega_interp = interp1d(w, a, kind='cubic')
# setup basic variables # setup basic variables
twopi = 2*np.pi twopi = 2 * np.pi
vmax = KILO(20) * MEGA(1) # convert units, 20 kHz/us -> 20e3 * 1e6 Hz/s vmax = KILO(20) * MEGA(1) # convert units, 20 kHz/us -> 20e3 * 1e6 Hz/s
dw_max = wfm.omega[-1] - wfm.omega[0] # Longest move in frequency dw_max = wfm.omega[-1] - wfm.omega[0] # Longest move in frequency
t_max = 2 * dw_max / vmax # Longest move sets the maximum moving time t_max = 2 * dw_max / vmax # Longest move sets the maximum moving time
a_max = -vmax * 2 / t_max # maximum acceleration, negative sign because of magic a_max = -vmax * 2 / t_max # maximum acceleration, negative sign because of magic
sample_len_max = int(np.ceil(t_max * 4/5 * wfm.sample_rate)) # get number of samples required for longest move, this sets the size of lookup table
sample_len_max += (512 - sample_len_max % 512) # make overall length a multiple of 512 so AWG doesn't freak out # get number of samples required for longest move,this sets the size of lookup table
sample_len = int(np.ceil(t_max * wfm.sample_rate))
# sample_len += (512 - sample_len % 512) # make overall length a multiple of 512 so AWG doesn't freak out
sample_len += wfm.sample_len_min - sample_len % wfm.sample_len_min
# now we calculate all possible trajectories, go to Group Notes/Projects/Rearrangement for detail # now we calculate all possible trajectories, go to Group Notes/Projects/Rearrangement for detail
n = len(wfm.omega) # total number of tweezers n = len(wfm.omega) # total number of tweezers
phi_paths = np.zeros((n, n, sample_len_max)) # lookup table to store all moves path_table = np.zeros((n, n, sample_len)) # lookup table to store all moves
t = np.arange(sample_len_max) / wfm.sample_rate # time series t = np.arange(sample_len) / wfm.sample_rate # time series
# iterate! I think this part can be vectorized as well... but unnecessary. # iterate! I think this part can be vectorized as well... but unnecessary.
for i, omega_i in enumerate(wfm.omega): for i, omega_i in enumerate(wfm.omega):
for j, omega_j in enumerate(wfm.omega): # j is the target position, i is starting position for j, omega_j in enumerate(wfm.omega): # j is the target position, i is starting position
if i == j: if i == j:
phi_paths[i,i] = omega_i*t + wfm.phi[i] path_table[i, i] = wfm.amplitude[i] * np.sin(omega_i * t + wfm.phi[i])
continue # skip diagonal entries continue # skip diagonal entries
dw = omega_j - omega_i # delta omega in the equation dw = omega_j - omega_i # delta omega in the equation
adw = abs(dw) adw = abs(dw)
...@@ -79,129 +96,240 @@ def create_path_table(wfm: Waveform) -> any: ...@@ -79,129 +96,240 @@ def create_path_table(wfm: Waveform) -> any:
dphi = phi_j - phi_i # delta phi in the equation dphi = phi_j - phi_i # delta phi in the equation
if dphi < 0: dphi = abs(dphi) + twopi - phi_i # warp around for negative phase shift if dphi < 0: dphi = abs(dphi) + twopi - phi_i # warp around for negative phase shift
t_tot = np.sqrt(abs(4 * dw / a_max)) # calculate minimum time to complete move t_tot = np.sqrt(abs(4 * dw / a_max)) # calculate minimum time to complete move
t_tot = ((t_tot - 6*dphi/adw) // (12*np.pi/adw) + 1) * 12*np.pi/adw # extend move time to arrive at the correct phase t_tot = ((t_tot - 6 * dphi / adw) // (
a = 4*dw/(t_tot**2) # adjust acceleration accordingly to ensure we still get to omega_j 12 * np.pi / adw) + 1) * 12 * np.pi / adw # extend move time to arrive at the correct phase
a = 4 * dw / (t_tot ** 2) # adjust acceleration accordingly to ensure we still get to omega_j
end = int(np.ceil(t_tot * wfm.sample_rate)) # convert to an index in samples end = int(np.ceil(t_tot * wfm.sample_rate)) # convert to an index in samples
half = int(end / 2) # index of sample half-way through the move where equation changes half = int(end / 2) # index of sample half-way through the move where equation changes
# if end % 2 == 0: half += 1
t1 = t[:half] # first half of the move, slicing to make life easier t1 = t[:half] # first half of the move, slicing to make life easier
t2 = t[half:end] - t_tot/2 # time series for second half of the move t2 = t[half:end] - t_tot / 2 # time series for second half of the move
# do calculation # interpolate amplitudes during the move
phi_paths[i,j, :half] = wfm.phi[i] + omega_i*t1 + a/6*t1**3 # t<=T/2 amps = np.zeros(sample_len)
phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a/2*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2 inst_w = np.zeros(end)
phi_paths[i,j, end:] = omega_j*t[end:] + (phi_j - omega_j*t_tot) % twopi # fill the rest with parameters of target wave inst_w[0] = omega_i
inst_w[-1] = omega_j
inst_w[1:half] = omega_i + 0.5 * a * t1[1:] ** 2
inst_w[half:end - 1] = omega_i + \
a / 2 * (t_tot / 2) ** 2 + \
a * t_tot / 2 * t2[:-1] - \
a / 2 * t2[:-1] ** 2
amps[:end] = omega_interp(inst_w)
amps[end:] = wfm.amplitude[j]
# calculate sine wave
path_table[i, j, :half] = wfm.phi[i] + omega_i * t1 + a / 6 * t1 ** 3 # t<=T/2
path_table[i, j, half:end] = path_table[i, j, half - 1] + \
(omega_i + a / 2 * (t_tot / 2) ** 2) * t2 + \
a / 2 * t_tot / 2 * t2 ** 2 - \
a / 6 * t2 ** 3 # t>=T/2
path_table[i, j, end:] = omega_j * t[end:] + (phi_j - omega_j * t_tot) % twopi
path_table[i, j] = amps * np.sin(path_table[i, j])
# now compile everything into sine wave # now compile everything into sine wave
phi_paths = wfm.amplitude * np.sin(phi_paths) path_table = path_table.astype(np.int16)
return phi_paths.astype(int), np.sum(phi_paths.diagonal().T, axis=0, dtype=int) return path_table
# return path_table.astype(int), np.sum(path_table.diagonal().T, axis=0, dtype=int)
def create_moving_array(sig: np.ndarray, path_table: np.ndarray, paths: np.ndarray) -> np.ndarray: def create_path_table_reduced(
wfm: Waveform, target_idx, max_dist=np.inf
) -> Tuple[Dict[Tuple[int, int], np.ndarray], np.ndarray]:
""" """
create a rearranging signal that moves tweezers as specified by paths create a dim-3 look up table where the table[i,j] contains a sine wave to move tweezer i to tweezer j
:param target_idx: indices of target pattern
:param max_dist: maximum move distance in indices
:param wfm: waveform object already initialized with basic parameters. :param wfm: waveform object already initialized with basic parameters.
:param sin_mat: 2d array where ith entry contains static sin wave for ith tweezer. See create_static_array for detail. :return: dictionary containing rearrange paths
:param path_table: lookup table returned from create_path_table().
:param paths: 1d array filled with tuples indicating moving trajectories. Example: np.array([(1,0),(2,1)]) moves tweezer1->0,tweezer2->1
:return: 1D array with rearrangement-generating waveform.
""" """
# for i,j in paths: # interpolate optimal amplitudes
# sin_mat[i] = phi_paths[i,j] # data = np.load("data/optimal_amps.npz")
# fyi, line below is equivalent to the for loop above w = wfm.omega
# sin_mat[paths[:,0]] = path_table[paths[:,0], paths[:,1]] # copy dynamic trajectories from path_table, a = wfm.amplitude
# sin_mat[np.setdiff1d(paths[:,1], paths[:,0])] = 0 # turn off tweezers that need to be turned off, moving 3-2, 2-1, 1-0 will turn off 0. omega_interp = interp1d(w, a, kind='cubic')
# return np.sum(sin_mat, axis=0) # sum up all rows to get final signal
# return np.sum(sin_mat, axis=0), sin_mat/wfm.amplitude
sig += -np.sum(path_table[paths[:,0], paths[:,0]], axis=0) + np.sum(path_table[paths[:,0], paths[:,1]], axis=0)
# obtain all move combinations:
n = len(wfm.omega) # total number of tweezers
moves = []
dw_max = 0 # longest move, this sets the size of path_table
for i in range(n):
moves.append([])
for j in target_idx:
if i < j and True: # only allow uni-direction moves
continue
if abs(i - j) <= max_dist:
moves[i].append(j)
dw = abs(wfm.omega[j] - wfm.omega[i])
if dw_max < dw: dw_max = dw
def old_code(): # setup basic variables
twopi = 2 * np.pi
vmax = KILO(20) * MEGA(1) # convert units, 20 kHz/us -> 20e3 * 1e6 Hz/s
t_max = 2 * dw_max / vmax # Longest move sets the maximum moving time
a_max = vmax * 2 / t_max # maximum acceleration, negative sign because of magic
# get number of samples required for longest move,this sets the size of lookup table
sample_len = int(np.ceil(t_max * wfm.sample_rate))
# sample_len += (512 - sample_len % 512) # make overall length a multiple of 512 so AWG doesn't freak out
sample_len += wfm.sample_len_min - sample_len % wfm.sample_len_min
sample_len = int(sample_len)
# now we calculate all possible trajectories, go to Group Notes/Projects/Rearrangement for detail
path_table = {} # lookup table to store all moves
static_sig = np.zeros(sample_len) # for fast real-time waveform generation purposes
t = np.arange(sample_len) / wfm.sample_rate # time series
# iterate! I think this part can be vectorized as well... but unnecessary.
for i in range(n):
omega_i = wfm.omega[i]
for j in moves[i]: # j is the target position, i is starting position
omega_j = wfm.omega[j]
if i == j:
path = (
wfm.amplitude[i] * np.sin(omega_i * t + wfm.phi[i])
).astype(np.int16)
# path = omega_i * t + wfm.phi[i]
path_table[(i, i)] = path
static_sig += path
continue # skip diagonal entries
path = np.zeros(sample_len)
# I advise reading through the notes page first before going further
dw = omega_j - omega_i # delta omega in the equation
adw = abs(dw)
t_tot = np.sqrt(abs(4 * dw / a_max)) # calculate minimum time to complete move
phi_j = wfm.phi[j] % twopi # wrap around two pi
phi_i = wfm.phi[i] % twopi
dphi = (phi_j - phi_i) % twopi # delta phi in the equation
if dphi < 0: dphi = abs(dphi) + twopi - phi_i # warp around for negative phase shift
t_tot += 12 * np.pi / adw - (
(t_tot - 6 * dphi / adw) %
(12 * np.pi / adw)) # extend move time to arrive at the correct phase
a = 4 * adw / (t_tot ** 2) # adjust acceleration accordingly to ensure we still get to omega_j
end = int(np.round(t_tot * wfm.sample_rate)) # convert to an index in samples
# print(f'({i},{j}), {end}')
half = int(end / 2) + 1 # index of sample half-way through the move where equation changes
# t_tot = t[end]
t1 = t[:half] # first half of the move, slicing to make life easier
t2 = t[half:end] - t_tot / 2 # time series for second half of the move
# a = 4 * adw / (t_tot ** 2) # adjust acceleration accordingly to ensure we still get to omega_j
# interpolate amplitudes during the move
amps = np.zeros(sample_len)
inst_w = np.zeros(end)
inst_w[0] = omega_i
inst_w[-1] = omega_j
inst_w[1:half] = omega_i - 0.5 * a * t1[1:] ** 2
inst_w[half:end - 1] = omega_i - \
a / 2 * (t_tot / 2) ** 2 - \
a * t_tot / 2 * t2[:-1] + \
a / 2 * t2[:-1] ** 2
sw = omega_i
bw = omega_j
if omega_i > omega_j:
sw = omega_j
bw = omega_i
inst_w[inst_w < sw] = sw
inst_w[inst_w > bw] = bw
amps[:end] = omega_interp(inst_w)
amps[end:] = wfm.amplitude[j]
# frequency/phase diagnostic
# print(i,j)
# print(inst_w[-2] - omega_j)
# print(a*t_tot**3/24 % np.pi - dphi)
# print(end)
# print()
# calculate sine wave
path[:half] = wfm.phi[i] + omega_i * t1 - a / 6 * t1 ** 3 # t<=T/2
# ph = wfm.phi[i] + omega_i * t_tot / 2 + a / 6 * (t_tot / 2) ** 3
path[half:end] = path[half-1] + \
(omega_i - a / 2 * (t_tot / 2) ** 2) * t2 - \
a / 2 * t_tot / 2 * t2 ** 2 + \
a / 6 * t2 ** 3 # t>=T/2
path[end:] = path[end-1] + omega_j * (t[end:] - t[end-1])
path = (amps * np.sin(path)).astype(np.int16)
path_table[(i, j)] = path
for key in path_table:
if key[0] != key[1]:
path_table[key] -= path_table[(key[1], key[1])] # for fast real-time generation
# path_table[key] = path_table[key].astype(np.int16)
return path_table, static_sig.astype(np.int16)
def get_rearrange_paths(
filled_idx: np.ndarray,
target_idx: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
""" """
collection of unused code that might be useful later... Calculate rearranging paths.
:param filled_idx: indices of tweezer positions filled with atoms.
:param target_idx: indices of tweezer positions in target pattern.
:returns: 2d array containing rearranging paths. 1d array containing tweezer positions to be turned off.
""" """
# def create_phase_paths_old(wfm): n = 0
# # setup basic variables t_size = target_idx.size
# vmax = KILO(20) * MEGA(1) # 20 kHz/us -> 20 kHz * 1e6 / s f_size = filled_idx.size
# dw_max = wfm.omega[-1] - wfm.omega[0] # Longest move in frequency reserve = f_size - t_size
# t_max = 2 * dw_max / vmax # Longest move sets the maximum moving time # move_paths = []
# a = -vmax * 2 / t_max # constant acceleration, negative sign because of magic # static_paths = []
# sample_len = int(np.ceil(t_max * wfm.sample_rate)) # get number of samples required for longest move paths = []
# sample_len += (512 - sample_len % 512) # make overall length a multiple of 512 i = 0
# # t = np.zeros(padding+sample_len) j = 0
# t = np.arange(sample_len) / wfm.sample_rate # get time series while i < f_size:
# if j == t_size: break
# # generate phi for every possible move if filled_idx[i] == target_idx[j]:
# n = len(wfm.omega) # total number of tweezers # paths.append((filled_idx[i], filled_idx[i]))
# phi_paths = np.zeros((n, n, sample_len)) # map to store all moves j += 1
# for i, omega_i in enumerate(wfm.omega): i = j
# for j, omega_j in enumerate(wfm.omega): # I set j to be the target position, i to be starting position elif (reserve > 0
# if i == j: continue # skil diagonal entries and filled_idx[i] < target_idx[j]
# dw = omega_j - omega_i # delta omega and abs(filled_idx[i + 1] - target_idx[j]) < abs(filled_idx[i + 1] - target_idx[j])):
# t_tot = np.sqrt(abs(4 * dw / a)) # total time to travel dw, t_tot <= t_max i += 1
# end = int(np.ceil(t_tot * wfm.sample_rate)) # total number of samples to move dw reserve -= 1
# half = int(end / 2) # index of sample half-way through the move else:
# t1 = t[:half] # time series for first half of the move paths.append((filled_idx[i], target_idx[j]))
# t2 = t[half:end] - t_tot/2 # time series for second half of the move i += 1
# # do calculation j += 1
# phi_paths[i,j, :half] = wfm.phi[i] + omega_i*t1 + a/6*t1**3 # t<=T/2 note we are changing the ith tweezer off = []
# phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a/2*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2 if reserve < 0:
# # phi_paths[i,j, half:end] = phi_paths[i,j,half-1] + (omega_i+a*(t_tot/2)**2)*t2 + a/2*t_tot/2*t2**2 - a/6*t2**3 # t>=T/2 for i in range(abs(reserve)):
# phi_paths[i,j, end:] = omega_j*t[end:] # fill the rest of the array with target frequency wave off.append(target_idx[-1 - i])
# # phi_paths[i,j, end:] = phi_paths[i,j, end-1]*t[end:] # fill the rest of the array with same value return np.array(paths), np.array(off)
# return phi_paths
# sig_mat = wfm.amplitude * np.sin(np.outer(wfm.omega,t) + wfm.phi[:, np.newaxis]) # shape=(number of tweezers x sample_len)
# self.debug["mat1"] = np.zeros((n, n, 2)) # for phase debugging
# for diagnostic purposes
# path_idx[i, j] = (t_tot, end)
# phi_const[i,j, :half] = wfm.phi[i]
# phi_const[i,j, half:end] = phi_const[i,j,half-1] +
# print(a, t_tot)
# return phi_paths, wfm.amplitude * np.sin(np.outer(wfm.omega,t) + np.expand_dims(wfm.phi, axis=1)), path_idx
pass
def create_moving_array(path_table: np.ndarray, paths: np.ndarray) -> np.ndarray:
"""
create a rearranging signal that moves tweezers as specified by paths
:param path_table: lookup table returned from create_path_table().
:param paths: 2d array with moving trajectories, [:,0] stores start pos, [:,1] stores end pos.
"""
return np.sum(path_table[paths[:, 0], paths[:, 1]], axis=0)
def main():
# sample usage
np.random.seed(0)
cf = MEGA(80)
df = MEGA(1)
n = 3
# period = 1/cf
sample = 512*1000
rate = MEGA(625)
# rate = sample*cf/1e5
print(f"Sampling rate: {rate/1e6}MHz")
print(f"center f: {cf/1e6}MHz\n"
f"df: {df/1e6}MHz\n"
f"total n: {2*n+1}")
wave = Waveform(cf, df, n, rate)
static_sig = create_static_array(wave, sample)
static_sig = static_sig.astype(int)
table, move_sig = create_path_table(wave)
repath = np.array([(1,0), (2,1), (3,2)])
create_moving_array(move_sig, table, repath)
np.savetxt("data/static_signal.txt", static_sig, delimiter=',')
np.savetxt("data/move_signal.txt", move_sig, delimiter=',')
# np.savetxt("initial_phi.txt", wave.phi, delimiter=',')
# np.savetxt("sig_mat.txt", sig_mat, delimiter=',')
def size_test(n):
# sample usage
np.random.seed(0)
cf = MEGA(80)
df = MEGA(1)
rate = MEGA(625)
wave = Waveform(cf, df, n, rate)
table = create_path_table(wave)
return table.nbytes
# main()
def create_moving_array_reduced(
sig: np.ndarray,
path_table: Dict,
paths: np.ndarray,
off: np.ndarray
):
"""
create a rearranging signal that moves tweezers as specified by paths.
:param sig: initially a static-array-generating waveform.
:param path_table: lookup table returned from create_path_table_reduced().
:param paths: 2d array with moving trajectories, [:,0] stores start pos, [:,1] stores end pos.
:param off: 1d array with tweezer indices that need to be set to 0.
"""
for i, j in paths:
if i == j:
continue
if (i, j) in path_table:
sig += path_table[(i, j)]
# else:
# print((i, j), "not in path table")
for i in off:
sig -= path_table[(i, i)]
pass
import scipy.signal as scisig
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
import os
from mpl_toolkits.axes_grid1 import make_axes_locatable
file = np.load("data/waveform_data.npz", allow_pickle=True)
signal = file['signal']
wfm = file['wfm'].item()
sr = wfm.sample_rate
f, t, Sxx = scisig.stft(signal, fs=sr, nperseg=256*100)
f /= 1e6
t *= 1e3
Sxx[abs(Sxx) < 0.01] = 0
fig, ax = plt.subplots()
im = ax.pcolormesh(t, f, np.abs(Sxx), shading='gouraud')
plt.title("Signal Spectrogram Frequency")
plt.ylabel('Frequency [MHz]')
plt.xlabel('Time [ms]')
plt.ylim(95,115)
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.05)
fig.colorbar(im, cax=cax)
if True:
plt.savefig("data/Spectrogram-frequency.png", dpi=1200)
fig, ax = plt.subplots()
im = ax.pcolormesh(t, f, np.angle(Sxx), shading='gouraud')
plt.title("Signal Spectrogram Phase")
plt.ylabel('Frequency [MHz]')
plt.xlabel('Time [ms]')
plt.ylim(95,115)
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="5%", pad=0.05)
fig.colorbar(im, cax=cax)
if True:
plt.savefig("data/Spectrogram-phase.png", dpi=1200)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment