# Copyright (C) 2022 Harald Welte # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. import abc from ice40usbtrace import USBPacketHandler from ice40usbtrace.packet import * from ice40usbtrace.utils import b2h class USBTransfer(abc.ABC): """Base Class for a USB Transfer. Used by derived classes to represent IN/OUT/SETUP transfers.""" def __init__(self, addr:int, ep:int): self.ep = ep self.addr = addr self.data = None def __str__(self): return '%03d %02x %5s %s' % (self.addr, self.ep, self.name, self.to_str()) class USBTransferINOUT(USBTransfer): def to_str(self): return b2h(self.data) class USBTransferIN(USBTransferINOUT): name = 'IN' class USBTransferOUT(USBTransferINOUT): name = 'OUT' class USBTransferSETUP(USBTransferINOUT): name = 'SETUP' class USBTransferHandler(abc.ABC): """Base class for deriving classes of USB transfer handlers.""" @abc.abstractmethod def handle_transfer(self, transfer: USBTransfer): """Method called for each USB Transfer.""" pass class USBTransferHandlerPrint(USBTransferHandler): """Simple USBTransferHandler that prints each transfer.""" def handle_transfer(self, transfer: USBTransfer): print(str(transfer)) class TransferCombiner(USBPacketHandler): """Class to combine a stream of USB packets into USB transfers.""" def __init__(self, transfer_handler: USBTransferHandler = USBTransferHandlerPrint()): self.reset() self.transfer_handler = transfer_handler def reset(self): self.state = None self.xfer = None def handle_packet(self, packet: USBPacket): if isinstance(packet, USBPacket_SOF): self.reset() return # FIXME: the state machine is way too simplistic. It needs to be extended to # properly cover all the various USB transactions if self.state == None: if isinstance(packet, USBPacket_IN): self.xfer = USBTransferIN(packet.addr, packet.ep) self.state = 'IN' elif isinstance(packet, USBPacket_OUT): self.xfer = USBTransferOUT(packet.addr, packet.ep) self.state = 'OUT' elif isinstance(packet, USBPacket_SETUP): self.xfer = USBTransferSETUP(packet.addr, packet.ep) self.state = 'SETUP' elif self.state in ['IN', 'OUT', 'SETUP']: if isinstance(packet, USBPacket_NAK): self.reset() elif isinstance(packet, USBPacket_DATA): self.xfer.data = packet.data self.transfer_handler.handle_transfer(self.xfer) # FIXME: check for final ACK/NAK self.reset()