Source code for gdt.missions.cgro.batse.phaii

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT 
# WITH UNLIMITED RIGHTS
#
# Grant No.: 80NSSC21K0651
# Grantee Name: Universities Space Research Association
# Grantee Address: 425 3rd Street SW, Suite 950, Washington DC 20024
#
# Copyright 2024 by Universities Space Research Association (USRA). All rights 
# reserved.
#
# Developed by: Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# This work is a derivative of the Gamma-ray Data Tools (GDT), including the 
# Core and Fermi packages, originally developed by the following:
#
#     William Cleveland and Adam Goldstein
#     Universities Space Research Association
#     Science and Technology Institute
#     https://sti.usra.edu
#     
#     Daniel Kocevski
#     National Aeronautics and Space Administration (NASA)
#     Marshall Space Flight Center
#     Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not 
# use this file except in compliance with the License. You may obtain a copy of 
# the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software 
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
# License for the specific language governing permissions and limitations under 
# the License.

import os
import numpy as np
import astropy.io.fits as fits
from astropy.coordinates import SkyCoord
import astropy.coordinates.representation as r

from gdt.core.coords import SpacecraftAxes
from gdt.core.coords.spacecraft import SpacecraftFrameModelMixin
from gdt.core.file import FitsFileContextManager
from gdt.core.phaii import Phaii
from gdt.core.data_primitives import Ebounds, Gti, TimeEnergyBins, TimeBins
from ..frame import *
from ..time import Time, from_day_time, to_day_time
from .detectors import BatseDetectors
from .headers import *


__all__ = ['BatsePhaii', 'BatsePhaiiMulti', 'BatsePhaiiCont', 
           'BatsePhaiiDiscla', 'BatsePhaiiTrigger', 'BatseEnergyCalib']

class BatseTimeEnergyBins(TimeEnergyBins):
    """Sub-class from gdt-core to add a tolerance in the calculating segments.
    Eventually this should be addressed in gdt-core.
    """
    def integrate_energy(self, emin=None, emax=None):
        temp = super().integrate_energy(emin=emin, emax=emax)
        return BatseTimeBins(temp.counts, temp.lo_edges, temp.hi_edges, 
                             temp.exposure)
        
    def _calculate_good_segments(self, lo_edges, hi_edges, tol=1e-4):
        """Calculates the ranges of data that are contiguous segments
        
        Args:
            lo_edges (np.array): The lower bin edges
            hi_edges (np.array): The upper bin edges
            tol (float, optional): A tolerance on matching bin edges. 
                                   Default is 1e-4 (0.1 ms)
        
        Returns:           
            ([(float, float), ...])
        """
        mask = np.abs(lo_edges[1:] - hi_edges[:-1]) > tol
        if mask.sum() == 0:
            return [(lo_edges[0], hi_edges[-1])]
        edges = np.concatenate(([lo_edges[0]], hi_edges[:-1][mask],
                                lo_edges[1:][mask], [hi_edges[-1]]))
        edges.sort()
        return edges.reshape(-1, 2).tolist()


class BatseTimeBins(TimeBins):
    """Sub-class from gdt-core to add a tolerance in the calculating segments.
    Eventually this should be addressed in gdt-core.
    """    
    def _calculate_good_segments(self, tol=1e-4):
        """Calculates the ranges of data that are contiguous segments
        
        Args:
            tol (float, optional): A tolerance on matching bin edges. Default is
                                   1e-4 (0.1 ms).

        Returns:
            ([(float, float), ...])
        """
        mask = np.abs(self.lo_edges[1:] - self.hi_edges[:-1]) > tol
        if mask.sum() == 0:
            return [self.range]
        times = np.concatenate(([self.lo_edges[0]], self.hi_edges[:-1][mask],
                                self.lo_edges[1:][mask], [self.hi_edges[-1]]))
        times.sort()
        return times.reshape(-1, 2).tolist()


[docs]class BatsePhaii(Phaii): """Class representing BATSE PHAII data. This class reads either trigger files and returns a :class:`BatsePhaiiTrigger` object or a continuous data file containing data from multiple detectors and returns a :class:`BatsePhaiiMulti` object. """ def __init__(self): super().__init__() self._ecalib = None self._detector = None @property def detector(self): """(:class:`BatseDetector`) or list: The BATSE detector(s)""" return self._detector @property def ecalib(self): """(:class:`BatseEnergyCalib`): Energy calibration data""" return self._ecalib
[docs] @classmethod def from_data(cls, data, gti=None, trigger_time=None, filename=None, headers=None, ecalib=None, detector=None): """Create a BATSE PHAII object from data. Args: data (:class:`~.data_primitives.TimeEnergyBins`): The PHAII data gti (:class:`~.data_primitives.Gti`, optional): The Good Time Intervals object. If omitted, the GTI spans (tstart, tstop) trigger_time (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. filename (str, optional): The name of the file headers (:class:`~.headers.FileHeaders`): The file headers ecalib (:class:`BatseEnergyCalib`): The detector calibration detector (:class:`BatseDetectors` or list): The BATSE detector(s) Returns: (:class:`BatsePhaii`) """ obj = super().from_data(data, gti=gti, trigger_time=trigger_time, filename=filename, headers=headers) obj._ecalib = ecalib obj._detector = detector return obj
[docs] @classmethod def open(cls, file_path, **kwargs): """Open a BATSE PHAII FITS file and return either a :class:`BatsePhaiiTrigger` or :class:`BatsePhaiiMulti` object depending on the type of file. Args: file_path (str): The file path of the FITS file Returns: (:class:`BatsePhaiiTrigger` or :class:`BatsePhaiiMulti`) """ fname = os.path.basename(file_path) if 'bfits' in fname: return BatsePhaiiTrigger.open(file_path, **kwargs) else: return BatsePhaiiMulti.open(file_path, **kwargs)
[docs] def rebin_energy(self, method, *args, energy_range=(None, None)): """Rebin the PHAII in energy given a rebinning method. Produces a new PHAII object. Args: method (<function>): The rebinning function *args: Arguments to be passed to the rebinning function energy_range ((float, float), optional): The starting and ending energy to rebin. If omitted, uses the full range of data. Setting start or end to ``None`` will use the data from the beginning or end of the data, respectively. Returns (:class:`BatsePhaii`) """ return super().rebin_energy(method, *args, energy_range=energy_range, ecalib=self.ecalib, detector=self.detector)
[docs] def rebin_time(self, method, *args, time_range=(None, None)): """Rebin the PHAII in time given a rebinning method. Produces a new PHAII object. Args: method (<function>): The rebinning function *args: Arguments to be passed to the rebinning function time_range ((float, float), optional): The starting and ending time to rebin. If omitted, uses the full range of data. Setting start or end to ``None`` will use the data from the beginning or end of the data, respectively. Returns (:class:`BatsePhaii`) """ return super().rebin_time(method, *args, time_range=time_range, ecalib=self.ecalib, detector=self.detector)
[docs] def slice_energy(self, energy_ranges): """Slice the PHAII by one or more energy range. Produces a new PHAII object. Args: energy_ranges ([(float, float), ...]): The energy ranges to slice the data to. Returns: (:class:`BatsePhaii`) """ return super().slice_energy(energy_ranges=energy_ranges, ecalib=self.ecalib, detector=self.detector)
[docs] def slice_time(self, time_ranges): """Slice the PHAII by one or more time range. Produces a new PHAII object. The GTI will be automatically update to match the new time range(s). Args: time_ranges ([(float, float), ...]): The time ranges to slice the data to. Returns: (:class:`BatsePhaii`) """ return super().slice_time(time_ranges=time_ranges, ecalib=self.ecalib, detector=self.detector)
def _build_headers(self, trigtime, tstart, tstop, num_chans): headers = self.headers.copy() trig_day, trig_secs = (None, None) if trigtime is not None: time_obj = Time(trigtime, format='cgro') trig_day, trig_secs = to_day_time(time_obj) headers['PRIMARY']['TRIG-DAY'] = trig_day headers['PRIMARY']['TRIG-TIM'] = trig_secs tstart_obj = from_day_time(trig_day, tstart + trig_secs) tstop_obj = from_day_time(trig_day, tstop + trig_secs) else: tstart_obj = Time(tstart, format='cgro') tstop_obj = Time(tstop, format='cgro') start_day, start_secs = to_day_time(tstart_obj) stop_day, stop_secs = to_day_time(tstop_obj) headers['PRIMARY']['STRT-DAY'] = start_day headers['PRIMARY']['STRT-TIM'] = start_secs headers['PRIMARY']['END-DAY'] = stop_day headers['PRIMARY']['END-TIM'] = stop_secs try: headers['BATSE BURST SPECTRA']['LO_CHAN'] = 0 headers['BATSE BURST SPECTRA']['UP_CHAN'] = num_chans-1 except: pass return headers def _build_hdulist(self): raise NotImplementedError # create FITS and primary header hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self.headers['PRIMARY']) for key, val in self.headers['PRIMARY'].items(): primary_hdu.header[key] = val hdulist.append(primary_hdu) # the ecalib extension ecalib_hdu = self._ecalib_table() hdulist.append(ecalib_hdu) # the spectrum extension spectrum_hdu = self._spectrum_table() hdulist.append(spectrum_hdu) return hdulist def _ecalib_table(self): raise NotImplementedError num_det = self.ecalib.num_det cal_det_col = fits.Column(name='CAL_DET', format='{}I'.format(num_det), array=self.ecalib.det) cal_strt_col = fits.Column(name='CAL_STRT', format='{}D'.format(num_det), unit='TJD', array=self.ecalib.cal_start) cal_stop_col = fits.Column(name='CAL_STOP', format='{}D'.format(num_det), unit='TJD', array=self.ecalib.cal_start) e_edges = np.append(self.ebounds.low_edges(), self.ebounds.range[1]) e_edges_col = fits.Column(name='E_EDGES', unit='keV', format='{}E'.format(e_edges.size), array=e_edges.reshape(self.ecalib.num_det, -1)) width64_col = fits.Column(name='WIDTH64', format='{}I'.format(num_det), array=self.ecalib.width64) line_nrg_col = fits.Column(name='LINE_NRG', format='4E', unit='keV', array=self.ecalib.line_energy) line_chan_col = fits.Column(name='LINECHAN', format='4E', array=self.ecalib.line_chan) det_s_zn_col = fits.Column(name='DET_S_ZN', format='{}E'.format(num_det), unit='deg', array=self.ecalib.det_s_zn) det_e_zn_col = fits.Column(name='DET_E_ZN', format='{}E'.format(num_det), unit='deg', array=self.ecalib.det_e_zn) cal_name_col = fits.Column(name='CAL_NAME', format='16A', array=self.ecalib.cal_name) hdu = fits.BinTableHDU.from_columns([cal_det_col, cal_strt_col, cal_stop_col, e_edges_col, width64_col, line_nrg_col, line_chan_col, det_s_zn_col, det_e_zn_col, cal_name_col], header=self.headers['BATSE_E_CALIB']) for key, val in self.headers['BATSE_E_CALIB'].items(): hdu.header[key] = val hdu.header.comments['TTYPE1'] = 'Detector number: use to index cal. data' hdu.header.comments['TTYPE2'] = 'Start time of the calibration record' hdu.header.comments['TUNIT2'] = 'Truncated Julian days' hdu.header.comments['TTYPE3'] = 'Stop time of the calibration record' hdu.header.comments['TUNIT3'] = 'Truncated Julian days' hdu.header.comments['TTYPE4'] = 'Energy edges for each selected detector' hdu.header.comments['TTYPE5'] = 'Integer value of Width64 (SDs only)' hdu.header.comments['TTYPE6'] = 'Centroid energy of 4 calibration lines' hdu.header.comments['TTYPE7'] = 'Centroid channel of 4 calibration lines' hdu.header.comments['TTYPE8'] = 'Zenith of the source in detector coords' hdu.header.comments['TTYPE9'] = 'Zenith of the Earth in detector coords' hdu.header.comments['TTYPE10'] = 'Name of the channel-to-energy scheme used' return hdu def _spectrum_table(self): raise NotImplementedError tstart = np.copy(self.data.tstart) tstop = np.copy(self.data.tstop) times = np.vstack((tstart, tstop)).T times_col = fits.Column(name='TIMES', format='2E', unit='s', array=times) rates_col = fits.Column(name='RATES', unit='count /s', format='{}E'.format(self.num_chans), array=self.data.rates) errors_col = fits.Column(name='ERRORS', unit='count /s', format='{}E'.format(self.num_chans), array=self.data.rate_uncertainty) hdu = fits.BinTableHDU.from_columns([times_col, rates_col, errors_col], header=self.headers['BATSE BURST SPECTRA']) for key, val in self.headers['BATSE BURST SPECTRA'].items(): hdu.header[key] = val hdu.header.comments['TTYPE1'] = 'Array of rate start and stop times' hdu.header.comments['TUNIT1'] = 'Seconds since BSTST' hdu.header.comments['TTYPE2'] = 'N_Channel X N_Times array of rates' hdu.header.comments['TTYPE3'] = 'N_Channel X N_Times array of errors' return hdu
[docs]class BatsePhaiiMulti(FitsFileContextManager, SpacecraftFrameModelMixin): """BATSE data containing PHAII from multiple detectors. This is typically either CONT or DISCLA data. In addition to the PHAII data, these files also contain spacecraft orbit and attitude information. """ def __init__(self): super().__init__() self._data = [] self._headers = [] self._filetype = None self._ecalib = None self._tstart = None self._tstop = None self._exposure = None self._frame = None @property def detectors(self): """(list): The detectors in the file""" return self._ecalib.detectors @property def num_dets(self): """(int): Number of detectors in the file""" return self._ecalib.num_dets
[docs] def get_detector(self, det_var): """Retrieve the Phaii object for the given detector. Args: det_var (str, int, or :class:`BatseDetectors`) Returns: (:class:`BatsePhaii``) """ if isinstance(det_var, BatseDetectors): num = det_var.number elif isinstance(det_var, str): num = BatseDetectors.from_str(det_var).number elif isinstance(det_var, int): num = det_var else: raise TypeError('det_var must be a str, int, or BatseDetectors ' \ 'object') ecalib_det = self._ecalib.get_detector(num) e_edges = ecalib_det.edges_over_timespan(num, self._tstart[0], self._tstop[-1]) if self._filetype == 'cont': teb = BatseTimeEnergyBins(self._data['COUNTS'][:,:,num], self._tstart, self._tstop, self._exposure[:,num], e_edges[:-1], e_edges[1:]) return BatsePhaiiCont.from_data(teb, headers=self._headers, ecalib=ecalib_det, detector=BatseDetectors.from_num(num)) elif self._filetype == 'discla': # in DISCLA, the first 4 channels are the discriminator channels, # channel 5 is the uncoincidenced total LAD rate (counts?), and the # channel 6 is the total CPD rate (counts?) teb = BatseTimeEnergyBins(self._data['COUNTS'][:,:4,num], self._tstart, self._tstop, self._exposure[:,num], e_edges[:-1], e_edges[1:]) lad_lc = BatseTimeBins(self._data['COUNTS'][:,4,num], self._tstart, self._tstop, self._exposure[:,num]) cpd_lc = BatseTimeBins(self._data['COUNTS'][:,5,num], self._tstart, self._tstop, self._exposure[:,num]) return BatsePhaiiDiscla.from_data(teb, headers=self._headers, ecalib=ecalib_det, detector=BatseDetectors.from_num(num), data_lad_tot=lad_lc, data_cpd_tot=cpd_lc)
[docs] def get_spacecraft_frame(self): """Retrieves the spacecraft frame(s) from the file. Returns: (:class:`CgroFrame`) """ x_axis = SkyCoord(self._data['X_RA'], self._data['X_DEC'], unit='deg') z_axis = SkyCoord(self._data['Z_RA'], self._data['Z_DEC'], unit='deg') axes = SpacecraftAxes(x_pointing=x_axis, z_pointing=z_axis) sc_frame = CgroFrame(obstime=Time(self._data['MID_TIME'], format='cgro'), obsgeoloc = r.CartesianRepresentation( x=self._data['X_POS'], y=self._data['Y_POS'], z=self._data['Z_POS'], unit='km'), axes=axes, detectors=BatseDetectors) return sc_frame
[docs] @classmethod def open(cls, file_path, **kwargs): """Open a BATSE file containing PHA time series from multiple detectors. Args: file_path (str): The file path Returns: (:class:`BatsePhaiiMulti`) """ obj = super().open(file_path, **kwargs) hdrs = [hdu.header for hdu in obj.hdulist] if 'cont_' in obj.filename: try: headers = PhaiiContHeaders.from_headers(hdrs) except: try: headers = PhaiiContHeadersAlt1.from_headers(hdrs) except: headers = PhaiiContHeadersAlt2.from_headers(hdrs) filetype = 'cont' elif 'discla_' in obj.filename: try: headers = PhaiiDisclaHeaders.from_headers(hdrs) except: try: headers = PhaiiDisclaHeadersAlt1.from_headers(hdrs) except: headers = PhaiiDisclaHeadersAlt2.from_headers(hdrs) filetype = 'discla' else: raise RuntimeError('Unsupported filetype or not a PHAII file.') obj._ecalib = BatseEnergyCalib.from_hdu(obj.hdulist[1].data) obj._data = obj.hdulist[2].data obj._headers = headers obj._filetype = filetype if filetype == 'cont': obj._tstart = obj._data['MID_TIME'] - (1.024/86400.0) obj._tstop = obj._data['MID_TIME'] + (1.024/86400.0) obj._exposure = 2.048 - obj._data['DEADTIME'] elif filetype == 'discla': obj._tstart = obj._data['MID_TIME'] - (0.512/86400.0) obj._tstop = obj._data['MID_TIME'] + (0.512/86400.0) obj._exposure = 1.024 - obj._data['DEADTIME'] obj.close() return obj
[docs] def sum_detectors(self, det_var_list=None): """Sum data over multiple detectors and return a Phaii object. Note:: The exposures are averaged between multiple detectors and the energy edges are taken to be the geometric mean. Args: det_var_list (list of str, int, or :class:`BatseDetectors`, optional) If not set, will sum all available detectors. Returns: (:class:`BatsePhaii``) """ if det_var_list is None: det_var_list = self.detectors phaiis = [self.get_detector(det_var) for det_var in det_var_list] num_phaiis = len(phaiis) counts = np.copy(phaiis[0].data.counts) exposure = np.copy(phaiis[0].data.exposure) for i in range(1, num_phaiis): counts += phaiis[i].data.counts exposure += phaiis[i].data.exposure exposure /= num_phaiis ecalibs = [phaii.ecalib for phaii in phaiis] ecalib_sum = BatseEnergyCalib.combine_detectors(ecalibs) e_edges = ecalib_sum.edges_over_timespan(0, self._tstart[0], self._tstop[-1]) data = BatseTimeEnergyBins(counts, phaiis[0].data.tstart, phaiis[0].data.tstop, exposure, e_edges[:-1], e_edges[1:]) dets = [phaii.detector for phaii in phaiis] if self._filetype == 'cont': obj = BatsePhaiiCont.from_data(data, headers=phaiis[0].headers, ecalib=ecalib_sum, detector=dets) elif self._filetype == 'discla': lad_lc = BatseTimeBins.sum([phaii.lad_lightcurve for phaii in phaiis]) cpd_lc = BatseTimeBins.sum([phaii.cpd_lightcurve for phaii in phaiis]) obj = BatsePhaiiDiscla.from_data(data, headers=phaiis[0].headers, ecalib=ecalib_sum, detector=dets, data_lad_tot=lad_lc, data_cpd_tot=cpd_lc) return obj
def __repr__(self): return f'<{self.__class__.__name__}: {self.num_dets} detectors>'
[docs]class BatsePhaiiCont(BatsePhaii): """The continuous CONT data.""" pass
[docs]class BatsePhaiiDiscla(BatsePhaii): """The continuous LAD discriminator data.""" def __init__(self): super().__init__() self._data_lad_tot = None self._data_cpd_tot = None @property def cpd_lightcurve(self): """(:class:`TimeBins`): The total charged particle detector lightcurve""" return self._data_cpd_tot @property def lad_lightcurve(self): """(:class:`TimeBins`): The total uncoincidenced lightcurve""" return self._data_lad_tot
[docs] @classmethod def from_data(cls, data, gti=None, trigger_time=None, filename=None, headers=None, ecalib=None, detector=None, data_lad_tot=None, data_cpd_tot=None): """Create a PHAII object from data. Args: data (:class:`~.data_primitives.TimeEnergyBins`): The PHAII data gti (:class:`~.data_primitives.Gti`, optional): The Good Time Intervals object. If omitted, the GTI spans (tstart, tstop) trigger_time (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. filename (str, optional): The name of the file headers (:class:`~.headers.FileHeaders`): The file headers detector (:class:`BatseDetectors` or list): The BATSE detector(s) ecalib (:class:`BatseEnergyCalib`): The detector calibration data_lad_tot (:class:`TimeBins`): The total uncoincidenced lightcurve data_cpd_tot (:class:`TimeBins`): The total charged particle detector lightcurve Returns: (:class:`BatsePhaiiDiscla`) """ obj = super().from_data(data, gti=gti, filename=filename, ecalib=ecalib, headers=headers, detector=detector) obj._data_lad_tot = data_lad_tot obj._data_cpd_tot = data_cpd_tot return obj
[docs] def rebin_energy(self, method, *args, energy_range=(None, None)): """Rebin the DISCLA in energy given a rebinning method. Produces a new DISCLA object. Args: method (<function>): The rebinning function *args: Arguments to be passed to the rebinning function energy_range ((float, float), optional): The starting and ending energy to rebin. If omitted, uses the full range of data. Setting start or end to ``None`` will use the data from the beginning or end of the data, respectively. Returns (:class:`BatsePhaiiDiscla`) """ obj = super().rebin_energy(method, *args, energy_range=energy_range) obj._data_lad_tot = self.lad_lightcurve obj._data_cpd_tot = self.cpd_lightcurve return obj
[docs] def rebin_time(self, method, *args, time_range=(None, None)): """Rebin the DISCLA in time given a rebinning method. Produces a new DISCLA object. This also rebins the associated LAD and CPD lightcurve objects. Args: method (<function>): The rebinning function *args: Arguments to be passed to the rebinning function time_range ((float, float), optional): The starting and ending time to rebin. If omitted, uses the full range of data. Setting start or end to ``None`` will use the data from the beginning or end of the data, respectively. Returns (:class:`BatsePhaiiDiscla`) """ obj = super().rebin_time(method, *args, time_range=time_range) cpd_lc = self.cpd_lightcurve.rebin(method, *args, tstart=time_range[0], tstop=time_range[1]) obj._data_cpd_tot = cpd_lc lad_lc = self.lad_lightcurve.rebin(method, *args, tstart=time_range[0], tstop=time_range[1]) obj._data_lad_tot = lad_lc return obj
[docs] def slice_energy(self, energy_ranges): """Slice the DISCLA by one or more energy range. Produces a new DISCLA object. Args: energy_ranges ([(float, float), ...]): The energy ranges to slice the data to. Returns: (:class:`BatsePhaiiDiscla`) """ obj = super().slice_energy(energy_ranges=energy_ranges) obj._data_lad_tot = self.lad_lightcurve obj._data_cpd_tot = self.cpd_lightcurve return obj
[docs] def slice_time(self, time_ranges): """Slice the DISCLA by one or more time range. Produces a new DISCLA object. The GTI will be automatically update to match the new time range(s). This also slices the associated LAD and CPD lightcurve objects. Args: time_ranges ([(float, float), ...]): The time ranges to slice the data to. Returns: (:class:`BatsePhaiiDiscla`) """ obj = super().slice_time(time_ranges=time_ranges) time_ranges = self._assert_range_list(time_ranges) cpd_lc = BatseTimeBins.merge([self.cpd_lightcurve.slice(*trange) \ for trange in time_ranges]) obj._data_cpd_tot = cpd_lc lad_lc = BatseTimeBins.merge([self.lad_lightcurve.slice(*trange) \ for trange in time_ranges]) obj._data_lad_tot = lad_lc return obj
[docs]class BatsePhaiiTrigger(BatsePhaii): """Class representing trigger BATSE PHAII data. """
[docs] @classmethod def open(cls, file_path, **kwargs): """Open a trigger BATSE PHAII FITS file. Args: file_path (str): The file path of the FITS file Returns: (:class:`BatsePhaiiTrigger`) """ obj = Phaii.open(file_path, **kwargs) hdrs = [hdu.header for hdu in obj.hdulist] if 'tts_bfits' in str(file_path): headers = PhaiiTriggerTtsHeaders.from_headers(hdrs) else: headers = PhaiiTriggerHeaders.from_headers(hdrs) trigtime = from_day_time(hdrs[0]['TRIG-DAY'], hdrs[0]['TRIG-TIM']) ecalib = BatseEnergyCalib.from_hdu(obj.hdulist[1].data) tstart, tstop = obj.column(2, 'TIMES').astype(np.float64).T tstart = tstart.astype(float) tstop = tstop.astype(float) rates = obj.column(2, 'RATES').astype(float) e_edges = ecalib.edges_over_timespan(ecalib.detectors[0], tstart[0], tstop[-1]) #mark: FIXME exposures are not stored, so assume binwidths exposure = tstop - tstart if rates.ndim == 2: counts = rates * exposure[:,np.newaxis] else: counts = (rates * exposure).reshape(-1,1) teb = BatseTimeEnergyBins(counts, tstart, tstop, exposure, e_edges[:-1], e_edges[1:]) det = BatseDetectors.from_num(ecalib.detectors[0]) obj.close() return cls.from_data(teb, trigger_time=trigtime.cgro, ecalib=ecalib, headers=headers, filename=obj.filename, detector=det)
[docs]class BatseEnergyCalib(): """BATSE Energy Calibration data, which is stored in PHAII and event list files. There may be multiple detectors in the energy calibration, and there may be multiple calibrations with associated time ranges. This class is not intended to be instantiated by the user, but is rather instantiated when reading a data file. """ def __init__(self): self._data = None @property def detectors(self): """(list): The detectors in the calibration""" return [int(d) for d in list(set(self._data['CAL_DET']))] @property def num_dets(self): """(int): Number of detectors in the calibration""" return len(self.detectors) @property def num_times(self): """(int): Number of calibration times""" return int(self._data['CAL_DET'].size / self.num_dets)
[docs] @classmethod def combine_detectors(cls, calib_list): """Combines the calibrations from different detectors. This assumes the number of calibrations are the same and made at the same times between all of the detectors. The output is a calibration with edges that are the geometric mean of the edges from the input detectors. Args: calib_list (list): List of :class:`BatseEnergyCalib` Returns: (:class:`BatseEnergyCalib`) """ new_calib = np.copy(calib_list[0]._data) for calib in calib_list[1:]: new_calib['E_EDGES'] *= calib._data['E_EDGES'] new_calib['E_EDGES'] = new_calib['E_EDGES'] ** (1.0/len(calib_list)) return cls.from_hdu(new_calib)
[docs] def edges_at_time(self, det, time): """The energy edges for a given detector at a given time. Args: det (int): The detector number time (float): The CGRO MET Returns: (np.array) """ # find the closest time interval that has a calibration tidx = self._time_index(det, time) # mask the array for the detector mask = self._data['CAL_DET'] == det # retrieve the calibrated energy edges for detector and time e_edges = self._data['E_EDGES'][mask,:][tidx,:] return e_edges
[docs] def edges_over_timespan(self, det, t0, t1): """The energy edges for a given detector covering a timespan. The timespan may cover multiple energy calibrations, so the energy calibrations are weighted by the amount of time spanned until the next calibration. Args: det (int): The detector number tstart (float): The start time in CGRO MET tstop (float): The stop time in CGRO MET Returns: (np.array) """ tidx0 = self._time_index(det, t0) tidx1 = self._time_index(det, t1) tidx = np.arange(tidx0, tidx1+1) time_edges = self._time_edges(det) time_edges = time_edges[:,tidx] # if t0 is before the first calibration tstop, then extend/truncate if t0 <= time_edges[0,0]: time_edges[0,0] = t0 # if t0 is after the first calibration else: time_edges[:,0] = [min(time_edges[1,0], t0), max(time_edges[1,0], t0)] # if t1 is after the last calibration tstart, then extend/truncate if t1 >= time_edges[1,-1]: time_edges[1,-1] = t1 # if t1 is before the last calibration else: if tidx.size == 1: time_edges[1,0] = t1 else: time_edges[:,-1] = [min(t1, time_edges[0,1]), max(t1, time_edges[0,1])] dt = time_edges[1,:] - time_edges[0,:] weights = dt / dt.sum() # mask the array for the detector mask = self._data['CAL_DET'] == det e_edges = np.zeros_like(self._data['E_EDGES'][0,:]) for i in range(len(tidx)): e_edges += self._data['E_EDGES'][mask,:][tidx[i],:] * weights[i] return e_edges
[docs] @classmethod def from_hdu(cls, hdu_data): """Create a :class:`BatseEnergyCalib` object from a FITS HDU data table Args: hdu_data (astropy.io.FITS_rec) The FITS data table Returns: (:class:`BatseEnergyCalib`) """ obj = cls() obj._data = hdu_data return obj
[docs] def get_detector(self, det): """Return a new :class:`BatseEnergyCalib` containing only the requested detector calibration. Args: det (int): Detector number Returns: (:class:`BatseEnergyCalib`) """ mask = self._data['CAL_DET'] == det return self.from_hdu(self._data[mask])
def _time_edges(self, det): """Return the calibration time edges for a detector """ # mask the array for the detector mask = self._data['CAL_DET'] == det time_edges = np.vstack([self._data['CAL_STRT'][mask], self._data['CAL_STOP'][mask]]) return time_edges def _time_index(self, det, time): """Retrieve the index into the calibration of the closest time interval. """ time_edges = self._time_edges(det) # find the closest time interval that has a calibration idx0, idx1 = np.abs(time_edges - time).argmin(axis=1) if idx0 != idx1: idx_test = np.argmin([np.abs(time_edges[0,idx0] - time), np.abs(time_edges[1,idx1] - time)]) tidx = [idx0, idx1][idx_test] else: tidx = idx0 return tidx def __repr__(self): return f'<{self.__class__.__name__}: {self.num_times} calibrations>'