"""Lightweight PyUSB helpers for working with UVC cameras.
The goal of this module is to provide a thin, well-documented layer on top of
PyUSB that understands the UVC descriptor layout and the standard probing
protocol. It is intentionally minimal so that example scripts can reuse the
parsing and streaming logic without pulling in the full libuvc bindings.
"""
from __future__ import annotations
import contextlib
import ctypes
import dataclasses
import errno
import json
import logging
import os
import pathlib
import queue
import threading
import time
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
import usb.core
import usb.util
import usb1
try: # Optional dependency for MJPEG preview
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
GST_AVAILABLE = True
except (ImportError, ValueError):
GST_AVAILABLE = False
LOG = logging.getLogger(__name__)
__all__: List[str]
__version__ = "0.1.0"
_AUTO_DETACH_VC = os.environ.get("LIBUSB_UVC_AUTO_DETACH_VC", "1").strip().lower() not in {
"0",
"false",
"no",
"off",
}
if not _AUTO_DETACH_VC:
LOG.debug("Auto-detach of VC interfaces disabled via LIBUSB_UVC_AUTO_DETACH_VC")
_LIBUSB_HOTPLUG_DISABLED = False
_LIBUSB_HOTPLUG_ATTEMPTED = False
def _auto_detach_vc_enabled() -> bool:
return _AUTO_DETACH_VC
[docs]
def load_quirks() -> Dict[str, dict]:
"""Load per-GUID control definitions from the packaged quirks directory."""
base_dir = pathlib.Path(__file__).resolve().parent
candidate_dirs = [
base_dir / "quirks",
base_dir / "src" / "libusb_uvc" / "quirks",
]
try: # pragma: no cover - importlib availability
import importlib.util
spec = importlib.util.find_spec("libusb_uvc")
if spec and spec.submodule_search_locations:
candidate_dirs.append(pathlib.Path(spec.submodule_search_locations[0]) / "quirks")
except Exception:
pass
quirks_dir = None
for candidate in candidate_dirs:
if candidate.is_dir():
quirks_dir = candidate
break
quirks: Dict[str, dict] = {}
if quirks_dir is None:
return quirks
for json_path in sorted(quirks_dir.glob("*.json")):
try:
with json_path.open("r", encoding="utf-8") as handle:
data = json.load(handle)
except (OSError, json.JSONDecodeError) as exc:
LOG.warning("Failed to load quirks file %s: %s", json_path, exc)
continue
guid = str(data.get("guid", "")).lower()
if not guid:
LOG.debug("Skipping quirks file %s without GUID", json_path)
continue
quirks[guid] = data
return quirks
def _disable_hotplug_and_get_backend():
"""Try to reinitialise libusb without the udev hotplug monitor.
Some sandboxes block access to udev, causing ``libusb_init`` to return
``LIBUSB_ERROR_OTHER``. In that situation we ask libusb to skip device
discovery so that PyUSB can still enumerate already-present devices.
``usb.core.find`` raises :class:`usb.core.NoBackendError` when this happens.
"""
global _LIBUSB_HOTPLUG_ATTEMPTED, _LIBUSB_HOTPLUG_DISABLED
if _LIBUSB_HOTPLUG_DISABLED or _LIBUSB_HOTPLUG_ATTEMPTED:
from usb.backend import libusb1 # lazy import to avoid circular refs
backend = libusb1.get_backend()
return backend
_LIBUSB_HOTPLUG_ATTEMPTED = True
try:
libusb = ctypes.CDLL("libusb-1.0.so.0")
except OSError:
return None
set_option = getattr(libusb, "libusb_set_option", None)
if set_option is None:
return None
try:
set_option.argtypes = [ctypes.c_void_p, ctypes.c_int]
set_option.restype = ctypes.c_int
except AttributeError:
return None
# LIBUSB_OPTION_NO_DEVICE_DISCOVERY. Passing NULL targets the default
# context before its first initialisation attempt.
if set_option(None, 2) != 0:
return None
from usb.backend import libusb1
# Reset internal module state so the next get_backend() call retries the
# initialisation path. These attributes are considered private but this is
# the only reliable way to request a fresh backend in PyUSB today.
libusb1._lib = None # type: ignore[attr-defined]
libusb1._lib_object = None # type: ignore[attr-defined]
backend = libusb1.get_backend()
if backend is not None:
_LIBUSB_HOTPLUG_DISABLED = True
return backend
[docs]
class CodecPreference(str):
"""Simple codec discriminator used when selecting a stream format."""
AUTO = "auto"
YUYV = "yuyv"
MJPEG = "mjpeg"
FRAME_BASED = "frame-based"
H264 = "h264"
H265 = "h265"
[docs]
class DecoderPreference(str):
"""Optional decoder selection for compressed payloads."""
AUTO = "auto"
NONE = "none"
PYAV = "pyav"
GSTREAMER = "gstreamer"
def _normalise_decoder_preference(
preference: Optional[Union[str, DecoderPreference, Iterable[str]]]
) -> Optional[List[str]]:
if preference is None:
return []
if isinstance(preference, DecoderPreference):
tokens = [preference.value]
elif isinstance(preference, str):
tokens = [token.strip().lower() for token in preference.split(",") if token.strip()]
elif isinstance(preference, Iterable):
tokens = [str(token).strip().lower() for token in preference if str(token).strip()]
else:
tokens = [str(preference).strip().lower()]
if not tokens:
return []
tokens = [token for token in tokens if token]
if any(token == DecoderPreference.NONE for token in tokens):
return None
tokens = [token for token in tokens if token != DecoderPreference.AUTO]
deduped: List[str] = []
for token in tokens:
if token and token not in deduped:
deduped.append(token)
return deduped
# --- UVC Constants ---
UVC_CLASS = 0x0E
VC_SUBCLASS = 0x01
VS_SUBCLASS = 0x02
CS_INTERFACE = 0x24
# Video Control (VC) descriptor subtypes
VC_HEADER = 0x01
VC_INPUT_TERMINAL = 0x02
VC_OUTPUT_TERMINAL = 0x03
VC_SELECTOR_UNIT = 0x04
VC_PROCESSING_UNIT = 0x05
VC_EXTENSION_UNIT = 0x06
# Video Streaming (VS) descriptor subtypes
VS_INPUT_HEADER = 0x01
VS_FORMAT_UNCOMPRESSED = 0x04
VS_FRAME_UNCOMPRESSED = 0x05
VS_FORMAT_MJPEG = 0x06
VS_FRAME_MJPEG = 0x07
VS_FORMAT_FRAME_BASED = 0x10
VS_FRAME_FRAME_BASED = 0x11
# Still image format/frame descriptor subtypes
VS_FORMAT_UNCOMPRESSED_STILL = 0x30
VS_FRAME_UNCOMPRESSED_STILL = 0x31
VS_FORMAT_MJPEG_STILL = 0x32
VS_FRAME_MJPEG_STILL = 0x33
VS_STILL_IMAGE_FRAME_DESCRIPTOR = 0x03
# UVC Payload Header constants
BH_FID = 0x01
BH_EOF = 0x02
BH_PTS = 0x04
BH_SCR = 0x08
BH_RES = 0x10
BH_STI = 0x20
BH_ERR = 0x40
BH_EOH = 0x80
# UVC Request Codes for control transfers
SET_CUR = 0x01
GET_CUR = 0x81
GET_MIN = 0x82
GET_MAX = 0x83
GET_RES = 0x84
GET_LEN = 0x85
GET_INFO = 0x86
GET_DEF = 0x87
# VideoStreaming control selectors
VS_PROBE_CONTROL = 0x01
VS_COMMIT_CONTROL = 0x02
VS_STILL_PROBE_CONTROL = 0x03
VS_STILL_COMMIT_CONTROL = 0x04
VS_STILL_IMAGE_TRIGGER_CONTROL = 0x05
# Standard UVC Control Selectors
# (Incomplete list, add more as needed)
# Camera Terminal (CT) Controls
CT_EXPOSURE_TIME_ABSOLUTE_CONTROL = 0x04
CT_ZOOM_ABSOLUTE_CONTROL = 0x0B
# Processing Unit (PU) Controls
PU_BRIGHTNESS_CONTROL = 0x02
PU_CONTRAST_CONTROL = 0x03
PU_GAIN_CONTROL = 0x04
PU_WHITE_BALANCE_TEMPERATURE_CONTROL = 0x0A
PU_WHITE_BALANCE_TEMPERATURE_AUTO_CONTROL = 0x0B
UVC_CONTROL_MAPPING = {
"Camera Terminal": {
2: "Auto Exposure Mode",
3: "Auto Exposure Priority",
4: "Exposure Time, Absolute",
11: "Zoom, Absolute",
},
"Processing Unit": {
1: "Backlight Compensation",
2: "Brightness",
3: "Contrast",
4: "Gain",
5: "Power Line Frequency",
6: "Hue",
7: "Saturation",
8: "Sharpness",
9: "Gamma",
10: "White Balance Temperature",
11: "White Balance Temperature, Auto",
}
}
# Pre-computed request types used for control transfers on interfaces
REQ_TYPE_IN = usb.util.build_request_type(
usb.util.CTRL_IN, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE
)
REQ_TYPE_OUT = usb.util.build_request_type(
usb.util.CTRL_OUT, usb.util.CTRL_TYPE_CLASS, usb.util.CTRL_RECIPIENT_INTERFACE
)
[docs]
class UVCError(RuntimeError):
"""Raised when the camera reports unexpected errors."""
# --- Data Structures for Descriptors ---
[docs]
@dataclasses.dataclass
class FrameInfo:
"""Frame descriptor summary collected from a VS frame descriptor."""
frame_index: int
width: int
height: int
default_interval: int
intervals_100ns: List[int]
max_frame_size: int
bm_capabilities: int = 0
[docs]
def intervals_hz(self) -> List[float]:
unique = sorted({v for v in self.intervals_100ns if v})
return [_interval_to_hz(v) for v in unique]
@property
def intervals(self) -> List[float]:
"""Backward compatibility alias returning frame intervals in Hz."""
return self.intervals_hz()
@property
def supports_still(self) -> bool:
"""Return True when the frame advertises still-image support."""
return bool(self.bm_capabilities & 0x01)
[docs]
def pick_interval(
self,
target_fps: Optional[float],
*,
strict: bool = False,
tolerance_hz: float = 1e-3,
) -> int:
"""Return the closest advertised frame interval to ``target_fps``."""
intervals = [value for value in self.intervals_100ns if value]
if not intervals:
if self.default_interval:
return self.default_interval
if target_fps and target_fps > 0:
return int(round(1e7 / target_fps))
raise ValueError("Frame descriptor does not advertise any intervals")
if target_fps is None or target_fps <= 0:
return self.default_interval or intervals[0]
target_interval = int(round(1e7 / target_fps))
best = min(intervals, key=lambda value: abs(value - target_interval))
if strict:
actual_fps = _interval_to_hz(best)
if abs(actual_fps - target_fps) > tolerance_hz:
raise ValueError(
f"No advertised frame interval matches {target_fps} fps (closest {actual_fps:.6f} fps)"
)
return best
@dataclasses.dataclass
class StillFrameInfo:
"""Still image frame descriptor (Method 2)."""
width: int
height: int
endpoint_address: int
frame_index: int
compression_indices: List[int] = dataclasses.field(default_factory=list)
format_index: int = 0
format_subtype: Optional[int] = None
max_frame_size: int = 0
[docs]
@dataclasses.dataclass
class AltSettingInfo:
"""Information about an alternate streaming interface setting."""
alternate_setting: int
endpoint_address: Optional[int]
endpoint_attributes: Optional[int]
max_packet_size: int
[docs]
def is_isochronous(self) -> bool:
if self.endpoint_attributes is None:
return False
return usb.util.endpoint_type(self.endpoint_attributes) == usb.util.ENDPOINT_TYPE_ISO
[docs]
@dataclasses.dataclass
class StreamingInterface:
"""Grouping of the per-interface formats and alternate settings."""
interface_number: int
formats: List[StreamFormat] = dataclasses.field(default_factory=list)
alt_settings: List[AltSettingInfo] = dataclasses.field(default_factory=list)
[docs]
def get_alt(self, alternate_setting: int) -> Optional[AltSettingInfo]:
for alt in self.alt_settings:
if alt.alternate_setting == alternate_setting:
return alt
return None
[docs]
def find_alt_by_endpoint(self, endpoint_address: int) -> Optional[AltSettingInfo]:
for alt in self.alt_settings:
if alt.endpoint_address == endpoint_address:
return alt
return None
[docs]
def select_alt_for_payload(self, required_payload: int) -> Optional[AltSettingInfo]:
candidates = [alt for alt in self.alt_settings if alt.max_packet_size]
if not candidates:
return None
for alt in sorted(candidates, key=lambda a: a.max_packet_size):
if alt.max_packet_size >= required_payload:
return alt
return max(candidates, key=lambda a: a.max_packet_size)
[docs]
def find_frame(
self, width: int, height: int, *, format_index: Optional[int] = None, subtype: Optional[int] = None
) -> Optional[Tuple[StreamFormat, FrameInfo]]:
for fmt in self.formats:
if format_index is not None and fmt.format_index != format_index:
continue
if subtype is not None and fmt.subtype != subtype:
continue
for frame in fmt.frames:
if frame.width == width and frame.height == height:
return fmt, frame
return None
[docs]
def iter_still_frames(self) -> Iterator[Tuple[StreamFormat, FrameInfo]]:
for fmt in self.formats:
for frame in fmt.frames:
if frame.supports_still:
yield fmt, frame
[docs]
def find_still_frame(
self,
width: int,
height: int,
*,
format_index: Optional[int] = None,
subtype: Optional[int] = None,
) -> Optional[Tuple[StreamFormat, FrameInfo]]:
for fmt, frame in self.iter_still_frames():
if format_index is not None and fmt.format_index != format_index:
continue
if subtype is not None and fmt.subtype != subtype:
continue
if frame.width == width and frame.height == height:
return fmt, frame
return None
[docs]
@dataclasses.dataclass
class UVCControl:
"""Represents a discovered UVC control."""
unit_id: int
selector: int
name: str
type: str
[docs]
@dataclasses.dataclass
class UVCUnit:
"""Represents a Unit in the Video Control topology."""
unit_id: int
type: str
# --- CORRECTION: a default value here caused the TypeError in the child class ---
controls: List[UVCControl]
[docs]
@dataclasses.dataclass
class ExtensionUnit(UVCUnit):
"""Represents an Extension Unit with its specific GUID."""
guid: str
[docs]
@dataclasses.dataclass
class CapturedFrame:
"""Container returned by :meth:`UVCCamera.read_frame` and asynchronous streams."""
payload: Union[bytes, bytearray]
format: StreamFormat
frame: Union[FrameInfo, StillFrameInfo]
fid: int
pts: Optional[int]
timestamp: float = dataclasses.field(default_factory=time.time)
sequence: int = 0
_rgb_cache: Optional[object] = dataclasses.field(default=None, init=False, repr=False)
decoded: Optional[object] = dataclasses.field(default=None, repr=False)
[docs]
def to_bytes(self) -> bytes:
return bytes(self.payload)
[docs]
def to_rgb(self):
if self._rgb_cache is None:
if self.decoded is not None:
import numpy as _np
self._rgb_cache = _np.array(self.decoded, copy=True)
else:
self._rgb_cache = decode_to_rgb(self.payload, self.format, self.frame)
return self._rgb_cache
[docs]
def to_bgr(self):
try:
import cv2
except ImportError as exc:
raise RuntimeError("OpenCV is required for BGR conversion") from exc
rgb = self.to_rgb()
return cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
@dataclasses.dataclass
class FrameAssemblyResult:
"""Result produced by :class:`FrameReassembler` for each completed frame."""
payload: Optional[bytearray]
fid: Optional[int]
pts: Optional[int]
reason: str
error: bool = False
duration: Optional[float] = None
[docs]
@dataclasses.dataclass
class StreamStats:
"""Cumulative counters describing a stream's behaviour."""
frames_completed: int = 0
frames_dropped: int = 0
bytes_delivered: int = 0
last_frame_duration_s: Optional[float] = None
average_frame_duration_s: Optional[float] = None
measured_frames: int = 0
last_drop_reason: Optional[str] = None
class FrameReassembler:
"""Stateful helper that converts UVC packets into complete frame payloads."""
def __init__(
self,
*,
expected_size: Optional[int],
max_payload_size: Optional[int] = None,
packet_limit: Optional[int] = None,
) -> None:
self._expected_size = expected_size
if packet_limit is not None:
self._packet_limit = packet_limit
elif expected_size and max_payload_size:
self._packet_limit = max(4, (expected_size // max_payload_size) + 16)
else:
self._packet_limit = None
self._buffer = bytearray()
self._current_fid: Optional[int] = None
self._current_pts: Optional[int] = None
self._frame_error = False
self._packets_seen = 0
self._frame_started_at: Optional[float] = None
def feed(self, packet: bytes) -> List[FrameAssemblyResult]:
results: List[FrameAssemblyResult] = []
if not packet:
return results
header_len = packet[0]
if header_len < 2 or header_len > len(packet):
result = self._finalize("bad-header")
if result:
results.append(result)
return results
flags = packet[1]
fid = flags & BH_FID
eof = bool(flags & BH_EOF)
err = bool(flags & BH_ERR)
payload = packet[header_len:]
if self._current_fid is None:
self._start_frame(fid, err)
elif fid != self._current_fid:
result = self._finalize("fid-toggle")
if result:
results.append(result)
self._start_frame(fid, err)
elif err:
self._frame_error = True
if flags & BH_PTS and header_len >= 6:
self._current_pts = int.from_bytes(packet[2:6], "little")
if payload:
self._buffer.extend(payload)
self._packets_seen += 1
if self._expected_size is not None and len(self._buffer) > self._expected_size:
self._frame_error = True
if self._packet_limit and self._packets_seen > self._packet_limit:
result = self._finalize("packet-limit")
if result:
results.append(result)
return results
if eof:
result = self._finalize("eof")
if result:
results.append(result)
return results
def _start_frame(self, fid: int, err: bool) -> None:
self._frame_started_at = time.monotonic()
self._buffer = bytearray()
self._current_fid = fid
self._frame_error = err
self._packets_seen = 0
self._current_pts = None
def _reset_state(self) -> None:
self._buffer = bytearray()
self._current_fid = None
self._frame_error = False
self._current_pts = None
self._packets_seen = 0
self._frame_started_at = None
def _finalize(self, reason: str) -> Optional[FrameAssemblyResult]:
if self._current_fid is None:
self._reset_state()
return None
payload: Optional[bytearray] = None
error = self._frame_error
duration = None
if self._frame_started_at is not None:
duration = max(0.0, time.monotonic() - self._frame_started_at)
if not error and self._buffer:
if self._expected_size is not None and len(self._buffer) != self._expected_size:
error = True
else:
payload = self._buffer
else:
error = True
result = FrameAssemblyResult(
payload=payload,
fid=self._current_fid,
pts=self._current_pts,
reason=reason,
error=error or payload is None,
duration=duration,
)
self._reset_state()
return result
[docs]
@dataclasses.dataclass
class ControlEntry:
"""Rich metadata describing a validated UVC control."""
interface_number: int
unit_id: int
selector: int
name: str
type: str
info: int
minimum: Optional[int]
maximum: Optional[int]
step: Optional[int]
default: Optional[int]
length: Optional[int]
raw_minimum: Optional[bytes] = None
raw_maximum: Optional[bytes] = None
raw_step: Optional[bytes] = None
raw_default: Optional[bytes] = None
metadata: Dict[str, object] = dataclasses.field(default_factory=dict)
[docs]
def is_writable(self) -> bool:
return bool(self.info & 0x02)
[docs]
def is_readable(self) -> bool:
return bool(self.info & 0x01)
[docs]
def find_uvc_devices(vid: Optional[int] = None, pid: Optional[int] = None) -> List[usb.core.Device]:
"""Return every USB device that looks like a UVC camera."""
try:
devices = usb.core.find(find_all=True)
except usb.core.NoBackendError:
backend = _disable_hotplug_and_get_backend()
if backend is None:
raise
devices = usb.core.find(find_all=True, backend=backend)
if devices is None:
return []
result = []
for dev in devices:
if vid is not None and dev.idVendor != vid:
continue
if pid is not None and dev.idProduct != pid:
continue
if any(intf.bInterfaceClass == UVC_CLASS for cfg in dev for intf in cfg):
result.append(dev)
return result
[docs]
def iter_video_streaming_interfaces(dev: usb.core.Device) -> Iterator[usb.core.Interface]:
"""Yield every interface whose class/subclass matches UVC streaming."""
for cfg in dev:
for intf in cfg:
if intf.bInterfaceClass == UVC_CLASS and intf.bInterfaceSubClass == VS_SUBCLASS:
yield intf
[docs]
def list_control_units(dev: usb.core.Device) -> Dict[int, List[UVCUnit]]:
"""Build UVCUnit descriptions for all Video Control interfaces on dev.
We try to detach the VC interface from the kernel driver (uvcvideo) so that
user-space control transfers work even when the module is loaded.
"""
unit_map: Dict[int, List[UVCUnit]] = {}
for cfg in dev:
for intf in cfg:
if intf.bInterfaceClass == UVC_CLASS and intf.bInterfaceSubClass == VC_SUBCLASS:
reattach = False
should_detach = _auto_detach_vc_enabled()
if should_detach:
try:
if dev.is_kernel_driver_active(intf.bInterfaceNumber):
dev.detach_kernel_driver(intf.bInterfaceNumber)
LOG.info(
"Detached kernel driver from VC interface %s",
intf.bInterfaceNumber,
)
reattach = True
except (usb.core.USBError, NotImplementedError, AttributeError):
reattach = False
else:
LOG.debug(
"Auto-detach disabled; reading VC interface %s without detaching kernel driver",
intf.bInterfaceNumber,
)
try:
if intf.bAlternateSetting == 0 and intf.extra_descriptors:
units = parse_vc_descriptors(bytes(intf.extra_descriptors))
if units:
unit_map[intf.bInterfaceNumber] = units
finally:
if reattach:
with contextlib.suppress(usb.core.USBError):
dev.attach_kernel_driver(intf.bInterfaceNumber)
return unit_map
def parse_vc_descriptors(extra: bytes) -> List[UVCUnit]:
"""Parse the raw `extra_descriptors` blob for a VC interface."""
units: List[UVCUnit] = []
idx = 0
while idx + 2 < len(extra):
length = extra[idx]
if length == 0 or idx + length > len(extra):
break
dtype = extra[idx + 1]
subtype = extra[idx + 2]
payload = extra[idx : idx + length]
if dtype == CS_INTERFACE:
unit = None
if subtype == VC_INPUT_TERMINAL:
unit = _parse_input_terminal(payload)
elif subtype == VC_PROCESSING_UNIT:
unit = _parse_processing_unit(payload)
elif subtype == VC_EXTENSION_UNIT:
unit = _parse_extension_unit(payload)
if unit:
units.append(unit)
idx += length
return units
def _parse_input_terminal(desc: bytes) -> Optional[UVCUnit]:
if len(desc) < 8:
return None
unit_id = desc[3]
controls = []
if len(desc) >= 18:
bitmap = int.from_bytes(desc[15:18], "little")
control_map = UVC_CONTROL_MAPPING.get("Camera Terminal", {})
for i in range(24):
if (bitmap >> i) & 1:
selector = i + 1
control_name = control_map.get(selector, f"Unknown Control Selector {selector}")
controls.append(
UVCControl(
unit_id=unit_id,
selector=selector,
name=control_name,
type="Camera Terminal",
)
)
return UVCUnit(unit_id=unit_id, type="Input Terminal", controls=controls)
def _parse_processing_unit(desc: bytes) -> Optional[UVCUnit]:
"""
Parse Processing Unit descriptor robustly:
- read bControlSize at offset 7
- read bmControls starting at offset 8 for bControlSize bytes
"""
if len(desc) < 10:
return None
unit_id = desc[3]
controls: List[UVCControl] = []
# Processing Unit: bControlSize @ offset 7, bmControls start @8
bControlSize = desc[7] if len(desc) > 7 else 0
ctrl_start = 8
ctrl_end = ctrl_start + bControlSize
if bControlSize > 0 and ctrl_end <= len(desc):
bitmap_bytes = desc[ctrl_start:ctrl_end]
bitmap = int.from_bytes(bitmap_bytes, "little")
control_map = UVC_CONTROL_MAPPING.get("Processing Unit", {})
max_bits = 8 * bControlSize
for i in range(max_bits):
if (bitmap >> i) & 1:
selector = i + 1
control_name = control_map.get(selector, f"Unknown Control Selector {selector}")
controls.append(
UVCControl(
unit_id=unit_id,
selector=selector,
name=control_name,
type="Processing Unit",
)
)
return UVCUnit(unit_id=unit_id, type="Processing Unit", controls=controls)
def _parse_extension_unit(desc: bytes) -> Optional[ExtensionUnit]:
"""Parse an Extension Unit descriptor to find its GUID."""
if len(desc) < 24:
return None
unit_id = desc[3]
guid_bytes = desc[4:20]
guid_str = (
f"{guid_bytes[3]:02x}{guid_bytes[2]:02x}{guid_bytes[1]:02x}{guid_bytes[0]:02x}-"
f"{guid_bytes[5]:02x}{guid_bytes[4]:02x}-"
f"{guid_bytes[7]:02x}{guid_bytes[6]:02x}-"
f"{guid_bytes[8]:02x}{guid_bytes[9]:02x}-"
f"{guid_bytes[10]:02x}{guid_bytes[11]:02x}{guid_bytes[12]:02x}{guid_bytes[13]:02x}{guid_bytes[14]:02x}{guid_bytes[15]:02x}"
)
controls: List[UVCControl] = []
b_num_controls = desc[20] if len(desc) > 20 else 0
b_nr_in_pins = desc[21] if len(desc) > 21 else 0
control_size_offset = 22 + b_nr_in_pins
if control_size_offset >= len(desc):
return ExtensionUnit(unit_id=unit_id, type="Extension Unit", guid=guid_str, controls=controls)
b_control_size = desc[control_size_offset]
controls_offset = control_size_offset + 1
controls_end = controls_offset + b_control_size
if controls_end > len(desc):
controls_end = len(desc)
bitmap_bytes = desc[controls_offset:controls_end]
bitmap = int.from_bytes(bitmap_bytes, "little") if bitmap_bytes else 0
max_bits = max(b_num_controls, 8 * b_control_size)
for index in range(max_bits):
if b_num_controls and index >= b_num_controls:
break
if (bitmap >> index) & 1:
selector = index + 1
controls.append(
UVCControl(
unit_id=unit_id,
selector=selector,
name=f"Selector {selector}",
type="Extension Unit",
)
)
return ExtensionUnit(unit_id=unit_id, type="Extension Unit", guid=guid_str, controls=controls)
[docs]
def list_streaming_interfaces(dev: usb.core.Device) -> Dict[int, StreamingInterface]:
"""Build :class:`StreamingInterface` descriptions for *dev*."""
interfaces: Dict[int, StreamingInterface] = {}
for cfg in dev:
for intf in cfg:
if intf.bInterfaceClass != UVC_CLASS or intf.bInterfaceSubClass != VS_SUBCLASS:
continue
info = interfaces.setdefault(
intf.bInterfaceNumber, StreamingInterface(interface_number=intf.bInterfaceNumber)
)
endpoint_address, endpoint_attributes, max_packet_size = None, None, 0
if intf.bNumEndpoints:
ep = intf[0]
endpoint_address = ep.bEndpointAddress
endpoint_attributes = ep.bmAttributes
max_packet_size = _iso_payload_capacity(ep.wMaxPacketSize)
info.alt_settings.append(
AltSettingInfo(
alternate_setting=intf.bAlternateSetting,
endpoint_address=endpoint_address,
endpoint_attributes=endpoint_attributes,
max_packet_size=max_packet_size,
)
)
if intf.bAlternateSetting == 0 and intf.extra_descriptors:
info.formats = parse_vs_descriptors(bytes(intf.extra_descriptors))
for interface in interfaces.values():
interface.alt_settings.sort(key=lambda alt: alt.alternate_setting)
return interfaces
[docs]
def parse_vs_descriptors(extra: bytes) -> List[StreamFormat]:
"""Parse the raw ``extra_descriptors`` blob for a VS interface."""
formats: List[StreamFormat] = []
idx = 0
current_format: Optional[StreamFormat] = None
while idx + 2 < len(extra):
length = extra[idx]
if length == 0 or idx + length > len(extra):
break
dtype = extra[idx + 1]
subtype = extra[idx + 2]
payload = extra[idx : idx + length]
if dtype == CS_INTERFACE:
if subtype in {VS_FORMAT_UNCOMPRESSED, VS_FORMAT_MJPEG, VS_FORMAT_FRAME_BASED}:
current_format = _parse_format_descriptor(payload)
formats.append(current_format)
elif subtype in {VS_FRAME_UNCOMPRESSED, VS_FRAME_MJPEG, VS_FRAME_FRAME_BASED} and current_format:
frame = _parse_frame_descriptor(payload)
if frame:
current_format.frames.append(frame)
elif subtype == VS_STILL_IMAGE_FRAME_DESCRIPTOR and current_format:
current_format.still_frames.extend(
_parse_still_frame_descriptor(
payload,
format_index=current_format.format_index,
format_subtype=current_format.subtype,
)
)
idx += length
return formats
def _parse_format_descriptor(desc: bytes) -> StreamFormat:
fmt_index = desc[3]
subtype = desc[2]
guid = desc[5:21]
if subtype == VS_FORMAT_MJPEG:
name = "MJPEG"
elif subtype == VS_FORMAT_UNCOMPRESSED:
name = _format_fourcc(guid)
elif subtype == VS_FORMAT_FRAME_BASED:
name = f"Frame-based {_format_fourcc(guid)}"
else:
name = f"Subtype 0x{subtype:02x}"
return StreamFormat(description=name, format_index=fmt_index, subtype=subtype, guid=guid)
def _parse_frame_descriptor(desc: bytes) -> Optional[FrameInfo]:
if len(desc) < 26:
return None
frame_index = desc[3]
bm_capabilities = desc[4] if len(desc) > 4 else 0
width = int.from_bytes(desc[5:7], "little")
height = int.from_bytes(desc[7:9], "little")
max_frame_size = int.from_bytes(desc[17:21], "little")
default_interval = int.from_bytes(desc[21:25], "little")
interval_type = desc[25]
intervals: List[int] = []
offset = 26
if interval_type == 0:
if len(desc) >= offset + 12:
min_interval = int.from_bytes(desc[offset : offset + 4], "little")
max_interval = int.from_bytes(desc[offset + 4 : offset + 8], "little")
step = int.from_bytes(desc[offset + 8 : offset + 12], "little")
intervals.extend(v for v in (min_interval, max_interval, default_interval) if v)
else:
for _ in range(interval_type):
if offset + 4 > len(desc):
break
value = int.from_bytes(desc[offset : offset + 4], "little")
if value:
intervals.append(value)
offset += 4
if default_interval and default_interval not in intervals:
intervals.append(default_interval)
if not intervals:
intervals = [default_interval] if default_interval else []
return FrameInfo(
frame_index=frame_index,
width=width,
height=height,
default_interval=default_interval,
intervals_100ns=sorted(set(intervals)),
max_frame_size=max_frame_size,
bm_capabilities=bm_capabilities,
)
def _parse_still_frame_descriptor(
desc: bytes, *, format_index: int, format_subtype: int
) -> List[StillFrameInfo]:
if len(desc) < 5:
return []
endpoint = desc[3]
num_sizes = desc[4]
offset = 5
frames: List[StillFrameInfo] = []
for idx in range(1, num_sizes + 1):
if offset + 4 > len(desc):
break
width = int.from_bytes(desc[offset : offset + 2], "little")
height = int.from_bytes(desc[offset + 2 : offset + 4], "little")
frames.append(
StillFrameInfo(
width=width,
height=height,
endpoint_address=endpoint,
frame_index=idx,
format_index=format_index,
format_subtype=format_subtype,
)
)
offset += 4
if offset >= len(desc):
return frames
num_compression = desc[offset]
offset += 1
compressions: List[int] = []
for _ in range(num_compression):
if offset >= len(desc):
break
compressions.append(int(desc[offset]))
offset += 1
if compressions:
for frame in frames:
frame.compression_indices = list(compressions)
return frames
[docs]
def describe_device(dev: usb.core.Device) -> str:
"""Human readable summary of vendor/product/serial info."""
try:
vendor = usb.util.get_string(dev, dev.iManufacturer)
except (ValueError, usb.core.USBError):
vendor = None
try:
product = usb.util.get_string(dev, dev.iProduct)
except (ValueError, usb.core.USBError):
product = None
try:
serial = usb.util.get_string(dev, dev.iSerialNumber)
except (ValueError, usb.core.USBError):
serial = None
vendor = vendor or f"VID_{dev.idVendor:04x}"
product = product or f"PID_{dev.idProduct:04x}"
serial = serial or "?"
return f"{vendor} {product} (S/N {serial})"
[docs]
def resolve_stream_preference(
interface: StreamingInterface,
width: int,
height: int,
codec: str = CodecPreference.AUTO,
) -> Tuple[StreamFormat, FrameInfo]:
"""Select a (format, frame) tuple based on resolution and codec preference.
``codec`` may be one of ``auto`` (YUYV → MJPEG → frame-based), ``yuyv``,
``mjpeg``, ``frame-based``, ``h264`` or ``h265``. The frame-based variants
filter UVC ``VS_FORMAT_FRAME_BASED`` descriptors, matching on the reported
description when a specific codec is requested. Raises
:class:`UVCError` if the requested combination does not exist.
"""
codec = codec.lower()
def _frame_based_predicate(target: str):
target = target.lower()
def _predicate(fmt: StreamFormat) -> bool:
desc = (fmt.description or "").lower()
if target == CodecPreference.H264:
return "264" in desc
if target == CodecPreference.H265:
return "265" in desc or "hevc" in desc
return True
return _predicate
def _find(subtype: int) -> Optional[Tuple[StreamFormat, FrameInfo]]:
match = interface.find_frame(width, height, subtype=subtype)
if match is not None:
return match
if width and height:
return None
return interface.find_frame(0, 0, subtype=subtype)
def _frame_based_predicate(target: str):
target = target.lower()
def _predicate(fmt: StreamFormat) -> bool:
desc = (fmt.description or "").lower()
if target == CodecPreference.H264:
return "264" in desc
if target == CodecPreference.H265:
return "265" in desc or "hevc" in desc
return True
return _predicate
def _find_frame_based(predicate=None) -> Optional[Tuple[StreamFormat, FrameInfo]]:
for fmt in interface.formats:
if fmt.subtype != VS_FORMAT_FRAME_BASED:
continue
if predicate and not predicate(fmt):
continue
match = interface.find_frame(width, height, format_index=fmt.format_index)
if match is None and width and height:
continue
if match is None:
match = interface.find_frame(0, 0, format_index=fmt.format_index)
if match is not None:
return match
return None
if codec in (CodecPreference.H264, CodecPreference.H265, CodecPreference.FRAME_BASED):
predicate = None if codec == CodecPreference.FRAME_BASED else _frame_based_predicate(codec)
match = _find_frame_based(predicate)
if match is not None:
return match
raise UVCError(f"Requested codec '{codec}' not available for this interface")
order: List[int]
if codec == CodecPreference.YUYV:
order = [VS_FORMAT_UNCOMPRESSED]
elif codec == CodecPreference.MJPEG:
order = [VS_FORMAT_MJPEG]
else:
order = [VS_FORMAT_UNCOMPRESSED, VS_FORMAT_MJPEG, VS_FORMAT_FRAME_BASED]
for subtype in order:
if subtype == VS_FORMAT_FRAME_BASED:
match = _find_frame_based()
else:
match = _find(subtype)
if match is not None:
return match
match = interface.find_frame(width, height)
if match is None and width and height:
raise UVCError(
f"Resolution {width}x{height} not advertised on interface {interface.interface_number}"
)
if match is None:
match = interface.find_frame(0, 0)
if match is None:
raise UVCError("No streaming formats advertised on this interface")
if codec != CodecPreference.AUTO:
raise UVCError(f"Requested codec '{codec}' not available for this interface")
return match
[docs]
def resolve_still_preference(
interface: StreamingInterface,
width: int,
height: int,
codec: str = CodecPreference.AUTO,
) -> Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]:
codec = codec.lower()
def _match_candidates(
candidates: List[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]]
) -> Optional[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]]:
if not candidates:
return None
if width and height:
for fmt, frame in candidates:
if frame.width == width and frame.height == height:
return fmt, frame
if width or height:
for fmt, frame in candidates:
if (not width or frame.width == width) and (not height or frame.height == height):
return fmt, frame
# Prefer the highest resolution otherwise.
return max(candidates, key=lambda item: item[1].width * item[1].height)
def _collect(
subtype: Optional[int],
predicate: Optional[Callable[[StreamFormat], bool]] = None,
) -> Optional[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]]:
method1: List[Tuple[StreamFormat, FrameInfo]] = []
method2: List[Tuple[StreamFormat, StillFrameInfo]] = []
for fmt in interface.formats:
if subtype is not None and fmt.subtype != subtype:
continue
if predicate and not predicate(fmt):
continue
for frame in fmt.frames:
if frame.supports_still:
method1.append((fmt, frame))
for still in fmt.still_frames:
method2.append((fmt, still))
match = _match_candidates(method1)
if match is not None:
return match
return _match_candidates(method2)
codec_filters: List[Tuple[Optional[int], Optional[Callable[[StreamFormat], bool]]]]
if codec == CodecPreference.YUYV:
codec_filters = [(VS_FORMAT_UNCOMPRESSED, None)]
elif codec == CodecPreference.MJPEG:
codec_filters = [(VS_FORMAT_MJPEG, None)]
elif codec == CodecPreference.FRAME_BASED:
codec_filters = [(VS_FORMAT_FRAME_BASED, None)]
elif codec == CodecPreference.H264:
codec_filters = [(VS_FORMAT_FRAME_BASED, _frame_based_predicate(codec))]
elif codec == CodecPreference.H265:
codec_filters = [(VS_FORMAT_FRAME_BASED, _frame_based_predicate(codec))]
else:
codec_filters = [
(VS_FORMAT_UNCOMPRESSED, None),
(VS_FORMAT_MJPEG, None),
(VS_FORMAT_FRAME_BASED, None),
]
for subtype, predicate in codec_filters:
match = _collect(subtype, predicate)
if match is not None:
return match
match = _collect(None)
if match is not None:
return match
raise UVCError("No still-image capable frames advertised on this interface")
[docs]
def probe_streaming_interface(
dev: usb.core.Device,
interface_number: int,
stream_format: StreamFormat,
frame: FrameInfo,
frame_rate: Optional[float],
do_commit: bool,
alt_setting: Optional[int],
keep_alt: bool = False,
*,
strict_interval: bool = False,
payload_hint: int = 0,
) -> dict:
"""Claim *interface_number* and run VS_PROBE/VS_COMMIT.
When ``alt_setting`` is provided and ``do_commit`` is true, the function
selects that alternate setting after the commit. If ``keep_alt`` is false
(default) the interface is switched back to alternate 0 before returning so
that enumeration scripts leave the camera untouched. Streaming code can set
``keep_alt`` to True and manage the lifecycle manually.
"""
try:
dev.set_configuration()
except usb.core.USBError:
# The device was already configured.
pass
reattach = False
try:
if dev.is_kernel_driver_active(interface_number):
dev.detach_kernel_driver(interface_number)
reattach = True
except (usb.core.USBError, NotImplementedError, AttributeError):
pass
usb.util.claim_interface(dev, interface_number)
try:
info = perform_probe_commit(
dev,
interface_number,
stream_format,
frame,
frame_rate,
do_commit,
strict_interval=strict_interval,
payload_hint=payload_hint,
)
if do_commit and alt_setting is not None:
try:
dev.set_interface_altsetting(interface=interface_number, alternate_setting=alt_setting)
except usb.core.USBError as exc:
info["alt_setting_error"] = str(exc)
else:
info["alt_setting"] = alt_setting
return info
finally:
if do_commit and alt_setting is not None and not keep_alt:
with contextlib.suppress(usb.core.USBError):
dev.set_interface_altsetting(interface=interface_number, alternate_setting=0)
usb.util.release_interface(dev, interface_number)
if reattach:
with contextlib.suppress(usb.core.USBError):
dev.attach_kernel_driver(interface_number)
def _perform_probe_commit_with_length(
dev: usb.core.Device,
interface_number: int,
stream_format: StreamFormat,
frame: FrameInfo,
frame_rate: Optional[float],
do_commit: bool,
bm_hint: int = 1,
*,
strict_interval: bool = False,
payload_hint: int = 0,
length: int,
probe_selector: int = VS_PROBE_CONTROL,
commit_selector: int = VS_COMMIT_CONTROL,
) -> dict:
"""Send VS_PROBE (and optionally VS_COMMIT) using the provided selection."""
template = _read_control(dev, GET_CUR, probe_selector, interface_number, length)
source = "GET_CUR"
if template is None:
template = _read_control(dev, GET_DEF, probe_selector, interface_number, length)
source = "GET_DEF"
if template is None:
template = bytes(length)
source = "zero"
template_bytes = bytes(template)
LOG.debug("VS_PROBE template (%s)=%s", source, _hex_dump(template_bytes))
payload = bytearray(length)
candidate_interval = None
effective_hint = 1 if bm_hint else 0
if frame_rate is not None and frame_rate > 0:
try:
candidate_interval = frame.pick_interval(frame_rate, strict=strict_interval)
effective_hint = 1
except ValueError:
candidate_interval = None
effective_hint = 0
else:
effective_hint = 0
_set_le_value(payload, 0, effective_hint, 2)
if len(payload) > 2:
payload[2] = stream_format.format_index
if len(payload) > 3:
payload[3] = frame.frame_index
if effective_hint and candidate_interval is not None:
_set_le_value(payload, 4, candidate_interval, 4)
if payload_hint and LOG.isEnabledFor(logging.DEBUG):
LOG.debug("Available ISO capacity hint=%s bytes", payload_hint)
try:
LOG.debug(
"SET_CUR selector=0x%02x len=%s bmHint=%s fmt=%s frame=%s interval=%s payload=%s",
probe_selector,
length,
effective_hint,
stream_format.format_index,
frame.frame_index,
candidate_interval,
_hex_dump(payload),
)
_write_control(dev, SET_CUR, probe_selector, interface_number, payload)
except usb.core.USBError as exc:
LOG.debug(
"SET_CUR selector=0x%02x failed errno=%s payload=%s",
probe_selector,
getattr(exc, "errno", None),
_hex_dump(payload),
)
raise
negotiated = _read_control(dev, GET_CUR, probe_selector, interface_number, length)
if negotiated is None:
negotiated_bytes = bytes(payload)
else:
negotiated_bytes = bytes(negotiated)
LOG.debug("GET_CUR selector=0x%02x payload=%s", probe_selector, _hex_dump(negotiated_bytes))
negotiation_info = _parse_probe_payload(negotiated_bytes)
if do_commit:
try:
LOG.debug("SET_CUR selector=0x%02x payload=%s", commit_selector, _hex_dump(negotiated_bytes))
_write_control(dev, SET_CUR, commit_selector, interface_number, negotiated_bytes)
except usb.core.USBError as exc:
LOG.debug(
"SET_CUR selector=0x%02x failed errno=%s payload=%s",
commit_selector,
getattr(exc, "errno", None),
_hex_dump(negotiated_bytes),
)
raise
negotiation_info.update(
{
"chosen_interval": negotiation_info.get("dwFrameInterval"),
"requested_rate_hz": frame_rate,
"committed": do_commit,
}
)
return negotiation_info
def _parse_still_probe_payload(payload: bytes) -> dict:
result: Dict[str, Optional[int]] = {
"bFormatIndex": payload[0] if len(payload) > 0 else None,
"bFrameIndex": payload[1] if len(payload) > 1 else None,
"bCompressionIndex": payload[2] if len(payload) > 2 else None,
"dwMaxVideoFrameSize": None,
"dwMaxPayloadTransferSize": None,
}
if len(payload) >= 7:
result["dwMaxVideoFrameSize"] = int.from_bytes(payload[3:7], "little")
if len(payload) >= 11:
result["dwMaxPayloadTransferSize"] = int.from_bytes(payload[7:11], "little")
return result
def _perform_still_probe_with_length(
dev: usb.core.Device,
interface_number: int,
stream_format: StreamFormat,
frame: FrameInfo,
compression_index: int,
do_commit: bool,
*,
length: int,
) -> dict:
template = _read_control(dev, GET_CUR, VS_STILL_PROBE_CONTROL, interface_number, length)
source = "GET_CUR"
if template is None:
template = _read_control(dev, GET_DEF, VS_STILL_PROBE_CONTROL, interface_number, length)
source = "GET_DEF"
if template is None:
template = bytes(length)
source = "zero"
LOG.debug("VS_STILL_PROBE template (%s)=%s", source, _hex_dump(bytes(template)))
payload = bytearray(template)
if len(payload) > 0:
payload[0] = stream_format.format_index
if len(payload) > 1:
payload[1] = frame.frame_index
if len(payload) > 2:
payload[2] = compression_index & 0xFF
try:
LOG.debug(
"SET_CUR selector=0x%02x payload=%s",
VS_STILL_PROBE_CONTROL,
_hex_dump(payload),
)
_write_control(dev, SET_CUR, VS_STILL_PROBE_CONTROL, interface_number, payload)
except usb.core.USBError as exc:
LOG.debug(
"SET_CUR selector=0x%02x failed errno=%s payload=%s",
VS_STILL_PROBE_CONTROL,
getattr(exc, "errno", None),
_hex_dump(payload),
)
raise
negotiated = _read_control(dev, GET_CUR, VS_STILL_PROBE_CONTROL, interface_number, length)
negotiated_bytes = bytes(negotiated) if negotiated is not None else bytes(payload)
LOG.debug(
"GET_CUR selector=0x%02x payload=%s",
VS_STILL_PROBE_CONTROL,
_hex_dump(negotiated_bytes),
)
if do_commit:
try:
_write_control(dev, SET_CUR, VS_STILL_COMMIT_CONTROL, interface_number, negotiated_bytes)
except usb.core.USBError as exc:
LOG.debug(
"SET_CUR selector=0x%02x failed errno=%s payload=%s",
VS_STILL_COMMIT_CONTROL,
getattr(exc, "errno", None),
_hex_dump(negotiated_bytes),
)
raise
info = _parse_still_probe_payload(negotiated_bytes)
info.update({"committed": do_commit})
return info
from .decoders import (
DEFAULT_BACKEND_ORDER,
RecorderBackend,
create_decoder_backend,
DecoderUnavailable,
create_mjpeg_gstreamer_recorder,
)
from .uvc_async import IsoConfig, UVCPacketStream, InterruptConfig, InterruptListener
[docs]
class UVCCamera:
"""Minimal helper to configure a streaming interface and fetch frames."""
def __init__(self, device: usb.core.Device, interface: StreamingInterface):
self.device = device
self.interface = interface
self.interface_number = interface.interface_number
self._claimed = False
self._reattach = False
self._active_alt = 0
self._endpoint_address: Optional[int] = None
self._max_payload: Optional[int] = None
self._format: Optional[StreamFormat] = None
self._frame: Optional[Union[FrameInfo, StillFrameInfo]] = None
self._async_ctx: Optional[usb1.USBContext] = None
self._async_handle: Optional[usb1.USBDeviceHandle] = None
self._async_stream: Optional[UVCPacketStream] = None
self._control_interface: Optional[int] = None
self._control_endpoint: Optional[int] = None
self._control_packet_size: Optional[int] = None
self._control_claimed = False
self._vc_listener: Optional[InterruptListener] = None
self._control_cache: Dict[Tuple[int, int, int], ControlEntry] = {}
self._control_name_map: Dict[str, ControlEntry] = {}
self._needs_device_reset = False
self._committed_frame_interval: Optional[int] = None
self._committed_payload: Optional[int] = None
self._committed_frame_size: Optional[int] = None
self._committed_format_index: Optional[int] = None
self._committed_frame_index: Optional[int] = None
self._still_format: Optional[StreamFormat] = None
self._still_frame: Optional[Union[FrameInfo, StillFrameInfo]] = None
self._still_compression_index: int = 1
self._still_payload: int = 0
self._still_alt_info: Optional[AltSettingInfo] = None
self._still_frame_size: Optional[int] = None
self._still_method: int = 1
self._still_endpoint_hint: Optional[int] = None
self._still_candidates: List[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]] = []
self._still_candidate_pos: int = 0
self._still_allow_fallback: bool = False
self._still_requested_compression: int = 1
self._still_requested_codec: str = CodecPreference.AUTO
self._sync_stats = StreamStats()
vc_interface = None
for cfg in device:
for intf in cfg:
if intf.bInterfaceClass == UVC_CLASS and intf.bInterfaceSubClass == 1:
vc_interface = intf
break
if vc_interface is not None:
break
if vc_interface is not None:
self._control_interface = vc_interface.bInterfaceNumber
LOG.info("Detected Video Control interface=%s", self._control_interface)
# Look for an explicitly advertised interrupt endpoint.
for ep in getattr(vc_interface, "endpoints", lambda: [])():
if (
usb.util.endpoint_direction(ep.bEndpointAddress)
== usb.util.ENDPOINT_IN
and usb.util.endpoint_type(ep.bmAttributes)
== usb.util.ENDPOINT_TYPE_INTR
):
self._control_endpoint = ep.bEndpointAddress
self._control_packet_size = ep.wMaxPacketSize or 16
LOG.info(
"Found VC interrupt endpoint 0x%02x size=%s",
self._control_endpoint,
self._control_packet_size,
)
break
else:
LOG.warning("No Video Control interface found")
[docs]
@classmethod
def from_device(
cls,
device: usb.core.Device,
interface_number: int,
) -> "UVCCamera":
interfaces = list_streaming_interfaces(device)
if interface_number not in interfaces:
raise UVCError(f"Interface {interface_number} is not a UVC streaming interface")
return cls(device, interfaces[interface_number])
[docs]
@classmethod
def open(
cls,
*,
vid: Optional[int] = None,
pid: Optional[int] = None,
device_index: int = 0,
interface: int = 1,
) -> "UVCCamera":
devices = find_uvc_devices(vid, pid)
if not devices:
raise UVCError("No matching UVC devices found")
if not (0 <= device_index < len(devices)):
raise UVCError(f"Device index {device_index} out of range (found {len(devices)})")
return cls.from_device(devices[device_index], interface)
[docs]
def close(self) -> None:
self.stop_streaming()
self.stop_async_stream()
def __enter__(self) -> "UVCCamera":
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()
@property
def active_alt_setting(self) -> int:
"""Return the currently selected alternate setting (0 when idle)."""
return self._active_alt
@property
def endpoint_address(self) -> Optional[int]:
"""USB endpoint address used for streaming (``None`` if not configured)."""
return self._endpoint_address
@property
def max_payload_size(self) -> Optional[int]:
"""Maximum payload size requested when reading packets."""
return self._max_payload
@property
def current_format(self) -> Optional[StreamFormat]:
return self._format
@property
def current_frame(self) -> Optional[FrameInfo]:
return self._frame
@property
def current_resolution(self) -> Optional[Tuple[int, int]]:
if self._frame is None:
return None
return self._frame.width, self._frame.height
# ------------------------------------------------------------------
# VC control helpers
# ------------------------------------------------------------------
[docs]
def read_vc_control(
self,
unit_id: int,
selector: int,
request: int,
length: int,
interface_number: Optional[int] = None,
) -> Optional[bytes]:
"""Read a Video Control value even while an async stream is active."""
interface = interface_number if interface_number is not None else self._control_interface
if interface is None:
raise UVCError("No Video Control interface available on this device")
if (
self._async_handle is not None
and self._control_claimed
and interface == self._control_interface
):
w_value = selector << 8
w_index = _vc_w_index(interface, unit_id)
try:
data = self._async_handle.controlRead(
REQ_TYPE_IN,
request,
w_value,
w_index,
length,
timeout=500,
)
except usb1.USBError as exc:
raise UVCError(f"VC GET request failed: {exc}") from exc
return bytes(data)
with claim_vc_interface(self.device, interface, auto_reattach=True):
return vc_ctrl_get(self.device, interface, unit_id, selector, request, length)
[docs]
def write_vc_control(
self,
unit_id: int,
selector: int,
payload: bytes,
request: int = SET_CUR,
interface_number: Optional[int] = None,
) -> None:
"""Write a Video Control value even while an async stream is active."""
interface = interface_number if interface_number is not None else self._control_interface
if interface is None:
raise UVCError("No Video Control interface available on this device")
if (
self._async_handle is not None
and self._control_claimed
and interface == self._control_interface
):
w_value = selector << 8
w_index = _vc_w_index(interface, unit_id)
try:
self._async_handle.controlWrite(
REQ_TYPE_OUT,
request,
w_value,
w_index,
payload,
timeout=500,
)
except usb1.USBError as exc:
raise UVCError(f"VC SET request failed: {exc}") from exc
return
with claim_vc_interface(self.device, interface, auto_reattach=True):
vc_ctrl_set(self.device, interface, unit_id, selector, payload)
# ------------------------------------------------------------------
# High-level control API
# ------------------------------------------------------------------
def _refresh_control_cache(self) -> None:
cache: Dict[Tuple[int, int, int], ControlEntry] = {}
name_map: Dict[str, ControlEntry] = {}
control_units = list_control_units(self.device)
for interface_number, units in control_units.items():
with claim_vc_interface(self.device, interface_number):
manager = UVCControlsManager(self.device, units, interface_number=interface_number)
for entry in manager.get_controls():
key = (entry.interface_number, entry.unit_id, entry.selector)
cache[key] = entry
name_map.setdefault(entry.name.lower(), entry)
self._control_cache = cache
self._control_name_map = name_map
[docs]
def enumerate_controls(self, *, refresh: bool = False) -> List[ControlEntry]:
if refresh or not self._control_cache:
self._refresh_control_cache()
return list(self._control_cache.values())
def _resolve_control(
self,
key: Union[str, Tuple[int, int], Tuple[int, int, int], ControlEntry, UVCControl],
*,
interface_hint: Optional[int] = None,
) -> ControlEntry:
self.enumerate_controls()
if isinstance(key, ControlEntry):
return key
if isinstance(key, UVCControl):
key = (interface_hint or self._control_interface or 0, key.unit_id, key.selector)
if isinstance(key, str):
entry = self._control_name_map.get(key.lower())
if entry is None:
raise KeyError(f"Unknown control name '{key}'")
return entry
if isinstance(key, tuple):
if len(key) == 3:
interface_number, unit_id, selector = key
entry = self._control_cache.get((interface_number, unit_id, selector))
if entry is None:
raise KeyError(f"No control for interface={interface_number} unit={unit_id} selector={selector}")
return entry
if len(key) == 2:
unit_id, selector = key
candidates = [
entry for entry in self._control_cache.values()
if entry.unit_id == unit_id and entry.selector == selector
]
if not candidates:
raise KeyError(f"No control for unit={unit_id} selector={selector}")
if interface_hint is not None:
for entry in candidates:
if entry.interface_number == interface_hint:
return entry
if self._control_interface is not None:
for entry in candidates:
if entry.interface_number == self._control_interface:
return entry
return candidates[0]
raise KeyError(f"Unsupported control key: {key!r}")
[docs]
def get_control(
self,
key: Union[str, Tuple[int, int], Tuple[int, int, int], ControlEntry, UVCControl],
*,
raw: bool = False,
interface_hint: Optional[int] = None,
) -> Optional[Union[int, bytes]]:
entry = self._resolve_control(key, interface_hint=interface_hint)
length = entry.length
if not length:
if entry.raw_default:
length = len(entry.raw_default)
elif entry.raw_maximum:
length = len(entry.raw_maximum)
elif entry.raw_minimum:
length = len(entry.raw_minimum)
else:
length = 4
data = self.read_vc_control(
entry.unit_id,
entry.selector,
GET_CUR,
length,
interface_number=entry.interface_number,
)
if data is None or raw:
return data
if len(data) <= 4:
signed = entry.minimum is not None and entry.minimum < 0
return int.from_bytes(data[: len(data)], "little", signed=signed)
return data
[docs]
def set_control(
self,
key: Union[str, Tuple[int, int], Tuple[int, int, int], ControlEntry, UVCControl],
value: Union[int, bytes, bytearray],
*,
raw: bool = False,
interface_hint: Optional[int] = None,
) -> None:
entry = self._resolve_control(key, interface_hint=interface_hint)
if raw:
if not isinstance(value, (bytes, bytearray)):
raise TypeError("Raw control values must be bytes-like")
payload = bytes(value)
else:
if not isinstance(value, int):
raise TypeError("Control values must be integers (or set raw=True for bytes)")
length = entry.length
if not length:
if entry.raw_default:
length = len(entry.raw_default)
elif entry.raw_maximum:
length = len(entry.raw_maximum)
elif entry.raw_minimum:
length = len(entry.raw_minimum)
else:
length = 2
signed = entry.minimum is not None and entry.minimum < 0
payload = int(value).to_bytes(max(1, length), "little", signed=signed)
self.write_vc_control(
entry.unit_id,
entry.selector,
payload,
interface_number=entry.interface_number,
)
# ------------------------------------------------------------------
# Interface management
# ------------------------------------------------------------------
def _ensure_claimed(self) -> None:
if self._claimed:
return
try:
self.device.set_configuration()
except usb.core.USBError:
pass
try:
if self.device.is_kernel_driver_active(self.interface_number):
self.device.detach_kernel_driver(self.interface_number)
self._reattach = True
self._needs_device_reset = True
except (usb.core.USBError, NotImplementedError, AttributeError):
pass
usb.util.claim_interface(self.device, self.interface_number)
self._claimed = True
def _release_interface(self, *, reset_alt: bool = True) -> None:
if not self._claimed:
return
if reset_alt and self._active_alt:
with contextlib.suppress(usb.core.USBError):
self.device.set_interface_altsetting(
interface=self.interface_number, alternate_setting=0
)
self._active_alt = 0
with contextlib.suppress(usb.core.USBError):
usb.util.release_interface(self.device, self.interface_number)
if self._reattach:
with contextlib.suppress(usb.core.USBError):
self.device.attach_kernel_driver(self.interface_number)
self._claimed = False
self._reattach = False
if reset_alt:
self._endpoint_address = None
self._max_payload = None
self._format = None
self._frame = None
# ------------------------------------------------------------------
# Configuration helpers
# ------------------------------------------------------------------
[docs]
def select_stream(
self,
*,
width: Optional[int] = None,
height: Optional[int] = None,
codec: CodecPreference = CodecPreference.AUTO,
format_index: Optional[int] = None,
frame_index: Optional[int] = None,
) -> Tuple[StreamFormat, FrameInfo]:
"""Resolve a streaming format/frame pairing using either dimensions or indexes."""
if width is not None and height is not None:
return resolve_stream_preference(
self.interface,
width,
height,
codec=codec,
)
if format_index is not None:
for fmt in self.interface.formats:
if fmt.format_index != format_index:
continue
if frame_index is None:
if not fmt.frames:
raise UVCError(f"Format index {format_index} exposes no frames")
return fmt, fmt.frames[0]
for frame in fmt.frames:
if frame.frame_index == frame_index:
return fmt, frame
raise UVCError(f"Format index {format_index} / frame {frame_index} not advertised")
raise UVCError("Specify either width/height or a format/frame index when selecting a stream")
[docs]
def select_still_image(
self,
*,
width: Optional[int] = None,
height: Optional[int] = None,
codec: CodecPreference = CodecPreference.AUTO,
format_index: Optional[int] = None,
frame_index: Optional[int] = None,
) -> Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]:
"""Resolve a still-image capable format/frame pairing."""
if width is not None and height is not None:
return resolve_still_preference(
self.interface,
width,
height,
codec=codec,
)
if format_index is not None:
for fmt in self.interface.formats:
if fmt.format_index != format_index:
continue
if frame_index is None:
candidates: List[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]] = []
candidates.extend((fmt, frame) for frame in fmt.frames if frame.supports_still)
candidates.extend((fmt, frame) for frame in fmt.still_frames)
if not candidates:
raise UVCError(f"Format index {format_index} exposes no still-capable frames")
return max(candidates, key=lambda item: item[1].width * item[1].height)
for frame in fmt.frames:
if frame.frame_index == frame_index and frame.supports_still:
return fmt, frame
for frame in fmt.still_frames:
if frame.frame_index == frame_index:
return fmt, frame
raise UVCError(
f"Format index {format_index} / frame {frame_index} not advertised for still images"
)
# Default: choose the highest-resolution still-capable frame.
best: Optional[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]] = None
best_area = -1
for fmt, frame in self.interface.iter_still_frames():
area = frame.width * frame.height
if area > best_area:
best = (fmt, frame)
best_area = area
if best_area >= 0:
return best # type: ignore[return-value]
# No method 1 support; look for Method 2 descriptors instead.
for fmt in self.interface.formats:
for frame in fmt.still_frames:
area = frame.width * frame.height
if area > best_area:
best = (fmt, frame)
best_area = area
if best is None:
raise UVCError("No still-image capable frames advertised on this interface")
return best
def _configure_specific_still(
self,
stream_format: StreamFormat,
frame_choice: Union[FrameInfo, StillFrameInfo],
compression_index: int,
) -> dict:
requested_method = 2 if isinstance(frame_choice, StillFrameInfo) else 1
if requested_method == 1 and isinstance(frame_choice, FrameInfo):
if not frame_choice.supports_still:
raise UVCError("Selected frame does not advertise still-image support")
compression = compression_index if compression_index and compression_index > 0 else 1
if isinstance(frame_choice, StillFrameInfo) and frame_choice.compression_indices:
if compression not in frame_choice.compression_indices:
preferred = frame_choice.compression_indices[0]
if compression_index and compression_index not in frame_choice.compression_indices:
LOG.warning(
"Requested still compression index %s not advertised; using %s",
compression_index,
preferred,
)
compression = preferred
self._ensure_claimed()
info = perform_still_probe_commit(
self.device,
self.interface_number,
stream_format,
frame_choice,
compression_index=compression,
do_commit=True,
)
negotiated_format_idx = info.get("bFormatIndex")
negotiated_frame_idx = info.get("bFrameIndex")
actual_format = stream_format
if isinstance(negotiated_format_idx, int) and negotiated_format_idx:
actual_format = next(
(fmt for fmt in self.interface.formats if fmt.format_index == negotiated_format_idx),
stream_format,
)
actual_frame: Union[FrameInfo, StillFrameInfo] = frame_choice
if isinstance(negotiated_frame_idx, int) and negotiated_frame_idx and actual_format:
candidate: Optional[Union[FrameInfo, StillFrameInfo]] = None
if actual_format.still_frames and requested_method == 2:
candidate = next(
(frame for frame in actual_format.still_frames if frame.frame_index == negotiated_frame_idx),
None,
)
if candidate is None:
candidate = next(
(frame for frame in actual_format.frames if frame.frame_index == negotiated_frame_idx),
None,
)
if candidate is None and actual_format.still_frames:
candidate = next(
(frame for frame in actual_format.still_frames if frame.frame_index == negotiated_frame_idx),
None,
)
if candidate is not None:
actual_frame = candidate
if isinstance(actual_frame, StillFrameInfo):
method = 2
matching_stream = next(
(
candidate
for candidate in actual_format.frames
if candidate.width == actual_frame.width and candidate.height == actual_frame.height
),
None,
)
if actual_frame.max_frame_size == 0 and matching_stream is not None:
actual_frame.max_frame_size = matching_stream.max_frame_size
if actual_frame.max_frame_size == 0 and actual_format.subtype == VS_FORMAT_UNCOMPRESSED:
actual_frame.max_frame_size = actual_frame.width * actual_frame.height * 2
else:
method = 1
if not actual_frame.supports_still:
raise UVCError("Negotiated still frame does not advertise still-image support")
negotiated_compression = info.get("bCompressionIndex")
effective_compression = compression
if isinstance(negotiated_compression, int) and negotiated_compression:
effective_compression = negotiated_compression
elif method == 2 and isinstance(actual_frame, StillFrameInfo) and actual_frame.compression_indices:
if effective_compression not in actual_frame.compression_indices:
effective_compression = actual_frame.compression_indices[0]
endpoint_hint: Optional[int] = None
if isinstance(actual_frame, StillFrameInfo):
endpoint_hint = actual_frame.endpoint_address or None
frame_max_size = getattr(actual_frame, "max_frame_size", 0)
payload_hint = (
info.get("dwMaxPayloadTransferSize")
or info.get("dwMaxVideoFrameSize")
or frame_max_size
or 0
)
alt_info: Optional[AltSettingInfo] = None
if endpoint_hint:
alt_info = self.interface.find_alt_by_endpoint(endpoint_hint)
if alt_info is None and payload_hint:
alt_info = self.interface.select_alt_for_payload(payload_hint)
if alt_info is None and endpoint_hint:
alt_info = self.interface.find_alt_by_endpoint(endpoint_hint)
if alt_info is None and self._endpoint_address is None:
alt_info = next(
(alt for alt in reversed(self.interface.alt_settings) if alt.endpoint_address),
None,
)
if alt_info is None and self._endpoint_address is None:
raise UVCError("Unable to resolve an alternate setting for still capture")
frame_size_guess = info.get("dwMaxVideoFrameSize") or frame_max_size
if not frame_size_guess and actual_format.subtype == VS_FORMAT_UNCOMPRESSED:
frame_size_guess = actual_frame.width * actual_frame.height * 2
self._still_method = method
self._still_format = actual_format
self._still_frame = actual_frame
self._still_endpoint_hint = endpoint_hint if endpoint_hint else None
self._still_compression_index = effective_compression
self._still_payload = int(payload_hint)
self._still_alt_info = alt_info
self._still_frame_size = int(frame_size_guess or 0)
return info
def _still_candidate_key(
self,
stream_format: StreamFormat,
frame: Union[FrameInfo, StillFrameInfo],
) -> Tuple[str, int, int]:
kind = "still" if isinstance(frame, StillFrameInfo) else "stream"
return (kind, stream_format.format_index, frame.frame_index)
def _collect_still_candidates(
self,
codec: str,
) -> List[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]]:
codec = codec.lower()
def _frame_based_predicate(target: str):
target = target.lower()
def _predicate(fmt: StreamFormat) -> bool:
desc = (fmt.description or "").lower()
if target == CodecPreference.H264:
return "264" in desc
if target == CodecPreference.H265:
return "265" in desc or "hevc" in desc
return True
return _predicate
if codec == CodecPreference.YUYV:
filters = [(VS_FORMAT_UNCOMPRESSED, None)]
include_remaining = False
elif codec == CodecPreference.MJPEG:
filters = [(VS_FORMAT_MJPEG, None)]
include_remaining = False
elif codec == CodecPreference.FRAME_BASED:
filters = [(VS_FORMAT_FRAME_BASED, None)]
include_remaining = False
elif codec == CodecPreference.H264:
filters = [(VS_FORMAT_FRAME_BASED, _frame_based_predicate(codec))]
include_remaining = False
elif codec == CodecPreference.H265:
filters = [(VS_FORMAT_FRAME_BASED, _frame_based_predicate(codec))]
include_remaining = False
else:
filters = [
(VS_FORMAT_MJPEG, None),
(VS_FORMAT_UNCOMPRESSED, None),
(VS_FORMAT_FRAME_BASED, None),
]
include_remaining = True
seen: set = set()
result: List[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]] = []
def _add_for_subtype(subtype: Optional[int], predicate=None) -> None:
subset: List[Tuple[StreamFormat, Union[FrameInfo, StillFrameInfo]]] = []
for fmt in self.interface.formats:
if subtype is not None and fmt.subtype != subtype:
continue
if predicate and not predicate(fmt):
continue
for still in fmt.still_frames:
key = self._still_candidate_key(fmt, still)
if key in seen:
continue
seen.add(key)
subset.append((fmt, still))
for frame in fmt.frames:
if not frame.supports_still:
continue
key = self._still_candidate_key(fmt, frame)
if key in seen:
continue
seen.add(key)
subset.append((fmt, frame))
subset.sort(key=lambda item: item[1].width * item[1].height, reverse=True)
result.extend(subset)
for subtype, predicate in filters:
_add_for_subtype(subtype, predicate)
if include_remaining:
_add_for_subtype(None)
return result
def _set_current_still_candidate_position(self) -> None:
if (
not self._still_candidates
or self._still_format is None
or self._still_frame is None
):
self._still_candidate_pos = 0
return
key = self._still_candidate_key(self._still_format, self._still_frame)
for idx, (fmt, frame) in enumerate(self._still_candidates):
if self._still_candidate_key(fmt, frame) == key:
self._still_candidate_pos = idx
return
self._still_candidates.insert(0, (self._still_format, self._still_frame))
self._still_candidate_pos = 0
[docs]
def stream(
self,
*,
width: Optional[int] = None,
height: Optional[int] = None,
codec: CodecPreference = CodecPreference.AUTO,
decoder: Optional[Union[str, DecoderPreference, Iterable[str]]] = DecoderPreference.AUTO,
format_index: Optional[int] = None,
frame_index: Optional[int] = None,
frame_rate: Optional[float] = None,
strict_fps: bool = False,
queue_size: int = 4,
skip_initial: int = 2,
transfers: int = 16,
packets_per_transfer: int = 64,
timeout_ms: int = 2000,
duration: Optional[float] = None,
record_to: Optional[Union[str, pathlib.Path]] = None,
) -> "FrameStream":
"""Return a managed frame iterator for continuous streaming.
Parameters
----------
decoder:
Optional decoder backend preference for frame-based codecs (for example
H.264/H.265). Use :data:`DecoderPreference.NONE` to keep raw payloads
and defer decoding. MJPEG and uncompressed formats ignore this setting.
record_to:
Optional file path for writing the compressed payloads (requires a decoder backend that supports recording).
"""
stream_format, frame = self.select_stream(
width=width,
height=height,
codec=codec,
format_index=format_index,
frame_index=frame_index,
)
return FrameStream(
camera=self,
stream_format=stream_format,
frame=frame,
frame_rate=frame_rate,
strict_fps=strict_fps,
queue_size=queue_size,
skip_initial=skip_initial,
transfers=transfers,
packets_per_transfer=packets_per_transfer,
timeout_ms=timeout_ms,
duration=duration,
decoder_preference=decoder,
record_path=record_to,
)
[docs]
def stop_streaming(self) -> None:
"""Return the interface to its idle state."""
self._release_interface()
self._reset_device()
# ------------------------------------------------------------------
# Still image helpers
# ------------------------------------------------------------------
[docs]
def capture_still_image(self, *, timeout_ms: int = 2000) -> CapturedFrame:
"""Trigger and fetch a single still image using the negotiated settings."""
if self._still_format is None or self._still_frame is None:
raise UVCError("Still image parameters not configured; call configure_still_image() first")
attempt_index = self._still_candidate_pos
visited_keys: Set[Tuple[str, int, int]] = set()
while True:
current_key = self._still_candidate_key(self._still_format, self._still_frame)
if current_key in visited_keys:
raise UVCError(
"Still capture exhausted advertised candidates without a usable payload"
)
visited_keys.add(current_key)
was_claimed = self._claimed
self._ensure_claimed()
previous_alt = self._active_alt
previous_endpoint = self._endpoint_address
previous_payload = self._max_payload
previous_format = self._format
previous_frame = self._frame
alt_info = self._still_alt_info
if alt_info is not None and alt_info.endpoint_address is None:
alt_info = None
if alt_info is not None:
if alt_info.alternate_setting != self._active_alt:
try:
self.device.set_interface_altsetting(
interface=self.interface_number,
alternate_setting=alt_info.alternate_setting,
)
except usb.core.USBError as exc:
raise UVCError(f"Failed to select still-image alternate setting: {exc}") from exc
self._active_alt = alt_info.alternate_setting
self._endpoint_address = alt_info.endpoint_address
self._max_payload = alt_info.max_packet_size or self._max_payload
# Ensure decoding metadata refers to the still frame while we capture.
self._format = self._still_format
self._frame = self._still_frame
original_frame_size: Optional[int] = None
if self._frame is not None and self._still_frame_size:
original_frame_size = self._frame.max_frame_size
self._frame.max_frame_size = self._still_frame_size
frame: Optional[CapturedFrame] = None
exc_info: Optional[UVCError] = None
try:
trigger_value = 0x01
if self._still_method == 2:
endpoint_hint = self._still_endpoint_hint
alt_endpoint = self._still_alt_info.endpoint_address if self._still_alt_info else None
if endpoint_hint and endpoint_hint != 0:
trigger_value = 0x02
elif alt_endpoint and alt_endpoint != previous_endpoint:
trigger_value = 0x02
_write_control(
self.device,
SET_CUR,
VS_STILL_IMAGE_TRIGGER_CONTROL,
self.interface_number,
bytes([trigger_value]),
)
with contextlib.suppress(usb.core.USBError):
if self._endpoint_address is not None:
self.device.clear_halt(self._endpoint_address)
try:
frame = self.read_frame(timeout_ms=timeout_ms, overall_timeout_ms=timeout_ms)
except usb.core.USBError as exc:
exc_info = UVCError(f"Still capture failed: {exc}")
except UVCError as exc:
exc_info = exc
finally:
self._format = previous_format
self._frame = previous_frame
if self._frame is not None and original_frame_size is not None:
self._frame.max_frame_size = original_frame_size
if alt_info is not None and previous_alt != self._active_alt:
with contextlib.suppress(usb.core.USBError):
self.device.set_interface_altsetting(
interface=self.interface_number,
alternate_setting=previous_alt,
)
self._active_alt = previous_alt
self._endpoint_address = previous_endpoint
self._max_payload = previous_payload
if not was_claimed:
self._release_interface(reset_alt=False)
if frame is not None:
return frame
if exc_info is None:
raise UVCError("Still capture produced no frame data")
timed_out = "Timed out waiting for frame" in str(exc_info)
if timed_out and self._still_allow_fallback:
current_frame = self._still_frame
fallback_configured = False
while attempt_index + 1 < len(self._still_candidates):
attempt_index += 1
next_fmt, next_frame = self._still_candidates[attempt_index]
LOG.warning(
"Still capture timed out at %sx%s; retrying with %sx%s",
current_frame.width if isinstance(current_frame, (FrameInfo, StillFrameInfo)) else "?",
current_frame.height if isinstance(current_frame, (FrameInfo, StillFrameInfo)) else "?",
next_frame.width,
next_frame.height,
)
try:
info = self._configure_specific_still(
next_fmt,
next_frame,
self._still_compression_index or self._still_requested_compression,
)
except UVCError as cfg_exc:
LOG.warning(
"Fallback still configuration %sx%s failed: %s",
next_frame.width,
next_frame.height,
cfg_exc,
)
continue
self._set_current_still_candidate_position()
attempt_index = self._still_candidate_pos
self._still_requested_compression = self._still_compression_index
LOG.info("Fallback still PROBE/COMMIT info: %s", info)
fallback_configured = True
break
if fallback_configured:
continue
raise exc_info
# ------------------------------------------------------------------
# Asynchronous streaming (isochronous transfers via libusb1)
# ------------------------------------------------------------------
[docs]
def start_async_stream(
self,
packet_callback: Callable[[bytes], None],
*,
transfers: int = 8,
packets_per_transfer: int = 32,
timeout_ms: int = 1000,
) -> None:
"""Start ISO streaming with robust VC polling keep-alive."""
if self._format is None or self._frame is None:
raise UVCError("Stream not configured; call configure_stream() first")
if self._endpoint_address is None or not self._max_payload:
raise UVCError("Streaming endpoint not initialised")
if self._async_stream is not None:
raise UVCError("Asynchronous stream already active")
endpoint = self._endpoint_address
alt = self._active_alt
self._needs_device_reset = True
bus = getattr(self.device, "bus", None)
address = getattr(self.device, "address", None)
if bus is None or address is None:
raise UVCError("Unable to determine device bus/address for libusb1 handle")
self._release_interface(reset_alt=False)
ctx = usb1.USBContext()
handle = None
for dev_handle in ctx.getDeviceList():
if dev_handle.getBusNumber() == bus and dev_handle.getDeviceAddress() == address:
handle = dev_handle.open()
break
if handle is None:
ctx.close()
raise UVCError("Failed to reopen device via libusb1 lookup")
handle.setAutoDetachKernelDriver(True)
control_claimed = False
if self._control_interface is not None and self._control_endpoint is not None:
try:
handle.claimInterface(self._control_interface)
control_claimed = True
LOG.info("Claimed VC interface %s", self._control_interface)
except usb1.USBError as exc:
LOG.warning("Failed to claim VC interface: %s", exc)
try:
handle.claimInterface(self.interface_number)
LOG.info("Claimed streaming interface %s", self.interface_number)
except usb1.USBError as exc:
with contextlib.suppress(usb1.USBError):
if control_claimed and self._control_interface is not None:
handle.releaseInterface(self._control_interface)
handle.close()
ctx.close()
raise UVCError(
f"Failed to claim VS interface {self.interface_number}: {exc}"
) from exc
try:
handle.setInterfaceAltSetting(self.interface_number, 0)
time.sleep(0.05)
self._run_libusb_probe_commit(handle)
handle.setInterfaceAltSetting(self.interface_number, alt)
LOG.info(
"VS interface %s set to alt %s", self.interface_number, alt
)
time.sleep(0.1)
except usb1.USBError as exc:
with contextlib.suppress(usb1.USBError):
handle.releaseInterface(self.interface_number)
if control_claimed and self._control_interface is not None:
handle.releaseInterface(self._control_interface)
handle.close()
ctx.close()
raise UVCError(f"Failed to set alternate setting: {exc}") from exc
with contextlib.suppress(usb1.USBError):
LOG.debug("Clearing halt on endpoint 0x%02x", endpoint)
handle.clearHalt(endpoint)
iso_config = IsoConfig(
endpoint=endpoint,
packet_size=self._max_payload,
transfers=transfers,
packets_per_transfer=packets_per_transfer,
timeout_ms=timeout_ms,
)
def _callback(data: bytes) -> None:
if self._async_stream and self._async_stream.is_active():
packet_callback(data)
stream = UVCPacketStream(ctx, handle, iso_config, _callback)
time.sleep(0.15)
stream.start()
self._async_ctx = ctx
self._async_handle = handle
self._async_stream = stream
self._control_claimed = control_claimed
if control_claimed and self._control_endpoint is not None and self._control_packet_size:
try:
self._vc_listener = InterruptListener(
ctx,
handle,
InterruptConfig(
endpoint=self._control_endpoint,
packet_size=self._control_packet_size,
timeout_ms=0,
),
lambda data: LOG.debug("VC interrupt data=%s", data.hex()),
)
self._vc_listener.start()
LOG.info(
"VC interrupt listener started on endpoint 0x%02x",
self._control_endpoint,
)
except usb1.USBError as exc:
LOG.warning("Failed to start VC interrupt listener: %s", exc)
self._vc_listener = None
[docs]
def poll_async_events(self, timeout: float = 0.1) -> None:
if self._async_ctx is None or self._async_stream is None:
return
tv = int(timeout * 1e6)
with contextlib.suppress(Exception):
self._async_stream.handle_events_and_resubmit(tv)
def _reset_device(self) -> None:
if not self._needs_device_reset:
return
with contextlib.suppress(usb.core.USBError):
LOG.debug("Resetting USB device to restore kernel state")
self.device.reset()
self._needs_device_reset = False
def _run_libusb_probe_commit(self, handle: usb1.USBDeviceHandle) -> None:
"""Perform a full, robust PROBE/COMMIT sequence using a libusb1 handle."""
if self._format is None or self._frame is None:
raise UVCError("Stream not configured; call configure_stream() first")
length = 34 # Use the length that is known to work for most devices.
timeout = 1000
req_in = usb1.TYPE_CLASS | usb1.RECIPIENT_INTERFACE | usb1.ENDPOINT_IN
req_out = usb1.TYPE_CLASS | usb1.RECIPIENT_INTERFACE | usb1.ENDPOINT_OUT
try:
template = handle.controlRead(
req_in, GET_CUR, VS_PROBE_CONTROL << 8, self.interface_number, length, timeout
)
LOG.debug("libusb1 PROBE template from GET_CUR: %s", template.hex())
except usb1.USBError:
LOG.debug("libusb1 PROBE GET_CUR failed, using zeroed buffer")
template = bytes(length)
buf = bytearray(template)
interval = self._committed_frame_interval or self._frame.pick_interval(None)
bm_hint = 1
buf[0:2] = bm_hint.to_bytes(2, "little")
buf[2] = self._committed_format_index or self._format.format_index
buf[3] = self._committed_frame_index or self._frame.frame_index
buf[4:8] = int(interval or 0).to_bytes(4, "little")
LOG.debug("libusb1 PROBE SET_CUR: %s", bytes(buf).hex())
handle.controlWrite(
req_out, SET_CUR, VS_PROBE_CONTROL << 8, self.interface_number, bytes(buf), timeout
)
negotiated = bytes(handle.controlRead(
req_in, GET_CUR, VS_PROBE_CONTROL << 8, self.interface_number, length, timeout
))
LOG.debug("libusb1 PROBE GET_CUR (negotiated): %s", negotiated.hex())
LOG.debug("libusb1 COMMIT SET_CUR: %s", negotiated.hex())
handle.controlWrite(
req_out, SET_CUR, VS_COMMIT_CONTROL << 8, self.interface_number, negotiated, timeout
)
[docs]
def stop_async_stream(self) -> None:
if self._async_stream is None:
LOG.debug("No async stream to stop")
return
LOG.info("Stopping async stream")
if self._vc_listener is not None:
self._vc_listener.stop()
self._vc_listener = None
self._async_stream.stop()
if self._async_handle is not None:
if self._control_claimed and self._control_interface is not None:
with contextlib.suppress(usb1.USBError):
LOG.debug("Releasing VC interface %s", self._control_interface)
self._async_handle.releaseInterface(self._control_interface)
with contextlib.suppress(usb1.USBError):
LOG.debug("Resetting VS interface %s to alt 0", self.interface_number)
self._async_handle.setInterfaceAltSetting(self.interface_number, 0)
time.sleep(0.1)
with contextlib.suppress(usb1.USBError):
self._async_handle.releaseInterface(self.interface_number)
with contextlib.suppress(usb1.USBError, AssertionError):
self._async_handle.close()
if self._async_ctx is not None:
with contextlib.suppress(Exception):
self._async_ctx.close()
self._async_stream = None
self._async_handle = None
self._async_ctx = None
self._control_claimed = False
# Ensure the kernel driver is reattached for VC and VS interfaces.
if self._control_interface is not None:
with contextlib.suppress(usb.core.USBError):
if not self.device.is_kernel_driver_active(self._control_interface):
LOG.debug("Reattaching kernel driver on VC interface %s", self._control_interface)
self.device.attach_kernel_driver(self._control_interface)
with contextlib.suppress(usb.core.USBError):
if not self.device.is_kernel_driver_active(self.interface_number):
LOG.debug("Reattaching kernel driver on VS interface %s", self.interface_number)
self.device.attach_kernel_driver(self.interface_number)
self._reset_device()
LOG.info("Async stream stopped")
[docs]
def read_frame(
self,
timeout_ms: int = 1000,
*,
overall_timeout_ms: Optional[int] = None,
) -> CapturedFrame:
"""Read a single video frame from the streaming endpoint."""
if not self._claimed or self._endpoint_address is None or self._max_payload is None:
raise UVCError("Stream not configured; call configure_stream() first")
expected_size = self._frame.max_frame_size if self._frame else None
if (
self._format is not None
and (
self._format.subtype == VS_FORMAT_MJPEG
or self._format.subtype == VS_FORMAT_FRAME_BASED
or "MJPG" in (self._format.description or "").upper()
)
):
expected_size = None
reassembler = FrameReassembler(
expected_size=expected_size,
max_payload_size=self._max_payload,
)
start_time = time.monotonic()
overall_deadline = None
if overall_timeout_ms is not None and overall_timeout_ms > 0:
overall_deadline = start_time + (overall_timeout_ms / 1000.0)
while True:
if overall_deadline is not None and time.monotonic() >= overall_deadline:
raise UVCError("Timed out waiting for frame")
try:
packet = self.device.read(
self._endpoint_address,
self._max_payload,
timeout_ms,
)
except usb.core.USBError as exc:
if exc.errno == errno.ETIMEDOUT:
if overall_deadline is not None and time.monotonic() >= overall_deadline:
raise UVCError("Timed out waiting for frame")
continue
raise
if not packet:
continue
for result in reassembler.feed(packet):
if (
result.payload is None
or result.error
or self._format is None
or self._frame is None
):
self._sync_stats.frames_dropped += 1
self._sync_stats.last_drop_reason = result.reason
continue
decoded = _decode_payload_once(self._format, result.payload)
size = len(result.payload)
self._sync_stats.frames_completed += 1
self._sync_stats.bytes_delivered += size
if result.duration is not None:
self._sync_stats.last_frame_duration_s = result.duration
samples = self._sync_stats.measured_frames
if samples == 0 or self._sync_stats.average_frame_duration_s is None:
self._sync_stats.average_frame_duration_s = result.duration
else:
prev = self._sync_stats.average_frame_duration_s
self._sync_stats.average_frame_duration_s = (
prev * samples + result.duration
) / (samples + 1)
self._sync_stats.measured_frames = samples + 1
return CapturedFrame(
payload=result.payload,
format=self._format,
frame=self._frame,
fid=result.fid if result.fid is not None else 0,
pts=result.pts,
decoded=decoded,
)
[docs]
def get_stream_stats(self) -> StreamStats:
"""Return a snapshot of synchronous capture statistics."""
return dataclasses.replace(self._sync_stats)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _interval_to_hz(interval_100ns: int) -> float:
return 1e7 / interval_100ns if interval_100ns else 0.0
def _format_fourcc(guid: bytes) -> str:
if len(guid) >= 4:
code = guid[:4]
try:
text = code.decode("ascii")
text = text.rstrip("\x00")
if text and all(32 <= ord(ch) < 127 for ch in text):
return text
return f"0x{code.hex()}"
except UnicodeDecodeError:
return code.hex()
return "UNKNOWN"
def _iso_payload_capacity(w_max_packet_size: int) -> int:
"""Return the actual payload size taking additional transactions into account."""
base = w_max_packet_size & 0x7FF
multiplier = ((w_max_packet_size >> 11) & 0x3) + 1
return base * multiplier
def _get_control_length(dev: usb.core.Device, interface_number: int, selector: int) -> Optional[int]:
try:
data = dev.ctrl_transfer(REQ_TYPE_IN, GET_LEN, selector << 8, interface_number, 2)
except usb.core.USBError:
return None
if len(data) >= 2:
return int.from_bytes(data[:2], "little")
return None
def _read_control(
dev: usb.core.Device,
request: int,
selector: int,
interface_number: int,
length: int,
) -> Optional[bytes]:
try:
return dev.ctrl_transfer(REQ_TYPE_IN, request, selector << 8, interface_number, length)
except usb.core.USBError:
return None
def _write_control(
dev: usb.core.Device,
request: int,
selector: int,
interface_number: int,
data: bytes,
) -> None:
dev.ctrl_transfer(REQ_TYPE_OUT, request, selector << 8, interface_number, data)
def _set_le_value(buf: bytearray, offset: int, value: int, size: int) -> None:
if offset + size <= len(buf):
buf[offset : offset + size] = int(value).to_bytes(size, "little", signed=False)
def _hex_dump(data: bytes, limit: int = 64) -> str:
if not data:
return ""
hexed = data.hex()
if len(data) <= limit:
return hexed
omitted = len(data) - limit
return f"{hexed[: 2 * limit]}...( +{omitted}B)"
def _parse_probe_payload(payload: bytes) -> dict:
def le16(off: int) -> Optional[int]:
return int.from_bytes(payload[off : off + 2], "little") if off + 2 <= len(payload) else None
def le32(off: int) -> Optional[int]:
return int.from_bytes(payload[off : off + 4], "little") if off + 4 <= len(payload) else None
result = {
"bmHint": le16(0),
"bFormatIndex": payload[2] if len(payload) > 2 else None,
"bFrameIndex": payload[3] if len(payload) > 3 else None,
"dwFrameInterval": le32(4),
"dwMaxVideoFrameSize": le32(18),
"dwMaxPayloadTransferSize": le32(22),
}
interval = result.get("dwFrameInterval")
if interval:
result["frame_rate_hz"] = _interval_to_hz(interval)
return result
def _normalise_record_path(path: pathlib.Path, stream_format: StreamFormat) -> pathlib.Path:
if stream_format.subtype == VS_FORMAT_MJPEG:
if path.suffix.lower() != ".avi":
LOG.info("MJPEG recording forced to AVI container (.avi)")
return path.with_suffix(".avi")
return path
if path.suffix.lower() != ".mkv":
LOG.info("Recording output forced to Matroska container (.mkv)")
return path.with_suffix(".mkv")
return path
def _decode_payload_once(
format_descriptor: Optional[StreamFormat],
payload: Union[bytes, bytearray],
decoder_order: Optional[List[str]] = None,
) -> Optional[object]:
if format_descriptor is None or format_descriptor.subtype != VS_FORMAT_FRAME_BASED:
return None
try:
decoder = create_decoder_backend(
format_descriptor.description,
preference=decoder_order,
)
except DecoderUnavailable:
return None
try:
data = payload if isinstance(payload, bytes) else bytes(payload)
with decoder:
frames = decoder.decode_packet(data)
frames.extend(decoder.flush())
except Exception as exc: # pragma: no cover - backend dependent
LOG.warning("On-demand decoder failed: %s", exc)
return None
if not frames:
return None
if len(frames) > 1:
LOG.debug("One-shot decoder produced %s frames; returning the first one", len(frames))
return frames[0]
[docs]
class FrameStream:
"""Context manager and iterator yielding :class:`CapturedFrame` objects."""
def __init__(
self,
camera: UVCCamera,
stream_format: StreamFormat,
frame: FrameInfo,
frame_rate: Optional[float],
*,
strict_fps: bool,
queue_size: int,
skip_initial: int,
transfers: int,
packets_per_transfer: int,
timeout_ms: int,
duration: Optional[float],
decoder_preference: Optional[Union[str, DecoderPreference, Iterable[str]]] = None,
record_path: Optional[Union[str, pathlib.Path]] = None,
) -> None:
self._camera = camera
self._format = stream_format
self._frame = frame
self._frame_rate = frame_rate
self._negotiated_fps = frame_rate
self._strict_fps = strict_fps
self._queue: "queue.Queue[Optional[CapturedFrame]]" = queue.Queue(maxsize=max(1, queue_size))
self._skip_initial = max(0, skip_initial)
self._transfers = transfers
self._packets_per_transfer = packets_per_transfer
self._timeout_ms = timeout_ms
self._duration = duration
self._decoder_preference = decoder_preference
self._decoder_order = _normalise_decoder_preference(decoder_preference)
self._decoder_failures: Set[str] = set()
self._decoder_backend_name: Optional[str] = None
self._decoder_backend_key: Optional[str] = None
self._decoder_exhausted = False
if self._decoder_order is None:
self._decoder_preference_label = DecoderPreference.NONE
elif not self._decoder_order:
self._decoder_preference_label = DecoderPreference.AUTO
else:
self._decoder_preference_label = ", ".join(self._decoder_order)
self._stop_event = threading.Event()
self._poll_thread: Optional[threading.Thread] = None
self._active = False
self._start_time = 0.0
self._sequence = 0
self._stats = StreamStats()
self._record_path = (
_normalise_record_path(pathlib.Path(record_path).expanduser(), stream_format)
if record_path
else None
)
self._recorder: Optional[RecorderBackend] = None
is_mjpeg = stream_format.subtype == VS_FORMAT_MJPEG or "MJPG" in stream_format.description.upper()
is_frame_based = stream_format.subtype == VS_FORMAT_FRAME_BASED
decoder_requested = bool(self._decoder_order)
self._decoder_applicable = is_frame_based or (is_mjpeg and decoder_requested)
self._expected_size = (
None
if is_mjpeg or is_frame_based
else frame.max_frame_size or (frame.width * frame.height * 2)
)
self._reassembler = FrameReassembler(expected_size=self._expected_size)
if self._record_path and not self._decoder_applicable:
LOG.warning(
"Recording requested but no decoder backend will be activated for %s; skipping recorder setup",
stream_format.description,
)
self._decoder = None
self._decoder_failed = False
if self._decoder_order is None:
if self._decoder_applicable:
LOG.info(
"Decoder preference 'none' disables decoding for %s",
stream_format.description,
)
elif self._decoder_applicable:
self._install_decoder(initial=True)
elif decoder_preference not in (None, DecoderPreference.AUTO):
LOG.info(
"Decoder preference %s ignored for non-decodable format %s",
self._decoder_preference_label,
stream_format.description,
)
elif is_mjpeg:
LOG.info(
"Decoder preference auto keeps legacy pipeline for %s",
stream_format.description,
)
def __enter__(self) -> "FrameStream":
negotiation = self._camera.configure_stream(
self._format,
self._frame,
frame_rate=self._frame_rate,
strict_fps=self._strict_fps,
)
LOG.debug("FrameStream negotiation: %s", negotiation)
self._negotiated_fps = (
negotiation.get("calculated_fps")
or negotiation.get("frame_rate_hz")
or self._frame_rate
)
if self._negotiated_fps:
LOG.info("Stream running at %.2f fps", self._negotiated_fps)
self._reassembler = FrameReassembler(expected_size=self._expected_size)
self._camera.start_async_stream(
self._on_packet,
transfers=self._transfers,
packets_per_transfer=self._packets_per_transfer,
timeout_ms=self._timeout_ms,
)
self._start_time = time.time()
self._stop_event.clear()
self._active = True
self._poll_thread = threading.Thread(target=self._poll_loop, name="uvc-frame-poll", daemon=True)
self._poll_thread.start()
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()
def __iter__(self):
while True:
if self._duration is not None and self._active:
if time.time() - self._start_time >= self._duration:
LOG.info("FrameStream duration %.2fs reached", self._duration)
self.close()
try:
item = self._queue.get(timeout=0.1)
except queue.Empty:
if not self._active and self._queue.empty():
break
continue
if item is None:
break
yield item
[docs]
def close(self) -> None:
if not self._active:
return
self._stop_event.set()
self._active = False
try:
self._queue.put_nowait(None)
except queue.Full:
pass
self._camera.stop_async_stream()
self._camera.stop_streaming()
self._release_decoder()
self._shutdown_recorder()
if self._poll_thread is not None:
self._poll_thread.join(timeout=0.5)
self._poll_thread = None
@property
def stats(self) -> StreamStats:
"""Return a snapshot of the accumulated stream statistics."""
return dataclasses.replace(self._stats)
def _poll_loop(self) -> None:
while not self._stop_event.is_set():
self._camera.poll_async_events(0.05)
def _enqueue(self, frame: CapturedFrame) -> None:
if not self._active:
return
try:
self._queue.put_nowait(frame)
except queue.Full:
try:
_ = self._queue.get_nowait()
except queue.Empty:
pass
try:
self._queue.put_nowait(frame)
except queue.Full:
LOG.debug("FrameStream queue full; dropping frame %s", frame.sequence)
def _release_decoder(self) -> None:
if self._decoder is None:
return
try:
self._decoder.close()
except Exception: # pragma: no cover - best-effort cleanup
LOG.debug("Decoder backend close() failed", exc_info=True)
self._decoder = None
self._decoder_backend_name = None
self._decoder_backend_key = None
def _install_decoder(self, *, initial: bool = False) -> None:
if not self._decoder_applicable or self._decoder_order is None:
return
if self._decoder is not None:
self._release_decoder()
if self._decoder_order:
candidates = [name for name in self._decoder_order if name not in self._decoder_failures]
if not candidates:
LOG.debug("All requested decoder backends exhausted for %s", self._format.description)
self._decoder = None
self._decoder_backend_name = None
self._decoder_backend_key = None
self._decoder_failed = True
self._decoder_exhausted = True
return
preference: Optional[List[str]] = candidates
else:
if not self._decoder_failures:
preference = None
else:
remaining = [name for name in DEFAULT_BACKEND_ORDER if name not in self._decoder_failures]
if not remaining:
LOG.debug("All decoder backends exhausted for %s", self._format.description)
self._decoder = None
self._decoder_backend_name = None
self._decoder_backend_key = None
self._decoder_failed = True
self._decoder_exhausted = True
return
preference = remaining
try:
backend = create_decoder_backend(
self._format.description,
preference=preference,
)
except DecoderUnavailable as exc:
if initial:
LOG.warning("No decoder backend available for %s: %s", self._format.description, exc)
else:
LOG.debug("Decoder backend unavailable for %s: %s", self._format.description, exc)
self._decoder = None
self._decoder_backend_name = None
self._decoder_backend_key = None
self._decoder_failed = True
self._decoder_exhausted = True
return
self._decoder = backend
self._decoder_failed = False
self._decoder_exhausted = False
backend_name = getattr(backend, "backend_name", None)
label = backend_name or type(backend).__name__
self._decoder_backend_name = str(label)
self._decoder_backend_key = str(label).lower()
if initial:
LOG.info("Decoder backend %s active for %s", label, self._format.description)
else:
LOG.info("Decoder backend switched to %s for %s", label, self._format.description)
self._install_recorder(backend)
def _install_recorder(self, backend) -> None:
if self._record_path is None or self._recorder is not None:
return
recorder = None
fallback_used = False
try:
recorder = backend.create_recorder(
self._record_path,
width=self._frame.width,
height=self._frame.height,
fps=self._negotiated_fps,
)
except Exception as exc: # pragma: no cover - defensive
LOG.warning("Failed to start recorder: %s", exc)
recorder = None
if recorder is None and self._format.subtype == VS_FORMAT_MJPEG:
recorder = create_mjpeg_gstreamer_recorder(
self._record_path,
fps=self._negotiated_fps,
)
if recorder is not None:
fallback_used = True
if recorder is None:
if self._format.subtype == VS_FORMAT_MJPEG:
raise RuntimeError(
"Recording requires either the PyAV or GStreamer backends. "
"Install at least one (pip install av / python3-gi gstreamer1.0-plugins-good)."
)
raise RuntimeError(
"Recording compressed streams requires the PyAV backend (pip install av)."
)
if self._format.subtype == VS_FORMAT_MJPEG:
if fallback_used:
label = "GStreamer fallback"
else:
label = self._decoder_backend_name or type(backend).__name__
LOG.info("Recording MJPEG payloads via %s", label)
self._recorder = recorder
def _shutdown_recorder(self) -> None:
if self._recorder is None:
return
try:
self._recorder.close()
except Exception: # pragma: no cover - best-effort
LOG.debug("Recorder close() failed", exc_info=True)
finally:
self._recorder = None
def _decode_payload(self, payload: Union[bytes, bytearray]) -> Optional[object]:
if not self._decoder_applicable or self._decoder_order is None:
return None
if self._decoder is None and not self._decoder_exhausted:
self._install_decoder()
if not self._decoder or self._decoder_failed or self._decoder_exhausted:
return None
try:
data = payload if isinstance(payload, bytes) else bytes(payload)
frames = self._decoder.decode_packet(data)
except Exception as exc: # pragma: no cover - backend dependent
backend_label = self._decoder_backend_name or type(self._decoder).__name__
backend_key = self._decoder_backend_key
LOG.warning("Decoder backend %s failed: %s", backend_label, exc)
if backend_key:
self._decoder_failures.add(backend_key)
self._decoder_failed = True
self._release_decoder()
if not self._decoder_exhausted:
self._install_decoder()
return None
if not frames:
return None
if len(frames) > 1:
LOG.debug("Decoder produced %s frames; using the first one", len(frames))
return frames[0]
def _handle_frame_result(self, result: FrameAssemblyResult) -> None:
size = len(result.payload) if result.payload else 0
if result.payload is None or result.error:
self._stats.frames_dropped += 1
self._stats.last_drop_reason = result.reason
LOG.debug(
"FrameStream dropped frame (reason=%s size=%s)",
result.reason,
size,
)
return
payload_data = bytes(result.payload)
payload_for_record = payload_data
if self._recorder is not None:
if self._format.subtype == VS_FORMAT_MJPEG:
payload_for_record = _strip_mjpeg_app_markers(payload_data)
try:
self._recorder.submit(payload_for_record, fid=result.fid or 0, pts=result.pts)
except Exception: # pragma: no cover - best effort
LOG.debug("Recorder submit failed", exc_info=True)
if self._skip_initial > 0:
self._skip_initial -= 1
self._stats.frames_dropped += 1
self._stats.last_drop_reason = "skip-initial"
LOG.debug(
"Skipping initial frame (remaining=%s size=%s)",
self._skip_initial,
size,
)
return
self._stats.frames_completed += 1
self._stats.bytes_delivered += size
if result.duration is not None:
self._stats.last_frame_duration_s = result.duration
samples = self._stats.measured_frames
if samples == 0 or self._stats.average_frame_duration_s is None:
self._stats.average_frame_duration_s = result.duration
else:
prev = self._stats.average_frame_duration_s
self._stats.average_frame_duration_s = (
prev * samples + result.duration
) / (samples + 1)
self._stats.measured_frames = samples + 1
self._sequence += 1
decoded = self._decode_payload(payload_data)
frame = CapturedFrame(
payload=payload_data,
format=self._format,
frame=self._frame,
fid=result.fid if result.fid is not None else 0,
pts=result.pts,
timestamp=time.time(),
sequence=self._sequence,
decoded=decoded,
)
LOG.debug(
"FrameStream accepted frame #%s (reason=%s size=%s)",
frame.sequence,
result.reason,
size,
)
self._enqueue(frame)
def _on_packet(self, packet: bytes) -> None:
if not self._active or not packet:
return
for result in self._reassembler.feed(packet):
self._handle_frame_result(result)
[docs]
def yuy2_to_rgb(payload: bytes, width: int, height: int):
"""Convert a single YUY2 frame into an RGB ``numpy.ndarray``.
The function imports :mod:`numpy` lazily so that users who only need the
descriptor utilities do not have to install it.
"""
try:
import numpy as np
except ModuleNotFoundError as exc: # pragma: no cover - optional dependency
raise RuntimeError("numpy is required to convert YUY2 payloads") from exc
if width % 2:
raise ValueError("YUY2 frames must have an even width")
expected = width * height * 2
if len(payload) != expected:
raise ValueError(f"YUY2 payload length {len(payload)} does not match {width}x{height}")
data = np.frombuffer(payload, dtype=np.uint8)
grouped = data.reshape((height, width // 2, 4))
y0 = grouped[:, :, 0].astype(np.int32) - 16
u = grouped[:, :, 1].astype(np.int32) - 128
y1 = grouped[:, :, 2].astype(np.int32) - 16
v = grouped[:, :, 3].astype(np.int32) - 128
y = np.empty((height, width), dtype=np.int32)
y[:, 0::2] = y0
y[:, 1::2] = y1
u_full = np.repeat(u, 2, axis=1)
v_full = np.repeat(v, 2, axis=1)
c = np.clip(y, 0, None)
r = (298 * c + 409 * v_full + 128) >> 8
g = (298 * c - 100 * u_full - 208 * v_full + 128) >> 8
b = (298 * c + 516 * u_full + 128) >> 8
rgb = np.stack((r, g, b), axis=-1)
return np.clip(rgb, 0, 255).astype(np.uint8)
def gray8_to_rgb(payload: bytes, width: int, height: int):
"""Convert an 8-bit grayscale payload into an RGB array."""
try:
import numpy as np
except ModuleNotFoundError as exc: # pragma: no cover - optional dependency
raise RuntimeError("numpy is required to convert grayscale payloads") from exc
expected = width * height
if len(payload) != expected:
raise ValueError(f"GRAY8 payload length {len(payload)} does not match {width}x{height}")
gray = np.frombuffer(payload, dtype=np.uint8).reshape((height, width))
return np.repeat(gray[:, :, None], 3, axis=2)
def gray16_to_rgb(payload: bytes, width: int, height: int):
"""Convert a 16-bit grayscale payload into an RGB array (scaled to 8-bit)."""
try:
import numpy as np
except ModuleNotFoundError as exc: # pragma: no cover - optional dependency
raise RuntimeError("numpy is required to convert grayscale payloads") from exc
expected = width * height * 2
if len(payload) != expected:
raise ValueError(f"GRAY16 payload length {len(payload)} does not match {width}x{height}")
gray16 = np.frombuffer(payload, dtype=np.uint16).reshape((height, width))
gray8 = (gray16 >> 8).astype(np.uint8)
return np.repeat(gray8[:, :, None], 3, axis=2)
def _trim_mjpeg_payload(payload: bytes) -> bytes:
"""Return payload truncated at the JPEG EOI marker if trailing garbage is present."""
if not payload:
return payload
eoi = payload.rfind(b"\xff\xd9")
if eoi == -1:
return payload
if eoi + 2 == len(payload):
return payload
trimmed = payload[: eoi + 2]
LOG.debug("Trimming MJPEG payload from %s to %s bytes (extraneous %s bytes)", len(payload), len(trimmed), len(payload) - len(trimmed))
return trimmed
def _strip_mjpeg_app_markers(payload: bytes) -> bytes:
if len(payload) < 4 or not payload.startswith(b"\xff\xd8"):
return payload
out = bytearray()
out.extend(payload[0:2]) # SOI
offset = 2
length = len(payload)
while offset + 1 < length:
if payload[offset] != 0xFF:
out.extend(payload[offset:])
break
marker_start = offset
while offset < length and payload[offset] == 0xFF:
offset += 1
if offset >= length:
break
marker = payload[offset]
offset += 1
if marker == 0xDA: # SOS: rest is entropy-coded
out.extend(payload[marker_start:])
break
if marker == 0xD9: # EOI
out.extend(payload[marker_start:])
break
if offset + 1 > length:
break
seg_len = int.from_bytes(payload[offset:offset + 2], "big")
segment_end = offset + seg_len
if segment_end > length:
break
if 0xE0 <= marker <= 0xEF:
# Skip APP segments entirely.
offset = segment_end
continue
out.extend(payload[marker_start:segment_end])
offset = segment_end
return bytes(out)
[docs]
def decode_to_rgb(payload: bytes, stream_format: StreamFormat, frame: FrameInfo):
"""Convert a raw payload into an RGB image (numpy array).
Supports YUY2/YUYV and MJPEG. Raises :class:`RuntimeError` if decoding is
not possible due to missing dependencies (e.g. OpenCV for MJPEG).
"""
name = stream_format.description.upper()
payload_len = len(payload)
if stream_format.subtype == VS_FORMAT_UNCOMPRESSED:
if payload_len == frame.width * frame.height:
return gray8_to_rgb(payload, frame.width, frame.height)
if (
payload_len == frame.width * frame.height * 2
and "YUY" not in name
and "YUV" not in name
):
return gray16_to_rgb(payload, frame.width, frame.height)
if "YUY" in name or "YUV" in name:
return yuy2_to_rgb(payload, frame.width, frame.height)
# Fallback to the previous behaviour for other uncompressed formats.
return yuy2_to_rgb(payload, frame.width, frame.height)
if stream_format.subtype == VS_FORMAT_MJPEG or "MJPG" in name:
try:
import cv2
import numpy as np
except ImportError as exc: # pragma: no cover - optional dependency
raise RuntimeError("OpenCV required for MJPEG decoding") from exc
cleaned = _trim_mjpeg_payload(payload)
arr = np.frombuffer(cleaned, dtype=np.uint8)
bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if bgr is None:
raise RuntimeError("Failed to decode MJPEG frame (corrupt or unsupported payload)")
rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
return rgb
raise RuntimeError(f"Unsupported codec for conversion: {stream_format.description}")
class MJPEGPreviewPipeline:
"""Feed MJPEG frames into a GStreamer pipeline for quick preview."""
def __init__(self, fps: float):
if not GST_AVAILABLE:
raise RuntimeError("GStreamer bindings not available; install python3-gi and gst packages")
Gst.init(None)
fps_num = max(1, int(round(fps))) if fps > 0 else 30
pipeline_desc = (
f"appsrc name=src is-live=true do-timestamp=true format=time "
f"caps=image/jpeg,framerate={fps_num}/1 ! "
"jpegdec ! videoconvert ! autovideosink sync=false"
)
self._pipeline = Gst.parse_launch(pipeline_desc)
self._appsrc = self._pipeline.get_by_name("src")
self._loop = GLib.MainLoop()
self._thread = threading.Thread(target=self._loop.run, daemon=True)
self._pipeline.set_state(Gst.State.PLAYING)
self._thread.start()
self._fps = fps
def push(self, payload: bytes, timestamp_s: float) -> None:
buf = Gst.Buffer.new_allocate(None, len(payload), None)
buf.fill(0, payload)
if self._fps > 0:
duration = Gst.util_uint64_scale_int(
1, Gst.SECOND, max(1, int(round(self._fps)))
)
buf.duration = duration
timestamp = int(timestamp_s * Gst.SECOND)
buf.pts = buf.dts = timestamp
self._appsrc.emit("push-buffer", buf)
def close(self) -> None:
if self._pipeline:
with contextlib.suppress(Exception):
self._appsrc.emit("end-of-stream")
self._pipeline.set_state(Gst.State.NULL)
if self._loop.is_running():
self._loop.quit()
self._thread.join(timeout=2)
import contextlib
# ---------------------------------------------------------------------------
# VC (Video Control) helpers — user-space control even if uvcvideo is loaded
# ---------------------------------------------------------------------------
[docs]
def find_vc_interface_number(dev: usb.core.Device) -> int:
"""Return the interface number of the first VC interface (class 0x0e, subclass 0x01)."""
for cfg in dev:
for intf in cfg:
if intf.bInterfaceClass == UVC_CLASS and intf.bInterfaceSubClass == VC_SUBCLASS:
return intf.bInterfaceNumber
return 0
def _vc_w_index(vc_interface: int, unit_id: int) -> int:
# entity/unit id occupies the high byte, interface number in the low byte
return ((unit_id & 0xFF) << 8) | (vc_interface & 0xFF)
def _vc_w_value(selector: int) -> int:
# selector in high byte
return (selector & 0xFF) << 8
[docs]
@contextlib.contextmanager
def claim_vc_interface(
dev: usb.core.Device,
vc_if: int,
*,
auto_reattach: bool = True,
auto_detach: Optional[bool] = None,
):
"""Detach only the VC interface from the kernel, claim it, then release and reattach."""
reattach = False
detach = _auto_detach_vc_enabled() if auto_detach is None else bool(auto_detach)
try:
dev.set_configuration()
except usb.core.USBError:
pass
if detach:
try:
if dev.is_kernel_driver_active(vc_if):
dev.detach_kernel_driver(vc_if)
reattach = True
except (usb.core.USBError, NotImplementedError, AttributeError):
pass
else:
LOG.debug(
"Auto-detach disabled; attempting to claim VC interface %s without detaching kernel driver",
vc_if,
)
try:
usb.util.claim_interface(dev, vc_if)
except usb.core.USBError as exc:
if detach or getattr(exc, "errno", None) not in {errno.EBUSY, errno.EPERM}:
raise
LOG.warning(
"Failed to claim VC interface %s while LIBUSB_UVC_AUTO_DETACH_VC=0: %s",
vc_if,
exc,
)
raise RuntimeError(
"VC interface is busy. Detach the kernel driver or enable auto-detach."
) from exc
try:
yield
finally:
with contextlib.suppress(usb.core.USBError):
usb.util.release_interface(dev, vc_if)
if auto_reattach and reattach:
with contextlib.suppress(usb.core.USBError):
dev.attach_kernel_driver(vc_if)
[docs]
def vc_ctrl_get(dev: usb.core.Device, vc_if: int, unit_id: int, selector: int, request: int, length: int):
"""Low level VC GET_* (GET_CUR/MIN/MAX/RES/DEF/INFO/LEN)."""
wValue = _vc_w_value(selector)
wIndex = _vc_w_index(vc_if, unit_id)
try:
data = dev.ctrl_transfer(REQ_TYPE_IN, request, wValue, wIndex, length, timeout=500)
return bytes(data)
except usb.core.USBError:
return None
[docs]
def vc_ctrl_set(dev: usb.core.Device, vc_if: int, unit_id: int, selector: int, payload: bytes):
"""Low level VC SET_CUR."""
wValue = _vc_w_value(selector)
wIndex = _vc_w_index(vc_if, unit_id)
return dev.ctrl_transfer(REQ_TYPE_OUT, SET_CUR, wValue, wIndex, payload, timeout=500)
def _vc_get_len(dev: usb.core.Device, vc_if: int, unit_id: int, selector: int):
data = vc_ctrl_get(dev, vc_if, unit_id, selector, GET_LEN, 2)
if not data or len(data) < 2:
return None
value = int.from_bytes(data[:2], "little")
return value or None
[docs]
def read_vc_control_value(dev: usb.core.Device, vc_if: int, unit_id: int, selector: int, request: int, *, length_hint: int = 64):
"""Read a VC control using GET_LEN if available, else length_hint."""
length = _vc_get_len(dev, vc_if, unit_id, selector) or length_hint
return vc_ctrl_get(dev, vc_if, unit_id, selector, request, length)
[docs]
def write_vc_control_value(dev: usb.core.Device, vc_if: int, unit_id: int, selector: int, payload: bytes):
return vc_ctrl_set(dev, vc_if, unit_id, selector, payload)
[docs]
class UVCControlsManager:
"""Validate UVC controls and enrich them with quirks metadata."""
def __init__(
self,
device: usb.core.Device,
units: List[UVCUnit],
interface_number: Optional[int] = None,
) -> None:
self._device = device
self._interface = (
interface_number
if interface_number is not None
else find_vc_interface_number(device)
)
self._units = units
self._quirks = load_quirks()
self._controls: List[ControlEntry] = []
self._initialise()
def _initialise(self) -> None:
def _bytes_to_int(data: Optional[bytes], *, signed: bool = False) -> Optional[int]:
if not data:
return None
if len(data) > 4:
return None
return int.from_bytes(data, "little", signed=signed)
def _should_use_signed(min_raw: Optional[bytes], max_raw: Optional[bytes]) -> bool:
if not min_raw or not max_raw:
return False
if len(min_raw) != len(max_raw):
return False
if len(min_raw) not in (2, 4):
return False
min_unsigned = int.from_bytes(min_raw, "little", signed=False)
max_unsigned = int.from_bytes(max_raw, "little", signed=False)
return min_unsigned > max_unsigned
def _payload_length(length: int, min_raw: Optional[bytes], default_raw: Optional[bytes]) -> Optional[int]:
if length:
return length
if default_raw:
return len(default_raw)
if min_raw:
return len(min_raw)
return None
def _match_get_info(info_value: int, definition: dict) -> Optional[int]:
if definition is None:
return 0
score = 0
expected_info = definition.get("expected_info")
if expected_info is not None:
if isinstance(expected_info, (list, tuple, set)):
values = {int(v) for v in expected_info}
if info_value not in values:
return None
else:
if info_value != int(expected_info):
return None
score += 2
info_expect = definition.get("get_info_expect")
if info_expect is not None:
if isinstance(info_expect, dict):
value = info_expect.get("value")
if value is not None and info_value != int(value):
return None
if value is not None:
score += 2
for key, bit_value in info_expect.items():
if key == "value":
continue
key_str = str(key).upper()
if key_str.startswith("D") and key_str[1:].isdigit():
bit_index = int(key_str[1:])
if bit_index < 0 or bit_index > 7:
continue
expected_bit = 1 if int(bit_value) else 0
if ((info_value >> bit_index) & 0x01) != expected_bit:
return None
score += 1
else:
try:
expected_value = int(info_expect)
except (TypeError, ValueError):
expected_value = None
if expected_value is not None:
if info_value != expected_value:
return None
score += 2
return score
def _match_length(payload_len: Optional[int], definition: dict) -> Optional[int]:
if payload_len is None or definition is None:
return 0
expected = definition.get("expected_length")
payload_spec = definition.get("payload")
score = 0
if expected is None and isinstance(payload_spec, dict):
expected = payload_spec.get("fixed_len")
if expected is None:
expected = payload_spec.get("expected_length")
allowed_lengths: Optional[set] = None
if expected is not None:
if isinstance(expected, (list, tuple, set)):
allowed_lengths = {int(v) for v in expected}
else:
allowed_lengths = {int(expected)}
if allowed_lengths is not None:
if payload_len not in allowed_lengths:
return None
score += 2
if isinstance(payload_spec, dict):
min_len = payload_spec.get("min_len")
max_len = payload_spec.get("max_len")
if min_len is not None and payload_len < int(min_len):
return None
if max_len is not None and payload_len > int(max_len):
return None
if min_len is not None or max_len is not None:
score += 1
return score
def _consume_definition(
definitions: List[dict],
*,
selector: int,
info_value: int,
payload_len: Optional[int],
min_raw: Optional[bytes],
max_raw: Optional[bytes],
step_raw: Optional[bytes],
default_raw: Optional[bytes],
) -> Optional[dict]:
best_def = None
best_score = -1
for definition in definitions:
if not isinstance(definition, dict):
continue
if definition.get("_used"):
continue
score = 0
expected_selector = definition.get("selector")
if expected_selector is not None:
try:
if int(expected_selector) != selector:
continue
score += 5
except (TypeError, ValueError):
continue
info_score = _match_get_info(info_value, definition)
if info_score is None:
continue
score += info_score
length_score = _match_length(payload_len, definition)
if length_score is None:
continue
score += length_score
if score <= 0:
continue
if score > best_score:
best_score = score
best_def = definition
if best_def is not None:
best_def["_used"] = True
return best_def
for unit in self._units:
controls = getattr(unit, "controls", []) or []
if not controls:
continue
guid = ""
quirk_map: Dict[str, dict] = {}
quirk_definitions: List[dict] = []
if isinstance(unit, ExtensionUnit):
guid = unit.guid.lower()
quirk_entry = self._quirks.get(guid, {})
if isinstance(quirk_entry, dict):
quirk_controls = quirk_entry.get("controls", {})
if isinstance(quirk_controls, dict):
quirk_map = {str(k): v for k, v in quirk_controls.items()}
elif isinstance(quirk_controls, list):
for item in quirk_controls:
if isinstance(item, dict):
# Shallow copy so we can mark entries as used during matching.
quirk_definitions.append(dict(item))
for control in controls:
control_type = control.type
info = vc_ctrl_get(
self._device,
self._interface,
control.unit_id,
control.selector,
GET_INFO,
1,
)
if not info or not info[0]:
continue
length = _vc_get_len(self._device, self._interface, control.unit_id, control.selector) or 0
length_hint = max(1, min(length or 4, 32))
min_raw = vc_ctrl_get(
self._device,
self._interface,
control.unit_id,
control.selector,
GET_MIN,
length_hint,
)
max_raw = vc_ctrl_get(
self._device,
self._interface,
control.unit_id,
control.selector,
GET_MAX,
length_hint,
)
step_raw = vc_ctrl_get(
self._device,
self._interface,
control.unit_id,
control.selector,
GET_RES,
length_hint,
)
default_raw = vc_ctrl_get(
self._device,
self._interface,
control.unit_id,
control.selector,
GET_DEF,
length_hint,
)
payload_len = _payload_length(length, min_raw, default_raw)
signed = _should_use_signed(min_raw, max_raw)
name = control.name
metadata: Dict[str, object] = {}
if quirk_map:
override = quirk_map.get(str(control.selector))
if isinstance(override, dict):
override_name = override.get("name")
if override_name:
name = override_name
metadata = {k: v for k, v in override.items() if k != "name"}
elif quirk_definitions:
matched = _consume_definition(
quirk_definitions,
selector=control.selector,
info_value=info[0],
payload_len=payload_len,
min_raw=min_raw,
max_raw=max_raw,
step_raw=step_raw,
default_raw=default_raw,
)
if matched:
override_name = matched.get("name")
if override_name:
name = override_name
override_type = matched.get("type")
if override_type:
control_type = str(override_type)
metadata = {
k: v
for k, v in matched.items()
if not k.startswith("_") and k != "name"
}
metadata.setdefault("info_byte", info[0])
if payload_len is not None and "payload_length" not in metadata:
metadata["payload_length"] = payload_len
entry = ControlEntry(
interface_number=self._interface,
unit_id=control.unit_id,
selector=control.selector,
name=name,
type=control_type,
info=info[0],
minimum=_bytes_to_int(min_raw, signed=signed),
maximum=_bytes_to_int(max_raw, signed=signed),
step=_bytes_to_int(step_raw, signed=signed),
default=_bytes_to_int(default_raw, signed=signed),
length=length or (len(default_raw) if default_raw else len(min_raw) if min_raw else None),
raw_minimum=min_raw,
raw_maximum=max_raw,
raw_step=step_raw,
raw_default=default_raw,
metadata=metadata,
)
self._controls.append(entry)
[docs]
def get_controls(self) -> List[ControlEntry]:
return list(self._controls)
__all__ = [
"AltSettingInfo",
"CapturedFrame",
"StreamStats",
"FrameInfo",
"StreamFormat",
"StreamingInterface",
"ControlEntry",
"UVCControl",
"UVCUnit",
"ExtensionUnit",
"UVCCamera",
"FrameStream",
"UVCError",
"CodecPreference",
"DecoderPreference",
"describe_device",
"find_uvc_devices",
"iter_video_streaming_interfaces",
"list_streaming_interfaces",
"list_control_units",
"parse_vs_descriptors",
"perform_probe_commit",
"perform_still_probe_commit",
"probe_streaming_interface",
"select_format_and_frame",
"resolve_stream_preference",
"resolve_still_preference",
"yuy2_to_rgb",
"decode_to_rgb",
"VS_FORMAT_UNCOMPRESSED",
"VS_FORMAT_MJPEG",
"VS_FORMAT_FRAME_BASED",
"REQ_TYPE_IN",
"GET_CUR",
"find_vc_interface_number",
"claim_vc_interface",
"vc_ctrl_get",
"vc_ctrl_set",
"read_vc_control_value",
"write_vc_control_value",
"UVCControlsManager",
"load_quirks",
]