Source code for tcv.diag.dmpx

# -*- coding: utf-8 -*-
"""
XTOMO diagnostics data

Written by Nicola Vianello
"""

import logging
import os  # for correctly handling the directory position
from scipy import io  # this is needed to load the settings of xtomo
import numpy as np

import tcv  # this is the tcv main library component
import xarray  # this is needed as tdi save into an xarray


log = logging.getLogger(__name__)  # pylint: disable=invalid-name


[docs]class Top(object): """ Class to load and analyze the data contained in the top chamber of DMPX. All the methods described as classmethod and so they can be called directly Methods: read: Read the data calibrated in the same way as mpxdata matlab routine. It can read also single chords, accordingly to the call to Top gains: Return the gains used for the calibration of the chors geo : return the geometrical position of the chords spectrogram: It create and eventually plot the spectrograms(s) """ @staticmethod
[docs] def channels(shot, los=None): """ Parameters ---------- Same as fromshot. Returns ------- String array containing the channels, already sorted out according to shot number """ if los is None: los = np.arange(64) + 1 else: los = np.atleast_1d(los) # we provide here all the definition we need if shot < 24087 or shot > 24725: dtNe1 = r'\atlas::dt100_northeast_001:' dtNe2 = r'\atlas::dt100_northeast_002:' else: dtNe1 = r'\atlas::dt100_southwest_001:' dtNe2 = r'\atlas::dt100_southwest_002:' # we define the table corresponding to correct cabling of the signals # cabling for the shots > 26555 if shot >= 26555: cd2Cords = np.arange(1, 65, 2) cd2Chans = np.arange(1, 33, 1) cd1Cords = np.arange(2, 66, 2) cd1Chans = np.append(np.arange(16, 0, -1), np.arange(32, 16, -1)) elif shot > 20643 and shot < 26555: cd2Cords = np.arange(1, 65, 2) cd2Chans = np.arange(1, 33, 1) cd1Cords = np.arange(2, 66, 2) cd1Chans = np.arange(32, 0, -1) else: raise Exception('This programs does not load shots before 20643') # now we define the appropriate board according to the shot number # and the presence of fast signals cd2Cards = np.repeat(dtNe2, cd2Cords.size) cd1Cards = np.repeat(dtNe1, cd1Cords.size) # now we collect all together the signals and sort from HFS to LFS allCords = np.append(cd1Cords, cd2Cords) allChans = np.append(cd1Chans, cd2Chans) allCards = np.append(cd1Cards, cd2Cards) # sort the index from HFS to LFS allChans = allChans[np.argsort(allCords)] allCards = allCards[np.argsort(allCords)] # now select the chosen diods indexLos = los - 1 # we convert from LoS to index _chosenChans = allChans[indexLos] _chosenCards = allCards[indexLos] return _chosenCards, _chosenChans
@staticmethod
[docs] def fromshot(shot, los=None): """ Return the calibrated DMPX signals. Parameters ---------- shot : int or MDSConnection Shot number or connection instance camera : int Number of the XTOMO camera los : int or sequence of ints Optional argument with lines of sight (LoS) of the chosen camera. If None, it loads all the 20 channels Returns ------- An xarray.DataArray containing the data, time basis and all the information in dictionary Examples -------- >>> from tcv.diag.dmpx import Top >>> data = Top.fromshot(50730, los=32) """ if los is None: los = np.arange(64) + 1 else: los = np.atleast_1d(los) cards, channels = Top.channels(shot, los=los) Top._check_dtaq_trigger(shot) fast = Top._is_fast(shot, cards[0], channels[0]) values = [] with tcv.shot(shot) as conn: for card, channel in zip(cards, channels): values.append(conn.tdi(Top._node(card, channel, fast), dims='time')) # now we create the xarray data = xarray.concat(values, dim='los') data['los'] = los # correct for bad timing if shot > 19923 and shot < 20939: data.time = (data.time + 0.04) * 0.9697 - 0.04 # correct for missing shots if shot >= 25102 and shot < 25933 and (36 in los): data.value[np.argmin(np.abs(los - 36)), :] = 0 log.warn('Channel 36 was missing for this shot') elif shot >= 27127 and shot <= 28124: # missing channels, problem with cable 2, repaired by DF in Dec # 2004 missing = np.asarray([3, 64, 62, 60, 58, 56]) for fault in missing: if fault in los: data.values[np.argmin(np.abs(los - fault)), :] = 0 log.warn('Channel %s missing for this shot', fault) if shot >= 27185 and (44 in los): # one more channel missing !... data.values[np.argmin(np.abs(los-44)), :] = 0 log.warn('Channel 44 missing for this shot') if shot >= 28219 and shot < 31446: missing = np.asarray([19, 21]) for fault in missing: if fault in los: data.values[np.argmin(np.abs(los-fault)), :] = 0 log.warn('Channel %s missing for this shot', fault) # chose calibrate the signals # read the gain _calib, _gains = Top.gains(shot, los=los) data.values *= (_calib / _gains).reshape(los.size, 1) return data
@staticmethod
[docs] def gains(shot, los=None): """ Provide the proper calibration factor for the signals so that can obtain directly the calibration used call: Calibration, Gain = Top.gains() """ conn = tcv.shot(shot) # After #26575, the real voltage is indicated in the Vista window, # before it was the reference voltage mm = 500 if shot < 26765 else 1 voltage = conn.tdi( r'\VSYSTEM::TCV_PUBLICDB_R["ILOT:DAC_V_01"]').values * mm if shot == 32035: voltage = 2000 # we first load all the calibration and then choose the correct one # according to the ordering gainC = np.zeros(64) gainR = np.zeros(64) I = np.where(shot >= np.r_[20030, 23323, 26555, 27127, 29921, 30759, 31446])[0][-1] if I == 0: log.warn('Detector gain dependence on the high voltage value not ' 'included in the signal calibration') calib = Top.calibration_data('mpx_calib_first.mat') calib_coeff_t = np.squeeze(calib['calib_coeff']) gainC[:] = 1 if I == 1: log.warn('Detector gain dependence on the high voltage value not ' 'included in the signal calibration') calib = Top.calibration_data('mpx_calib_sept02.mat') calib_coeff_t = np.squeeze(calib['calib_coeff']) gainC[:] = 1 if I == 2: log.warn('Detector gain dependence on the high voltage value not ' 'included in the signal calibration') log.warn('There were leaks in the top detector wire chamber for ' '26554<shot<27128') log.warn('Calibration is not very meaningful') calib = Top.calibration_data('mpx_calib_may04.mat') calib_coeff_t = np.np.squeeze(calib['calib_coeff']) gainC[:] = 1 if I == 3: log.warn( 'Same gain dependence on the high voltage value taken for ' 'each channel') calib = Top.calibration_data('mpx_calib_july04.mat') calib_coeff = np.squeeze(calib['calib_coeff']) R = np.squeeze(calib['R']) calib_coeff_t = calib_coeff calib = Top.calibration_data('mpx_calib_may05.mat') C = np.squeeze(calib['C']) V = np.squeeze(calib['V']) C = np.mean(C[:, :64], 1) # Use the same gain for each channel gainC[:] = np.exp(np.interp(voltage, V, np.log(C))) gainR[:] = R if I == 4: log.warn( 'Same gain dependence on the high voltage value taken for ' 'each channel') calib = Top.calibration_data('mpx_calib_may05.mat') calib_coeff = np.np.squeeze(calib['calib_coeff']) C = np.squeeze(calib['C']) V = np.squeeze(calib['V']) calib_coeff_t = calib_coeff C = np.mean(C[:, :64], 1) # Use the same gain for each channel gainC[:] = np.exp(np.interp(voltage, V, np.log(C))) # use the previous relative calibration gainR[:] = Top.calibration_data('mpx_calib_july04.mat')['R'] if I == 5: # In this case, the different behaviour of the wires is contained # in the matrix of gains. The calibration coefficients are in a # vector: one value per wire, same value for all tensions. log.info( 'Gain dependence on the high voltage value calibrated for ' 'each channel') log.warn( 'Leaks in the bottom detector, no relative calibration of ' 'the two detectors') calib = Top.calibration_data('mpx_calib_oct05.mat') calib_coeff = np.squeeze(calib['calib_coeff']) C = np.squeeze(calib['C']) V = np.squeeze(calib['V']) calib_coeff_t = calib_coeff # Interpolation to get the proper gains wrt to the high tension # value gainC[:] = [np.interp(voltage, V, np.log(C[:, jj])) for jj in range(64)] gainR[:] = np.nan if I == 6: # In this case, the different behaviour of the wires is contained # in the matrix of calibration coefficients. The gains are in a # vector: one value per tension, same value for all wires. log.info( 'Gain dependence on the high voltage value calibrated for ' 'each channel') calib = Top.calibration_data('mpx_calib_dec05_bis.mat') calib_coeff_top = np.squeeze(calib['calib_coeff_top']) C_top_av = np.squeeze(calib['C_top_av']) V_top = np.squeeze(calib['V_top']) R = np.squeeze(calib['R']) calib_coeff_t = [] for jj in range(64): # Interpolation to get the proper calibration coefficient wrt # the high tension value calib_coeff_t.append( np.interp(voltage, V_top, calib_coeff_top[:, jj])) gainC[:] = np.exp(np.interp(voltage, V_top, np.log(C_top_av))) gainR = R # now we can order accordingly to the index for shot > 26555 # the ordering of the calibration is awful II = np.zeros(64, dtype=int) if shot >= 26555: II[0:63:2] = np.r_[32:64] II[1:64:2] = np.r_[15:-1:-1, 31:15:-1] calib_coeff_t = np.asarray(calib_coeff_t)[II] gainC = gainC[II] # limit the output to the chosen diods if los is None: indexLos = np.arange(64) else: indexLos = np.atleast_1d(los) - 1 cOut = calib_coeff_t[indexLos] gOut = gainC[indexLos] return cOut, gOut
@staticmethod
[docs] def geo(shot): """ Parameters ---------- Input: shot: shot number Returns ------- The radial and vertical coordinates of the LoS Examples ------- In [1]: from tcv.diag.dmpx import Top In [2]: x, y = Top.geo(50766) """ indices = np.arange(64) _geoF = Top.calibration_data('dmpx_los.mat') xc = _geoF['xchordt'][:, indices] yc = _geoF['ychordt'][:, indices] return xc, yc
@staticmethod
[docs] def calibration_data(name): return io.loadmat(os.path.join( os.path.dirname(os.path.realpath(__file__)), 'dmpxcalib', name))
@staticmethod def _check_dtaq_trigger(shot): with tcv.shot(shot) as conn: if shot < 24087 or shot > 24725: dtNe1 = r'\atlas::dt100_northeast_001:' dtNe2 = r'\atlas::dt100_northeast_002:' else: dtNe1 = r'\atlas::dt100_southwest_001:' dtNe2 = r'\atlas::dt100_southwest_002:' mode1 = conn.tdi(dtNe1 + 'MODE').values mode2 = conn.tdi(dtNe2 + 'MODE').values mode3 = 4 mode = mode1 * mode2 * mode3 if conn.shot >= 24087 and mode != 64: log.warn( 'Random temporal gap (5 to 25 us) between the two or ' 'three MPX acquisition cards.') log.warn( 'Random temporal gap (0.1 to 0.2ms) between DTACQ and ' 'TCV.') raise Warning('DTACQ not in mode 4') @staticmethod def _is_fast(shot, card, channel): """ We check if fast nodes are collected for shot below 34988 and eventually load fast data. """ if shot > 34988: log.info('Loading fast data after big opening') return False with tcv.shot(shot) as conn: try: conn.tdi('{}selected:channel_{:03}'.format(card, channel)) log.info('Loading high frequency for old shot') return True except: log.info('Loading low frequency for old shot') return False @staticmethod def _node(card, channel, fast): if fast: return '{}selected:channel_{:03}'.format(card, channel) else: return '{}channel_{:03}'.format(card, channel)