Module blechpy.dio.load_intan_rhd_format
Expand source code
#! /bin/env python
#
# Michael Gibson 17 July 2015
# Modified Adrian Foy Sep 2018
import sys, struct, math, os, time
import numpy as np
from blechpy.dio.intanutil.read_header import read_header
from blechpy.dio.intanutil.get_bytes_per_data_block import get_bytes_per_data_block
from blechpy.dio.intanutil.read_one_data_block import read_one_data_block
from blechpy.dio.intanutil.notch_filter import notch_filter
from blechpy.dio.intanutil.data_to_result import data_to_result
def read_data(filename):
"""Reads Intan Technologies RHD2000 data file generated by evaluation board GUI.
Data are returned in a dictionary, for future extensibility.
"""
tic = time.time()
fid = open(filename, 'rb')
filesize = os.path.getsize(filename)
header = read_header(fid)
print('Found {} amplifier channel{}.'.format(header['num_amplifier_channels'], plural(header['num_amplifier_channels'])))
print('Found {} auxiliary input channel{}.'.format(header['num_aux_input_channels'], plural(header['num_aux_input_channels'])))
print('Found {} supply voltage channel{}.'.format(header['num_supply_voltage_channels'], plural(header['num_supply_voltage_channels'])))
print('Found {} board ADC channel{}.'.format(header['num_board_adc_channels'], plural(header['num_board_adc_channels'])))
print('Found {} board digital input channel{}.'.format(header['num_board_dig_in_channels'], plural(header['num_board_dig_in_channels'])))
print('Found {} board digital output channel{}.'.format(header['num_board_dig_out_channels'], plural(header['num_board_dig_out_channels'])))
print('Found {} temperature sensors channel{}.'.format(header['num_temp_sensor_channels'], plural(header['num_temp_sensor_channels'])))
print('')
# Determine how many samples the data file contains.
bytes_per_block = get_bytes_per_data_block(header)
# How many data blocks remain in this file?
data_present = False
bytes_remaining = filesize - fid.tell()
if bytes_remaining > 0:
data_present = True
if bytes_remaining % bytes_per_block != 0:
raise Exception('Something is wrong with file size : should have a whole number of data blocks')
num_data_blocks = int(bytes_remaining / bytes_per_block)
num_amplifier_samples = header['num_samples_per_data_block'] * num_data_blocks
num_aux_input_samples = int((header['num_samples_per_data_block'] / 4) * num_data_blocks)
num_supply_voltage_samples = 1 * num_data_blocks
num_board_adc_samples = header['num_samples_per_data_block'] * num_data_blocks
num_board_dig_in_samples = header['num_samples_per_data_block'] * num_data_blocks
num_board_dig_out_samples = header['num_samples_per_data_block'] * num_data_blocks
record_time = num_amplifier_samples / header['sample_rate']
if data_present:
print('File contains {:0.3f} seconds of data. Amplifiers were sampled at {:0.2f} kS/s.'.format(record_time, header['sample_rate'] / 1000))
else:
print('Header file contains no data. Amplifiers were sampled at {:0.2f} kS/s.'.format(header['sample_rate'] / 1000))
if data_present:
# Pre-allocate memory for data.
print('')
print('Allocating memory for data...')
data = {}
if (header['version']['major'] == 1 and header['version']['minor'] >= 2) or (header['version']['major'] > 1):
data['t_amplifier'] = np.zeros(num_amplifier_samples, dtype=np.int)
else:
data['t_amplifier'] = np.zeros(num_amplifier_samples, dtype=np.uint)
data['amplifier_data'] = np.zeros([header['num_amplifier_channels'], num_amplifier_samples], dtype=np.uint)
data['aux_input_data'] = np.zeros([header['num_aux_input_channels'], num_aux_input_samples], dtype=np.uint)
data['supply_voltage_data'] = np.zeros([header['num_supply_voltage_channels'], num_supply_voltage_samples], dtype=np.uint)
data['temp_sensor_data'] = np.zeros([header['num_temp_sensor_channels'], num_supply_voltage_samples], dtype=np.uint)
data['board_adc_data'] = np.zeros([header['num_board_adc_channels'], num_board_adc_samples], dtype=np.uint)
# by default, this script interprets digital events (digital inputs and outputs) as booleans
# if unsigned int values are preferred(0 for False, 1 for True), replace the 'dtype=np.bool' argument with 'dtype=np.uint' as shown
# the commented line below illustrates this for digital input data; the same can be done for digital out
#data['board_dig_in_data'] = np.zeros([header['num_board_dig_in_channels'], num_board_dig_in_samples], dtype=np.uint)
data['board_dig_in_data'] = np.zeros([header['num_board_dig_in_channels'], num_board_dig_in_samples], dtype=np.bool)
data['board_dig_in_raw'] = np.zeros(num_board_dig_in_samples, dtype=np.uint)
data['board_dig_out_data'] = np.zeros([header['num_board_dig_out_channels'], num_board_dig_out_samples], dtype=np.bool)
data['board_dig_out_raw'] = np.zeros(num_board_dig_out_samples, dtype=np.uint)
# Read sampled data from file.
print('Reading data from file...')
# Initialize indices used in looping
indices = {}
indices['amplifier'] = 0
indices['aux_input'] = 0
indices['supply_voltage'] = 0
indices['board_adc'] = 0
indices['board_dig_in'] = 0
indices['board_dig_out'] = 0
print_increment = 10
percent_done = print_increment
for i in range(num_data_blocks):
read_one_data_block(data, header, indices, fid)
# Increment indices
indices['amplifier'] += header['num_samples_per_data_block']
indices['aux_input'] += int(header['num_samples_per_data_block'] / 4)
indices['supply_voltage'] += 1
indices['board_adc'] += header['num_samples_per_data_block']
indices['board_dig_in'] += header['num_samples_per_data_block']
indices['board_dig_out'] += header['num_samples_per_data_block']
fraction_done = 100 * (1.0 * i / num_data_blocks)
if fraction_done >= percent_done:
print('{}% done...'.format(percent_done))
percent_done = percent_done + print_increment
# Make sure we have read exactly the right amount of data.
bytes_remaining = filesize - fid.tell()
if bytes_remaining != 0: raise Exception('Error: End of file not reached.')
# Close data file.
fid.close()
if (data_present):
print('Parsing data...')
# Extract digital input channels to separate variables.
for i in range(header['num_board_dig_in_channels']):
data['board_dig_in_data'][i, :] = np.not_equal(np.bitwise_and(data['board_dig_in_raw'], (1 << header['board_dig_in_channels'][i]['native_order'])), 0)
# Extract digital output channels to separate variables.
for i in range(header['num_board_dig_out_channels']):
data['board_dig_out_data'][i, :] = np.not_equal(np.bitwise_and(data['board_dig_out_raw'], (1 << header['board_dig_out_channels'][i]['native_order'])), 0)
# Scale voltage levels appropriately.
data['amplifier_data'] = np.multiply(0.195, (data['amplifier_data'].astype(np.int32) - 32768)) # units = microvolts
data['aux_input_data'] = np.multiply(37.4e-6, data['aux_input_data']) # units = volts
data['supply_voltage_data'] = np.multiply(74.8e-6, data['supply_voltage_data']) # units = volts
if header['eval_board_mode'] == 1:
data['board_adc_data'] = np.multiply(152.59e-6, (data['board_adc_data'].astype(np.int32) - 32768)) # units = volts
elif header['eval_board_mode'] == 13:
data['board_adc_data'] = np.multiply(312.5e-6, (data['board_adc_data'].astype(np.int32) - 32768)) # units = volts
else:
data['board_adc_data'] = np.multiply(50.354e-6, data['board_adc_data']) # units = volts
data['temp_sensor_data'] = np.multiply(0.01, data['temp_sensor_data']) # units = deg C
# Check for gaps in timestamps.
num_gaps = np.sum(np.not_equal(data['t_amplifier'][1:]-data['t_amplifier'][:-1], 1))
if num_gaps == 0:
print('No missing timestamps in data.')
else:
print('Warning: {0} gaps in timestamp data found. Time scale will not be uniform!'.format(num_gaps))
# Scale time steps (units = seconds).
data['t_amplifier'] = data['t_amplifier'] / header['sample_rate']
data['t_aux_input'] = data['t_amplifier'][range(0, len(data['t_amplifier']), 4)]
data['t_supply_voltage'] = data['t_amplifier'][range(0, len(data['t_amplifier']), header['num_samples_per_data_block'])]
data['t_board_adc'] = data['t_amplifier']
data['t_dig'] = data['t_amplifier']
data['t_temp_sensor'] = data['t_supply_voltage']
# If the software notch filter was selected during the recording, apply the
# same notch filter to amplifier data here.
if header['notch_filter_frequency'] > 0:
print('Applying notch filter...')
print_increment = 10
percent_done = print_increment
for i in range(header['num_amplifier_channels']):
data['amplifier_data'][i,:] = notch_filter(data['amplifier_data'][i,:], header['sample_rate'], header['notch_filter_frequency'], 10)
fraction_done = 100 * (i / header['num_amplifier_channels'])
if fraction_done >= percent_done:
print('{}% done...'.format(percent_done))
percent_done += print_increment
else:
data = [];
# Move variables to result struct.
result = data_to_result(header, data, data_present)
print('Done! Elapsed time: {0:0.1f} seconds'.format(time.time() - tic))
return result
def plural(n):
"""Utility function to optionally pluralize words based on the value of n.
"""
if n == 1:
return ''
else:
return 's'
if __name__ == '__main__':
a=read_data(sys.argv[1])
#print(a)
Functions
def plural(n)
-
Utility function to optionally pluralize words based on the value of n.
Expand source code
def plural(n): """Utility function to optionally pluralize words based on the value of n. """ if n == 1: return '' else: return 's'
def read_data(filename)
-
Reads Intan Technologies RHD2000 data file generated by evaluation board GUI.
Data are returned in a dictionary, for future extensibility.
Expand source code
def read_data(filename): """Reads Intan Technologies RHD2000 data file generated by evaluation board GUI. Data are returned in a dictionary, for future extensibility. """ tic = time.time() fid = open(filename, 'rb') filesize = os.path.getsize(filename) header = read_header(fid) print('Found {} amplifier channel{}.'.format(header['num_amplifier_channels'], plural(header['num_amplifier_channels']))) print('Found {} auxiliary input channel{}.'.format(header['num_aux_input_channels'], plural(header['num_aux_input_channels']))) print('Found {} supply voltage channel{}.'.format(header['num_supply_voltage_channels'], plural(header['num_supply_voltage_channels']))) print('Found {} board ADC channel{}.'.format(header['num_board_adc_channels'], plural(header['num_board_adc_channels']))) print('Found {} board digital input channel{}.'.format(header['num_board_dig_in_channels'], plural(header['num_board_dig_in_channels']))) print('Found {} board digital output channel{}.'.format(header['num_board_dig_out_channels'], plural(header['num_board_dig_out_channels']))) print('Found {} temperature sensors channel{}.'.format(header['num_temp_sensor_channels'], plural(header['num_temp_sensor_channels']))) print('') # Determine how many samples the data file contains. bytes_per_block = get_bytes_per_data_block(header) # How many data blocks remain in this file? data_present = False bytes_remaining = filesize - fid.tell() if bytes_remaining > 0: data_present = True if bytes_remaining % bytes_per_block != 0: raise Exception('Something is wrong with file size : should have a whole number of data blocks') num_data_blocks = int(bytes_remaining / bytes_per_block) num_amplifier_samples = header['num_samples_per_data_block'] * num_data_blocks num_aux_input_samples = int((header['num_samples_per_data_block'] / 4) * num_data_blocks) num_supply_voltage_samples = 1 * num_data_blocks num_board_adc_samples = header['num_samples_per_data_block'] * num_data_blocks num_board_dig_in_samples = header['num_samples_per_data_block'] * num_data_blocks num_board_dig_out_samples = header['num_samples_per_data_block'] * num_data_blocks record_time = num_amplifier_samples / header['sample_rate'] if data_present: print('File contains {:0.3f} seconds of data. Amplifiers were sampled at {:0.2f} kS/s.'.format(record_time, header['sample_rate'] / 1000)) else: print('Header file contains no data. Amplifiers were sampled at {:0.2f} kS/s.'.format(header['sample_rate'] / 1000)) if data_present: # Pre-allocate memory for data. print('') print('Allocating memory for data...') data = {} if (header['version']['major'] == 1 and header['version']['minor'] >= 2) or (header['version']['major'] > 1): data['t_amplifier'] = np.zeros(num_amplifier_samples, dtype=np.int) else: data['t_amplifier'] = np.zeros(num_amplifier_samples, dtype=np.uint) data['amplifier_data'] = np.zeros([header['num_amplifier_channels'], num_amplifier_samples], dtype=np.uint) data['aux_input_data'] = np.zeros([header['num_aux_input_channels'], num_aux_input_samples], dtype=np.uint) data['supply_voltage_data'] = np.zeros([header['num_supply_voltage_channels'], num_supply_voltage_samples], dtype=np.uint) data['temp_sensor_data'] = np.zeros([header['num_temp_sensor_channels'], num_supply_voltage_samples], dtype=np.uint) data['board_adc_data'] = np.zeros([header['num_board_adc_channels'], num_board_adc_samples], dtype=np.uint) # by default, this script interprets digital events (digital inputs and outputs) as booleans # if unsigned int values are preferred(0 for False, 1 for True), replace the 'dtype=np.bool' argument with 'dtype=np.uint' as shown # the commented line below illustrates this for digital input data; the same can be done for digital out #data['board_dig_in_data'] = np.zeros([header['num_board_dig_in_channels'], num_board_dig_in_samples], dtype=np.uint) data['board_dig_in_data'] = np.zeros([header['num_board_dig_in_channels'], num_board_dig_in_samples], dtype=np.bool) data['board_dig_in_raw'] = np.zeros(num_board_dig_in_samples, dtype=np.uint) data['board_dig_out_data'] = np.zeros([header['num_board_dig_out_channels'], num_board_dig_out_samples], dtype=np.bool) data['board_dig_out_raw'] = np.zeros(num_board_dig_out_samples, dtype=np.uint) # Read sampled data from file. print('Reading data from file...') # Initialize indices used in looping indices = {} indices['amplifier'] = 0 indices['aux_input'] = 0 indices['supply_voltage'] = 0 indices['board_adc'] = 0 indices['board_dig_in'] = 0 indices['board_dig_out'] = 0 print_increment = 10 percent_done = print_increment for i in range(num_data_blocks): read_one_data_block(data, header, indices, fid) # Increment indices indices['amplifier'] += header['num_samples_per_data_block'] indices['aux_input'] += int(header['num_samples_per_data_block'] / 4) indices['supply_voltage'] += 1 indices['board_adc'] += header['num_samples_per_data_block'] indices['board_dig_in'] += header['num_samples_per_data_block'] indices['board_dig_out'] += header['num_samples_per_data_block'] fraction_done = 100 * (1.0 * i / num_data_blocks) if fraction_done >= percent_done: print('{}% done...'.format(percent_done)) percent_done = percent_done + print_increment # Make sure we have read exactly the right amount of data. bytes_remaining = filesize - fid.tell() if bytes_remaining != 0: raise Exception('Error: End of file not reached.') # Close data file. fid.close() if (data_present): print('Parsing data...') # Extract digital input channels to separate variables. for i in range(header['num_board_dig_in_channels']): data['board_dig_in_data'][i, :] = np.not_equal(np.bitwise_and(data['board_dig_in_raw'], (1 << header['board_dig_in_channels'][i]['native_order'])), 0) # Extract digital output channels to separate variables. for i in range(header['num_board_dig_out_channels']): data['board_dig_out_data'][i, :] = np.not_equal(np.bitwise_and(data['board_dig_out_raw'], (1 << header['board_dig_out_channels'][i]['native_order'])), 0) # Scale voltage levels appropriately. data['amplifier_data'] = np.multiply(0.195, (data['amplifier_data'].astype(np.int32) - 32768)) # units = microvolts data['aux_input_data'] = np.multiply(37.4e-6, data['aux_input_data']) # units = volts data['supply_voltage_data'] = np.multiply(74.8e-6, data['supply_voltage_data']) # units = volts if header['eval_board_mode'] == 1: data['board_adc_data'] = np.multiply(152.59e-6, (data['board_adc_data'].astype(np.int32) - 32768)) # units = volts elif header['eval_board_mode'] == 13: data['board_adc_data'] = np.multiply(312.5e-6, (data['board_adc_data'].astype(np.int32) - 32768)) # units = volts else: data['board_adc_data'] = np.multiply(50.354e-6, data['board_adc_data']) # units = volts data['temp_sensor_data'] = np.multiply(0.01, data['temp_sensor_data']) # units = deg C # Check for gaps in timestamps. num_gaps = np.sum(np.not_equal(data['t_amplifier'][1:]-data['t_amplifier'][:-1], 1)) if num_gaps == 0: print('No missing timestamps in data.') else: print('Warning: {0} gaps in timestamp data found. Time scale will not be uniform!'.format(num_gaps)) # Scale time steps (units = seconds). data['t_amplifier'] = data['t_amplifier'] / header['sample_rate'] data['t_aux_input'] = data['t_amplifier'][range(0, len(data['t_amplifier']), 4)] data['t_supply_voltage'] = data['t_amplifier'][range(0, len(data['t_amplifier']), header['num_samples_per_data_block'])] data['t_board_adc'] = data['t_amplifier'] data['t_dig'] = data['t_amplifier'] data['t_temp_sensor'] = data['t_supply_voltage'] # If the software notch filter was selected during the recording, apply the # same notch filter to amplifier data here. if header['notch_filter_frequency'] > 0: print('Applying notch filter...') print_increment = 10 percent_done = print_increment for i in range(header['num_amplifier_channels']): data['amplifier_data'][i,:] = notch_filter(data['amplifier_data'][i,:], header['sample_rate'], header['notch_filter_frequency'], 10) fraction_done = 100 * (i / header['num_amplifier_channels']) if fraction_done >= percent_done: print('{}% done...'.format(percent_done)) percent_done += print_increment else: data = []; # Move variables to result struct. result = data_to_result(header, data, data_present) print('Done! Elapsed time: {0:0.1f} seconds'.format(time.time() - tic)) return result