Module blechpy.dio.rawIO
Expand source code
import numpy as np
import os, re
from blechpy.dio import load_intan_rhd_format
from blechpy.utils import print_tools as pt, userIO
support_rec_types = {'one file per channel':'amp-\S-\d*\.dat',
'one file per signal type':'amplifier\.dat'}
voltage_scaling = 0.195
def get_sampling_rate(rec_dir):
'''Returns sampling rate in Hz of intan recording data
Parameters
----------
rec_dir : str, full path to raw recording directory
Returns
-------
float : sampling rate in Hz
'''
info_file = os.path.join(rec_dir, 'info.rhd')
if not os.path.exists(info_file):
raise FileNotFoundError('No info.rhd in recording directory')
info = load_intan_rhd_format.read_data(info_file)
return info['frequency_parameters']['amplifier_sample_rate']
def read_rec_info(file_dir, shell=True):
'''Reads the info.rhd file to get relevant parameters.
Parameters
----------
file_dir : str, path to recording directory
Returns
-------
dict, necessary analysis info from info.rhd
fields: amplifier_sampling_rate, dig_in_sampling_rate, notch_filter,
ports (list, corresponds to channels), channels (list)
Throws
------
FileNotFoundError : if info.rhd is not in file_dir
'''
info_file = os.path.join(file_dir,'info.rhd')
if not os.path.isfile(info_file):
raise FileNotFoundError('info.rhd file not found in %s' % file_dir)
out = {}
print('Reading info.rhd file...')
try:
info = load_intan_rhd_format.read_data(info_file)
except Exception as e:
# TODO: Have a way to manually input settings
info = None
userIO.tell_user('%s was unable to be read. May be corrupted or '
'recording may have been interrupted' % info_file,
shell=True)
raise e
freq_params = info['frequency_parameters']
notch_freq = freq_params['notch_filter_frequency']
amp_fs = freq_params['amplifier_sample_rate']
dig_in_fs = freq_params['board_dig_in_sample_rate']
out = {'amplifier_sampling_rate':amp_fs,
'dig_in_sampling_rate':dig_in_fs,
'notch_filter':notch_freq}
amp_ch = info['amplifier_channels']
ports = [x['port_prefix'] for x in amp_ch]
channels = [x['native_order'] for x in amp_ch]
out['ports'] = ports
out['channels'] = channels
out['num_channels'] = len(channels)
if info.get('board_dig_in_channels'):
dig_in = info['board_dig_in_channels']
din = [x['native_order'] for x in dig_in]
out['dig_in'] = din
if info.get('board_dig_out_channels'):
dig_out = info['board_dig_out_channels']
dout = [x['native_order'] for x in dig_out]
out['dig_out'] = dout
out['file_type'] = get_recording_filetype(file_dir)
print('\nRecording Info\n--------------\n')
print(pt.print_dict(out))
return out
def read_time_dat(file_dir,sampling_rate=None):
'''Reads the time vector out of time.dat and converts to seconds
Parameters
----------
file_dir : str, path to recording directory
sampling_rate: int (optional), sampling rate of amplifier data in Hz
Returns
-------
numpy.ndarray, time vector in seconds
Throws
------
FileNotFoundError : if time.dat is not in file_dir
'''
time_file = os.path.join(file_dir,'time.dat')
if not os.path.isfile(time_file):
raise FileNotFoundError('Time file not found at %s' % time_file)
if sampling_rate is None:
rec_info = read_rec_info(file_dir)
sampling_rate = rec_info['amplifier_sampling_rate']
time = np.fromfile(time_file,dtype=np.dtype('int32'))
time = time.astype('float')/sampling_rate
return time
def read_amplifier_dat(file_dir,num_channels=None):
'''Reads intan amplifier.dat file to get recording channel data from
recordings done with 'one file per signal type' setting
WARNING: Memory intensive as all channel data will be held in memory
TO_CHECK: Do we analyze with scaled or unscaled voltage, Narendra's code
uses unscaled, scale factor to get microvolts is 0.195
Parameters
----------
file_dir: str, path to recording directory
num_channels: int (optional), number of channels in recording, if not
provided, info is taken from info.rhd
Returns
-------
numpy.ndarray : array with row for each channel corresponding to channels
in rec_info from read_rec_info, voltage is unscaled, to
get microvolts multiply by 0.195
Throws
------
FileNotFoundError : if amplifier.dat file is not found in file_dir
'''
if num_channels is None:
rec_info = read_rec_info(file_dir)
num_channels = len(rec_info['channels'])
amp_file = os.path.join(file_dir,'amplifier.dat')
if not os.path.isfile(amp_file):
raise FileNotFoundError('Could not find amplfier file at %s' % amp_file)
amp_dat = np.fromfile(amp_file,dtype=np.dtype('int16'))
amp_dat = amp_dat.reshape(num_channels,-1,order='F')
return amp_dat
def read_digital_dat(file_dir,dig_channels=None,dig_type='in'):
'''Reads digitalin.dat from intan recording with file_type 'one file per signal type'
Parameters
----------
file_dir : str, file directory for recording data
dig_channels : list (optional), digital channel numbers to get
dig_type : {'in','out'}, type of digital signal to get (default 'in')
Returns
-------
numpy.ndarray : one row per digital_input channel corresponding to dig_in
from rec_info
Throws
------
FileNotFoundError : if digitalin.dat is not in file_dir
'''
if dig_channels is None:
rec_info = read_rec_info(file_dir)
dig_channels = rec_info['dig_%s' % dig_type]
dat_file = os.path.join(file_dir,'digital%s.dat' % dig_type)
if not os.path.isfile(dat_file):
raise FileNotFoundError('No file found at %s' % dig_in_file)
file_dat = np.fromfile(dat_file,dtype=np.dtype('int16'))
chan_dat = []
for ch in dig_channels:
tmp_dat = (file_dat&pow(2,ch)>0).astype(np.dtype('uint16'))
chan_dat.append(tmp_dat)
out = np.array(chan_dat)
return out
def read_one_channel_file(file_name):
'''Reads a single amp or din channel file created by an intan 'one file per
channel' recording
Parameters
----------
file_name : str, absolute path to file to read data from
Returns
-------
numpy.ndarray : int16, 1D array of data from amp file
Throws
------
FileNotFoundError : if file_name is not found
'''
if not os.path.isfile(file_name):
raise FileNotFoundError('Could not locate file %s' % file_name)
chan_dat = np.fromfile(file_name,dtype=np.dtype('int16'))
return chan_dat
def get_recording_filetype(file_dir):
'''Check Intan recording directory to determine type of recording and thus
extraction method to use. Asks user to confirm, and manually correct if
incorrect
Parameters
----------
file_dir : str, recording directory to check
Returns
-------
str : file_type of recording
'''
file_list = os.listdir(file_dir)
file_type = None
for k,v in support_rec_types.items():
regex = re.compile(v)
if any([True for x in file_list if regex.match(x) is not None]):
file_type = k
if file_type is None:
msg = '\n '.join(['unsupported recording type. Supported types are:',
*list(support_rec_types.keys())])
else:
msg = '\"'+file_type+'\"'
return file_type
# Removing query since this is pretty accurate
#query = 'Detected recording type is %s \nIs this correct?: ' % msg
#q = userIO.ask_user(query,,
# shell=shell)
#if q == 1:
# return file_type
#else:
# choice = userIO.select_from_list('Select correct recording type',
# list(support_rec_types.keys()),
# 'Select Recording Type',
# shell=shell)
# choice = list(support_rec_types.keys())[choice]
# return choice
Functions
def get_recording_filetype(file_dir)
-
Check Intan recording directory to determine type of recording and thus extraction method to use. Asks user to confirm, and manually correct if incorrect
Parameters
file_dir
:str, recording directory to check
Returns
str
:file_type
ofrecording
Expand source code
def get_recording_filetype(file_dir): '''Check Intan recording directory to determine type of recording and thus extraction method to use. Asks user to confirm, and manually correct if incorrect Parameters ---------- file_dir : str, recording directory to check Returns ------- str : file_type of recording ''' file_list = os.listdir(file_dir) file_type = None for k,v in support_rec_types.items(): regex = re.compile(v) if any([True for x in file_list if regex.match(x) is not None]): file_type = k if file_type is None: msg = '\n '.join(['unsupported recording type. Supported types are:', *list(support_rec_types.keys())]) else: msg = '\"'+file_type+'\"' return file_type
def get_sampling_rate(rec_dir)
-
Returns sampling rate in Hz of intan recording data
Parameters
rec_dir
:str, full path to raw recording directory
Returns
float
:sampling rate in Hz
Expand source code
def get_sampling_rate(rec_dir): '''Returns sampling rate in Hz of intan recording data Parameters ---------- rec_dir : str, full path to raw recording directory Returns ------- float : sampling rate in Hz ''' info_file = os.path.join(rec_dir, 'info.rhd') if not os.path.exists(info_file): raise FileNotFoundError('No info.rhd in recording directory') info = load_intan_rhd_format.read_data(info_file) return info['frequency_parameters']['amplifier_sample_rate']
def read_amplifier_dat(file_dir, num_channels=None)
-
Reads intan amplifier.dat file to get recording channel data from recordings done with 'one file per signal type' setting WARNING: Memory intensive as all channel data will be held in memory TO_CHECK: Do we analyze with scaled or unscaled voltage, Narendra's code uses unscaled, scale factor to get microvolts is 0.195
Parameters
file_dir
:str, path to recording directory
num_channels
:int (optional), number
ofchannels in recording, if not
- provided, info is taken from info.rhd
Returns
numpy.ndarray : array with row for each channel corresponding to channels
- in rec_info from read_rec_info, voltage is unscaled, to get microvolts multiply by 0.195
Throws
FileNotFoundError : if amplifier.dat file is not found in file_dir
Expand source code
def read_amplifier_dat(file_dir,num_channels=None): '''Reads intan amplifier.dat file to get recording channel data from recordings done with 'one file per signal type' setting WARNING: Memory intensive as all channel data will be held in memory TO_CHECK: Do we analyze with scaled or unscaled voltage, Narendra's code uses unscaled, scale factor to get microvolts is 0.195 Parameters ---------- file_dir: str, path to recording directory num_channels: int (optional), number of channels in recording, if not provided, info is taken from info.rhd Returns ------- numpy.ndarray : array with row for each channel corresponding to channels in rec_info from read_rec_info, voltage is unscaled, to get microvolts multiply by 0.195 Throws ------ FileNotFoundError : if amplifier.dat file is not found in file_dir ''' if num_channels is None: rec_info = read_rec_info(file_dir) num_channels = len(rec_info['channels']) amp_file = os.path.join(file_dir,'amplifier.dat') if not os.path.isfile(amp_file): raise FileNotFoundError('Could not find amplfier file at %s' % amp_file) amp_dat = np.fromfile(amp_file,dtype=np.dtype('int16')) amp_dat = amp_dat.reshape(num_channels,-1,order='F') return amp_dat
def read_digital_dat(file_dir, dig_channels=None, dig_type='in')
-
Reads digitalin.dat from intan recording with file_type 'one file per signal type'
Parameters
file_dir
:str, file directory for recording data
dig_channels
:list (optional), digital channel numbers to get
dig_type
:{'in','out'}, type
ofdigital signal to get (default 'in')
Returns
numpy.ndarray : one row per digital_input channel corresponding to dig_in
- from rec_info
Throws
FileNotFoundError : if digitalin.dat is not in file_dir
Expand source code
def read_digital_dat(file_dir,dig_channels=None,dig_type='in'): '''Reads digitalin.dat from intan recording with file_type 'one file per signal type' Parameters ---------- file_dir : str, file directory for recording data dig_channels : list (optional), digital channel numbers to get dig_type : {'in','out'}, type of digital signal to get (default 'in') Returns ------- numpy.ndarray : one row per digital_input channel corresponding to dig_in from rec_info Throws ------ FileNotFoundError : if digitalin.dat is not in file_dir ''' if dig_channels is None: rec_info = read_rec_info(file_dir) dig_channels = rec_info['dig_%s' % dig_type] dat_file = os.path.join(file_dir,'digital%s.dat' % dig_type) if not os.path.isfile(dat_file): raise FileNotFoundError('No file found at %s' % dig_in_file) file_dat = np.fromfile(dat_file,dtype=np.dtype('int16')) chan_dat = [] for ch in dig_channels: tmp_dat = (file_dat&pow(2,ch)>0).astype(np.dtype('uint16')) chan_dat.append(tmp_dat) out = np.array(chan_dat) return out
def read_one_channel_file(file_name)
-
Reads a single amp or din channel file created by an intan 'one file per channel' recording
Parameters
file_name
:str, absolute path to file to read data from
Returns
numpy.ndarray : int16, 1D array
ofdata from amp file
Throws
FileNotFoundError : if file_name is not found
Expand source code
def read_one_channel_file(file_name): '''Reads a single amp or din channel file created by an intan 'one file per channel' recording Parameters ---------- file_name : str, absolute path to file to read data from Returns ------- numpy.ndarray : int16, 1D array of data from amp file Throws ------ FileNotFoundError : if file_name is not found ''' if not os.path.isfile(file_name): raise FileNotFoundError('Could not locate file %s' % file_name) chan_dat = np.fromfile(file_name,dtype=np.dtype('int16')) return chan_dat
def read_rec_info(file_dir, shell=True)
-
Reads the info.rhd file to get relevant parameters. Parameters
file_dir
:str, path to recording directory
Returns
dict, necessary analysis info from info.rhd
- fields: amplifier_sampling_rate, dig_in_sampling_rate, notch_filter, ports (list, corresponds to channels), channels (list)
Throws
FileNotFoundError : if info.rhd is not in file_dir
Expand source code
def read_rec_info(file_dir, shell=True): '''Reads the info.rhd file to get relevant parameters. Parameters ---------- file_dir : str, path to recording directory Returns ------- dict, necessary analysis info from info.rhd fields: amplifier_sampling_rate, dig_in_sampling_rate, notch_filter, ports (list, corresponds to channels), channels (list) Throws ------ FileNotFoundError : if info.rhd is not in file_dir ''' info_file = os.path.join(file_dir,'info.rhd') if not os.path.isfile(info_file): raise FileNotFoundError('info.rhd file not found in %s' % file_dir) out = {} print('Reading info.rhd file...') try: info = load_intan_rhd_format.read_data(info_file) except Exception as e: # TODO: Have a way to manually input settings info = None userIO.tell_user('%s was unable to be read. May be corrupted or ' 'recording may have been interrupted' % info_file, shell=True) raise e freq_params = info['frequency_parameters'] notch_freq = freq_params['notch_filter_frequency'] amp_fs = freq_params['amplifier_sample_rate'] dig_in_fs = freq_params['board_dig_in_sample_rate'] out = {'amplifier_sampling_rate':amp_fs, 'dig_in_sampling_rate':dig_in_fs, 'notch_filter':notch_freq} amp_ch = info['amplifier_channels'] ports = [x['port_prefix'] for x in amp_ch] channels = [x['native_order'] for x in amp_ch] out['ports'] = ports out['channels'] = channels out['num_channels'] = len(channels) if info.get('board_dig_in_channels'): dig_in = info['board_dig_in_channels'] din = [x['native_order'] for x in dig_in] out['dig_in'] = din if info.get('board_dig_out_channels'): dig_out = info['board_dig_out_channels'] dout = [x['native_order'] for x in dig_out] out['dig_out'] = dout out['file_type'] = get_recording_filetype(file_dir) print('\nRecording Info\n--------------\n') print(pt.print_dict(out)) return out
def read_time_dat(file_dir, sampling_rate=None)
-
Reads the time vector out of time.dat and converts to seconds
Parameters
file_dir
:str, path to recording directory
sampling_rate
:int (optional), sampling rate
ofamplifier data in Hz
Returns
numpy.ndarray, time vector in seconds
Throws
FileNotFoundError : if time.dat is not in file_dir
Expand source code
def read_time_dat(file_dir,sampling_rate=None): '''Reads the time vector out of time.dat and converts to seconds Parameters ---------- file_dir : str, path to recording directory sampling_rate: int (optional), sampling rate of amplifier data in Hz Returns ------- numpy.ndarray, time vector in seconds Throws ------ FileNotFoundError : if time.dat is not in file_dir ''' time_file = os.path.join(file_dir,'time.dat') if not os.path.isfile(time_file): raise FileNotFoundError('Time file not found at %s' % time_file) if sampling_rate is None: rec_info = read_rec_info(file_dir) sampling_rate = rec_info['amplifier_sampling_rate'] time = np.fromfile(time_file,dtype=np.dtype('int32')) time = time.astype('float')/sampling_rate return time