Skip to content
Snippets Groups Projects
WaveBinFile.py 7.53 KiB
'''
Description: CoMPASS binary data parser
Author: Ming Fang
Date: 2022-08-22 19:00:26
LastEditors: Ming Fang
LastEditTime: 2023-06-07 11:12:42
'''
from pathlib import Path
import numpy as np


class WaveBinFile:
    """Paser for CoMPASS binary file.
    """
    def __init__(self, p, version=2) -> None:
        """Initialize with input file path and version number.

        Args:
            p (str | Path): Path to the binary data file saved by CoMPASS.
            version (int): Version of the CoMPASS software. Must be 1 or 2. Default is 2.

        Raises:
            ValueError: If input file is not accessible, or the version is incorrect.
        """
        self._filePath = Path(p)  # file path
        if not self._filePath.is_file():
            raise ValueError(f"{str(p)} is not accessible.")
        if version != 1 and version != 2:
            raise ValueError(f"Version number must be either 1 or 2, but got {version}.")
        self._headersize = 24  # header size in bytes
        if version == 2:
            self._headersize += 1
        self._version = version  # CoMPASS version number
        self._numberOfSamples = 0  # number of samples per pulse
        self._boardNumber = -1  # board number
        self._channelNumber = -1  # channel number
        self._getCommonHeader()
        self._fileObject = open(self._filePath, 'rb')  # data file object
        if version == 2:
            # skip first 2 bytes
            self._fileObject.read(2)
        self._pulseSize = self._headersize + 2 * self._numberOfSamples  # pulse size in bytes
        self._numberOfPulses = int(self._filePath.stat().st_size / self._pulseSize)  # total number of pulses in the file
        if version == 1:
            self._pulseType = np.dtype([('Board', np.int16),
                                       ('Channel', np.int16),
                                       ('Time Stamp', np.int64),
                                       ('Energy', np.int16),
                                       ('Energy Short', np.int16),
                                       ('Flags', np.int32),
                                       ('Number of Samples', np.int32),
                                       ('Samples', np.uint16, (self._numberOfSamples, ))])  # custom data type for one pulse
        else:
            self._pulseType = np.dtype([('Board', np.int16),
                                       ('Channel', np.int16),
                                       ('Time Stamp', np.int64),
                                       ('Energy', np.int16),
                                       ('Energy Short', np.int16),
                                       ('Flags', np.int32),
                                       ('Waveform Code', np.int8),
                                       ('Number of Samples', np.int32),
                                       ('Samples', np.uint16, (self._numberOfSamples, ))])
            # print(self.pulseType['Samples'])
        self._numberOfPulseUnread = self._numberOfPulses  # number of pulses that have not been read

    def skipNextPulse(self):
        """Skip the next pulse.
        """
        if self._numberOfPulseUnread > 0:
            self._fileObject.seek(self._pulseSize, 1)
            self._numberOfPulseUnread -= 1

    def skipNextNPulses(self, num: int):
        """Skip the next N pulses.

        Args:
            num (int): number of pulses to skip.
        """
        if self._numberOfPulseUnread > num:
            self._fileObject.seek(self._pulseSize*num, 1)
            self._numberOfPulseUnread -= num
        else:
            self._fileObject.seek(0, 2)
            self._numberOfPulseUnread = 0

    def readNextPulse(self):
        """Read the next pulse.

        Returns:
            self.pulseType: Custom data type for pulse object.
            
            None: If all pulses have been read
            
            The data members of self.PulseType can be accessed with the following string keys:
            
                "Board": np.int16, board number
                "Channel": np.int16, channel number
                "Time Stamp": np.int64, unit: ps
                "Energy": np.int16
                "Energy short": np.int16
                "Flags": np.int32
                "Waveform Code": np.int8, version 2 only
                "Number of Samples" np.int32
                "Samples": numpy array of np.uint16
        """
        if self._numberOfPulseUnread > 0:
            self._numberOfPulseUnread -= 1
            buffer = self._fileObject.read(self._pulseSize)
            return (np.frombuffer(buffer, dtype=self._pulseType))[0]
        else:
            return None

    def readNextNPulses(self, num: int):
        """Read the next N pulses.
        Note:
            Be carefule about the memory when reading a large file. 
            If the user read the whole file and there is not enough memory, program will crash.
            If that's the case, read the pulses in smaller chunks.

        Args:
            num (int): number of pulses to read.

        Returns:
            np.ndarray: Array of pulses. Number of pulses is at most `num`.
                        If reached EOF, less than `num` pulses will be returned.
            
            None: If all pulses have been read
        """
        if self._numberOfPulseUnread <= 0:
            return None
        elif self._numberOfPulseUnread > num:
            self._numberOfPulseUnread -= num
        else:
            num = self._numberOfPulseUnread
            self._numberOfPulseUnread = 0
        buffer = self._fileObject.read(self._pulseSize*num)
        return np.frombuffer(buffer, dtype=self._pulseType)
        
    def rewind(self):
        """Go to the begnning of the file.
        """
        self._fileObject.seek(0)
        self._numberOfPulseUnread = self._numberOfPulses

    @property
    def versionNumber(self):
        """The CoMPASS version number.

        Returns:
            int
        """
        return self._version
    
    @property
    def numberOfSamplesPerPulse(self):
        """The number of samples per pulse.

        Returns:
            int
        """
        return self._numberOfSamples

    @property
    def boardNumber(self):
        """Board number.

        Returns:
            int
        """
        return self._boardNumber

    @property
    def channelNumber(self):
        """Channel number.

        Returns:
            int
        """
        return self._channelNumber

    @property
    def totalNumberOfPulses(self):
        """The total number of pulses recorded on file.

        Returns:
            int
        """
        return self._numberOfPulses

    @property
    def numberOfPulsesUnread(self):
        """The number of pulses that haven't been read.

        Returns:
            int
        """
        return self._numberOfPulseUnread
    
    
    def _getCommonHeader(self):
        """Read common headers of all pulses.
        """
        with open(self._filePath, 'rb') as f:
            if self._version == 2:
                f.read(2)
            # read first header
            self._boardNumber = int.from_bytes(f.read(2), 'little')
            self._channelNumber = int.from_bytes(f.read(2), 'little')
            if self._version == 1:
                f.read(16)
            else:
                f.read(17)
            self._numberOfSamples = int.from_bytes(f.read(4), 'little')

    def __del__(self):
        self._fileObject.close()