From 9a1cf0eb5afa0507f890ffa119b86c5a0343382d Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Sun, 7 Dec 2025 22:53:43 +0100 Subject: [PATCH 01/10] sigrok: initial sigrok driver --- .../reference/package-apis/drivers/index.md | 3 + .../reference/package-apis/drivers/sigrok.md | 1 + packages/jumpstarter-driver-sigrok/.gitignore | 3 + packages/jumpstarter-driver-sigrok/README.md | 110 +++++++ .../examples/exporter.yaml | 21 ++ .../jumpstarter_driver_sigrok/__init__.py | 5 + .../jumpstarter_driver_sigrok/client.py | 37 +++ .../jumpstarter_driver_sigrok/common.py | 49 ++++ .../jumpstarter_driver_sigrok/driver.py | 277 ++++++++++++++++++ .../jumpstarter_driver_sigrok/driver_test.py | 225 ++++++++++++++ .../jumpstarter-driver-sigrok/pyproject.toml | 42 +++ 11 files changed, 773 insertions(+) create mode 120000 docs/source/reference/package-apis/drivers/sigrok.md create mode 100644 packages/jumpstarter-driver-sigrok/.gitignore create mode 100644 packages/jumpstarter-driver-sigrok/README.md create mode 100644 packages/jumpstarter-driver-sigrok/examples/exporter.yaml create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/client.py create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py create mode 100644 packages/jumpstarter-driver-sigrok/pyproject.toml diff --git a/docs/source/reference/package-apis/drivers/index.md b/docs/source/reference/package-apis/drivers/index.md index 8286663bb..71a539ea7 100644 --- a/docs/source/reference/package-apis/drivers/index.md +++ b/docs/source/reference/package-apis/drivers/index.md @@ -71,6 +71,8 @@ Drivers for debugging and programming devices: * **[QEMU](qemu.md)** (`jumpstarter-driver-qemu`) - QEMU virtualization platform * **[Corellium](corellium.md)** (`jumpstarter-driver-corellium`) - Corellium virtualization platform +* **[Sigrok](sigrok.md)** (`jumpstarter-driver-sigrok`) - Logic analyzer and + oscilloscope support via sigrok-cli * **[U-Boot](uboot.md)** (`jumpstarter-driver-uboot`) - Universal Bootloader interface * **[RideSX](ridesx.md)** (`jumpstarter-driver-ridesx`) - Flashing and power management for Qualcomm RideSX devices @@ -104,6 +106,7 @@ gpiod.md ridesx.md sdwire.md shell.md +sigrok.md ssh.md snmp.md tasmota.md diff --git a/docs/source/reference/package-apis/drivers/sigrok.md b/docs/source/reference/package-apis/drivers/sigrok.md new file mode 120000 index 000000000..979eeb03f --- /dev/null +++ b/docs/source/reference/package-apis/drivers/sigrok.md @@ -0,0 +1 @@ +../../../../../packages/jumpstarter-driver-sigrok/README.md \ No newline at end of file diff --git a/packages/jumpstarter-driver-sigrok/.gitignore b/packages/jumpstarter-driver-sigrok/.gitignore new file mode 100644 index 000000000..cbc5d672b --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +.coverage +coverage.xml diff --git a/packages/jumpstarter-driver-sigrok/README.md b/packages/jumpstarter-driver-sigrok/README.md new file mode 100644 index 000000000..4c1a75b35 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/README.md @@ -0,0 +1,110 @@ +# Sigrok Driver + +`jumpstarter-driver-sigrok` wraps `sigrok-cli` to provide logic analyzer and oscilloscope capture from Jumpstarter exporters. It supports: +- **Logic analyzers** (digital channels) - with protocol decoding (SPI, I2C, UART, etc.) +- **Oscilloscopes** (analog channels) - voltage waveform capture +- One-shot and streaming capture +- Decoder-friendly channel mappings +- Real-time protocol decoding + +## Installation + +```shell +pip3 install --extra-index-url https://pkg.jumpstarter.dev/simple/ jumpstarter-driver-sigrok +``` + +## Configuration (exporter) + +```yaml +export: + sigrok: + type: jumpstarter_driver_sigrok.driver.Sigrok + driver: demo # sigrok driver (demo, fx2lafw, etc.) + conn: null # optional: USB VID.PID or serial path + executable: null # optional: path to sigrok-cli (auto-detected) + channels: # channel mappings (device_name: semantic_name) + D0: vcc + D1: cs + D2: miso + D3: mosi + D4: clk + D5: sda + D6: scl +``` + +## CaptureConfig (client-side) + +```python +from jumpstarter_driver_sigrok.common import CaptureConfig, DecoderConfig + +config = CaptureConfig( + sample_rate="8MHz", + samples=20000, + pretrigger=5000, + triggers={"cs": "falling"}, + decoders=[ + DecoderConfig( + name="spi", + channels={"clk": "clk", "mosi": "mosi", "miso": "miso", "cs": "cs"}, + annotations=["mosi-data"], + ) + ], +) +``` + +This maps to: +```bash +sigrok-cli -d fx2lafw -c samplerate=8MHz,samples=20000,pretrigger=5000 --triggers D1=falling \ + -P spi:clk=D4:mosi=D3:miso=D2:cs=D1 -A spi=mosi-data +``` + +## Client API + +- `scan()` — list devices for the configured driver +- `capture(config)` — one-shot capture, returns `CaptureResult` with base64 data +- `capture_stream(config)` — streaming capture via `--continuous` +- `get_driver_info()` — driver, conn, channel map +- `get_channel_map()` — device-to-semantic name mappings +- `list_output_formats()` — supported formats (csv, srzip, vcd, binary, bits, ascii) + +## Examples + +### Logic Analyzer (Digital Channels) + +One-shot with trigger: +```bash +sigrok-cli -d fx2lafw -c samplerate=8MHz,samples=20000,pretrigger=5000 --triggers D0=rising -o out.sr +``` + +Real-time decode (SPI): +```bash +sigrok-cli -d fx2lafw -c samplerate=1M --continuous \ + -P spi:clk=D4:mosi=D3:miso=D2:cs=D1 -A spi=mosi-data +``` + +### Oscilloscope (Analog Channels) + +```yaml +export: + oscilloscope: + type: jumpstarter_driver_sigrok.driver.Sigrok + driver: rigol-ds # or demo for testing + conn: usb # or serial path + channels: + A0: CH1 + A1: CH2 +``` + +```python +from jumpstarter_driver_sigrok.common import CaptureConfig + +# Capture analog waveforms +config = CaptureConfig( + sample_rate="1MHz", + samples=10000, + channels=["CH1", "CH2"], # Analog channels + output_format="csv", # or "vcd" for waveform viewers +) +result = client.capture(config) +waveform_data = result.data # bytes with voltage measurements +``` diff --git a/packages/jumpstarter-driver-sigrok/examples/exporter.yaml b/packages/jumpstarter-driver-sigrok/examples/exporter.yaml new file mode 100644 index 000000000..847a99b29 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/examples/exporter.yaml @@ -0,0 +1,21 @@ +apiVersion: jumpstarter.dev/v1alpha1 +kind: ExporterConfig +metadata: + namespace: default + name: demo +endpoint: grpc.jumpstarter.192.168.0.203.nip.io:8082 +token: "" +export: + sigrok: + type: jumpstarter_driver_sigrok.driver.Sigrok + driver: demo + conn: null + channels: + D0: vcc + D1: cs + D2: miso + D3: mosi + D4: clk + D5: sda + D6: scl + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py new file mode 100644 index 000000000..106d87ae4 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py @@ -0,0 +1,5 @@ +from jumpstarter_driver_sigrok.common import CaptureConfig, CaptureResult, DecoderConfig +from jumpstarter_driver_sigrok.driver import Sigrok + +__all__ = ["Sigrok", "CaptureConfig", "CaptureResult", "DecoderConfig"] + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/client.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/client.py new file mode 100644 index 000000000..1e1dc9d20 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/client.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass + +from .common import CaptureConfig, CaptureResult +from jumpstarter.client import DriverClient + + +@dataclass(kw_only=True) +class SigrokClient(DriverClient): + """Client methods for the Sigrok driver.""" + + def scan(self) -> str: + return self.call("scan") + + def capture(self, config: CaptureConfig | dict) -> CaptureResult: + return CaptureResult.model_validate(self.call("capture", config)) + + def capture_stream(self, config: CaptureConfig | dict): + """Stream capture data from sigrok-cli. + + Args: + config: CaptureConfig or dict with capture parameters + + Yields: + bytes: Chunks of captured data + """ + for chunk in self.streamingcall("capture_stream", config): + yield chunk + + def get_driver_info(self) -> dict: + return self.call("get_driver_info") + + def get_channel_map(self) -> dict: + return self.call("get_channel_map") + + def list_output_formats(self) -> list[str]: + return self.call("list_output_formats") + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py new file mode 100644 index 000000000..cc0110e15 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + + +class DecoderConfig(BaseModel): + """Protocol decoder configuration (real-time during capture).""" + + name: str + channels: dict[str, str] | None = None + options: dict[str, Any] | None = None + annotations: list[str] | None = None + stack: list["DecoderConfig"] | None = None + + +class CaptureConfig(BaseModel): + sample_rate: str = Field(default="1M", description="e.g., 8MHz, 1M, 24000000") + samples: int | None = Field(default=None, description="number of samples; None for continuous") + pretrigger: int | None = Field(default=None, description="samples before trigger") + triggers: dict[str, str] | None = Field(default=None, description="e.g., {'D0': 'rising'}") + channels: list[str] | None = Field(default=None, description="override default channels by name") + output_format: str = Field( + default="srzip", + description="csv, srzip, vcd, binary, bits, ascii", + ) + decoders: list[DecoderConfig] | None = Field(default=None, description="real-time protocol decoding") + + +class CaptureResult(BaseModel): + """Result from a capture operation. + + Note: data is base64-encoded for reliable JSON transport. Client methods + automatically decode it to bytes for you. + """ + data_b64: str # Base64-encoded binary data + output_format: str + sample_rate: str + channel_map: dict[str, str] + triggers: dict[str, str] | None = None + decoders: list[DecoderConfig] | None = None + + @property + def data(self) -> bytes: + """Get the captured data as bytes (auto-decodes from base64).""" + from base64 import b64decode + return b64decode(self.data_b64) + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py new file mode 100644 index 000000000..92081b5c6 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py @@ -0,0 +1,277 @@ +from __future__ import annotations + +import asyncio +import subprocess +from base64 import b64encode +from dataclasses import dataclass, field +from pathlib import Path +from shutil import which +from tempfile import TemporaryDirectory + +from .common import CaptureConfig, DecoderConfig +from jumpstarter.driver import Driver, export + + +def find_sigrok_cli() -> str: + executable = which("sigrok-cli") + if executable is None: + raise FileNotFoundError("sigrok-cli executable not found in PATH") + return executable + + +def _default_channel_map() -> dict[str, str]: + # Decoder-friendly default names for demo driver + # Maps device channel name -> semantic name + return {"D0": "vcc", "D1": "cs", "D2": "miso", "D3": "mosi", "D4": "clk", "D5": "sda", "D6": "scl"} + + +@dataclass(kw_only=True) +class Sigrok(Driver): + """Sigrok driver wrapping sigrok-cli for logic analyzer and oscilloscope support.""" + + driver: str = "demo" + conn: str | None = None + executable: str = field(default_factory=find_sigrok_cli) + channels: dict[str, str] = field(default_factory=_default_channel_map) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_sigrok.client.SigrokClient" + + # --- Public API ----------------------------------------------------- + + @export + def scan(self) -> str: + """List devices for the configured driver.""" + cmd = [self.executable, "--driver", self.driver, "--scan"] + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + return result.stdout + + @export + def get_driver_info(self) -> dict: + return { + "driver": self.driver, + "conn": self.conn, + "channels": self.channels, + } + + @export + def get_channel_map(self) -> dict[int, str]: + return self.channels + + @export + def list_output_formats(self) -> list[str]: + return ["csv", "srzip", "vcd", "binary", "bits", "ascii"] + + @export + def capture(self, config: CaptureConfig | dict) -> dict: + """One-shot capture; returns dict with base64-encoded binary data.""" + cfg = CaptureConfig.model_validate(config) + cmd, outfile, tmpdir = self._build_capture_command(cfg) + + try: + self.logger.debug("running sigrok-cli: %s", " ".join(cmd)) + subprocess.run(cmd, check=True) + + data = outfile.read_bytes() + # Return as dict with base64-encoded data (reliable for JSON transport) + return { + "data_b64": b64encode(data).decode("ascii"), + "output_format": cfg.output_format, + "sample_rate": cfg.sample_rate, + "channel_map": self.channels, + "triggers": cfg.triggers, + "decoders": [d.model_dump() for d in cfg.decoders] if cfg.decoders else None, + } + finally: + tmpdir.cleanup() + + @export + async def capture_stream(self, config: CaptureConfig | dict): + """Streaming capture; yields chunks of binary data from sigrok-cli stdout.""" + cfg = CaptureConfig.model_validate(config) + cmd = self._build_stream_command(cfg) + + self.logger.debug("streaming sigrok-cli: %s", " ".join(cmd)) + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + try: + if process.stdout is None: + raise RuntimeError("sigrok-cli stdout not available") + + # Stream data in chunks + while True: + chunk = await process.stdout.read(4096) + if not chunk: + break + yield chunk + finally: + process.terminate() + try: + await asyncio.wait_for(process.wait(), timeout=5) + except asyncio.TimeoutError: + process.kill() + + # --- Command builders ----------------------------------------------- + + def _build_capture_command(self, cfg: CaptureConfig) -> tuple[list[str], Path, TemporaryDirectory]: + tmpdir = TemporaryDirectory() + outfile = Path(tmpdir.name) / f"capture.{cfg.output_format}" + + cmd: list[str] = self._base_driver_args() + cmd += self._channel_args(cfg.channels) + cmd += self._config_args(cfg) + cmd += self._trigger_args(cfg) + cmd += self._decoder_args(cfg) + cmd += ["-O", cfg.output_format, "-o", str(outfile)] + + return cmd, outfile, tmpdir + + def _build_stream_command(self, cfg: CaptureConfig) -> list[str]: + cmd: list[str] = self._base_driver_args() + cmd += self._channel_args(cfg.channels) + cmd += self._config_args(cfg, continuous=True) + cmd += self._trigger_args(cfg) + cmd += self._decoder_args(cfg) + cmd += ["-O", cfg.output_format, "-o", "-"] + return cmd + + def _base_driver_args(self) -> list[str]: + if self.conn: + return [self.executable, "-d", f"{self.driver}:conn={self.conn}"] + return [self.executable, "-d", self.driver] + + def _channel_args(self, selected_names: list[str] | None) -> list[str]: + """Build channel selection/renaming args for sigrok-cli. + + Args: + selected_names: Optional list of semantic names to include + + Returns: + List of args like ["-C", "D0=vcc,D1=cs,D2=miso"] + """ + if not self.channels: + return [] + + # Filter channels if specific names requested + if selected_names: + selected_lower = {name.lower() for name in selected_names} + filtered = {dev: user for dev, user in self.channels.items() if user.lower() in selected_lower} + else: + filtered = self.channels + + # Build channel map: device_name=user_name + channel_map = ",".join(f"{dev}={user}" for dev, user in filtered.items()) + return ["-C", channel_map] if channel_map else [] + + def _config_args(self, cfg: CaptureConfig, *, continuous: bool = False) -> list[str]: + parts = [f"samplerate={cfg.sample_rate}"] + if cfg.pretrigger is not None: + parts.append(f"pretrigger={cfg.pretrigger}") + + args: list[str] = [] + if parts: + args += ["-c", ",".join(parts)] + + # sigrok-cli requires one of: --samples, --frames, --time, or --continuous + # If samples is explicitly specified, use that even for streaming + if cfg.samples is not None: + args.extend(["--samples", str(cfg.samples)]) + elif continuous: + args.append("--continuous") + else: + # Default to 1000 samples if not specified + args.extend(["--samples", "1000"]) + + return args + + def _trigger_args(self, cfg: CaptureConfig) -> list[str]: + if not cfg.triggers: + return [] + trigger_parts = [] + for channel, condition in cfg.triggers.items(): + resolved = self._resolve_channel(channel) + trigger_parts.append(f"{resolved}={condition}") + return ["--triggers", ",".join(trigger_parts)] + + def _decoder_args(self, cfg: CaptureConfig) -> list[str]: + if not cfg.decoders: + return [] + + args: list[str] = [] + for decoder in self._flatten_decoders(cfg.decoders): + pin_map = self._resolve_decoder_channels(decoder) + segments = [decoder.name] + + for pin_name, channel_name in pin_map.items(): + segments.append(f"{pin_name}={self._resolve_channel(channel_name)}") + + if decoder.options: + for key, value in decoder.options.items(): + segments.append(f"{key}={value}") + + args += ["-P", ":".join(segments)] + + if decoder.annotations: + args += ["-A", f"{decoder.name}=" + ",".join(decoder.annotations)] + + return args + + def _flatten_decoders(self, decoders: list[DecoderConfig]) -> list[DecoderConfig]: + flat: list[DecoderConfig] = [] + for decoder in decoders: + flat.append(decoder) + if decoder.stack: + flat.extend(self._flatten_decoders(decoder.stack)) + return flat + + def _resolve_decoder_channels(self, decoder: DecoderConfig) -> dict[str, str]: + if decoder.channels: + return decoder.channels + + # Best-effort auto-mapping based on common decoder pin names + defaults = { + "spi": ["clk", "mosi", "miso", "cs"], + "i2c": ["scl", "sda"], + "uart": ["rx", "tx"], + } + pins = defaults.get(decoder.name.lower()) + if not pins: + return {} + + resolved: dict[str, str] = {} + available_lower = {name.lower(): name for name in self.channels.values()} + for pin in pins: + if pin in available_lower: + resolved[pin] = available_lower[pin] + return resolved + + def _resolve_channel(self, name_or_dn: str) -> str: + """Resolve a user-friendly channel name to device channel name. + + Args: + name_or_dn: User-friendly name (e.g., "clk", "mosi") or device name (e.g., "D0") + + Returns: + Device channel name (e.g., "D0", "D1") + """ + candidate = name_or_dn.strip() + + # If already a device channel name, return as-is + if candidate in self.channels: + return candidate + + # Search for user-friendly name in channel values + for dev_name, user_name in self.channels.items(): + if user_name.lower() == candidate.lower(): + return dev_name + + raise ValueError(f"Channel '{name_or_dn}' not found in channel map {self.channels}") diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py new file mode 100644 index 000000000..ea736288f --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py @@ -0,0 +1,225 @@ +from shutil import which + +import pytest + +from .common import CaptureConfig, CaptureResult +from .driver import Sigrok +from jumpstarter.common.utils import serve + + +@pytest.fixture +def demo_driver_instance(): + """Create a Sigrok driver instance configured for the demo device.""" + # Demo driver has 8 digital channels (D0-D7) and 5 analog (A0-A4) + # Map device channels to decoder-friendly semantic names + return Sigrok( + driver="demo", + channels={ + "D0": "vcc", + "D1": "cs", + "D2": "miso", + "D3": "mosi", + "D4": "clk", + "D5": "sda", + "D6": "scl", + "D7": "gnd", + }, + ) + + +@pytest.fixture +def demo_client(demo_driver_instance): + """Create a client connected to demo driver via serve().""" + with serve(demo_driver_instance) as client: + yield client + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_scan_demo_driver(demo_client): + """Test scanning for demo driver via client.""" + result = demo_client.scan() + assert "demo" in result.lower() or "Demo device" in result + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_with_demo_driver(demo_client): + """Test one-shot capture with demo driver via client. + + This test verifies client-server serialization through serve() pattern. + """ + cfg = CaptureConfig( + sample_rate="100kHz", + samples=100, + output_format="srzip", + ) + + result = demo_client.capture(cfg) + + # Verify we got a proper CaptureResult Pydantic model, not just a dict + assert isinstance(result, CaptureResult), f"Expected CaptureResult, got {type(result)}" + + # Verify model attributes work correctly - data should be bytes, not base64 string! + assert result.data + assert isinstance(result.data, bytes), f"Expected bytes, got {type(result.data)}" + assert len(result.data) > 0 + assert result.output_format == "srzip" + assert result.sample_rate == "100kHz" + assert isinstance(result.channel_map, dict) + assert len(result.channel_map) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_csv_format(demo_client): + """Test capture with CSV output format via client.""" + cfg = CaptureConfig( + sample_rate="50kHz", + samples=50, + output_format="csv", + ) + + result = demo_client.capture(cfg) + + # Verify CaptureResult model + assert isinstance(result, CaptureResult) + assert isinstance(result.data, bytes) + + # Decode bytes to string for CSV parsing + csv_text = result.data.decode("utf-8") + + # CSV should have headers and data + assert "vcc" in csv_text or "cs" in csv_text or "clk" in csv_text + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_analog_channels(): + """Test capturing analog data from oscilloscope/demo driver. + + Verifies that the API works for analog channels (oscilloscopes) + as well as digital channels (logic analyzers). + """ + # Create driver with analog channel mappings + analog_driver = Sigrok( + driver="demo", + channels={ + "A0": "voltage_in", + "A1": "sine_wave", + "A2": "square_wave", + }, + ) + + with serve(analog_driver) as client: + cfg = CaptureConfig( + sample_rate="100kHz", + samples=20, + channels=["voltage_in", "sine_wave"], # Select specific analog channels + output_format="csv", + ) + + result = client.capture(cfg) + + # Verify we got analog data + assert isinstance(result, CaptureResult) + assert isinstance(result.data, bytes) + + # Parse CSV to check for analog voltage values + csv_text = result.data.decode("utf-8") + + # Should contain voltage values with units (V, mV) + assert "V" in csv_text or "mV" in csv_text + # Should contain our channel names or original analog channel names + assert "voltage_in" in csv_text or "sine_wave" in csv_text or "A0" in csv_text or "A1" in csv_text + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_with_dict_config(demo_client): + """Test capture with dict config (not CaptureConfig object). + + Verifies that dict configs are properly validated and serialized. + """ + # Pass config as dict instead of CaptureConfig object + cfg_dict = { + "sample_rate": "100kHz", + "samples": 100, + "output_format": "srzip", + } + + result = demo_client.capture(cfg_dict) + + # Verify we still get a proper CaptureResult model + assert isinstance(result, CaptureResult) + assert result.data + assert isinstance(result.data, bytes) + assert len(result.data) > 0 + assert result.output_format == "srzip" + + +@pytest.mark.skip(reason="sigrok-cli demo driver doesn't support streaming to stdout (-o -)") +def test_capture_stream_with_demo(demo_client): + """Test streaming capture with demo driver via client. + + Note: sigrok-cli has limitations with streaming output to stdout. + The demo driver and most output formats don't produce data when using `-o -`. + This feature works better with real hardware and certain output formats. + """ + cfg = CaptureConfig( + sample_rate="100kHz", + samples=1000, + output_format="binary", + ) + + received_bytes = 0 + chunk_count = 0 + + # Collect all chunks + for chunk in demo_client.capture_stream(cfg): + received_bytes += len(chunk) + chunk_count += 1 + + # Should have received some data + assert received_bytes > 0 + assert chunk_count > 0 + + +def test_get_driver_info(demo_client): + """Test getting driver information via client. + + Verifies dict serialization through client-server boundary. + """ + info = demo_client.get_driver_info() + + # Verify it's a dict (not a custom object) + assert isinstance(info, dict) + assert info["driver"] == "demo" + assert "channels" in info + assert isinstance(info["channels"], dict) + + +def test_get_channel_map(demo_client): + """Test getting channel mappings via client. + + Verifies dict serialization through client-server boundary. + """ + channels = demo_client.get_channel_map() + + # Verify it's a dict with proper string keys/values + assert isinstance(channels, dict) + assert all(isinstance(k, str) and isinstance(v, str) for k, v in channels.items()) + assert channels["D0"] == "vcc" + assert channels["D4"] == "clk" + assert channels["D7"] == "gnd" + + +def test_list_output_formats(demo_client): + """Test listing supported output formats via client. + + Verifies list serialization through client-server boundary. + """ + formats = demo_client.list_output_formats() + + # Verify it's a proper list of strings + assert isinstance(formats, list) + assert all(isinstance(f, str) for f in formats) + assert "csv" in formats + assert "srzip" in formats + assert "vcd" in formats + assert "binary" in formats diff --git a/packages/jumpstarter-driver-sigrok/pyproject.toml b/packages/jumpstarter-driver-sigrok/pyproject.toml new file mode 100644 index 000000000..f6cd63aa4 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/pyproject.toml @@ -0,0 +1,42 @@ +[project] +name = "jumpstarter-driver-sigrok" +dynamic = ["version", "urls"] +description = "Jumpstarter driver wrapping sigrok-cli for logic analyzer and oscilloscope support" +readme = "README.md" +license = "Apache-2.0" +authors = [ + { name = "Miguel Angel Ajo Pelayo", email = "miguelangel@ajo.es" } +] +requires-python = ">=3.11" +dependencies = [ + "jumpstarter", +] + +[tool.hatch.version] +source = "vcs" +raw-options = { 'root' = '../../'} + +[tool.hatch.metadata.hooks.vcs.urls] +Homepage = "https://jumpstarter.dev" +source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip" + +[tool.pytest.ini_options] +addopts = "--cov --cov-report=html --cov-report=xml" +log_cli = true +log_cli_level = "INFO" +testpaths = ["jumpstarter_driver_sigrok"] +asyncio_default_fixture_loop_scope = "function" + +[build-system] +requires = ["hatchling", "hatch-vcs", "hatch-pin-jumpstarter"] +build-backend = "hatchling.build" + +[tool.hatch.build.hooks.pin_jumpstarter] +name = "pin_jumpstarter" + +[dependency-groups] +dev = [ + "pytest-cov>=6.0.0", + "pytest>=8.3.3", + "pytest-asyncio>=0.24.0", +] From db223a479ee2b7692b0cbf1883fde22d135c181d Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 11:32:57 +0100 Subject: [PATCH 02/10] sigrok: csv and vcd parsing --- .../jumpstarter_driver_sigrok/__init__.py | 10 +- .../jumpstarter_driver_sigrok/common.py | 77 ++++++ .../jumpstarter_driver_sigrok/csv.py | 141 +++++++++++ .../jumpstarter_driver_sigrok/csv_test.py | 132 ++++++++++ .../jumpstarter_driver_sigrok/driver.py | 4 +- .../jumpstarter_driver_sigrok/driver_test.py | 218 ++++++++++++++++ .../jumpstarter_driver_sigrok/vcd.py | 226 +++++++++++++++++ .../jumpstarter_driver_sigrok/vcd_test.py | 232 ++++++++++++++++++ uv.lock | 25 ++ 9 files changed, 1061 insertions(+), 4 deletions(-) create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py create mode 100644 packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py index 106d87ae4..7b134cb86 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/__init__.py @@ -1,5 +1,11 @@ -from jumpstarter_driver_sigrok.common import CaptureConfig, CaptureResult, DecoderConfig +from jumpstarter_driver_sigrok.common import ( + CaptureConfig, + CaptureResult, + DecoderConfig, + OutputFormat, + Sample, +) from jumpstarter_driver_sigrok.driver import Sigrok -__all__ = ["Sigrok", "CaptureConfig", "CaptureResult", "DecoderConfig"] +__all__ = ["Sigrok", "CaptureConfig", "CaptureResult", "DecoderConfig", "OutputFormat", "Sample"] diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py index cc0110e15..aae5e669d 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -5,6 +5,27 @@ from pydantic import BaseModel, Field +class OutputFormat: + """Constants for sigrok output formats.""" + CSV = "csv" + BITS = "bits" + ASCII = "ascii" + BINARY = "binary" + SRZIP = "srzip" + VCD = "vcd" + + @classmethod + def all(cls) -> list[str]: + return [cls.CSV, cls.BITS, cls.ASCII, cls.BINARY, cls.SRZIP, cls.VCD] + + +class Sample(BaseModel): + """A single sample with timing information.""" + sample: int # Sample index + time_ns: int # Time in nanoseconds + values: dict[str, int | float] # Channel values (digital: 0/1, analog: voltage) + + class DecoderConfig(BaseModel): """Protocol decoder configuration (real-time during capture).""" @@ -47,3 +68,59 @@ def data(self) -> bytes: from base64 import b64decode return b64decode(self.data_b64) + def decode(self) -> list[Sample] | dict[str, list[int]] | str: + """Parse captured data based on output format. + + Returns: + - CSV format: list[Sample] with timing and all values per sample + - VCD format: list[Sample] with timing and only changed values + - Bits format: dict[str, list[int]] with channel→bit sequences + - ASCII format: str with ASCII art visualization + - Other formats: raises NotImplementedError (use .data for raw bytes) + + Raises: + NotImplementedError: For binary/srzip formats (use .data property) + """ + if self.output_format == OutputFormat.CSV: + from .csv import parse_csv + samples_data = parse_csv(self.data, self.sample_rate) + return [Sample.model_validate(s) for s in samples_data] + elif self.output_format == OutputFormat.VCD: + from .vcd import parse_vcd + samples_data = parse_vcd(self.data, self.sample_rate) + return [Sample.model_validate(s) for s in samples_data] + elif self.output_format == OutputFormat.BITS: + return self._parse_bits() + elif self.output_format == OutputFormat.ASCII: + return self.data.decode("utf-8") + else: + raise NotImplementedError( + f"Parsing not implemented for {self.output_format} format. " + f"Use .data property to get raw bytes." + ) + + def _parse_bits(self) -> dict[str, list[int]]: + """Parse bits format to dict of channel→bit sequences.""" + text = self.data.decode("utf-8") + lines = [line.strip() for line in text.strip().split("\n") if line.strip()] + + # bits format is just columns of 0/1 + # TODO: Need to determine channel mapping from somewhere + # For now, return as generic numbered channels + result: dict[str, list[int]] = {} + + for line in lines: + # Each line might be space/comma separated bits + bits = [int(b) for b in line if b in "01"] + if not result: + # Initialize channels + for i, bit in enumerate(bits): + result[f"CH{i}"] = [bit] + else: + # Append to existing channels + for i, bit in enumerate(bits): + if f"CH{i}" in result: + result[f"CH{i}"].append(bit) + + return result + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py new file mode 100644 index 000000000..1a7626d06 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py @@ -0,0 +1,141 @@ +"""CSV format parser for sigrok captures.""" + +from __future__ import annotations + +import csv + + +def parse_csv(data: bytes, sample_rate: str) -> list[dict]: + """Parse CSV format to list of samples with timing. + + Args: + data: Raw CSV data as bytes + sample_rate: Sample rate string (e.g., "100kHz", "1MHz") + + Returns: + List of dicts with keys: sample, time_ns, values + """ + text = data.decode("utf-8") + lines = text.strip().split("\n") + + # Parse sample rate for timing calculation + sample_rate_hz = _parse_sample_rate_hz(sample_rate) + time_step_ns = int(1_000_000_000.0 / sample_rate_hz) + + # Skip comment lines and analog preview lines (format: "A0: -10.0000 V DC") + # The actual data starts after a header row with types like "logic,logic,V DC,V DC" + data_lines = _extract_csv_data_lines(lines) + + if not data_lines or len(data_lines) < 2: + return [] + + # Parse the CSV data + reader = csv.reader(data_lines) + + # First row is types (logic, V DC, etc.) - use for channel name inference + types_row = next(reader) + + # Get channel names from types + channel_names = _infer_channel_names(types_row) + + # Parse data rows + samples: list[dict] = [] + for idx, row in enumerate(reader): + values = _parse_csv_row(channel_names, row) + samples.append({ + "sample": idx, + "time_ns": idx * time_step_ns, + "values": values, + }) + + return samples + + +def _parse_sample_rate_hz(sample_rate: str) -> float: + """Parse sample rate string to Hz.""" + rate = sample_rate.strip().upper() + multipliers = {"K": 1e3, "M": 1e6, "G": 1e9} + + for suffix, mult in multipliers.items(): + if rate.endswith(f"{suffix}HZ"): + return float(rate[:-3]) * mult + elif rate.endswith(suffix): + return float(rate[:-1]) * mult + + # Assume Hz if no suffix + return float(rate.rstrip("HZ")) + + +def _extract_csv_data_lines(lines: list[str]) -> list[str]: + """Extract actual CSV data lines, skipping comments and analog preview lines.""" + data_lines = [] + + for _i, line in enumerate(lines): + line = line.strip() + # Skip comment lines + if line.startswith(";"): + continue + # Skip analog preview lines (contain colon, not CSV comma-separated) + if ":" in line and "," not in line: + continue + # This is CSV data + data_lines.append(line) + + return data_lines + + +def _infer_channel_names(types_row: list[str]) -> list[str]: + """Infer channel names from CSV type header row. + + Args: + types_row: List of type strings like ["logic", "logic", "V DC", "V DC"] + + Returns: + List of channel names like ["D0", "D1", "A0", "A1"] + """ + channel_names = [] + digital_count = 0 + analog_count = 0 + + for type_str in types_row: + type_lower = type_str.lower() + if "logic" in type_lower: + channel_names.append(f"D{digital_count}") + digital_count += 1 + elif "v" in type_lower or "dc" in type_lower: + # Analog channel + channel_names.append(f"A{analog_count}") + analog_count += 1 + else: + # Unknown type, use generic name + channel_names.append(f"CH{len(channel_names)}") + + return channel_names + + +def _parse_csv_row(channel_names: list[str], row: list[str]) -> dict[str, int | float]: + """Parse a CSV data row into channel values. + + Args: + channel_names: List of channel names + row: List of value strings + + Returns: + Dict mapping channel name to parsed value + """ + values = {} + + for channel, value in zip(channel_names, row, strict=True): + value = value.strip() + # Try to parse as number (analog) or binary (digital) + try: + if "." in value or "e" in value.lower(): + values[channel] = float(value) + else: + values[channel] = int(value) + except ValueError: + # Keep as string if not a number + values[channel] = value + + return values + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py new file mode 100644 index 000000000..f0589e962 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py @@ -0,0 +1,132 @@ +"""Tests for CSV format parser.""" + +from shutil import which + +import pytest + +from .client import SigrokClient +from .common import CaptureConfig, CaptureResult, OutputFormat +from .driver import Sigrok +from jumpstarter.common.utils import serve + + +@pytest.fixture +def demo_driver_instance(): + """Create a Sigrok driver instance configured for the demo device.""" + # Demo driver has 8 digital channels (D0-D7) and 5 analog (A0-A4) + # Map device channels to decoder-friendly semantic names + return Sigrok( + driver="demo", + executable="sigrok-cli", + channels={ + "D0": "vcc", + "D1": "cs", + "D2": "miso", + "D3": "mosi", + "D4": "clk", + "D5": "sda", + "D6": "scl", + "D7": "gnd", + }, + ) + + +@pytest.fixture +def demo_client(demo_driver_instance): + """Create a client for the demo Sigrok driver.""" + with serve(demo_driver_instance) as client: + yield client + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_csv_format_basic(demo_client: SigrokClient): + """Test CSV format capture with demo driver.""" + cfg = CaptureConfig( + sample_rate="50kHz", + samples=50, + output_format=OutputFormat.CSV, + channels=["vcc", "cs"], # Select specific digital channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + assert isinstance(result.data, bytes) + decoded_data = result.decode() + assert isinstance(decoded_data, list) + assert len(decoded_data) > 0 + # Verify channel names are in the data + first_sample = decoded_data[0] + assert "D0" in first_sample.values or "D1" in first_sample.values + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_csv_format_timing(demo_client: SigrokClient): + """Test CSV format timing calculations with integer nanoseconds.""" + cfg = CaptureConfig( + sample_rate="100kHz", + samples=50, + output_format=OutputFormat.CSV, + channels=["D0", "D1", "D2"], # Select specific channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + + # Decode the CSV data + samples = result.decode() + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify timing progresses correctly + for sample in samples: + assert isinstance(sample.time_ns, int) + # Verify timing progresses (1/100kHz = 10,000ns per sample) + assert sample.time_ns == sample.sample * 10_000 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_csv_format_analog_channels(demo_client: SigrokClient): + """Test CSV capture of analog channels with voltage values.""" + cfg = CaptureConfig( + sample_rate="100kHz", + samples=20, + output_format=OutputFormat.CSV, + channels=["A0", "A1"], # Select specific analog channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + assert isinstance(result.data, bytes) + decoded_data = result.decode() + assert isinstance(decoded_data, list) + assert len(decoded_data) > 0 + + # Check first sample for analog values + first_sample = decoded_data[0] + assert len(first_sample.values) > 0 + + # Analog values should be floats (voltages) + for _channel, value in first_sample.values.items(): + assert isinstance(value, (int, float)) + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_csv_format_mixed_channels(demo_client: SigrokClient): + """Test CSV with both digital and analog channels.""" + cfg = CaptureConfig( + sample_rate="100kHz", + samples=30, + output_format=OutputFormat.CSV, + channels=["D0", "D1", "A0"], # Mix of digital and analog + ) + + result = demo_client.capture(cfg) + samples = result.decode() + + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify we have values for channels + first_sample = samples[0] + assert len(first_sample.values) > 0 + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py index 92081b5c6..852cd1333 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py @@ -8,7 +8,7 @@ from shutil import which from tempfile import TemporaryDirectory -from .common import CaptureConfig, DecoderConfig +from .common import CaptureConfig, DecoderConfig, OutputFormat from jumpstarter.driver import Driver, export @@ -65,7 +65,7 @@ def get_channel_map(self) -> dict[int, str]: @export def list_output_formats(self) -> list[str]: - return ["csv", "srzip", "vcd", "binary", "bits", "ascii"] + return OutputFormat.all() @export def capture(self, config: CaptureConfig | dict) -> dict: diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py index ea736288f..2766a36c8 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py @@ -223,3 +223,221 @@ def test_list_output_formats(demo_client): assert "srzip" in formats assert "vcd" in formats assert "binary" in formats + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_csv_format(demo_client): + """Test decoding CSV format to Sample objects with timing. + + Verifies: + - CSV parsing works through client-server boundary + - Sample objects have timing information + - Values are properly typed (int/float) + """ + from .common import OutputFormat, Sample + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=50, + output_format=OutputFormat.CSV, + channels=["D0", "D1", "D2"], # Select specific channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + + # Decode the CSV data + samples = result.decode() + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify all samples are Sample objects + for sample in samples: + assert isinstance(sample, Sample) + assert isinstance(sample.sample, int) + assert isinstance(sample.time_ns, int) + assert isinstance(sample.values, dict) + + # Verify timing progresses (1/100kHz = 10,000ns per sample) + assert sample.time_ns == sample.sample * 10_000 + + # Verify values are present + assert len(sample.values) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_ascii_format(demo_client): + """Test decoding ASCII format returns string visualization. + + Verifies: + - ASCII format decoding works + - Returns string (not bytes) + """ + from .common import OutputFormat + + cfg = CaptureConfig( + sample_rate="50kHz", + samples=20, + output_format=OutputFormat.ASCII, + channels=["D0", "D1"], + ) + + result = demo_client.capture(cfg) + decoded = result.decode() + + # ASCII format should return string + assert isinstance(decoded, str) + assert len(decoded) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_bits_format(demo_client): + """Test decoding bits format to channel→bit sequences. + + Verifies: + - Bits format decoding works + - Returns dict with bit sequences + """ + from .common import OutputFormat + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=30, + output_format=OutputFormat.BITS, + channels=["D0", "D1", "D2"], + ) + + result = demo_client.capture(cfg) + decoded = result.decode() + + # Bits format should return dict + assert isinstance(decoded, dict) + assert len(decoded) > 0 + + # Each channel should have a list of bits + for channel, bits in decoded.items(): + assert isinstance(channel, str) + assert isinstance(bits, list) + assert all(b in [0, 1] for b in bits) + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_vcd_format(demo_client): + """Test decoding VCD format to Sample objects with timing (changes only). + + Verifies: + - VCD parsing works through client-server boundary + - Sample objects have timing information in nanoseconds + - Only changes are recorded (efficient representation) + """ + from .common import OutputFormat, Sample + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=50, + output_format=OutputFormat.VCD, + channels=["D0", "D1", "D2"], # Select specific channels + ) + + result = demo_client.capture(cfg) + assert isinstance(result, CaptureResult) + + # Decode the VCD data + samples = result.decode() + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify all samples are Sample objects + for sample in samples: + assert isinstance(sample, Sample) + assert isinstance(sample.sample, int) + assert isinstance(sample.time_ns, int) + assert isinstance(sample.values, dict) + + # VCD only records changes, so each sample should have at least one value + assert len(sample.values) > 0 + + # Values should be integers for digital channels + for _channel, value in sample.values.items(): + assert isinstance(value, int) + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_vcd_analog_channels(demo_client): + """Test decoding VCD with analog channels. + + Verifies: + - Analog values are parsed correctly in VCD format + - Timing information is in nanoseconds + """ + from .common import OutputFormat, Sample + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=30, + output_format=OutputFormat.VCD, + channels=["A0", "A1"], # Analog channels + ) + + result = demo_client.capture(cfg) + samples = result.decode() + + assert isinstance(samples, list) + assert len(samples) > 0 + + # Check that samples have analog values + first_sample = samples[0] + assert isinstance(first_sample, Sample) + assert isinstance(first_sample.time_ns, int) + assert len(first_sample.values) > 0 + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_unsupported_format_raises(demo_client): + """Test that decoding unsupported formats raises NotImplementedError.""" + from .common import OutputFormat + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=10, + output_format=OutputFormat.BINARY, + ) + + result = demo_client.capture(cfg) + + # Binary format should not be decodable + with pytest.raises(NotImplementedError): + result.decode() + + +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_decode_analog_csv(demo_client): + """Test decoding CSV with analog channels (voltage values). + + Verifies: + - Analog values are parsed as floats + - Timing information is included + """ + from .common import OutputFormat, Sample + + cfg = CaptureConfig( + sample_rate="100kHz", + samples=30, + output_format=OutputFormat.CSV, + channels=["A0", "A1"], # Analog channels + ) + + result = demo_client.capture(cfg) + samples = result.decode() + + assert isinstance(samples, list) + assert len(samples) > 0 + + # Check first sample for analog values + first_sample = samples[0] + assert isinstance(first_sample, Sample) + assert len(first_sample.values) > 0 + + # Analog values should be floats (voltages) + for _channel, value in first_sample.values.items(): + assert isinstance(value, (int, float)) diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py new file mode 100644 index 000000000..c777e79d0 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py @@ -0,0 +1,226 @@ +"""VCD (Value Change Dump) format parser for sigrok captures.""" + +from __future__ import annotations + + +def parse_vcd(data: bytes, sample_rate: str) -> list[dict]: + """Parse VCD format to list of samples with timing (changes only). + + VCD format only records when signals change, making it efficient for + sparse data. Each sample represents a time point where one or more + signals changed. + + Args: + data: Raw VCD data as bytes + sample_rate: Sample rate string (not used for VCD as it has its own timescale) + + Returns: + List of dicts with keys: sample, time_ns, values + """ + text = data.decode("utf-8") + lines = text.strip().split("\n") + + # Parse VCD header to extract timescale and channel mapping + timescale_multiplier = 1 # Default: 1 unit = 1 ns + channel_map: dict[str, str] = {} # symbol → channel name + + for line in lines: + line = line.strip() + + # Parse timescale (e.g., "$timescale 1 us $end" means 1 unit = 1000 ns) + if line.startswith("$timescale"): + timescale_multiplier = _parse_timescale(line) + + # Parse variable definitions (e.g., "$var wire 1 ! D0 $end") + if line.startswith("$var"): + parts = line.split() + if len(parts) >= 5: + symbol = parts[3] # e.g., "!" + channel = parts[4] # e.g., "D0" + channel_map[symbol] = channel + + if line == "$enddefinitions $end": + break + + # Parse value changes + samples: list[dict] = [] + sample_idx = 0 + + for line in lines: + line = line.strip() + if not line or line.startswith("$"): + continue + + # Timestamp line (e.g., "#100 1! 0" 1#") + if line.startswith("#"): + sample_data = _parse_vcd_timestamp_line(line, timescale_multiplier, channel_map) + if sample_data is not None: + sample_data["sample"] = sample_idx + samples.append(sample_data) + sample_idx += 1 + + return samples + + +def _parse_timescale(line: str) -> int: + """Parse timescale line and return multiplier to convert to nanoseconds.""" + parts = line.split() + if len(parts) >= 3: + value = parts[1] + unit = parts[2] + # Convert to nanoseconds multiplier + unit_multipliers = {"s": 1e9, "ms": 1e6, "us": 1e3, "ns": 1, "ps": 1e-3} + return int(float(value) * unit_multipliers.get(unit, 1)) + return 1 + + +def _parse_vcd_timestamp_line(line: str, timescale_multiplier: int, channel_map: dict[str, str]) -> dict | None: + """Parse a VCD timestamp line with value changes. + + Args: + line: Line starting with # (e.g., "#100 1! 0" 1#") + timescale_multiplier: Multiplier to convert time units to nanoseconds + channel_map: Mapping from VCD symbols to channel names + + Returns: + Dict with time_ns and values, or None if line is empty + """ + # Split timestamp from values + parts = line.split(maxsplit=1) + time_str = parts[0][1:] # Remove '#' prefix + + # Skip empty time lines + if not time_str: + return None + + time_units = int(time_str) + current_time_ns = time_units * timescale_multiplier + current_values: dict[str, int | float] = {} + + # Parse value changes if present on the same line + if len(parts) > 1: + values_str = parts[1] + _parse_vcd_value_changes(values_str, channel_map, current_values) + + # Return sample data if we have values + if current_values: + return {"time_ns": current_time_ns, "values": current_values} + + return None + + +def _parse_vcd_value_changes(values_str: str, channel_map: dict[str, str], current_values: dict[str, int | float]): + """Parse value change tokens from a VCD line. + + Modifies current_values dict in place. + + Supports: + - Single-bit: "1!", "0abc" + - Binary: "b11110000 abc" + - Real: "r3.14159 xyz", "r-10.5 !", "r1.23e-5 aa" + """ + i = 0 + while i < len(values_str): + char = values_str[i] + + # Single bit change (e.g., "1!", "0abc" for multi-char identifiers) + if char in "01xzXZ": + symbol, new_i = _extract_symbol(values_str, i + 1) + if symbol in channel_map: + channel = channel_map[symbol] + current_values[channel] = 1 if char == "1" else 0 + i = new_i + + # Binary value (e.g., "b1010 !" or "b1010 abc") + elif char == "b": + value, symbol, new_i = _parse_binary_value(values_str, i, channel_map) + if symbol and value is not None: + current_values[channel_map[symbol]] = value + i = new_i + + # Real (analog) value (e.g., "r3.14 !" or "r-10.5 abc") + elif char == "r": + value, symbol, new_i = _parse_real_value(values_str, i, channel_map) + if symbol and value is not None: + current_values[channel_map[symbol]] = value + i = new_i + + # Skip whitespace + elif char == " ": + i += 1 + else: + i += 1 + + +def _extract_symbol(text: str, start: int) -> tuple[str, int]: + """Extract a VCD symbol (can be multi-character) from text. + + Returns: + Tuple of (symbol, next_position) + """ + end = start + while end < len(text) and text[end] != " ": + end += 1 + return text[start:end], end + + +def _parse_binary_value(values_str: str, start: int, channel_map: dict[str, str]) -> tuple[int | None, str | None, int]: + """Parse a binary value like "b1010 abc". + + Returns: + Tuple of (value, symbol, next_position) + """ + # Extract binary value + value_start = start + 1 + value_end = value_start + while value_end < len(values_str) and values_str[value_end] in "01xzXZ": + value_end += 1 + binary_value = values_str[value_start:value_end] + + # Skip whitespace before symbol + while value_end < len(values_str) and values_str[value_end] == " ": + value_end += 1 + + # Extract symbol + symbol, next_pos = _extract_symbol(values_str, value_end) + + if symbol in channel_map: + try: + return int(binary_value, 2), symbol, next_pos + except ValueError: + return 0, symbol, next_pos + + return None, None, next_pos + + +def _parse_real_value(values_str: str, start: int, channel_map: dict[str, str]) -> tuple[float | None, str | None, int]: + """Parse a real (analog) value like "r3.14 abc" or "r-10.5 !". + + Returns: + Tuple of (value, symbol, next_position) + """ + # Extract real value (number with optional sign, decimal, exponent) + value_start = start + 1 + value_end = value_start + while value_end < len(values_str) and values_str[value_end] not in " ": + if values_str[value_end] in "0123456789-.eE+": + value_end += 1 + else: + break + real_value = values_str[value_start:value_end] + + # Skip whitespace before symbol + while value_end < len(values_str) and values_str[value_end] == " ": + value_end += 1 + + # Extract symbol + symbol, next_pos = _extract_symbol(values_str, value_end) + + if symbol in channel_map: + try: + return float(real_value), symbol, next_pos + except ValueError: + return 0.0, symbol, next_pos + + return None, None, next_pos + diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py new file mode 100644 index 000000000..6af6825d6 --- /dev/null +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py @@ -0,0 +1,232 @@ +"""Tests for VCD (Value Change Dump) format parser.""" + +from base64 import b64encode + +from .common import CaptureResult, OutputFormat, Sample + + +def test_vcd_parser_comprehensive(): + """Test VCD parser with manually constructed VCD data covering all features. + + This test validates: + - Single-character identifiers (!, ", #) + - Multi-character identifiers (aa, ab, abc) + - Timescale parsing (microseconds to nanoseconds) + - Single-bit values (0/1) + - X/Z state handling + - Binary values (vectors) + - Real (analog) values with various formats + """ + # Construct a comprehensive VCD file + vcd_content = """$date Mon Dec 8 2025 $end +$version Test VCD Generator $end +$timescale 1 us $end +$scope module test $end +$var wire 1 ! D0 $end +$var wire 1 " D1 $end +$var wire 1 # D2 $end +$var wire 1 aa CH95 $end +$var wire 1 ab CH96 $end +$var wire 8 abc BUS0 $end +$var real 1 xyz ANALOG0 $end +$upscope $end +$enddefinitions $end +#0 1! 0" 1# 0aa 1ab b00001111 abc r-10.5 xyz +#5 0! 1" x# 1aa +#10 z! 0" 1# b11110000 abc r3.14159 xyz +#25 1! 1" 0# 0aa 0ab b10101010 abc r0.0 xyz +#100 0! 0" 0# r1.23e-5 xyz +""" + + # Create a CaptureResult with this VCD data + result = CaptureResult( + data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), + output_format=OutputFormat.VCD, + sample_rate="1MHz", + channel_map={ + "D0": "d0", + "D1": "d1", + "D2": "d2", + "CH95": "ch95", + "CH96": "ch96", + "BUS0": "bus", + "ANALOG0": "analog", + }, + triggers=None, + decoders=None, + ) + + # Parse the VCD + samples = result.decode() + + # Verify we got the expected number of samples + assert len(samples) == 5 + + # Sample 0 at time 0us = 0ns + s0 = samples[0] + assert s0.time_ns == 0 + assert s0.values["D0"] == 1 + assert s0.values["D1"] == 0 + assert s0.values["D2"] == 1 + assert s0.values["CH95"] == 0 # Multi-char identifier "aa" + assert s0.values["CH96"] == 1 # Multi-char identifier "ab" + assert s0.values["BUS0"] == 0b00001111 # Binary value + assert abs(s0.values["ANALOG0"] - (-10.5)) < 0.001 # Real value + + # Sample 1 at time 5us = 5000ns + s1 = samples[1] + assert s1.time_ns == 5000 + assert s1.values["D0"] == 0 + assert s1.values["D1"] == 1 + assert s1.values["D2"] == 0 # X converted to 0 + assert s1.values["CH95"] == 1 + + # Sample 2 at time 10us = 10000ns + s2 = samples[2] + assert s2.time_ns == 10000 + assert s2.values["D0"] == 0 # Z converted to 0 + assert s2.values["D1"] == 0 + assert s2.values["D2"] == 1 + assert s2.values["BUS0"] == 0b11110000 + assert abs(s2.values["ANALOG0"] - 3.14159) < 0.001 + + # Sample 3 at time 25us = 25000ns + s3 = samples[3] + assert s3.time_ns == 25000 + assert s3.values["D0"] == 1 + assert s3.values["D1"] == 1 + assert s3.values["D2"] == 0 + assert s3.values["CH95"] == 0 + assert s3.values["CH96"] == 0 + assert s3.values["BUS0"] == 0b10101010 + assert abs(s3.values["ANALOG0"] - 0.0) < 0.001 + + # Sample 4 at time 100us = 100000ns + s4 = samples[4] + assert s4.time_ns == 100000 + assert s4.values["D0"] == 0 + assert s4.values["D1"] == 0 + assert s4.values["D2"] == 0 + assert abs(s4.values["ANALOG0"] - 1.23e-5) < 1e-10 # Scientific notation + + +def test_vcd_parser_timescale_variations(): + """Test VCD parser with different timescale values.""" + # Test different timescales + test_cases = [ + ("1 ns", 1, 0), # 1ns timescale, time 0 = 0ns + ("1 us", 1000, 0), # 1us timescale, time 0 = 0ns + ("1 ms", 1000000, 0), # 1ms timescale, time 0 = 0ns + ("10 ns", 10, 100 * 10), # 10ns timescale, time 100 = 1000ns + ("100 ns", 100, 50 * 100), # 100ns timescale, time 50 = 5000ns + ] + + for timescale_str, _multiplier, expected_time_ns in test_cases: + vcd_content = f"""$timescale {timescale_str} $end +$var wire 1 ! D0 $end +$enddefinitions $end +#0 1! +#{100 if expected_time_ns else 0} 0! +""" + result = CaptureResult( + data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), + output_format=OutputFormat.VCD, + sample_rate="1MHz", + channel_map={"D0": "d0"}, + ) + + samples = result.decode() + assert len(samples) >= 1 + # First sample at time 0 + assert samples[0].time_ns == 0 + + +def test_vcd_parser_empty_timestamps(): + """Test VCD parser handles empty timestamp lines correctly.""" + vcd_content = """$timescale 1 ns $end +$var wire 1 ! D0 $end +$enddefinitions $end +#0 1! +#10 0! +# +#20 1! +""" + + result = CaptureResult( + data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), + output_format=OutputFormat.VCD, + sample_rate="1MHz", + channel_map={"D0": "d0"}, + ) + + samples = result.decode() + # Should have 3 samples (empty timestamp line skipped) + assert len(samples) == 3 + assert samples[0].time_ns == 0 + assert samples[1].time_ns == 10 + assert samples[2].time_ns == 20 + + +def test_vcd_parser_large_channel_count(): + """Test VCD parser with large channel counts using multi-char identifiers. + + According to libsigrok vcd_identifier(): + - Channels 0-93: Single char (!, ", ..., ~) + - Channels 94-769: Two lowercase letters (aa, ab, ..., zz) + - Channels 770+: Three lowercase letters (aaa, aab, ...) + """ + # Test identifiers at boundaries + vcd_content = """$timescale 1 ns $end +$var wire 1 ! CH0 $end +$var wire 1 ~ CH93 $end +$var wire 1 aa CH94 $end +$var wire 1 ab CH95 $end +$var wire 1 zz CH769 $end +$var wire 1 aaa CH770 $end +$var wire 1 abc CH800 $end +$enddefinitions $end +#0 1! 0~ 1aa 0ab 1zz 0aaa 1abc +#100 0! 1~ 0aa 1ab 0zz 1aaa 0abc +""" + + result = CaptureResult( + data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), + output_format=OutputFormat.VCD, + sample_rate="1MHz", + channel_map={ + "CH0": "ch0", + "CH93": "ch93", + "CH94": "ch94", + "CH95": "ch95", + "CH769": "ch769", + "CH770": "ch770", + "CH800": "ch800", + }, + ) + + samples = result.decode() + + # Verify first sample + assert len(samples) == 2 + s0 = samples[0] + assert isinstance(s0, Sample) + assert s0.time_ns == 0 + assert s0.values["CH0"] == 1 # Single char: ! + assert s0.values["CH93"] == 0 # Single char: ~ + assert s0.values["CH94"] == 1 # Two char: aa + assert s0.values["CH95"] == 0 # Two char: ab + assert s0.values["CH769"] == 1 # Two char: zz + assert s0.values["CH770"] == 0 # Three char: aaa + assert s0.values["CH800"] == 1 # Three char: abc + + # Verify second sample + s1 = samples[1] + assert s1.time_ns == 100 + assert s1.values["CH0"] == 0 + assert s1.values["CH93"] == 1 + assert s1.values["CH94"] == 0 + assert s1.values["CH95"] == 1 + assert s1.values["CH769"] == 0 + assert s1.values["CH770"] == 1 + assert s1.values["CH800"] == 0 + diff --git a/uv.lock b/uv.lock index 99fa450fa..495fdc1b7 100644 --- a/uv.lock +++ b/uv.lock @@ -31,6 +31,7 @@ members = [ "jumpstarter-driver-ridesx", "jumpstarter-driver-sdwire", "jumpstarter-driver-shell", + "jumpstarter-driver-sigrok", "jumpstarter-driver-snmp", "jumpstarter-driver-ssh", "jumpstarter-driver-ssh-mitm", @@ -2073,6 +2074,30 @@ dev = [ { name = "pytest-cov", specifier = ">=6.0.0" }, ] +[[package]] +name = "jumpstarter-driver-sigrok" +source = { editable = "packages/jumpstarter-driver-sigrok" } +dependencies = [ + { name = "jumpstarter" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-asyncio" }, + { name = "pytest-cov" }, +] + +[package.metadata] +requires-dist = [{ name = "jumpstarter", editable = "packages/jumpstarter" }] + +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=8.3.3" }, + { name = "pytest-asyncio", specifier = ">=0.24.0" }, + { name = "pytest-cov", specifier = ">=6.0.0" }, +] + [[package]] name = "jumpstarter-driver-snmp" source = { editable = "packages/jumpstarter-driver-snmp" } From c8647c4ae93f15271a5fabbe2905ec472202fac0 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 12:07:15 +0100 Subject: [PATCH 03/10] sigrok: improve bit parsing channel mapping --- .../jumpstarter_driver_sigrok/common.py | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py index aae5e669d..a5c2b18e8 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -100,27 +100,35 @@ def decode(self) -> list[Sample] | dict[str, list[int]] | str: ) def _parse_bits(self) -> dict[str, list[int]]: - """Parse bits format to dict of channel→bit sequences.""" + """Parse bits format to dict of channel→bit sequences. + + Sigrok-cli bits format: "D0:10001\\nD1:01110\\n..." + Each line has format "channel_name:bits" + + Note: For large sample counts, sigrok-cli wraps bits across multiple + lines with repeated channel names. We accumulate all occurrences. + """ text = self.data.decode("utf-8") lines = [line.strip() for line in text.strip().split("\n") if line.strip()] - # bits format is just columns of 0/1 - # TODO: Need to determine channel mapping from somewhere - # For now, return as generic numbered channels result: dict[str, list[int]] = {} for line in lines: - # Each line might be space/comma separated bits - bits = [int(b) for b in line if b in "01"] - if not result: - # Initialize channels - for i, bit in enumerate(bits): - result[f"CH{i}"] = [bit] - else: - # Append to existing channels - for i, bit in enumerate(bits): - if f"CH{i}" in result: - result[f"CH{i}"].append(bit) + # Bits format: "D0:10001" or "A0:10001" + if ":" in line: + channel_device_name, bits_str = line.split(":", 1) + channel_device_name = channel_device_name.strip() + + # Map device name (D0) to user-friendly name (vcc) if available + channel_name = self.channel_map.get(channel_device_name, channel_device_name) + + # Parse bits from this line + bits = [int(b) for b in bits_str if b in "01"] + + # Accumulate bits for this channel (may appear on multiple lines) + if channel_name not in result: + result[channel_name] = [] + result[channel_name].extend(bits) return result From 12b1f2df5a939a0002c10dc766cd3516becb26f1 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 12:07:30 +0100 Subject: [PATCH 04/10] sigrok: improve documentation --- packages/jumpstarter-driver-sigrok/README.md | 209 ++++++++++++++---- .../jumpstarter_driver_sigrok/common.py | 5 +- .../jumpstarter_driver_sigrok/driver.py | 8 +- .../jumpstarter_driver_sigrok/driver_test.py | 48 +++- 4 files changed, 214 insertions(+), 56 deletions(-) diff --git a/packages/jumpstarter-driver-sigrok/README.md b/packages/jumpstarter-driver-sigrok/README.md index 4c1a75b35..4e42a404b 100644 --- a/packages/jumpstarter-driver-sigrok/README.md +++ b/packages/jumpstarter-driver-sigrok/README.md @@ -1,11 +1,10 @@ # Sigrok Driver -`jumpstarter-driver-sigrok` wraps `sigrok-cli` to provide logic analyzer and oscilloscope capture from Jumpstarter exporters. It supports: -- **Logic analyzers** (digital channels) - with protocol decoding (SPI, I2C, UART, etc.) +`jumpstarter-driver-sigrok` wraps [sigrok-cli](https://sigrok.org/wiki/Sigrok-cli) to provide logic analyzer and oscilloscope capture from Jumpstarter exporters. It supports: +- **Logic analyzers** (digital channels) - **Oscilloscopes** (analog channels) - voltage waveform capture - One-shot and streaming capture -- Decoder-friendly channel mappings -- Real-time protocol decoding +- Multiple output formats with parsing (VCD, CSV, Bits, ASCII) ## Installation @@ -19,44 +18,34 @@ pip3 install --extra-index-url https://pkg.jumpstarter.dev/simple/ jumpstarter-d export: sigrok: type: jumpstarter_driver_sigrok.driver.Sigrok - driver: demo # sigrok driver (demo, fx2lafw, etc.) - conn: null # optional: USB VID.PID or serial path - executable: null # optional: path to sigrok-cli (auto-detected) - channels: # channel mappings (device_name: semantic_name) - D0: vcc - D1: cs + driver: fx2lafw # sigrok driver (demo, fx2lafw, rigol-ds, etc.) + conn: null # optional: USB VID.PID, serial path, or null for auto + channels: # optional: map device channels to friendly names + D0: clk + D1: mosi D2: miso - D3: mosi - D4: clk - D5: sda - D6: scl + D3: cs ``` -## CaptureConfig (client-side) +### Configuration Parameters -```python -from jumpstarter_driver_sigrok.common import CaptureConfig, DecoderConfig +| Parameter | Description | Type | Required | Default | +|-----------|-------------|------|----------|---------| +| `driver` | Sigrok driver name (e.g., `demo`, `fx2lafw`, `rigol-ds`) | str | yes | - | +| `conn` | Connection string (USB VID.PID, serial path, or `null` for auto-detect) | str \| None | no | None | +| `executable` | Path to `sigrok-cli` executable | str | no | Auto-detected from PATH | +| `channels` | Channel mapping from device names (D0, A0) to semantic names (clk, voltage) | dict[str, str] | no | {} (empty) | -config = CaptureConfig( - sample_rate="8MHz", - samples=20000, - pretrigger=5000, - triggers={"cs": "falling"}, - decoders=[ - DecoderConfig( - name="spi", - channels={"clk": "clk", "mosi": "mosi", "miso": "miso", "cs": "cs"}, - annotations=["mosi-data"], - ) - ], -) -``` +## CaptureConfig Parameters (client-side) -This maps to: -```bash -sigrok-cli -d fx2lafw -c samplerate=8MHz,samples=20000,pretrigger=5000 --triggers D1=falling \ - -P spi:clk=D4:mosi=D3:miso=D2:cs=D1 -A spi=mosi-data -``` +| Parameter | Description | Type | Required | Default | +|-----------|-------------|------|----------|---------| +| `sample_rate` | Sampling rate (e.g., `"1M"`, `"8MHz"`, `"24000000"`) | str | no | "1M" | +| `samples` | Number of samples to capture (`None` for continuous) | int \| None | no | None | +| `pretrigger` | Number of samples to capture before trigger | int \| None | no | None | +| `triggers` | Trigger conditions by channel name (e.g., `{"cs": "falling"}`) | dict[str, str] \| None | no | None | +| `channels` | List of channel names to capture (overrides defaults) | list[str] \| None | no | None | +| `output_format` | Output format (vcd, csv, bits, ascii, srzip, binary) | str | no | "vcd" | ## Client API @@ -67,23 +56,107 @@ sigrok-cli -d fx2lafw -c samplerate=8MHz,samples=20000,pretrigger=5000 --trigger - `get_channel_map()` — device-to-semantic name mappings - `list_output_formats()` — supported formats (csv, srzip, vcd, binary, bits, ascii) +## Output Formats + +The driver supports multiple output formats. **VCD (Value Change Dump) is the default** because: +- ✅ **Efficient**: Only records signal changes (not every sample) +- ✅ **Precise timing**: Includes exact timestamps in nanoseconds +- ✅ **Widely supported**: Standard format for signal analysis tools +- ✅ **Mixed signals**: Handles both digital and analog data + +### Available Formats + +| Format | Use Case | Decoded By | +|--------|----------|------------| +| `vcd` (default) | Change-based signals with timing | `result.decode()` → `list[Sample]` | +| `csv` | All samples with timing | `result.decode()` → `list[Sample]` | +| `bits` | Bit sequences by channel | `result.decode()` → `dict[str, list[int]]` | +| `ascii` | ASCII art visualization | `result.decode()` → `str` | +| `srzip` | Raw sigrok session (for PulseView) | `result.data` (raw bytes) | +| `binary` | Raw binary data | `result.data` (raw bytes) | + +### Output Format Constants + +```python +from jumpstarter_driver_sigrok.common import OutputFormat + +config = CaptureConfig( + sample_rate="1MHz", + samples=1000, + output_format=OutputFormat.VCD, # or CSV, BITS, ASCII, SRZIP, BINARY +) +``` + ## Examples -### Logic Analyzer (Digital Channels) +### Example 1: Simple Capture (VCD format - default) + +**Python client code:** +```python +from jumpstarter_driver_sigrok.common import CaptureConfig + +# Capture with default VCD format (efficient, change-based with timing) +config = CaptureConfig( + sample_rate="1MHz", + samples=1000, + channels=["D0", "D1", "D2"], # Use device channel names or mapped names +) +result = client.capture(config) -One-shot with trigger: +# Decode VCD to get samples with timing +samples = result.decode() # list[Sample] +for sample in samples[:5]: + print(f"Time: {sample.time_ns}ns, Values: {sample.values}") +``` + +**Equivalent sigrok-cli command:** ```bash -sigrok-cli -d fx2lafw -c samplerate=8MHz,samples=20000,pretrigger=5000 --triggers D0=rising -o out.sr +sigrok-cli -d fx2lafw -C D0,D1,D2 \ + -c samplerate=1MHz --samples 1000 \ + -O vcd -o /tmp/capture.vcd ``` -Real-time decode (SPI): +--- + +### Example 2: Triggered Capture with Pretrigger + +**Python client code:** +```python +from jumpstarter_driver_sigrok.common import CaptureConfig + +# Capture with trigger and pretrigger buffer (VCD format - default) +config = CaptureConfig( + sample_rate="8MHz", + samples=20000, + pretrigger=5000, # Capture 5000 samples before trigger + triggers={"D0": "rising"}, # Trigger on D0 rising edge + channels=["D0", "D1", "D2", "D3"], + # output_format defaults to VCD (efficient change-based format) +) +result = client.capture(config) + +# Decode to analyze signal changes with precise timing +samples = result.decode() # list[Sample] - only changes recorded +print(f"Captured {len(samples)} signal changes") + +# Access timing and values +for sample in samples[:3]: + print(f"Time: {sample.time_ns}ns, Changed: {sample.values}") +``` + +**Equivalent sigrok-cli command:** ```bash -sigrok-cli -d fx2lafw -c samplerate=1M --continuous \ - -P spi:clk=D4:mosi=D3:miso=D2:cs=D1 -A spi=mosi-data +sigrok-cli -d fx2lafw -C D0,D1,D2,D3 \ + -c samplerate=8MHz,samples=20000,pretrigger=5000 \ + --triggers D0=rising \ + -O vcd -o /tmp/capture.vcd ``` -### Oscilloscope (Analog Channels) +--- +### Example 3: Oscilloscope (Analog Channels) + +**Exporter configuration:** ```yaml export: oscilloscope: @@ -95,16 +168,60 @@ export: A1: CH2 ``` +**Python client code:** ```python -from jumpstarter_driver_sigrok.common import CaptureConfig +from jumpstarter_driver_sigrok.common import CaptureConfig, OutputFormat # Capture analog waveforms config = CaptureConfig( sample_rate="1MHz", samples=10000, channels=["CH1", "CH2"], # Analog channels - output_format="csv", # or "vcd" for waveform viewers + output_format=OutputFormat.CSV, # CSV for voltage values ) result = client.capture(config) -waveform_data = result.data # bytes with voltage measurements + +# Parse voltage data +samples = result.decode() # list[Sample] +for sample in samples[:5]: + print(f"Time: {sample.time_ns}ns") + print(f" CH1: {sample.values.get('A0', 'N/A')}V") + print(f" CH2: {sample.values.get('A1', 'N/A')}V") +``` + +**Equivalent sigrok-cli command:** +```bash +sigrok-cli -d rigol-ds:conn=usb -C A0=CH1,A1=CH2 \ + -c samplerate=1MHz --samples 10000 \ + -O csv -o /tmp/capture.csv +``` + +--- + +### Example 4: Bits Format (Simple Bit Sequences) + +**Python client code:** +```python +from jumpstarter_driver_sigrok.common import CaptureConfig, OutputFormat + +# Capture in bits format (useful for visual inspection) +config = CaptureConfig( + sample_rate="100kHz", + samples=100, + channels=["D0", "D1", "D2"], + output_format=OutputFormat.BITS, +) +result = client.capture(config) + +# Get bit sequences per channel +bits_by_channel = result.decode() # dict[str, list[int]] +for channel, bits in bits_by_channel.items(): + print(f"{channel}: {''.join(map(str, bits[:20]))}") # First 20 bits +``` + +**Equivalent sigrok-cli command:** +```bash +sigrok-cli -d demo -C D0,D1,D2 \ + -c samplerate=100kHz --samples 100 \ + -O bits -o /tmp/capture.bits ``` diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py index a5c2b18e8..10f12662b 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -43,8 +43,9 @@ class CaptureConfig(BaseModel): triggers: dict[str, str] | None = Field(default=None, description="e.g., {'D0': 'rising'}") channels: list[str] | None = Field(default=None, description="override default channels by name") output_format: str = Field( - default="srzip", - description="csv, srzip, vcd, binary, bits, ascii", + default=OutputFormat.VCD, + description="Output format (default: vcd - efficient change-based format with timing). " + "Options: vcd, csv, srzip, binary, bits, ascii", ) decoders: list[DecoderConfig] | None = Field(default=None, description="real-time protocol decoding") diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py index 852cd1333..eac3f2fbd 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py @@ -19,12 +19,6 @@ def find_sigrok_cli() -> str: return executable -def _default_channel_map() -> dict[str, str]: - # Decoder-friendly default names for demo driver - # Maps device channel name -> semantic name - return {"D0": "vcc", "D1": "cs", "D2": "miso", "D3": "mosi", "D4": "clk", "D5": "sda", "D6": "scl"} - - @dataclass(kw_only=True) class Sigrok(Driver): """Sigrok driver wrapping sigrok-cli for logic analyzer and oscilloscope support.""" @@ -32,7 +26,7 @@ class Sigrok(Driver): driver: str = "demo" conn: str | None = None executable: str = field(default_factory=find_sigrok_cli) - channels: dict[str, str] = field(default_factory=_default_channel_map) + channels: dict[str, str] = field(default_factory=dict) def __post_init__(self): if hasattr(super(), "__post_init__"): diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py index 2766a36c8..1f580634e 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py @@ -2,7 +2,7 @@ import pytest -from .common import CaptureConfig, CaptureResult +from .common import CaptureConfig, CaptureResult, OutputFormat from .driver import Sigrok from jumpstarter.common.utils import serve @@ -68,6 +68,43 @@ def test_capture_with_demo_driver(demo_client): assert len(result.channel_map) > 0 +@pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") +def test_capture_default_format(demo_client): + """Test capture with default output format (VCD). + + VCD is the default because it's the most efficient format: + - Only records changes (not every sample) + - Includes precise timing information + - Widely supported by signal analysis tools + """ + # Don't specify output_format - should default to VCD + cfg = CaptureConfig( + sample_rate="100kHz", + samples=50, + channels=["D0", "D1", "D2"], + ) + + result = demo_client.capture(cfg) + + # Verify we got VCD format by default + assert isinstance(result, CaptureResult) + assert result.output_format == OutputFormat.VCD + assert isinstance(result.data, bytes) + assert len(result.data) > 0 + + # Verify VCD data can be decoded + samples = result.decode() + assert isinstance(samples, list) + assert len(samples) > 0 + + # Verify samples have timing information (VCD feature) + for sample in samples: + assert hasattr(sample, "time_ns") + assert isinstance(sample.time_ns, int) + assert hasattr(sample, "values") + assert isinstance(sample.values, dict) + + @pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") def test_capture_csv_format(demo_client): """Test capture with CSV output format via client.""" @@ -297,6 +334,7 @@ def test_decode_bits_format(demo_client): Verifies: - Bits format decoding works - Returns dict with bit sequences + - Channel names are mapped from device names (D0) to user-friendly names (vcc) """ from .common import OutputFormat @@ -314,11 +352,19 @@ def test_decode_bits_format(demo_client): assert isinstance(decoded, dict) assert len(decoded) > 0 + # Should have user-friendly channel names (vcc, cs, miso) from channel_map + # Not generic names like CH0, CH1 + assert "vcc" in decoded or "D0" in decoded + assert "cs" in decoded or "D1" in decoded + assert "miso" in decoded or "D2" in decoded + # Each channel should have a list of bits for channel, bits in decoded.items(): assert isinstance(channel, str) assert isinstance(bits, list) assert all(b in [0, 1] for b in bits) + # Should have bits (at least some, exact count may vary with demo driver timing) + assert len(bits) > 0 @pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") From cc3c55b69e5f65aa64d1f6b3255030df6e516935 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 17:08:23 +0100 Subject: [PATCH 05/10] sigrok: tests should pass without sigrok-cli --- .../jumpstarter_driver_sigrok/driver.py | 27 ++++++++++++++----- .../jumpstarter_driver_sigrok/driver_test.py | 6 +++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py index eac3f2fbd..88bca75a1 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py @@ -12,11 +12,13 @@ from jumpstarter.driver import Driver, export -def find_sigrok_cli() -> str: - executable = which("sigrok-cli") - if executable is None: - raise FileNotFoundError("sigrok-cli executable not found in PATH") - return executable +def find_sigrok_cli() -> str | None: + """Find sigrok-cli executable in PATH. + + Returns: + Path to executable or None if not found + """ + return which("sigrok-cli") @dataclass(kw_only=True) @@ -25,13 +27,21 @@ class Sigrok(Driver): driver: str = "demo" conn: str | None = None - executable: str = field(default_factory=find_sigrok_cli) + executable: str | None = field(default_factory=find_sigrok_cli) channels: dict[str, str] = field(default_factory=dict) def __post_init__(self): if hasattr(super(), "__post_init__"): super().__post_init__() + def _ensure_executable(self): + """Ensure sigrok-cli is available.""" + if self.executable is None: + raise FileNotFoundError( + "sigrok-cli executable not found in PATH. " + "Please install sigrok-cli to use this driver." + ) + @classmethod def client(cls) -> str: return "jumpstarter_driver_sigrok.client.SigrokClient" @@ -41,6 +51,8 @@ def client(cls) -> str: @export def scan(self) -> str: """List devices for the configured driver.""" + self._ensure_executable() + assert self.executable is not None cmd = [self.executable, "--driver", self.driver, "--scan"] result = subprocess.run(cmd, capture_output=True, text=True, check=True) return result.stdout @@ -64,6 +76,7 @@ def list_output_formats(self) -> list[str]: @export def capture(self, config: CaptureConfig | dict) -> dict: """One-shot capture; returns dict with base64-encoded binary data.""" + self._ensure_executable() cfg = CaptureConfig.model_validate(config) cmd, outfile, tmpdir = self._build_capture_command(cfg) @@ -87,6 +100,7 @@ def capture(self, config: CaptureConfig | dict) -> dict: @export async def capture_stream(self, config: CaptureConfig | dict): """Streaming capture; yields chunks of binary data from sigrok-cli stdout.""" + self._ensure_executable() cfg = CaptureConfig.model_validate(config) cmd = self._build_stream_command(cfg) @@ -139,6 +153,7 @@ def _build_stream_command(self, cfg: CaptureConfig) -> list[str]: return cmd def _base_driver_args(self) -> list[str]: + assert self.executable is not None if self.conn: return [self.executable, "-d", f"{self.driver}:conn={self.conn}"] return [self.executable, "-d", self.driver] diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py index 1f580634e..f6db8e066 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py @@ -6,6 +6,12 @@ from .driver import Sigrok from jumpstarter.common.utils import serve +# Skip all integration tests if sigrok-cli is not available +pytestmark = pytest.mark.skipif( + which("sigrok-cli") is None, + reason="sigrok-cli not found in PATH" +) + @pytest.fixture def demo_driver_instance(): From 6595ce9be94fb95030b0fe29c75b62d7e85acd68 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 17:10:27 +0100 Subject: [PATCH 06/10] sigrok: add sigrok-cli for testing --- .github/workflows/pytest.yaml | 11 +++++++++++ Dockerfile | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml index 81b8ae52e..13b8406cf 100644 --- a/.github/workflows/pytest.yaml +++ b/.github/workflows/pytest.yaml @@ -51,11 +51,22 @@ jobs: sudo apt-get update sudo apt-get install -y libgpiod-dev liblgpio-dev + - name: Install sigrok-cli (Linux) + if: runner.os == 'Linux' + run: | + sudo apt-get update + sudo apt-get install -y sigrok-cli + - name: Install Qemu (macOS) if: runner.os == 'macOS' run: | brew install qemu + - name: Install sigrok-cli (macOS) + if: runner.os == 'macOS' + run: | + brew install sigrok-cli + - name: Cache Fedora Cloud images id: cache-fedora-cloud-images uses: actions/cache@v4 diff --git a/Dockerfile b/Dockerfile index 5b2ffb481..336c92636 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ RUN dnf install -y make git && \ COPY --from=uv /uv /uvx /bin/ FROM fedora:42 AS product -RUN dnf install -y python3 ustreamer libusb1 android-tools python3-libgpiod && \ +RUN dnf install -y python3 ustreamer libusb1 android-tools python3-libgpiod sigrok-cli && \ dnf clean all && \ rm -rf /var/cache/dnf COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ From a6605f59c385fb1d69fb59be8700881947828b42 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 21:42:16 +0100 Subject: [PATCH 07/10] sigrok: generate decoded output instead of list --- .../jumpstarter_driver_sigrok/common.py | 12 +++++------ .../jumpstarter_driver_sigrok/csv.py | 20 +++++++++---------- .../jumpstarter_driver_sigrok/csv_test.py | 8 ++++---- .../jumpstarter_driver_sigrok/driver_test.py | 10 +++++----- .../jumpstarter_driver_sigrok/vcd.py | 17 ++++++++-------- .../jumpstarter_driver_sigrok/vcd_test.py | 8 ++++---- 6 files changed, 36 insertions(+), 39 deletions(-) diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py index 10f12662b..9dddd405d 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import Any, Iterator from pydantic import BaseModel, Field @@ -69,12 +69,12 @@ def data(self) -> bytes: from base64 import b64decode return b64decode(self.data_b64) - def decode(self) -> list[Sample] | dict[str, list[int]] | str: + def decode(self) -> Iterator[Sample] | dict[str, list[int]] | str: """Parse captured data based on output format. Returns: - - CSV format: list[Sample] with timing and all values per sample - - VCD format: list[Sample] with timing and only changed values + - CSV format: Iterator[Sample] yielding samples with timing and all values per sample + - VCD format: Iterator[Sample] yielding samples with timing and only changed values - Bits format: dict[str, list[int]] with channel→bit sequences - ASCII format: str with ASCII art visualization - Other formats: raises NotImplementedError (use .data for raw bytes) @@ -85,11 +85,11 @@ def decode(self) -> list[Sample] | dict[str, list[int]] | str: if self.output_format == OutputFormat.CSV: from .csv import parse_csv samples_data = parse_csv(self.data, self.sample_rate) - return [Sample.model_validate(s) for s in samples_data] + return (Sample.model_validate(s) for s in samples_data) elif self.output_format == OutputFormat.VCD: from .vcd import parse_vcd samples_data = parse_vcd(self.data, self.sample_rate) - return [Sample.model_validate(s) for s in samples_data] + return (Sample.model_validate(s) for s in samples_data) elif self.output_format == OutputFormat.BITS: return self._parse_bits() elif self.output_format == OutputFormat.ASCII: diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py index 1a7626d06..cda197c80 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py @@ -3,17 +3,18 @@ from __future__ import annotations import csv +from typing import Iterator -def parse_csv(data: bytes, sample_rate: str) -> list[dict]: - """Parse CSV format to list of samples with timing. +def parse_csv(data: bytes, sample_rate: str) -> Iterator[dict]: + """Parse CSV format to iterator of samples with timing. Args: data: Raw CSV data as bytes sample_rate: Sample rate string (e.g., "100kHz", "1MHz") - Returns: - List of dicts with keys: sample, time_ns, values + Yields: + Dicts with keys: sample, time_ns, values """ text = data.decode("utf-8") lines = text.strip().split("\n") @@ -27,7 +28,7 @@ def parse_csv(data: bytes, sample_rate: str) -> list[dict]: data_lines = _extract_csv_data_lines(lines) if not data_lines or len(data_lines) < 2: - return [] + return # Parse the CSV data reader = csv.reader(data_lines) @@ -38,17 +39,14 @@ def parse_csv(data: bytes, sample_rate: str) -> list[dict]: # Get channel names from types channel_names = _infer_channel_names(types_row) - # Parse data rows - samples: list[dict] = [] + # Parse and yield data rows one by one for idx, row in enumerate(reader): values = _parse_csv_row(channel_names, row) - samples.append({ + yield { "sample": idx, "time_ns": idx * time_step_ns, "values": values, - }) - - return samples + } def _parse_sample_rate_hz(sample_rate: str) -> float: diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py index f0589e962..de61720a0 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py @@ -51,7 +51,7 @@ def test_csv_format_basic(demo_client: SigrokClient): result = demo_client.capture(cfg) assert isinstance(result, CaptureResult) assert isinstance(result.data, bytes) - decoded_data = result.decode() + decoded_data = list(result.decode()) assert isinstance(decoded_data, list) assert len(decoded_data) > 0 # Verify channel names are in the data @@ -73,7 +73,7 @@ def test_csv_format_timing(demo_client: SigrokClient): assert isinstance(result, CaptureResult) # Decode the CSV data - samples = result.decode() + samples = list(result.decode()) assert isinstance(samples, list) assert len(samples) > 0 @@ -97,7 +97,7 @@ def test_csv_format_analog_channels(demo_client: SigrokClient): result = demo_client.capture(cfg) assert isinstance(result, CaptureResult) assert isinstance(result.data, bytes) - decoded_data = result.decode() + decoded_data = list(result.decode()) assert isinstance(decoded_data, list) assert len(decoded_data) > 0 @@ -121,7 +121,7 @@ def test_csv_format_mixed_channels(demo_client: SigrokClient): ) result = demo_client.capture(cfg) - samples = result.decode() + samples = list(result.decode()) assert isinstance(samples, list) assert len(samples) > 0 diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py index f6db8e066..4c25b0f1d 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py @@ -99,7 +99,7 @@ def test_capture_default_format(demo_client): assert len(result.data) > 0 # Verify VCD data can be decoded - samples = result.decode() + samples = list(result.decode()) assert isinstance(samples, list) assert len(samples) > 0 @@ -290,7 +290,7 @@ def test_decode_csv_format(demo_client): assert isinstance(result, CaptureResult) # Decode the CSV data - samples = result.decode() + samples = list(result.decode()) assert isinstance(samples, list) assert len(samples) > 0 @@ -395,7 +395,7 @@ def test_decode_vcd_format(demo_client): assert isinstance(result, CaptureResult) # Decode the VCD data - samples = result.decode() + samples = list(result.decode()) assert isinstance(samples, list) assert len(samples) > 0 @@ -432,7 +432,7 @@ def test_decode_vcd_analog_channels(demo_client): ) result = demo_client.capture(cfg) - samples = result.decode() + samples = list(result.decode()) assert isinstance(samples, list) assert len(samples) > 0 @@ -480,7 +480,7 @@ def test_decode_analog_csv(demo_client): ) result = demo_client.capture(cfg) - samples = result.decode() + samples = list(result.decode()) assert isinstance(samples, list) assert len(samples) > 0 diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py index c777e79d0..ed0f6bd70 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py @@ -2,9 +2,11 @@ from __future__ import annotations +from typing import Iterator -def parse_vcd(data: bytes, sample_rate: str) -> list[dict]: - """Parse VCD format to list of samples with timing (changes only). + +def parse_vcd(data: bytes, sample_rate: str) -> Iterator[dict]: + """Parse VCD format to iterator of samples with timing (changes only). VCD format only records when signals change, making it efficient for sparse data. Each sample represents a time point where one or more @@ -14,8 +16,8 @@ def parse_vcd(data: bytes, sample_rate: str) -> list[dict]: data: Raw VCD data as bytes sample_rate: Sample rate string (not used for VCD as it has its own timescale) - Returns: - List of dicts with keys: sample, time_ns, values + Yields: + Dicts with keys: sample, time_ns, values """ text = data.decode("utf-8") lines = text.strip().split("\n") @@ -42,8 +44,7 @@ def parse_vcd(data: bytes, sample_rate: str) -> list[dict]: if line == "$enddefinitions $end": break - # Parse value changes - samples: list[dict] = [] + # Parse and yield value changes one by one sample_idx = 0 for line in lines: @@ -56,11 +57,9 @@ def parse_vcd(data: bytes, sample_rate: str) -> list[dict]: sample_data = _parse_vcd_timestamp_line(line, timescale_multiplier, channel_map) if sample_data is not None: sample_data["sample"] = sample_idx - samples.append(sample_data) + yield sample_data sample_idx += 1 - return samples - def _parse_timescale(line: str) -> int: """Parse timescale line and return multiplier to convert to nanoseconds.""" diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py index 6af6825d6..ef4c67fbd 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py @@ -57,7 +57,7 @@ def test_vcd_parser_comprehensive(): ) # Parse the VCD - samples = result.decode() + samples = list(result.decode()) # Verify we got the expected number of samples assert len(samples) == 5 @@ -135,7 +135,7 @@ def test_vcd_parser_timescale_variations(): channel_map={"D0": "d0"}, ) - samples = result.decode() + samples = list(result.decode()) assert len(samples) >= 1 # First sample at time 0 assert samples[0].time_ns == 0 @@ -159,7 +159,7 @@ def test_vcd_parser_empty_timestamps(): channel_map={"D0": "d0"}, ) - samples = result.decode() + samples = list(result.decode()) # Should have 3 samples (empty timestamp line skipped) assert len(samples) == 3 assert samples[0].time_ns == 0 @@ -204,7 +204,7 @@ def test_vcd_parser_large_channel_count(): }, ) - samples = result.decode() + samples = list(result.decode()) # Verify first sample assert len(samples) == 2 From 7db5b1a40fd660f7c73fa63237034cb2f835a10f Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 22:08:14 +0100 Subject: [PATCH 08/10] sigrok: channel mapping config fix also, VCD tests should not expect channel mapping when not interacting with sigrok-cli, since sigrok-cli is the one performing mappings. --- .../examples/exporter.yaml | 21 +++++++------- .../jumpstarter_driver_sigrok/common.py | 5 ++++ .../jumpstarter_driver_sigrok/csv_test.py | 3 +- .../jumpstarter_driver_sigrok/driver.py | 4 +-- .../jumpstarter_driver_sigrok/vcd_test.py | 29 +++++-------------- 5 files changed, 27 insertions(+), 35 deletions(-) diff --git a/packages/jumpstarter-driver-sigrok/examples/exporter.yaml b/packages/jumpstarter-driver-sigrok/examples/exporter.yaml index 847a99b29..5400150af 100644 --- a/packages/jumpstarter-driver-sigrok/examples/exporter.yaml +++ b/packages/jumpstarter-driver-sigrok/examples/exporter.yaml @@ -8,14 +8,15 @@ token: "" export: sigrok: type: jumpstarter_driver_sigrok.driver.Sigrok - driver: demo - conn: null - channels: - D0: vcc - D1: cs - D2: miso - D3: mosi - D4: clk - D5: sda - D6: scl + config: + driver: demo + conn: null + channels: + D0: vcc + D1: cs + D2: miso + D3: mosi + D4: clk + D5: sda + D6: scl diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py index 9dddd405d..70c24ca96 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -79,6 +79,11 @@ def decode(self) -> Iterator[Sample] | dict[str, list[int]] | str: - ASCII format: str with ASCII art visualization - Other formats: raises NotImplementedError (use .data for raw bytes) + Note: + Channel names in the output depend on how the data was captured: + - If captured with channel mapping, sigrok-cli outputs mapped names (vcc, cs, etc.) + - If captured without mapping, outputs device names (D0, D1, etc.) + Raises: NotImplementedError: For binary/srzip formats (use .data property) """ diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py index de61720a0..ff944d45a 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py @@ -54,7 +54,8 @@ def test_csv_format_basic(demo_client: SigrokClient): decoded_data = list(result.decode()) assert isinstance(decoded_data, list) assert len(decoded_data) > 0 - # Verify channel names are in the data + # CSV format uses inferred names (D0, D1, etc.) based on column types + # Channel mapping is only preserved in VCD format first_sample = decoded_data[0] assert "D0" in first_sample.values or "D1" in first_sample.values diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py index 88bca75a1..ec1765afd 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py @@ -66,7 +66,7 @@ def get_driver_info(self) -> dict: } @export - def get_channel_map(self) -> dict[int, str]: + def get_channel_map(self) -> dict[str, str]: return self.channels @export @@ -81,7 +81,7 @@ def capture(self, config: CaptureConfig | dict) -> dict: cmd, outfile, tmpdir = self._build_capture_command(cfg) try: - self.logger.debug("running sigrok-cli: %s", " ".join(cmd)) + self.logger.debug("Running sigrok-cli: %s", " ".join(cmd)) subprocess.run(cmd, check=True) data = outfile.read_bytes() diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py index ef4c67fbd..032dfee07 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py @@ -38,20 +38,12 @@ def test_vcd_parser_comprehensive(): #100 0! 0" 0# r1.23e-5 xyz """ - # Create a CaptureResult with this VCD data + # Create a CaptureResult with this VCD data (no channel mapping for parser test) result = CaptureResult( data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), output_format=OutputFormat.VCD, sample_rate="1MHz", - channel_map={ - "D0": "d0", - "D1": "d1", - "D2": "d2", - "CH95": "ch95", - "CH96": "ch96", - "BUS0": "bus", - "ANALOG0": "analog", - }, + channel_map={}, triggers=None, decoders=None, ) @@ -65,6 +57,7 @@ def test_vcd_parser_comprehensive(): # Sample 0 at time 0us = 0ns s0 = samples[0] assert s0.time_ns == 0 + # Channel names come directly from VCD (not mapped) assert s0.values["D0"] == 1 assert s0.values["D1"] == 0 assert s0.values["D2"] == 1 @@ -132,7 +125,7 @@ def test_vcd_parser_timescale_variations(): data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), output_format=OutputFormat.VCD, sample_rate="1MHz", - channel_map={"D0": "d0"}, + channel_map={}, ) samples = list(result.decode()) @@ -156,7 +149,7 @@ def test_vcd_parser_empty_timestamps(): data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), output_format=OutputFormat.VCD, sample_rate="1MHz", - channel_map={"D0": "d0"}, + channel_map={}, ) samples = list(result.decode()) @@ -193,20 +186,12 @@ def test_vcd_parser_large_channel_count(): data_b64=b64encode(vcd_content.encode("utf-8")).decode("ascii"), output_format=OutputFormat.VCD, sample_rate="1MHz", - channel_map={ - "CH0": "ch0", - "CH93": "ch93", - "CH94": "ch94", - "CH95": "ch95", - "CH769": "ch769", - "CH770": "ch770", - "CH800": "ch800", - }, + channel_map={}, ) samples = list(result.decode()) - # Verify first sample + # Verify first sample (channel names come directly from VCD) assert len(samples) == 2 s0 = samples[0] assert isinstance(s0, Sample) From 7c3dbaa812c6d91de6f854f5b8560515ecb9058d Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 22:39:21 +0100 Subject: [PATCH 09/10] sigrok: auto instead of null, and better time print --- packages/jumpstarter-driver-sigrok/README.md | 10 ++--- .../examples/exporter.yaml | 2 +- .../jumpstarter_driver_sigrok/common.py | 44 ++++++++++++++++++- .../jumpstarter_driver_sigrok/csv.py | 6 +-- .../jumpstarter_driver_sigrok/csv_test.py | 6 +-- .../jumpstarter_driver_sigrok/driver.py | 4 +- .../jumpstarter_driver_sigrok/driver_test.py | 14 +++--- .../jumpstarter_driver_sigrok/vcd.py | 26 +++++------ .../jumpstarter_driver_sigrok/vcd_test.py | 32 +++++++------- 9 files changed, 93 insertions(+), 51 deletions(-) diff --git a/packages/jumpstarter-driver-sigrok/README.md b/packages/jumpstarter-driver-sigrok/README.md index 4e42a404b..13344e058 100644 --- a/packages/jumpstarter-driver-sigrok/README.md +++ b/packages/jumpstarter-driver-sigrok/README.md @@ -19,7 +19,7 @@ export: sigrok: type: jumpstarter_driver_sigrok.driver.Sigrok driver: fx2lafw # sigrok driver (demo, fx2lafw, rigol-ds, etc.) - conn: null # optional: USB VID.PID, serial path, or null for auto + conn: auto # optional: USB VID.PID, serial path, or "auto" for auto-detect channels: # optional: map device channels to friendly names D0: clk D1: mosi @@ -32,7 +32,7 @@ export: | Parameter | Description | Type | Required | Default | |-----------|-------------|------|----------|---------| | `driver` | Sigrok driver name (e.g., `demo`, `fx2lafw`, `rigol-ds`) | str | yes | - | -| `conn` | Connection string (USB VID.PID, serial path, or `null` for auto-detect) | str \| None | no | None | +| `conn` | Connection string (USB VID.PID, serial path, or `"auto"` for auto-detect) | str \| None | no | "auto" | | `executable` | Path to `sigrok-cli` executable | str | no | Auto-detected from PATH | | `channels` | Channel mapping from device names (D0, A0) to semantic names (clk, voltage) | dict[str, str] | no | {} (empty) | @@ -106,7 +106,7 @@ result = client.capture(config) # Decode VCD to get samples with timing samples = result.decode() # list[Sample] for sample in samples[:5]: - print(f"Time: {sample.time_ns}ns, Values: {sample.values}") + print(f"Time: {sample.time}s, Values: {sample.values}") ``` **Equivalent sigrok-cli command:** @@ -141,7 +141,7 @@ print(f"Captured {len(samples)} signal changes") # Access timing and values for sample in samples[:3]: - print(f"Time: {sample.time_ns}ns, Changed: {sample.values}") + print(f"Time: {sample.time}s, Changed: {sample.values}") ``` **Equivalent sigrok-cli command:** @@ -184,7 +184,7 @@ result = client.capture(config) # Parse voltage data samples = result.decode() # list[Sample] for sample in samples[:5]: - print(f"Time: {sample.time_ns}ns") + print(f"Time: {sample.time}s") print(f" CH1: {sample.values.get('A0', 'N/A')}V") print(f" CH2: {sample.values.get('A1', 'N/A')}V") ``` diff --git a/packages/jumpstarter-driver-sigrok/examples/exporter.yaml b/packages/jumpstarter-driver-sigrok/examples/exporter.yaml index 5400150af..7e1029ea9 100644 --- a/packages/jumpstarter-driver-sigrok/examples/exporter.yaml +++ b/packages/jumpstarter-driver-sigrok/examples/exporter.yaml @@ -10,7 +10,7 @@ export: type: jumpstarter_driver_sigrok.driver.Sigrok config: driver: demo - conn: null + conn: auto channels: D0: vcc D1: cs diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py index 70c24ca96..cc683a694 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -22,9 +22,51 @@ def all(cls) -> list[str]: class Sample(BaseModel): """A single sample with timing information.""" sample: int # Sample index - time_ns: int # Time in nanoseconds + time: float # Time in seconds (full precision) values: dict[str, int | float] # Channel values (digital: 0/1, analog: voltage) + def __str__(self) -> str: + """Format sample with clean time display using appropriate unit (fs/ps/ns/μs/ms/s).""" + time_str = self._format_time(self.time) + return f"Sample(sample={self.sample}, time={time_str}, values={self.values})" + + @staticmethod + def _format_time(time_s: float) -> str: + """Format time in seconds to the most appropriate unit. + + Args: + time_s: Time in seconds + + Returns: + Formatted string like "1.5ns", "2.3μs", "1.5ms", "2s" + """ + # Special case for zero + if time_s == 0: + return "0s" + + abs_time = abs(time_s) + + # Define units in descending order (seconds to femtoseconds) + units = [ + (1.0, "s"), + (1e-3, "ms"), + (1e-6, "μs"), + (1e-9, "ns"), + (1e-12, "ps"), + (1e-15, "fs"), + ] + + # Find the most appropriate unit + for scale, unit in units: + if abs_time >= scale or scale == 1e-15: # Use fs as minimum + value = time_s / scale + # Format with up to 6 significant digits, remove trailing zeros + formatted = f"{value:.6g}" + return f"{formatted}{unit}" + + # Fallback (should never reach here) + return f"{time_s:.6g}s" + class DecoderConfig(BaseModel): """Protocol decoder configuration (real-time during capture).""" diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py index cda197c80..a0cbdde8a 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv.py @@ -14,14 +14,14 @@ def parse_csv(data: bytes, sample_rate: str) -> Iterator[dict]: sample_rate: Sample rate string (e.g., "100kHz", "1MHz") Yields: - Dicts with keys: sample, time_ns, values + Dicts with keys: sample, time (seconds), values """ text = data.decode("utf-8") lines = text.strip().split("\n") # Parse sample rate for timing calculation sample_rate_hz = _parse_sample_rate_hz(sample_rate) - time_step_ns = int(1_000_000_000.0 / sample_rate_hz) + time_step_s = 1.0 / sample_rate_hz # seconds per sample # Skip comment lines and analog preview lines (format: "A0: -10.0000 V DC") # The actual data starts after a header row with types like "logic,logic,V DC,V DC" @@ -44,7 +44,7 @@ def parse_csv(data: bytes, sample_rate: str) -> Iterator[dict]: values = _parse_csv_row(channel_names, row) yield { "sample": idx, - "time_ns": idx * time_step_ns, + "time": idx * time_step_s, "values": values, } diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py index ff944d45a..e15e545a0 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/csv_test.py @@ -80,9 +80,9 @@ def test_csv_format_timing(demo_client: SigrokClient): # Verify timing progresses correctly for sample in samples: - assert isinstance(sample.time_ns, int) - # Verify timing progresses (1/100kHz = 10,000ns per sample) - assert sample.time_ns == sample.sample * 10_000 + assert isinstance(sample.time, float) + # Verify timing progresses (1/100kHz = 0.00001s per sample) + assert sample.time == sample.sample * 0.00001 @pytest.mark.skipif(which("sigrok-cli") is None, reason="sigrok-cli not installed") diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py index ec1765afd..35c7b88d0 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver.py @@ -26,7 +26,7 @@ class Sigrok(Driver): """Sigrok driver wrapping sigrok-cli for logic analyzer and oscilloscope support.""" driver: str = "demo" - conn: str | None = None + conn: str | None = "auto" executable: str | None = field(default_factory=find_sigrok_cli) channels: dict[str, str] = field(default_factory=dict) @@ -154,7 +154,7 @@ def _build_stream_command(self, cfg: CaptureConfig) -> list[str]: def _base_driver_args(self) -> list[str]: assert self.executable is not None - if self.conn: + if self.conn and self.conn != "auto": return [self.executable, "-d", f"{self.driver}:conn={self.conn}"] return [self.executable, "-d", self.driver] diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py index 4c25b0f1d..087490e61 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/driver_test.py @@ -105,8 +105,8 @@ def test_capture_default_format(demo_client): # Verify samples have timing information (VCD feature) for sample in samples: - assert hasattr(sample, "time_ns") - assert isinstance(sample.time_ns, int) + assert hasattr(sample, "time") + assert isinstance(sample.time, float) assert hasattr(sample, "values") assert isinstance(sample.values, dict) @@ -298,11 +298,11 @@ def test_decode_csv_format(demo_client): for sample in samples: assert isinstance(sample, Sample) assert isinstance(sample.sample, int) - assert isinstance(sample.time_ns, int) + assert isinstance(sample.time, float) assert isinstance(sample.values, dict) - # Verify timing progresses (1/100kHz = 10,000ns per sample) - assert sample.time_ns == sample.sample * 10_000 + # Verify timing progresses (1/100kHz = 0.00001s per sample) + assert sample.time == sample.sample * 0.00001 # Verify values are present assert len(sample.values) > 0 @@ -403,7 +403,7 @@ def test_decode_vcd_format(demo_client): for sample in samples: assert isinstance(sample, Sample) assert isinstance(sample.sample, int) - assert isinstance(sample.time_ns, int) + assert isinstance(sample.time, float) assert isinstance(sample.values, dict) # VCD only records changes, so each sample should have at least one value @@ -440,7 +440,7 @@ def test_decode_vcd_analog_channels(demo_client): # Check that samples have analog values first_sample = samples[0] assert isinstance(first_sample, Sample) - assert isinstance(first_sample.time_ns, int) + assert isinstance(first_sample.time, float) assert len(first_sample.values) > 0 diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py index ed0f6bd70..cd2551906 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd.py @@ -17,13 +17,13 @@ def parse_vcd(data: bytes, sample_rate: str) -> Iterator[dict]: sample_rate: Sample rate string (not used for VCD as it has its own timescale) Yields: - Dicts with keys: sample, time_ns, values + Dicts with keys: sample, time (seconds), values """ text = data.decode("utf-8") lines = text.strip().split("\n") # Parse VCD header to extract timescale and channel mapping - timescale_multiplier = 1 # Default: 1 unit = 1 ns + timescale_multiplier = 1e-9 # Default: 1 unit = 1 ns = 1e-9 seconds channel_map: dict[str, str] = {} # symbol → channel name for line in lines: @@ -61,28 +61,28 @@ def parse_vcd(data: bytes, sample_rate: str) -> Iterator[dict]: sample_idx += 1 -def _parse_timescale(line: str) -> int: - """Parse timescale line and return multiplier to convert to nanoseconds.""" +def _parse_timescale(line: str) -> float: + """Parse timescale line and return multiplier to convert to seconds.""" parts = line.split() if len(parts) >= 3: value = parts[1] unit = parts[2] - # Convert to nanoseconds multiplier - unit_multipliers = {"s": 1e9, "ms": 1e6, "us": 1e3, "ns": 1, "ps": 1e-3} - return int(float(value) * unit_multipliers.get(unit, 1)) - return 1 + # Convert to seconds multiplier + unit_multipliers = {"s": 1.0, "ms": 1e-3, "us": 1e-6, "ns": 1e-9, "ps": 1e-12} + return float(value) * unit_multipliers.get(unit, 1.0) + return 1.0 -def _parse_vcd_timestamp_line(line: str, timescale_multiplier: int, channel_map: dict[str, str]) -> dict | None: +def _parse_vcd_timestamp_line(line: str, timescale_multiplier: float, channel_map: dict[str, str]) -> dict | None: """Parse a VCD timestamp line with value changes. Args: line: Line starting with # (e.g., "#100 1! 0" 1#") - timescale_multiplier: Multiplier to convert time units to nanoseconds + timescale_multiplier: Multiplier to convert time units to seconds channel_map: Mapping from VCD symbols to channel names Returns: - Dict with time_ns and values, or None if line is empty + Dict with time (seconds) and values, or None if line is empty """ # Split timestamp from values parts = line.split(maxsplit=1) @@ -93,7 +93,7 @@ def _parse_vcd_timestamp_line(line: str, timescale_multiplier: int, channel_map: return None time_units = int(time_str) - current_time_ns = time_units * timescale_multiplier + current_time_s = time_units * timescale_multiplier current_values: dict[str, int | float] = {} # Parse value changes if present on the same line @@ -103,7 +103,7 @@ def _parse_vcd_timestamp_line(line: str, timescale_multiplier: int, channel_map: # Return sample data if we have values if current_values: - return {"time_ns": current_time_ns, "values": current_values} + return {"time": current_time_s, "values": current_values} return None diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py index 032dfee07..bf0d829c4 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/vcd_test.py @@ -54,9 +54,9 @@ def test_vcd_parser_comprehensive(): # Verify we got the expected number of samples assert len(samples) == 5 - # Sample 0 at time 0us = 0ns + # Sample 0 at time 0us = 0s s0 = samples[0] - assert s0.time_ns == 0 + assert s0.time == 0.0 # Channel names come directly from VCD (not mapped) assert s0.values["D0"] == 1 assert s0.values["D1"] == 0 @@ -66,26 +66,26 @@ def test_vcd_parser_comprehensive(): assert s0.values["BUS0"] == 0b00001111 # Binary value assert abs(s0.values["ANALOG0"] - (-10.5)) < 0.001 # Real value - # Sample 1 at time 5us = 5000ns + # Sample 1 at time 5us = 0.000005s s1 = samples[1] - assert s1.time_ns == 5000 + assert abs(s1.time - 0.000005) < 1e-12 assert s1.values["D0"] == 0 assert s1.values["D1"] == 1 assert s1.values["D2"] == 0 # X converted to 0 assert s1.values["CH95"] == 1 - # Sample 2 at time 10us = 10000ns + # Sample 2 at time 10us = 0.00001s s2 = samples[2] - assert s2.time_ns == 10000 + assert abs(s2.time - 0.00001) < 1e-12 assert s2.values["D0"] == 0 # Z converted to 0 assert s2.values["D1"] == 0 assert s2.values["D2"] == 1 assert s2.values["BUS0"] == 0b11110000 assert abs(s2.values["ANALOG0"] - 3.14159) < 0.001 - # Sample 3 at time 25us = 25000ns + # Sample 3 at time 25us = 0.000025s s3 = samples[3] - assert s3.time_ns == 25000 + assert abs(s3.time - 0.000025) < 1e-12 assert s3.values["D0"] == 1 assert s3.values["D1"] == 1 assert s3.values["D2"] == 0 @@ -94,9 +94,9 @@ def test_vcd_parser_comprehensive(): assert s3.values["BUS0"] == 0b10101010 assert abs(s3.values["ANALOG0"] - 0.0) < 0.001 - # Sample 4 at time 100us = 100000ns + # Sample 4 at time 100us = 0.0001s s4 = samples[4] - assert s4.time_ns == 100000 + assert abs(s4.time - 0.0001) < 1e-12 assert s4.values["D0"] == 0 assert s4.values["D1"] == 0 assert s4.values["D2"] == 0 @@ -131,7 +131,7 @@ def test_vcd_parser_timescale_variations(): samples = list(result.decode()) assert len(samples) >= 1 # First sample at time 0 - assert samples[0].time_ns == 0 + assert samples[0].time == 0.0 def test_vcd_parser_empty_timestamps(): @@ -155,9 +155,9 @@ def test_vcd_parser_empty_timestamps(): samples = list(result.decode()) # Should have 3 samples (empty timestamp line skipped) assert len(samples) == 3 - assert samples[0].time_ns == 0 - assert samples[1].time_ns == 10 - assert samples[2].time_ns == 20 + assert samples[0].time == 0.0 + assert samples[1].time == 1e-8 # 10ns + assert samples[2].time == 2e-8 # 20ns def test_vcd_parser_large_channel_count(): @@ -195,7 +195,7 @@ def test_vcd_parser_large_channel_count(): assert len(samples) == 2 s0 = samples[0] assert isinstance(s0, Sample) - assert s0.time_ns == 0 + assert s0.time == 0.0 assert s0.values["CH0"] == 1 # Single char: ! assert s0.values["CH93"] == 0 # Single char: ~ assert s0.values["CH94"] == 1 # Two char: aa @@ -206,7 +206,7 @@ def test_vcd_parser_large_channel_count(): # Verify second sample s1 = samples[1] - assert s1.time_ns == 100 + assert abs(s1.time - 1e-7) < 1e-15 # 100ns assert s1.values["CH0"] == 0 assert s1.values["CH93"] == 1 assert s1.values["CH94"] == 0 From fbf0c007feac1b391a33212f735900202ecc7544 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Mon, 8 Dec 2025 22:48:03 +0100 Subject: [PATCH 10/10] sigrok: better CaptureResult output --- .../jumpstarter_driver_sigrok/common.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py index cc683a694..bffbb97e0 100644 --- a/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py +++ b/packages/jumpstarter-driver-sigrok/jumpstarter_driver_sigrok/common.py @@ -105,6 +105,23 @@ class CaptureResult(BaseModel): triggers: dict[str, str] | None = None decoders: list[DecoderConfig] | None = None + def __str__(self) -> str: + """Format CaptureResult with truncated data_b64 field.""" + data_len = len(self.data_b64) + if data_len <= 50: + data_preview = self.data_b64 + else: + # Show first 50 and last 50 chars with ellipsis + data_preview = f"{self.data_b64[:25]}...{self.data_b64[-25:]} ({data_len} chars)" + + return ( + f"CaptureResult(output_format='{self.output_format}', " + f"sample_rate='{self.sample_rate}', " + f"data_size={len(self.data)} bytes, " + f"channels={len(self.channel_map)}, " + f"data_b64='{data_preview}')" + ) + @property def data(self) -> bytes: """Get the captured data as bytes (auto-decodes from base64)."""