chromium/tools/usb_gadget/echo_gadget.py

# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""USB echo gadget module.

This gadget has pairs of IN/OUT endpoints that echo packets back to the host.
"""

import uuid

import composite_gadget
import usb_constants
import usb_descriptors


class EchoCompositeFeature(composite_gadget.CompositeFeature):
  """Composite device feature that echos data back to the host.
  """

  def __init__(self, endpoints):
    """Create an echo gadget.
    """
    fs_interfaces = []
    hs_interfaces = []

    if len(endpoints) >= 1:
      iface_num, iface_string, in_endpoint, out_endpoint = endpoints[0]
      fs_intr_interface_desc = usb_descriptors.InterfaceDescriptor(
          bInterfaceNumber=iface_num,
          bInterfaceClass=usb_constants.DeviceClass.VENDOR,
          bInterfaceSubClass=0,
          bInterfaceProtocol=0,
          iInterface=iface_string,
      )
      fs_intr_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=out_endpoint,
          bmAttributes=usb_constants.TransferType.INTERRUPT,
          wMaxPacketSize=64,
          bInterval=1  # 1ms
      ))
      fs_intr_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=in_endpoint,
          bmAttributes=usb_constants.TransferType.INTERRUPT,
          wMaxPacketSize=64,
          bInterval=1  # 1ms
      ))
      fs_interfaces.append(fs_intr_interface_desc)

      hs_intr_interface_desc = usb_descriptors.InterfaceDescriptor(
          bInterfaceNumber=iface_num,
          bInterfaceClass=usb_constants.DeviceClass.VENDOR,
          bInterfaceSubClass=0,
          bInterfaceProtocol=0,
          iInterface=iface_string
      )
      hs_intr_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=out_endpoint,
          bmAttributes=usb_constants.TransferType.INTERRUPT,
          wMaxPacketSize=64,
          bInterval=4  # 1ms
      ))
      hs_intr_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=in_endpoint,
          bmAttributes=usb_constants.TransferType.INTERRUPT,
          wMaxPacketSize=64,
          bInterval=4  # 1ms
      ))
      hs_interfaces.append(hs_intr_interface_desc)

    if len(endpoints) >= 2:
      iface_num, iface_string, in_endpoint, out_endpoint = endpoints[1]
      fs_bulk_interface_desc = usb_descriptors.InterfaceDescriptor(
          bInterfaceNumber=iface_num,
          bInterfaceClass=usb_constants.DeviceClass.VENDOR,
          bInterfaceSubClass=0,
          bInterfaceProtocol=0,
          iInterface=iface_string
      )
      fs_bulk_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=out_endpoint,
          bmAttributes=usb_constants.TransferType.BULK,
          wMaxPacketSize=64,
          bInterval=0
      ))
      fs_bulk_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=in_endpoint,
          bmAttributes=usb_constants.TransferType.BULK,
          wMaxPacketSize=64,
          bInterval=0
      ))
      fs_interfaces.append(fs_bulk_interface_desc)

      hs_bulk_interface_desc = usb_descriptors.InterfaceDescriptor(
          bInterfaceNumber=iface_num,
          bInterfaceClass=usb_constants.DeviceClass.VENDOR,
          bInterfaceSubClass=0,
          bInterfaceProtocol=0,
          iInterface=iface_string
      )
      hs_bulk_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=out_endpoint,
          bmAttributes=usb_constants.TransferType.BULK,
          wMaxPacketSize=512,
          bInterval=0
      ))
      hs_bulk_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=in_endpoint,
          bmAttributes=usb_constants.TransferType.BULK,
          wMaxPacketSize=512,
          bInterval=0
      ))
      hs_interfaces.append(hs_bulk_interface_desc)

    if len(endpoints) >= 3:
      iface_num, iface_string, in_endpoint, out_endpoint = endpoints[2]
      fs_interfaces.append(usb_descriptors.InterfaceDescriptor(
          bInterfaceNumber=iface_num,
          bInterfaceClass=usb_constants.DeviceClass.VENDOR,
          bInterfaceSubClass=0,
          bInterfaceProtocol=0,
          iInterface=iface_string
      ))
      fs_isoc_interface_desc = usb_descriptors.InterfaceDescriptor(
          bInterfaceNumber=iface_num,
          bAlternateSetting=1,
          bInterfaceClass=usb_constants.DeviceClass.VENDOR,
          bInterfaceSubClass=0,
          bInterfaceProtocol=0,
          iInterface=iface_string
      )
      fs_isoc_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=out_endpoint,
          bmAttributes=usb_constants.TransferType.ISOCHRONOUS,
          wMaxPacketSize=1023,
          bInterval=1  # 1ms
      ))
      fs_isoc_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=in_endpoint,
          bmAttributes=usb_constants.TransferType.ISOCHRONOUS,
          wMaxPacketSize=1023,
          bInterval=1  # 1ms
      ))
      fs_interfaces.append(fs_isoc_interface_desc)

      hs_interfaces.append(usb_descriptors.InterfaceDescriptor(
          bInterfaceNumber=iface_num,
          bInterfaceClass=usb_constants.DeviceClass.VENDOR,
          bInterfaceSubClass=0,
          bInterfaceProtocol=0,
          iInterface=iface_string
      ))
      hs_isoc_interface_desc = usb_descriptors.InterfaceDescriptor(
          bInterfaceNumber=iface_num,
          bAlternateSetting=1,
          bInterfaceClass=usb_constants.DeviceClass.VENDOR,
          bInterfaceSubClass=0,
          bInterfaceProtocol=0,
          iInterface=iface_string
      )
      hs_isoc_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=out_endpoint,
          bmAttributes=usb_constants.TransferType.ISOCHRONOUS,
          wMaxPacketSize=512,
          bInterval=4  # 1ms
      ))
      hs_isoc_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor(
          bEndpointAddress=in_endpoint,
          bmAttributes=usb_constants.TransferType.ISOCHRONOUS,
          wMaxPacketSize=512,
          bInterval=4  # 1ms
      ))
      hs_interfaces.append(hs_isoc_interface_desc)

    super(EchoCompositeFeature, self).__init__(fs_interfaces, hs_interfaces)

  def ReceivePacket(self, endpoint, data):
    """Echo a packet back to the host.

    Args:
      endpoint: Incoming endpoint (must be an OUT pipe).
      data: Packet data.
    """
    assert endpoint & usb_constants.Dir.IN == 0

    self.SendPacket(endpoint | usb_constants.Dir.IN, data)


class EchoGadget(composite_gadget.CompositeGadget):
  """Echo gadget.
  """

  def __init__(self):
    """Create an echo gadget.
    """
    device_desc = usb_descriptors.DeviceDescriptor(
        idVendor=usb_constants.VendorID.GOOGLE,
        idProduct=usb_constants.ProductID.GOOGLE_ECHO_GADGET,
        bcdUSB=0x0200,
        iManufacturer=1,
        iProduct=2,
        iSerialNumber=3,
        bcdDevice=0x0100)

    feature = EchoCompositeFeature(
        endpoints=[(0, 4, 0x81, 0x01), (1, 5, 0x82, 0x02), (2, 6, 0x83, 0x03)])

    super(EchoGadget, self).__init__(device_desc, [feature])
    self.AddStringDescriptor(1, 'Google Inc.')
    self.AddStringDescriptor(2, 'Echo Gadget')
    self.AddStringDescriptor(3, '{:06X}'.format(uuid.getnode()))
    self.AddStringDescriptor(4, 'Interrupt Echo')
    self.AddStringDescriptor(5, 'Bulk Echo')
    self.AddStringDescriptor(6, 'Isochronous Echo')

    # Enable Microsoft OS Descriptors for Windows 8 and above.
    self.EnableMicrosoftOSDescriptorsV1(vendor_code=0x01)
    # These are used to force Windows to load WINUSB.SYS for the echo functions.
    self.SetMicrosoftCompatId(0, 'WINUSB')
    self.SetMicrosoftCompatId(1, 'WINUSB')
    self.SetMicrosoftCompatId(2, 'WINUSB')

    self.AddDeviceCapabilityDescriptor(usb_descriptors.ContainerIdDescriptor(
        ContainerID=uuid.uuid4().bytes_le))

def RegisterHandlers():
  """Registers web request handlers with the application server."""

  import server
  from tornado import web

  class WebConfigureHandler(web.RequestHandler):

    def post(self):
      server.SwitchGadget(EchoGadget())

  server.app.add_handlers('.*$', [
      (r'/echo/configure', WebConfigureHandler),
  ])