API

Usage

Connection

Connecting to STU

To communicate with the ICOtronic system use the the async context manager of the Connection class to open and close the connection to the STU:

>>> from asyncio import run
>>> from icotronic.can import Connection

>>> async def create_and_shutdown_connection():
...     async with Connection() as stu:
...         ... # ← Your code goes here

>>> run(create_and_shutdown_connection())

Connecting to Sensor Node

To connect to a sensor node (e.g. SHA, SMH, STH) use the async context manager of the coroutine STU.connect_sensor_node(). To connect to a node you need to know one of the identifiers of the node. In the example below we connect to a node with the name Test-STH:

>>> from asyncio import run
>>> from netaddr import EUI
>>> from icotronic.can import Connection

>>> async def connect_to_sensor_node(identifier):
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(identifier) as sensor_node:
...             return await sensor_node.get_mac_address()

>>> mac_address = run(connect_to_sensor_node("Test-STH"))
>>> isinstance(mac_address, EUI)
True

By default STU.connect_sensor_node() assumes that you want to connect to a generic sensor node (e.g. a sensory milling head (SMH)). To connect to an STH (a sensor node with additional functionality), use STH for the sensor_node_class parameter:

>>> from asyncio import run
>>> from icotronic.can import Connection, STH

>>> async def get_sensor_range(identifier):
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(identifier, STH) as sth:
...             return await sth.get_acceleration_sensor_range_in_g()

>>> sensor_range = run(get_sensor_range("Test-STH"))
>>> 0 <= sensor_range <= 200
True

Streaming

Reading Data

After you connected to the sensor node use the coroutine SensorNode.open_data_stream() to open the data stream and an async for statement to iterate over the received streaming data. The following code:

>>> from asyncio import run
>>> from icotronic.can import Connection
>>> from icotronic.can import StreamingConfiguration

>>> async def read_streaming_data():
...     async with Connection() as stu:
...         async with stu.connect_sensor_node("Test-STH") as sensor_node:
...             channels = StreamingConfiguration(first=True)
...             async with sensor_node.open_data_stream(channels) as stream:
...                 async for data, lost_messages in stream:
...                     return data

>>> data = run(read_streaming_data())

>>> # Example Output: [32579, 32637, 32575]@1724251001.976368 #123
>>> data
[..., ..., ...]@... #...

>>> len(data.values)
3
>>> isinstance(data.timestamp, float)
True
>>> all(0 <= value <= 2**16-1 for value in data.values)
True
>>> 0 <= data.counter <= 255
True
  1. connects to a node called Test-STH,

  2. opens a data stream for the first measurement channel,

  3. receives a single streaming data object,

  4. prints its representation and

  5. shows some of the properties of the streaming data object.

The data returned by the async for (stream) is an object of the class StreamingData with the following attributes:

  • StreamingData.values: a list containing either two or three values,

  • StreamingData.timestamp: the timestamp when the data was collected

  • StreamingData.counter: a cyclic message counter (0 – 255) that can be used to detect data loss

Note

The amount of data stored in StreamingData.values depends on the enabled streaming channels. For the recommended amount of one or three enabled channels the list contains three values. For

  • one enabled channel all three values belong to the same channel, while

  • for the three enabled channels

    • the first value belongs to the first channel,

    • the second value belongs to the second channel,

    • and the third value belongs to the third channel.

By default StreamingData.values contains 16-bit ADC values. To convert the data into multiples of g (the standard gravity) you can

  • use the coroutine STH.get_acceleration_conversion_function() to retrieve a function that converts 16-bit ADC values values into multiples of g and then

  • use this function to convert streaming data with the method StreamingData.apply().

In the example below we convert the first retrieved streaming data object and return it:

>>> from asyncio import run
>>> from icotronic.can import Connection
>>> from icotronic.can import STH, StreamingConfiguration

>>> async def read_streaming_data_g():
...     async with Connection() as stu:
...         async with stu.connect_sensor_node("Test-STH", STH) as sth:
...             conversion_to_g = (await
...                 sth.get_acceleration_conversion_function())
...             channels = StreamingConfiguration(first=True)
...             async with sth.open_data_stream(channels) as stream:
...                 async for data, lost_messages in stream:
...                     data.apply(conversion_to_g)
...                     return data

>>> streaming_data = run(read_streaming_data_g())
>>> len(streaming_data.values)
3
>>> all([-100 <= value <= 100 for value in streaming_data.values])
True

Collecting Multiple Values

While working directly with the class StreamingData might make sense for small projects often you:

  • want to collect a bunch of data values,

  • work on data for a specific measurement channel, or

  • convert data values.

For that case you can use the class MeasurementData. The example code below

  • collects data for all three measurement channels,

  • applies a conversion into multiples of g for the first channel (only) and

  • checks that the values for the first channel are all between -2 g and 2 g.

>>> from asyncio import run
>>> from time import monotonic
>>> from icotronic.can import Connection, STH, StreamingConfiguration
>>> from icotronic.measurement import Conversion, MeasurementData

>>> async def collect_streaming_data(identifier):
...    async with Connection() as stu:
...        async with stu.connect_sensor_node(identifier, STH) as sth:
...            conversion_to_g = (await
...                sth.get_acceleration_conversion_function())
...            all_channels = StreamingConfiguration(
...                first=True, second=True, third=True
...            )
...            measurement_data = MeasurementData(all_channels)
...            async with sth.open_data_stream(all_channels) as stream:
...                messages = 10
...                async for data, _ in stream:
...                    measurement_data.append(data)
...                    messages -= 1
...                    if messages <= 0:
...                        break
...            measurement_data.apply(Conversion(first=conversion_to_g))
...            return measurement_data

>>> data = run(collect_streaming_data(identifier="Test-STH"))
>>> first_channel_in_g = data.first()
>>> all(-2 <= data.value <= 2 for data in first_channel_in_g)
True

Converting Data Values

The class Conversion allows you to apply different functions to the different channels of the streaming data (attributes first, second and third). Each function gets a measurement value (of type float) and should return a value of type float. If you do not want to apply any conversion to a certain channel you can use the (default value) of None for the Conversion channel attribute. In the example below we apply a Conversion object that

  • does not change the data for the first channel,

  • multiplies the values of the second channel by two and

  • adds the value 10 to the data of the second channel.

>>> from icotronic.can import StreamingConfiguration, StreamingData
>>> from icotronic.measurement import Conversion, MeasurementData

>>> measurement_data = MeasurementData(
...     StreamingConfiguration(first=True, second=True, third=True))
>>> s1 = StreamingData(counter=1, timestamp=1757946559.499677,
...                    values=[1.0, 2.0, 3.0])
>>> s2 = StreamingData(counter=2, timestamp=1757946559.499680,
...                    values=[4.0, 5.0, 6.0])
>>> measurement_data.append(s1)
>>> measurement_data.append(s2)
>>> measurement_data
Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
[1.0, 2.0, 3.0]@1757946559.499677 #1
[4.0, 5.0, 6.0]@1757946559.49968 #2
>>> measurement_data.first()
1.0@1757946559.499677 #1
4.0@1757946559.49968 #2
>>> measurement_data.second()
2.0@1757946559.499677 #1
5.0@1757946559.49968 #2
>>> measurement_data.third()
3.0@1757946559.499677 #1
6.0@1757946559.49968 #2

>>> def multiply_by_two(value):
...     return value * 2
>>> conversion = Conversion(second=multiply_by_two,
...                         third=(lambda value: value + 10))
>>> measurement_data.apply(conversion)
Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
[1.0, 4.0, 13.0]@1757946559.499677 #1
[4.0, 10.0, 16.0]@1757946559.49968 #2
>>> measurement_data.first()
1.0@1757946559.499677 #1
4.0@1757946559.49968 #2
>>> measurement_data.second()
4.0@1757946559.499677 #1
10.0@1757946559.49968 #2
>>> measurement_data.third()
13.0@1757946559.499677 #1
16.0@1757946559.49968 #2

Storing Data

If you want to store streaming data for later use you can use the Storage class to open a context manager that lets you store data as HDF5 file via the method add_streaming_data() of the class StorageData. The code below shows how to store one second of measurement data in a file called measurement.hdf5.

>>> from asyncio import run
>>> from pathlib import Path
>>> from time import monotonic
>>> from icotronic.can import Connection, STH, StreamingConfiguration
>>> from icotronic.measurement.storage import Storage

>>> async def store_streaming_data(identifier, storage):
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(identifier, STH) as sth:
...             conversion_to_g = (await
...                 sth.get_acceleration_conversion_function())
...
...             # Store acceleration range as metadata
...             storage.write_sensor_range(
...                 await sth.get_acceleration_sensor_range_in_g()
...             )
...             # Store sampling rate (and ADC configuration as metadata)
...             storage.write_sample_rate(await sth.get_adc_configuration())
...
...             async with sth.open_data_stream(
...                 storage.streaming_configuration
...             ) as stream:
...                 # Read data for about one seconds
...                 end = monotonic() + 1
...                 async for data, _ in stream:
...                     # Convert from ADC bit value into multiples of g
...                     storage.add_streaming_data(
...                         data.apply(conversion_to_g))
...                     if monotonic() > end:
...                         break

>>> filepath = Path("measurement.hdf5") # Store data in HDF5 file
>>> with Storage(filepath, StreamingConfiguration(first=True)) as storage:
...     run(store_streaming_data("Test-STH", storage))

Since HDF5 is a standard file format you can use general purpose tools such as HDFView to view the stored data. To specifically analyze the data produced by the ICOtronic package you can also use one of the scripts of the ICOlyzer package.

For more information about the measurement format, please take a look at the section “Measurement Data” of the general ICOtronic package documentation.

Determining Data Loss

Sometimes the

  • connection to your sensor node might be bad or

  • code might run too slow to retrieve/process streaming data.

In both cases there will be some form of data loss. The ICOtronic library currently takes multiple measures to detect data loss.

Bad Connection

The iterator for streaming data AsyncStreamBuffer will raise a StreamingTimeoutError, if there is no streaming data for a certain amount of time (default: 5 seconds). The class AsyncStreamBuffer also provides access to statistics that can be used to determine the amount of lost data. For example, if you iterate through the streaming messages with async for, then in addition to the streaming data, the iterator will also return the amount of lost messages since the last successfully received message (lost_messages in the example below):

async with sensor_node.open_data_stream(channels) as stream:
    async for data, lost_messages in stream:
        if lost_messages > 0:
            print(f"Lost {lost_messages} messages!")

To access the overall data quality, since the start of streaming you can use the method AsyncStreamBuffer.dataloss(). The example code below shows how to use this method:

>>> from asyncio import run
>>> from time import monotonic
>>> from icotronic.can import Connection, StreamingConfiguration

>>> async def determine_data_loss(identifier):
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(identifier) as sensor_node:
...              end = monotonic() + 1 # Read data for roughly one second
...              channels = StreamingConfiguration(first=True)
...              async with sensor_node.open_data_stream(channels) as stream:
...                  async for data, lost_messages in stream:
...                      if monotonic() > end:
...                          break
...
...                  return stream.dataloss()

>>> data_loss = run(determine_data_loss(identifier="Test-STH"))
>>> data_loss < 0.2 # We assume that the data loss was less than 20 %
True

If you want to calculate the amount of data loss for a specific time-span you can use the method AsyncStreamBuffer.reset_stats() to reset the message statistics at the start of the time-span. In the following example we stream data for (roughly) 2.1 seconds and return a list with the amount of data loss over periods of 0.5 seconds:

>>> from asyncio import run
>>> from time import monotonic
>>> from icotronic.can import Connection, StreamingConfiguration

>>> async def determine_data_loss(identifier):
...       async with Connection() as stu:
...           async with stu.connect_sensor_node(identifier) as sensor_node:
...               start = monotonic()
...               end = start + 2.1
...               last_reset = start
...               data_lost = []
...               channels = StreamingConfiguration(first=True)
...               async with sensor_node.open_data_stream(channels) as stream:
...                   async for data, lost_messages in stream:
...                       current = monotonic()
...                       if current >= last_reset + 0.5:
...                          data_lost.append(stream.dataloss())
...                          stream.reset_stats()
...                          last_reset = current
...                       if current > end:
...                           break
...
...                   return data_lost

>>> data_lost = run(determine_data_loss(identifier="Test-STH"))
>>> len(data_lost)
4
>>> all(map(lambda loss: loss < 0.1, data_lost))
True

Note

We used a overall runtime of 2.1 seconds, since in a timing interval of 2 seconds there is always the possibility that the code above either returns three or four data loss values depending on the specific timing.

Slow Processing of Data

The buffer of the CAN controller is only able to store a certain amount of streaming messages before it has to drop them to make room for new ones. For this reason the ICOtronic library will raise a StreamingBufferError, if the buffer for streaming messages exceeds a certain threshold (default: 10 000 messages).

Auxiliary Functionality

Reading Names

After your are connected to a node you can read its (advertisement) name using the coroutine SensorNode.get_name():

>>> from asyncio import run
>>> from icotronic.can import Connection

>>> async def read_sensor_name(name):
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(name) as sensor_node:
...             sensor_name = await sensor_node.get_name()
...             return sensor_name

>>> sensor_name = "Test-STH"
>>> run(read_sensor_name(sensor_name))
'Test-STH'

Changing ADC Configuration

To change

  • the sample rate/frequency (via prescaler, acquisition time and oversampling rate) or

  • the reference voltage

of the analog digital converter (ADC) of your sensor node you can use the coroutine SensorNode.set_adc_configuration(). Since all of the parameters of this coroutine use default values, you can also just call it without changing any parameters to apply the default ADC configuration.

Note

Some sensor nodes use a different reference voltage (not 3.3V). In this case applying the default configuration might not be what you want.

To retrieve the current ADC configuration use the coroutine SensorNode.get_adc_configuration(), which will return an ADCConfiguration object. This object provides the method ADCConfiguration.sample_rate() to calculate the sampling rate/frequency based on the current value of prescaler, acquisition time and oversampling rate.

In the example below we

  • apply the default ADC configuration on the sensor node,

  • retrieve the configuration via the coroutine SensorNode.get_adc_configuration(), and then

  • print the sampling rate/frequency based on the retrieved ADC configuration values.

>>> from asyncio import run
>>> from icotronic.can import Connection
>>> from icotronic.can.adc import ADCConfiguration

>>> async def write_read_adc_config(name):
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(name) as sensor_node:
...             # Set default configuration
...             await sensor_node.set_adc_configuration()
...             # Retrieve ADC configuration
...             adc_configuration = await sensor_node.get_adc_configuration()
...             print("Sample Rate: "
...                   f"{adc_configuration.sample_rate():0.2f} Hz")

>>> run(write_read_adc_config("Test-STH"))
Sample Rate: 9523.81 Hz

One option to decrease the sample rate from the default value of about 9524 Hz is to change the

  • prescaler,

  • acquisition time or

  • oversampling rate.

For a

  • formula on how to calculate the sample rate based on the values above and

  • a list of suggested sample rates

please take a look at the section “Sampling Rate” of the general ICOtronic system documentation. The example code below shows you how to change the sample rate to about 4762 Hz.

>>> from asyncio import run
>>> from icotronic.can import Connection
>>> from icotronic.can.adc import ADCConfiguration

>>> async def change_sample_rate(name):
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(name) as sensor_node:
...             # Retrieve current reference voltage
...             adc_configuration = await sensor_node.get_adc_configuration()
...             reference_voltage = adc_configuration.reference_voltage
...             # Change sample rate
...             configuration = ADCConfiguration(
...                                 prescaler=2,
...                                 acquisition_time=8,
...                                 oversampling_rate=128,
...                                 reference_voltage=reference_voltage)
...             print("Set sample rate to "
...                   f"{configuration.sample_rate():0.2f} Hz")
...             await sensor_node.set_adc_configuration(**configuration)

>>> run(change_sample_rate("Test-STH"))
Set sample rate to 4761.90 Hz

Changing Channel Configuration

Some ICOtronic sensor nodes support changing the mapping from the three available measurement channels to different sensor channels. To get the current mapping you can use the coroutine SensorNode.get_sensor_configuration(). The following code:

from asyncio import run
from icotronic.can import Connection
from icotronic.can.error import UnsupportedFeatureException

async def read_sensor_configuration(identifier):
   async with Connection() as stu:
        async with stu.connect_sensor_node(identifier) as sensor_node:
            try:
                print(await sensor_node.get_sensor_configuration())
            except UnsupportedFeatureException as error:
                print(error)

run(read_sensor_configuration("Test-STH"))

will either

  • print the current sensor configuration e.g.:

    M1: S1, M2: S2, M3: S3
    
  • or print an error message:

    Reading sensor configuration not supported
    

    if the sensor node does not support getting (or setting) the sensor configuration.

To set the sensor configuration you can use the coroutine SensorNode.set_sensor_configuration().

The method expects a SensorConfiguration object as parameter. The following code:

from asyncio import run
from icotronic.can import Connection, SensorConfiguration
from icotronic.can.error import UnsupportedFeatureException

async def set_sensor_configuration(identifier):
    async with Connection() as stu:
        async with stu.connect_sensor_node(identifier) as sensor_node:
            try:
                config = await sensor_node.get_sensor_configuration()
                print(f"Sensor configuration: {config}")
                await sensor_node.set_sensor_configuration(
                    SensorConfiguration(first=2, second=4, third=1)
                )
                config = await sensor_node.get_sensor_configuration()
                print(f"Updated sensor configuration: {config}")
            except UnsupportedFeatureException:
                print("Sensor configuration not supported by device")

run(set_sensor_configuration("Test-STH"))

will set the sensor configuration for

  • the first measurement channel to the sensor channel 2,

  • the second measurement channel to the sensor channel 4, and

  • the third measurement channel to the sensor channel 1.

For hardware that supports channel configuration the code should print the following text:

Sensor configuration: M1: S1, M2: S2, M3: S3
Updated sensor configuration: M1: S2, M2: S4, M3: S1

Classes & Functions

Connection

class icotronic.can.Connection

Basic class to initialize CAN communication

To actually connect to the CAN bus you need to use the async context manager, provided by this class. If you want to manage the connection yourself, please just use __aenter__ and __aexit__ manually.

Examples

Create a new connection (without connecting to the CAN bus)

>>> connection = Connection()
async __aenter__()

Connect to the STU

Return type:

STU

Returns:

An object that can be used to communicate with the STU

Raises:

CANInitError – if the CAN initialization fails

Examples

Import required library code

>>> from asyncio import run

Use a context manager to handle the cleanup process automatically

>>> async def connect_can_context():
...     async with Connection() as stu:
...         pass
>>> run(connect_can_context())

Create and shutdown the connection explicitly

>>> async def connect_can_manual():
...     connection = Connection()
...     connected = await connection.__aenter__()
...     await connection.__aexit__(None, None, None)
>>> run(connect_can_manual())

Nodes

class icotronic.can.STU(spu)

Communicate and control a connected STU

Parameters:

spu (SPU) – The SPU object that created this STU instance

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Create an STU object

>>> async def create_stu():
...     async with Connection() as stu:
...         pass # call some coroutines of `stu` object
>>> run(create_stu())
async activate_bluetooth()

Activate Bluetooth on the STU

Return type:

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Activate Bluetooth on the STU

>>> async def activate():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
>>> run(activate())
async collect_sensor_nodes(timeout=5)

Collect available sensor nodes

This coroutine collects sensor nodes until either

  • no new sensor node was found or

  • until the given timeout, if no sensor node was found.

Parameters:

timeout – The timeout in seconds until this coroutine returns, if no sensor node was found at all

Return type:

list[SensorNodeInfo]

Returns:

A list of found sensor nodes

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Collect available sensor nodes

>>> async def collect_sensor_nodes():
...     async with Connection() as stu:
...         return await stu.collect_sensor_nodes()
>>> # We assume that at least one sensor node is available
>>> nodes = run(collect_sensor_nodes())
>>> len(nodes) >= 1
True
connect_sensor_node(identifier, sensor_node_class=<class 'icotronic.can.node.sensor.SensorNode'>)

Connect to a sensor node (e.g. SHA, SMH or STH)

Parameters:
  • identifier (int | str | EUI) –

    The

    • MAC address (EUI),

    • name (str), or

    • node number (int)

    of the sensor node we want to connect to

  • sensor_node_class (type[SensorNode]) – Sensor node subclass that should be returned by context manager

Return type:

AsyncSensorNodeManager

Returns:

A context manager that returns a sensor node object for the connected node

Raises:

ValueError – If you use an invalid name or node number as identifier

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Connect to the sensor node with node number 0

>>> async def connect_sensor_node():
...     async with Connection() as stu:
...         async with stu.connect_sensor_node(0):
...             connected = await stu.is_connected()
...         after = await stu.is_connected()
...         return (connected, after)
>>> run(connect_sensor_node())
(True, False)
async connect_with_mac_address(mac_address)

Connect to a Bluetooth sensor node using its MAC address

Parameters:

mac_address (EUI) – The MAC address of the sensor node

Return type:

None

Examples

Import required library code

>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection

Connect to a sensor node via its MAC address

>>> async def get_bluetooth_mac():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         # Wait for Bluetooth activation to take place
...         await sleep(2)
...         return await stu.get_mac_address(0)
>>> mac_address = run(get_bluetooth_mac())
>>> mac_address != EUI(0)
True
>>> async def connect(mac_address):
...     async with Connection() as stu:
...         await stu.deactivate_bluetooth()
...         # We assume that at least one STH is available
...         connected = before = await stu.is_connected()
...         await stu.activate_bluetooth()
...         while not connected:
...             await stu.connect_with_mac_address(mac_address)
...             await sleep(0.1)
...             connected = await stu.is_connected()
...         await stu.deactivate_bluetooth()
...         after = await stu.is_connected()
...         # Return status of Bluetooth node connect response
...         return before, connected, after
>>> run(connect(mac_address))
(False, True, False)
async connect_with_number(sensor_node_number=0)

Connect to a Bluetooth node using a node number

Parameters:

sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of available nodes - 1)

Return type:

bool

Returns:

  • True, if

    1. in search mode,

    2. at least single node was found,

    3. no legacy mode,

    4. and scanning mode active

  • False, otherwise

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Connect to node “0”

>>> async def connect_bluetooth_sensor_node_number():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         # We assume that at least one STH is available
...         connected = before = await stu.is_connected()
...         while not connected:
...             connected = await stu.connect_with_number(0)
...         await stu.deactivate_bluetooth()
...         after = await stu.is_connected()
...         # Return status of Bluetooth node connect response
...         return before, connected, after
>>> run(connect_bluetooth_sensor_node_number())
(False, True, False)
async deactivate_bluetooth()

Deactivate Bluetooth on the STU

Return type:

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Deactivate Bluetooth on STU 1

>>> async def deactivate_bluetooth():
...     async with Connection() as stu:
...         await stu.deactivate_bluetooth()
>>> run(deactivate_bluetooth())
async get_available_nodes()

Retrieve the number of available sensor nodes

Return type:

int

Returns:

The number of available sensor nodes

Examples

Import required library code

>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection

Get the number of available Bluetooth nodes at STU 1

>>> async def get_number_bluetooth_nodes():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...
...         # We assume at least one STH is available
...         number_sths = 0
...         while number_sths <= 0:
...             number_sths = await stu.get_available_nodes()
...             await sleep(0.1)
...
...         return number_sths
>>> run(get_number_bluetooth_nodes()) >= 0
1
async get_mac_address(sensor_node_number=255)

Retrieve the MAC address of the STU or a sensor node

Note

Bluetooth needs to be activated before calling this coroutine, otherwise an incorrect MAC address will be returned (for sensor nodes).

Parameters:

sensor_node_number (int) – The node number of the Bluetooth node (0 up to the number of available nodes - 1) or 0x00 (SENSOR_NODE_NUMBER_SELF_ADDRESSING) to retrieve the MAC address of the STU itself

Return type:

EUI

Returns:

The MAC address of the specified sensor node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the MAC address of STH 1

>>> async def get_bluetooth_mac():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         return await stu.get_mac_address(0)
>>> mac_address = run(get_bluetooth_mac())
>>> isinstance(mac_address, EUI)
True
>>> mac_address != EUI(0)
True
async get_name(sensor_node_number=255)

Retrieve the name of a sensor node

Parameters:

sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of available nodes - 1); Use the special node number SENSOR_NODE_NUMBER_SELF_ADDRESSING to retrieve the name of the STU itself.

Note

You are probably only interested in the name of the STU itself (SENSOR_NODE_NUMBER_SELF_ADDRESSING), if you want to know the advertisement name of the STU in OTA (Over The Air) update mode for flashing a new firmware onto the STU.

Return type:

str

Returns:

The (Bluetooth broadcast) name of the node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Get Bluetooth advertisement name of node “0” from STU 1

>>> async def get_bluetooth_node_name():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         # We assume that at least one STH is available
...         return await stu.get_name(0)
>>> name = run(get_bluetooth_node_name())
>>> isinstance(name, str)
True
>>> 0 <= len(name) <= 8
True
async get_rssi(sensor_node_number)

Retrieve the RSSI (Received Signal Strength Indication) of an STH

Parameters:

sensor_node_number (int) – The number of the Bluetooth node (0 up to the number of available nodes)

Returns:

The RSSI of the node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the RSSI of a disconnected STH

>>> async def get_bluetooth_rssi():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         # We assume that at least one STH is available
...         # Get the RSSI of node “0”
...         return await stu.get_rssi(0)
>>> rssi = run(get_bluetooth_rssi())
>>> -80 < rssi < 0
True
async get_sensor_nodes()

Retrieve a list of available sensor nodes

Return type:

list[SensorNodeInfo]

Returns:

A list of available nodes including node number, name, MAC address and RSSI for each node

Examples

Import required library code

>>> from asyncio import run, sleep
>>> from netaddr import EUI
>>> from icotronic.can.connection import Connection

Retrieve the list of Bluetooth nodes at STU 1

>>> async def get_sensor_nodes():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         nodes = []
...         while not nodes:
...             nodes = await stu.get_sensor_nodes()
...             await sleep(0.1)
...
...         return nodes
>>> nodes = run(get_sensor_nodes())
>>> len(nodes) >= 1
True
>>> node = nodes[0]
>>> node.sensor_node_number
0
>>> isinstance(node.name, str)
True
>>> 0 <= len(node.name) <= 8
True
>>> -80 < node.rssi < 0
True
>>> isinstance(node.mac_address, EUI)
True
async is_connected()

Check if the STU is connected to a Bluetooth node

Returns:

  • True, if a Bluetooth node is connected to the node

  • False, otherwise

Examples

>>> from asyncio import run, sleep
>>> from icotronic.can.connection import Connection

Check connection of node “0” to STU

>>> async def check_bluetooth_connection():
...     async with Connection() as stu:
...         await stu.activate_bluetooth()
...         await sleep(0.1)
...         connected_start = await stu.is_connected()
...
...         # We assume that at least one STH is available
...         await stu.connect_with_number(0)
...         # Wait for node connection
...         connected_between = False
...         while not connected_between:
...             connected_between = await stu.is_connected()
...             await sleep(0.1)
...             await stu.connect_with_number(0)
...
...         # Deactivate Bluetooth connection
...         await stu.deactivate_bluetooth()
...         # Wait until node is disconnected
...         await sleep(0.1)
...         connected_after = await stu.is_connected()
...
...         return (connected_start, connected_between,
...                 connected_after)
>>> run(check_bluetooth_connection())
(False, True, False)
Return type:

bool

class icotronic.can.SensorNode(spu, eeprom=<class 'icotronic.can.node.eeprom.sensor.SensorNodeEEPROM'>)

Communicate and control a connected sensor node (SHA, STH, SMH)

Parameters:
  • spu (SPU) – The SPU object used to connect to this sensor node

  • eeprom (type[SensorNodeEEPROM]) – The EEPROM class of the node

async get_adc_configuration()

Read the current ADC configuration

Return type:

ADCConfiguration

Returns:

The ADC configuration of the sensor node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read ADC sensor config of sensor node with node id 0

>>> async def read_adc_config():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_adc_configuration()
>>> run(read_adc_config())
Get, Prescaler: 2, Acquisition Time: 8, Oversampling Rate: 64,
Reference Voltage: 3.3 V
async get_energy_mode_lowest()

Read the reduced lowest energy mode (mode 2) time values

See also:

Return type:

Times

Returns:

A tuple containing the advertisement time in the lowest energy mode in milliseconds and the time until the node will switch from the reduced energy mode (mode 1) to the lowest energy mode (mode 2) – if there is no activity – in milliseconds

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the reduced energy time values of a sensor node

>>> async def read_energy_mode_lowest():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_energy_mode_lowest()
>>> times = run(read_energy_mode_lowest())
>>> round(times.advertisement)
2500
>>> times.sleep
259200000
async get_energy_mode_reduced()

Read the reduced energy mode (mode 1) sensor node time values

See also:

Return type:

Times

Returns:

A tuple containing the advertisement time in the reduced energy mode in milliseconds and the time until the node will switch from the disconnected state to the low energy mode (mode 1) – if there is no activity – in milliseconds

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the reduced energy time values of a sensor node

>>> async def read_energy_mode_reduced():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_energy_mode_reduced()
>>> times = run(read_energy_mode_reduced())
>>> round(times.advertisement)
1250
>>> times.sleep
300000
async get_mac_address()

Retrieve the MAC address of the sensor node

Return type:

EUI

Returns:

The MAC address of the specified sensor node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Retrieve the MAC address of STH 1

>>> async def get_bluetooth_mac():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_mac_address()
>>> mac_address = run(get_bluetooth_mac())
>>> isinstance(mac_address, EUI)
True
>>> mac_address != EUI(0)
True
async get_name()

Retrieve the name of the sensor node

Return type:

str

Returns:

The (Bluetooth broadcast) name of the node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Get Bluetooth advertisement name of node “0”

>>> async def get_sensor_node_name():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_name()
>>> name = run(get_sensor_node_name())
>>> isinstance(name, str)
True
>>> 0 <= len(name) <= 8
True
async get_rssi()

Retrieve the RSSI (Received Signal Strength Indication) of the node

Return type:

int

Returns:

The RSSI of the node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Get RSSI of node “0”

>>> async def get_sensor_node_rssi():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_rssi()
>>> rssi = run(get_sensor_node_rssi())
>>> -70 < rssi < 0
True
async get_sensor_configuration()

Read the current sensor configuration

Raises:

UnsupportedFeatureException – in case the sensor node replies with an error message

Return type:

SensorConfiguration

Returns:

The sensor number for the different axes

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Reading sensor config from node without sensor config support fails

>>> async def read_sensor_config():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_sensor_configuration()
>>> config = run(
...     read_sensor_config())
Traceback (most recent call last):
   ...
UnsupportedFeatureException: Reading sensor configuration is not
supported
async get_streaming_data_single(channels=Channel 1 enabled, Channel 2 enabled, Channel 3 enabled)

Read a single set of raw ADC values from the sensor node

Parameters:

channels – Specifies which of the three measurement channels should be enabled or disabled

Return type:

StreamingData

Returns:

The latest three ADC values measured by the sensor node

Examples

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read a single value from all three measurement channels

>>> async def read_sensor_values():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return (await
...                 sensor_node.get_streaming_data_single())
>>> data = run(read_sensor_values())
>>> len(data.values)
3
>>> all([0 <= value <= 0xffff for value in data.values])
True
async get_supply_voltage()

Read the current supply voltage

Return type:

float

Returns:

The supply voltage of the sensor node

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read the supply voltage of the sensor node with node number 0

>>> async def get_supply_voltage():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             return await sensor_node.get_supply_voltage()
>>> supply_voltage = run(get_supply_voltage())
>>> 3 <= supply_voltage <= 4.2
True
open_data_stream(channels, timeout=5)

Open measurement data stream

Parameters:
  • channels (StreamingConfiguration) – Specifies which measurement channels should be enabled

  • timeout (float) – The amount of seconds between two consecutive messages, before a TimeoutError will be raised

Return type:

DataStreamContextManager

Returns:

A context manager object for managing stream data

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read data of first and third channel

>>> async def read_streaming_data():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             channels = StreamingConfiguration(first=True,
...                                               third=True)
...             async with sensor_node.open_data_stream(
...               channels) as stream:
...                 first = []
...                 third = []
...                 messages = 0
...                 async for data, _ in stream:
...                     first.append(data.values[0])
...                     third.append(data.values[1])
...                     messages += 1
...                     if messages >= 3:
...                         break
...                 return first, third
>>> first, third = run(read_streaming_data())
>>> len(first)
3
>>> len(third)
3
async set_adc_configuration(reference_voltage=3.3, prescaler=2, acquisition_time=8, oversampling_rate=64)

Change the ADC configuration of a connected sensor node

Parameters:
  • reference_voltage (float) – The ADC reference voltage in Volt (1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6)

  • prescaler (int) – The ADC prescaler value (1 – 127)

  • acquisition_time (int) – The ADC acquisition time in number of cycles (1, 2, 3, 4, 8, 16, 32, … , 256)

  • oversampling_rate (int) – The ADC oversampling rate (1, 2, 4, 8, … , 4096)

Return type:

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read and write ADC sensor config

>>> async def write_read_adc_config():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             await sensor_node.set_adc_configuration(
...                 3.3, 8, 8, 64)
...             modified_config1 = (await
...                 sensor_node.get_adc_configuration())
...
...             adc_config = ADCConfiguration(
...                 reference_voltage=5.0,
...                 prescaler=16,
...                 acquisition_time=8,
...                 oversampling_rate=128)
...             await sensor_node.set_adc_configuration(
...                 **adc_config)
...             modified_config2 = (await
...                 sensor_node.get_adc_configuration())
...
...             # Write back default config values
...             await sensor_node.set_adc_configuration(
...                 3.3, 2, 8, 64)
...             return modified_config1, modified_config2
>>> config1, config2 = run(write_read_adc_config())
>>> config1
Get, Prescaler: 8, Acquisition Time: 8, Oversampling Rate: 64,
Reference Voltage: 3.3 V
>>> config2
Get, Prescaler: 16, Acquisition Time: 8, Oversampling Rate: 128,
Reference Voltage: 5.0 V
async set_energy_mode_lowest(times=Advertisement Time: 2500.0 ms, Sleep Time: 259200000 ms)

Writes the time values for the lowest energy mode (mode 2)

See also:

Parameters:

times (Times) – The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the lowest energy mode (mode 2) from the reduced energy mode (mode 1) – if there is no activity – in milliseconds.

Return type:

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read and write the reduced energy time values of a sensor node

>>> async def read_write_energy_mode_lowest(sleep, advertisement):
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             await sensor_node.set_energy_mode_lowest(
...                 Times(sleep=sleep,
...                       advertisement=advertisement))
...             times = await sensor_node.get_energy_mode_lowest()
...
...             await sensor_node.set_energy_mode_lowest()
...
...             return times
>>> times = run(read_write_energy_mode_lowest(200_000, 2000))
>>> times.sleep
200000
>>> round(times.advertisement)
2000
async set_energy_mode_reduced(times=Advertisement Time: 1250.0 ms, Sleep Time: 300000 ms)

Writes the time values for the reduced energy mode (mode 1)

See also:

Parameters:

times (Times) – The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the low energy mode (mode 1) from the disconnected state – if there is no activity – in milliseconds.

Return type:

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read and write the reduced energy time values of a sensor node

>>> async def read_write_energy_mode_reduced(sleep, advertisement):
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             await sensor_node.set_energy_mode_reduced(
...                 Times(sleep=sleep,
...                       advertisement=advertisement))
...             times = await sensor_node.get_energy_mode_reduced()
...
...             await sensor_node.set_energy_mode_reduced()
...
...             return times
>>> times = run(read_write_energy_mode_reduced(200_000, 2000))
>>> times.sleep
200000
>>> round(times.advertisement)
2000
async set_name(name)

Set the name of a sensor node

Parameters:

name (str) – The new name for the node

Return type:

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Change the name of a sensor node

>>> async def test_naming(name):
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         # and that this node currently does not have the name
...         # specified in the variable `name`.
...         async with stu.connect_sensor_node(0) as sensor_node:
...             before = await sensor_node.get_name()
...             await sensor_node.set_name(name)
...             updated = await sensor_node.get_name()
...             await sensor_node.set_name(before)
...             after = await sensor_node.get_name()
...             return before, updated, after
>>> before, updated, after = run(test_naming("Hello"))
>>> before != "Hello"
True
>>> updated
'Hello'
>>> before == after
True
async set_sensor_configuration(sensors)

Change the sensor numbers for the different measurement channels

If you use the sensor number 0 for one of the different measurement channels, then the sensor (number) for that channel will stay the same.

Parameters:

sensors (SensorConfiguration) – The sensor numbers of the different measurement channels

Return type:

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Setting sensor config from node without sensor config support fails

>>> async def set_sensor_config():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0) as sensor_node:
...             await sensor_node.set_sensor_configuration(
...                 SensorConfiguration(
...                     first=0, second=0, third=0))
>>> config = run(
...     set_sensor_config())
Traceback (most recent call last):
   ...
UnsupportedFeatureException: Writing sensor configuration is not
supported
async start_streaming_data(channels)

Start streaming data

Parameters:

channels (StreamingConfiguration) – Specifies which of the three measurement channels should be enabled or disabled

Return type:

None

The CAN identifier that this coroutine returns can be used to filter CAN messages that contain the expected streaming data

async stop_streaming_data(retries=10, ignore_errors=False)

Stop streaming data

Parameters:
  • retries (int) – The number of times the message is sent again, if no response was sent back in a certain amount of time

  • ignore_errors – Specifies, if this coroutine should ignore, if there were any problems while stopping the stream.

Return type:

None

class icotronic.can.STH(spu)

Communicate and control a connected SHA or STH

Parameters:

spu (SPU) – The SPU object used to communicate with the node

async activate_acceleration_self_test(dimension='x')

Activate self test of STH accelerometer

Parameters:

dimension (str) – The dimension (x, y or z) for which the self test should be activated.

Return type:

None

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Activate and deactivate acceleration self-test

>>> async def test_self_test():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is
...         # available
...         async with stu.connect_sensor_node(0, STH) as sth:
...             await sth.activate_acceleration_self_test()
...             await sth.deactivate_acceleration_self_test()
>>> run(test_self_test())
async deactivate_acceleration_self_test(dimension='x')

Deactivate self test of STH accelerometer

Parameters:

dimension (str) – The dimension (x, y or z) for which the self test should be deactivated.

Return type:

None

async get_acceleration_conversion_function(default=200, ignore_errors=False)

Retrieve function to convert raw sensor data into g

Parameters:
  • default (int) – The default sensor range that should be used, if the sensor range could not be determined from the EEPROM; This value will only be used if ignore_errors is set to True; The default value for of 200 represents a ±100 g₀ sensor

  • ignore_errors (bool) – Determines, if the function should raise an error, if the sensor range value value could not be determined; If this is set to False, then the function will use the value default as default sensor range

Return type:

Callable[[float], float]

Returns:

A function that converts 16 bit raw values from the STH into multiples of earth’s gravitation (g)

Raises:

ValueError – If the sensor range could not be determined and ignore_errors is set to False

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection
>>> from icotronic.can.node.sth import STH

Convert a raw ADC value into multiples of g

>>> async def read_sensor_values():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0, STH) as sth:
...             convert_to_g = (await
...                 sth.get_acceleration_conversion_function())
...             data = await sth.get_streaming_data_single()
...             before = list(data.values)
...             data.apply(convert_to_g)
...             return before, data.values
>>> before, after = run(read_sensor_values())
>>> all([0 <= value <= 2**16 for value in before])
True
>>> all([-100 <= value <= 100 for value in after])
True
async get_acceleration_sensor_range_in_g(default=200, ignore_errors=False)

Retrieve the maximum acceleration sensor range in multiples of g₀

  • For a ±100 g₀ sensor this method returns 200 (100 + abs(-100)).

  • For a ±50 g₀ sensor this method returns 100 (50 + abs(-50)).

For this to work correctly the EEPROM value of the x-axis acceleration offset in the EEPROM has to be set.

Parameters:
  • default (int) – The default value that should be used, if the value could not be determined from the EEPROM; This value will only be used if ignore_errors is set to True

  • ignore_errors (bool) – Determines, if the function should raise an error, if the value could not be determined from the EEPROM

Return type:

int

Returns:

Range of current acceleration sensor in multiples of earth’s gravitation

Raises:

ValueError – If the sensor range could not be determined and ignore_errors is set to False

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection
>>> from icotronic.can.node.sth import STH

Write and read the acceleration offset of STH 1

>>> async def read_sensor_range():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0, STH) as sth:
...             return (await
...                     sth.get_acceleration_sensor_range_in_g())
>>> sensor_range = run(read_sensor_range())
>>> 0 < sensor_range <= 200
True
async get_acceleration_voltage(dimension='x', reference_voltage=3.3)

Retrieve the current acceleration voltage in Volt

Parameters:
  • dimension (str) – The dimension (x=1, y=2, z=3) for which the acceleration voltage should be measured

  • reference_voltage (float) – The reference voltage for the ADC in Volt

Return type:

float

Returns:

The voltage of the acceleration sensor in Volt

Examples

Import required library code

>>> from asyncio import run
>>> from icotronic.can.connection import Connection

Read the acceleration voltage of STH 1

>>> async def read_acceleration_voltage():
...     async with Connection() as stu:
...         # We assume that at least one sensor node is available
...         async with stu.connect_sensor_node(0, STH) as sth:
...             before = await sth.get_acceleration_voltage()
...             await sth.activate_acceleration_self_test()
...             between = await sth.get_acceleration_voltage()
...             await sth.deactivate_acceleration_self_test()
...             after = await sth.get_acceleration_voltage()
...         return (before, between, after)
>>> before, between, after = run(read_acceleration_voltage())
>>> before < between and after < between
True

Streaming

class icotronic.can.streaming.AsyncStreamBuffer(timeout, max_buffer_size)

Buffer for streaming data

Parameters:
  • timeout (float) – The amount of seconds between two consecutive messages, before a StreamingTimeoutError will be raised

  • max_buffer_size (int) – Maximum amount of buffered messages kept by the stream buffer. If this amount is exceeded, then this listener will raise a StreamingBufferError. A large buffer indicates that the application is not able to keep up with the current rate of retrieved messages and therefore the probability of losing messages is quite high.

dataloss()

Calculate the overall amount of data loss

Return type:

float

Returns:

The overall amount of data loss as number between 0 (no data loss) and 1 (all data lost).

reset_stats()

Reset the message statistics

This method resets the amount of lost an retrieved messages used in the calculation of the method dataloss. Using this method can be useful, if you want to calculate the amount of data loss since a specific starting point.

Return type:

None

class icotronic.can.streaming.StreamingConfiguration(first=True, second=False, third=False)

Streaming configuration

Parameters:
  • first (bool) – Specifies if the first channel is enabled or not

  • second (bool) – Specifies if the second channel is enabled or not

  • third (bool) – Specifies if the third channel is enabled or not

Raises:

ValueError – if none of the channels is active

Examples

Create some example streaming configurations

>>> config = StreamingConfiguration()
>>> config = StreamingConfiguration(
...          first=False, second=True, third=True)

Creating streaming configurations without active channels will fail

>>> config = StreamingConfiguration(first=False)
Traceback (most recent call last):
   ...
ValueError: At least one channel needs to be active
>>> config = StreamingConfiguration(
...     first=False, second=False, third=False)
Traceback (most recent call last):
   ...
ValueError: At least one channel needs to be active
axes()

Get the activated axes returned by this streaming configuration

Return type:

list[str]

Returns:

A list containing all activated axes in alphabetical order

Examples

Get the activated axes for example streaming configurations

>>> StreamingConfiguration(
...     first=False, second=True, third=True).axes()
['y', 'z']
>>> StreamingConfiguration(
...     first=True, second=True, third=False).axes()
['x', 'y']
data_length()

Returns the streaming data length

This will be either:

  • 2 (when 2 channels are active), or

  • 3 (when 1 or 3 channels are active)

For more information, please take a look here.

Return type:

int

Returns:

The length of the streaming data resulting from this channel configuration

Examples

Get the data length of example streaming configurations

>>> StreamingConfiguration().data_length()
3
>>> StreamingConfiguration(
...     first=False, second=True, third=False).data_length()
3
>>> StreamingConfiguration(
...     first=True, second=True, third=True).data_length()
3
>>> StreamingConfiguration(
...     first=False, second=True, third=True).data_length()
2
enabled_channels()

Get the number of activated channels

Return type:

int

Returns:

The number of enabled channels

Examples

Get the number of enabled channels for example streaming configs

>>> StreamingConfiguration(first=True).enabled_channels()
1
>>> StreamingConfiguration(first=False, second=True, third=False
...                       ).enabled_channels()
1
>>> StreamingConfiguration(first=True, second=True, third=True
...                       ).enabled_channels()
3
property first: bool

Check the activation state of the first channel

Returns:

True, if the first channel is enabled or False otherwise

Examples

Check channel one activation status for example configs

>>> StreamingConfiguration(first=True, second=False,
...                        third=False).first
True
>>> StreamingConfiguration(first=False, second=False,
...                        third=True).first
False
property second: bool

Check the activation state of the second channel

Returns:

True, if the second channel is enabled or False otherwise

Examples

Check channel two activation status for example configs

>>> StreamingConfiguration(
...     first=True, second=False, third=False).second
False
>>> StreamingConfiguration(
...     first=False, second=True, third=True).second
True
property third: bool

Check the activation state of the third channel

Returns:

True, if the third channel is enabled or False otherwise

Examples

Check channel three activation status for example configs

>>> StreamingConfiguration(
...     first=True, second=False, third=False).third
False
>>> StreamingConfiguration(
...     first=False, second=False, third=True).third
True
class icotronic.can.streaming.StreamingData(counter, timestamp, values)

Support for storing data of a streaming message

Parameters:
  • counter (int) – The message counter value

  • timestamp (float) – The message timestamp

  • values (list[float]) – The streaming values

Examples

Create new streaming data

>>> StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
[1, 2, 3]@1 #21

Streaming data must store either two or three values

>>> StreamingData(values=[1], counter=21, timestamp=1)
Traceback (most recent call last):
...
ValueError: Incorrect number of streaming values: 1 (instead of 2 or 3)
>>> StreamingData(values=[1, 2, 3, 4], counter=21, timestamp=1
...              )
Traceback (most recent call last):
...
ValueError: Incorrect number of ... values: 4 (instead of 2 or 3)
apply(function)

Apply a certain function to the streaming data

Note

This function changes the stored values in the streaming data and (as convenience feature) also returns the modified streaming data itself. This is useful if you want to use the modified streaming as parameter in a function call, i.e. you can use something like function(stream_data.apply()).

Parameters:

function (Callable[[float], float]) – The function that should be applied to the streaming data

Return type:

StreamingData

Returns:

The modified streaming data

Examples

Add the constant 10 to some example streaming data

>>> data = StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
>>> data.apply(lambda value: value + 10)
[11, 12, 13]@1 #21
>>> data.values
[11, 12, 13]

Measurement

Data

class icotronic.measurement.DataPoint(counter: int, timestamp: float, value: float)

Streaming data point

Create new instance of DataPoint(counter, timestamp, value)

counter: int

Message counter

timestamp: float

Timestamp of data

value: float

Data value

class icotronic.measurement.ChannelData

Store measurement data for a single channel

__add__(other)

Concatenate channel data with other channel data

Parameters:

other (object) – The other channel data that should be concatenated to this channel data object

Return type:

ChannelData

Returns:

A new channel data object containing the data point of this channel data object followed by the data points of other

Examples

Concatenate some channel data

>>> t1 = 1756124450.256398
>>> data1 = ChannelData()
>>> data1.append(DataPoint(counter=1, timestamp=t1, value=4))
>>> data1.append(DataPoint(counter=1, timestamp=t1, value=5))
>>> data1.append(DataPoint(counter=1, timestamp=t1, value=6))
>>> data1
4@1756124450.256398 #1
5@1756124450.256398 #1
6@1756124450.256398 #1
>>> t2 = 1756124450.2564
>>> data2 = ChannelData()
>>> data2.append(DataPoint(counter=2, timestamp=t2, value=7))
>>> data2.append(DataPoint(counter=2, timestamp=t2, value=8))
>>> data2.append(DataPoint(counter=2, timestamp=t2, value=9))
>>> data2
7@1756124450.2564 #2
8@1756124450.2564 #2
9@1756124450.2564 #2
>>> data1 + data2
4@1756124450.256398 #1
5@1756124450.256398 #1
6@1756124450.256398 #1
7@1756124450.2564 #2
8@1756124450.2564 #2
9@1756124450.2564 #2
append(data)

Append a value to the channel data

Parameters:

data (DataPoint) – The data point that should be added to the channel data

Return type:

None

Examples

Add some data points to a channel data object

>>> data = ChannelData()
>>> t1 = 1756124450.256398
>>> t2 = t1 + 0.000003
>>> data.append(DataPoint(counter=255, timestamp=t1, value=10))
>>> data.append(DataPoint(counter=1, timestamp=t2, value=20))
>>> data
10@1756124450.256398 #255
20@1756124450.256401 #1
values()

Return a list of all the values of the channel data

Return type:

list[float]

Returns:

The values of the channel data

Examples

Return the values of example channel data

>>> data = ChannelData()
>>> t1 = 1756124450.256398
>>> t2 = t1 + 0.000003
>>> data.append(DataPoint(counter=10, timestamp=t1, value=10))
>>> data.append(DataPoint(counter=11, timestamp=t2, value=20))
>>> data.values()
[10, 20]
class icotronic.measurement.MeasurementData(configuration)

Measurement data

Parameters:

configuration (StreamingConfiguration) – The streaming configuration that was used to collect the measurement data

append(data)

Append some streaming data to the measurement

Parameters:

data (StreamingData) – The streaming data that should be added to the measurement

Return type:

None

Examples

Append some streaming data to a measurement

>>> config = StreamingConfiguration(first=True, second=True,
...                                 third=True)
>>> data = MeasurementData(config)
>>> data
Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
>>> s1 = StreamingData(values=[4, 5, 3], counter=15,
...                    timestamp=1756197008.776551)
>>> data.append(s1)
>>> data
Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
[4, 5, 3]@1756197008.776551 #15
apply(conversion)

Apply functions to the values stored in the measurement

Parameters:

conversion (Conversion) – The conversion functions that will be applied to the measurement

Return type:

MeasurementData

Returns:

The measurement data itself, after the conversion was applied

Examples

Apply functions to some measurement data with one active channel

>>> config = StreamingConfiguration(first=False, second=True,
...                                 third=False)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 20, 81], counter=22,
...                    timestamp=1756125747.528234)
>>> s2 = StreamingData(values=[50, 1, 29], counter=25,
...                    timestamp=1756125747.528254)
>>> data.append(s1)
>>> data.append(s2)
>>> plus_two = (lambda value: value + 2)
>>> conversion = Conversion(second=plus_two)
>>> data
Channel 1 disabled, Channel 2 enabled, Channel 3 disabled
[1, 20, 81]@1756125747.528234 #22
[50, 1, 29]@1756125747.528254 #25
>>> data.apply(conversion)
Channel 1 disabled, Channel 2 enabled, Channel 3 disabled
[3, 22, 83]@1756125747.528234 #22
[52, 3, 31]@1756125747.528254 #25

Apply functions to some measurement data with two active channels

>>> config = StreamingConfiguration(first=True, second=False,
...                                 third=True)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2], counter=255,
...                    timestamp=1756125747.528234)
>>> s2 = StreamingData(values=[3, 4], counter=0,
...                    timestamp=1756125747.528237)
>>> data.append(s1)
>>> data.append(s2)
>>> conversion = Conversion(third=plus_two)
>>> data
Channel 1 enabled, Channel 2 disabled, Channel 3 enabled
[1, 2]@1756125747.528234 #255
[3, 4]@1756125747.528237 #0
>>> data.apply(conversion)
Channel 1 enabled, Channel 2 disabled, Channel 3 enabled
[1, 4]@1756125747.528234 #255
[3, 6]@1756125747.528237 #0

Apply functions to some measurement data with three active channels

>>> config = StreamingConfiguration(first=True, second=True,
...                                 third=True)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[4, 5, 3], counter=15,
...                    timestamp=1756197008.776551)
>>> s2 = StreamingData(values=[8, 10, 6], counter=16,
...                    timestamp=1756197008.776559)
>>> data.append(s1)
>>> data.append(s2)
>>> double = (lambda value: value * 2)
>>> conversion = Conversion(first=double, third=plus_two)
>>> data
Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
[4, 5, 3]@1756197008.776551 #15
[8, 10, 6]@1756197008.776559 #16
>>> data.apply(conversion)
Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
[8, 5, 5]@1756197008.776551 #15
[16, 10, 8]@1756197008.776559 #16
dataloss()

Get measurement dataloss based on message counters

Return type:

float

Returns:

The overall amount of dataloss as number between 0 (no data loss) and 1 (all data lost).

extend(data)

Extend this measurement data with some other measurement data

Parameters:

data (MeasurementData) – The measurement data that should be added to this measurement

Return type:

None

Examples

Extend measurement data with other measurement data

>>> config = StreamingConfiguration(first=True, second=False,
...                                 third=True)
>>> data1 = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2], counter=255,
...                    timestamp=1756125747.528234)
>>> s2 = StreamingData(values=[3, 4], counter=0,
...                    timestamp=1756125747.528237)
>>> data1.append(s1)
>>> data1.append(s2)
>>> data1
Channel 1 enabled, Channel 2 disabled, Channel 3 enabled
[1, 2]@1756125747.528234 #255
[3, 4]@1756125747.528237 #0
>>> data2 = MeasurementData(config)
>>> s3 = StreamingData(values=[10, 20], counter=1,
...                    timestamp=1756125747.678912)
>>> data2.append(s3)
>>> data2
Channel 1 enabled, Channel 2 disabled, Channel 3 enabled
[10, 20]@1756125747.678912 #1
>>> data1.extend(data2)
>>> data1
Channel 1 enabled, Channel 2 disabled, Channel 3 enabled
[1, 2]@1756125747.528234 #255
[3, 4]@1756125747.528237 #0
[10, 20]@1756125747.678912 #1
first()

Get all data of the first measurement channel

Return type:

ChannelData

Returns:

Data values for the first measurement channel

Examples

Get first channel data of measurement with two enabled channels

>>> config = StreamingConfiguration(first=True, second=True,
...                                 third=False)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2], counter=255,
...                    timestamp=1756125747.528234)
>>> s2 = StreamingData(values=[3, 4], counter=0,
...                    timestamp=1756125747.528237)
>>> data.append(s1)
>>> data.append(s2)
>>> data.first()
1@1756125747.528234 #255
3@1756125747.528237 #0
>>> data.second()
2@1756125747.528234 #255
4@1756125747.528237 #0
>>> data.third()

Get first channel data of measurement with one enabled channel

>>> config = StreamingConfiguration(first=True, second=False,
...                                 third=False)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2, 3], counter=10,
...                    timestamp=1756126628.820695)
>>> s2 = StreamingData(values=[4, 5, 6], counter=20,
...                    timestamp=1756126628.8207)
>>> data.append(s1)
>>> data.append(s2)
>>> data.first()
1@1756126628.820695 #10
2@1756126628.820695 #10
3@1756126628.820695 #10
4@1756126628.8207 #20
5@1756126628.8207 #20
6@1756126628.8207 #20
>>> data.second()

>>> data.third()
second()

Get all data of the second measurement channel

Return type:

ChannelData

Returns:

Data values for the second measurement channel

Examples

Get second channel data of measurement with two enabled channels

>>> config = StreamingConfiguration(first=False, second=True,
...                                 third=True)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2], counter=255,
...                    timestamp=1756125747.528234)
>>> s2 = StreamingData(values=[3, 4], counter=0,
...                    timestamp=1756125747.528237)
>>> data.append(s1)
>>> data.append(s2)
>>> data.first()

>>> data.second()
1@1756125747.528234 #255
3@1756125747.528237 #0
>>> data.third()
2@1756125747.528234 #255
4@1756125747.528237 #0

Get second channel data of measurement with one enabled channel

>>> config = StreamingConfiguration(first=False, second=True,
...                                 third=False)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2, 3], counter=10,
...                    timestamp=1756126628.820695)
>>> s2 = StreamingData(values=[4, 5, 6], counter=20,
...                    timestamp=1756126628.8207)
>>> data.append(s1)
>>> data.append(s2)
>>> data.first()

>>> data.second()
1@1756126628.820695 #10
2@1756126628.820695 #10
3@1756126628.820695 #10
4@1756126628.8207 #20
5@1756126628.8207 #20
6@1756126628.8207 #20
>>> data.third()
third()

Get all data of the third measurement channel

Return type:

ChannelData

Returns:

Data values for the third measurement channel

Examples

Get third channel data of measurement with two enabled channels

>>> config = StreamingConfiguration(first=True, second=False,
...                                 third=True)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2], counter=255,
...                    timestamp=1756125747.528234)
>>> s2 = StreamingData(values=[3, 4], counter=0,
...                    timestamp=1756125747.528237)
>>> data.append(s1)
>>> data.append(s2)
>>> data.first()
1@1756125747.528234 #255
3@1756125747.528237 #0
>>> data.second()

>>> data.third()
2@1756125747.528234 #255
4@1756125747.528237 #0

Get third channel data of measurement with one enabled channel

>>> config = StreamingConfiguration(first=False, second=False,
...                                 third=True)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2, 3], counter=10,
...                    timestamp=1756126628.820695)
>>> s2 = StreamingData(values=[4, 5, 6], counter=20,
...                    timestamp=1756126628.8207)
>>> data.append(s1)
>>> data.append(s2)
>>> data.first()

>>> data.second()

>>> data.third()
1@1756126628.820695 #10
2@1756126628.820695 #10
3@1756126628.820695 #10
4@1756126628.8207 #20
5@1756126628.8207 #20
6@1756126628.8207 #20
values()

Return all the values stored in the measurement

The order of values is the same as the one of the underlying streaming data.

Return type:

list[float]

Returns:

A list containing all measured values

Examples

Get the values of some example measurement data

>>> config = StreamingConfiguration(first=True, second=False,
...                                 third=True)
>>> data = MeasurementData(config)
>>> s1 = StreamingData(values=[1, 2], counter=0,
...                    timestamp=1756125747.528234)
>>> s2 = StreamingData(values=[3, 4], counter=1,
...                    timestamp=1756125747.528237)
>>> data.append(s1)
>>> data.append(s2)
>>> data.values()
[1, 2, 3, 4]
class icotronic.measurement.Conversion(first=None, second=None, third=None)

Conversion functions for measurement data

Parameters:
  • first (Callable[[float], float] | None) – The conversion function for the first channel

  • second (Callable[[float], float] | None) – The conversion function for the second channel

  • third (Callable[[float], float] | None) – The conversion function for the third channel

Examples

Create a conversion object that doubles values of the first channel

>>> double = lambda value: value * 2
>>> conversion = Conversion(second=double)

Storage

class icotronic.measurement.storage.Storage(filepath, channels=None)

Context manager class for storing measurement data in HDF5 format

Parameters:
  • filepath (Path | str) – The filepath of the HDF5 file in which this object should store measurement data

  • channels (StreamingConfiguration | None) – All channels for which data should be collected or None, if the axes data should be taken from an existing valid file at filepath.

Examples

Create new file

>>> filepath = Path("test.hdf5")
>>> with Storage(filepath,
...              channels=StreamingConfiguration(first=True)
... ) as storage:
...     pass

Opening an non empty file but still providing channel info should fail

>>> with Storage(filepath,
...             channels=StreamingConfiguration(first=True)
... ) as storage:
...     pass
Traceback (most recent call last):
    ...
ValueError: File “...” exist but channels parameter is not None
>>> filepath.unlink()
close()

Close the HDF file

Return type:

None

open()

Open and initialize the HDF file for writing

Return type:

StorageData

class icotronic.measurement.storage.StorageData(file_handle, channels=None)

Store HDF acceleration data

Parameters:
  • file_handle (File) – The HDF file that should store the data

  • channels (StreamingConfiguration | None) – All channels for which data should be collected or None, if the axes data should be taken from an existing valid file at filepath.

Examples

Create new data

>>> filepath = Path("test.hdf5")
>>> streaming_data = StreamingData(values=[1, 2, 3], counter=1,
...                                timestamp=4306978.449)
>>> with Storage(filepath, StreamingConfiguration(first=True)) as data:
...     data.add_streaming_data(streaming_data)

Read from existing file

>>> with Storage(filepath) as data:
...     print(data.dataloss_stats())
(1, 0)
>>> filepath.unlink()
add_measurement_data(measurement_data)

Add streaming data to the storage object

Parameters:

measurement_data (MeasurementData) – The streaming data that should be added to the storage

Return type:

None

Examples

Import required library code

>>> from tempfile import NamedTemporaryFile
>>> from icotronic.measurement import MeasurementData

Store measurement data for two enabled channels

>>> two_channels = StreamingConfiguration(first=True, second=False,
...                                       third=True)
>>> s1 = StreamingData(values=[10, -10], counter=1, timestamp=1)
>>> s2 = StreamingData(values=[20, -20], counter=2, timestamp=2)
>>> s3 = StreamingData(values=[30, -30], counter=2, timestamp=2)
>>> data = MeasurementData(two_channels)
>>> data.append(s1)
>>> data.append(s2)
>>> data.append(s3)
>>> with NamedTemporaryFile(suffix=".hdf5",
...                         delete_on_close=False) as temp:
...     with Storage(temp.name, two_channels) as storage:
...         storage.add_measurement_data(data)
...         # Normally the class takes care about when to store
...         # back data to the disk itself. We do a manual flush
...         # here to check the number of stored items.
...         storage.acceleration.flush()
...         print(storage.acceleration.nrows)
3
add_streaming_data(streaming_data)

Add streaming data to the storage object

Parameters:

streaming_data (StreamingData) – The streaming data that should be added to the storage

Return type:

None

Examples

Store streaming data for single channel

>>> channel3 = StreamingConfiguration(first=False, second=False,
...                                   third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21,
...                       timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22,
...                       timestamp=2)
>>> filepath = Path("test.hdf5")
>>> with Storage(filepath, channel3) as storage:
...     storage.add_streaming_data(data1)
...     storage.add_streaming_data(data2)
...     # Normally the class takes care about when to store back
...     # data to the disk itself. We do a manual flush here to
...     # check the number of stored items.
...     storage.acceleration.flush()
...     print(storage.acceleration.nrows)
6
>>> filepath.unlink()

Store streaming data for three channels

>>> all = StreamingConfiguration(first=True, second=True,
...                              third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21,
...                       timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22,
...                       timestamp=2)
>>> with Storage(filepath, all) as storage:
...     storage.add_streaming_data(data1)
...     storage.add_streaming_data(data2)
...     storage.acceleration.flush()
...     print(storage.acceleration.nrows)
2
>>> filepath.unlink()
dataloss()

Determine (minimum) data loss

Return type:

float

Returns:

Amount of lost messages divided by all messages (lost and retrieved)

Examples

Import required library code

>>> from math import isclose

Calculate data loss for an example storage file

>>> def calculate_dataloss():
...     filepath = Path("test.hdf5")
...     with Storage(filepath, StreamingConfiguration(
...                  first=True)) as storage:
...         for counter in range(256):
...             storage.add_streaming_data(
...                 StreamingData(values=[1, 2, 3],
...                               counter=counter,
...                               timestamp=counter/10))
...         for counter in range(128, 256):
...             storage.add_streaming_data(
...                 StreamingData(values=[4, 5, 6],
...                               counter=counter,
...                               timestamp=(255 + counter)/10))
...
...         dataloss = storage.dataloss()
...     filepath.unlink()
...     return dataloss
>>> isclose(0.25, calculate_dataloss(), rel_tol=0.005)
True
dataloss_stats()

Determine number of lost and received messages

Return type:

tuple[int, int]

Returns:

Tuple containing the number of received and the number of lost messages

Examples

Get data loss statistics for an example file

>>> def calculate_dataloss_stats():
...     filepath = Path("test.hdf5")
...     with Storage(filepath,StreamingConfiguration(
...                  first=True)) as storage:
...         for counter in range(256):
...             storage.add_streaming_data(
...                 StreamingData(values=[1, 2, 3],
...                               counter=counter,
...                               timestamp=counter/10))
...         for counter in range(128, 256):
...             storage.add_streaming_data(
...                 StreamingData(values=[4, 5, 6],
...                               counter=counter,
...                               timestamp=(255 + counter)/10))
...
...         stats = storage.dataloss_stats()
...     filepath.unlink()
...     return stats
>>> retrieved, lost = calculate_dataloss_stats()
>>> retrieved
384
>>> lost
128
measurement_time()

Get the measurement time

Note

This method returns the value of the last timestamp. If there was dataloss at the end of the measurement, then the real measurement time might have been longer.

Return type:

int

Returns:

The measurement time in microseconds

sampling_frequency()

Calculate sampling frequency of measurement data

Return type:

float

Returns:

Sampling frequency (of a single data channel) in Hz

Examples

Import required library code

>>> from math import isclose

Calculate sampling frequency for an example file

>>> def calculate_sampling_frequency():
...     filepath = Path("test.hdf5")
...     with Storage(filepath, StreamingConfiguration(
...             first=True, second=True, third=True)) as storage:
...         storage.add_streaming_data(
...             StreamingData(values=[1, 2, 3],
...                           counter=1,
...                           timestamp=0))
...         storage.add_streaming_data(
...             StreamingData(values=[1, 2, 3],
...                           counter=2,
...                           timestamp=1))
...
...         sampling_frequency = storage.sampling_frequency()
...     filepath.unlink()
...     return sampling_frequency
>>> calculate_sampling_frequency()
2.0
write_sample_rate(adc_configuration)

Store the sample rate of the ADC

Parameters:

adc_configuration (ADCConfiguration) – The current ADC configuration of the sensor node

Return type:

None

Examples

Write and read sample rate metadata in example HDF file

>>> filepath = Path("test.hdf5")
>>> adc_configuration = ADCConfiguration(
...     set=True,
...     prescaler=2,
...     acquisition_time=16,
...     oversampling_rate=256)
>>> with Storage(filepath,
...              StreamingConfiguration(first=True)) as storage:
...     storage.write_sample_rate(adc_configuration)
...     sample_rate = storage.acceleration.attrs["Sample_Rate"]
...     print(sample_rate)
1724.14 Hz (Prescaler: 2, Acquisition Time: 16,
Oversampling Rate: 256)
>>> filepath.unlink()
write_sensor_range(sensor_range_in_g)

Add metadata about sensor range

This method assumes that sensor have a symmetric measurement range (e.g. a sensor with a range of 200 g measures from - 100 g up to + 100 g).

Parameters:

sensor_range_in_g (float) – The measurement range of the sensor in multiples of g

Return type:

None

Examples

Add sensor range metadata to example file

>>> filepath = Path("test.hdf5")
>>> with Storage(filepath,
...              StreamingConfiguration(third=True)) as storage:
...     storage.write_sensor_range(200)
...     print(storage.acceleration.attrs["Sensor_Range"])
± 100 g₀
>>> filepath.unlink()

ADC

class icotronic.can.adc.ADCConfiguration(*data, set=None, prescaler=None, acquisition_time=None, oversampling_rate=None, reference_voltage=None)

Support for reading and writing analog digital converter configuration

Parameters:
  • *data (bytearray | list[int]) – A list containing the (first five) bytes of the ADC configuration

  • set (bool | None) – Specifies if we want to set or retrieve (get) the ADC configuration

  • prescaler (int | None) – The ADC prescaler value (1 – 127); default: 2

  • acquisition_time (int | None) – The acquisition time in number of cycles (1, 2, 3, 4, 8, 16, 32, … , 256); default: 8

  • oversampling_rate (int | None) – The ADC oversampling rate (1, 2, 4, 8, … , 4096); default: 64

  • reference_voltage (float | None) – The ADC reference voltage in Volt (1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6); default: 3.3

Examples

Create simple ADC configuration from scratch

>>> ADCConfiguration(prescaler=2,
...     acquisition_time=4,
...     oversampling_rate=64)
Get, Prescaler: 2, Acquisition Time: 4, Oversampling Rate: 64,
Reference Voltage: 3.3 V
property acquisition_time: int

Get the acquisition time

Returns:

The acquisition time

Examples

Get initialised acquisition time

>>> config = ADCConfiguration(acquisition_time=2)
>>> config.acquisition_time
2

Get acquisition time set via property

>>> config.acquisition_time = 128
>>> config.acquisition_time
128

Trying to set an unsupported acquisition time will fail

>>> config.acquisition_time = 5
Traceback (most recent call last):
   ...
ValueError: Acquisition time of “5” out of range, please use ...
property oversampling_rate: int

Get the oversampling rate

Returns:

The oversampling rate

Examples

Get initialised oversampling rate

>>> config = ADCConfiguration(oversampling_rate=128)
>>> config.oversampling_rate
128

Get oversampling rate set via property

>>> config.oversampling_rate = 512
>>> config.oversampling_rate
512

Trying to set an unsupported oversampling rate will fail

>>> config.oversampling_rate = 3
Traceback (most recent call last):
   ...
ValueError: Oversampling rate of “3” out of range, please use ...
property prescaler: int

Get the prescaler value

Returns:

The prescaler value

Examples

Get initialised prescaler

>>> config = ADCConfiguration(prescaler=127)
>>> config.prescaler
127

Get prescaler set via property

>>> config.prescaler = 20
>>> config.prescaler
20

Trying to set an unsupported prescaler will fail

>>> config.prescaler = 128
Traceback (most recent call last):
   ...
ValueError: Prescaler value of “128” out of range, please use ...
property reference_voltage: float

Get the reference voltage

Returns:

The reference voltage in Volt

Examples

Set and get different reference voltage values

>>> config = ADCConfiguration(reference_voltage=3.3)
>>> config.reference_voltage
3.3
>>> config.reference_voltage = 6.6
>>> config.reference_voltage
6.6
>>> config = ADCConfiguration(reference_voltage=6.6)
>>> config.reference_voltage
6.6
>>> config.reference_voltage = 1.8
>>> config.reference_voltage
1.8
>>> config = ADCConfiguration(reference_voltage=1.8)
>>> config.reference_voltage
1.8

Trying to set a unsupported reference voltage will fail

>>> config.reference_voltage = 0
Traceback (most recent call last):
   ...
ValueError: Reference voltage of “0” V out of range, please use ...
sample_rate()

Calculate the sampling rate for the current ADC configuration

Return type:

float

Returns:

The calculated sample rate

Examples

Get sampling rates based on ADC attribute values

>>> round(ADCConfiguration(prescaler=2, acquisition_time=8,
...                        oversampling_rate=64).sample_rate())
9524
>>> round(ADCConfiguration(prescaler=8, acquisition_time=8,
...                        oversampling_rate=64).sample_rate())
3175
>>> round(ADCConfiguration(reference_voltage=5.0,
...                        prescaler=16,
...                        acquisition_time=8,
...                        oversampling_rate=128).sample_rate())
840

Sensor Configuration

class icotronic.can.SensorConfiguration(first=0, second=0, third=0)

Used to store the configuration of the three sensor channels

Parameters:
  • first (int) – The sensor number for the first measurement channel

  • second (int) – The sensor number for the second measurement channel

  • third (int) – The sensor number for the third measurement channel

Examples

Create an example sensor configuration

>>> SensorConfiguration(first=0, second=1, third=2)
M1: None, M2: S1, M3: S2

Initializing a sensor configuration with incorrect values will fail

>>> SensorConfiguration(first=256, second=1, third=2)
Traceback (most recent call last):
...
ValueError: Incorrect value for first channel: “256”
>>> SensorConfiguration(first=0, second=1, third=-1)
Traceback (most recent call last):
...
ValueError: Incorrect value for third channel: “-1”
check()

Check that at least one measurement channel is enabled

Raises:

ValueError – if none of the measurement channels is enabled

Examples

>>> SensorConfiguration(second=1).check()
>>> SensorConfiguration().check()
Traceback (most recent call last):
    ...
ValueError: At least one measurement channel has to be enabled
disable_channel(first=False, second=False, third=False)

Disable certain (measurement) channels

Parameters:
  • first (bool) – Specifies if the first measurement channel should be disabled or not

  • second (bool) – Specifies if the second measurement channel should be disabled or not

  • third (bool) – Specifies if the third measurement channel should be disabled or not

Return type:

None

empty()

Check if the sensor configuration is empty

In an empty sensor configuration all of the channels are disabled.

Return type:

bool

Returns:

True, if all channels are disabled, False otherwise

Examples

Check if some example configurations are empty or not

>>> SensorConfiguration(first=3).empty()
False
>>> SensorConfiguration().empty()
True
>>> SensorConfiguration(third=0).empty()
True
property first: int

Get the sensor for the first channel

Returns:

The sensor number of the first channel

Examples

Get the sensor number for the first channel of a sensor config

>>> SensorConfiguration(first=1, second=3, third=2).first
1
requires_channel_configuration_support()

Check if the sensor configuration requires channel config support

Return type:

bool

Returns:

  • True, if the configuration requires hardware that has support for changing the channel configuration

  • False, otherwise

Examples

Check if example sensor configs require channel config support

>>> SensorConfiguration(first=1, second=3, third=2
...     ).requires_channel_configuration_support()
True
>>> SensorConfiguration(first=1, second=0, third=1
...     ).requires_channel_configuration_support()
True
>>> SensorConfiguration(first=1, second=2, third=3
...     ).requires_channel_configuration_support()
False
>>> SensorConfiguration().requires_channel_configuration_support()
False
property second: int

Get the sensor for the second channel

Returns:

The sensor number of the second channel

Examples

Get the sensor number for the second channel of a sensor config

>>> SensorConfiguration(first=1, second=3, third=2).second
3
streaming_configuration()

Get a streaming configuration that represents this config

Return type:

StreamingConfiguration

Returns:

A stream configuration where

  • every channel that is enabled in the sensor configuration is enabled, and

  • every channel that is disables in the sensor configuration is disabled.

Examples

Get the streaming configuration for some example channel configs

>>> SensorConfiguration(second=1).streaming_configuration()
Channel 1 disabled, Channel 2 enabled, Channel 3 disabled
>>> SensorConfiguration(first=10, third=2
...                    ).streaming_configuration()
Channel 1 enabled, Channel 2 disabled, Channel 3 enabled
property third: int

Get the sensor for the third channel

Returns:

The sensor number of the third channel

Examples

Get the sensor number for the third channel of a sensor config

>>> SensorConfiguration(first=1, second=3, third=2).third
2

Examples

For code examples, please check out the examples directory: