# Copyright (C) 2022 Sylvain Munaut and Harald Welte # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. import time from typing import Tuple from ice40usbtrace.packet import * PID_OUT = 0b0001 PID_IN = 0b1001 PID_SOF = 0b0101 PID_SETUP = 0b1101 PID_DATA0 = 0b0011 PID_DATA1 = 0b1011 PID_ACK = 0b0010 PID_NAK = 0b1010 PID_STALL = 0b1110 PID = { PID_OUT : "OUT", PID_IN : "IN", PID_SOF : "SOF", PID_SETUP : "SETUP", PID_DATA0 : "DATA0", PID_DATA1 : "DATA1", PID_ACK : "ACK", PID_NAK : "NAK", PID_STALL : "STALL", } class USBPacketParser: def __init__(self): self.ts = 0 self.frame_ts = 0 self.frame_no = 0 def parse_packet(self, buffer: bytes, ofs: int = 0) -> Tuple[USBPacket, int]: """Parse a packet from the start of 'buffer'. Args: buffer: Byte-buffer containing at least one complete header (4 bytes) at offset 'ofs' ofs: Offset in 'buffer' from which to parse the packet Returns: integer value indicating overall length of packet at 'ofs'. 0 in case packet is incomplete. """ # Readoff header hdr = USBPacketHdr.from_bytes(buffer[ofs+0:ofs+4]) # Timestamp self.ts = hdr.augment(self.frame_no, self.ts, self.frame_ts) # Default values plen = 4 # What PID ? if hdr.pid in [ PID_DATA0, PID_DATA1 ]: # Is it all here ? plen = 4 + hdr.dat if (len(buffer) - ofs) < plen: return None, 0 if hdr.pid == PID_DATA0: parsed = USBPacket_DATA0(hdr, buffer[ofs+4:ofs+4+hdr.dat]) else: parsed = USBPacket_DATA1(hdr, buffer[ofs+4:ofs+4+hdr.dat]) elif hdr.pid == PID_ACK: parsed = USBPacket_ACK(hdr) elif hdr.pid == PID_NAK: parsed = USBPacket_NAK(hdr) elif hdr.pid == PID_STALL: parsed = USBPacket_STALL(hdr) elif hdr.pid == PID_OUT: parsed = USBPacket_OUT(hdr) elif hdr.pid == PID_IN: parsed = USBPacket_IN(hdr) elif hdr.pid == PID_SETUP: parsed = USBPacket_SETUP(hdr) elif hdr.pid == PID_SOF: parsed = USBPacket_SOF(hdr) # Save new time self.frame_ts = self.ts self.frame_no = hdr.dat elif hdr.pid == 0: # Don't print TS overflow packets # FIXME: Line state reported in hdr_dat, could be used to # find bus resets / idle / ... if hdr.ok is True: return None, plen else: # Bad stream, discard a byte and retry return None, 1 return parsed, plen