Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 59 additions & 66 deletions libmushu/driver/gtec.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.


# TODO: update to new version of pyusb

import struct
import time
from exceptions import Exception
import logging

import usb
import usb.core
import usb.util
from scipy.signal import iirfilter
import numpy as np

Expand All @@ -36,7 +35,7 @@
logger.info('Logger started')

ID_VENDOR_GTEC = 0x153c
# I saw an am with this vendorid too
# I saw an amp with this vendorid too
ID_VENDOR_GTEC2 = 0x15c3
ID_PRODUCT_GUSB_AMP = 0x0001

Expand All @@ -47,49 +46,50 @@ class GUSBamp(Amplifier):

def __init__(self):
logger.info('Initializing GUSBamp instance')
# list of available amps
self.amps = []
for bus in usb.busses():
for device in bus.devices:
if (device.idVendor in [ID_VENDOR_GTEC, ID_VENDOR_GTEC2] and
device.idProduct == ID_PRODUCT_GUSB_AMP):
self.amps.append(device)
self.devh = None
# find the amplifier
self.dev = usb.core.find(custom_match=
lambda d:
d.idVendor in (ID_VENDOR_GTEC, ID_VENDOR_GTEC2) and
d.idProduct == ID_PRODUCT_GUSB_AMP)
assert(self.dev is not None)
self.mode = None
# Initialize the amplifier and make it ready.
device = self.amps[0]
self.devh = device.open()
# detach kernel driver if nessecairy
config = device.configurations[0]
self.devh.setConfiguration(config)
assert(len(config.interfaces) > 0)
# sometimes it is the other one
first_interface = config.interfaces[0][0]
if first_interface is None:
first_interface = config.interfaces[0][1]
first_setting = first_interface.alternateSetting
self.devh.claimInterface(first_interface)
self.devh.setAltInterface(first_interface)
# initialization straight from the usb-dump
# activate its first configuration
self.dev.set_configuration()
cfg = self.dev[0]
assert(cfg.bNumInterfaces > 0)
# access its interface
intf = cfg[(0,0)]
if intf is None:
intf = cfg[(0,1)]
# detach if necessary
intf_num = intf.bInterfaceNumber
if self.dev.is_kernel_driver_active(intf_num):
self.dev.detach_kernel_driver(intf_num)
usb.util.claim_interface(self.dev, intf_num)
try:
intf.set_altsetting()
except USBError:
pass
# initialize straight from the usb-dump
self.set_mode('data')
self.devh.controlMsg(CX_OUT, 0xb6, value=0x80, buffer=0)
self.devh.controlMsg(CX_OUT, 0xb5, value=0x80, buffer=0)
self.devh.controlMsg(CX_OUT, 0xb9, value=0x00, buffer="\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10")
self.dev.ctrl_transfer(CX_OUT, 0xb6, wValue=0x80, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xb5, wValue=0x80, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xb9, wValue=0x00, data_or_wLength="\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10")
self.set_slave_mode(False)
self.devh.controlMsg(CX_OUT, 0xd3, value=0x01, buffer=0)
self.devh.controlMsg(CX_OUT, 0xca, value=0x01, buffer=0)
self.devh.controlMsg(CX_OUT, 0xc8, value=0x01, buffer="\x00"*16)
self.dev.ctrl_transfer(CX_OUT, 0xd3, wValue=0x01, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xca, wValue=0x01, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xc8, wValue=0x01, data_or_wLength="\x00"*16)
self.set_common_reference()
self.set_common_ground()
self.set_calibration_mode('sine')
self.set_sampling_ferquency(128, [False for i in range(16)], None, None)
self.set_sampling_frequency(128, [False for i in range(16)], None, None)

def start(self):
self.devh.controlMsg(CX_OUT, 0xb5, value=0x08, buffer=0)
self.devh.controlMsg(CX_OUT, 0xf7, value=0x00, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xb5, wValue=0x08, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xf7, wValue=0x00, data_or_wLength=0)

def stop(self):
self.devh.controlMsg(CX_OUT, 0xb8, [])
self.dev.ctrl_transfer(CX_OUT, 0xb8, data_or_wLength=[])

def get_data(self):
"""Get data."""
Expand All @@ -101,7 +101,7 @@ def get_data(self):
size = 2028 #512
try:
# TODO what is the optimal timeout here?
data = self.devh.bulkRead(endpoint, size, 100)
data = self.dev.read(endpoint, size, timeout=100)
except usb.USBError:
data = []
data = ''.join(map(chr, data))
Expand All @@ -125,7 +125,7 @@ def get_channels(self):
def is_available():
for bus in usb.busses():
for device in bus.devices:
if (device.idVendor in [ID_VENDOR_GTEC, ID_VENDOR_GTEC2] and
if (device.idVendor in (ID_VENDOR_GTEC, ID_VENDOR_GTEC2) and
device.idProduct == ID_PRODUCT_GUSB_AMP):
return True
return False
Expand All @@ -135,24 +135,23 @@ def is_available():
###########################################################################

def set_mode(self, mode):
"""Set mode, 'impedance', 'data'."""
"""Set mode, 'impedance', 'calibrate', 'data'."""
if mode == 'impedance':
self.devh.controlMsg(CX_OUT, 0xc9, value=0x00, buffer=0)
self.devh.controlMsg(CX_OUT, 0xc2, value=0x03, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xc9, wValue=0x00, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x03, data_or_wLength=0)
self.mode = 'impedance'
elif mode == 'calibrate':
self.devh.controlMsg(CX_OUT, 0xc1, value=0x00, buffer=0)
self.devh.controlMsg(CX_OUT, 0xc2, value=0x02, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xc1, wValue=0x00, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x02, data_or_wLength=0)
self.mode = 'calibration'
elif mode == 'data':
self.devh.controlMsg(CX_OUT, 0xc0, value=0x00, buffer=0)
self.devh.controlMsg(CX_OUT, 0xc2, value=0x01, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xc0, wValue=0x00, data_or_wLength=0)
self.dev.ctrl_transfer(CX_OUT, 0xc2, wValue=0x01, data_or_wLength=0)
self.mode = 'data'
else:
raise AmpError('Unknown mode: %s' % mode)


def set_sampling_ferquency(self, fs, channels, bpfilter, notchfilter):
def set_sampling_frequency(self, fs, channels, bpfilter, notchfilter):
""" Set the sampling frequency and filters for individual channels.

Parameters:
Expand Down Expand Up @@ -198,40 +197,38 @@ def set_sampling_ferquency(self, fs, channels, bpfilter, notchfilter):

# set the filters for all channels
if bpfilter == notchfilter == None:
self.devh.controlMsg(CX_OUT, 0xc6, value=0x01, buffer=bp_filter)
self.devh.controlMsg(CX_OUT, 0xc7, value=0x01, buffer=bs_filter)
self.dev.ctrl_transfer(CX_OUT, 0xc6, wValue=0x01, data_or_wLength=bp_filter)
self.dev.ctrl_transfer(CX_OUT, 0xc7, wValue=0x01, data_or_wLength=bs_filter)
else:
idx = 1
for i in channels:
if i:
self.devh.controlMsg(CX_OUT, 0xc6, value=idx, buffer=bp_filter)
self.devh.controlMsg(CX_OUT, 0xc7, value=idx, buffer=bs_filter)
self.dev.ctrl_transfer(CX_OUT, 0xc6, wValue=idx, data_or_wLength=bp_filter)
self.dev.ctrl_transfer(CX_OUT, 0xc7, wValue=idx, data_or_wLength=bs_filter)
idx += 1

# set the sampling frequency
self.devh.controlMsg(CX_OUT, 0xb6, value=fs, buffer=0)

self.dev.ctrl_transfer(CX_OUT, 0xb6, wValue=fs, data_or_wLength=0)

def set_calibration_mode(self, mode):
# buffer: [0x03, 0xd0, 0x07, 0x02, 0x00, 0xff, 0x07]
# ==== ==========
# (1) mode:
# (2) amplitude: little endian (0x07d0 = 2000)
if mode == 'sine':
self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x03\xd0\x07\x02\x00\xff\x07")
self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x03\xd0\x07\x02\x00\xff\x07")
elif mode == 'sawtooth':
self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x02\xd0\x07\x02\x00\xff\x07")
self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x02\xd0\x07\x02\x00\xff\x07")
elif mode == 'whitenoise':
self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x05\xd0\x07\x02\x00\xff\x07")
self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x05\xd0\x07\x02\x00\xff\x07")
elif mode == 'square':
self.devh.controlMsg(CX_OUT, 0xcb, value=0x00, buffer="\x01\xd0\x07\x02\x00\xff\x07")
self.dev.ctrl_transfer(CX_OUT, 0xcb, wValue=0x00, data_or_wLength="\x01\xd0\x07\x02\x00\xff\x07")
else:
raise AmpError('Unknown mode: %s' % mode)

def calculate_impedance(self, u_measured, u_applied=1e4):
return (u_measured * 1e6) / (u_applied - u_measured) - 1e4


def set_common_ground(self, a=False, b=False, c=False, d=False):
"""Set common ground for the electrodes.

Expand All @@ -241,8 +238,7 @@ def set_common_ground(self, a=False, b=False, c=False, d=False):

"""
v = (d << 3) + (c << 2) + (b << 1) + a
self.devh.controlMsg(CX_OUT, 0xbe, value=v, buffer=0)

self.dev.ctrl_transfer(CX_OUT, 0xbe, wValue=v, data_or_wLength=0)

def set_common_reference(self, a=False, b=False, c=False, d=False):
"""Set common reference for the electrodes.
Expand All @@ -253,8 +249,7 @@ def set_common_reference(self, a=False, b=False, c=False, d=False):

"""
v = (d << 3) + (c << 2) + (b << 1) + a
self.devh.controlMsg(CX_OUT, 0xbf, value=v, buffer=0)

self.dev.ctrl_transfer(CX_OUT, 0xbf, wValue=v, data_or_wLength=0)

def set_slave_mode(self, slave):
"""Set amp into slave or master mode.
Expand All @@ -264,15 +259,13 @@ def set_slave_mode(self, slave):

"""
v = 1 if slave else 0
self.devh.controlMsg(CX_OUT, 0xcd, value=v, buffer=0)
self.dev.ctrl_transfer(CX_OUT, 0xcd, wValue=v, data_or_wLength=0)


class AmpError(Exception):
pass




def main():
amp = GUSBamp()
amp.start()
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ scipy>=0.10.1
sphinx
# amplifier
crypto>=1.0.0
pyusb>=1.0.0a3
pyusb>=1.0.0
pylsl>=1.10.4