Skip to content

nidaqlib

nidaqlib

nidaqlib — Experiment-facing NI-DAQmx acquisition layer.

nidaqlib is not a replacement for NI's nidaqmx-python. It is a typed, lifecycle-managed acquisition layer built on top of it, designed to fit the same scientific-instrumentation ecosystem as alicatlib and sartoriuslib.

Core API is async (built on anyio); a sync facade is available at :mod:nidaqlib.sync for scripts, notebooks, and REPL use.

See docs/design.md for the architectural design.

AcquisitionMode

Bases: StrEnum

Sample-clock acquisition mode.

Mirrors a subset of nidaqmx.constants.AcquisitionType. Kept as a library-side enum so :class:TaskSpec round-trips through JSON without pulling NI's enum machinery into the serialisation layer.

ON_DEMAND class-attribute instance-attribute

ON_DEMAND = 'on_demand'

Software-timed; no hardware sample clock is configured.

AcquisitionSummary dataclass

AcquisitionSummary(
    blocks_emitted=0,
    blocks_dropped=0,
    errors_observed=0,
    started_at=(lambda: datetime.now(UTC))(),
    finished_at=None,
)

Per-run counters, yielded alongside the block stream.

Mirrors sartoriuslib.AcquisitionSummary shape but is intentionally mutable: counters are updated in place during the run so consumers can poll progress (e.g. for a TUI bar) and read final counts after exit. The recorder is the only writer; consumers MUST treat the object as read-only.

Attributes:

Name Type Description
blocks_emitted int

Total :class:DaqBlock records sent into the outbound stream.

blocks_dropped int

Records dropped because of an :class:OverflowPolicy.DROP_* decision.

errors_observed int

Wrapped NI errors seen during the run, regardless of :class:ErrorPolicy.

started_at datetime

Wall-clock at recorder entry.

finished_at datetime | None

Wall-clock at recorder exit. None while the recorder is still running.

AnalogEdgeStartTrigger dataclass

AnalogEdgeStartTrigger(
    *, source, level, slope=AnalogTriggerSlope.RISING
)

Bases: TriggerSpec

Start the task when an analog channel crosses level.

Maps to task.triggers.start_trigger.cfg_anlg_edge_start_trig.

Attributes:

Name Type Description
level float

Threshold level, in the source channel's engineering units.

slope AnalogTriggerSlope

Active slope (rising / falling). Rising by default.

from_dict classmethod

from_dict(data)

Deserialise, restoring :class:AnalogTriggerSlope from its value.

Source code in src/nidaqlib/tasks/triggers.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise, restoring :class:`AnalogTriggerSlope` from its value."""
    kind = data.get("kind")
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    try:
        slope = AnalogTriggerSlope(data.get("slope", AnalogTriggerSlope.RISING.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown AnalogTriggerSlope {data.get('slope')!r}") from exc
    return cls(
        source=str(data["source"]),
        level=float(data["level"]),
        slope=slope,
    )

to_dict

to_dict()

Serialise; encode :class:AnalogTriggerSlope to its string value.

Source code in src/nidaqlib/tasks/triggers.py
def to_dict(self) -> dict[str, Any]:
    """Serialise; encode :class:`AnalogTriggerSlope` to its string value."""
    return {
        "kind": self.kind,
        "source": self.source,
        "level": self.level,
        "slope": self.slope.value,
    }

AnalogInputBase dataclass

AnalogInputBase(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    adc_timing_mode=None,
    adc_custom_timing_mode=None,
    auto_zero_mode=None,
)

Bases: ChannelSpec

Shared base for analog-input channel specs.

Carries the per-channel knobs NI exposes as channel properties on the object returned by add_ai_*_chan(...) — currently ADC timing mode and auto-zero mode. Hardware support is module-specific: NI surfaces unsupported attributes as a DaqError at set time, which the backend re-raises as :class:~nidaqlib.errors.NIDaqBackendError.

Attributes:

Name Type Description
adc_timing_mode ADCTimingMode | None

One of :class:nidaqmx.constants.ADCTimingMode, or None to leave the device default in place. Trades conversion rate against resolution and configures line-frequency rejection on delta-sigma modules. Choose HIGH_RESOLUTION for slow / high-precision work, HIGH_SPEED for throughput, BEST_50_HZ_REJECTION / BEST_60_HZ_REJECTION to suppress mains-frequency hum, or CUSTOM to address a device-specific timing mode via :attr:adc_custom_timing_mode.

adc_custom_timing_mode int | None

Device-specific integer code, only meaningful when adc_timing_mode is ADCTimingMode.CUSTOM. Required in that case; rejected otherwise.

auto_zero_mode AutoZeroType | None

One of :class:nidaqmx.constants.AutoZeroType, or None to leave the device default in place. ONCE performs a single auto-zero at acquisition start (the most common useful setting); EVERY_SAMPLE autozeros each conversion at the cost of throughput; NONE skips auto-zero entirely.

__post_init__

__post_init__()

Validate the ADC-timing pairing on top of the base channel checks.

Source code in src/nidaqlib/channels/analog_input.py
def __post_init__(self) -> None:
    """Validate the ADC-timing pairing on top of the base channel checks."""
    # Direct call (not super()) — the @dataclass(slots=True) decorator
    # rewrites the class, which leaves super()'s __class__ cell pointing
    # at a now-unrelated class object.
    ChannelSpec.__post_init__(self)
    from nidaqmx.constants import ADCTimingMode  # noqa: PLC0415

    is_custom = self.adc_timing_mode is ADCTimingMode.CUSTOM
    if self.adc_custom_timing_mode is not None and not is_custom:
        raise NIDaqValidationError(
            f"adc_custom_timing_mode is only valid with "
            f"adc_timing_mode=ADCTimingMode.CUSTOM on {self.display_name!r}; "
            f"got adc_timing_mode={self.adc_timing_mode!r}"
        )
    if is_custom and self.adc_custom_timing_mode is None:
        raise NIDaqValidationError(
            f"adc_timing_mode=ADCTimingMode.CUSTOM requires adc_custom_timing_mode "
            f"on {self.display_name!r}"
        )

to_dict

to_dict()

Serialise via each enum's .value so the payload is JSON-encodable.

Source code in src/nidaqlib/channels/analog_input.py
def to_dict(self) -> dict[str, Any]:
    """Serialise via each enum's ``.value`` so the payload is JSON-encodable."""
    # Direct call (not super()) — see __post_init__ note.
    payload = ChannelSpec.to_dict(self)
    payload["adc_timing_mode"] = (
        self.adc_timing_mode.value if self.adc_timing_mode is not None else None
    )
    payload["auto_zero_mode"] = (
        self.auto_zero_mode.value if self.auto_zero_mode is not None else None
    )
    return payload

AnalogInputVoltage dataclass

AnalogInputVoltage(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    adc_timing_mode=None,
    adc_custom_timing_mode=None,
    auto_zero_mode=None,
    min_val=-10.0,
    max_val=10.0,
    terminal_config=None,
    custom_scale_name=None,
)

Bases: AnalogInputBase

Voltage analog-input channel.

Maps to Task.ai_channels.add_ai_voltage_chan on the NI side.

Attributes:

Name Type Description
min_val float

Lower limit of the expected input range, in volts.

max_val float

Upper limit of the expected input range, in volts. The NI driver uses the (min, max) range to select the most appropriate on-board gain.

terminal_config TerminalConfiguration | None

Terminal configuration (RSE / NRSE / DIFF / PSEUDO_DIFF). None lets NI pick the device default.

custom_scale_name str | None

Optional name of a pre-configured custom scale registered in MAX. When set, min_val/max_val are scaled engineering units, not volts.

Inherits :attr:adc_timing_mode and :attr:adc_custom_timing_mode from :class:AnalogInputBase.

__post_init__

__post_init__()

Validate the voltage range.

Source code in src/nidaqlib/channels/analog_input.py
def __post_init__(self) -> None:
    """Validate the voltage range."""
    AnalogInputBase.__post_init__(self)
    if self.min_val >= self.max_val:
        raise NIDaqValidationError(
            f"min_val must be < max_val for {self.display_name!r}; "
            f"got {self.min_val!r} >= {self.max_val!r}"
        )

from_dict classmethod

from_dict(data)

Reconstruct, restoring enum members from their serialised .value ints.

Source code in src/nidaqlib/channels/analog_input.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Reconstruct, restoring enum members from their serialised ``.value`` ints."""
    kind = data.get("kind")
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    payload = {k: v for k, v in data.items() if k != "kind"}
    AnalogInputBase._restore_base_enums(payload)
    if payload.get("terminal_config") is not None:
        payload["terminal_config"] = _coerce_terminal_config(payload["terminal_config"])
    return cls(**payload)

to_dict

to_dict()

Serialise, encoding terminal_config via its .value int.

Source code in src/nidaqlib/channels/analog_input.py
def to_dict(self) -> dict[str, Any]:
    """Serialise, encoding ``terminal_config`` via its ``.value`` int."""
    payload = AnalogInputBase.to_dict(self)
    payload["terminal_config"] = (
        self.terminal_config.value if self.terminal_config is not None else None
    )
    return payload

AnalogOutputVoltage dataclass

AnalogOutputVoltage(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    min_val=-10.0,
    max_val=10.0,
    safe_min=None,
    safe_max=None,
    requires_confirm=True,
    terminal_config=None,
    custom_scale_name=None,
)

Bases: ChannelSpec

Voltage analog-output channel.

Maps to Task.ao_channels.add_ao_voltage_chan on the NI side. Writes are gated through :meth:DaqSession.write, which rejects out-of-range values against safe_min / safe_max and requires confirm=True whenever any target channel sets requires_confirm (design doc §17.1).

Attributes:

Name Type Description
min_val float

Lower bound of the device output range, in volts.

max_val float

Upper bound of the device output range, in volts. NI uses (min, max) to select the output gain.

safe_min float | None

Optional lower-end safety clamp for application writes. None means "use min_val as the clamp." Out-of-range writes raise :class:NIDaqValidationError — never silently clamped.

safe_max float | None

Optional upper-end safety clamp. None means "use max_val."

requires_confirm bool

When True, every :meth:DaqSession.write targeting this channel must pass confirm=True. Defaults to True — outputs default to safe.

terminal_config TerminalConfiguration | None

Terminal configuration (RSE / DIFF / ...). None lets NI pick the device default.

custom_scale_name str | None

Optional name of a pre-configured custom scale registered in MAX. When set, min_val / max_val are engineering units, not volts.

effective_safe_max property

effective_safe_max

Resolved upper clamp — falls back to :attr:max_val.

effective_safe_min property

effective_safe_min

Resolved lower clamp — falls back to :attr:min_val.

__post_init__

__post_init__()

Validate the output and safety ranges.

Source code in src/nidaqlib/channels/analog_output.py
def __post_init__(self) -> None:
    """Validate the output and safety ranges."""
    ChannelSpec.__post_init__(self)
    if self.min_val >= self.max_val:
        raise NIDaqValidationError(
            f"min_val must be < max_val for {self.display_name!r}; "
            f"got {self.min_val!r} >= {self.max_val!r}"
        )
    lo = self.effective_safe_min
    hi = self.effective_safe_max
    if lo > hi:
        raise NIDaqValidationError(
            f"safe_min must be <= safe_max for {self.display_name!r}; got {lo!r} > {hi!r}"
        )
    if lo < self.min_val or hi > self.max_val:
        raise NIDaqValidationError(
            f"safe range [{lo}, {hi}] must stay inside device range "
            f"[{self.min_val}, {self.max_val}] for {self.display_name!r}"
        )

AnalogTriggerSlope

Bases: StrEnum

Active slope for an analog edge trigger.

Mirrors nidaqmx.constants.Slope. Kept library-side so :class:AnalogEdgeStartTrigger round-trips through JSON without pulling NI's enum machinery into the serialisation layer.

ChannelSpec dataclass

ChannelSpec(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
)

Application-facing description of one DAQ channel.

Attributes:

Name Type Description
physical_channel str

NI physical channel identifier, e.g. "Dev1/ai0".

name str | None

Optional friendly name; defaults to the physical channel.

unit str | None

Optional engineering unit string ("V", "degC", ...). Used by sinks for column headers; not interpreted by the backend.

metadata Mapping[str, str | int | float | bool]

Free-form scalar metadata propagated into emitted records.

display_name property

display_name

Return name if set, otherwise the physical channel.

kind class-attribute

kind = ''

Discriminator used by :meth:from_dict. Concrete subclasses override.

__post_init__

__post_init__()

Validate and freeze common channel metadata.

Source code in src/nidaqlib/channels/base.py
def __post_init__(self) -> None:
    """Validate and freeze common channel metadata."""
    if not self.physical_channel:
        raise NIDaqValidationError("physical_channel must be a non-empty string")
    if self.name is not None and not self.name:
        raise NIDaqValidationError("name must be non-empty when provided")
    object.__setattr__(self, "metadata", MappingProxyType(dict(self.metadata)))

from_dict classmethod

from_dict(data)

Deserialise from a dict produced by :meth:to_dict.

On the base class, this dispatches to the registered subclass for the kind discriminator. On a concrete subclass, this validates that kind matches and constructs the dataclass directly.

Parameters:

Name Type Description Default
data Mapping[str, Any]

Mapping carrying the kind discriminator and field values.

required

Raises:

Type Description
NIDaqValidationError

kind is missing, unknown, or does not match the concrete class.

Source code in src/nidaqlib/channels/base.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise from a dict produced by :meth:`to_dict`.

    On the base class, this dispatches to the registered subclass for the
    ``kind`` discriminator. On a concrete subclass, this validates that
    ``kind`` matches and constructs the dataclass directly.

    Args:
        data: Mapping carrying the ``kind`` discriminator and field values.

    Raises:
        NIDaqValidationError: ``kind`` is missing, unknown, or does not
            match the concrete class.
    """
    kind = data.get("kind")
    if cls is ChannelSpec:
        if not isinstance(kind, str):
            raise NIDaqValidationError(
                f"channel spec dict missing 'kind' discriminator (got {kind!r})"
            )
        target = _CHANNEL_REGISTRY.get(kind)
        if target is None:
            raise NIDaqValidationError(f"unknown channel kind {kind!r}")
        # Mypy can't see that the registered class returns Self here; the
        # dispatch is dynamic by design, so cast at the boundary.
        return target.from_dict(data)  # type: ignore[return-value]
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    payload = {k: v for k, v in data.items() if k != "kind"}
    return cls(**payload)

to_dict

to_dict()

Serialise to a JSON/TOML-friendly dict, including kind.

Returns:

Type Description
dict[str, Any]

A dict carrying kind plus every dataclass field. Mappings are

dict[str, Any]

copied to plain dict so the result is JSON-encodable.

Source code in src/nidaqlib/channels/base.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a JSON/TOML-friendly dict, including ``kind``.

    Returns:
        A dict carrying ``kind`` plus every dataclass field. Mappings are
        copied to plain ``dict`` so the result is JSON-encodable.
    """
    payload: dict[str, Any] = {}
    for spec in dataclasses.fields(self):
        value = getattr(self, spec.name)
        if isinstance(value, Mapping):
            value = dict(cast("Mapping[str, Any]", value))
        payload[spec.name] = value
    payload["kind"] = self.kind
    return payload

CounterEdgeCountInput dataclass

CounterEdgeCountInput(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    edge=Edge.RISING,
    initial_count=0,
    count_up=True,
)

Bases: ChannelSpec

Edge-count counter-input channel.

Maps to Task.ci_channels.add_ci_count_edges_chan on the NI side. Useful for encoders, totalisers, or anything that needs raw edge accumulation.

Attributes:

Name Type Description
edge Edge

Edge that increments / decrements the counter. Rising by default.

initial_count int

Starting value of the counter. Defaults to 0.

count_up bool

When True (default), every active edge increments the counter; when False, decrements. Mirrors NI's CountDirection.COUNT_UP / COUNT_DOWN.

__post_init__

__post_init__()

Validate common channel metadata.

Source code in src/nidaqlib/channels/counter_input.py
def __post_init__(self) -> None:
    """Validate common channel metadata."""
    ChannelSpec.__post_init__(self)

from_dict classmethod

from_dict(data)

Deserialise, restoring :class:Edge from its string value.

Source code in src/nidaqlib/channels/counter_input.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise, restoring :class:`Edge` from its string value."""
    kind = data.get("kind")
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    payload = {k: v for k, v in data.items() if k != "kind"}
    try:
        payload["edge"] = Edge(payload.get("edge", Edge.RISING.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown Edge {payload.get('edge')!r}") from exc
    return cls(**payload)

to_dict

to_dict()

Serialise; encode :class:Edge to its string value.

Source code in src/nidaqlib/channels/counter_input.py
def to_dict(self) -> dict[str, Any]:
    """Serialise; encode :class:`Edge` to its string value."""
    payload = ChannelSpec.to_dict(self)
    payload["edge"] = self.edge.value
    return payload

CounterFrequencyInput dataclass

CounterFrequencyInput(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    min_val,
    max_val,
    edge=Edge.RISING,
)

Bases: ChannelSpec

Frequency-measurement counter-input channel.

Maps to Task.ci_channels.add_ci_freq_chan on the NI side. NI uses (min_val, max_val) to choose timebases that resolve frequencies in the expected range.

Attributes:

Name Type Description
min_val float

Lower bound of the expected frequency, in Hz.

max_val float

Upper bound of the expected frequency, in Hz.

edge Edge

Edge of the input signal that increments the counter. Rising by default.

__post_init__

__post_init__()

Validate the expected frequency range.

Source code in src/nidaqlib/channels/counter_input.py
def __post_init__(self) -> None:
    """Validate the expected frequency range."""
    ChannelSpec.__post_init__(self)
    if self.min_val <= 0.0:
        raise NIDaqValidationError(f"min_val must be > 0 for {self.display_name!r}")
    if self.min_val >= self.max_val:
        raise NIDaqValidationError(
            f"min_val must be < max_val for {self.display_name!r}; "
            f"got {self.min_val!r} >= {self.max_val!r}"
        )

from_dict classmethod

from_dict(data)

Deserialise, restoring :class:Edge from its string value.

Source code in src/nidaqlib/channels/counter_input.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise, restoring :class:`Edge` from its string value."""
    kind = data.get("kind")
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    payload = {k: v for k, v in data.items() if k != "kind"}
    try:
        payload["edge"] = Edge(payload.get("edge", Edge.RISING.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown Edge {payload.get('edge')!r}") from exc
    return cls(**payload)

to_dict

to_dict()

Serialise; encode :class:Edge to its string value.

Source code in src/nidaqlib/channels/counter_input.py
def to_dict(self) -> dict[str, Any]:
    """Serialise; encode :class:`Edge` to its string value."""
    payload = ChannelSpec.to_dict(self)
    payload["edge"] = self.edge.value
    return payload

CounterPeriodInput dataclass

CounterPeriodInput(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    min_val,
    max_val,
    edge=Edge.RISING,
)

Bases: ChannelSpec

Period-measurement counter-input channel.

Maps to Task.ci_channels.add_ci_period_chan on the NI side.

Attributes:

Name Type Description
min_val float

Lower bound of the expected period, in seconds.

max_val float

Upper bound of the expected period, in seconds.

edge Edge

Starting edge of the period measurement. Rising by default.

__post_init__

__post_init__()

Validate the expected period range.

Source code in src/nidaqlib/channels/counter_input.py
def __post_init__(self) -> None:
    """Validate the expected period range."""
    ChannelSpec.__post_init__(self)
    if self.min_val <= 0.0:
        raise NIDaqValidationError(f"min_val must be > 0 for {self.display_name!r}")
    if self.min_val >= self.max_val:
        raise NIDaqValidationError(
            f"min_val must be < max_val for {self.display_name!r}; "
            f"got {self.min_val!r} >= {self.max_val!r}"
        )

from_dict classmethod

from_dict(data)

Deserialise, restoring :class:Edge from its string value.

Source code in src/nidaqlib/channels/counter_input.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise, restoring :class:`Edge` from its string value."""
    kind = data.get("kind")
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    payload = {k: v for k, v in data.items() if k != "kind"}
    try:
        payload["edge"] = Edge(payload.get("edge", Edge.RISING.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown Edge {payload.get('edge')!r}") from exc
    return cls(**payload)

to_dict

to_dict()

Serialise; encode :class:Edge to its string value.

Source code in src/nidaqlib/channels/counter_input.py
def to_dict(self) -> dict[str, Any]:
    """Serialise; encode :class:`Edge` to its string value."""
    payload = ChannelSpec.to_dict(self)
    payload["edge"] = self.edge.value
    return payload

CounterPulseFrequency dataclass

CounterPulseFrequency(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    frequency,
    duty_cycle=0.5,
    initial_delay=0.0,
    idle_high=False,
    requires_confirm=True,
)

Bases: ChannelSpec

Pulse-train counter output specified by frequency + duty cycle.

Maps to Task.co_channels.add_co_pulse_chan_freq on the NI side.

Attributes:

Name Type Description
frequency float

Pulse-train frequency, in Hz.

duty_cycle float

Fractional duty cycle in (0.0, 1.0). 0.5 = square wave.

initial_delay float

Optional delay before the first pulse, in seconds. Defaults to 0.

idle_high bool

When True, the line idles high (active-low pulses); otherwise idles low (active-high pulses).

requires_confirm bool

When True, every :meth:DaqSession.write targeting this channel must pass confirm=True. Defaults to True — counter outputs default to safe.

__post_init__

__post_init__()

Validate pulse-train parameters.

Source code in src/nidaqlib/channels/counter_output.py
def __post_init__(self) -> None:
    """Validate pulse-train parameters."""
    ChannelSpec.__post_init__(self)
    if self.frequency <= 0.0:
        raise NIDaqValidationError(f"frequency must be > 0 for {self.display_name!r}")
    if not 0.0 < self.duty_cycle < 1.0:
        raise NIDaqValidationError(
            f"duty_cycle must be in (0.0, 1.0) for {self.display_name!r}; "
            f"got {self.duty_cycle!r}"
        )
    if self.initial_delay < 0.0:
        raise NIDaqValidationError(f"initial_delay must be >= 0 for {self.display_name!r}")

CounterPulseTicks dataclass

CounterPulseTicks(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    source_terminal,
    high_ticks,
    low_ticks,
    initial_delay=0,
    idle_high=False,
    requires_confirm=True,
)

Bases: ChannelSpec

Pulse-train counter output specified by high / low tick counts.

Maps to Task.co_channels.add_co_pulse_chan_ticks on the NI side. The tick reference is given by source_terminal.

Attributes:

Name Type Description
source_terminal str

NI terminal supplying the tick clock (e.g. "/Dev1/20MHzTimebase").

high_ticks int

Number of source ticks in the high state.

low_ticks int

Number of source ticks in the low state.

initial_delay int

Optional initial-delay tick count.

idle_high bool

When True, the line idles high.

requires_confirm bool

Defaults to True.

__post_init__

__post_init__()

Validate pulse tick parameters.

Source code in src/nidaqlib/channels/counter_output.py
def __post_init__(self) -> None:
    """Validate pulse tick parameters."""
    ChannelSpec.__post_init__(self)
    if self.high_ticks <= 0 or self.low_ticks <= 0:
        raise NIDaqValidationError(
            f"high_ticks and low_ticks must be > 0 for {self.display_name!r}"
        )
    if self.initial_delay < 0:
        raise NIDaqValidationError(f"initial_delay must be >= 0 for {self.display_name!r}")

CounterPulseTime dataclass

CounterPulseTime(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    high_time,
    low_time,
    initial_delay=0.0,
    idle_high=False,
    requires_confirm=True,
)

Bases: ChannelSpec

Pulse-train counter output specified by high / low durations in seconds.

Maps to Task.co_channels.add_co_pulse_chan_time on the NI side.

Attributes:

Name Type Description
high_time float

High-state duration, in seconds.

low_time float

Low-state duration, in seconds.

initial_delay float

Optional delay before the first pulse, in seconds.

idle_high bool

When True, the line idles high (active-low pulses).

requires_confirm bool

Defaults to True.

__post_init__

__post_init__()

Validate pulse timing parameters.

Source code in src/nidaqlib/channels/counter_output.py
def __post_init__(self) -> None:
    """Validate pulse timing parameters."""
    ChannelSpec.__post_init__(self)
    if self.high_time <= 0.0 or self.low_time <= 0.0:
        raise NIDaqValidationError(
            f"high_time and low_time must be > 0 for {self.display_name!r}"
        )
    if self.initial_delay < 0.0:
        raise NIDaqValidationError(f"initial_delay must be >= 0 for {self.display_name!r}")

DaqBlock dataclass

DaqBlock(
    *,
    device,
    task=None,
    channels,
    data,
    block_index,
    first_sample_index,
    samples_per_channel,
    block_period_ns,
    t_mono_ns,
    t_utc,
    t_midpoint_mono_ns,
    task_started_at,
    t0,
    read_started_at,
    read_finished_at,
    elapsed_s,
    units,
    error=None,
)

One rectangular block of hardware-clocked samples.

The data field is the natural shape for Parquet row groups, NumPy slicing, and TDMS — do not scalarize unless the user opts in via :func:nidaqlib.block_to_rows.

To recover the wall-clock or monotonic timestamp of sample k::

t_mono_k = block.t_mono_ns + k * block.block_period_ns
elapsed_k = (block.first_sample_index + k) / block.sample_rate_hz
t_wall_k = block.task_started_at + timedelta(seconds=elapsed_k)

Do not interpolate sample times off read_started_at — that drifts block-to-block.

Attributes:

Name Type Description
device str

Manager-add name, or TaskSpec.name when emitted directly.

task str | None

Underlying TaskSpec.name.

channels tuple[str, ...]

Channel display names in the row order of data.

data ndarray

NumPy array of shape (len(channels), samples_per_channel). dtype is float64 for AI voltage.

block_index int

0-based, monotonic per task. Resets on a new task.

first_sample_index int

Cumulative sample offset since task_started_at.

samples_per_channel int

data.shape[1].

block_period_ns int | None

Integer nanoseconds between consecutive samples. None for on-demand reads (no clock).

t_mono_ns int

time.monotonic_ns() at sample index 0. Canonical join key.

t_utc datetime

Wall-clock at sample index 0 (UTC, tz-aware).

t_midpoint_mono_ns int | None

Midpoint of the full block window in monotonic_ns; None for on-demand blocks.

task_started_at datetime

Wall-clock anchor for sample-time reconstruction.

t0 datetime

Wall-clock at the first sample of this block; equals t_utc but kept as separate provenance.

read_started_at datetime

Wall-clock just before the read (provenance).

read_finished_at datetime

Wall-clock just after the read (provenance).

elapsed_s float

read_finished_at - read_started_at in seconds.

units Mapping[str, str | None]

Engineering units keyed by channel display name.

error NIDaqError | None

Populated only under ErrorPolicy.RETURN.

sample_rate_hz property

sample_rate_hz

Convenience: 1e9 / block_period_ns; None for on-demand.

__post_init__

__post_init__()

Validate the rectangular-shape invariant.

Raises:

Type Description
NIDaqValidationError

data.shape does not equal (len(channels), samples_per_channel).

Source code in src/nidaqlib/tasks/models.py
def __post_init__(self) -> None:
    """Validate the rectangular-shape invariant.

    Raises:
        NIDaqValidationError: ``data.shape`` does not equal
            ``(len(channels), samples_per_channel)``.
    """
    from nidaqlib.errors import NIDaqValidationError  # noqa: PLC0415

    n_channels = len(self.channels)
    expected = (n_channels, self.samples_per_channel)
    actual = tuple(self.data.shape)
    if actual != expected:
        raise NIDaqValidationError(
            f"DaqBlock data shape {actual} does not match (channels, "
            f"samples_per_channel) = {expected}"
        )

DaqManager

DaqManager(*, error_policy=ErrorPolicy.RAISE)

Lifecycle, dispatch, and group operations across multiple NI tasks.

Construction does not touch the driver. Add tasks via :meth:add (lazy — no NI calls), then call :meth:start to bring them up. :meth:close always tears down in reverse-add order.

The manager is async-context-manager-aware: async with DaqManager() closes every session on exit, even on raised errors.

Create a manager.

Parameters:

Name Type Description Default
error_policy ErrorPolicy

Default policy for group operations (:meth:start, :meth:stop, :meth:poll, :meth:read_block). :attr:ErrorPolicy.RAISE collects errors into an :class:ExceptionGroup; :attr:ErrorPolicy.RETURN surfaces them as DeviceResult.error rows and continues.

RAISE
Source code in src/nidaqlib/manager.py
def __init__(self, *, error_policy: ErrorPolicy = ErrorPolicy.RAISE) -> None:
    """Create a manager.

    Args:
        error_policy: Default policy for group operations
            (:meth:`start`, :meth:`stop`, :meth:`poll`,
            :meth:`read_block`). :attr:`ErrorPolicy.RAISE` collects
            errors into an :class:`ExceptionGroup`;
            :attr:`ErrorPolicy.RETURN` surfaces them as
            ``DeviceResult.error`` rows and continues.
    """
    self._error_policy = error_policy
    self._sessions: dict[str, DaqSession] = {}
    self._specs: dict[str, TaskSpec] = {}
    self._refcounts: dict[str, int] = {}
    self._order: list[str] = []
    self._task_locks: dict[str, anyio.Lock] = {}
    self._device_locks: dict[str, anyio.Lock] = {}
    self._task_devices: dict[str, tuple[str, ...]] = {}
    self._global_lock = anyio.Lock()
    self._closed = False
    # Cache of (device → product_type) so the module-level preflight
    # only queries the backend once per device. ``_NOT_QUERIED`` is the
    # sentinel for "haven't tried yet"; ``None`` means "queried, NI
    # returned no info / the device alias is unknown".
    self._device_product_cache: dict[str, str | None] = {}

error_policy property

error_policy

The default error policy for group operations.

is_closed property

is_closed

True once :meth:close has run.

names property

names

Names of currently managed tasks, in add-order.

add async

add(name, source, *, backend=None)

Register a task with this manager. Idempotent on duplicate name.

Performs a best-effort preflight conflict check against tasks already managed (design doc §15.3). NI is the final authority — the preflight only catches obvious overlaps.

add does not allocate NI resources — it constructs a :class:DaqSession and records it. The session's :meth:DaqSession.configure (which creates the NI task and applies channels / timing / logging / triggers) runs lazily on the first :meth:start for the task. Any NI rejection of the spec (bad physical channel, unsupported channel kind, sample rate above device max, …) therefore surfaces at :meth:start time, not :meth:add time. The preflight catches operator-side overlap only; everything NI validates lives downstream.

Parameters:

Name Type Description Default
name str

Manager-side label for this task. Must be unique.

required
source TaskSpec | DaqSession

Either a :class:TaskSpec (manager constructs a fresh :class:DaqSession) or a pre-built :class:DaqSession (manager registers it as-is). The pre-built form aligns with the ecosystem add(name, source) convention shared by :class:watlowlib.WatlowManager, :class:alicatlib.AlicatManager, and :class:sartoriuslib.SartoriusManager.

required
backend DaqBackend | None

Optional :class:DaqBackend. Defaults to :class:NidaqmxBackend (lazy import). Ignored when source is a :class:DaqSession (which already carries its own backend).

None

Returns:

Name Type Description
The DaqSession

class:DaqSession registered under name. Re-adding

DaqSession

the same (name, source) returns the existing session and

DaqSession

bumps a refcount.

Raises:

Type Description
NIDaqTaskStateError

name already maps to a different spec, or the manager is closed.

NIDaqResourceError

source overlaps physical channels with an already-managed task.

Source code in src/nidaqlib/manager.py
async def add(
    self,
    name: str,
    source: TaskSpec | DaqSession,
    *,
    backend: DaqBackend | None = None,
) -> DaqSession:
    """Register a task with this manager. Idempotent on duplicate ``name``.

    Performs a best-effort preflight conflict check against tasks
    already managed (design doc §15.3). NI is the final authority —
    the preflight only catches obvious overlaps.

    ``add`` does **not** allocate NI resources — it constructs a
    :class:`DaqSession` and records it. The session's
    :meth:`DaqSession.configure` (which creates the NI task and applies
    channels / timing / logging / triggers) runs lazily on the first
    :meth:`start` for the task. Any NI rejection of the ``spec`` (bad
    physical channel, unsupported channel kind, sample rate above
    device max, …) therefore surfaces at :meth:`start` time, not
    :meth:`add` time. The preflight catches operator-side overlap
    only; everything NI validates lives downstream.

    Args:
        name: Manager-side label for this task. Must be unique.
        source: Either a :class:`TaskSpec` (manager constructs a
            fresh :class:`DaqSession`) or a pre-built
            :class:`DaqSession` (manager registers it as-is). The
            pre-built form aligns with the ecosystem ``add(name,
            source)`` convention shared by :class:`watlowlib.WatlowManager`,
            :class:`alicatlib.AlicatManager`, and
            :class:`sartoriuslib.SartoriusManager`.
        backend: Optional :class:`DaqBackend`. Defaults to
            :class:`NidaqmxBackend` (lazy import). Ignored when
            ``source`` is a :class:`DaqSession` (which already
            carries its own backend).

    Returns:
        The :class:`DaqSession` registered under ``name``. Re-adding
        the same ``(name, source)`` returns the existing session and
        bumps a refcount.

    Raises:
        NIDaqTaskStateError: ``name`` already maps to a different
            spec, or the manager is closed.
        NIDaqResourceError: ``source`` overlaps physical channels
            with an already-managed task.
    """
    if self._closed:
        raise NIDaqTaskStateError(
            "DaqManager is closed",
            context=ErrorContext(task_name=name, command_name="manager.add"),
        )
    if isinstance(source, DaqSession):
        spec = source.spec
        prebuilt: DaqSession | None = source
    else:
        spec = source
        prebuilt = None
    async with self._global_lock:
        existing = self._sessions.get(name)
        if existing is not None:
            if self._specs.get(name) is not spec and self._specs.get(name) != spec:
                raise NIDaqTaskStateError(
                    f"task {name!r} already registered with a different spec",
                    context=ErrorContext(task_name=name, command_name="manager.add"),
                )
            self._refcounts[name] = self._refcounts.get(name, 1) + 1
            return existing

        if prebuilt is not None:
            session = prebuilt
        else:
            if backend is None:
                from nidaqlib.backend.nidaqmx_backend import NidaqmxBackend  # noqa: PLC0415

                backend = NidaqmxBackend()
            await self._preflight_conflicts(name, spec, backend)
            session = DaqSession(spec, backend)
        self._sessions[name] = session
        self._specs[name] = spec
        self._refcounts[name] = 1
        self._order.append(name)
        self._task_locks[name] = anyio.Lock()
        devices = tuple(sorted({_device_of(ch.physical_channel) for ch in spec.channels}))
        self._task_devices[name] = devices
        for ch in spec.channels:
            dev = _device_of(ch.physical_channel)
            self._device_locks.setdefault(dev, anyio.Lock())
        return session

close async

close()

Tear down every managed session in LIFO order. Idempotent.

Failures are collected into an :class:ExceptionGroup; one slow / broken close does not prevent others from running.

Source code in src/nidaqlib/manager.py
async def close(self) -> None:
    """Tear down every managed session in LIFO order. Idempotent.

    Failures are collected into an :class:`ExceptionGroup`; one slow /
    broken close does not prevent others from running.
    """
    if self._closed:
        return
    self._closed = True
    # Snapshot under the global lock, then close outside it so unrelated
    # ops (e.g. a recorder still draining) are not blocked on a long NI
    # close call.
    async with self._global_lock:
        order = list(reversed(self._order))
        sessions = {name: self._sessions[name] for name in order if name in self._sessions}
        self._sessions.clear()
        self._specs.clear()
        self._refcounts.clear()
        self._order.clear()
        self._task_locks.clear()
        self._task_devices.clear()
        self._device_locks.clear()
    errors: list[BaseException] = []
    for name in order:
        session = sessions.get(name)
        if session is None:
            continue
        try:
            await session.close()
        except BaseException as exc:
            # Collected and re-grouped below — one slow / broken close
            # must not prevent the rest of the LIFO unwind from running.
            errors.append(exc)
    if errors:
        raise BaseExceptionGroup("DaqManager.close: one or more sessions failed", errors)

get

get(name)

Return the session registered under name.

Raises:

Type Description
KeyError

name is unknown.

Source code in src/nidaqlib/manager.py
def get(self, name: str) -> DaqSession:
    """Return the session registered under ``name``.

    Raises:
        KeyError: ``name`` is unknown.
    """
    return self._sessions[name]

poll async

poll(names=None, *, timeout=None, error_policy=None)

Poll one or more tasks once each. Returns one :class:DaqReading per task.

Source code in src/nidaqlib/manager.py
async def poll(
    self,
    names: Sequence[str] | None = None,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
    error_policy: ErrorPolicy | None = None,
) -> Mapping[str, DeviceResult[DaqReading]]:
    """Poll one or more tasks once each. Returns one :class:`DaqReading` per task."""

    async def _do(session: DaqSession) -> DaqReading:
        return await session.poll(timeout=timeout)

    return await self._for_each(
        names,
        "poll",
        _do,
        error_policy=error_policy,
    )

read_block async

read_block(
    samples_per_channel,
    names=None,
    *,
    timeout=None,
    error_policy=None,
)

Read one block per task in parallel.

Source code in src/nidaqlib/manager.py
async def read_block(
    self,
    samples_per_channel: int,
    names: Sequence[str] | None = None,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
    error_policy: ErrorPolicy | None = None,
) -> Mapping[str, DeviceResult[DaqBlock]]:
    """Read one block per task in parallel."""

    async def _do(session: DaqSession) -> DaqBlock:
        return await session.read_block(samples_per_channel, timeout=timeout)

    return await self._for_each(
        names,
        "read_block",
        _do,
        error_policy=error_policy,
    )

remove async

remove(name)

Decrement refcount; tear down on the last :meth:remove.

A no-op for unknown names — matches sibling parity.

Raises:

Type Description
NIDaqError

Surfaced from session close (collected into a group when called from :meth:close).

Source code in src/nidaqlib/manager.py
async def remove(self, name: str) -> None:
    """Decrement refcount; tear down on the last :meth:`remove`.

    A no-op for unknown names — matches sibling parity.

    Raises:
        NIDaqError: Surfaced from session close (collected into a
            group when called from :meth:`close`).
    """
    async with self._global_lock:
        if name not in self._sessions:
            return
        self._refcounts[name] -= 1
        if self._refcounts[name] > 0:
            return
        session = self._sessions.pop(name)
        self._specs.pop(name, None)
        self._refcounts.pop(name, None)
        self._task_locks.pop(name, None)
        self._task_devices.pop(name, None)
        with contextlib.suppress(ValueError):
            self._order.remove(name)
    # Close outside the global lock so a slow NI close doesn't block
    # other manager ops on unrelated tasks.
    await session.close()

start async

start(names=None, *, error_policy=None, confirm=False)

Start one or more managed tasks. Defaults to all in add-order.

Source code in src/nidaqlib/manager.py
async def start(
    self,
    names: Sequence[str] | None = None,
    *,
    error_policy: ErrorPolicy | None = None,
    confirm: bool = False,
) -> Mapping[str, DeviceResult[None]]:
    """Start one or more managed tasks. Defaults to all in add-order."""

    async def _do(session: DaqSession) -> None:
        await _configure_then_start(session, confirm=confirm)

    return await self._for_each(
        names,
        "start",
        _do,
        error_policy=error_policy,
    )

start_synchronized async

start_synchronized(
    master, slaves, *, error_policy=None, confirm=False
)

Arm slaves first, then start master.

Multi-task synchronisation requires strict ordering: each slave is configured against a shared sample clock or trigger and must reach the armed-and-waiting state before the master is started — once the master arms its clock or fires its trigger, the slaves react immediately. If a slave is started after the master, samples before its first edge are lost.

Slaves are armed sequentially (not concurrently): NI's start_task returns once the task is armed, so issuing the starts in order guarantees every slave has reached the armed state before the master starts. This is intentionally simpler than the parallel fan-out used by :meth:start; the difference matters when one slave fails to arm — the master must not start at all.

On failure during slave arming, every slave that had already armed is stopped (in reverse order) before the error is raised; the master is not started.

Parameters:

Name Type Description Default
master str

Manager-add name of the master task.

required
slaves Sequence[str]

Manager-add names of the slave tasks. Order is respected — slaves are armed left-to-right.

required
error_policy ErrorPolicy | None

Optional override; defaults to the manager's policy.

None
confirm bool

Required when any task being started can actuate hardware immediately.

False

Returns:

Name Type Description
One Mapping[str, DeviceResult[None]]

class:DeviceResult[None] per task (master plus every

Mapping[str, DeviceResult[None]]

entry of slaves), keyed by name.

Raises:

Type Description
KeyError

master or any entry of slaves is unknown.

BaseExceptionGroup

One or more tasks failed under :attr:ErrorPolicy.RAISE.

Source code in src/nidaqlib/manager.py
async def start_synchronized(
    self,
    master: str,
    slaves: Sequence[str],
    *,
    error_policy: ErrorPolicy | None = None,
    confirm: bool = False,
) -> Mapping[str, DeviceResult[None]]:
    """Arm ``slaves`` first, then start ``master``.

    Multi-task synchronisation requires strict ordering: each slave is
    configured against a shared sample clock or trigger and must reach
    the *armed-and-waiting* state before the master is started — once
    the master arms its clock or fires its trigger, the slaves react
    immediately. If a slave is started after the master, samples
    before its first edge are lost.

    Slaves are armed sequentially (not concurrently): NI's
    ``start_task`` returns once the task is armed, so issuing the
    starts in order guarantees every slave has reached the armed state
    before the master starts. This is intentionally simpler than the
    parallel fan-out used by :meth:`start`; the difference matters
    when one slave fails to arm — the master must not start at all.

    On failure during slave arming, every slave that had already
    armed is stopped (in reverse order) before the error is raised;
    the master is not started.

    Args:
        master: Manager-add name of the master task.
        slaves: Manager-add names of the slave tasks. Order is
            respected — slaves are armed left-to-right.
        error_policy: Optional override; defaults to the manager's
            policy.
        confirm: Required when any task being started can actuate
            hardware immediately.

    Returns:
        One :class:`DeviceResult[None]` per task (``master`` plus every
        entry of ``slaves``), keyed by name.

    Raises:
        KeyError: ``master`` or any entry of ``slaves`` is unknown.
        BaseExceptionGroup: One or more tasks failed under
            :attr:`ErrorPolicy.RAISE`.
    """
    unknown = [n for n in (master, *slaves) if n not in self._sessions]
    if unknown:
        raise KeyError(f"unknown task name(s): {unknown!r}")
    if master in slaves:
        raise NIDaqTaskStateError(
            f"task {master!r} cannot be both master and slave",
            context=ErrorContext(task_name=master, command_name="start_synchronized"),
        )

    policy = error_policy if error_policy is not None else self._error_policy
    results: dict[str, DeviceResult[None]] = {}
    errors: list[BaseException] = []
    armed: list[str] = []

    for name in slaves:
        session = self._sessions[name]
        try:
            async with self._operation_locks(name):
                await _configure_then_start(session, confirm=confirm)
            results[name] = DeviceResult.success(None)
            armed.append(name)
        except NIDaqError as exc:
            results[name] = DeviceResult.failure(exc)
            errors.append(exc)
            # Roll back: stop every slave that armed before this one,
            # in reverse order. Best-effort — collect rollback errors
            # but never raise from the rollback path.
            for prior in reversed(armed):
                prior_session = self._sessions[prior]
                try:
                    async with self._operation_locks(prior):
                        await prior_session.stop()
                except NIDaqError as rollback_exc:
                    errors.append(rollback_exc)
            # Do not start the master.
            results[master] = DeviceResult.failure(
                NIDaqTaskStateError(
                    f"master {master!r} not started: slave {name!r} failed to arm",
                    context=ErrorContext(task_name=master, command_name="start_synchronized"),
                ),
            )
            if policy is ErrorPolicy.RAISE:
                raise BaseExceptionGroup(
                    "DaqManager.start_synchronized: slave arming failed",
                    errors,
                ) from exc
            return results

    # All slaves armed — start the master.
    master_session = self._sessions[master]
    try:
        async with self._operation_locks(master):
            await _configure_then_start(master_session, confirm=confirm)
        results[master] = DeviceResult.success(None)
    except NIDaqError as exc:
        results[master] = DeviceResult.failure(exc)
        errors.append(exc)
        if policy is ErrorPolicy.RAISE:
            raise BaseExceptionGroup(
                "DaqManager.start_synchronized: master start failed",
                errors,
            ) from exc

    return results

stop async

stop(names=None, *, error_policy=None)

Stop one or more managed tasks. Defaults to all in reverse-add.

Source code in src/nidaqlib/manager.py
async def stop(
    self,
    names: Sequence[str] | None = None,
    *,
    error_policy: ErrorPolicy | None = None,
) -> Mapping[str, DeviceResult[None]]:
    """Stop one or more managed tasks. Defaults to all in reverse-add."""
    targets = self._resolve_names(names)
    if names is None:
        targets = list(reversed(targets))
    return await self._for_each_targets(
        targets,
        "stop",
        self._call_stop,
        error_policy=error_policy,
    )

DaqReading dataclass

DaqReading(
    *,
    device,
    task=None,
    values,
    units,
    t_mono_ns,
    t_utc,
    t_midpoint_mono_ns=None,
    requested_at,
    received_at,
    latency_s,
    metadata=_empty_metadata(),
    error=None,
)

One scalar (or low-rate) reading across the channels of a task.

Attributes:

Name Type Description
device str

Manager-add name, or TaskSpec.name when emitted directly from a session. The cross-instrument join key.

task str | None

Underlying TaskSpec.name (optional second key).

values Mapping[str, float | int | bool]

One entry per channel, keyed by channel display name.

units Mapping[str, str | None]

Engineering units, keyed by channel display name. None entries indicate "no unit declared on the channel spec."

t_mono_ns int

time.monotonic_ns() at the midpoint of the request/receive window. Canonical join key.

t_utc datetime

Wall-clock at the midpoint of the request/receive window.

t_midpoint_mono_ns int | None

Optional integration-window midpoint. None for software-timed polling — t_mono_ns already names the midpoint of the I/O window.

requested_at datetime

Wall-clock immediately before the read (provenance).

received_at datetime

Wall-clock immediately after the read returns (provenance).

latency_s float

received_at - requested_at in seconds.

metadata Mapping[str, str | int | float | bool]

Free-form scalar metadata.

error NIDaqError | None

Populated only under ErrorPolicy.RETURN. Always None under the default RAISE policy.

DaqSession

DaqSession(spec, backend, *, timeout=10.0)

Owns one underlying NI task plus its lifecycle state.

Construction does not touch the driver. Call :meth:start (or use :func:open_device) to create the task, add channels, and configure timing. read_block / poll are valid once started.

Create a session for spec against backend.

The constructor only stores its arguments; it never touches the driver. That keeps __init__ exception-free and avoids a partially-initialised task object on configuration errors.

Parameters:

Name Type Description Default
spec TaskSpec

Declarative :class:TaskSpec to materialise.

required
backend DaqBackend

Backend that proxies operations into NI (or the fake).

required
timeout float

Default per-operation timeout in seconds. Individual read_block / poll calls may override.

10.0
Source code in src/nidaqlib/tasks/session.py
def __init__(
    self,
    spec: TaskSpec,
    backend: DaqBackend,
    *,
    timeout: float = 10.0,
) -> None:
    """Create a session for ``spec`` against ``backend``.

    The constructor only stores its arguments; it never touches the
    driver. That keeps ``__init__`` exception-free and avoids a
    partially-initialised task object on configuration errors.

    Args:
        spec: Declarative :class:`TaskSpec` to materialise.
        backend: Backend that proxies operations into NI (or the fake).
        timeout: Default per-operation timeout in seconds. Individual
            ``read_block`` / ``poll`` calls may override.
    """
    self._spec = spec
    self._backend = backend
    self._timeout = timeout
    self._task: Any = None
    self._lock = anyio.Lock()
    self._configured = False
    self._started = False
    self._closed = False
    self._task_started_at: datetime | None = None
    self._first_sample_index: int = 0
    self._block_index: int = 0
    # Counter incremented every time the recorder swallows a
    # NIDaqTransientError under ErrorPolicy.RETURN. Reset on task
    # rebuild (a fresh configure() call).
    self._recoverable_error_count: int = 0
    # Cached identity, populated by configure_sync (one backend call per
    # task build). Keeps :meth:`snapshot` I/O-free.
    self._device_info: DeviceInfo | None = None
    # Last enriched error context observed; updated when the session
    # surfaces a NIDaqError from its own methods. Snapshots embed this.
    self._last_error_context: ErrorContext | None = None
    # Bridge bookkeeping — populated only when streaming/block.py opts
    # into the every-N-samples callback path.
    self._callback_handle: CallbackHandle | None = None

device_info property

device_info

Cached identity for the device backing this task; None before configure.

has_active_callback_bridge property

has_active_callback_bridge

True while a §11.3.2 callback bridge is registered.

is_closed property

is_closed

True once :meth:close has run (idempotent).

is_configured property

is_configured

True after :meth:configure succeeds and before :meth:close.

A configured session has a backing NI task with channels, timing, logging, and triggers applied — but task.start() has not yet been called. Buffer-event callback registration (§11.3.2) is only valid in this window.

is_started property

is_started

True between :meth:start and :meth:stop.

raw_task property

raw_task

The underlying backend task handle.

For :class:~nidaqlib.backend.nidaqmx_backend.NidaqmxBackend this is an nidaqmx.Task; for the fake backend it is an opaque _FakeTask. Use this for advanced NI features that aren't exposed via the wrapper — the escape hatch from design doc §7.4.

The handle is available once :meth:configure has succeeded — that is, in either the configured-not-started or started state. The callback bridge (§11.3.2) needs the handle pre-start to register the buffer event.

Raises:

Type Description
NIDaqTaskStateError

The session has not been configured yet.

recoverable_error_count property

recoverable_error_count

Count of :class:NIDaqTransientError events swallowed under RETURN policy.

Reset to 0 on every :meth:configure (i.e. fresh task build).

spec property

spec

The :class:TaskSpec this session was constructed from.

task_started_at property

task_started_at

Wall-clock anchor for sample-time reconstruction.

Returns None until :meth:start has succeeded. Once set, this value is the truth that :class:DaqBlock.task_started_at carries — it is captured exactly once per session, immediately before backend.start_task, so that the first sample's wall-clock can be reconstructed deterministically from task_started_at + first_sample_index / rate_hz (design doc §8.7).

__aenter__ async

__aenter__()

Enter the async context — no-op; :func:open_device already configured/started.

Source code in src/nidaqlib/tasks/session.py
async def __aenter__(self) -> DaqSession:
    """Enter the async context — no-op; :func:`open_device` already configured/started."""
    return self

__aexit__ async

__aexit__(*exc_info)

Exit the async context — calls :meth:close.

Source code in src/nidaqlib/tasks/session.py
async def __aexit__(self, *exc_info: object) -> None:
    """Exit the async context — calls :meth:`close`."""
    del exc_info
    await self.close()

acquire async

acquire(samples_per_channel, *, timeout=None)

Run one finite acquisition and return its :class:DaqBlock.

Convenience wrapper for the §12.3 finite-mode pattern: configure finite, start, read, stop. Requires a session whose :class:Timing.mode is :attr:AcquisitionMode.FINITE. After the read completes, the underlying NI task is stopped — call :meth:start again before another acquisition.

Parameters:

Name Type Description Default
samples_per_channel int

Number of samples per channel to read.

required
timeout float | None

Optional per-call timeout in seconds. Falls back to the session-wide default.

None

Raises:

Type Description
NIDaqTaskStateError

The session is not started, is closed, or its timing mode is not :attr:AcquisitionMode.FINITE.

NIDaqReadError / NIDaqTimeoutError

Surfaced from the backend.

Source code in src/nidaqlib/tasks/session.py
async def acquire(
    self,
    samples_per_channel: int,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
) -> DaqBlock:
    """Run one finite acquisition and return its :class:`DaqBlock`.

    Convenience wrapper for the §12.3 finite-mode pattern: configure
    finite, start, read, stop. Requires a session whose
    :class:`Timing.mode` is :attr:`AcquisitionMode.FINITE`. After the
    read completes, the underlying NI task is stopped — call
    :meth:`start` again before another acquisition.

    Args:
        samples_per_channel: Number of samples per channel to read.
        timeout: Optional per-call timeout in seconds. Falls back to
            the session-wide default.

    Raises:
        NIDaqTaskStateError: The session is not started, is closed, or
            its timing mode is not :attr:`AcquisitionMode.FINITE`.
        NIDaqReadError / NIDaqTimeoutError: Surfaced from the backend.
    """
    self._require_started("acquire")
    timing = self._spec.timing
    if timing is None or timing.mode is not AcquisitionMode.FINITE:
        raise NIDaqTaskStateError(
            f"acquire() requires Timing.mode=FINITE; got {timing.mode if timing else None}",
            context=ErrorContext(task_name=self._spec.name, command_name="acquire"),
        )
    block = await self.read_block(samples_per_channel, timeout=timeout)
    await self.stop()
    return block

close async

close()

Stop (if needed) and close the underlying task. Idempotent.

__aexit__ always calls this; explicit call is rare. Sessions that have opted into the every-N-samples callback bridge MUST instead use the recorder context manager — the bridge has its own ordered shutdown protocol (design doc §11.3.2) that this method does not implement.

Source code in src/nidaqlib/tasks/session.py
async def close(self) -> None:
    """Stop (if needed) and close the underlying task. Idempotent.

    ``__aexit__`` always calls this; explicit call is rare. Sessions that
    have opted into the every-N-samples callback bridge MUST instead use
    the recorder context manager — the bridge has its own ordered
    shutdown protocol (design doc §11.3.2) that this method does not
    implement.
    """
    if self._closed:
        return
    if self._callback_handle is not None:
        raise NIDaqTaskStateError(
            "cannot close a session while an every-N-samples callback bridge is active; "
            "exit the record(..., use_callback_bridge=True) context first",
            context=ErrorContext(task_name=self._spec.name, command_name="close"),
        )
    self._closed = True
    if self._task is None:
        return
    async with self._lock:
        if self._started:
            await run_sync(self._backend.stop_task, self._task)
            self._started = False
        await run_sync(self._backend.close_task, self._task)
        self._task = None
        self._configured = False

configure async

configure()

Create the underlying task and apply channels / timing / logging / trigger.

After this method, raw_task is available and any pre-start hooks (notably the §11.3.2 buffer-event callback registration) may run. task.start() is not called — use :meth:start for that.

On failure, the partial task is torn down so the session does not leak NI resources.

Raises:

Type Description
NIDaqTaskStateError

Already configured, started, or closed.

Source code in src/nidaqlib/tasks/session.py
async def configure(self) -> None:
    """Create the underlying task and apply channels / timing / logging / trigger.

    After this method, ``raw_task`` is available and any pre-start hooks
    (notably the §11.3.2 buffer-event callback registration) may run.
    ``task.start()`` is **not** called — use :meth:`start` for that.

    On failure, the partial task is torn down so the session does not
    leak NI resources.

    Raises:
        NIDaqTaskStateError: Already configured, started, or closed.
    """
    if self._closed:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} is closed",
            context=ErrorContext(task_name=self._spec.name, command_name="configure"),
        )
    if self._configured:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} is already configured",
            context=ErrorContext(task_name=self._spec.name, command_name="configure"),
        )
    async with self._lock:
        await run_sync(self._configure_sync)
        self._configured = True

poll async

poll(*, timeout=None)

One-shot scalar read across all channels.

Valid only for sessions that are not actively buffering a sample clock (Timing.mode == ON_DEMAND or no Timing at all). For the live-scalar use case during a high-rate acquisition, use :func:record and read the most recent block's last column.

Raises:

Type Description
NIDaqTaskStateError

The session is buffering a sample clock (continuous or finite mode and started).

Source code in src/nidaqlib/tasks/session.py
async def poll(
    self,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
) -> DaqReading:
    """One-shot scalar read across all channels.

    Valid only for sessions that are not actively buffering a sample
    clock (``Timing.mode == ON_DEMAND`` or no ``Timing`` at all). For the
    live-scalar use case during a high-rate acquisition, use
    :func:`record` and read the most recent block's last column.

    Raises:
        NIDaqTaskStateError: The session is buffering a sample clock
            (continuous or finite mode and started).
    """
    self._require_started("poll")
    self._require_analog_input_task("poll")
    timing = self._spec.timing
    if timing is not None and timing.mode in (
        AcquisitionMode.CONTINUOUS,
        AcquisitionMode.FINITE,
    ):
        raise NIDaqTaskStateError(
            f"poll() is invalid for {timing.mode.value} tasks; use record() and "
            "inspect the most recent DaqBlock instead",
            context=ErrorContext(task_name=self._spec.name, command_name="poll"),
        )
    eff_timeout = timeout if timeout is not None else self._timeout
    async with self._lock:
        requested_at = datetime.now(UTC)
        monotonic_ns_start = time.monotonic_ns()
        data = await run_sync(
            self._backend.read_block,
            self._task,
            1,
            eff_timeout,
        )
        received_at = datetime.now(UTC)
        monotonic_ns_end = time.monotonic_ns()
    t_utc = requested_at + (received_at - requested_at) / 2
    t_mono_ns = (monotonic_ns_start + monotonic_ns_end) // 2
    # data shape is (n_channels, 1) — squeeze to per-channel scalars.
    names = self._channel_names()
    units = self._channel_units()
    values: dict[str, float | int | bool] = {
        name: float(data[i, 0]) for i, name in enumerate(names)
    }
    return DaqReading(
        device=self._spec.name,
        task=self._spec.name,
        values=values,
        units=units,
        t_mono_ns=t_mono_ns,
        t_utc=t_utc,
        t_midpoint_mono_ns=None,
        requested_at=requested_at,
        received_at=received_at,
        latency_s=(received_at - requested_at).total_seconds(),
        metadata=dict(self._spec.metadata),
        error=None,
    )

read_block async

read_block(samples_per_channel, *, timeout=None)

Read one rectangular :class:DaqBlock.

Wraps the backend read in an run_sync so the event loop stays responsive during the blocking NI call. Increments the per-session first_sample_index cursor.

Parameters:

Name Type Description Default
samples_per_channel int

Samples per channel for this block.

required
timeout float | None

Optional per-call timeout in seconds; falls back to the session-wide default.

None

Raises:

Type Description
NIDaqTaskStateError

The session is not started or is closed.

NIDaqReadError / NIDaqTimeoutError

Surfaced from the backend.

Source code in src/nidaqlib/tasks/session.py
async def read_block(
    self,
    samples_per_channel: int,
    *,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
) -> DaqBlock:
    """Read one rectangular :class:`DaqBlock`.

    Wraps the backend read in an ``run_sync`` so the
    event loop stays responsive during the blocking NI call. Increments
    the per-session ``first_sample_index`` cursor.

    Args:
        samples_per_channel: Samples per channel for this block.
        timeout: Optional per-call timeout in seconds; falls back to the
            session-wide default.

    Raises:
        NIDaqTaskStateError: The session is not started or is closed.
        NIDaqReadError / NIDaqTimeoutError: Surfaced from the backend.
    """
    self._require_started("read_block")
    self._require_analog_input_task("read_block")
    eff_timeout = timeout if timeout is not None else self._timeout
    async with self._lock:
        read_started_at = datetime.now(UTC)
        monotonic_ns = time.monotonic_ns()
        data = await run_sync(
            self._backend.read_block,
            self._task,
            samples_per_channel,
            eff_timeout,
        )
        read_finished_at = datetime.now(UTC)
        block = self._build_block(
            data=data,
            samples_per_channel=samples_per_channel,
            read_started_at=read_started_at,
            read_finished_at=read_finished_at,
            monotonic_ns=monotonic_ns,
        )
    return block

snapshot async

snapshot()

Return an :class:NIDaqSnapshot of this session's current state.

No I/O — built from the cached :class:DeviceInfo (one backend call at configure time) and the session's own lifecycle flags. Safe to call from any thread / event loop / callback context.

Source code in src/nidaqlib/tasks/session.py
async def snapshot(self) -> NIDaqSnapshot:
    """Return an :class:`NIDaqSnapshot` of this session's current state.

    No I/O — built from the cached :class:`DeviceInfo` (one backend call
    at configure time) and the session's own lifecycle flags. Safe to
    call from any thread / event loop / callback context.
    """
    info = self._device_info
    timing = self._spec.timing
    return NIDaqSnapshot(
        name=self._spec.name,
        model=info.product_type if info is not None else None,
        firmware=None,
        serial=info.serial_number if info is not None else None,
        connected=self._configured and not self._closed,
        last_error=self._last_error_context,
        recoverable_error_count=self._recoverable_error_count,
        captured_at=datetime.now(UTC),
        task_name=self._spec.name,
        task_state=self.task_state(),
        channel_count=len(self._spec.channels),
        timing_mode=timing.mode.value if timing is not None else None,
        rate_hz=timing.rate_hz if timing is not None else None,
        physical_channels=tuple(ch.physical_channel for ch in self._spec.channels),
        product_type=info.product_type if info is not None else None,
        chassis=None,
        physical_module=info.name if info is not None else None,
    )

start async

start(*, confirm=False)

Start the configured task.

:meth:configure must have run first. This method calls NI's task.start() and records the wall-clock anchor used for §8.7 sample-time reconstruction. Calling :meth:start again after :meth:stop reuses the configured task and resets the block/sample counters for a new run.

confirm=True is required for task kinds whose start call can actuate hardware immediately (currently counter-output pulse trains).

Raises:

Type Description
NIDaqTaskStateError

Not configured, already started, or closed.

NIDaqValidationError

Starting would actuate hardware without explicit confirmation.

Source code in src/nidaqlib/tasks/session.py
async def start(self, *, confirm: bool = False) -> None:
    """Start the configured task.

    :meth:`configure` must have run first. This method calls NI's
    ``task.start()`` and records the wall-clock anchor used for §8.7
    sample-time reconstruction. Calling :meth:`start` again after
    :meth:`stop` reuses the configured task and resets the
    block/sample counters for a new run.

    ``confirm=True`` is required for task kinds whose ``start`` call
    can actuate hardware immediately (currently counter-output pulse
    trains).

    Raises:
        NIDaqTaskStateError: Not configured, already started, or closed.
        NIDaqValidationError: Starting would actuate hardware without
            explicit confirmation.
    """
    if self._closed:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} is closed",
            context=ErrorContext(task_name=self._spec.name, command_name="start"),
        )
    if not self._configured:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} must be configured before start",
            context=ErrorContext(task_name=self._spec.name, command_name="start"),
        )
    if self._started:
        raise NIDaqTaskStateError(
            f"session for task {self._spec.name!r} is already started",
            context=ErrorContext(task_name=self._spec.name, command_name="start"),
        )
    self._validate_start_safety(confirm=confirm)
    async with self._lock:
        # Capture the wall-clock anchor as close to the start as possible
        # — `start_task` returns once NI has armed the clock, so the
        # first sample's wall-clock is approximately this timestamp + a
        # bounded device latency.
        anchor = datetime.now(UTC)
        try:
            await run_sync(self._backend.start_task, self._task)
        except BaseException:
            await run_sync(self._backend.close_task, self._task)
            self._task = None
            self._configured = False
            raise
        self._task_started_at = anchor
        self._first_sample_index = 0
        self._block_index = 0
        self._started = True

stop async

stop()

Stop the underlying task. Idempotent for not-yet-started sessions.

Does NOT close the task. Use :meth:close to release NI resources.

Source code in src/nidaqlib/tasks/session.py
async def stop(self) -> None:
    """Stop the underlying task. Idempotent for not-yet-started sessions.

    Does NOT close the task. Use :meth:`close` to release NI resources.
    """
    if not self._started or self._closed or self._task is None:
        return
    async with self._lock:
        await run_sync(self._backend.stop_task, self._task)
        self._started = False

task_state

task_state()

Return the coarse :class:TaskState projection of this session.

I/O-free — derived from internal lifecycle flags.

Source code in src/nidaqlib/tasks/session.py
def task_state(self) -> TaskState:
    """Return the coarse :class:`TaskState` projection of this session.

    I/O-free — derived from internal lifecycle flags.
    """
    if self._closed:
        return TaskState.CLOSED
    if self._started:
        return TaskState.RUNNING
    if self._configured:
        # `configured-but-not-started` covers both "never started" and
        # "started then stopped". We distinguish by whether the start
        # anchor was ever set.
        return TaskState.STOPPED if self._task_started_at is not None else TaskState.CONFIGURED
    return TaskState.CREATED

write async

write(values, *, confirm=False, timeout=None)

Write one sample-per-channel to the task's output channels.

Safety gate (design doc §17):

  • Keys of values must match the display names of the task's output channels (AO and/or DO). Unknown or missing keys raise :class:NIDaqValidationError before any I/O.
  • For analog-output channels with safe_min / safe_max set, values outside the resolved clamp window raise :class:NIDaqValidationError. Never silently clamped.
  • If any target channel has requires_confirm=True and confirm is False, the call raises :class:NIDaqValidationError.

Parameters:

Name Type Description Default
values Mapping[str, float | bool]

One value per output channel keyed by display name.

required
confirm bool

Operator confirmation. Required (must be True) whenever any target channel sets requires_confirm.

False
timeout float | None

Per-call timeout in seconds. Falls back to the session-wide default.

None

Raises:

Type Description
NIDaqTaskStateError

The session is not started or is closed.

NIDaqValidationError

Safety-gate or shape rejection (see above).

NIDaqWriteError / NIDaqTimeoutError

Surfaced from the backend.

Source code in src/nidaqlib/tasks/session.py
async def write(
    self,
    values: Mapping[str, float | bool],
    *,
    confirm: bool = False,
    timeout: float | None = None,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
) -> None:
    """Write one sample-per-channel to the task's output channels.

    Safety gate (design doc §17):

    - Keys of ``values`` must match the display names of the task's
      output channels (AO and/or DO). Unknown or missing keys raise
      :class:`NIDaqValidationError` before any I/O.
    - For analog-output channels with ``safe_min`` / ``safe_max`` set,
      values outside the resolved clamp window raise
      :class:`NIDaqValidationError`. **Never silently clamped.**
    - If any target channel has ``requires_confirm=True`` and
      ``confirm`` is ``False``, the call raises
      :class:`NIDaqValidationError`.

    Args:
        values: One value per output channel keyed by display name.
        confirm: Operator confirmation. Required (must be ``True``)
            whenever any target channel sets ``requires_confirm``.
        timeout: Per-call timeout in seconds. Falls back to the
            session-wide default.

    Raises:
        NIDaqTaskStateError: The session is not started or is closed.
        NIDaqValidationError: Safety-gate or shape rejection (see above).
        NIDaqWriteError / NIDaqTimeoutError: Surfaced from the backend.
    """
    # Late import — keeps the channel modules out of the session-import
    # graph for sessions that never write.
    from nidaqlib.channels.analog_output import AnalogOutputVoltage  # noqa: PLC0415
    from nidaqlib.channels.counter_output import (  # noqa: PLC0415
        CounterPulseFrequency,
        CounterPulseTicks,
        CounterPulseTime,
    )
    from nidaqlib.channels.digital_output import DigitalOutput  # noqa: PLC0415

    self._require_started("write")

    output_channels = [
        ch for ch in self._spec.channels if isinstance(ch, (AnalogOutputVoltage, DigitalOutput))
    ]
    if not output_channels:
        if any(
            isinstance(ch, (CounterPulseFrequency, CounterPulseTime, CounterPulseTicks))
            for ch in self._spec.channels
        ):
            raise NIDaqValidationError(
                "counter-output pulse trains are controlled by start()/stop(), not write(); "
                "start them with confirm=True",
                context=ErrorContext(task_name=self._spec.name, command_name="write"),
            )
        raise NIDaqValidationError(
            f"task {self._spec.name!r} has no output channels to write",
            context=ErrorContext(task_name=self._spec.name, command_name="write"),
        )
    has_ao = any(isinstance(ch, AnalogOutputVoltage) for ch in output_channels)
    has_do = any(isinstance(ch, DigitalOutput) for ch in output_channels)
    if has_ao and has_do:
        raise NIDaqValidationError(
            "write() does not support mixing analog-output and digital-output "
            "channels in one task",
            context=ErrorContext(task_name=self._spec.name, command_name="write"),
        )

    target_names = {ch.display_name for ch in output_channels}
    provided_names = set(values.keys())
    unknown = provided_names - target_names
    missing = target_names - provided_names
    if unknown or missing:
        raise NIDaqValidationError(
            f"write keys do not match task outputs (unknown={sorted(unknown)!r}, "
            f"missing={sorted(missing)!r})",
            context=ErrorContext(task_name=self._spec.name, command_name="write"),
        )

    needs_confirm = any(getattr(ch, "requires_confirm", False) for ch in output_channels)
    if needs_confirm and not confirm:
        raise NIDaqConfirmationRequiredError(
            f"task {self._spec.name!r}: write requires confirm=True (one or more "
            "channels are marked requires_confirm)",
            context=ErrorContext(task_name=self._spec.name, command_name="write"),
        )

    for ch in output_channels:
        value = values[ch.display_name]
        if isinstance(ch, AnalogOutputVoltage):
            lo = ch.effective_safe_min
            hi = ch.effective_safe_max
            fvalue = float(value)
            if fvalue < lo or fvalue > hi:
                raise NIDaqValidationError(
                    f"value {fvalue!r} for AO channel {ch.display_name!r} is outside "
                    f"safe range [{lo}, {hi}]",
                    context=ErrorContext(
                        task_name=self._spec.name,
                        channel_name=ch.display_name,
                        physical_channel=ch.physical_channel,
                        command_name="write",
                    ),
                )

    eff_timeout = timeout if timeout is not None else self._timeout
    async with self._lock:
        await run_sync(
            self._backend.write,
            self._task,
            dict(values),
            eff_timeout,
        )

DeviceInfo dataclass

DeviceInfo(
    *,
    name,
    product_type,
    serial_number,
    ai_physical_channels,
    ao_physical_channels,
    di_lines,
    do_lines,
    ci_physical_channels,
    co_physical_channels,
)

Snapshot of one NI device's identity and physical-channel inventory.

Cached on a :class:~nidaqlib.tasks.session.DaqSession at configure time and surfaced by :meth:DaqSession.snapshot. The :func:~nidaqlib.system.discovery.find_devices enumeration also wraps a populated :class:DeviceInfo inside each successful :class:DiscoveryResult.

DeviceResult dataclass

DeviceResult(value, error)

One per-task outcome from a manager group operation.

The mapping key carries the task name; this record carries only the outcome. Use :meth:success / :meth:failure to construct.

Attributes:

Name Type Description
value T | None

The operation's success value, or None on error.

error NIDaqError | None

The wrapped :class:NIDaqError, or None on success.

ok property

ok

True when the operation succeeded for this task.

failure staticmethod

failure(error)

Build a failed result carrying error.

Source code in src/nidaqlib/manager.py
@staticmethod
def failure(error: NIDaqError) -> DeviceResult[Any]:
    """Build a failed result carrying ``error``."""
    return DeviceResult(value=None, error=error)

success staticmethod

success(value)

Build a successful result carrying value.

Source code in src/nidaqlib/manager.py
@staticmethod
def success[U](value: U) -> DeviceResult[U]:
    """Build a successful result carrying ``value``."""
    return DeviceResult(value=value, error=None)

DeviceSnapshot dataclass

DeviceSnapshot(
    *,
    name,
    model,
    firmware,
    serial,
    connected,
    last_error,
    recoverable_error_count,
    captured_at,
)

Cross-instrument snapshot of one device's state, captured without I/O.

Shape-shared across sibling libraries (alicatlib, sartoriuslib, watlowlib). NI extras sit on :class:NIDaqSnapshot.

Attributes:

Name Type Description
name str

Device / task name.

model str | None

Hardware model string, when known.

firmware str | None

Firmware version, when known. Always None for NI.

serial str | None

Device serial number, when known.

connected bool

True when the underlying resource is reachable.

last_error ErrorContext | None

Most recent :class:ErrorContext, or None.

recoverable_error_count int

Running count of swallowed :class:NIDaqTransientError events.

captured_at datetime

UTC, tz-aware wall-clock at snapshot construction.

DigitalEdgeReferenceTrigger dataclass

DigitalEdgeReferenceTrigger(
    *, source, pretrigger_samples, edge=Edge.RISING
)

Bases: TriggerSpec

Reference trigger — capture pretrigger_samples before, the rest after.

Maps to task.triggers.reference_trigger.cfg_dig_edge_ref_trig. Only valid for finite acquisitions; NI rejects continuous + reference trigger combinations at configure time, and the wrapper does not second-guess that.

Attributes:

Name Type Description
pretrigger_samples int

Number of samples per channel to retain from before the edge fires. Must be > 0.

edge Edge

Active edge of the trigger. Rising by default.

__post_init__

__post_init__()

Reject zero / negative pretrigger windows up-front.

Raises:

Type Description
NIDaqValidationError

pretrigger_samples is not positive.

Source code in src/nidaqlib/tasks/triggers.py
def __post_init__(self) -> None:
    """Reject zero / negative pretrigger windows up-front.

    Raises:
        NIDaqValidationError: ``pretrigger_samples`` is not positive.
    """
    if self.pretrigger_samples <= 0:
        raise NIDaqValidationError(
            f"pretrigger_samples must be > 0, got {self.pretrigger_samples}"
        )

from_dict classmethod

from_dict(data)

Deserialise, restoring :class:Edge from its string value.

Source code in src/nidaqlib/tasks/triggers.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise, restoring :class:`Edge` from its string value."""
    kind = data.get("kind")
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    try:
        edge = Edge(data.get("edge", Edge.RISING.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown Edge {data.get('edge')!r}") from exc
    return cls(
        source=str(data["source"]),
        pretrigger_samples=int(data["pretrigger_samples"]),
        edge=edge,
    )

to_dict

to_dict()

Serialise; encode :class:Edge to its string value.

Source code in src/nidaqlib/tasks/triggers.py
def to_dict(self) -> dict[str, Any]:
    """Serialise; encode :class:`Edge` to its string value."""
    return {
        "kind": self.kind,
        "source": self.source,
        "pretrigger_samples": self.pretrigger_samples,
        "edge": self.edge.value,
    }

DigitalEdgeStartTrigger dataclass

DigitalEdgeStartTrigger(*, source, edge=Edge.RISING)

Bases: TriggerSpec

Start the task on a digital edge from source.

Maps to task.triggers.start_trigger.cfg_dig_edge_start_trig. After :meth:DaqSession.start returns, the task is armed — the first sample is acquired only after NI sees the configured edge on source.

Attributes:

Name Type Description
edge Edge

Active edge of the trigger. Rising by default.

from_dict classmethod

from_dict(data)

Deserialise, restoring :class:Edge from its string value.

Source code in src/nidaqlib/tasks/triggers.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise, restoring :class:`Edge` from its string value."""
    kind = data.get("kind")
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    try:
        edge = Edge(data.get("edge", Edge.RISING.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown Edge {data.get('edge')!r}") from exc
    return cls(source=str(data["source"]), edge=edge)

to_dict

to_dict()

Serialise enums to .value so the payload is JSON-encodable.

Source code in src/nidaqlib/tasks/triggers.py
def to_dict(self) -> dict[str, Any]:
    """Serialise enums to ``.value`` so the payload is JSON-encodable."""
    return {
        "kind": self.kind,
        "source": self.source,
        "edge": self.edge.value,
    }

DigitalInput dataclass

DigitalInput(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    line_grouping_per_line=True,
)

Bases: ChannelSpec

Digital-input line or port.

Maps to Task.di_channels.add_di_chan on the NI side. physical_channel accepts NI's line / port grammar (Dev1/port0/line0, Dev1/port0:7, ...).

Attributes:

Name Type Description
line_grouping_per_line bool

When True, the backend treats each line as its own channel (NI LineGrouping.CHAN_PER_LINE). Defaults to True so multi-line specs round-trip cleanly into per-line reads. Set to False for one-channel-for-all-lines (CHAN_FOR_ALL_LINES).

DigitalOutput dataclass

DigitalOutput(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    requires_confirm=True,
    line_grouping_per_line=True,
)

Bases: ChannelSpec

Digital-output line or port.

Maps to Task.do_channels.add_do_chan on the NI side. Writes are gated through :meth:DaqSession.write, which requires confirm=True whenever any target channel sets requires_confirm (design doc §17.1).

Attributes:

Name Type Description
requires_confirm bool

When True, every :meth:DaqSession.write targeting this channel must pass confirm=True. Defaults to True — digital outputs are assumed to drive a real-world actuator unless the spec explicitly opts out.

line_grouping_per_line bool

When True, the backend treats each line as its own channel. Same semantics as :class:DigitalInput.line_grouping_per_line.

DiscoveryResult dataclass

DiscoveryResult(
    *,
    ok,
    port,
    address=None,
    baudrate=None,
    protocol=None,
    device_info=None,
    error=None,
    elapsed_s=0.0,
)

One enumeration row from :func:find_devices.

Shape-shared across sibling libraries (alicatlib, sartoriuslib, watlowlib) so cross-instrument tooling can join on a common discovery record.

Attributes:

Name Type Description
ok bool

True when the row describes a healthy device. False is reserved for enumeration-level driver failures, which surface as a single ok=False row carrying error.

port str

NI device name (Dev1, cDAQ1Mod3). Empty string only when ok=False and the driver call failed before any name could be read.

address str | int | None

Always None for NI (no multi-drop address concept).

baudrate int | None

Always None for NI (no serial line).

protocol ProtocolKind | None

Always None for NI (no wire protocol).

device_info DeviceInfo | None

Populated only when ok=True. None on error rows.

error NIDaqError | None

Populated only when ok=False. None on healthy rows.

elapsed_s float

Wall-clock seconds spent enumerating this entry.

Edge

Bases: StrEnum

Active edge for the sample clock or a trigger.

Mirrors nidaqmx.constants.Edge.

ErrorContext dataclass

ErrorContext(
    port=None,
    address=None,
    command_name=None,
    protocol=None,
    task_name=None,
    channel_name=None,
    physical_channel=None,
    ni_error_code=None,
    extra=_empty_extra(),
)

Structured context attached to every :class:NIDaqError.

Base fields are shared across the sibling libraries so cross-instrument log readers can join exceptions on a common shape. NI extras (task_name, physical_channel, ni_error_code) sit alongside.

extra accepts any Mapping and is always frozen into a read-only :class:types.MappingProxyType at construction so the shared empty sentinel can never be mutated through error.context.extra[k] = v.

Base fields (shape-shared with sibling libs): port: NI device name (Dev1, cDAQ1Mod3), or None. address: Always None for NI (no multi-drop address concept). command_name: Logical operation name ("read", "start", "configure_timing", ...). The unified name; sibling libs also call this command_name. protocol: Always None for NI (no wire protocol). extra: Free-form additional context.

NI extras

task_name: TaskSpec.name of the task at fault. channel_name: Display name of the at-fault channel (optional). physical_channel: NI physical-channel string (e.g. Dev1/ai0). ni_error_code: NI DAQmx error code, when known.

merged

merged(**updates)

Return a new context with updates overlaid. Unknown keys go to extra.

Source code in src/nidaqlib/errors.py
def merged(self, **updates: Any) -> Self:
    """Return a new context with ``updates`` overlaid. Unknown keys go to ``extra``."""
    known: dict[str, Any] = {}
    extra_updates: dict[str, Any] = {}
    for key, value in updates.items():
        if key in _CONTEXT_KNOWN_FIELDS:
            known[key] = value
        else:
            extra_updates[key] = value

    new_extra: Mapping[str, Any] = (
        MappingProxyType({**self.extra, **extra_updates}) if extra_updates else self.extra
    )
    return replace(self, **known, extra=new_extra)

ErrorPolicy

Bases: StrEnum

How recorders react to wrapped NI errors during a read.

RAISE class-attribute instance-attribute

RAISE = 'raise'

Cancel the recorder's task group and re-raise the error.

RETURN class-attribute instance-attribute

RETURN = 'return'

Emit a :class:DaqBlock (or :class:DaqReading) with .error set, then continue.

The recorder MUST advance timing counters (block_index / first_sample_index / t_mono_ns) on error records so consumers can detect dropped intervals. Consumers MUST gate on error is None before reading data.

NIDaqBackendError

NIDaqBackendError(message='', *, context=None)

Bases: NIDaqError

The backend rejected an operation or surfaced a generic NI failure.

Used when the failure is not a clean fit for the more specific subclasses (read, timeout, validation, state). Wraps :class:nidaqmx.errors.DaqError via __cause__.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqConfigurationError

NIDaqConfigurationError(message='', *, context=None)

Bases: NIDaqError

Configuration-level error (bad spec, missing required field, ...).

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqConfirmationRequiredError

NIDaqConfirmationRequiredError(message='', *, context=None)

Bases: NIDaqConfigurationError

A safety-gated start was attempted without confirm=True.

Raised by :func:open_device and :meth:DaqManager.start when a task that drives hardware (counter pulse outputs, analog outputs) is started without the explicit confirm=True opt-in. See design §5.10 (safe-start gate) and the ecosystem ConfirmationRequiredError convention shared with :mod:watlowlib and :mod:sartoriuslib.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqConnectionError

NIDaqConnectionError(message='', *, context=None)

Bases: NIDaqError

Communication with the NI backend was lost or could not be established.

Aligns with the ecosystem ConnectionError convention (matching :class:watlowlib.WatlowConnectionError, :class:alicatlib.AlicatConnectionError, :class:sartoriuslib.SartoriusConnectionError). NI's backend rarely distinguishes "connection lost" from generic backend errors at the driver layer; this class is the family seam for those that do.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqDependencyError

NIDaqDependencyError(message='', *, context=None)

Bases: NIDaqError

A required dependency (driver, optional extra) is unavailable.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqDiscoveryResult dataclass

NIDaqDiscoveryResult(
    *,
    ok,
    port,
    address=None,
    baudrate=None,
    protocol=None,
    device_info=None,
    error=None,
    elapsed_s=0.0,
    product_type=None,
    serial_number=None,
    chassis=None,
    physical_module=None,
)

Bases: DiscoveryResult

NI-specific discovery row carrying product / chassis identity.

Subclasses :class:DiscoveryResult without renaming any base fields. The NI extras (product_type, serial_number, chassis, physical_module) sit alongside the shape-shared base.

NIDaqError

NIDaqError(message='', *, context=None)

Bases: Exception

Base class for every :mod:nidaqlib exception.

Carries a typed :class:ErrorContext. The message is the human-readable summary; the context is the machine-readable detail.

Initialise with a human-readable message and optional context.

Parameters:

Name Type Description Default
message str

Short, human-readable summary suitable for logs.

''
context ErrorContext | None

Structured fields about the failing operation. None yields an empty :class:ErrorContext.

None
Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

with_context

with_context(**updates)

Return a copy of this error with its context updated.

Useful when an inner layer raises and an outer layer wants to enrich the context (for instance adding task_name or operation).

Source code in src/nidaqlib/errors.py
def with_context(self, **updates: Any) -> Self:
    """Return a copy of this error with its context updated.

    Useful when an inner layer raises and an outer layer wants to enrich
    the context (for instance adding ``task_name`` or ``operation``).
    """
    cls = type(self)
    new = cls.__new__(cls)
    new.args = self.args
    try:
        new.__dict__.update(self.__dict__)
    except AttributeError:  # pragma: no cover — no slotted subclass today
        for slot in getattr(cls, "__slots__", ()):
            if hasattr(self, slot):
                object.__setattr__(new, slot, getattr(self, slot))
    new.context = self.context.merged(**updates)
    new.__cause__ = self.__cause__
    new.__context__ = self.__context__
    new.__traceback__ = self.__traceback__
    return new

NIDaqReadError

NIDaqReadError(message='', *, context=None)

Bases: NIDaqError

A read against the underlying NI task failed.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqResourceError

NIDaqResourceError(message='', *, context=None)

Bases: NIDaqError

A physical-channel conflict was detected by the manager preflight.

Best-effort signal — NI is the final authority. Raised by :meth:DaqManager.add when the new task's channels overlap with one already managed; ErrorContext.extra carries the conflicting task names under "conflicts".

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqSinkDependencyError

NIDaqSinkDependencyError(message='', *, context=None)

Bases: NIDaqSinkError

A sink's optional dependency (pyarrow, asyncpg, ...) is missing.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqSinkError

NIDaqSinkError(message='', *, context=None)

Bases: NIDaqError

Base class for sink-layer failures.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqSinkSchemaError

NIDaqSinkSchemaError(message='', *, context=None)

Bases: NIDaqSinkError

A sink rejected an input record's shape.

Most commonly raised by row-oriented sinks (CsvSink, JsonlSink) when handed a :class:~nidaqlib.tasks.DaqBlock without accept_blocks=True — silently scalarising would surprise users with 1-GB CSV files at 10 kHz × 8 channels (design doc §14.1).

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqSinkWriteError

NIDaqSinkWriteError(message='', *, context=None)

Bases: NIDaqSinkError

A sink failed while writing a batch (file I/O, DB error, ...).

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqSnapshot dataclass

NIDaqSnapshot(
    *,
    name,
    model,
    firmware,
    serial,
    connected,
    last_error,
    recoverable_error_count,
    captured_at,
    task_name,
    task_state,
    channel_count,
    timing_mode,
    rate_hz,
    physical_channels,
    product_type,
    chassis,
    physical_module,
)

Bases: DeviceSnapshot

NI-specific snapshot — adds task state and physical inventory.

NIDaqTaskStateError

NIDaqTaskStateError(message='', *, context=None)

Bases: NIDaqError

Operation invalid for the task's current lifecycle state.

Raised, for example, by :meth:DaqSession.poll when the task is buffered and started — two consumers on the same NI buffer would race.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqTimeoutError

NIDaqTimeoutError(message='', *, context=None)

Bases: NIDaqError

An NI read or write exceeded its configured timeout.

Distinct from :class:NIDaqTransientError: this is a hard timeout that means the operation gave up. Transient errors mean "retry safe."

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqTransientError

NIDaqTransientError(message='', *, context=None)

Bases: NIDaqError

A driver-layer error that is safe to retry without rebuilding the task.

Surfaced by the backend when an NI DAQmx call fails with a code in the documented "retry-safe" set (see :data:nidaqlib.backend.nidaqmx_backend._TRANSIENT_NI_CODES). Common examples: buffer-overrun under ErrorPolicy.RETURN and the "samples still arriving" code that NI returns when a read window slid just ahead of the producer.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqValidationError

NIDaqValidationError(message='', *, context=None)

Bases: NIDaqConfigurationError

Request validation failed before any I/O.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NIDaqWriteError

NIDaqWriteError(message='', *, context=None)

Bases: NIDaqError

A write against the underlying NI task failed.

Raised by :meth:DaqSession.write when the backend rejects the write. Out-of-range values fail earlier as :class:NIDaqValidationError.

Source code in src/nidaqlib/errors.py
def __init__(self, message: str = "", *, context: ErrorContext | None = None) -> None:
    """Initialise with a human-readable message and optional context.

    Args:
        message: Short, human-readable summary suitable for logs.
        context: Structured fields about the failing operation. ``None``
            yields an empty :class:`ErrorContext`.
    """
    super().__init__(message)
    self.context = context if context is not None else _EMPTY_CONTEXT

NidaqConfig dataclass

NidaqConfig(
    *,
    default_timeout_s=10.0,
    default_sample_rate_hz=1000.0,
    default_buffer_size=16,
    default_chunk_size=1000,
    eager_tasks=False,
)

Process-wide default settings.

Anything that varies per task (channel ranges, trigger source, TDMS path) belongs on :class:~nidaqlib.tasks.TaskSpec, not here.

Attributes:

Name Type Description
default_timeout_s float

Fallback NI read/write timeout, in seconds. Used when the call site does not supply one explicitly.

default_sample_rate_hz float

Fallback Timing.rate_hz when the :class:~nidaqlib.tasks.Timing field is unset.

default_buffer_size int

AnyIO send-stream capacity for record(), measured in :class:~nidaqlib.tasks.DaqBlock slots.

default_chunk_size int

Samples per channel per emitted DaqBlock for record().

eager_tasks bool

Opt-in to asyncio.eager_task_factory. No-op on trio. See :func:nidaqlib._runtime.install_eager_task_factory.

replace

replace(**updates)

Return a copy of this config with updates applied.

Source code in src/nidaqlib/config.py
def replace(self, **updates: Any) -> Self:
    """Return a copy of this config with ``updates`` applied."""
    return replace(self, **updates)

OverflowPolicy

Bases: StrEnum

Behaviour when the recorder's outbound stream is full.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Producer awaits consumer. Risks NI buffer overrun on hardware-clocked tasks.

DROP_NEWEST class-attribute instance-attribute

DROP_NEWEST = 'drop_newest'

Drop the about-to-be-enqueued block. Bounds consumer latency; loses freshest data.

DROP_OLDEST class-attribute instance-attribute

DROP_OLDEST = 'drop_oldest'

Drop the oldest queued block. Keeps newest data; loses older queued blocks.

PollSource

Bases: Protocol

Anything that yields per-name :class:DeviceResult[DaqReading] per call.

Same name across all four sibling libraries. The recorder layer's :func:record_polled accepts any :class:PollSource instance, decoupling the polled producer from the concrete session/manager types.

poll async

poll(names=None)

Read once across the named resources (or all, when names is None).

Source code in src/nidaqlib/streaming/poll_source.py
async def poll(
    self,
    names: Iterable[str] | None = None,
) -> Mapping[str, DeviceResult[DaqReading]]:
    """Read once across the named resources (or all, when ``names`` is ``None``)."""
    ...

PollSourceAdapter

PollSourceAdapter(session)

Wrap a polled :class:DaqSession as a :class:PollSource.

Multi-channel by design: one DAQ task covers many channels, so the returned mapping has exactly one entry — keyed by the task name — carrying a multi-channel :class:DaqReading. Individual channels stay inside reading.values.

Example::

adapter = PollSourceAdapter(session)
async with record_polled(adapter, rate_hz=2.0) as recording:
    async for results in recording.stream:
        reading = results[session.spec.name].value
        ...

Parameters:

Name Type Description Default
session DaqSession

A started :class:DaqSession whose timing is None or :attr:AcquisitionMode.ON_DEMAND. The same constraint as :meth:DaqSession.poll.

required
Source code in src/nidaqlib/streaming/poll_source.py
def __init__(self, session: DaqSession) -> None:
    self._session = session

name property

name

Mapping key the adapter will use for emitted results.

poll async

poll(names=None)

Read one :class:DaqReading and wrap it as {name: DeviceResult.success(reading)}.

names is accepted for Protocol uniformity. When provided, the adapter only emits a row if the session's name appears in names — otherwise the returned mapping is empty.

Source code in src/nidaqlib/streaming/poll_source.py
async def poll(
    self,
    names: Iterable[str] | None = None,
) -> Mapping[str, DeviceResult[DaqReading]]:
    """Read one :class:`DaqReading` and wrap it as ``{name: DeviceResult.success(reading)}``.

    ``names`` is accepted for Protocol uniformity. When provided, the
    adapter only emits a row if the session's name appears in
    ``names`` — otherwise the returned mapping is empty.
    """
    key = self._session.spec.name
    if names is not None and key not in set(names):
        return {}
    from nidaqlib.errors import NIDaqError  # noqa: PLC0415 — late to dodge cycles
    from nidaqlib.manager import DeviceResult  # noqa: PLC0415

    try:
        reading = await self._session.poll()
    except NIDaqError as exc:
        return {key: DeviceResult.failure(exc)}
    return {key: DeviceResult.success(reading)}

ProtocolKind

Bases: StrEnum

Wire protocol kind for cross-library :class:ErrorContext symmetry.

Has no members because NI DAQmx is not a wire-protocol library — every NI error context carries protocol=None. The type exists for shape parity with sibling libs (alicatlib, sartoriuslib, watlowlib) whose ProtocolKind enums name real serial / MODBUS / RS-485 variants.

Recording dataclass

Recording(stream, summary, rate_hz)

Active-recording handle returned by :func:record / :func:record_polled.

Attributes:

Name Type Description
stream AsyncIterator[T]

Async iterator of payloads. Closes when the recorder context manager exits.

summary AcquisitionSummary

Mutable :class:AcquisitionSummary updated in place during the run. summary.finished_at is set on context exit.

rate_hz float | None

Configured cadence of the active recording. None for on-demand mode.

RunMetadata dataclass

RunMetadata(
    *,
    run_id,
    started_at,
    nidaqlib_version=(lambda: __version__)(),
    nidaqmx_python_version=_detect_nidaqmx_version(),
    ni_driver_version=_detect_ni_driver_version(),
    python_version=(lambda: sys.version.split()[0])(),
    platform=platform.platform(),
    task_specs=_empty_task_specs(),
    user_metadata=_empty_user_metadata(),
)

Provenance bundle for one acquisition run (design doc §18.2).

Attributes:

Name Type Description
run_id str

Caller-chosen identifier for the run (e.g. UUID, ISO timestamp, experiment name). Must be unique within the caller's storage scheme — :mod:nidaqlib does not enforce uniqueness.

started_at datetime

Wall-clock timestamp at which the run began. UTC.

nidaqlib_version str

Version of this package.

nidaqmx_python_version str

Version of the nidaqmx-python binding.

ni_driver_version str | None

NI-DAQmx driver version, or None if the driver is not installed (e.g., CI environment).

python_version str

Runtime Python version string.

platform str

Platform string from :func:platform.platform.

task_specs Mapping[str, TaskSpec]

One :class:TaskSpec per task in the run, keyed by the manager-add name.

user_metadata Mapping[str, object]

Free-form mapping the operator wants persisted alongside the run (git commit, sample ID, recipe name, ...). Values must be JSON-serialisable.

for_run classmethod

for_run(
    run_id,
    *,
    task_specs=None,
    user_metadata=None,
    started_at=None,
)

Construct a :class:RunMetadata with auto-detected versions.

Convenience wrapper around the dataclass constructor that supplies the version / platform / timestamp defaults so callers only need to pass the run-specific fields.

Source code in src/nidaqlib/tasks/metadata.py
@classmethod
def for_run(
    cls,
    run_id: str,
    *,
    task_specs: Mapping[str, TaskSpec] | None = None,
    user_metadata: Mapping[str, object] | None = None,
    started_at: datetime | None = None,
) -> Self:
    """Construct a :class:`RunMetadata` with auto-detected versions.

    Convenience wrapper around the dataclass constructor that supplies
    the version / platform / timestamp defaults so callers only need to
    pass the run-specific fields.
    """
    return cls(
        run_id=run_id,
        started_at=started_at if started_at is not None else datetime.now(UTC),
        task_specs=dict(task_specs) if task_specs is not None else {},
        user_metadata=dict(user_metadata) if user_metadata is not None else {},
    )

from_dict classmethod

from_dict(data)

Deserialise from a dict produced by :meth:to_dict.

Raises:

Type Description
NIDaqValidationError

A required field is missing or malformed.

Source code in src/nidaqlib/tasks/metadata.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise from a dict produced by :meth:`to_dict`.

    Raises:
        NIDaqValidationError: A required field is missing or malformed.
    """
    required = ("run_id", "started_at")
    for key in required:
        if key not in data:
            raise NIDaqValidationError(f"RunMetadata missing required field {key!r}")
    try:
        started_at = datetime.fromisoformat(str(data["started_at"]))
    except ValueError as exc:
        raise NIDaqValidationError(
            f"RunMetadata.started_at must be ISO-8601, got {data['started_at']!r}"
        ) from exc
    raw_specs = data.get("task_specs", {})
    if not isinstance(raw_specs, Mapping):
        raise NIDaqValidationError(
            f"RunMetadata.task_specs must be a mapping, got {type(raw_specs).__name__}"
        )
    task_specs: dict[str, TaskSpec] = {}
    for name, payload in raw_specs.items():  # pyright: ignore[reportUnknownVariableType]
        if not isinstance(payload, Mapping):
            raise NIDaqValidationError(f"RunMetadata.task_specs[{name!r}] must be a mapping")
        task_specs[str(name)] = TaskSpec.from_dict(payload)  # pyright: ignore[reportUnknownArgumentType]
    user_metadata_raw = data.get("user_metadata", {})
    if not isinstance(user_metadata_raw, Mapping):
        raise NIDaqValidationError(
            f"RunMetadata.user_metadata must be a mapping, got "
            f"{type(user_metadata_raw).__name__}"
        )
    return cls(
        run_id=str(data["run_id"]),
        started_at=started_at,
        nidaqlib_version=str(data.get("nidaqlib_version", __version__)),
        nidaqmx_python_version=str(data.get("nidaqmx_python_version", "unknown")),
        ni_driver_version=(
            str(data["ni_driver_version"])
            if data.get("ni_driver_version") is not None
            else None
        ),
        python_version=str(data.get("python_version", sys.version.split()[0])),
        platform=str(data.get("platform", platform.platform())),
        task_specs=task_specs,
        user_metadata=dict(user_metadata_raw),  # pyright: ignore[reportUnknownArgumentType]
    )

replace

replace(**updates)

Return a copy of this metadata with updates applied.

Source code in src/nidaqlib/tasks/metadata.py
def replace(self, **updates: Any) -> Self:
    """Return a copy of this metadata with ``updates`` applied."""
    return dataclasses.replace(self, **updates)

to_dict

to_dict()

Serialise to a JSON-friendly dict.

task_specs round-trips through :meth:TaskSpec.to_dict, which in turn dispatches each channel and trigger by kind.

Source code in src/nidaqlib/tasks/metadata.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a JSON-friendly dict.

    ``task_specs`` round-trips through :meth:`TaskSpec.to_dict`, which
    in turn dispatches each channel and trigger by ``kind``.
    """
    return {
        "run_id": self.run_id,
        "started_at": self.started_at.isoformat(),
        "nidaqlib_version": self.nidaqlib_version,
        "nidaqmx_python_version": self.nidaqmx_python_version,
        "ni_driver_version": self.ni_driver_version,
        "python_version": self.python_version,
        "platform": self.platform,
        "task_specs": {name: spec.to_dict() for name, spec in self.task_specs.items()},
        "user_metadata": dict(self.user_metadata),
    }

TaskBuilder

TaskBuilder(name)

Fluent builder for :class:TaskSpec.

Example

spec = ( ... TaskBuilder("ai_demo") ... .add_channel(AnalogInputVoltage(physical_channel="Dev1/ai0")) ... .with_timing(Timing(rate_hz=1000.0)) ... .build() ... )

Create a builder for a task named name.

Parameters:

Name Type Description Default
name str

Task name. Will become :attr:TaskSpec.name.

required
Source code in src/nidaqlib/tasks/builder.py
def __init__(self, name: str) -> None:
    """Create a builder for a task named ``name``.

    Args:
        name: Task name. Will become :attr:`TaskSpec.name`.
    """
    self._name = name
    self._channels: list[ChannelSpec] = []
    self._timing: Timing | None = None
    self._metadata: dict[str, str | int | float | bool] = {}

add_channel

add_channel(channel)

Append a channel to the task. Returns self for chaining.

Source code in src/nidaqlib/tasks/builder.py
def add_channel(self, channel: ChannelSpec) -> Self:
    """Append a channel to the task. Returns self for chaining."""
    self._channels.append(channel)
    return self

build

build()

Construct the immutable :class:TaskSpec.

Source code in src/nidaqlib/tasks/builder.py
def build(self) -> TaskSpec:
    """Construct the immutable :class:`TaskSpec`."""
    return TaskSpec(
        name=self._name,
        channels=tuple(self._channels),
        timing=self._timing,
        metadata=dict(self._metadata),
    )

with_metadata

with_metadata(metadata)

Merge metadata into the builder's metadata dict.

Returns self for chaining. Later calls overwrite earlier keys.

Source code in src/nidaqlib/tasks/builder.py
def with_metadata(self, metadata: Mapping[str, str | int | float | bool]) -> Self:
    """Merge ``metadata`` into the builder's metadata dict.

    Returns self for chaining. Later calls overwrite earlier keys.
    """
    self._metadata.update(metadata)
    return self

with_timing

with_timing(timing)

Set the task's :class:Timing. Returns self for chaining.

Source code in src/nidaqlib/tasks/builder.py
def with_timing(self, timing: Timing) -> Self:
    """Set the task's :class:`Timing`. Returns self for chaining."""
    self._timing = timing
    return self

TaskSpec dataclass

TaskSpec(
    *,
    name,
    channels,
    timing=None,
    trigger=None,
    logging=None,
    metadata=_empty_metadata(),
)

Declarative description of one NI task.

Attributes:

Name Type Description
name str

Task name. Must be unique within an :class:~nidaqlib.DaqManager and labels :class:DaqReading / :class:DaqBlock rows.

channels Sequence[ChannelSpec]

One or more :class:~nidaqlib.channels.ChannelSpec instances. Order is preserved and is the source of truth for DaqBlock.channels row ordering.

timing Timing | None

Optional :class:Timing. None means on-demand / software-polled.

trigger TriggerSpec | None

Optional :class:~nidaqlib.tasks.triggers.TriggerSpec. None means "start as soon as :meth:DaqSession.start returns" (NI's default).

logging TdmsLogging | None

Optional :class:TdmsLogging for driver-side TDMS. None disables TDMS (the default).

metadata Mapping[str, str | int | float | bool]

Free-form scalar metadata propagated into emitted records.

__post_init__

__post_init__()

Validate the channel list shape (the cheap, always-true invariants).

Raises:

Type Description
NIDaqValidationError

channels is empty or contains a non-:class:ChannelSpec element.

Source code in src/nidaqlib/tasks/spec.py
def __post_init__(self) -> None:
    """Validate the channel list shape (the cheap, always-true invariants).

    Raises:
        NIDaqValidationError: ``channels`` is empty or contains a
            non-:class:`ChannelSpec` element.
    """
    if len(self.channels) == 0:
        raise NIDaqValidationError(f"TaskSpec {self.name!r}: at least one channel is required")
    if not self.name:
        raise NIDaqValidationError("TaskSpec.name must be a non-empty string")
    channels = tuple(self.channels)
    object.__setattr__(self, "channels", channels)
    for ch in self.channels:
        if not isinstance(ch, ChannelSpec):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise NIDaqValidationError(
                f"TaskSpec {self.name!r}: channels must be ChannelSpec instances, "
                f"got {type(ch).__name__}"
            )
    names = [ch.display_name for ch in self.channels]
    duplicates = sorted({name for name in names if names.count(name) > 1})
    if duplicates:
        raise NIDaqValidationError(
            f"TaskSpec {self.name!r}: duplicate channel display names {duplicates!r}"
        )
    object.__setattr__(self, "metadata", MappingProxyType(dict(self.metadata)))

from_dict classmethod

from_dict(data)

Deserialise from a dict produced by :meth:to_dict.

Parameters:

Name Type Description Default
data Mapping[str, Any]

Mapping carrying the task-spec fields.

required

Raises:

Type Description
NIDaqValidationError

A channel or trigger entry has an unknown kind, or required structural fields are malformed.

Source code in src/nidaqlib/tasks/spec.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise from a dict produced by :meth:`to_dict`.

    Args:
        data: Mapping carrying the task-spec fields.

    Raises:
        NIDaqValidationError: A channel or trigger entry has an unknown
            ``kind``, or required structural fields are malformed.
    """
    from nidaqlib.tasks.triggers import TriggerSpec  # noqa: PLC0415

    timing_payload = data.get("timing")
    timing = Timing.from_dict(timing_payload) if timing_payload is not None else None
    trigger_payload = data.get("trigger")
    if trigger_payload is None:
        trigger = None
    elif isinstance(trigger_payload, Mapping):
        trigger = TriggerSpec.from_dict(trigger_payload)  # pyright: ignore[reportUnknownArgumentType]
    else:
        raise NIDaqValidationError(
            f"TaskSpec.trigger must be a mapping or null, got {type(trigger_payload).__name__}"
        )
    logging_payload = data.get("logging")
    logging = TdmsLogging.from_dict(logging_payload) if logging_payload is not None else None
    raw_channels: object = data.get("channels", [])
    if not isinstance(raw_channels, list):
        raise NIDaqValidationError(
            f"TaskSpec.channels must be a list, got {type(raw_channels).__name__}"
        )
    channels: list[ChannelSpec] = []
    for ch in raw_channels:  # pyright: ignore[reportUnknownVariableType]
        if not isinstance(ch, Mapping):
            raise NIDaqValidationError(
                f"TaskSpec.channels[*] must be a mapping, got {type(ch).__name__}"  # pyright: ignore[reportUnknownArgumentType]
            )
        channels.append(ChannelSpec.from_dict(ch))  # pyright: ignore[reportUnknownArgumentType]
    metadata_raw: object = data.get("metadata", {})
    if not isinstance(metadata_raw, Mapping):
        raise NIDaqValidationError(
            f"TaskSpec.metadata must be a mapping, got {type(metadata_raw).__name__}"
        )
    return cls(
        name=str(data["name"]),
        channels=channels,
        timing=timing,
        trigger=trigger,
        logging=logging,
        metadata=dict(metadata_raw),  # pyright: ignore[reportUnknownArgumentType]
    )

replace

replace(**updates)

Return a copy of this spec with updates applied.

Mirrors dataclasses.replace but is exposed as a method for consistency with the rest of the API.

Source code in src/nidaqlib/tasks/spec.py
def replace(self, **updates: Any) -> Self:
    """Return a copy of this spec with ``updates`` applied.

    Mirrors ``dataclasses.replace`` but is exposed as a method for
    consistency with the rest of the API.
    """
    return dataclasses.replace(self, **updates)

to_dict

to_dict()

Serialise to a JSON-friendly dict, dispatching channels by kind.

Source code in src/nidaqlib/tasks/spec.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a JSON-friendly dict, dispatching channels by ``kind``."""
    return {
        "name": self.name,
        "channels": [ch.to_dict() for ch in self.channels],
        "timing": self.timing.to_dict() if self.timing is not None else None,
        "trigger": self.trigger.to_dict() if self.trigger is not None else None,
        "logging": self.logging.to_dict() if self.logging is not None else None,
        "metadata": dict(self.metadata),
    }

TaskState

Bases: StrEnum

Coarse projection of NI DAQmx's documented task lifecycle.

NI's underlying verified/reserved/committed states all bucket into :attr:CONFIGURED because the wrapper does not separately track them today. The other four are 1:1 with NI's documented states.

CLOSED class-attribute instance-attribute

CLOSED = 'CLOSED'

Backing NI task has been released.

CONFIGURED class-attribute instance-attribute

CONFIGURED = 'CONFIGURED'

Channels + timing applied (NI verified/reserved/committed).

CREATED class-attribute instance-attribute

CREATED = 'CREATED'

Task has been constructed; channels not yet applied.

RUNNING class-attribute instance-attribute

RUNNING = 'RUNNING'

task.start() has been called.

STOPPED class-attribute instance-attribute

STOPPED = 'STOPPED'

Stopped but not yet closed; may transition back to RUNNING.

TdmsLogging dataclass

TdmsLogging(
    *,
    path,
    operation=_default_logging_operation(),
    mode=_default_logging_mode(),
    group_name=None,
)

Driver-side TDMS logging configuration (design doc §14.6).

Attached to :attr:TaskSpec.logging. The wrapper does not write TDMS by hand — nidaqmx-python exposes task-level driver-side logging via task.in_stream.configure_logging(...). nidaqlib configures the knobs and otherwise stays out of the way.

Attributes:

Name Type Description
path str | Path

Destination .tdms file. Stringified into NI's call.

operation LoggingOperation

How NI handles a pre-existing file. Defaults to :class:nidaqmx.constants.LoggingOperation.OPEN_OR_CREATE.

mode LoggingMode

Write-and-read vs. write-only. Defaults to :class:nidaqmx.constants.LoggingMode.LOG_AND_READ. Choose :class:nidaqmx.constants.LoggingMode.LOG for a write-only stream — :func:~nidaqlib.streaming.record detects this and emits an empty stream rather than blocking forever in read_block.

group_name str | None

Optional TDMS group name. None lets NI default.

from_dict classmethod

from_dict(data)

Deserialise, restoring enum members from their .value ints.

Source code in src/nidaqlib/tasks/spec.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise, restoring enum members from their ``.value`` ints."""
    from nidaqmx.constants import (  # noqa: PLC0415
        LoggingMode,
        LoggingOperation,
    )

    op_raw = data.get("operation", LoggingOperation.OPEN_OR_CREATE.value)
    try:
        operation = LoggingOperation(op_raw)
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown LoggingOperation {op_raw!r}") from exc
    mode_raw = data.get("mode", LoggingMode.LOG_AND_READ.value)
    try:
        mode = LoggingMode(mode_raw)
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown LoggingMode {mode_raw!r}") from exc
    return cls(
        path=str(data["path"]),
        operation=operation,
        mode=mode,
        group_name=data.get("group_name"),
    )

to_dict

to_dict()

Serialise to a JSON-friendly dict using each enum's .value.

Source code in src/nidaqlib/tasks/spec.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a JSON-friendly dict using each enum's ``.value``."""
    return {
        "path": str(self.path),
        "operation": self.operation.value,
        "mode": self.mode.value,
        "group_name": self.group_name,
    }

ThermocoupleInput dataclass

ThermocoupleInput(
    *,
    physical_channel,
    name=None,
    unit=None,
    metadata=_empty_metadata(),
    adc_timing_mode=None,
    adc_custom_timing_mode=None,
    auto_zero_mode=None,
    thermocouple_type,
    min_val,
    max_val,
    cjc_source=None,
    cjc_val=None,
    units=_default_temperature_units(),
)

Bases: AnalogInputBase

Thermocouple analog-input channel.

Maps to Task.ai_channels.add_ai_thrmcpl_chan on the NI side. The enum-typed fields are stored as int values matching nidaqmx.constants so that to_dict/from_dict round-trips through JSON without dragging NI's enum machinery into the serialisation layer.

Attributes:

Name Type Description
thermocouple_type ThermocoupleType

One of nidaqmx.constants.ThermocoupleType (J, K, T, ...). Required; no sane default.

min_val float

Lower limit of the expected temperature, in units.

max_val float

Upper limit of the expected temperature, in units.

cjc_source CJCSource | None

Cold-junction compensation source. None lets NI pick the device default (typically built-in).

cjc_val float | None

Cold-junction reference temperature, in units. Only relevant for CJCSource.CONSTANT_USER_VALUE.

units TemperatureUnits

Temperature units for min_val / max_val and the returned data. Defaults to degrees Celsius.

Inherits :attr:adc_timing_mode and :attr:adc_custom_timing_mode from :class:AnalogInputBase — most useful here, since the NI 9213 / 9214 thermocouple modules expose the full set of timing modes.

__post_init__

__post_init__()

Validate the temperature range.

Source code in src/nidaqlib/channels/analog_input.py
def __post_init__(self) -> None:
    """Validate the temperature range."""
    AnalogInputBase.__post_init__(self)
    if self.min_val >= self.max_val:
        raise NIDaqValidationError(
            f"min_val must be < max_val for {self.display_name!r}; "
            f"got {self.min_val!r} >= {self.max_val!r}"
        )

from_dict classmethod

from_dict(data)

Reconstruct, restoring enum members from their serialised .value ints.

Source code in src/nidaqlib/channels/analog_input.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Reconstruct, restoring enum members from their serialised ``.value`` ints."""
    kind = data.get("kind")
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    from nidaqmx.constants import (  # noqa: PLC0415
        CJCSource,
        TemperatureUnits,
        ThermocoupleType,
    )

    payload = {k: v for k, v in data.items() if k != "kind"}
    AnalogInputBase._restore_base_enums(payload)
    try:
        payload["thermocouple_type"] = ThermocoupleType(payload["thermocouple_type"])
    except (KeyError, ValueError) as exc:
        raise NIDaqValidationError(
            f"unknown ThermocoupleType {payload.get('thermocouple_type')!r}"
        ) from exc
    if payload.get("cjc_source") is not None:
        try:
            payload["cjc_source"] = CJCSource(payload["cjc_source"])
        except ValueError as exc:
            raise NIDaqValidationError(
                f"unknown CJCSource {payload.get('cjc_source')!r}"
            ) from exc
    units_raw = payload.get("units", TemperatureUnits.DEG_C.value)
    try:
        payload["units"] = TemperatureUnits(units_raw)
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown TemperatureUnits {units_raw!r}") from exc
    return cls(**payload)

to_dict

to_dict()

Serialise, encoding TC-specific enums via their .value ints.

Source code in src/nidaqlib/channels/analog_input.py
def to_dict(self) -> dict[str, Any]:
    """Serialise, encoding TC-specific enums via their ``.value`` ints."""
    payload = AnalogInputBase.to_dict(self)
    payload["thermocouple_type"] = self.thermocouple_type.value
    payload["cjc_source"] = self.cjc_source.value if self.cjc_source is not None else None
    payload["units"] = self.units.value
    return payload

Timing dataclass

Timing(
    *,
    rate_hz,
    mode=AcquisitionMode.CONTINUOUS,
    samples_per_channel=None,
    source=None,
    active_edge=Edge.RISING,
)

Sample-clock timing configuration.

Attributes:

Name Type Description
rate_hz float

Sample clock rate, in Hz. Required for hardware-timed modes (finite / continuous).

mode AcquisitionMode

Acquisition mode. Defaults to continuous.

samples_per_channel int | None

For FINITE, the total number of samples per channel. For CONTINUOUS, this sizes the on-board buffer. NI chooses a sensible default when None.

source str | None

Optional sample-clock source terminal (e.g. an external terminal name); None selects the on-board clock.

active_edge Edge

Sample-clock active edge. Rising by default.

__post_init__

__post_init__()

Validate timing parameters before they reach NI.

Source code in src/nidaqlib/tasks/spec.py
def __post_init__(self) -> None:
    """Validate timing parameters before they reach NI."""
    if self.rate_hz <= 0.0:
        raise NIDaqValidationError(f"rate_hz must be > 0, got {self.rate_hz!r}")
    if self.samples_per_channel is not None and self.samples_per_channel <= 0:
        raise NIDaqValidationError(
            f"samples_per_channel must be > 0 when set, got {self.samples_per_channel!r}"
        )

from_dict classmethod

from_dict(data)

Deserialise from a dict produced by :meth:to_dict.

Parameters:

Name Type Description Default
data Mapping[str, Any]

Mapping carrying the timing fields.

required

Raises:

Type Description
NIDaqValidationError

An enum field carries an unknown value.

Source code in src/nidaqlib/tasks/spec.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise from a dict produced by :meth:`to_dict`.

    Args:
        data: Mapping carrying the timing fields.

    Raises:
        NIDaqValidationError: An enum field carries an unknown value.
    """
    try:
        mode = AcquisitionMode(data.get("mode", AcquisitionMode.CONTINUOUS.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown AcquisitionMode {data.get('mode')!r}") from exc
    try:
        edge = Edge(data.get("active_edge", Edge.RISING.value))
    except ValueError as exc:
        raise NIDaqValidationError(f"unknown Edge {data.get('active_edge')!r}") from exc
    return cls(
        rate_hz=float(data["rate_hz"]),
        mode=mode,
        samples_per_channel=(
            int(data["samples_per_channel"])
            if data.get("samples_per_channel") is not None
            else None
        ),
        source=data.get("source"),
        active_edge=edge,
    )

to_dict

to_dict()

Serialise to a JSON-friendly dict.

Enum members serialise to their string values so the result is JSON-encodable without a custom encoder.

Source code in src/nidaqlib/tasks/spec.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a JSON-friendly dict.

    Enum members serialise to their string values so the result is
    JSON-encodable without a custom encoder.
    """
    return {
        "rate_hz": self.rate_hz,
        "mode": self.mode.value,
        "samples_per_channel": self.samples_per_channel,
        "source": self.source,
        "active_edge": self.active_edge.value,
    }

TriggerSpec dataclass

TriggerSpec(*, source)

Base class for task-level trigger configurations.

Subclasses declare a non-empty :attr:kind and are registered via :func:register_trigger_kind so :meth:from_dict on the base can dispatch by discriminator.

Attributes:

Name Type Description
source str

NI terminal supplying the trigger (e.g. "PFI0", "/Dev1/PFI0", "/Dev1/ai/StartTrigger"). Subclasses use this verbatim.

kind class-attribute

kind = ''

Discriminator used by :meth:from_dict. Concrete subclasses override.

from_dict classmethod

from_dict(data)

Deserialise; on the base, dispatch by kind to a registered subclass.

Raises:

Type Description
NIDaqValidationError

kind is missing, unknown, or does not match the concrete class.

Source code in src/nidaqlib/tasks/triggers.py
@classmethod
def from_dict(cls, data: Mapping[str, Any]) -> Self:
    """Deserialise; on the base, dispatch by ``kind`` to a registered subclass.

    Raises:
        NIDaqValidationError: ``kind`` is missing, unknown, or does not
            match the concrete class.
    """
    kind = data.get("kind")
    if cls is TriggerSpec:
        if not isinstance(kind, str):
            raise NIDaqValidationError(
                f"trigger spec dict missing 'kind' discriminator (got {kind!r})"
            )
        target = _TRIGGER_REGISTRY.get(kind)
        if target is None:
            raise NIDaqValidationError(f"unknown trigger kind {kind!r}")
        return target.from_dict(data)  # type: ignore[return-value]
    if kind != cls.kind:
        raise NIDaqValidationError(f"kind mismatch: expected {cls.kind!r}, got {kind!r}")
    payload = {k: v for k, v in data.items() if k != "kind"}
    return cls(**payload)

to_dict

to_dict()

Serialise to a JSON-friendly dict, including kind.

Source code in src/nidaqlib/tasks/triggers.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to a JSON-friendly dict, including ``kind``."""
    payload = dataclasses.asdict(self)
    payload["kind"] = self.kind
    return payload

block_to_rows

block_to_rows(block)

Unroll a :class:DaqBlock into one row per (channel, sample).

Per-sample timestamps reconstruct from block.t_mono_ns, block.block_period_ns, and block.first_sample_index. For on-demand blocks (no clock), samples are spaced uniformly within the read window.

Each row carries:

  • device, task, channel — join keys.
  • block_index, sample_index — block- and task-level indices.
  • t_mono_ns — reconstructed monotonic nanoseconds for this sample.
  • t_utc — reconstructed wall-clock (ISO 8601) for this sample.
  • value — the scalar sample value.
  • unit — engineering unit for the channel (or None).
  • error_type / error_message — populated only on error blocks.
Source code in src/nidaqlib/sinks/base.py
def block_to_rows(block: DaqBlock) -> list[dict[str, float | int | str | bool | None]]:
    """Unroll a :class:`DaqBlock` into one row per ``(channel, sample)``.

    Per-sample timestamps reconstruct from ``block.t_mono_ns``,
    ``block.block_period_ns``, and ``block.first_sample_index``. For
    on-demand blocks (no clock), samples are spaced uniformly within the
    read window.

    Each row carries:

    - ``device``, ``task``, ``channel`` — join keys.
    - ``block_index``, ``sample_index`` — block- and task-level indices.
    - ``t_mono_ns`` — reconstructed monotonic nanoseconds for this sample.
    - ``t_utc`` — reconstructed wall-clock (ISO 8601) for this sample.
    - ``value`` — the scalar sample value.
    - ``unit`` — engineering unit for the channel (or ``None``).
    - ``error_type`` / ``error_message`` — populated only on error blocks.
    """
    from datetime import timedelta  # noqa: PLC0415

    n_channels = len(block.channels)
    n_samples = block.samples_per_channel
    period_ns = block.block_period_ns
    if period_ns is None:
        span_ns = int((block.read_finished_at - block.read_started_at).total_seconds() * 1e9)
        period_ns = span_ns // max(1, n_samples)

    err = block.error
    err_type = f"{type(err).__module__}.{type(err).__qualname__}" if err is not None else None
    err_msg = str(err) if err is not None else None

    rows: list[dict[str, float | int | str | bool | None]] = []
    for c in range(n_channels):
        ch_name = block.channels[c]
        unit = block.units.get(ch_name)
        for k in range(n_samples):
            absolute = block.first_sample_index + k
            sample_t_mono_ns = block.t_mono_ns + k * period_ns
            sample_t_utc = block.t_utc + timedelta(microseconds=(k * period_ns) / 1_000)
            rows.append(
                {
                    "device": block.device,
                    "task": block.task,
                    "channel": ch_name,
                    "block_index": block.block_index,
                    "sample_index": absolute,
                    "t_mono_ns": sample_t_mono_ns,
                    "t_utc": sample_t_utc.isoformat(),
                    "value": float(block.data[c, k]),
                    "unit": unit,
                    "error_type": err_type,
                    "error_message": err_msg,
                }
            )
    return rows

config_from_env

config_from_env(prefix=DEFAULT_ENV_PREFIX)

Best-effort env loader.

Only reads well-known keys. Missing or unparseable values fall back to :class:NidaqConfig's defaults — this function never raises.

Recognised keys (with prefix="NIDAQLIB_"):

  • NIDAQLIB_DEFAULT_TIMEOUT_S — float seconds
  • NIDAQLIB_DEFAULT_SAMPLE_RATE_HZ — float Hz
  • NIDAQLIB_DEFAULT_BUFFER_SIZE — int slots
  • NIDAQLIB_DEFAULT_CHUNK_SIZE — int samples
  • NIDAQLIB_EAGER_TASKS"1" / "true" / "yes"

Parameters:

Name Type Description Default
prefix str

Prefix to prepend to each env key. Defaults to "NIDAQLIB_".

DEFAULT_ENV_PREFIX

Returns:

Name Type Description
A NidaqConfig

class:NidaqConfig populated from env where parseable.

Source code in src/nidaqlib/config.py
def config_from_env(prefix: str = DEFAULT_ENV_PREFIX) -> NidaqConfig:
    """Best-effort env loader.

    Only reads well-known keys. Missing or unparseable values fall back to
    :class:`NidaqConfig`'s defaults — this function never raises.

    Recognised keys (with ``prefix="NIDAQLIB_"``):

    - ``NIDAQLIB_DEFAULT_TIMEOUT_S`` — float seconds
    - ``NIDAQLIB_DEFAULT_SAMPLE_RATE_HZ`` — float Hz
    - ``NIDAQLIB_DEFAULT_BUFFER_SIZE`` — int slots
    - ``NIDAQLIB_DEFAULT_CHUNK_SIZE`` — int samples
    - ``NIDAQLIB_EAGER_TASKS`` — ``"1"`` / ``"true"`` / ``"yes"``

    Args:
        prefix: Prefix to prepend to each env key. Defaults to
            ``"NIDAQLIB_"``.

    Returns:
        A :class:`NidaqConfig` populated from env where parseable.
    """
    base = NidaqConfig()
    return NidaqConfig(
        default_timeout_s=_float_env(f"{prefix}DEFAULT_TIMEOUT_S", base.default_timeout_s),
        default_sample_rate_hz=_float_env(
            f"{prefix}DEFAULT_SAMPLE_RATE_HZ", base.default_sample_rate_hz
        ),
        default_buffer_size=_int_env(f"{prefix}DEFAULT_BUFFER_SIZE", base.default_buffer_size),
        default_chunk_size=_int_env(f"{prefix}DEFAULT_CHUNK_SIZE", base.default_chunk_size),
        eager_tasks=_bool_env(f"{prefix}EAGER_TASKS", base.eager_tasks),
    )

find_devices

find_devices()

Enumerate NI DAQ devices visible to the driver. Never raises.

Returns:

Name Type Description
One list[DiscoveryResult]

class:NIDaqDiscoveryResult per device. Empty list when no

list[DiscoveryResult]

NI hardware is present. On enumeration-level failure (driver

list[DiscoveryResult]

missing, system call raises), returns a single ok=False row

list[DiscoveryResult]

with error populated.

Source code in src/nidaqlib/system/discovery.py
def find_devices() -> list[DiscoveryResult]:
    """Enumerate NI DAQ devices visible to the driver. Never raises.

    Returns:
        One :class:`NIDaqDiscoveryResult` per device. Empty list when no
        NI hardware is present. On enumeration-level failure (driver
        missing, system call raises), returns a single ``ok=False`` row
        with ``error`` populated.
    """
    try:
        nidaqmx = _import_nidaqmx()
    except NIDaqDependencyError as exc:
        return [
            DiscoveryResult(
                ok=False,
                port="",
                error=exc,
                elapsed_s=0.0,
            )
        ]

    started = time.monotonic()
    try:
        system = nidaqmx.system.System.local()
        devices = list(system.devices)
    except (nidaqmx.errors.DaqNotFoundError, nidaqmx.errors.DaqNotSupportedError) as exc:
        # Driver not installed, or platform unsupported (e.g. darwin). Neither
        # subclasses DaqError, so they need their own clause — surface as a
        # dependency failure rather than a backend error.
        elapsed = time.monotonic() - started
        wrapped: NIDaqError = NIDaqDependencyError(str(exc))
        wrapped.__cause__ = exc
        return [
            DiscoveryResult(
                ok=False,
                port="",
                error=wrapped,
                elapsed_s=elapsed,
            )
        ]
    except nidaqmx.errors.DaqError as exc:  # pragma: no cover — hardware path
        elapsed = time.monotonic() - started
        wrapped = NIDaqBackendError(
            "failed to enumerate NI devices",
            context=ErrorContext(
                command_name="find_devices",
                ni_error_code=getattr(exc, "error_code", None),
            ),
        )
        wrapped.__cause__ = exc
        return [
            DiscoveryResult(
                ok=False,
                port="",
                error=wrapped,
                elapsed_s=elapsed,
            )
        ]

    results: list[DiscoveryResult] = []
    for dev in devices:
        per_started = time.monotonic()
        name = str(_safe_attr(dev, "name", ""))
        product_type = _safe_attr(dev, "product_type")
        serial_num = _safe_attr(dev, "serial_num")
        info = DeviceInfo(
            name=name,
            product_type=str(product_type) if product_type is not None else None,
            serial_number=str(serial_num or "") or None,
            ai_physical_channels=_channel_names(_safe_attr(dev, "ai_physical_chans", ())),
            ao_physical_channels=_channel_names(_safe_attr(dev, "ao_physical_chans", ())),
            di_lines=_channel_names(_safe_attr(dev, "di_lines", ())),
            do_lines=_channel_names(_safe_attr(dev, "do_lines", ())),
            ci_physical_channels=_channel_names(_safe_attr(dev, "ci_physical_chans", ())),
            co_physical_channels=_channel_names(_safe_attr(dev, "co_physical_chans", ())),
        )
        results.append(
            NIDaqDiscoveryResult(
                ok=True,
                port=name,
                device_info=info,
                product_type=info.product_type,
                serial_number=info.serial_number,
                chassis=_chassis_of(dev),
                physical_module=_physical_module_of(dev),
                elapsed_s=time.monotonic() - per_started,
            )
        )
    return results

open_device async

open_device(
    spec,
    *,
    backend=None,
    timeout=10.0,
    autostart=True,
    confirm_start=False,
)

Open and return a configured :class:DaqSession.

Usage forms::

async with await open_device(spec) as session:
    ...

session = await open_device(spec)
try:
    ...
finally:
    await session.close()

Mirrors the ecosystem open_device shape used by alicatlib, watlowlib, and sartoriuslib. The DAQ-specific deviation: the spec is the declarative task description (channels, timing, triggers) rather than a serial port string.

Parameters:

Name Type Description Default
spec TaskSpec

Declarative :class:TaskSpec to materialise.

required
backend DaqBackend | None

Optional :class:~nidaqlib.backend.base.DaqBackend. Defaults to :class:~nidaqlib.backend.nidaqmx_backend.NidaqmxBackend — tests typically pass a :class:~nidaqlib.backend.fake.FakeDaqBackend here.

None
timeout float

Default per-operation timeout, in seconds.

10.0
autostart bool

When True (default), the session is configured AND started before this function returns. When False, the session is only configured — the caller is responsible for await session.start() before any acquisition. Required for the §11.3.2 callback bridge, which must register the buffer event before NI's task.start(); pass the unstarted session to :func:~nidaqlib.streaming.block.record with use_callback_bridge=True and the recorder owns the start.

True
confirm_start bool

Required when starting the task can actuate hardware immediately (for example counter-output pulse trains). Only consulted when autostart=True.

False

Returns:

Type Description
DaqSession

A configured :class:DaqSession. Started iff autostart=True.

Source code in src/nidaqlib/tasks/__init__.py
async def open_device(
    spec: TaskSpec,
    *,
    backend: DaqBackend | None = None,
    timeout: float = 10.0,  # noqa: ASYNC109 — NI per-call timeout, not coroutine
    autostart: bool = True,
    confirm_start: bool = False,
) -> DaqSession:
    """Open and return a configured :class:`DaqSession`.

    Usage forms::

        async with await open_device(spec) as session:
            ...

        session = await open_device(spec)
        try:
            ...
        finally:
            await session.close()

    Mirrors the ecosystem ``open_device`` shape used by ``alicatlib``,
    ``watlowlib``, and ``sartoriuslib``. The DAQ-specific deviation: the
    ``spec`` is the declarative task description (channels, timing,
    triggers) rather than a serial port string.

    Args:
        spec: Declarative :class:`TaskSpec` to materialise.
        backend: Optional :class:`~nidaqlib.backend.base.DaqBackend`. Defaults
            to :class:`~nidaqlib.backend.nidaqmx_backend.NidaqmxBackend` —
            tests typically pass a
            :class:`~nidaqlib.backend.fake.FakeDaqBackend` here.
        timeout: Default per-operation timeout, in seconds.
        autostart: When ``True`` (default), the session is configured AND
            started before this function returns. When ``False``, the
            session is only configured — the caller is responsible for
            ``await session.start()`` before any acquisition. Required
            for the §11.3.2 callback bridge, which must register the
            buffer event before NI's ``task.start()``; pass the
            unstarted session to :func:`~nidaqlib.streaming.block.record`
            with ``use_callback_bridge=True`` and the recorder owns the
            start.
        confirm_start: Required when starting the task can actuate hardware
            immediately (for example counter-output pulse trains). Only
            consulted when ``autostart=True``.

    Returns:
        A configured :class:`DaqSession`. Started iff ``autostart=True``.
    """
    if backend is None:
        # Local import — keeps the production `nidaqmx` import off the
        # critical path of test sessions that supply a fake backend.
        from nidaqlib.backend.nidaqmx_backend import NidaqmxBackend  # noqa: PLC0415

        backend = NidaqmxBackend()
    session = DaqSession(spec, backend, timeout=timeout)
    if autostart:
        # Validate up-front so a missing ``confirm_start`` for an actuating
        # task fails before any backend resources are allocated.
        session._validate_start_safety(confirm=confirm_start)  # pyright: ignore[reportPrivateUsage]
    try:
        await session.configure()
        if autostart:
            await session.start(confirm=confirm_start)
    except BaseException:
        # Open failed mid-pipeline; release any partial state so the
        # caller isn't left with a half-configured session.
        await session.close()
        raise
    return session

read_sidecar

read_sidecar(tdms_path)

Read a sidecar adjacent to tdms_path and reconstruct a :class:RunMetadata.

Raises:

Type Description
FileNotFoundError

The sidecar does not exist.

NIDaqValidationError

The sidecar JSON is structurally invalid.

Source code in src/nidaqlib/tasks/metadata.py
def read_sidecar(tdms_path: str | Path) -> RunMetadata:
    """Read a sidecar adjacent to ``tdms_path`` and reconstruct a :class:`RunMetadata`.

    Raises:
        FileNotFoundError: The sidecar does not exist.
        NIDaqValidationError: The sidecar JSON is structurally invalid.
    """
    sidecar = sidecar_path_for(tdms_path)
    payload: object = json.loads(sidecar.read_text(encoding="utf-8"))
    if not isinstance(payload, dict):
        raise NIDaqValidationError(
            f"sidecar {sidecar!s} must contain a JSON object, got {type(payload).__name__}"
        )
    return RunMetadata.from_dict(payload)  # pyright: ignore[reportUnknownArgumentType]

reading_to_row

reading_to_row(reading)

Flatten a :class:DaqReading into a single row dict.

Layout:

  • device, task — join keys.
  • t_mono_ns — int, canonical monotonic join key.
  • t_utc — ISO 8601, wall-clock acquisition midpoint.
  • t_midpoint_mono_ns — int or None (integration-window midpoint).
  • requested_at / received_at — ISO 8601, I/O provenance.
  • latency_s — float seconds.
  • one column per channel (values keys), values flattened.
  • one <channel>_unit column per channel.
  • error_type / error_message — populated only on error rows.

The same row layout is used by every row-oriented sink.

Source code in src/nidaqlib/sinks/base.py
def reading_to_row(reading: DaqReading) -> dict[str, float | int | str | bool | None]:
    """Flatten a :class:`DaqReading` into a single row dict.

    Layout:

    - ``device``, ``task`` — join keys.
    - ``t_mono_ns`` — int, canonical monotonic join key.
    - ``t_utc`` — ISO 8601, wall-clock acquisition midpoint.
    - ``t_midpoint_mono_ns`` — int or None (integration-window midpoint).
    - ``requested_at`` / ``received_at`` — ISO 8601, I/O provenance.
    - ``latency_s`` — float seconds.
    - one column per channel (``values`` keys), values flattened.
    - one ``<channel>_unit`` column per channel.
    - ``error_type`` / ``error_message`` — populated only on error rows.

    The same row layout is used by every row-oriented sink.
    """
    row: dict[str, float | int | str | bool | None] = {
        "device": reading.device,
        "task": reading.task,
        "t_mono_ns": reading.t_mono_ns,
        "t_utc": reading.t_utc.isoformat(),
        "t_midpoint_mono_ns": reading.t_midpoint_mono_ns,
        "requested_at": reading.requested_at.isoformat(),
        "received_at": reading.received_at.isoformat(),
        "latency_s": reading.latency_s,
    }
    row.update(reading.values)
    row.update({f"{ch}_unit": unit for ch, unit in reading.units.items()})
    err = reading.error
    if err is not None:
        row["error_type"] = f"{type(err).__module__}.{type(err).__qualname__}"
        row["error_message"] = str(err)
    else:
        row["error_type"] = None
        row["error_message"] = None
    return row

record async

record(
    source,
    *,
    chunk_size,
    timeout=10.0,
    buffer_size=16,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.DROP_OLDEST,
    use_callback_bridge=False,
)

Hardware-clocked block acquisition.

Yields a :class:Recording[DaqBlock]. The stream is closed when this context manager exits; summary is mutated in place during the run and is safe to read after exit.

Parameters:

Name Type Description Default
source DaqSession

A configured :class:DaqSession. Required state depends on use_callback_bridge:

  • use_callback_bridge=False (Option A) — source must be started; wrap with :func:~nidaqlib.tasks.open_device (the default autostart=True).
  • use_callback_bridge=True (Option B / §11.3.2) — source must be configured but not yet started; pass autostart=False to open_device and let the recorder own the start. NI rejects buffer-event registration on a running task with -200960 ("Register all your DAQmx software events prior to starting the task").
required
chunk_size int

Samples per channel per emitted :class:DaqBlock.

required
timeout float

Per-read timeout in seconds (Option A only — Option B reads from the NI buffer with timeout 0).

10.0
buffer_size int

AnyIO memory-object stream buffer, in :class:DaqBlock slots. Older blocks may be dropped per overflow when full.

16
error_policy ErrorPolicy

:attr:ErrorPolicy.RAISE (default) cancels the task group on error; :attr:ErrorPolicy.RETURN emits an error-tagged :class:DaqBlock and continues. Option B (callback bridge) currently honours only :attr:RAISE — :attr:RETURN is wired for the Option A producer.

RAISE
overflow OverflowPolicy

Backpressure policy. DROP_OLDEST is the hardware-clocked default — see design doc §13.3.

DROP_OLDEST
use_callback_bridge bool

Opt into the §11.3.2 every-N-samples callback path. Default False selects Option A (blocking read in a worker thread).

False

Raises:

Type Description
NIDaqTaskStateError

source is in the wrong lifecycle state for the selected mode (see source argument above).

ValueError

chunk_size < 1 or buffer_size < 1.

Source code in src/nidaqlib/streaming/block.py
@asynccontextmanager
async def record(
    source: DaqSession,
    *,
    chunk_size: int,
    timeout: float = 10.0,  # noqa: ASYNC109 — per-NI-read timeout, not coroutine
    buffer_size: int = 16,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.DROP_OLDEST,
    use_callback_bridge: bool = False,
) -> AsyncGenerator[Recording[DaqBlock]]:
    """Hardware-clocked block acquisition.

    Yields a :class:`Recording[DaqBlock]`. The stream is closed when this
    context manager exits; ``summary`` is mutated in place during the run
    and is safe to read after exit.

    Args:
        source: A configured :class:`DaqSession`. Required state depends on
            ``use_callback_bridge``:

            * ``use_callback_bridge=False`` (Option A) — ``source`` must be
              **started**; wrap with :func:`~nidaqlib.tasks.open_device` (the
              default ``autostart=True``).
            * ``use_callback_bridge=True`` (Option B / §11.3.2) — ``source``
              must be **configured but not yet started**; pass
              ``autostart=False`` to ``open_device`` and let the recorder own
              the start. NI rejects buffer-event registration on a running
              task with -200960 ("Register all your DAQmx software events
              prior to starting the task").
        chunk_size: Samples per channel per emitted :class:`DaqBlock`.
        timeout: Per-read timeout in seconds (Option A only — Option B reads
            from the NI buffer with timeout 0).
        buffer_size: AnyIO memory-object stream buffer, in :class:`DaqBlock`
            slots. Older blocks may be dropped per ``overflow`` when full.
        error_policy: :attr:`ErrorPolicy.RAISE` (default) cancels the task
            group on error; :attr:`ErrorPolicy.RETURN` emits an error-tagged
            :class:`DaqBlock` and continues. Option B (callback bridge)
            currently honours only :attr:`RAISE` — :attr:`RETURN` is wired
            for the Option A producer.
        overflow: Backpressure policy. ``DROP_OLDEST`` is the
            hardware-clocked default — see design doc §13.3.
        use_callback_bridge: Opt into the §11.3.2 every-N-samples callback
            path. Default ``False`` selects Option A (blocking read in a
            worker thread).

    Raises:
        NIDaqTaskStateError: ``source`` is in the wrong lifecycle state for
            the selected mode (see ``source`` argument above).
        ValueError: ``chunk_size < 1`` or ``buffer_size < 1``.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")
    _validate_record_state(
        source,
        error_policy=error_policy,
        use_callback_bridge=use_callback_bridge,
    )

    summary = AcquisitionSummary()
    timing = source.spec.timing
    rate_hz: float | None = (
        timing.rate_hz
        if timing is not None and timing.mode is not AcquisitionMode.ON_DEMAND
        else None
    )
    tx, rx = anyio.create_memory_object_stream[DaqBlock](max_buffer_size=buffer_size)
    drop_rx = rx.clone()

    # TDMS LoggingMode.LOG bypasses the application read path. If we tried
    # to drive the producer, ``read_block`` would block forever waiting on
    # samples that never arrive. Detect at entry and emit an empty stream
    # so consumers see ``blocks_emitted == 0``.
    if _is_log_only(source):
        async with rx, drop_rx:
            await tx.aclose()
            try:
                yield Recording(stream=rx, summary=summary, rate_hz=rate_hz)
            finally:
                summary.finished_at = datetime.now(UTC)
        return

    async with anyio.create_task_group() as tg, rx, drop_rx:
        if use_callback_bridge:
            await _start_bridge_producer(tg, source, tx, drop_rx, summary, chunk_size, overflow)
        else:
            _ = tg.start_soon(
                _blocking_producer,
                source,
                tx,
                drop_rx,
                summary,
                chunk_size,
                timeout,
                overflow,
                error_policy,
            )
        try:
            yield Recording(stream=rx, summary=summary, rate_hz=rate_hz)
        finally:
            await tx.aclose()
            tg.cancel()
    summary.finished_at = datetime.now(UTC)

record_polled async

record_polled(
    source,
    *,
    rate_hz,
    error_policy=ErrorPolicy.RAISE,
    overflow=OverflowPolicy.BLOCK,
    buffer_size=64,
)

Software-timed scalar polling at rate_hz.

Yields a :class:Recording[T]. The per-tick payload type T depends on source:

  • :class:DaqSession → one :class:DaqReading per tick.
  • :class:DaqManagerMapping[str, DeviceResult[DaqReading]] per tick (matches :meth:DaqManager.poll).
  • Any :class:PollSource (including :class:PollSourceAdapter) → Mapping[str, DeviceResult[DaqReading]] returned by its poll().

summary is updated in place during the run; a final snapshot is frozen on exit.

Parameters:

Name Type Description Default
source DaqSession | DaqManager | PollSource

A started :class:DaqSession (whose timing is None or :attr:AcquisitionMode.ON_DEMAND), a :class:DaqManager, or any :class:PollSource.

required
rate_hz float

Target poll rate, in Hz. Must be > 0.

required
error_policy ErrorPolicy

:attr:RAISE cancels the recorder on a poll error; :attr:RETURN emits a :class:DaqReading (or per-task :class:DeviceResult row) with the error attached and continues.

RAISE
overflow OverflowPolicy

Backpressure policy. Defaults to :attr:BLOCK — software-timed pollers can pause safely (design §13.3).

BLOCK
buffer_size int

AnyIO send-stream capacity in payload slots.

64

Raises:

Type Description
NIDaqTaskStateError

A session source is not started, or a manager source is closed / has no managed tasks.

ValueError

rate_hz <= 0 or buffer_size < 1.

Source code in src/nidaqlib/streaming/recorder.py
@asynccontextmanager
async def record_polled(
    source: DaqSession | DaqManager | PollSource,
    *,
    rate_hz: float,
    error_policy: ErrorPolicy = ErrorPolicy.RAISE,
    overflow: OverflowPolicy = OverflowPolicy.BLOCK,
    buffer_size: int = 64,
) -> AsyncGenerator[Recording[_PolledItem]]:
    """Software-timed scalar polling at ``rate_hz``.

    Yields a :class:`Recording[T]`. The per-tick payload type ``T`` depends
    on ``source``:

    - :class:`DaqSession` → one :class:`DaqReading` per tick.
    - :class:`DaqManager` → ``Mapping[str, DeviceResult[DaqReading]]`` per
      tick (matches :meth:`DaqManager.poll`).
    - Any :class:`PollSource` (including :class:`PollSourceAdapter`) →
      ``Mapping[str, DeviceResult[DaqReading]]`` returned by its ``poll()``.

    ``summary`` is updated in place during the run; a final snapshot is
    frozen on exit.

    Args:
        source: A started :class:`DaqSession` (whose timing is ``None`` or
            :attr:`AcquisitionMode.ON_DEMAND`), a :class:`DaqManager`, or
            any :class:`PollSource`.
        rate_hz: Target poll rate, in Hz. Must be > 0.
        error_policy: :attr:`RAISE` cancels the recorder on a poll error;
            :attr:`RETURN` emits a :class:`DaqReading` (or per-task
            :class:`DeviceResult` row) with the error attached and continues.
        overflow: Backpressure policy. Defaults to :attr:`BLOCK` —
            software-timed pollers can pause safely (design §13.3).
        buffer_size: AnyIO send-stream capacity in payload slots.

    Raises:
        NIDaqTaskStateError: A session ``source`` is not started, or a
            manager ``source`` is closed / has no managed tasks.
        ValueError: ``rate_hz <= 0`` or ``buffer_size < 1``.
    """
    if rate_hz <= 0:
        raise ValueError(f"rate_hz must be > 0, got {rate_hz!r}")
    if buffer_size < 1:
        raise ValueError(f"buffer_size must be >= 1, got {buffer_size!r}")

    # Late imports — manager imports streaming.block, session pulls in
    # backend modules; both are heavy enough to keep off the recorder
    # module's import path.
    from nidaqlib.manager import DaqManager  # noqa: PLC0415
    from nidaqlib.tasks.session import DaqSession  # noqa: PLC0415

    if isinstance(source, DaqManager):
        if source.is_closed:
            raise NIDaqTaskStateError(
                "record_polled() requires an open DaqManager; got a closed manager",
                context=ErrorContext(command_name="record_polled"),
            )
        if not source.names:
            raise NIDaqTaskStateError(
                "record_polled() requires a DaqManager with at least one task",
                context=ErrorContext(command_name="record_polled"),
            )
    elif isinstance(source, DaqSession) and not source.is_started:
        raise NIDaqTaskStateError(
            f"record_polled() requires a started session; task {source.spec.name!r} is not running",
            context=ErrorContext(task_name=source.spec.name, command_name="record_polled"),
        )

    summary = AcquisitionSummary()
    period = 1.0 / rate_hz
    tx, rx = anyio.create_memory_object_stream[_PolledItem](max_buffer_size=buffer_size)
    drop_rx = rx.clone()

    async with anyio.create_task_group() as tg, rx, drop_rx:
        if isinstance(source, DaqManager):
            _ = tg.start_soon(
                _polled_manager_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        elif isinstance(source, DaqSession):
            _ = tg.start_soon(
                _polled_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        else:
            # Any other PollSource — drive its poll() directly. Emits the
            # same Mapping[str, DeviceResult[DaqReading]] shape as the
            # manager path.
            _ = tg.start_soon(
                _polled_source_producer,
                source,
                tx,
                drop_rx,
                summary,
                period,
                error_policy,
                overflow,
            )
        try:
            yield Recording(stream=rx, summary=summary, rate_hz=rate_hz)
        finally:
            await tx.aclose()
            tg.cancel()
    summary.finished_at = datetime.now(UTC)

sidecar_path_for

sidecar_path_for(tdms_path)

Return the conventional sidecar path for tdms_path.

run.tdmsrun.metadata.json. The .tdms suffix is replaced with .metadata.json; any other extension gets .metadata.json appended.

Source code in src/nidaqlib/tasks/metadata.py
def sidecar_path_for(tdms_path: str | Path) -> Path:
    """Return the conventional sidecar path for ``tdms_path``.

    ``run.tdms`` → ``run.metadata.json``. The ``.tdms`` suffix is replaced
    with ``.metadata.json``; any other extension gets ``.metadata.json``
    appended.
    """
    path = Path(tdms_path)
    if path.suffix.lower() == ".tdms":
        return path.with_suffix(".metadata.json")
    return path.with_name(path.name + ".metadata.json")

to_pint

to_pint(unit)

Return a pint-compatible unit string for unit, or None.

Accepts
  • NoneNone.
  • A string already in pint form ("degC", "V", ...) — passed through unchanged when it's in the known set; otherwise returned as-is so unfamiliar units don't get silently dropped.
  • An nidaqmx.constants.TemperatureUnits member → mapped through the dedicated temperature table.

Lossy by design: no tuple, no discriminator, no exception on unknown units — same contract as the sibling libraries.

Source code in src/nidaqlib/units.py
def to_pint(unit: object) -> str | None:
    """Return a pint-compatible unit string for ``unit``, or ``None``.

    Accepts:
        - ``None`` → ``None``.
        - A string already in pint form (``"degC"``, ``"V"``, ...) — passed
          through unchanged when it's in the known set; otherwise returned
          as-is so unfamiliar units don't get silently dropped.
        - An ``nidaqmx.constants.TemperatureUnits`` member → mapped through
          the dedicated temperature table.

    Lossy by design: no tuple, no discriminator, no exception on unknown
    units — same contract as the sibling libraries.
    """
    if unit is None:
        return None
    if isinstance(unit, str):
        return _STRING_PASSTHROUGH.get(unit, unit)
    # Anything else — try the TemperatureUnits enum table. The lazy import
    # keeps this module callable when nidaqmx is absent.
    temperature = _temperature_table()
    if unit in temperature:
        return temperature[unit]
    # Last-ditch: many NI enums expose .name as a SHOUTING string. We don't
    # try to be clever; let callers handle the unknown case.
    name = getattr(unit, "name", None)
    if isinstance(name, str):
        return _STRING_PASSTHROUGH.get(name)
    return None

write_sidecar

write_sidecar(tdms_path, metadata, *, indent=2)

Write metadata next to tdms_path as <base>.metadata.json.

The TDMS file itself does not need to exist yet — the sidecar can be written before, during, or after the acquisition.

Parameters:

Name Type Description Default
tdms_path str | Path

The TDMS file path. Determines the sidecar location via :func:sidecar_path_for.

required
metadata RunMetadata

The :class:RunMetadata to serialise.

required
indent int | None

json.dumps indent parameter. None writes compactly.

2

Returns:

Name Type Description
The Path

class:pathlib.Path of the written sidecar.

Source code in src/nidaqlib/tasks/metadata.py
def write_sidecar(
    tdms_path: str | Path,
    metadata: RunMetadata,
    *,
    indent: int | None = 2,
) -> Path:
    """Write ``metadata`` next to ``tdms_path`` as ``<base>.metadata.json``.

    The TDMS file itself does not need to exist yet — the sidecar can be
    written before, during, or after the acquisition.

    Args:
        tdms_path: The TDMS file path. Determines the sidecar location via
            :func:`sidecar_path_for`.
        metadata: The :class:`RunMetadata` to serialise.
        indent: ``json.dumps`` indent parameter. ``None`` writes compactly.

    Returns:
        The :class:`pathlib.Path` of the written sidecar.
    """
    sidecar = sidecar_path_for(tdms_path)
    sidecar.parent.mkdir(parents=True, exist_ok=True)
    sidecar.write_text(json.dumps(metadata.to_dict(), indent=indent), encoding="utf-8")
    return sidecar