diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index 81b8ae52e..13b8406cf 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -51,11 +51,22 @@ jobs: sudo apt-get update sudo apt-get install -y libgpiod-dev liblgpio-dev + - name: Install sigrok-cli (Linux) + if: runner.os == 'Linux' + run: | + sudo apt-get update + sudo apt-get install -y sigrok-cli + - name: Install Qemu (macOS) if: runner.os == 'macOS' run: | brew install qemu + - name: Install sigrok-cli (macOS) + if: runner.os == 'macOS' + run: | + brew install sigrok-cli + - name: Cache Fedora Cloud images id: cache-fedora-cloud-images uses: actions/cache@v4 diff --git a/Dockerfile b/Dockerfile index 5b2ffb481..336c92636 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ RUN dnf install -y make git && \ COPY --from=uv /uv /uvx /bin/ FROM fedora:42 AS product -RUN dnf install -y python3 ustreamer libusb1 android-tools python3-libgpiod && \ +RUN dnf install -y python3 ustreamer libusb1 android-tools python3-libgpiod sigrok-cli && \ dnf clean all && \ rm -rf /var/cache/dnf COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ diff --git a/docs/source/reference/package-apis/drivers/index.md b/docs/source/reference/package-apis/drivers/index.md index 8286663bb..71a539ea7 100644 --- a/docs/source/reference/package-apis/drivers/index.md +++ b/docs/source/reference/package-apis/drivers/index.md @@ -71,6 +71,8 @@ Drivers for debugging and programming devices: * **[QEMU](qemu.md)** (`jumpstarter-driver-qemu`) - QEMU virtualization platform * **[Corellium](corellium.md)** (`jumpstarter-driver-corellium`) - Corellium virtualization platform +* **[Sigrok](sigrok.md)** (`jumpstarter-driver-sigrok`) - Logic analyzer and + oscilloscope support via sigrok-cli * **[U-Boot](uboot.md)** (`jumpstarter-driver-uboot`) - Universal Bootloader interface * **[RideSX](ridesx.md)** (`jumpstarter-driver-ridesx`) - Flashing and power management for Qualcomm RideSX devices @@ -104,6 +106,7 @@ gpiod.md ridesx.md sdwire.md shell.md +sigrok.md ssh.md snmp.md tasmota.md diff --git a/docs/source/reference/package-apis/drivers/sigrok.md b/docs/source/reference/package-apis/drivers/sigrok.md new file mode 120000 index 000000000..979eeb03f --- /dev/null +++ b/docs/source/reference/package-apis/drivers/sigrok.md @@ -0,0 +1 @@ +../../../../../packages/jumpstarter-driver-sigrok/README.md \ No newline at end of file diff --git a/packages/jumpstarter-driver-sigrok/.gitignore b/packages/jumpstarter-driver-sigrok/.gitignore new file mode 100644 index 000000000..cbc5d672b --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +.coverage +coverage.xml diff --git a/packages/jumpstarter-driver-sigrok/README.md b/packages/jumpstarter-driver-sigrok/README.md new file mode 100644 index 000000000..13344e058 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/README.md @@ -0,0 +1,227 @@ +# Sigrok Driver + +`jumpstarter-driver-sigrok` wraps [sigrok-cli](https://sigrok.org/wiki/Sigrok-cli) to provide logic analyzer and oscilloscope capture from Jumpstarter exporters. It supports: +- **Logic analyzers** (digital channels) +- **Oscilloscopes** (analog channels) - voltage waveform capture +- One-shot and streaming capture +- Multiple output formats with parsing (VCD, CSV, Bits, ASCII) + +## Installation + +```shell +pip3 install --extra-index-url https://pkg.jumpstarter.dev/simple/ jumpstarter-driver-sigrok +``` + +## Configuration (exporter) + +```yaml +export: + sigrok: + type: jumpstarter_driver_sigrok.driver.Sigrok + driver: fx2lafw # sigrok driver (demo, fx2lafw, rigol-ds, etc.) + conn: auto # optional: USB VID.PID, serial path, or "auto" for auto-detect + channels: # optional: map device channels to friendly names + D0: clk + D1: mosi + D2: miso + D3: cs +``` + +### Configuration Parameters + +| Parameter | Description | Type | Required | Default | +|-----------|-------------|------|----------|---------| +| `driver` | Sigrok driver name (e.g., `demo`, `fx2lafw`, `rigol-ds`) | str | yes | - | +| `conn` | Connection string (USB VID.PID, serial path, or `"auto"` for auto-detect) | str \| None | no | "auto" | +| `executable` | Path to `sigrok-cli` executable | str | no | Auto-detected from PATH | +| `channels` | Channel mapping from device names (D0, A0) to semantic names (clk, voltage) | dict[str, str] | no | {} (empty) | + +## CaptureConfig Parameters (client-side) + +| Parameter | Description | Type | Required | Default | +|-----------|-------------|------|----------|---------| +| `sample_rate` | Sampling rate (e.g., `"1M"`, `"8MHz"`, `"24000000"`) | str | no | "1M" | +| `samples` | Number of samples to capture (`None` for continuous) | int \| None | no | None | +| `pretrigger` | Number of samples to capture before trigger | int \| None | no | None | +| `triggers` | Trigger conditions by channel name (e.g., `{"cs": "falling"}`) | dict[str, str] \| None | no | None | +| `channels` | List of channel names to capture (overrides defaults) | list[str] \| None | no | None | +| `output_format` | Output format (vcd, csv, bits, ascii, srzip, binary) | str | no | "vcd" | + +## Client API + +- `scan()` — list devices for the configured driver +- `capture(config)` — one-shot capture, returns `CaptureResult` with base64 data +- `capture_stream(config)` — streaming capture via `--continuous` +- `get_driver_info()` — driver, conn, channel map +- `get_channel_map()` — device-to-semantic name mappings +- `list_output_formats()` — supported formats (csv, srzip, vcd, binary, bits, ascii) + +## Output Formats + +The driver supports multiple output formats. **VCD (Value Change Dump) is the default** because: +- ✅ **Efficient**: Only records signal changes (not every sample) +- ✅ **Precise timing**: Includes exact timestamps in nanoseconds +- ✅ **Widely supported**: Standard format for signal analysis tools +- ✅ **Mixed signals**: Handles both digital and analog data + +### Available Formats + +| Format | Use Case | Decoded By | +|--------|----------|------------| +| `vcd` (default) | Change-based signals with timing | `result.decode()` → `list[Sample]` | +| `csv` | All samples with timing | `result.decode()` → `list[Sample]` | +| `bits` | Bit sequences by channel | `result.decode()` → `dict[str, list[int]]` | +| `ascii` | ASCII art visualization | `result.decode()` → `str` | +| `srzip` | Raw sigrok session (for PulseView) | `result.data` (raw bytes) | +| `binary` | Raw binary data | `result.data` (raw bytes) | + +### Output Format Constants + +```python +from jumpstarter_driver_sigrok.common import OutputFormat + +config = CaptureConfig( + sample_rate="1MHz", + samples=1000, + output_format=OutputFormat.VCD, # or CSV, BITS, ASCII, SRZIP, BINARY +) +``` + +## Examples + +### Example 1: Simple Capture (VCD format - default) + +**Python client code:** +```python +from jumpstarter_driver_sigrok.common import CaptureConfig + +# Capture with default VCD format (efficient, change-based with timing) +config = CaptureConfig( + sample_rate="1MHz", + samples=1000, + channels=["D0", "D1", "D2"], # Use device channel names or mapped names +) +result = client.capture(config) + +# Decode VCD to get samples with timing +samples = result.decode() # list[Sample] +for sample in samples[:5]: + print(f"Time: {sample.time}s, Values: {sample.values}") +``` + +**Equivalent sigrok-cli command:** +```bash +sigrok-cli -d fx2lafw -C D0,D1,D2 \ + -c samplerate=1MHz --samples 1000 \ + -O vcd -o /tmp/capture.vcd +``` + +--- + +### Example 2: Triggered Capture with Pretrigger + +**Python client code:** +```python +from jumpstarter_driver_sigrok.common import CaptureConfig + +# Capture with trigger and pretrigger buffer (VCD format - default) +config = CaptureConfig( + sample_rate="8MHz", + samples=20000, + pretrigger=5000, # Capture 5000 samples before trigger + triggers={"D0": "rising"}, # Trigger on D0 rising edge + channels=["D0", "D1", "D2", "D3"], + # output_format defaults to VCD (efficient change-based format) +) +result = client.capture(config) + +# Decode to analyze signal changes with precise timing +samples = result.decode() # list[Sample] - only changes recorded +print(f"Captured {len(samples)} signal changes") + +# Access timing and values +for sample in samples[:3]: + print(f"Time: {sample.time}s, Changed: {sample.values}") +``` + +**Equivalent sigrok-cli command:** +```bash +sigrok-cli -d fx2lafw -C D0,D1,D2,D3 \ + -c samplerate=8MHz,samples=20000,pretrigger=5000 \ + --triggers D0=rising \ + -O vcd -o /tmp/capture.vcd +``` + +--- + +### Example 3: Oscilloscope (Analog Channels) + +**Exporter configuration:** +```yaml +export: + oscilloscope: + type: jumpstarter_driver_sigrok.driver.Sigrok + driver: rigol-ds # or demo for testing + conn: usb # or serial path + channels: + A0: CH1 + A1: CH2 +``` + +**Python client code:** +```python +from jumpstarter_driver_sigrok.common import CaptureConfig, OutputFormat + +# Capture analog waveforms +config = CaptureConfig( + sample_rate="1MHz", + samples=10000, + channels=["CH1", "CH2"], # Analog channels + output_format=OutputFormat.CSV, # CSV for voltage values +) +result = client.capture(config) + +# Parse voltage data +samples = result.decode() # list[Sample] +for sample in samples[:5]: + print(f"Time: {sample.time}s") + print(f" CH1: {sample.values.get('A0', 'N/A')}V") + print(f" CH2: {sample.values.get('A1', 'N/A')}V") +``` + +**Equivalent sigrok-cli command:** +```bash +sigrok-cli -d rigol-ds:conn=usb -C A0=CH1,A1=CH2 \ + -c samplerate=1MHz --samples 10000 \ + -O csv -o /tmp/capture.csv +``` + +--- + +### Example 4: Bits Format (Simple Bit Sequences) + +**Python client code:** +```python +from jumpstarter_driver_sigrok.common import CaptureConfig, OutputFormat + +# Capture in bits format (useful for visual inspection) +config = CaptureConfig( + sample_rate="100kHz", + samples=100, + channels=["D0", "D1", "D2"], + output_format=OutputFormat.BITS, +) +result = client.capture(config) + +# Get bit sequences per channel +bits_by_channel = result.decode() # dict[str, list[int]] +for channel, bits in bits_by_channel.items(): + print(f"{channel}: {''.join(map(str, bits[:20]))}") # First 20 bits +``` + +**Equivalent sigrok-cli command:** +```bash +sigrok-cli -d demo -C D0,D1,D2 \ + -c samplerate=100kHz --samples 100 \ + -O bits -o /tmp/capture.bits +``` diff --git a/packages/jumpstarter-driver-sigrok/examples/exporter.yaml b/packages/jumpstarter-driver-sigrok/examples/exporter.yaml new file mode 100644 index 000000000..7e1029ea9 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/examples/exporter.yaml @@ -0,0 +1,22 @@ +apiVersion: jumpstarter.dev/v1alpha1 +kind: ExporterConfig +metadata: + namespace: default + name: demo +endpoint: grpc.jumpstarter.192.168.0.203.nip.io:8082 +token: "" +export: + sigrok: + type: jumpstarter_driver_sigrok.driver.Sigrok + config: + driver: demo + conn: auto + channels: + D0: vcc + D1: cs + D2: miso + D3: mosi + D4: clk + D5: sda + D6: scl + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py new file mode 100644 index 000000000..7b134cb86 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py @@ -0,0 +1,11 @@ +from jumpstarter_driver_sigrok.common import ( + CaptureConfig, + CaptureResult, + DecoderConfig, + OutputFormat, + Sample, +) +from jumpstarter_driver_sigrok.driver import Sigrok + +__all__ = ["Sigrok", "CaptureConfig", "CaptureResult", "DecoderConfig", "OutputFormat", "Sample"] + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/client.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/client.py new file mode 100644 index 000000000..1e1dc9d20 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/client.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass + +from .common import CaptureConfig, CaptureResult +from jumpstarter.client import DriverClient + + +@dataclass(kw_only=True) +class SigrokClient(DriverClient): + """Client methods for the Sigrok driver.""" + + def scan(self) -> str: + return self.call("scan") + + def capture(self, config: CaptureConfig | dict) -> CaptureResult: + return CaptureResult.model_validate(self.call("capture", config)) + + def capture_stream(self, config: CaptureConfig | dict): + """Stream capture data from sigrok-cli. + + Args: + config: CaptureConfig or dict with capture parameters + + Yields: + bytes: Chunks of captured data + """ + for chunk in self.streamingcall("capture_stream", config): + yield chunk + + def get_driver_info(self) -> dict: + return self.call("get_driver_info") + + def get_channel_map(self) -> dict: + return self.call("get_channel_map") + + def list_output_formats(self) -> list[str]: + return self.call("list_output_formats") + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py new file mode 100644 index 000000000..bffbb97e0 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +from typing import Any, Iterator + +from pydantic import BaseModel, Field + + +class OutputFormat: + """Constants for sigrok output formats.""" + CSV = "csv" + BITS = "bits" + ASCII = "ascii" + BINARY = "binary" + SRZIP = "srzip" + VCD = "vcd" + + @classmethod + def all(cls) -> list[str]: + return [cls.CSV, cls.BITS, cls.ASCII, cls.BINARY, cls.SRZIP, cls.VCD] + + +class Sample(BaseModel): + """A single sample with timing information.""" + sample: int # Sample index + time: float # Time in seconds (full precision) + values: dict[str, int | float] # Channel values (digital: 0/1, analog: voltage) + + def __str__(self) -> str: + """Format sample with clean time display using appropriate unit (fs/ps/ns/μs/ms/s).""" + time_str = self._format_time(self.time) + return f"Sample(sample={self.sample}, time={time_str}, values={self.values})" + + @staticmethod + def _format_time(time_s: float) -> str: + """Format time in seconds to the most appropriate unit. + + Args: + time_s: Time in seconds + + Returns: + Formatted string like "1.5ns", "2.3μs", "1.5ms", "2s" + """ + # Special case for zero + if time_s == 0: + return "0s" + + abs_time = abs(time_s) + + # Define units in descending order (seconds to femtoseconds) + units = [ + (1.0, "s"), + (1e-3, "ms"), + (1e-6, "μs"), + (1e-9, "ns"), + (1e-12, "ps"), + (1e-15, "fs"), + ] + + # Find the most appropriate unit + for scale, unit in units: + if abs_time >= scale or scale == 1e-15: # Use fs as minimum + value = time_s / scale + # Format with up to 6 significant digits, remove trailing zeros + formatted = f"{value:.6g}" + return f"{formatted}{unit}" + + # Fallback (should never reach here) + return f"{time_s:.6g}s" + + +class DecoderConfig(BaseModel): + """Protocol decoder configuration (real-time during capture).""" + + name: str + channels: dict[str, str] | None = None + options: dict[str, Any] | None = None + annotations: list[str] | None = None + stack: list["DecoderConfig"] | None = None + + +class CaptureConfig(BaseModel): + sample_rate: str = Field(default="1M", description="e.g., 8MHz, 1M, 24000000") + samples: int | None = Field(default=None, description="number of samples; None for continuous") + pretrigger: int | None = Field(default=None, description="samples before trigger") + triggers: dict[str, str] | None = Field(default=None, description="e.g., {'D0': 'rising'}") + channels: list[str] | None = Field(default=None, description="override default channels by name") + output_format: str = Field( + default=OutputFormat.VCD, + description="Output format (default: vcd - efficient change-based format with timing). " + "Options: vcd, csv, srzip, binary, bits, ascii", + ) + decoders: list[DecoderConfig] | None = Field(default=None, description="real-time protocol decoding") + + +class CaptureResult(BaseModel): + """Result from a capture operation. + + Note: data is base64-encoded for reliable JSON transport. Client methods + automatically decode it to bytes for you. + """ + data_b64: str # Base64-encoded binary data + output_format: str + sample_rate: str + channel_map: dict[str, str] + triggers: dict[str, str] | None = None + decoders: list[DecoderConfig] | None = None + + def __str__(self) -> str: + """Format CaptureResult with truncated data_b64 field.""" + data_len = len(self.data_b64) + if data_len <= 50: + data_preview = self.data_b64 + else: + # Show first 50 and last 50 chars with ellipsis + data_preview = f"{self.data_b64[:25]}...{self.data_b64[-25:]} ({data_len} chars)" + + return ( + f"CaptureResult(output_format='{self.output_format}', " + f"sample_rate='{self.sample_rate}', " + f"data_size={len(self.data)} bytes, " + f"channels={len(self.channel_map)}, " + f"data_b64='{data_preview}')" + ) + + @property + def data(self) -> bytes: + """Get the captured data as bytes (auto-decodes from base64).""" + from base64 import b64decode + return b64decode(self.data_b64) + + def decode(self) -> Iterator[Sample] | dict[str, list[int]] | str: + """Parse captured data based on output format. + + Returns: + - CSV format: Iterator[Sample] yielding samples with timing and all values per sample + - VCD format: Iterator[Sample] yielding samples with timing and only changed values + - Bits format: dict[str, list[int]] with channel→bit sequences + - ASCII format: str with ASCII art visualization + - Other formats: raises NotImplementedError (use .data for raw bytes) + + Note: + Channel names in the output depend on how the data was captured: + - If captured with channel mapping, sigrok-cli outputs mapped names (vcc, cs, etc.) + - If captured without mapping, outputs device names (D0, D1, etc.) + + Raises: + NotImplementedError: For binary/srzip formats (use .data property) + """ + if self.output_format == OutputFormat.CSV: + from .csv import parse_csv + samples_data = parse_csv(self.data, self.sample_rate) + return (Sample.model_validate(s) for s in samples_data) + elif self.output_format == OutputFormat.VCD: + from .vcd import parse_vcd + samples_data = parse_vcd(self.data, self.sample_rate) + return (Sample.model_validate(s) for s in samples_data) + elif self.output_format == OutputFormat.BITS: + return self._parse_bits() + elif self.output_format == OutputFormat.ASCII: + return self.data.decode("utf-8") + else: + raise NotImplementedError( + f"Parsing not implemented for {self.output_format} format. " + f"Use .data property to get raw bytes." + ) + + def _parse_bits(self) -> dict[str, list[int]]: + """Parse bits format to dict of channel→bit sequences. + + Sigrok-cli bits format: "D0:10001\\nD1:01110\\n..." + Each line has format "channel_name:bits" + + Note: For large sample counts, sigrok-cli wraps bits across multiple + lines with repeated channel names. We accumulate all occurrences. + """ + text = self.data.decode("utf-8") + lines = [line.strip() for line in text.strip().split("\n") if line.strip()] + + result: dict[str, list[int]] = {} + + for line in lines: + # Bits format: "D0:10001" or "A0:10001" + if ":" in line: + channel_device_name, bits_str = line.split(":", 1) + channel_device_name = channel_device_name.strip() + + # Map device name (D0) to user-friendly name (vcc) if available + channel_name = self.channel_map.get(channel_device_name, channel_device_name) + + # Parse bits from this line + bits = [int(b) for b in bits_str if b in "01"] + + # Accumulate bits for this channel (may appear on multiple lines) + if channel_name not in result: + result[channel_name] = [] + result[channel_name].extend(bits) + + return result + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py new file mode 100644 index 000000000..a0cbdde8a --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py @@ -0,0 +1,139 @@ +"""CSV format parser for sigrok captures.""" + +from __future__ import annotations + +import csv +from typing import Iterator + + +def parse_csv(data: bytes, sample_rate: str) -> Iterator[dict]: + """Parse CSV format to iterator of samples with timing. + + Args: + data: Raw CSV data as bytes + sample_rate: Sample rate string (e.g., "100kHz", "1MHz") + + Yields: + Dicts with keys: sample, time (seconds), values + """ + text = data.decode("utf-8") + lines = text.strip().split("\n") + + # Parse sample rate for timing calculation + sample_rate_hz = _parse_sample_rate_hz(sample_rate) + time_step_s = 1.0 / sample_rate_hz # seconds per sample + + # Skip comment lines and analog preview lines (format: "A0: -10.0000 V DC") + # The actual data starts after a header row with types like "logic,logic,V DC,V DC" + data_lines = _extract_csv_data_lines(lines) + + if not data_lines or len(data_lines) < 2: + return + + # Parse the CSV data + reader = csv.reader(data_lines) + + # First row is types (logic, V DC, etc.) - use for channel name inference + types_row = next(reader) + + # Get channel names from types + channel_names = _infer_channel_names(types_row) + + # Parse and yield data rows one by one + for idx, row in enumerate(reader): + values = _parse_csv_row(channel_names, row) + yield { + "sample": idx, + "time": idx * time_step_s, + "values": values, + } + + +def _parse_sample_rate_hz(sample_rate: str) -> float: + """Parse sample rate string to Hz.""" + rate = sample_rate.strip().upper() + multipliers = {"K": 1e3, "M": 1e6, "G": 1e9} + + for suffix, mult in multipliers.items(): + if rate.endswith(f"{suffix}HZ"): + return float(rate[:-3]) * mult + elif rate.endswith(suffix): + return float(rate[:-1]) * mult + + # Assume Hz if no suffix + return float(rate.rstrip("HZ")) + + +def _extract_csv_data_lines(lines: list[str]) -> list[str]: + """Extract actual CSV data lines, skipping comments and analog preview lines.""" + data_lines = [] + + for _i, line in enumerate(lines): + line = line.strip() + # Skip comment lines + if line.startswith(";"): + continue + # Skip analog preview lines (contain colon, not CSV comma-separated) + if ":" in line and "," not in line: + continue + # This is CSV data + data_lines.append(line) + + return data_lines + + +def _infer_channel_names(types_row: list[str]) -> list[str]: + """Infer channel names from CSV type header row. + + Args: + types_row: List of type strings like ["logic", "logic", "V DC", "V DC"] + + Returns: + List of channel names like ["D0", "D1", "A0", "A1"] + """ + channel_names = [] + digital_count = 0 + analog_count = 0 + + for type_str in types_row: + type_lower = type_str.lower() + if "logic" in type_lower: + channel_names.append(f"D{digital_count}") + digital_count += 1 + elif "v" in type_lower or "dc" in type_lower: + # Analog channel + channel_names.append(f"A{analog_count}") + analog_count += 1 + else: + # Unknown type, use generic name + channel_names.append(f"CH{len(channel_names)}") + + return channel_names + + +def _parse_csv_row(channel_names: list[str], row: list[str]) -> dict[str, int | float]: + """Parse a CSV data row into channel values. + + Args: + channel_names: List of channel names + row: List of value strings + + Returns: + Dict mapping channel name to parsed value + """ + values = {} + + for channel, value in zip(channel_names, row, strict=True): + value = value.strip() + # Try to parse as number (analog) or binary (digital) + try: + if "." in value or "e" in value.lower(): + values[channel] = float(value) + else: + values[channel] = int(value) + except ValueError: + # Keep as string if not a number + values[channel] = value + + return values + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py new file mode 100644 index 000000000..e15e545a0 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py @@ -0,0 +1,133 @@ +"""Tests for CSV format parser.""" + +from shutil import which + +import pytest + +from .client import SigrokClient +from .common import CaptureConfig, CaptureResult, OutputFormat +from .driver import Sigrok +from jumpstarter.common.utils import serve + + +@pytest.fixture +def demo_driver_instance(): + """Create a Sigrok driver instance configured for the demo device.""" + # Demo driver has 8 digital channels (D0-D7) and 5 analog (A0-A4) + # Map device channels to decoder-friendly semantic names + return Sigrok( + driver="demo", + executable="sigrok-cli", + channels={ + "D0": "vcc", + "D1": "cs", + "D2": "miso", + "D3": "mosi", + "D4": "clk", + "D5": "sda", + "D6": "scl", + "D7": "gnd", + }, + ) + + +@pytest.fixture +def demo_client(demo_driver_instance): + """Create a client for the demo Sigrok driver.""" + with serve(demo_driver_instance) as client: + yield client + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_csv_format_basic(demo_client: SigrokClient): + """Test CSV format capture with demo driver.""" + cfg = CaptureConfig( + sample_rate="50kHz", + samples=50, + output_format=OutputFormat.CSV, + channels=["vcc", "cs"], # Select specific digital channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + assert isinstance(result.data, bytes) + decoded_data = list(result.decode()) + assert isinstance(decoded_data, list) + assert len(decoded_data) > 0 + # CSV format uses inferred names (D0, D1, etc.) based on column types + # Channel mapping is only preserved in VCD format + first_sample = decoded_data[0] + assert "D0" in first_sample.values or "D1" in first_sample.values + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_csv_format_timing(demo_client: SigrokClient): + """Test CSV format timing calculations with integer nanoseconds.""" + cfg = CaptureConfig( + sample_rate="100kHz", + samples=50, + output_format=OutputFormat.CSV, + channels=["D0", "D1", "D2"], # Select specific channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + + # Decode the CSV data + samples = list(result.decode()) + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify timing progresses correctly + for sample in samples: + assert isinstance(sample.time, float) + # Verify timing progresses (1/100kHz = 0.00001s per sample) + assert sample.time == sample.sample * 0.00001 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_csv_format_analog_channels(demo_client: SigrokClient): + """Test CSV capture of analog channels with voltage values.""" + cfg = CaptureConfig( + sample_rate="100kHz", + samples=20, + output_format=OutputFormat.CSV, + channels=["A0", "A1"], # Select specific analog channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + assert isinstance(result.data, bytes) + decoded_data = list(result.decode()) + assert isinstance(decoded_data, list) + assert len(decoded_data) > 0 + + # Check first sample for analog values + first_sample = decoded_data[0] + assert len(first_sample.values) > 0 + + # Analog values should be floats (voltages) + for _channel, value in first_sample.values.items(): + assert isinstance(value, (int, float)) + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_csv_format_mixed_channels(demo_client: SigrokClient): + """Test CSV with both digital and analog channels.""" + cfg = CaptureConfig( + sample_rate="100kHz", + samples=30, + output_format=OutputFormat.CSV, + channels=["D0", "D1", "A0"], # Mix of digital and analog + ) + + result = demo_client.capture(cfg) + samples = list(result.decode()) + + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify we have values for channels + first_sample = samples[0] + assert len(first_sample.values) > 0 + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py new file mode 100644 index 000000000..35c7b88d0 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py @@ -0,0 +1,286 @@ +from __future__ import annotations + +import asyncio +import subprocess +from base64 import b64encode +from dataclasses import dataclass, field +from pathlib import Path +from shutil import which +from tempfile import TemporaryDirectory + +from .common import CaptureConfig, DecoderConfig, OutputFormat +from jumpstarter.driver import Driver, export + + +def find_sigrok_cli() -> str | None: + """Find sigrok-cli executable in PATH. + + Returns: + Path to executable or None if not found + """ + return which("sigrok-cli") + + +@dataclass(kw_only=True) +class Sigrok(Driver): + """Sigrok driver wrapping sigrok-cli for logic analyzer and oscilloscope support.""" + + driver: str = "demo" + conn: str | None = "auto" + executable: str | None = field(default_factory=find_sigrok_cli) + channels: dict[str, str] = field(default_factory=dict) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + def _ensure_executable(self): + """Ensure sigrok-cli is available.""" + if self.executable is None: + raise FileNotFoundError( + "sigrok-cli executable not found in PATH. " + "Please install sigrok-cli to use this driver." + ) + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_sigrok.client.SigrokClient" + + # --- Public API ----------------------------------------------------- + + @export + def scan(self) -> str: + """List devices for the configured driver.""" + self._ensure_executable() + assert self.executable is not None + cmd = [self.executable, "--driver", self.driver, "--scan"] + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + return result.stdout + + @export + def get_driver_info(self) -> dict: + return { + "driver": self.driver, + "conn": self.conn, + "channels": self.channels, + } + + @export + def get_channel_map(self) -> dict[str, str]: + return self.channels + + @export + def list_output_formats(self) -> list[str]: + return OutputFormat.all() + + @export + def capture(self, config: CaptureConfig | dict) -> dict: + """One-shot capture; returns dict with base64-encoded binary data.""" + self._ensure_executable() + cfg = CaptureConfig.model_validate(config) + cmd, outfile, tmpdir = self._build_capture_command(cfg) + + try: + self.logger.debug("Running sigrok-cli: %s", " ".join(cmd)) + subprocess.run(cmd, check=True) + + data = outfile.read_bytes() + # Return as dict with base64-encoded data (reliable for JSON transport) + return { + "data_b64": b64encode(data).decode("ascii"), + "output_format": cfg.output_format, + "sample_rate": cfg.sample_rate, + "channel_map": self.channels, + "triggers": cfg.triggers, + "decoders": [d.model_dump() for d in cfg.decoders] if cfg.decoders else None, + } + finally: + tmpdir.cleanup() + + @export + async def capture_stream(self, config: CaptureConfig | dict): + """Streaming capture; yields chunks of binary data from sigrok-cli stdout.""" + self._ensure_executable() + cfg = CaptureConfig.model_validate(config) + cmd = self._build_stream_command(cfg) + + self.logger.debug("streaming sigrok-cli: %s", " ".join(cmd)) + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + try: + if process.stdout is None: + raise RuntimeError("sigrok-cli stdout not available") + + # Stream data in chunks + while True: + chunk = await process.stdout.read(4096) + if not chunk: + break + yield chunk + finally: + process.terminate() + try: + await asyncio.wait_for(process.wait(), timeout=5) + except asyncio.TimeoutError: + process.kill() + + # --- Command builders ----------------------------------------------- + + def _build_capture_command(self, cfg: CaptureConfig) -> tuple[list[str], Path, TemporaryDirectory]: + tmpdir = TemporaryDirectory() + outfile = Path(tmpdir.name) / f"capture.{cfg.output_format}" + + cmd: list[str] = self._base_driver_args() + cmd += self._channel_args(cfg.channels) + cmd += self._config_args(cfg) + cmd += self._trigger_args(cfg) + cmd += self._decoder_args(cfg) + cmd += ["-O", cfg.output_format, "-o", str(outfile)] + + return cmd, outfile, tmpdir + + def _build_stream_command(self, cfg: CaptureConfig) -> list[str]: + cmd: list[str] = self._base_driver_args() + cmd += self._channel_args(cfg.channels) + cmd += self._config_args(cfg, continuous=True) + cmd += self._trigger_args(cfg) + cmd += self._decoder_args(cfg) + cmd += ["-O", cfg.output_format, "-o", "-"] + return cmd + + def _base_driver_args(self) -> list[str]: + assert self.executable is not None + if self.conn and self.conn != "auto": + return [self.executable, "-d", f"{self.driver}:conn={self.conn}"] + return [self.executable, "-d", self.driver] + + def _channel_args(self, selected_names: list[str] | None) -> list[str]: + """Build channel selection/renaming args for sigrok-cli. + + Args: + selected_names: Optional list of semantic names to include + + Returns: + List of args like ["-C", "D0=vcc,D1=cs,D2=miso"] + """ + if not self.channels: + return [] + + # Filter channels if specific names requested + if selected_names: + selected_lower = {name.lower() for name in selected_names} + filtered = {dev: user for dev, user in self.channels.items() if user.lower() in selected_lower} + else: + filtered = self.channels + + # Build channel map: device_name=user_name + channel_map = ",".join(f"{dev}={user}" for dev, user in filtered.items()) + return ["-C", channel_map] if channel_map else [] + + def _config_args(self, cfg: CaptureConfig, *, continuous: bool = False) -> list[str]: + parts = [f"samplerate={cfg.sample_rate}"] + if cfg.pretrigger is not None: + parts.append(f"pretrigger={cfg.pretrigger}") + + args: list[str] = [] + if parts: + args += ["-c", ",".join(parts)] + + # sigrok-cli requires one of: --samples, --frames, --time, or --continuous + # If samples is explicitly specified, use that even for streaming + if cfg.samples is not None: + args.extend(["--samples", str(cfg.samples)]) + elif continuous: + args.append("--continuous") + else: + # Default to 1000 samples if not specified + args.extend(["--samples", "1000"]) + + return args + + def _trigger_args(self, cfg: CaptureConfig) -> list[str]: + if not cfg.triggers: + return [] + trigger_parts = [] + for channel, condition in cfg.triggers.items(): + resolved = self._resolve_channel(channel) + trigger_parts.append(f"{resolved}={condition}") + return ["--triggers", ",".join(trigger_parts)] + + def _decoder_args(self, cfg: CaptureConfig) -> list[str]: + if not cfg.decoders: + return [] + + args: list[str] = [] + for decoder in self._flatten_decoders(cfg.decoders): + pin_map = self._resolve_decoder_channels(decoder) + segments = [decoder.name] + + for pin_name, channel_name in pin_map.items(): + segments.append(f"{pin_name}={self._resolve_channel(channel_name)}") + + if decoder.options: + for key, value in decoder.options.items(): + segments.append(f"{key}={value}") + + args += ["-P", ":".join(segments)] + + if decoder.annotations: + args += ["-A", f"{decoder.name}=" + ",".join(decoder.annotations)] + + return args + + def _flatten_decoders(self, decoders: list[DecoderConfig]) -> list[DecoderConfig]: + flat: list[DecoderConfig] = [] + for decoder in decoders: + flat.append(decoder) + if decoder.stack: + flat.extend(self._flatten_decoders(decoder.stack)) + return flat + + def _resolve_decoder_channels(self, decoder: DecoderConfig) -> dict[str, str]: + if decoder.channels: + return decoder.channels + + # Best-effort auto-mapping based on common decoder pin names + defaults = { + "spi": ["clk", "mosi", "miso", "cs"], + "i2c": ["scl", "sda"], + "uart": ["rx", "tx"], + } + pins = defaults.get(decoder.name.lower()) + if not pins: + return {} + + resolved: dict[str, str] = {} + available_lower = {name.lower(): name for name in self.channels.values()} + for pin in pins: + if pin in available_lower: + resolved[pin] = available_lower[pin] + return resolved + + def _resolve_channel(self, name_or_dn: str) -> str: + """Resolve a user-friendly channel name to device channel name. + + Args: + name_or_dn: User-friendly name (e.g., "clk", "mosi") or device name (e.g., "D0") + + Returns: + Device channel name (e.g., "D0", "D1") + """ + candidate = name_or_dn.strip() + + # If already a device channel name, return as-is + if candidate in self.channels: + return candidate + + # Search for user-friendly name in channel values + for dev_name, user_name in self.channels.items(): + if user_name.lower() == candidate.lower(): + return dev_name + + raise ValueError(f"Channel '{name_or_dn}' not found in channel map {self.channels}") diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py new file mode 100644 index 000000000..087490e61 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py @@ -0,0 +1,495 @@ +from shutil import which + +import pytest + +from .common import CaptureConfig, CaptureResult, OutputFormat +from .driver import Sigrok +from jumpstarter.common.utils import serve + +# Skip all integration tests if sigrok-cli is not available +pytestmark = pytest.mark.skipif( + which("sigrok-cli") is None, + reason="sigrok-cli not found in PATH" +) + + +@pytest.fixture +def demo_driver_instance(): + """Create a Sigrok driver instance configured for the demo device.""" + # Demo driver has 8 digital channels (D0-D7) and 5 analog (A0-A4) + # Map device channels to decoder-friendly semantic names + return Sigrok( + driver="demo", + channels={ + "D0": "vcc", + "D1": "cs", + "D2": "miso", + "D3": "mosi", + "D4": "clk", + "D5": "sda", + "D6": "scl", + "D7": "gnd", + }, + ) + + +@pytest.fixture +def demo_client(demo_driver_instance): + """Create a client connected to demo driver via serve().""" + with serve(demo_driver_instance) as client: + yield client + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_scan_demo_driver(demo_client): + """Test scanning for demo driver via client.""" + result = demo_client.scan() + assert "demo" in result.lower() or "Demo device" in result + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_with_demo_driver(demo_client): + """Test one-shot capture with demo driver via client. + + This test verifies client-server serialization through serve() pattern. + """ + cfg = CaptureConfig( + sample_rate="100kHz", + samples=100, + output_format="srzip", + ) + + result = demo_client.capture(cfg) + + # Verify we got a proper CaptureResult Pydantic model, not just a dict + assert isinstance(result, CaptureResult), f"Expected CaptureResult, got {type(result)}" + + # Verify model attributes work correctly - data should be bytes, not base64 string! + assert result.data + assert isinstance(result.data, bytes), f"Expected bytes, got {type(result.data)}" + assert len(result.data) > 0 + assert result.output_format == "srzip" + assert result.sample_rate == "100kHz" + assert isinstance(result.channel_map, dict) + assert len(result.channel_map) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_default_format(demo_client): + """Test capture with default output format (VCD). + + VCD is the default because it's the most efficient format: + - Only records changes (not every sample) + - Includes precise timing information + - Widely supported by signal analysis tools + """ + # Don't specify output_format - should default to VCD + cfg = CaptureConfig( + sample_rate="100kHz", + samples=50, + channels=["D0", "D1", "D2"], + ) + + result = demo_client.capture(cfg) + + # Verify we got VCD format by default + assert isinstance(result, CaptureResult) + assert result.output_format == OutputFormat.VCD + assert isinstance(result.data, bytes) + assert len(result.data) > 0 + + # Verify VCD data can be decoded + samples = list(result.decode()) + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify samples have timing information (VCD feature) + for sample in samples: + assert hasattr(sample, "time") + assert isinstance(sample.time, float) + assert hasattr(sample, "values") + assert isinstance(sample.values, dict) + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_csv_format(demo_client): + """Test capture with CSV output format via client.""" + cfg = CaptureConfig( + sample_rate="50kHz", + samples=50, + output_format="csv", + ) + + result = demo_client.capture(cfg) + + # Verify CaptureResult model + assert isinstance(result, CaptureResult) + assert isinstance(result.data, bytes) + + # Decode bytes to string for CSV parsing + csv_text = result.data.decode("utf-8") + + # CSV should have headers and data + assert "vcc" in csv_text or "cs" in csv_text or "clk" in csv_text + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_analog_channels(): + """Test capturing analog data from oscilloscope/demo driver. + + Verifies that the API works for analog channels (oscilloscopes) + as well as digital channels (logic analyzers). + """ + # Create driver with analog channel mappings + analog_driver = Sigrok( + driver="demo", + channels={ + "A0": "voltage_in", + "A1": "sine_wave", + "A2": "square_wave", + }, + ) + + with serve(analog_driver) as client: + cfg = CaptureConfig( + sample_rate="100kHz", + samples=20, + channels=["voltage_in", "sine_wave"], # Select specific analog channels + output_format="csv", + ) + + result = client.capture(cfg) + + # Verify we got analog data + assert isinstance(result, CaptureResult) + assert isinstance(result.data, bytes) + + # Parse CSV to check for analog voltage values + csv_text = result.data.decode("utf-8") + + # Should contain voltage values with units (V, mV) + assert "V" in csv_text or "mV" in csv_text + # Should contain our channel names or original analog channel names + assert "voltage_in" in csv_text or "sine_wave" in csv_text or "A0" in csv_text or "A1" in csv_text + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_with_dict_config(demo_client): + """Test capture with dict config (not CaptureConfig object). + + Verifies that dict configs are properly validated and serialized. + """ + # Pass config as dict instead of CaptureConfig object + cfg_dict = { + "sample_rate": "100kHz", + "samples": 100, + "output_format": "srzip", + } + + result = demo_client.capture(cfg_dict) + + # Verify we still get a proper CaptureResult model + assert isinstance(result, CaptureResult) + assert result.data + assert isinstance(result.data, bytes) + assert len(result.data) > 0 + assert result.output_format == "srzip" + + +@pytest.mark.skip(reason="sigrok-cli demo driver doesn't support streaming to stdout (-o -)") +def test_capture_stream_with_demo(demo_client): + """Test streaming capture with demo driver via client. + + Note: sigrok-cli has limitations with streaming output to stdout. + The demo driver and most output formats don't produce data when using `-o -`. + This feature works better with real hardware and certain output formats. + """ + cfg = CaptureConfig( + sample_rate="100kHz", + samples=1000, + output_format="binary", + ) + + received_bytes = 0 + chunk_count = 0 + + # Collect all chunks + for chunk in demo_client.capture_stream(cfg): + received_bytes += len(chunk) + chunk_count += 1 + + # Should have received some data + assert received_bytes > 0 + assert chunk_count > 0 + + +def test_get_driver_info(demo_client): + """Test getting driver information via client. + + Verifies dict serialization through client-server boundary. + """ + info = demo_client.get_driver_info() + + # Verify it's a dict (not a custom object) + assert isinstance(info, dict) + assert info["driver"] == "demo" + assert "channels" in info + assert isinstance(info["channels"], dict) + + +def test_get_channel_map(demo_client): + """Test getting channel mappings via client. + + Verifies dict serialization through client-server boundary. + """ + channels = demo_client.get_channel_map() + + # Verify it's a dict with proper string keys/values + assert isinstance(channels, dict) + assert all(isinstance(k, str) and isinstance(v, str) for k, v in channels.items()) + assert channels["D0"] == "vcc" + assert channels["D4"] == "clk" + assert channels["D7"] == "gnd" + + +def test_list_output_formats(demo_client): + """Test listing supported output formats via client. + + Verifies list serialization through client-server boundary. + """ + formats = demo_client.list_output_formats() + + # Verify it's a proper list of strings + assert isinstance(formats, list) + assert all(isinstance(f, str) for f in formats) + assert "csv" in formats + assert "srzip" in formats + assert "vcd" in formats + assert "binary" in formats + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_csv_format(demo_client): + """Test decoding CSV format to Sample objects with timing. + + Verifies: + - CSV parsing works through client-server boundary + - Sample objects have timing information + - Values are properly typed (int/float) + """ + from .common import OutputFormat, Sample + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=50, + output_format=OutputFormat.CSV, + channels=["D0", "D1", "D2"], # Select specific channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + + # Decode the CSV data + samples = list(result.decode()) + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify all samples are Sample objects + for sample in samples: + assert isinstance(sample, Sample) + assert isinstance(sample.sample, int) + assert isinstance(sample.time, float) + assert isinstance(sample.values, dict) + + # Verify timing progresses (1/100kHz = 0.00001s per sample) + assert sample.time == sample.sample * 0.00001 + + # Verify values are present + assert len(sample.values) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_ascii_format(demo_client): + """Test decoding ASCII format returns string visualization. + + Verifies: + - ASCII format decoding works + - Returns string (not bytes) + """ + from .common import OutputFormat + + cfg = CaptureConfig( + sample_rate="50kHz", + samples=20, + output_format=OutputFormat.ASCII, + channels=["D0", "D1"], + ) + + result = demo_client.capture(cfg) + decoded = result.decode() + + # ASCII format should return string + assert isinstance(decoded, str) + assert len(decoded) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_bits_format(demo_client): + """Test decoding bits format to channel→bit sequences. + + Verifies: + - Bits format decoding works + - Returns dict with bit sequences + - Channel names are mapped from device names (D0) to user-friendly names (vcc) + """ + from .common import OutputFormat + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=30, + output_format=OutputFormat.BITS, + channels=["D0", "D1", "D2"], + ) + + result = demo_client.capture(cfg) + decoded = result.decode() + + # Bits format should return dict + assert isinstance(decoded, dict) + assert len(decoded) > 0 + + # Should have user-friendly channel names (vcc, cs, miso) from channel_map + # Not generic names like CH0, CH1 + assert "vcc" in decoded or "D0" in decoded + assert "cs" in decoded or "D1" in decoded + assert "miso" in decoded or "D2" in decoded + + # Each channel should have a list of bits + for channel, bits in decoded.items(): + assert isinstance(channel, str) + assert isinstance(bits, list) + assert all(b in [0, 1] for b in bits) + # Should have bits (at least some, exact count may vary with demo driver timing) + assert len(bits) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_vcd_format(demo_client): + """Test decoding VCD format to Sample objects with timing (changes only). + + Verifies: + - VCD parsing works through client-server boundary + - Sample objects have timing information in nanoseconds + - Only changes are recorded (efficient representation) + """ + from .common import OutputFormat, Sample + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=50, + output_format=OutputFormat.VCD, + channels=["D0", "D1", "D2"], # Select specific channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + + # Decode the VCD data + samples = list(result.decode()) + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify all samples are Sample objects + for sample in samples: + assert isinstance(sample, Sample) + assert isinstance(sample.sample, int) + assert isinstance(sample.time, float) + assert isinstance(sample.values, dict) + + # VCD only records changes, so each sample should have at least one value + assert len(sample.values) > 0 + + # Values should be integers for digital channels + for _channel, value in sample.values.items(): + assert isinstance(value, int) + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_vcd_analog_channels(demo_client): + """Test decoding VCD with analog channels. + + Verifies: + - Analog values are parsed correctly in VCD format + - Timing information is in nanoseconds + """ + from .common import OutputFormat, Sample + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=30, + output_format=OutputFormat.VCD, + channels=["A0", "A1"], # Analog channels + ) + + result = demo_client.capture(cfg) + samples = list(result.decode()) + + assert isinstance(samples, list) + assert len(samples) > 0 + + # Check that samples have analog values + first_sample = samples[0] + assert isinstance(first_sample, Sample) + assert isinstance(first_sample.time, float) + assert len(first_sample.values) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_unsupported_format_raises(demo_client): + """Test that decoding unsupported formats raises NotImplementedError.""" + from .common import OutputFormat + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=10, + output_format=OutputFormat.BINARY, + ) + + result = demo_client.capture(cfg) + + # Binary format should not be decodable + with pytest.raises(NotImplementedError): + result.decode() + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_analog_csv(demo_client): + """Test decoding CSV with analog channels (voltage values). + + Verifies: + - Analog values are parsed as floats + - Timing information is included + """ + from .common import OutputFormat, Sample + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=30, + output_format=OutputFormat.CSV, + channels=["A0", "A1"], # Analog channels + ) + + result = demo_client.capture(cfg) + samples = list(result.decode()) + + assert isinstance(samples, list) + assert len(samples) > 0 + + # Check first sample for analog values + first_sample = samples[0] + assert isinstance(first_sample, Sample) + assert len(first_sample.values) > 0 + + # Analog values should be floats (voltages) + for _channel, value in first_sample.values.items(): + assert isinstance(value, (int, float)) diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py new file mode 100644 index 000000000..cd2551906 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py @@ -0,0 +1,225 @@ +"""VCD (Value Change Dump) format parser for sigrok captures.""" + +from __future__ import annotations + +from typing import Iterator + + +def parse_vcd(data: bytes, sample_rate: str) -> Iterator[dict]: + """Parse VCD format to iterator of samples with timing (changes only). + + VCD format only records when signals change, making it efficient for + sparse data. Each sample represents a time point where one or more + signals changed. + + Args: + data: Raw VCD data as bytes + sample_rate: Sample rate string (not used for VCD as it has its own timescale) + + Yields: + Dicts with keys: sample, time (seconds), values + """ + text = data.decode("utf-8") + lines = text.strip().split("\n") + + # Parse VCD header to extract timescale and channel mapping + timescale_multiplier = 1e-9 # Default: 1 unit = 1 ns = 1e-9 seconds + channel_map: dict[str, str] = {} # symbol → channel name + + for line in lines: + line = line.strip() + + # Parse timescale (e.g., "$timescale 1 us $end" means 1 unit = 1000 ns) + if line.startswith("$timescale"): + timescale_multiplier = _parse_timescale(line) + + # Parse variable definitions (e.g., "$var wire 1 ! D0 $end") + if line.startswith("$var"): + parts = line.split() + if len(parts) >= 5: + symbol = parts[3] # e.g., "!" + channel = parts[4] # e.g., "D0" + channel_map[symbol] = channel + + if line == "$enddefinitions $end": + break + + # Parse and yield value changes one by one + sample_idx = 0 + + for line in lines: + line = line.strip() + if not line or line.startswith("$"): + continue + + # Timestamp line (e.g., "#100 1! 0" 1#") + if line.startswith("#"): + sample_data = _parse_vcd_timestamp_line(line, timescale_multiplier, channel_map) + if sample_data is not None: + sample_data["sample"] = sample_idx + yield sample_data + sample_idx += 1 + + +def _parse_timescale(line: str) -> float: + """Parse timescale line and return multiplier to convert to seconds.""" + parts = line.split() + if len(parts) >= 3: + value = parts[1] + unit = parts[2] + # Convert to seconds multiplier + unit_multipliers = {"s": 1.0, "ms": 1e-3, "us": 1e-6, "ns": 1e-9, "ps": 1e-12} + return float(value) * unit_multipliers.get(unit, 1.0) + return 1.0 + + +def _parse_vcd_timestamp_line(line: str, timescale_multiplier: float, channel_map: dict[str, str]) -> dict | None: + """Parse a VCD timestamp line with value changes. + + Args: + line: Line starting with # (e.g., "#100 1! 0" 1#") + timescale_multiplier: Multiplier to convert time units to seconds + channel_map: Mapping from VCD symbols to channel names + + Returns: + Dict with time (seconds) and values, or None if line is empty + """ + # Split timestamp from values + parts = line.split(maxsplit=1) + time_str = parts[0][1:] # Remove '#' prefix + + # Skip empty time lines + if not time_str: + return None + + time_units = int(time_str) + current_time_s = time_units * timescale_multiplier + current_values: dict[str, int | float] = {} + + # Parse value changes if present on the same line + if len(parts) > 1: + values_str = parts[1] + _parse_vcd_value_changes(values_str, channel_map, current_values) + + # Return sample data if we have values + if current_values: + return {"time": current_time_s, "values": current_values} + + return None + + +def _parse_vcd_value_changes(values_str: str, channel_map: dict[str, str], current_values: dict[str, int | float]): + """Parse value change tokens from a VCD line. + + Modifies current_values dict in place. + + Supports: + - Single-bit: "1!", "0abc" + - Binary: "b11110000 abc" + - Real: "r3.14159 xyz", "r-10.5 !", "r1.23e-5 aa" + """ + i = 0 + while i < len(values_str): + char = values_str[i] + + # Single bit change (e.g., "1!", "0abc" for multi-char identifiers) + if char in "01xzXZ": + symbol, new_i = _extract_symbol(values_str, i + 1) + if symbol in channel_map: + channel = channel_map[symbol] + current_values[channel] = 1 if char == "1" else 0 + i = new_i + + # Binary value (e.g., "b1010 !" or "b1010 abc") + elif char == "b": + value, symbol, new_i = _parse_binary_value(values_str, i, channel_map) + if symbol and value is not None: + current_values[channel_map[symbol]] = value + i = new_i + + # Real (analog) value (e.g., "r3.14 !" or "r-10.5 abc") + elif char == "r": + value, symbol, new_i = _parse_real_value(values_str, i, channel_map) + if symbol and value is not None: + current_values[channel_map[symbol]] = value + i = new_i + + # Skip whitespace + elif char == " ": + i += 1 + else: + i += 1 + + +def _extract_symbol(text: str, start: int) -> tuple[str, int]: + """Extract a VCD symbol (can be multi-character) from text. + + Returns: + Tuple of (symbol, next_position) + """ + end = start + while end < len(text) and text[end] != " ": + end += 1 + return text[start:end], end + + +def _parse_binary_value(values_str: str, start: int, channel_map: dict[str, str]) -> tuple[int | None, str | None, int]: + """Parse a binary value like "b1010 abc". + + Returns: + Tuple of (value, symbol, next_position) + """ + # Extract binary value + value_start = start + 1 + value_end = value_start + while value_end < len(values_str) and values_str[value_end] in "01xzXZ": + value_end += 1 + binary_value = values_str[value_start:value_end] + + # Skip whitespace before symbol + while value_end < len(values_str) and values_str[value_end] == " ": + value_end += 1 + + # Extract symbol + symbol, next_pos = _extract_symbol(values_str, value_end) + + if symbol in channel_map: + try: + return int(binary_value, 2), symbol, next_pos + except ValueError: + return 0, symbol, next_pos + + return None, None, next_pos + + +def _parse_real_value(values_str: str, start: int, channel_map: dict[str, str]) -> tuple[float | None, str | None, int]: + """Parse a real (analog) value like "r3.14 abc" or "r-10.5 !". + + Returns: + Tuple of (value, symbol, next_position) + """ + # Extract real value (number with optional sign, decimal, exponent) + value_start = start + 1 + value_end = value_start + while value_end < len(values_str) and values_str[value_end] not in " ": + if values_str[value_end] in "0123456789-.eE+": + value_end += 1 + else: + break + real_value = values_str[value_start:value_end] + + # Skip whitespace before symbol + while value_end < len(values_str) and values_str[value_end] == " ": + value_end += 1 + + # Extract symbol + symbol, next_pos = _extract_symbol(values_str, value_end) + + if symbol in channel_map: + try: + return float(real_value), symbol, next_pos + except ValueError: + return 0.0, symbol, next_pos + + return None, None, next_pos + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py new file mode 100644 index 000000000..bf0d829c4 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py @@ -0,0 +1,217 @@ +"""Tests for VCD (Value Change Dump) format parser.""" + +from base64 import b64encode + +from .common import CaptureResult, OutputFormat, Sample + + +def test_vcd_parser_comprehensive(): + """Test VCD parser with manually constructed VCD data covering all features. + + This test validates: + - Single-character identifiers (!, ", #) + - Multi-character identifiers (aa, ab, abc) + - Timescale parsing (microseconds to nanoseconds) + - Single-bit values (0/1) + - X/Z state handling + - Binary values (vectors) + - Real (analog) values with various formats + """ + # Construct a comprehensive VCD file + vcd_content = """$date Mon Dec 8 2025 $end +$version Test VCD Generator $end +$timescale 1 us $end +$scope module test $end +$var wire 1 ! D0 $end +$var wire 1 " D1 $end +$var wire 1 # D2 $end +$var wire 1 aa CH95 $end +$var wire 1 ab CH96 $end +$var wire 8 abc BUS0 $end +$var real 1 xyz ANALOG0 $end +$upscope $end +$enddefinitions $end +#0 1! 0" 1# 0aa 1ab b00001111 abc r-10.5 xyz +#5 0! 1" x# 1aa +#10 z! 0" 1# b11110000 abc r3.14159 xyz +#25 1! 1" 0# 0aa 0ab b10101010 abc r0.0 xyz +#100 0! 0" 0# r1.23e-5 xyz +""" + + # Create a CaptureResult with this VCD data (no channel mapping for parser test) + result = CaptureResult( + data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), + output_format=OutputFormat.VCD, + sample_rate="1MHz", + channel_map={}, + triggers=None, + decoders=None, + ) + + # Parse the VCD + samples = list(result.decode()) + + # Verify we got the expected number of samples + assert len(samples) == 5 + + # Sample 0 at time 0us = 0s + s0 = samples[0] + assert s0.time == 0.0 + # Channel names come directly from VCD (not mapped) + assert s0.values["D0"] == 1 + assert s0.values["D1"] == 0 + assert s0.values["D2"] == 1 + assert s0.values["CH95"] == 0 # Multi-char identifier "aa" + assert s0.values["CH96"] == 1 # Multi-char identifier "ab" + assert s0.values["BUS0"] == 0b00001111 # Binary value + assert abs(s0.values["ANALOG0"] - (-10.5)) < 0.001 # Real value + + # Sample 1 at time 5us = 0.000005s + s1 = samples[1] + assert abs(s1.time - 0.000005) < 1e-12 + assert s1.values["D0"] == 0 + assert s1.values["D1"] == 1 + assert s1.values["D2"] == 0 # X converted to 0 + assert s1.values["CH95"] == 1 + + # Sample 2 at time 10us = 0.00001s + s2 = samples[2] + assert abs(s2.time - 0.00001) < 1e-12 + assert s2.values["D0"] == 0 # Z converted to 0 + assert s2.values["D1"] == 0 + assert s2.values["D2"] == 1 + assert s2.values["BUS0"] == 0b11110000 + assert abs(s2.values["ANALOG0"] - 3.14159) < 0.001 + + # Sample 3 at time 25us = 0.000025s + s3 = samples[3] + assert abs(s3.time - 0.000025) < 1e-12 + assert s3.values["D0"] == 1 + assert s3.values["D1"] == 1 + assert s3.values["D2"] == 0 + assert s3.values["CH95"] == 0 + assert s3.values["CH96"] == 0 + assert s3.values["BUS0"] == 0b10101010 + assert abs(s3.values["ANALOG0"] - 0.0) < 0.001 + + # Sample 4 at time 100us = 0.0001s + s4 = samples[4] + assert abs(s4.time - 0.0001) < 1e-12 + assert s4.values["D0"] == 0 + assert s4.values["D1"] == 0 + assert s4.values["D2"] == 0 + assert abs(s4.values["ANALOG0"] - 1.23e-5) < 1e-10 # Scientific notation + + +def test_vcd_parser_timescale_variations(): + """Test VCD parser with different timescale values.""" + # Test different timescales + test_cases = [ + ("1 ns", 1, 0), # 1ns timescale, time 0 = 0ns + ("1 us", 1000, 0), # 1us timescale, time 0 = 0ns + ("1 ms", 1000000, 0), # 1ms timescale, time 0 = 0ns + ("10 ns", 10, 100 * 10), # 10ns timescale, time 100 = 1000ns + ("100 ns", 100, 50 * 100), # 100ns timescale, time 50 = 5000ns + ] + + for timescale_str, _multiplier, expected_time_ns in test_cases: + vcd_content = f"""$timescale {timescale_str} $end +$var wire 1 ! D0 $end +$enddefinitions $end +#0 1! +#{100 if expected_time_ns else 0} 0! +""" + result = CaptureResult( + data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), + output_format=OutputFormat.VCD, + sample_rate="1MHz", + channel_map={}, + ) + + samples = list(result.decode()) + assert len(samples) >= 1 + # First sample at time 0 + assert samples[0].time == 0.0 + + +def test_vcd_parser_empty_timestamps(): + """Test VCD parser handles empty timestamp lines correctly.""" + vcd_content = """$timescale 1 ns $end +$var wire 1 ! D0 $end +$enddefinitions $end +#0 1! +#10 0! +# +#20 1! +""" + + result = CaptureResult( + data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), + output_format=OutputFormat.VCD, + sample_rate="1MHz", + channel_map={}, + ) + + samples = list(result.decode()) + # Should have 3 samples (empty timestamp line skipped) + assert len(samples) == 3 + assert samples[0].time == 0.0 + assert samples[1].time == 1e-8 # 10ns + assert samples[2].time == 2e-8 # 20ns + + +def test_vcd_parser_large_channel_count(): + """Test VCD parser with large channel counts using multi-char identifiers. + + According to libsigrok vcd_identifier(): + - Channels 0-93: Single char (!, ", ..., ~) + - Channels 94-769: Two lowercase letters (aa, ab, ..., zz) + - Channels 770+: Three lowercase letters (aaa, aab, ...) + """ + # Test identifiers at boundaries + vcd_content = """$timescale 1 ns $end +$var wire 1 ! CH0 $end +$var wire 1 ~ CH93 $end +$var wire 1 aa CH94 $end +$var wire 1 ab CH95 $end +$var wire 1 zz CH769 $end +$var wire 1 aaa CH770 $end +$var wire 1 abc CH800 $end +$enddefinitions $end +#0 1! 0~ 1aa 0ab 1zz 0aaa 1abc +#100 0! 1~ 0aa 1ab 0zz 1aaa 0abc +""" + + result = CaptureResult( + data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), + output_format=OutputFormat.VCD, + sample_rate="1MHz", + channel_map={}, + ) + + samples = list(result.decode()) + + # Verify first sample (channel names come directly from VCD) + assert len(samples) == 2 + s0 = samples[0] + assert isinstance(s0, Sample) + assert s0.time == 0.0 + assert s0.values["CH0"] == 1 # Single char: ! + assert s0.values["CH93"] == 0 # Single char: ~ + assert s0.values["CH94"] == 1 # Two char: aa + assert s0.values["CH95"] == 0 # Two char: ab + assert s0.values["CH769"] == 1 # Two char: zz + assert s0.values["CH770"] == 0 # Three char: aaa + assert s0.values["CH800"] == 1 # Three char: abc + + # Verify second sample + s1 = samples[1] + assert abs(s1.time - 1e-7) < 1e-15 # 100ns + assert s1.values["CH0"] == 0 + assert s1.values["CH93"] == 1 + assert s1.values["CH94"] == 0 + assert s1.values["CH95"] == 1 + assert s1.values["CH769"] == 0 + assert s1.values["CH770"] == 1 + assert s1.values["CH800"] == 0 + diff --git a/packages/jumpstarter-driver-sigrok/pyproject.toml b/packages/jumpstarter-driver-sigrok/pyproject.toml new file mode 100644 index 000000000..f6cd63aa4 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/pyproject.toml @@ -0,0 +1,42 @@ +[project] +name = "jumpstarter-driver-sigrok" +dynamic = ["version", "urls"] +description = "Jumpstarter driver wrapping sigrok-cli for logic analyzer and oscilloscope support" +readme = "README.md" +license = "Apache-2.0" +authors = [ + { name = "Miguel Angel Ajo Pelayo", email = "miguelangel@ajo.es" } +] +requires-python = ">=3.11" +dependencies = [ + "jumpstarter", +] + +[tool.hatch.version] +source = "vcs" +raw-options = { 'root' = '../../'} + +[tool.hatch.metadata.hooks.vcs.urls] +Homepage = "https://jumpstarter.dev" +source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip" + +[tool.pytest.ini_options] +addopts = "--cov --cov-report=html --cov-report=xml" +log_cli = true +log_cli_level = "INFO" +testpaths = ["jumpstarter_driver_sigrok"] +asyncio_default_fixture_loop_scope = "function" + +[build-system] +requires = ["hatchling", "hatch-vcs", "hatch-pin-jumpstarter"] +build-backend = "hatchling.build" + +[tool.hatch.build.hooks.pin_jumpstarter] +name = "pin_jumpstarter" + +[dependency-groups] +dev = [ + "pytest-cov>=6.0.0", + "pytest>=8.3.3", + "pytest-asyncio>=0.24.0", +] diff --git a/uv.lock b/uv.lock index 99fa450fa..495fdc1b7 100644 --- a/uv.lock +++ b/uv.lock @@ -31,6 +31,7 @@ members = [ "jumpstarter-driver-ridesx", "jumpstarter-driver-sdwire", "jumpstarter-driver-shell", + "jumpstarter-driver-sigrok", "jumpstarter-driver-snmp", "jumpstarter-driver-ssh", "jumpstarter-driver-ssh-mitm", @@ -2073,6 +2074,30 @@ dev = [ { name = "pytest-cov", specifier = ">=6.0.0" }, ] +[[package]] +name = "jumpstarter-driver-sigrok" +source = { editable = "packages/jumpstarter-driver-sigrok" } +dependencies = [ + { name = "jumpstarter" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-asyncio" }, + { name = "pytest-cov" }, +] + +[package.metadata] +requires-dist = [{ name = "jumpstarter", editable = "packages/jumpstarter" }] + +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=8.3.3" }, + { name = "pytest-asyncio", specifier = ">=0.24.0" }, + { name = "pytest-cov", specifier = ">=6.0.0" }, +] + [[package]] name = "jumpstarter-driver-snmp" source = { editable = "packages/jumpstarter-driver-snmp" }