Skip to content
Snippets Groups Projects
timebase.py 8.96 KiB
"""
Class for interacting with TimeBase DIM-3000 AOM driver. See the manual for more
details on individiual commands.

Requires package `pyserial`.
"""
from __future__ import annotations
import os
import time
import serial

LF = b"\n"

class DIM3000:
    """
    Class to drive communication with a TimeBase DIM-3000 AOM driver. All
    methods that do not have an explicit return type return `self`.
    """

    def __init__(self, addr: str, port: int):
        """
        Constructor. Initializes the object, but does not automatically connect.

        Parameters
        ----------
        addr : str
            Address of the device. Must be 'COM' or a '/dev' path.
        port : int
            Port number for the address. This is the number that goes after
            'COM' or '/dev/ttyACM'.
        """
        self.dev = None
        if addr == "COM":
            self.connection = f"{addr}{port}"
        elif addr.startswith("/dev"):
            if os.getuid() != 0:
                raise Exception("Requires root privileges")
            self.connection = f"{addr}{port}"
        else:
            raise Exception("Unrecognized address")
        self.is_connected = False

    def connect(self, timeout: float=1.0, check: bool=False):
        """
        Connect to the stored address. Disconnects and reconnects if already
        connected.

        Parameters
        ----------
        timeout : float = 1.0 (optional)
            Wait `timeout` seconds for a response from the device.
        check : bool = False (optional)
            Perform a test query after forming a connection.
        """
        if self.is_connected:
            self.close()

        try:
            self.dev = serial.Serial(
                self.connection, baudrate=19200, bytesize=8, parity="N",
                stopbits=1, timeout=timeout, writeTimeout=0)
        except serial.SerialException as err:
            raise RuntimeError(err)

        if check:
            try:
                self.info = self.ask("*IDN?")
            except Exception as err:
                raise RuntimeError("Failed connection check")

        self.is_connected = True
        return self

    def close(self):
        """
        Close any active connection. Can be reconnected later.
        """
        if self.is_connected:
            self.dev.close()
            self.dev = None
            self.is_connected = False
        return self

    def disconnect(self):
        """
        Alias for `self.close`.
        """
        return self.close()

    def _check(self):
        if not self.is_connected:
            raise Exception("Device is not connected")

    def get_timeout(self) -> float:
        """
        Get the current connection timeout in seconds.
        """
        return self.dev.timeout

    def set_timeout(self, val: float):
        """
        Change the timeout to the given value in seconds.
        """
        self._check()
        self.dev.timeout = val
        return self

    def send_raw(self, cmd: str):
        """
        Send `cmd` to the device unmodified.
        """
        self._check()
        self.dev.write(cmd)
        return self

    def send(self, cmd: str):
        """
        Send `cmd` to the device, appending newline if not already present.
        """
        _cmd = cmd
        if hasattr(_cmd, "encode"):
            _cmd = _cmd.encode()
        else:
            _cmd = b''.join(_cmd)
        if not _cmd.endswith(LF):
            _cmd += LF
        self.send_raw(_cmd)
        return self

    def receive_raw(self) -> bytes:
        """
        Receive exactly `size` bytes from the device.
        """
        self._check()
        return self.dev.readline()

    def receive(self) -> str:
        return self.receive_raw().decode()

    def ask(self, cmd: str) -> str:
        """
        Send followed by receive, returning the response as a str.
        """
        self._check()
        self.send(cmd)
        response = self.receive().strip()
        return response

    def get_info(self) -> str:
        """
        Report information about the unit.
        """
        return self.ask("*IDN?")

    def set_output(self, onoff: bool):
        """
        Enable or disable RF output.
        """
        self.send("OUT_on" if onoff else "OUT_off")
        return self

    def set_pulse_mode(self, onoff: bool):
        """
        Enable or disable pulse mode.
        """
        self.send("RFp_on" if onoff else "RFp_off")
        return self

    def set_pulse_freq(self, val: float):
        """
        Set the pulse frequency in Hz, [20, 1000].
        """
        if not (val >= 20.0 and val <= 1000.0):
            raise Exception("Pulse frequency must be between 20 and 1000 Hz")
        self.send(f"RFpfr:{val:.0f}")
        return self

    def set_pulse_duty(self, val: float):
        """
        Set the pulse duty cycle in percent, [1, 99].
        """
        if not (val >= 1.0 and val <= 99.0):
            raise Exception("Pulse duty cycle must be between 1 and 99 percent")
        self.send(f"RFpdt:{val:.0f}")
        return self

    def get_amplitude(self) -> float:
        """
        Get output amplitude in dBm.
        """
        resp = self.ask("AMP?")
        return float(resp) / 10.0

    def set_amplitude(self, val: float) -> float:
        """
        Set output amplitude in dBm, [14, 34].
        """
        if not (val >= 14.0 and val <= 34.0):
            raise Exception("Amplitude must be between 14 and 34 dBm")
        self.send(f"AMP:{val * 10:.0f}")
        return self

    def get_frequency(self) -> float:
        """
        Get the output frequency in MHz.
        """
        resp = self.ask("FRQ?")
        return float(resp) / 1e6

    def set_frequency(self, val: float):
        """
        Set the output frequency in MHz, [10, 400].
        """
        if not (val >= 10.0 and val <= 400.0):
            raise Exception("Frequency must be between 10 and 400 MHz")
        self.send(f"FRQ:{val * 1e6:.0f}")
        return self

    def get_frequency_step(self) -> float:
        """
        Get the frequency step size in MHz.
        """
        resp = self.ask("FRQs?")
        return float(resp) / 1e6

    def set_frequency_step(self, val: float):
        """
        Set the frequency step size in MHz, [1e-6, 10].
        """
        if not (val >= 1e-6 and val <= 10.0):
            raise Exception(
                "Frequency step size must be between 1e-6 and 10 MHz")
        self.send(f"FRQs:{val * 1e6:.0f}")
        return self

    def frequency_inc(self):
        """
        Increment the frequency by one step size.
        """
        self.send("FRQi")
        return self

    def frequency_dec(self):
        """
        Decrement the frequency by one step size.
        """
        self.send("FRQd")
        return self

    def set_frequency_mod(self, onoff: bool):
        """
        Enable or disable frequency modulation mode.
        """
        self.send("FM_on" if onoff else "FM_off")
        return self

    def set_frequency_mod_dev(self, val: int):
        """
        Set the frequency modulation deviation. Inputs are integers 0 through 15
        corresponding to frequency values following
            `val` * (800 Hz)^(`val`)
        """
        if val not in range(0, 16):
            raise Exception(
                "Frequency deviation number must be between 0 and 15")
        self.send(f"FMdev:{val:.0f}")
        return self

    def set_sweep_mode(self, val: int):
        """
        Set the sweep mode. Inputs are integers 0 through 4 corresponding to
            0 = sweep off
            1 = triangle wave with internal trigger
            2 = triangle wave with external trigger
            3 = sawtooth wave with internal trigger
            4 = sawtooth wave with external trigger
        """
        if val not in range(0, 5):
            raise Exception("Sweep mode number must be between 0 and 4")
        self.send(f"SSWPm:{val:.0f}")
        return self

    def set_sweep_range(self, f0: float, f1: float):
        """
        Set the beginning and ending frequencies of the sweep in MHz, [10, 400].
        """
        if not (f0 >= 10.0 and f0 <= 400.0) or not (f1 >= 10.0 and f1 <= 400.0):
            raise Exception("Frequencies must be between 10 and 400 MHz")
        self.send(f"SSWPs:{f0 * 1e6:.0f}")
        self.send(f"SSWPp:{f1 * 1e6:.0f}")
        return self

    def set_sweep_step(self, val: float):
        """
        Set the frequency sweep step size in MHz, [10, 400].
        """
        # if not (val >= 10.0 and val <= 400.0):
        #     raise Exception("Sweep step size must be between 10 and 400 MHz")
        self.send(f"SSWPf:{val * 1e6:.0f}")
        return self

    def set_sweep_step_time(self, val: float):
        """
        Set the frequency sweep step time in us, [4e-3, 262]. Values will be
        rounded down to the nearest multiple of 4 ns.
        """
        if not (val >= 4e-3 and val <= 262.0):
            raise Exception ("Sweep step time must be between 4e-3 and 262 us")
        self.send(f"SSWPt:{val * 1e3:.0f}")
        return self