API
Usage
Connection
Connecting to STU
To communicate with the ICOtronic system use the the async context manager of the Connection class to open and close the connection to the STU:
>>> from asyncio import run
>>> from icotronic.can import Connection
>>> async def create_and_shutdown_connection():
... async with Connection() as stu:
... ... # ← Your code goes here
>>> run(create_and_shutdown_connection())
Connecting to Sensor Node
To connect to a sensor node (e.g. SHA, SMH, STH) use the async context manager of the coroutine STU.connect_sensor_node(). To connect to a node you need to know one of the identifiers of the node. In the example below we connect to a node with the name Test-STH:
>>> from asyncio import run
>>> from netaddr import EUI
>>> from icotronic.can import Connection
>>> async def connect_to_sensor_node(identifier):
... async with Connection() as stu:
... async with stu.connect_sensor_node(identifier) as sensor_node:
... return await sensor_node.get_mac_address()
>>> mac_address = run(connect_to_sensor_node("Test-STH"))
>>> isinstance(mac_address, EUI)
True
By default STU.connect_sensor_node() assumes that you want to connect to a generic sensor node (e.g. a sensory milling head (SMH)). To connect to an STH (a sensor node with additional functionality), use STH for the sensor_node_class parameter:
>>> from asyncio import run
>>> from icotronic.can import Connection, STH
>>> async def get_sensor_range(identifier):
... async with Connection() as stu:
... async with stu.connect_sensor_node(identifier, STH) as sth:
... return await sth.get_acceleration_sensor_range_in_g()
>>> sensor_range = run(get_sensor_range("Test-STH"))
>>> 0 <= sensor_range <= 200
True
Streaming
Reading Data
After you connected to the sensor node use the coroutine SensorNode.open_data_stream() to open the data stream and an async for statement to iterate over the received streaming data. The following code:
>>> from asyncio import run
>>> from icotronic.can import Connection
>>> from icotronic.can import StreamingConfiguration
>>> async def read_streaming_data():
... async with Connection() as stu:
... async with stu.connect_sensor_node("Test-STH") as sensor_node:
... channels = StreamingConfiguration(first=True)
... async with sensor_node.open_data_stream(channels) as stream:
... async for data, lost_messages in stream:
... return data
>>> data = run(read_streaming_data())
>>> # Example Output: [32579, 32637, 32575]@1724251001.976368 #123
>>> data
[..., ..., ...]@... #...
>>> len(data.values)
3
>>> isinstance(data.timestamp, float)
True
>>> all(0 <= value <= 2**16-1 for value in data.values)
True
>>> 0 <= data.counter <= 255
True
connects to a node called
Test-STH,opens a data stream for the first measurement channel,
receives a single streaming data object,
prints its representation and
shows some of the properties of the streaming data object.
The data returned by the async for (stream) is an object of the class StreamingData with the following attributes:
StreamingData.values: a list containing either two or three values,StreamingData.timestamp: the timestamp when the data was collectedStreamingData.counter: a cyclic message counter (0 – 255) that can be used to detect data loss
Note
The amount of data stored in StreamingData.values depends on the enabled streaming channels. For the recommended amount of one or three enabled channels the list contains three values. For
one enabled channel all three values belong to the same channel, while
for the three enabled channels
the first value belongs to the first channel,
the second value belongs to the second channel,
and the third value belongs to the third channel.
By default StreamingData.values contains 16-bit ADC values. To convert the data into multiples of g (the standard gravity) you can
use the coroutine
STH.get_acceleration_conversion_function()to retrieve a function that converts 16-bit ADC values values into multiples of g and thenuse this function to convert streaming data with the method
StreamingData.apply().
In the example below we convert the first retrieved streaming data object and return it:
>>> from asyncio import run
>>> from icotronic.can import Connection
>>> from icotronic.can import STH, StreamingConfiguration
>>> async def read_streaming_data_g():
... async with Connection() as stu:
... async with stu.connect_sensor_node("Test-STH", STH) as sth:
... conversion_to_g = (await
... sth.get_acceleration_conversion_function())
... channels = StreamingConfiguration(first=True)
... async with sth.open_data_stream(channels) as stream:
... async for data, lost_messages in stream:
... data.apply(conversion_to_g)
... return data
>>> streaming_data = run(read_streaming_data_g())
>>> len(streaming_data.values)
3
>>> all([-100 <= value <= 100 for value in streaming_data.values])
True
Collecting Multiple Values
While working directly with the class StreamingData might make sense for small projects often you:
want to collect a bunch of data values,
work on data for a specific measurement channel, or
convert data values.
For that case you can use the class MeasurementData. The example code below
collects data for all three measurement channels,
applies a conversion into multiples of g for the first channel (only) and
checks that the values for the first channel are all between -2 g and 2 g.
>>> from asyncio import run
>>> from time import monotonic
>>> from icotronic.can import Connection, STH, StreamingConfiguration
>>> from icotronic.measurement import Conversion, MeasurementData
>>> async def collect_streaming_data(identifier):
... async with Connection() as stu:
... async with stu.connect_sensor_node(identifier, STH) as sth:
... conversion_to_g = (await
... sth.get_acceleration_conversion_function())
... all_channels = StreamingConfiguration(
... first=True, second=True, third=True
... )
... measurement_data = MeasurementData(all_channels)
... async with sth.open_data_stream(all_channels) as stream:
... messages = 10
... async for data, _ in stream:
... measurement_data.append(data)
... messages -= 1
... if messages <= 0:
... break
... measurement_data.apply(Conversion(first=conversion_to_g))
... return measurement_data
>>> data = run(collect_streaming_data(identifier="Test-STH"))
>>> first_channel_in_g = data.first()
>>> all(-2 <= data.value <= 2 for data in first_channel_in_g)
True
Converting Data Values
The class Conversion allows you to apply different functions to the different channels of the streaming data (attributes first, second and third). Each function gets a measurement value (of type float) and should return a value of type float. If you do not want to apply any conversion to a certain channel you can use the (default value) of None for the Conversion channel attribute. In the example below we apply a Conversion object that
does not change the data for the first channel,
multiplies the values of the second channel by two and
adds the value
10to the data of the second channel.
>>> from icotronic.can import StreamingConfiguration, StreamingData
>>> from icotronic.measurement import Conversion, MeasurementData
>>> measurement_data = MeasurementData(
... StreamingConfiguration(first=True, second=True, third=True))
>>> s1 = StreamingData(counter=1, timestamp=1757946559.499677,
... values=[1.0, 2.0, 3.0])
>>> s2 = StreamingData(counter=2, timestamp=1757946559.499680,
... values=[4.0, 5.0, 6.0])
>>> measurement_data.append(s1)
>>> measurement_data.append(s2)
>>> measurement_data
Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
[1.0, 2.0, 3.0]@1757946559.499677 #1
[4.0, 5.0, 6.0]@1757946559.49968 #2
>>> measurement_data.first()
1.0@1757946559.499677 #1
4.0@1757946559.49968 #2
>>> measurement_data.second()
2.0@1757946559.499677 #1
5.0@1757946559.49968 #2
>>> measurement_data.third()
3.0@1757946559.499677 #1
6.0@1757946559.49968 #2
>>> def multiply_by_two(value):
... return value * 2
>>> conversion = Conversion(second=multiply_by_two,
... third=(lambda value: value + 10))
>>> measurement_data.apply(conversion)
Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
[1.0, 4.0, 13.0]@1757946559.499677 #1
[4.0, 10.0, 16.0]@1757946559.49968 #2
>>> measurement_data.first()
1.0@1757946559.499677 #1
4.0@1757946559.49968 #2
>>> measurement_data.second()
4.0@1757946559.499677 #1
10.0@1757946559.49968 #2
>>> measurement_data.third()
13.0@1757946559.499677 #1
16.0@1757946559.49968 #2
Storing Data
If you want to store streaming data for later use you can use the Storage class to open a context manager that lets you store data as HDF5 file via the method add_streaming_data() of the class StorageData. The code below shows how to store one second of measurement data in a file called measurement.hdf5.
>>> from asyncio import run
>>> from pathlib import Path
>>> from time import monotonic
>>> from icotronic.can import Connection, STH, StreamingConfiguration
>>> from icotronic.measurement.storage import Storage
>>> async def store_streaming_data(identifier, storage):
... async with Connection() as stu:
... async with stu.connect_sensor_node(identifier, STH) as sth:
... conversion_to_g = (await
... sth.get_acceleration_conversion_function())
...
... # Store acceleration range as metadata
... storage.write_sensor_range(
... await sth.get_acceleration_sensor_range_in_g()
... )
... # Store sampling rate (and ADC configuration as metadata)
... storage.write_sample_rate(await sth.get_adc_configuration())
...
... async with sth.open_data_stream(
... storage.streaming_configuration
... ) as stream:
... # Read data for about one seconds
... end = monotonic() + 1
... async for data, _ in stream:
... # Convert from ADC bit value into multiples of g
... storage.add_streaming_data(
... data.apply(conversion_to_g))
... if monotonic() > end:
... break
>>> filepath = Path("measurement.hdf5") # Store data in HDF5 file
>>> with Storage(filepath, StreamingConfiguration(first=True)) as storage:
... run(store_streaming_data("Test-STH", storage))
Since HDF5 is a standard file format you can use general purpose tools such as HDFView to view the stored data. To specifically analyze the data produced by the ICOtronic package you can also use one of the scripts of the ICOlyzer package.
For more information about the measurement format, please take a look at the section “Measurement Data” of the general ICOtronic package documentation.
Determining Data Loss
Sometimes the
connection to your sensor node might be bad or
code might run too slow to retrieve/process streaming data.
In both cases there will be some form of data loss. The ICOtronic library currently takes multiple measures to detect data loss.
Bad Connection
The iterator for streaming data AsyncStreamBuffer will raise a StreamingTimeoutError, if there is no streaming data for a certain amount of time (default: 5 seconds). The class AsyncStreamBuffer also provides access to statistics that can be used to determine the amount of lost data. For example, if you iterate through the streaming messages with async for, then in addition to the streaming data, the iterator will also return the amount of lost messages since the last successfully received message (lost_messages in the example below):
async with sensor_node.open_data_stream(channels) as stream:
async for data, lost_messages in stream:
if lost_messages > 0:
print(f"Lost {lost_messages} messages!")
To access the overall data quality, since the start of streaming you can use the method AsyncStreamBuffer.dataloss(). The example code below shows how to use this method:
>>> from asyncio import run
>>> from time import monotonic
>>> from icotronic.can import Connection, StreamingConfiguration
>>> async def determine_data_loss(identifier):
... async with Connection() as stu:
... async with stu.connect_sensor_node(identifier) as sensor_node:
... end = monotonic() + 1 # Read data for roughly one second
... channels = StreamingConfiguration(first=True)
... async with sensor_node.open_data_stream(channels) as stream:
... async for data, lost_messages in stream:
... if monotonic() > end:
... break
...
... return stream.dataloss()
>>> data_loss = run(determine_data_loss(identifier="Test-STH"))
>>> data_loss < 0.2 # We assume that the data loss was less than 20 %
True
If you want to calculate the amount of data loss for a specific time-span you can use the method AsyncStreamBuffer.reset_stats() to reset the message statistics at the start of the time-span. In the following example we stream data for (roughly) 2.1 seconds and return a list with the amount of data loss over periods of 0.5 seconds:
>>> from asyncio import run
>>> from time import monotonic
>>> from icotronic.can import Connection, StreamingConfiguration
>>> async def determine_data_loss(identifier):
... async with Connection() as stu:
... async with stu.connect_sensor_node(identifier) as sensor_node:
... start = monotonic()
... end = start + 2.1
... last_reset = start
... data_lost = []
... channels = StreamingConfiguration(first=True)
... async with sensor_node.open_data_stream(channels) as stream:
... async for data, lost_messages in stream:
... current = monotonic()
... if current >= last_reset + 0.5:
... data_lost.append(stream.dataloss())
... stream.reset_stats()
... last_reset = current
... if current > end:
... break
...
... return data_lost
>>> data_lost = run(determine_data_loss(identifier="Test-STH"))
>>> len(data_lost)
4
>>> all(map(lambda loss: loss < 0.1, data_lost))
True
Note
We used a overall runtime of 2.1 seconds, since in a timing interval of 2 seconds there is always the possibility that the code above either returns three or four data loss values depending on the specific timing.
Slow Processing of Data
The buffer of the CAN controller is only able to store a certain amount of streaming messages before it has to drop them to make room for new ones. For this reason the ICOtronic library will raise a StreamingBufferError, if the buffer for streaming messages exceeds a certain threshold (default: 10 000 messages).
Auxiliary Functionality
Reading Names
After your are connected to a node you can read its (advertisement) name using the coroutine SensorNode.get_name():
>>> from asyncio import run
>>> from icotronic.can import Connection
>>> async def read_sensor_name(name):
... async with Connection() as stu:
... async with stu.connect_sensor_node(name) as sensor_node:
... sensor_name = await sensor_node.get_name()
... return sensor_name
>>> sensor_name = "Test-STH"
>>> run(read_sensor_name(sensor_name))
'Test-STH'
Changing ADC Configuration
To change
the sample rate/frequency (via prescaler, acquisition time and oversampling rate) or
the reference voltage
of the analog digital converter (ADC) of your sensor node you can use the coroutine SensorNode.set_adc_configuration(). Since all of the parameters of this coroutine use default values, you can also just call it without changing any parameters to apply the default ADC configuration.
Note
Some sensor nodes use a different reference voltage (not 3.3V). In this case applying the default configuration might not be what you want.
To retrieve the current ADC configuration use the coroutine SensorNode.get_adc_configuration(), which will return an ADCConfiguration object. This object provides the method ADCConfiguration.sample_rate() to calculate the sampling rate/frequency based on the current value of prescaler, acquisition time and oversampling rate.
In the example below we
apply the default ADC configuration on the sensor node,
retrieve the configuration via the coroutine
SensorNode.get_adc_configuration(), and thenprint the sampling rate/frequency based on the retrieved ADC configuration values.
>>> from asyncio import run
>>> from icotronic.can import Connection
>>> from icotronic.can.adc import ADCConfiguration
>>> async def write_read_adc_config(name):
... async with Connection() as stu:
... async with stu.connect_sensor_node(name) as sensor_node:
... # Set default configuration
... await sensor_node.set_adc_configuration()
... # Retrieve ADC configuration
... adc_configuration = await sensor_node.get_adc_configuration()
... print("Sample Rate: "
... f"{adc_configuration.sample_rate():0.2f} Hz")
>>> run(write_read_adc_config("Test-STH"))
Sample Rate: 9523.81 Hz
One option to decrease the sample rate from the default value of about 9524 Hz is to change the
prescaler,
acquisition time or
oversampling rate.
For a
formula on how to calculate the sample rate based on the values above and
a list of suggested sample rates
please take a look at the section “Sampling Rate” of the general ICOtronic system documentation. The example code below shows you how to change the sample rate to about 4762 Hz.
>>> from asyncio import run
>>> from icotronic.can import Connection
>>> from icotronic.can.adc import ADCConfiguration
>>> async def change_sample_rate(name):
... async with Connection() as stu:
... async with stu.connect_sensor_node(name) as sensor_node:
... # Retrieve current reference voltage
... adc_configuration = await sensor_node.get_adc_configuration()
... reference_voltage = adc_configuration.reference_voltage
... # Change sample rate
... configuration = ADCConfiguration(
... prescaler=2,
... acquisition_time=8,
... oversampling_rate=128,
... reference_voltage=reference_voltage)
... print("Set sample rate to "
... f"{configuration.sample_rate():0.2f} Hz")
... await sensor_node.set_adc_configuration(**configuration)
>>> run(change_sample_rate("Test-STH"))
Set sample rate to 4761.90 Hz
Changing Channel Configuration
Some ICOtronic sensor nodes support changing the mapping from the three available measurement channels to different sensor channels. To get the current mapping you can use the coroutine SensorNode.get_sensor_configuration(). The following code:
from asyncio import run
from icotronic.can import Connection
from icotronic.can.error import UnsupportedFeatureException
async def read_sensor_configuration(identifier):
async with Connection() as stu:
async with stu.connect_sensor_node(identifier) as sensor_node:
try:
print(await sensor_node.get_sensor_configuration())
except UnsupportedFeatureException as error:
print(error)
run(read_sensor_configuration("Test-STH"))
will either
print the current sensor configuration e.g.:
M1: S1, M2: S2, M3: S3
or print an error message:
Reading sensor configuration not supported
if the sensor node does not support getting (or setting) the sensor configuration.
To set the sensor configuration you can use the coroutine SensorNode.set_sensor_configuration().
The method expects a SensorConfiguration object as parameter. The following code:
from asyncio import run
from icotronic.can import Connection, SensorConfiguration
from icotronic.can.error import UnsupportedFeatureException
async def set_sensor_configuration(identifier):
async with Connection() as stu:
async with stu.connect_sensor_node(identifier) as sensor_node:
try:
config = await sensor_node.get_sensor_configuration()
print(f"Sensor configuration: {config}")
await sensor_node.set_sensor_configuration(
SensorConfiguration(first=2, second=4, third=1)
)
config = await sensor_node.get_sensor_configuration()
print(f"Updated sensor configuration: {config}")
except UnsupportedFeatureException:
print("Sensor configuration not supported by device")
run(set_sensor_configuration("Test-STH"))
will set the sensor configuration for
the first measurement channel to the sensor channel 2,
the second measurement channel to the sensor channel 4, and
the third measurement channel to the sensor channel 1.
For hardware that supports channel configuration the code should print the following text:
Sensor configuration: M1: S1, M2: S2, M3: S3
Updated sensor configuration: M1: S2, M2: S4, M3: S1
Classes & Functions
Connection
- class icotronic.can.Connection
Basic class to initialize CAN communication
To actually connect to the CAN bus you need to use the async context manager, provided by this class. If you want to manage the connection yourself, please just use
__aenter__and__aexit__manually.Examples
Create a new connection (without connecting to the CAN bus)
>>> connection = Connection()
- async __aenter__()
Connect to the STU
- Return type:
- Returns:
An object that can be used to communicate with the STU
- Raises:
CANInitError – if the CAN initialization fails
Examples
Import required library code
>>> from asyncio import run
Use a context manager to handle the cleanup process automatically
>>> async def connect_can_context(): ... async with Connection() as stu: ... pass >>> run(connect_can_context())
Create and shutdown the connection explicitly
>>> async def connect_can_manual(): ... connection = Connection() ... connected = await connection.__aenter__() ... await connection.__aexit__(None, None, None) >>> run(connect_can_manual())
Nodes
- class icotronic.can.STU(spu)
Communicate and control a connected STU
- Parameters:
spu (
SPU) – The SPU object that created this STU instance
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Create an STU object
>>> async def create_stu(): ... async with Connection() as stu: ... pass # call some coroutines of `stu` object >>> run(create_stu())
- async activate_bluetooth()
Activate Bluetooth on the STU
- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Activate Bluetooth on the STU
>>> async def activate(): ... async with Connection() as stu: ... await stu.activate_bluetooth() >>> run(activate())
- async collect_sensor_nodes(timeout=5)
Collect available sensor nodes
This coroutine collects sensor nodes until either
no new sensor node was found or
until the given timeout, if no sensor node was found.
- Parameters:
timeout – The timeout in seconds until this coroutine returns, if no sensor node was found at all
- Return type:
list[SensorNodeInfo]- Returns:
A list of found sensor nodes
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Collect available sensor nodes
>>> async def collect_sensor_nodes(): ... async with Connection() as stu: ... return await stu.collect_sensor_nodes()
>>> # We assume that at least one sensor node is available >>> nodes = run(collect_sensor_nodes()) >>> len(nodes) >= 1 True
- connect_sensor_node(identifier, sensor_node_class=<class 'icotronic.can.node.sensor.SensorNode'>)
Connect to a sensor node (e.g. SHA, SMH or STH)
- Parameters:
identifier (
int|str|EUI) –The
MAC address (EUI),
name (str), or
node number (int)
of the sensor node we want to connect to
sensor_node_class (
type[SensorNode]) – Sensor node subclass that should be returned by context manager
- Return type:
AsyncSensorNodeManager- Returns:
A context manager that returns a sensor node object for the connected node
- Raises:
ValueError – If you use an invalid name or node number as identifier
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Connect to the sensor node with node number
0>>> async def connect_sensor_node(): ... async with Connection() as stu: ... async with stu.connect_sensor_node(0): ... connected = await stu.is_connected() ... after = await stu.is_connected() ... return (connected, after) >>> run(connect_sensor_node()) (True, False)
- async connect_with_mac_address(mac_address)
Connect to a Bluetooth sensor node using its MAC address
- Parameters:
mac_address (
EUI) – The MAC address of the sensor node- Return type:
None
Examples
Import required library code
>>> from asyncio import run, sleep >>> from icotronic.can.connection import Connection
Connect to a sensor node via its MAC address
>>> async def get_bluetooth_mac(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... # Wait for Bluetooth activation to take place ... await sleep(2) ... return await stu.get_mac_address(0) >>> mac_address = run(get_bluetooth_mac()) >>> mac_address != EUI(0) True
>>> async def connect(mac_address): ... async with Connection() as stu: ... await stu.deactivate_bluetooth() ... # We assume that at least one STH is available ... connected = before = await stu.is_connected() ... await stu.activate_bluetooth() ... while not connected: ... await stu.connect_with_mac_address(mac_address) ... await sleep(0.1) ... connected = await stu.is_connected() ... await stu.deactivate_bluetooth() ... after = await stu.is_connected() ... # Return status of Bluetooth node connect response ... return before, connected, after >>> run(connect(mac_address)) (False, True, False)
- async connect_with_number(sensor_node_number=0)
Connect to a Bluetooth node using a node number
- Parameters:
sensor_node_number (
int) – The number of the Bluetooth node (0 up to the number of available nodes - 1)- Return type:
bool- Returns:
True, if
in search mode,
at least single node was found,
no legacy mode,
and scanning mode active
False, otherwise
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Connect to node “0”
>>> async def connect_bluetooth_sensor_node_number(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... # We assume that at least one STH is available ... connected = before = await stu.is_connected() ... while not connected: ... connected = await stu.connect_with_number(0) ... await stu.deactivate_bluetooth() ... after = await stu.is_connected() ... # Return status of Bluetooth node connect response ... return before, connected, after >>> run(connect_bluetooth_sensor_node_number()) (False, True, False)
- async deactivate_bluetooth()
Deactivate Bluetooth on the STU
- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Deactivate Bluetooth on STU 1
>>> async def deactivate_bluetooth(): ... async with Connection() as stu: ... await stu.deactivate_bluetooth() >>> run(deactivate_bluetooth())
- async get_available_nodes()
Retrieve the number of available sensor nodes
- Return type:
int- Returns:
The number of available sensor nodes
Examples
Import required library code
>>> from asyncio import run, sleep >>> from icotronic.can.connection import Connection
Get the number of available Bluetooth nodes at STU 1
>>> async def get_number_bluetooth_nodes(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... ... # We assume at least one STH is available ... number_sths = 0 ... while number_sths <= 0: ... number_sths = await stu.get_available_nodes() ... await sleep(0.1) ... ... return number_sths >>> run(get_number_bluetooth_nodes()) >= 0 1
- async get_mac_address(sensor_node_number=255)
Retrieve the MAC address of the STU or a sensor node
Note
Bluetooth needs to be activated before calling this coroutine, otherwise an incorrect MAC address will be returned (for sensor nodes).
- Parameters:
sensor_node_number (
int) – The node number of the Bluetooth node (0 up to the number of available nodes - 1) or0x00(SENSOR_NODE_NUMBER_SELF_ADDRESSING) to retrieve the MAC address of the STU itself- Return type:
EUI- Returns:
The MAC address of the specified sensor node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the MAC address of STH 1
>>> async def get_bluetooth_mac(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... return await stu.get_mac_address(0) >>> mac_address = run(get_bluetooth_mac()) >>> isinstance(mac_address, EUI) True >>> mac_address != EUI(0) True
- async get_name(sensor_node_number=255)
Retrieve the name of a sensor node
- Parameters:
sensor_node_number (
int) – The number of the Bluetooth node (0 up to the number of available nodes - 1); Use the special node numberSENSOR_NODE_NUMBER_SELF_ADDRESSINGto retrieve the name of the STU itself.
Note
You are probably only interested in the name of the STU itself (
SENSOR_NODE_NUMBER_SELF_ADDRESSING), if you want to know the advertisement name of the STU in OTA (Over The Air) update mode for flashing a new firmware onto the STU.- Return type:
str- Returns:
The (Bluetooth broadcast) name of the node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Get Bluetooth advertisement name of node “0” from STU 1
>>> async def get_bluetooth_node_name(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... # We assume that at least one STH is available ... return await stu.get_name(0) >>> name = run(get_bluetooth_node_name()) >>> isinstance(name, str) True >>> 0 <= len(name) <= 8 True
- async get_rssi(sensor_node_number)
Retrieve the RSSI (Received Signal Strength Indication) of an STH
- Parameters:
sensor_node_number (
int) – The number of the Bluetooth node (0 up to the number of available nodes)- Returns:
The RSSI of the node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the RSSI of a disconnected STH
>>> async def get_bluetooth_rssi(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... # We assume that at least one STH is available ... # Get the RSSI of node “0” ... return await stu.get_rssi(0) >>> rssi = run(get_bluetooth_rssi()) >>> -80 < rssi < 0 True
- async get_sensor_nodes()
Retrieve a list of available sensor nodes
- Return type:
list[SensorNodeInfo]- Returns:
A list of available nodes including node number, name, MAC address and RSSI for each node
Examples
Import required library code
>>> from asyncio import run, sleep >>> from netaddr import EUI >>> from icotronic.can.connection import Connection
Retrieve the list of Bluetooth nodes at STU 1
>>> async def get_sensor_nodes(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... nodes = [] ... while not nodes: ... nodes = await stu.get_sensor_nodes() ... await sleep(0.1) ... ... return nodes >>> nodes = run(get_sensor_nodes()) >>> len(nodes) >= 1 True >>> node = nodes[0]
>>> node.sensor_node_number 0
>>> isinstance(node.name, str) True >>> 0 <= len(node.name) <= 8 True
>>> -80 < node.rssi < 0 True
>>> isinstance(node.mac_address, EUI) True
- async is_connected()
Check if the STU is connected to a Bluetooth node
Returns:
True, if a Bluetooth node is connected to the node
False, otherwise
Examples
>>> from asyncio import run, sleep >>> from icotronic.can.connection import Connection
Check connection of node “0” to STU
>>> async def check_bluetooth_connection(): ... async with Connection() as stu: ... await stu.activate_bluetooth() ... await sleep(0.1) ... connected_start = await stu.is_connected() ... ... # We assume that at least one STH is available ... await stu.connect_with_number(0) ... # Wait for node connection ... connected_between = False ... while not connected_between: ... connected_between = await stu.is_connected() ... await sleep(0.1) ... await stu.connect_with_number(0) ... ... # Deactivate Bluetooth connection ... await stu.deactivate_bluetooth() ... # Wait until node is disconnected ... await sleep(0.1) ... connected_after = await stu.is_connected() ... ... return (connected_start, connected_between, ... connected_after) >>> run(check_bluetooth_connection()) (False, True, False)
- Return type:
bool
- class icotronic.can.SensorNode(spu, eeprom=<class 'icotronic.can.node.eeprom.sensor.SensorNodeEEPROM'>)
Communicate and control a connected sensor node (SHA, STH, SMH)
- Parameters:
spu (
SPU) – The SPU object used to connect to this sensor nodeeeprom (
type[SensorNodeEEPROM]) – The EEPROM class of the node
- async get_adc_configuration()
Read the current ADC configuration
- Return type:
- Returns:
The ADC configuration of the sensor node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read ADC sensor config of sensor node with node id 0
>>> async def read_adc_config(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_adc_configuration() >>> run(read_adc_config()) Get, Prescaler: 2, Acquisition Time: 8, Oversampling Rate: 64, Reference Voltage: 3.3 V
- async get_energy_mode_lowest()
Read the reduced lowest energy mode (mode 2) time values
See also:
- Return type:
Times- Returns:
A tuple containing the advertisement time in the lowest energy mode in milliseconds and the time until the node will switch from the reduced energy mode (mode 1) to the lowest energy mode (mode 2) – if there is no activity – in milliseconds
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the reduced energy time values of a sensor node
>>> async def read_energy_mode_lowest(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_energy_mode_lowest() >>> times = run(read_energy_mode_lowest()) >>> round(times.advertisement) 2500 >>> times.sleep 259200000
- async get_energy_mode_reduced()
Read the reduced energy mode (mode 1) sensor node time values
See also:
- Return type:
Times- Returns:
A tuple containing the advertisement time in the reduced energy mode in milliseconds and the time until the node will switch from the disconnected state to the low energy mode (mode 1) – if there is no activity – in milliseconds
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the reduced energy time values of a sensor node
>>> async def read_energy_mode_reduced(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_energy_mode_reduced() >>> times = run(read_energy_mode_reduced()) >>> round(times.advertisement) 1250 >>> times.sleep 300000
- async get_mac_address()
Retrieve the MAC address of the sensor node
- Return type:
EUI- Returns:
The MAC address of the specified sensor node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Retrieve the MAC address of STH 1
>>> async def get_bluetooth_mac(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_mac_address() >>> mac_address = run(get_bluetooth_mac()) >>> isinstance(mac_address, EUI) True >>> mac_address != EUI(0) True
- async get_name()
Retrieve the name of the sensor node
- Return type:
str- Returns:
The (Bluetooth broadcast) name of the node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Get Bluetooth advertisement name of node “0”
>>> async def get_sensor_node_name(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_name() >>> name = run(get_sensor_node_name()) >>> isinstance(name, str) True >>> 0 <= len(name) <= 8 True
- async get_rssi()
Retrieve the RSSI (Received Signal Strength Indication) of the node
- Return type:
int- Returns:
The RSSI of the node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Get RSSI of node “0”
>>> async def get_sensor_node_rssi(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_rssi() >>> rssi = run(get_sensor_node_rssi()) >>> -70 < rssi < 0 True
- async get_sensor_configuration()
Read the current sensor configuration
- Raises:
UnsupportedFeatureException – in case the sensor node replies with an error message
- Return type:
- Returns:
The sensor number for the different axes
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Reading sensor config from node without sensor config support fails
>>> async def read_sensor_config(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_sensor_configuration() >>> config = run( ... read_sensor_config()) Traceback (most recent call last): ... UnsupportedFeatureException: Reading sensor configuration is not supported
- async get_streaming_data_single(channels=Channel 1 enabled, Channel 2 enabled, Channel 3 enabled)
Read a single set of raw ADC values from the sensor node
- Parameters:
channels – Specifies which of the three measurement channels should be enabled or disabled
- Return type:
- Returns:
The latest three ADC values measured by the sensor node
Examples
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read a single value from all three measurement channels
>>> async def read_sensor_values(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return (await ... sensor_node.get_streaming_data_single()) >>> data = run(read_sensor_values()) >>> len(data.values) 3 >>> all([0 <= value <= 0xffff for value in data.values]) True
- async get_supply_voltage()
Read the current supply voltage
- Return type:
float- Returns:
The supply voltage of the sensor node
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read the supply voltage of the sensor node with node number 0
>>> async def get_supply_voltage(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... return await sensor_node.get_supply_voltage() >>> supply_voltage = run(get_supply_voltage()) >>> 3 <= supply_voltage <= 4.2 True
- open_data_stream(channels, timeout=5)
Open measurement data stream
- Parameters:
channels (
StreamingConfiguration) – Specifies which measurement channels should be enabledtimeout (
float) – The amount of seconds between two consecutive messages, before a TimeoutError will be raised
- Return type:
DataStreamContextManager- Returns:
A context manager object for managing stream data
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read data of first and third channel
>>> async def read_streaming_data(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... channels = StreamingConfiguration(first=True, ... third=True) ... async with sensor_node.open_data_stream( ... channels) as stream: ... first = [] ... third = [] ... messages = 0 ... async for data, _ in stream: ... first.append(data.values[0]) ... third.append(data.values[1]) ... messages += 1 ... if messages >= 3: ... break ... return first, third >>> first, third = run(read_streaming_data()) >>> len(first) 3 >>> len(third) 3
- async set_adc_configuration(reference_voltage=3.3, prescaler=2, acquisition_time=8, oversampling_rate=64)
Change the ADC configuration of a connected sensor node
- Parameters:
reference_voltage (
float) – The ADC reference voltage in Volt (1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6)prescaler (
int) – The ADC prescaler value (1 – 127)acquisition_time (
int) – The ADC acquisition time in number of cycles (1, 2, 3, 4, 8, 16, 32, … , 256)oversampling_rate (
int) – The ADC oversampling rate (1, 2, 4, 8, … , 4096)
- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read and write ADC sensor config
>>> async def write_read_adc_config(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... await sensor_node.set_adc_configuration( ... 3.3, 8, 8, 64) ... modified_config1 = (await ... sensor_node.get_adc_configuration()) ... ... adc_config = ADCConfiguration( ... reference_voltage=5.0, ... prescaler=16, ... acquisition_time=8, ... oversampling_rate=128) ... await sensor_node.set_adc_configuration( ... **adc_config) ... modified_config2 = (await ... sensor_node.get_adc_configuration()) ... ... # Write back default config values ... await sensor_node.set_adc_configuration( ... 3.3, 2, 8, 64) ... return modified_config1, modified_config2 >>> config1, config2 = run(write_read_adc_config()) >>> config1 Get, Prescaler: 8, Acquisition Time: 8, Oversampling Rate: 64, Reference Voltage: 3.3 V >>> config2 Get, Prescaler: 16, Acquisition Time: 8, Oversampling Rate: 128, Reference Voltage: 5.0 V
- async set_energy_mode_lowest(times=Advertisement Time: 2500.0 ms, Sleep Time: 259200000 ms)
Writes the time values for the lowest energy mode (mode 2)
See also:
- Parameters:
times (
Times) – The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the lowest energy mode (mode 2) from the reduced energy mode (mode 1) – if there is no activity – in milliseconds.- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read and write the reduced energy time values of a sensor node
>>> async def read_write_energy_mode_lowest(sleep, advertisement): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... await sensor_node.set_energy_mode_lowest( ... Times(sleep=sleep, ... advertisement=advertisement)) ... times = await sensor_node.get_energy_mode_lowest() ... ... await sensor_node.set_energy_mode_lowest() ... ... return times >>> times = run(read_write_energy_mode_lowest(200_000, 2000)) >>> times.sleep 200000 >>> round(times.advertisement) 2000
- async set_energy_mode_reduced(times=Advertisement Time: 1250.0 ms, Sleep Time: 300000 ms)
Writes the time values for the reduced energy mode (mode 1)
See also:
- Parameters:
times (
Times) – The values for the advertisement time in the reduced energy mode in milliseconds and the time until the node will go into the low energy mode (mode 1) from the disconnected state – if there is no activity – in milliseconds.- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read and write the reduced energy time values of a sensor node
>>> async def read_write_energy_mode_reduced(sleep, advertisement): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... await sensor_node.set_energy_mode_reduced( ... Times(sleep=sleep, ... advertisement=advertisement)) ... times = await sensor_node.get_energy_mode_reduced() ... ... await sensor_node.set_energy_mode_reduced() ... ... return times >>> times = run(read_write_energy_mode_reduced(200_000, 2000)) >>> times.sleep 200000 >>> round(times.advertisement) 2000
- async set_name(name)
Set the name of a sensor node
- Parameters:
name (
str) – The new name for the node- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Change the name of a sensor node
>>> async def test_naming(name): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... # and that this node currently does not have the name ... # specified in the variable `name`. ... async with stu.connect_sensor_node(0) as sensor_node: ... before = await sensor_node.get_name() ... await sensor_node.set_name(name) ... updated = await sensor_node.get_name() ... await sensor_node.set_name(before) ... after = await sensor_node.get_name() ... return before, updated, after >>> before, updated, after = run(test_naming("Hello")) >>> before != "Hello" True >>> updated 'Hello' >>> before == after True
- async set_sensor_configuration(sensors)
Change the sensor numbers for the different measurement channels
If you use the sensor number 0 for one of the different measurement channels, then the sensor (number) for that channel will stay the same.
- Parameters:
sensors (
SensorConfiguration) – The sensor numbers of the different measurement channels- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Setting sensor config from node without sensor config support fails
>>> async def set_sensor_config(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0) as sensor_node: ... await sensor_node.set_sensor_configuration( ... SensorConfiguration( ... first=0, second=0, third=0)) >>> config = run( ... set_sensor_config()) Traceback (most recent call last): ... UnsupportedFeatureException: Writing sensor configuration is not supported
- async start_streaming_data(channels)
Start streaming data
- Parameters:
channels (
StreamingConfiguration) – Specifies which of the three measurement channels should be enabled or disabled- Return type:
None
The CAN identifier that this coroutine returns can be used to filter CAN messages that contain the expected streaming data
- async stop_streaming_data(retries=10, ignore_errors=False)
Stop streaming data
- Parameters:
retries (
int) – The number of times the message is sent again, if no response was sent back in a certain amount of timeignore_errors – Specifies, if this coroutine should ignore, if there were any problems while stopping the stream.
- Return type:
None
- class icotronic.can.STH(spu)
Communicate and control a connected SHA or STH
- Parameters:
spu (
SPU) – The SPU object used to communicate with the node
- async activate_acceleration_self_test(dimension='x')
Activate self test of STH accelerometer
- Parameters:
dimension (
str) – The dimension (x,yorz) for which the self test should be activated.- Return type:
None
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Activate and deactivate acceleration self-test
>>> async def test_self_test(): ... async with Connection() as stu: ... # We assume that at least one sensor node is ... # available ... async with stu.connect_sensor_node(0, STH) as sth: ... await sth.activate_acceleration_self_test() ... await sth.deactivate_acceleration_self_test() >>> run(test_self_test())
- async deactivate_acceleration_self_test(dimension='x')
Deactivate self test of STH accelerometer
- Parameters:
dimension (
str) – The dimension (x,yorz) for which the self test should be deactivated.- Return type:
None
- async get_acceleration_conversion_function(default=200, ignore_errors=False)
Retrieve function to convert raw sensor data into g
- Parameters:
default (
int) – The default sensor range that should be used, if the sensor range could not be determined from the EEPROM; This value will only be used ifignore_errorsis set toTrue; The default value for of200represents a ±100 g₀ sensorignore_errors (
bool) – Determines, if the function should raise an error, if the sensor range value value could not be determined; If this is set toFalse, then the function will use the valuedefaultas default sensor range
- Return type:
Callable[[float],float]- Returns:
A function that converts 16 bit raw values from the STH into multiples of earth’s gravitation (g)
- Raises:
ValueError – If the sensor range could not be determined and
ignore_errorsis set toFalse
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection >>> from icotronic.can.node.sth import STH
Convert a raw ADC value into multiples of g
>>> async def read_sensor_values(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0, STH) as sth: ... convert_to_g = (await ... sth.get_acceleration_conversion_function()) ... data = await sth.get_streaming_data_single() ... before = list(data.values) ... data.apply(convert_to_g) ... return before, data.values >>> before, after = run(read_sensor_values()) >>> all([0 <= value <= 2**16 for value in before]) True >>> all([-100 <= value <= 100 for value in after]) True
- async get_acceleration_sensor_range_in_g(default=200, ignore_errors=False)
Retrieve the maximum acceleration sensor range in multiples of g₀
For a ±100 g₀ sensor this method returns 200 (100 + abs(-100)).
For a ±50 g₀ sensor this method returns 100 (50 + abs(-50)).
For this to work correctly the EEPROM value of the x-axis acceleration offset in the EEPROM has to be set.
- Parameters:
default (
int) – The default value that should be used, if the value could not be determined from the EEPROM; This value will only be used ifignore_errorsis set toTrueignore_errors (
bool) – Determines, if the function should raise an error, if the value could not be determined from the EEPROM
- Return type:
int- Returns:
Range of current acceleration sensor in multiples of earth’s gravitation
- Raises:
ValueError – If the sensor range could not be determined and
ignore_errorsis set toFalse
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection >>> from icotronic.can.node.sth import STH
Write and read the acceleration offset of STH 1
>>> async def read_sensor_range(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0, STH) as sth: ... return (await ... sth.get_acceleration_sensor_range_in_g()) >>> sensor_range = run(read_sensor_range()) >>> 0 < sensor_range <= 200 True
- async get_acceleration_voltage(dimension='x', reference_voltage=3.3)
Retrieve the current acceleration voltage in Volt
- Parameters:
dimension (
str) – The dimension (x=1, y=2, z=3) for which the acceleration voltage should be measuredreference_voltage (
float) – The reference voltage for the ADC in Volt
- Return type:
float- Returns:
The voltage of the acceleration sensor in Volt
Examples
Import required library code
>>> from asyncio import run >>> from icotronic.can.connection import Connection
Read the acceleration voltage of STH 1
>>> async def read_acceleration_voltage(): ... async with Connection() as stu: ... # We assume that at least one sensor node is available ... async with stu.connect_sensor_node(0, STH) as sth: ... before = await sth.get_acceleration_voltage() ... await sth.activate_acceleration_self_test() ... between = await sth.get_acceleration_voltage() ... await sth.deactivate_acceleration_self_test() ... after = await sth.get_acceleration_voltage() ... return (before, between, after) >>> before, between, after = run(read_acceleration_voltage()) >>> before < between and after < between True
Streaming
- class icotronic.can.streaming.AsyncStreamBuffer(timeout, max_buffer_size)
Buffer for streaming data
- Parameters:
timeout (
float) – The amount of seconds between two consecutive messages, before aStreamingTimeoutErrorwill be raisedmax_buffer_size (
int) – Maximum amount of buffered messages kept by the stream buffer. If this amount is exceeded, then this listener will raise aStreamingBufferError. A large buffer indicates that the application is not able to keep up with the current rate of retrieved messages and therefore the probability of losing messages is quite high.
- dataloss()
Calculate the overall amount of data loss
- Return type:
float- Returns:
The overall amount of data loss as number between 0 (no data loss) and 1 (all data lost).
- reset_stats()
Reset the message statistics
This method resets the amount of lost an retrieved messages used in the calculation of the method
dataloss. Using this method can be useful, if you want to calculate the amount of data loss since a specific starting point.- Return type:
None
- class icotronic.can.streaming.StreamingConfiguration(first=True, second=False, third=False)
Streaming configuration
- Parameters:
first (
bool) – Specifies if the first channel is enabled or notsecond (
bool) – Specifies if the second channel is enabled or notthird (
bool) – Specifies if the third channel is enabled or not
- Raises:
ValueError – if none of the channels is active
Examples
Create some example streaming configurations
>>> config = StreamingConfiguration() >>> config = StreamingConfiguration( ... first=False, second=True, third=True)
Creating streaming configurations without active channels will fail
>>> config = StreamingConfiguration(first=False) Traceback (most recent call last): ... ValueError: At least one channel needs to be active
>>> config = StreamingConfiguration( ... first=False, second=False, third=False) Traceback (most recent call last): ... ValueError: At least one channel needs to be active
- axes()
Get the activated axes returned by this streaming configuration
- Return type:
list[str]- Returns:
A list containing all activated axes in alphabetical order
Examples
Get the activated axes for example streaming configurations
>>> StreamingConfiguration( ... first=False, second=True, third=True).axes() ['y', 'z'] >>> StreamingConfiguration( ... first=True, second=True, third=False).axes() ['x', 'y']
- data_length()
Returns the streaming data length
This will be either:
2 (when 2 channels are active), or
3 (when 1 or 3 channels are active)
For more information, please take a look here.
- Return type:
int- Returns:
The length of the streaming data resulting from this channel configuration
Examples
Get the data length of example streaming configurations
>>> StreamingConfiguration().data_length() 3
>>> StreamingConfiguration( ... first=False, second=True, third=False).data_length() 3
>>> StreamingConfiguration( ... first=True, second=True, third=True).data_length() 3
>>> StreamingConfiguration( ... first=False, second=True, third=True).data_length() 2
- enabled_channels()
Get the number of activated channels
- Return type:
int- Returns:
The number of enabled channels
Examples
Get the number of enabled channels for example streaming configs
>>> StreamingConfiguration(first=True).enabled_channels() 1
>>> StreamingConfiguration(first=False, second=True, third=False ... ).enabled_channels() 1
>>> StreamingConfiguration(first=True, second=True, third=True ... ).enabled_channels() 3
- property first: bool
Check the activation state of the first channel
Returns:
True, if the first channel is enabled orFalseotherwiseExamples
Check channel one activation status for example configs
>>> StreamingConfiguration(first=True, second=False, ... third=False).first True >>> StreamingConfiguration(first=False, second=False, ... third=True).first False
- property second: bool
Check the activation state of the second channel
- Returns:
True, if the second channel is enabled orFalseotherwise
Examples
Check channel two activation status for example configs
>>> StreamingConfiguration( ... first=True, second=False, third=False).second False >>> StreamingConfiguration( ... first=False, second=True, third=True).second True
- property third: bool
Check the activation state of the third channel
Returns:
True, if the third channel is enabled orFalseotherwiseExamples
Check channel three activation status for example configs
>>> StreamingConfiguration( ... first=True, second=False, third=False).third False >>> StreamingConfiguration( ... first=False, second=False, third=True).third True
- class icotronic.can.streaming.StreamingData(counter, timestamp, values)
Support for storing data of a streaming message
- Parameters:
counter (
int) – The message counter valuetimestamp (
float) – The message timestampvalues (
list[float]) – The streaming values
Examples
Create new streaming data
>>> StreamingData(values=[1, 2, 3], counter=21, timestamp=1) [1, 2, 3]@1 #21
Streaming data must store either two or three values
>>> StreamingData(values=[1], counter=21, timestamp=1) Traceback (most recent call last): ... ValueError: Incorrect number of streaming values: 1 (instead of 2 or 3)
>>> StreamingData(values=[1, 2, 3, 4], counter=21, timestamp=1 ... ) Traceback (most recent call last): ... ValueError: Incorrect number of ... values: 4 (instead of 2 or 3)
- apply(function)
Apply a certain function to the streaming data
Note
This function changes the stored values in the streaming data and (as convenience feature) also returns the modified streaming data itself. This is useful if you want to use the modified streaming as parameter in a function call, i.e. you can use something like
function(stream_data.apply()).- Parameters:
function (
Callable[[float],float]) – The function that should be applied to the streaming data- Return type:
- Returns:
The modified streaming data
Examples
Add the constant 10 to some example streaming data
>>> data = StreamingData(values=[1, 2, 3], counter=21, timestamp=1) >>> data.apply(lambda value: value + 10) [11, 12, 13]@1 #21 >>> data.values [11, 12, 13]
Measurement
Data
- class icotronic.measurement.DataPoint(counter: int, timestamp: float, value: float)
Streaming data point
Create new instance of DataPoint(counter, timestamp, value)
- counter: int
Message counter
- timestamp: float
Timestamp of data
- value: float
Data value
- class icotronic.measurement.ChannelData
Store measurement data for a single channel
- __add__(other)
Concatenate channel data with other channel data
- Parameters:
other (
object) – The other channel data that should be concatenated to this channel data object- Return type:
- Returns:
A new channel data object containing the data point of this channel data object followed by the data points of
other
Examples
Concatenate some channel data
>>> t1 = 1756124450.256398 >>> data1 = ChannelData() >>> data1.append(DataPoint(counter=1, timestamp=t1, value=4)) >>> data1.append(DataPoint(counter=1, timestamp=t1, value=5)) >>> data1.append(DataPoint(counter=1, timestamp=t1, value=6)) >>> data1 4@1756124450.256398 #1 5@1756124450.256398 #1 6@1756124450.256398 #1
>>> t2 = 1756124450.2564 >>> data2 = ChannelData() >>> data2.append(DataPoint(counter=2, timestamp=t2, value=7)) >>> data2.append(DataPoint(counter=2, timestamp=t2, value=8)) >>> data2.append(DataPoint(counter=2, timestamp=t2, value=9)) >>> data2 7@1756124450.2564 #2 8@1756124450.2564 #2 9@1756124450.2564 #2
>>> data1 + data2 4@1756124450.256398 #1 5@1756124450.256398 #1 6@1756124450.256398 #1 7@1756124450.2564 #2 8@1756124450.2564 #2 9@1756124450.2564 #2
- append(data)
Append a value to the channel data
- Parameters:
data (
DataPoint) – The data point that should be added to the channel data- Return type:
None
Examples
Add some data points to a channel data object
>>> data = ChannelData()
>>> t1 = 1756124450.256398 >>> t2 = t1 + 0.000003 >>> data.append(DataPoint(counter=255, timestamp=t1, value=10)) >>> data.append(DataPoint(counter=1, timestamp=t2, value=20)) >>> data 10@1756124450.256398 #255 20@1756124450.256401 #1
- values()
Return a list of all the values of the channel data
- Return type:
list[float]- Returns:
The values of the channel data
Examples
Return the values of example channel data
>>> data = ChannelData()
>>> t1 = 1756124450.256398 >>> t2 = t1 + 0.000003 >>> data.append(DataPoint(counter=10, timestamp=t1, value=10)) >>> data.append(DataPoint(counter=11, timestamp=t2, value=20)) >>> data.values() [10, 20]
- class icotronic.measurement.MeasurementData(configuration)
Measurement data
- Parameters:
configuration (
StreamingConfiguration) – The streaming configuration that was used to collect the measurement data
- append(data)
Append some streaming data to the measurement
- Parameters:
data (
StreamingData) – The streaming data that should be added to the measurement- Return type:
None
Examples
Append some streaming data to a measurement
>>> config = StreamingConfiguration(first=True, second=True, ... third=True) >>> data = MeasurementData(config) >>> data Channel 1 enabled, Channel 2 enabled, Channel 3 enabled
>>> s1 = StreamingData(values=[4, 5, 3], counter=15, ... timestamp=1756197008.776551) >>> data.append(s1) >>> data Channel 1 enabled, Channel 2 enabled, Channel 3 enabled [4, 5, 3]@1756197008.776551 #15
- apply(conversion)
Apply functions to the values stored in the measurement
- Parameters:
conversion (
Conversion) – The conversion functions that will be applied to the measurement- Return type:
- Returns:
The measurement data itself, after the conversion was applied
Examples
Apply functions to some measurement data with one active channel
>>> config = StreamingConfiguration(first=False, second=True, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 20, 81], counter=22, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[50, 1, 29], counter=25, ... timestamp=1756125747.528254) >>> data.append(s1) >>> data.append(s2)
>>> plus_two = (lambda value: value + 2) >>> conversion = Conversion(second=plus_two)
>>> data Channel 1 disabled, Channel 2 enabled, Channel 3 disabled [1, 20, 81]@1756125747.528234 #22 [50, 1, 29]@1756125747.528254 #25 >>> data.apply(conversion) Channel 1 disabled, Channel 2 enabled, Channel 3 disabled [3, 22, 83]@1756125747.528234 #22 [52, 3, 31]@1756125747.528254 #25
Apply functions to some measurement data with two active channels
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2)
>>> conversion = Conversion(third=plus_two)
>>> data Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 2]@1756125747.528234 #255 [3, 4]@1756125747.528237 #0
>>> data.apply(conversion) Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 4]@1756125747.528234 #255 [3, 6]@1756125747.528237 #0
Apply functions to some measurement data with three active channels
>>> config = StreamingConfiguration(first=True, second=True, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[4, 5, 3], counter=15, ... timestamp=1756197008.776551) >>> s2 = StreamingData(values=[8, 10, 6], counter=16, ... timestamp=1756197008.776559) >>> data.append(s1) >>> data.append(s2)
>>> double = (lambda value: value * 2) >>> conversion = Conversion(first=double, third=plus_two)
>>> data Channel 1 enabled, Channel 2 enabled, Channel 3 enabled [4, 5, 3]@1756197008.776551 #15 [8, 10, 6]@1756197008.776559 #16
>>> data.apply(conversion) Channel 1 enabled, Channel 2 enabled, Channel 3 enabled [8, 5, 5]@1756197008.776551 #15 [16, 10, 8]@1756197008.776559 #16
- dataloss()
Get measurement dataloss based on message counters
- Return type:
float- Returns:
The overall amount of dataloss as number between 0 (no data loss) and 1 (all data lost).
- extend(data)
Extend this measurement data with some other measurement data
- Parameters:
data (
MeasurementData) – The measurement data that should be added to this measurement- Return type:
None
Examples
Extend measurement data with other measurement data
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data1 = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data1.append(s1) >>> data1.append(s2) >>> data1 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 2]@1756125747.528234 #255 [3, 4]@1756125747.528237 #0
>>> data2 = MeasurementData(config) >>> s3 = StreamingData(values=[10, 20], counter=1, ... timestamp=1756125747.678912) >>> data2.append(s3) >>> data2 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [10, 20]@1756125747.678912 #1
>>> data1.extend(data2) >>> data1 Channel 1 enabled, Channel 2 disabled, Channel 3 enabled [1, 2]@1756125747.528234 #255 [3, 4]@1756125747.528237 #0 [10, 20]@1756125747.678912 #1
- first()
Get all data of the first measurement channel
- Return type:
- Returns:
Data values for the first measurement channel
Examples
Get first channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=True, second=True, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.second() 2@1756125747.528234 #255 4@1756125747.528237 #0 >>> data.third()
Get first channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=True, second=False, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20 >>> data.second() >>> data.third()
- second()
Get all data of the second measurement channel
- Return type:
- Returns:
Data values for the second measurement channel
Examples
Get second channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=False, second=True, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.third() 2@1756125747.528234 #255 4@1756125747.528237 #0
Get second channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=False, second=True, ... third=False) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20 >>> data.third()
- third()
Get all data of the third measurement channel
- Return type:
- Returns:
Data values for the third measurement channel
Examples
Get third channel data of measurement with two enabled channels
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=255, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=0, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.first() 1@1756125747.528234 #255 3@1756125747.528237 #0 >>> data.second() >>> data.third() 2@1756125747.528234 #255 4@1756125747.528237 #0
Get third channel data of measurement with one enabled channel
>>> config = StreamingConfiguration(first=False, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2, 3], counter=10, ... timestamp=1756126628.820695) >>> s2 = StreamingData(values=[4, 5, 6], counter=20, ... timestamp=1756126628.8207) >>> data.append(s1) >>> data.append(s2) >>> data.first() >>> data.second() >>> data.third() 1@1756126628.820695 #10 2@1756126628.820695 #10 3@1756126628.820695 #10 4@1756126628.8207 #20 5@1756126628.8207 #20 6@1756126628.8207 #20
- values()
Return all the values stored in the measurement
The order of values is the same as the one of the underlying streaming data.
- Return type:
list[float]- Returns:
A list containing all measured values
Examples
Get the values of some example measurement data
>>> config = StreamingConfiguration(first=True, second=False, ... third=True) >>> data = MeasurementData(config) >>> s1 = StreamingData(values=[1, 2], counter=0, ... timestamp=1756125747.528234) >>> s2 = StreamingData(values=[3, 4], counter=1, ... timestamp=1756125747.528237) >>> data.append(s1) >>> data.append(s2) >>> data.values() [1, 2, 3, 4]
- class icotronic.measurement.Conversion(first=None, second=None, third=None)
Conversion functions for measurement data
- Parameters:
first (
Callable[[float],float] |None) – The conversion function for the first channelsecond (
Callable[[float],float] |None) – The conversion function for the second channelthird (
Callable[[float],float] |None) – The conversion function for the third channel
Examples
Create a conversion object that doubles values of the first channel
>>> double = lambda value: value * 2 >>> conversion = Conversion(second=double)
Storage
- class icotronic.measurement.storage.Storage(filepath, channels=None)
Context manager class for storing measurement data in HDF5 format
- Parameters:
filepath (
Path|str) – The filepath of the HDF5 file in which this object should store measurement datachannels (
StreamingConfiguration|None) – All channels for which data should be collected orNone, if the axes data should be taken from an existing valid file atfilepath.
Examples
Create new file
>>> filepath = Path("test.hdf5") >>> with Storage(filepath, ... channels=StreamingConfiguration(first=True) ... ) as storage: ... pass
Opening an non empty file but still providing channel info should fail
>>> with Storage(filepath, ... channels=StreamingConfiguration(first=True) ... ) as storage: ... pass Traceback (most recent call last): ... ValueError: File “...” exist but channels parameter is not None >>> filepath.unlink()
- close()
Close the HDF file
- Return type:
None
- open()
Open and initialize the HDF file for writing
- Return type:
- class icotronic.measurement.storage.StorageData(file_handle, channels=None)
Store HDF acceleration data
- Parameters:
file_handle (
File) – The HDF file that should store the datachannels (
StreamingConfiguration|None) – All channels for which data should be collected orNone, if the axes data should be taken from an existing valid file atfilepath.
Examples
Create new data
>>> filepath = Path("test.hdf5") >>> streaming_data = StreamingData(values=[1, 2, 3], counter=1, ... timestamp=4306978.449) >>> with Storage(filepath, StreamingConfiguration(first=True)) as data: ... data.add_streaming_data(streaming_data)
Read from existing file
>>> with Storage(filepath) as data: ... print(data.dataloss_stats()) (1, 0)
>>> filepath.unlink()
- add_measurement_data(measurement_data)
Add streaming data to the storage object
- Parameters:
measurement_data (
MeasurementData) – The streaming data that should be added to the storage- Return type:
None
Examples
Import required library code
>>> from tempfile import NamedTemporaryFile >>> from icotronic.measurement import MeasurementData
Store measurement data for two enabled channels
>>> two_channels = StreamingConfiguration(first=True, second=False, ... third=True) >>> s1 = StreamingData(values=[10, -10], counter=1, timestamp=1) >>> s2 = StreamingData(values=[20, -20], counter=2, timestamp=2) >>> s3 = StreamingData(values=[30, -30], counter=2, timestamp=2) >>> data = MeasurementData(two_channels) >>> data.append(s1) >>> data.append(s2) >>> data.append(s3)
>>> with NamedTemporaryFile(suffix=".hdf5", ... delete_on_close=False) as temp: ... with Storage(temp.name, two_channels) as storage: ... storage.add_measurement_data(data) ... # Normally the class takes care about when to store ... # back data to the disk itself. We do a manual flush ... # here to check the number of stored items. ... storage.acceleration.flush() ... print(storage.acceleration.nrows) 3
- add_streaming_data(streaming_data)
Add streaming data to the storage object
- Parameters:
streaming_data (
StreamingData) – The streaming data that should be added to the storage- Return type:
None
Examples
Store streaming data for single channel
>>> channel3 = StreamingConfiguration(first=False, second=False, ... third=True) >>> data1 = StreamingData(values=[1, 2, 3], counter=21, ... timestamp=1) >>> data2 = StreamingData(values=[4, 5, 6], counter=22, ... timestamp=2) >>> filepath = Path("test.hdf5") >>> with Storage(filepath, channel3) as storage: ... storage.add_streaming_data(data1) ... storage.add_streaming_data(data2) ... # Normally the class takes care about when to store back ... # data to the disk itself. We do a manual flush here to ... # check the number of stored items. ... storage.acceleration.flush() ... print(storage.acceleration.nrows) 6 >>> filepath.unlink()
Store streaming data for three channels
>>> all = StreamingConfiguration(first=True, second=True, ... third=True) >>> data1 = StreamingData(values=[1, 2, 3], counter=21, ... timestamp=1) >>> data2 = StreamingData(values=[4, 5, 6], counter=22, ... timestamp=2) >>> with Storage(filepath, all) as storage: ... storage.add_streaming_data(data1) ... storage.add_streaming_data(data2) ... storage.acceleration.flush() ... print(storage.acceleration.nrows) 2 >>> filepath.unlink()
- dataloss()
Determine (minimum) data loss
- Return type:
float- Returns:
Amount of lost messages divided by all messages (lost and retrieved)
Examples
Import required library code
>>> from math import isclose
Calculate data loss for an example storage file
>>> def calculate_dataloss(): ... filepath = Path("test.hdf5") ... with Storage(filepath, StreamingConfiguration( ... first=True)) as storage: ... for counter in range(256): ... storage.add_streaming_data( ... StreamingData(values=[1, 2, 3], ... counter=counter, ... timestamp=counter/10)) ... for counter in range(128, 256): ... storage.add_streaming_data( ... StreamingData(values=[4, 5, 6], ... counter=counter, ... timestamp=(255 + counter)/10)) ... ... dataloss = storage.dataloss() ... filepath.unlink() ... return dataloss >>> isclose(0.25, calculate_dataloss(), rel_tol=0.005) True
- dataloss_stats()
Determine number of lost and received messages
- Return type:
tuple[int,int]- Returns:
Tuple containing the number of received and the number of lost messages
Examples
Get data loss statistics for an example file
>>> def calculate_dataloss_stats(): ... filepath = Path("test.hdf5") ... with Storage(filepath,StreamingConfiguration( ... first=True)) as storage: ... for counter in range(256): ... storage.add_streaming_data( ... StreamingData(values=[1, 2, 3], ... counter=counter, ... timestamp=counter/10)) ... for counter in range(128, 256): ... storage.add_streaming_data( ... StreamingData(values=[4, 5, 6], ... counter=counter, ... timestamp=(255 + counter)/10)) ... ... stats = storage.dataloss_stats() ... filepath.unlink() ... return stats >>> retrieved, lost = calculate_dataloss_stats() >>> retrieved 384 >>> lost 128
- measurement_time()
Get the measurement time
Note
This method returns the value of the last timestamp. If there was dataloss at the end of the measurement, then the real measurement time might have been longer.
- Return type:
int- Returns:
The measurement time in microseconds
- sampling_frequency()
Calculate sampling frequency of measurement data
- Return type:
float- Returns:
Sampling frequency (of a single data channel) in Hz
Examples
Import required library code
>>> from math import isclose
Calculate sampling frequency for an example file
>>> def calculate_sampling_frequency(): ... filepath = Path("test.hdf5") ... with Storage(filepath, StreamingConfiguration( ... first=True, second=True, third=True)) as storage: ... storage.add_streaming_data( ... StreamingData(values=[1, 2, 3], ... counter=1, ... timestamp=0)) ... storage.add_streaming_data( ... StreamingData(values=[1, 2, 3], ... counter=2, ... timestamp=1)) ... ... sampling_frequency = storage.sampling_frequency() ... filepath.unlink() ... return sampling_frequency >>> calculate_sampling_frequency() 2.0
- write_sample_rate(adc_configuration)
Store the sample rate of the ADC
- Parameters:
adc_configuration (
ADCConfiguration) – The current ADC configuration of the sensor node- Return type:
None
Examples
Write and read sample rate metadata in example HDF file
>>> filepath = Path("test.hdf5") >>> adc_configuration = ADCConfiguration( ... set=True, ... prescaler=2, ... acquisition_time=16, ... oversampling_rate=256) >>> with Storage(filepath, ... StreamingConfiguration(first=True)) as storage: ... storage.write_sample_rate(adc_configuration) ... sample_rate = storage.acceleration.attrs["Sample_Rate"] ... print(sample_rate) 1724.14 Hz (Prescaler: 2, Acquisition Time: 16, Oversampling Rate: 256) >>> filepath.unlink()
- write_sensor_range(sensor_range_in_g)
Add metadata about sensor range
This method assumes that sensor have a symmetric measurement range (e.g. a sensor with a range of 200 g measures from - 100 g up to + 100 g).
- Parameters:
sensor_range_in_g (
float) – The measurement range of the sensor in multiples of g- Return type:
None
Examples
Add sensor range metadata to example file
>>> filepath = Path("test.hdf5") >>> with Storage(filepath, ... StreamingConfiguration(third=True)) as storage: ... storage.write_sensor_range(200) ... print(storage.acceleration.attrs["Sensor_Range"]) ± 100 g₀ >>> filepath.unlink()
ADC
- class icotronic.can.adc.ADCConfiguration(*data, set=None, prescaler=None, acquisition_time=None, oversampling_rate=None, reference_voltage=None)
Support for reading and writing analog digital converter configuration
- Parameters:
*data (
bytearray|list[int]) – A list containing the (first five) bytes of the ADC configurationset (
bool|None) – Specifies if we want to set or retrieve (get) the ADC configurationprescaler (
int|None) – The ADC prescaler value (1 – 127); default: 2acquisition_time (
int|None) – The acquisition time in number of cycles (1, 2, 3, 4, 8, 16, 32, … , 256); default: 8oversampling_rate (
int|None) – The ADC oversampling rate (1, 2, 4, 8, … , 4096); default: 64reference_voltage (
float|None) – The ADC reference voltage in Volt (1.25, 1.65, 1.8, 2.1, 2.2, 2.5, 2.7, 3.3, 5, 6.6); default: 3.3
Examples
Create simple ADC configuration from scratch
>>> ADCConfiguration(prescaler=2, ... acquisition_time=4, ... oversampling_rate=64) Get, Prescaler: 2, Acquisition Time: 4, Oversampling Rate: 64, Reference Voltage: 3.3 V
- property acquisition_time: int
Get the acquisition time
- Returns:
The acquisition time
Examples
Get initialised acquisition time
>>> config = ADCConfiguration(acquisition_time=2) >>> config.acquisition_time 2
Get acquisition time set via property
>>> config.acquisition_time = 128 >>> config.acquisition_time 128
Trying to set an unsupported acquisition time will fail
>>> config.acquisition_time = 5 Traceback (most recent call last): ... ValueError: Acquisition time of “5” out of range, please use ...
- property oversampling_rate: int
Get the oversampling rate
- Returns:
The oversampling rate
Examples
Get initialised oversampling rate
>>> config = ADCConfiguration(oversampling_rate=128) >>> config.oversampling_rate 128
Get oversampling rate set via property
>>> config.oversampling_rate = 512 >>> config.oversampling_rate 512
Trying to set an unsupported oversampling rate will fail
>>> config.oversampling_rate = 3 Traceback (most recent call last): ... ValueError: Oversampling rate of “3” out of range, please use ...
- property prescaler: int
Get the prescaler value
- Returns:
The prescaler value
Examples
Get initialised prescaler
>>> config = ADCConfiguration(prescaler=127) >>> config.prescaler 127
Get prescaler set via property
>>> config.prescaler = 20 >>> config.prescaler 20
Trying to set an unsupported prescaler will fail
>>> config.prescaler = 128 Traceback (most recent call last): ... ValueError: Prescaler value of “128” out of range, please use ...
- property reference_voltage: float
Get the reference voltage
- Returns:
The reference voltage in Volt
Examples
Set and get different reference voltage values
>>> config = ADCConfiguration(reference_voltage=3.3) >>> config.reference_voltage 3.3 >>> config.reference_voltage = 6.6 >>> config.reference_voltage 6.6
>>> config = ADCConfiguration(reference_voltage=6.6) >>> config.reference_voltage 6.6 >>> config.reference_voltage = 1.8 >>> config.reference_voltage 1.8
>>> config = ADCConfiguration(reference_voltage=1.8) >>> config.reference_voltage 1.8
Trying to set a unsupported reference voltage will fail
>>> config.reference_voltage = 0 Traceback (most recent call last): ... ValueError: Reference voltage of “0” V out of range, please use ...
- sample_rate()
Calculate the sampling rate for the current ADC configuration
- Return type:
float- Returns:
The calculated sample rate
Examples
Get sampling rates based on ADC attribute values
>>> round(ADCConfiguration(prescaler=2, acquisition_time=8, ... oversampling_rate=64).sample_rate()) 9524
>>> round(ADCConfiguration(prescaler=8, acquisition_time=8, ... oversampling_rate=64).sample_rate()) 3175
>>> round(ADCConfiguration(reference_voltage=5.0, ... prescaler=16, ... acquisition_time=8, ... oversampling_rate=128).sample_rate()) 840
Sensor Configuration
- class icotronic.can.SensorConfiguration(first=0, second=0, third=0)
Used to store the configuration of the three sensor channels
- Parameters:
first (
int) – The sensor number for the first measurement channelsecond (
int) – The sensor number for the second measurement channelthird (
int) – The sensor number for the third measurement channel
Examples
Create an example sensor configuration
>>> SensorConfiguration(first=0, second=1, third=2) M1: None, M2: S1, M3: S2
Initializing a sensor configuration with incorrect values will fail
>>> SensorConfiguration(first=256, second=1, third=2) Traceback (most recent call last): ... ValueError: Incorrect value for first channel: “256”
>>> SensorConfiguration(first=0, second=1, third=-1) Traceback (most recent call last): ... ValueError: Incorrect value for third channel: “-1”
- check()
Check that at least one measurement channel is enabled
- Raises:
ValueError – if none of the measurement channels is enabled
Examples
>>> SensorConfiguration(second=1).check() >>> SensorConfiguration().check() Traceback (most recent call last): ... ValueError: At least one measurement channel has to be enabled
- disable_channel(first=False, second=False, third=False)
Disable certain (measurement) channels
- Parameters:
first (
bool) – Specifies if the first measurement channel should be disabled or notsecond (
bool) – Specifies if the second measurement channel should be disabled or notthird (
bool) – Specifies if the third measurement channel should be disabled or not
- Return type:
None
- empty()
Check if the sensor configuration is empty
In an empty sensor configuration all of the channels are disabled.
- Return type:
bool- Returns:
True, if all channels are disabled,Falseotherwise
Examples
Check if some example configurations are empty or not
>>> SensorConfiguration(first=3).empty() False >>> SensorConfiguration().empty() True >>> SensorConfiguration(third=0).empty() True
- property first: int
Get the sensor for the first channel
- Returns:
The sensor number of the first channel
Examples
Get the sensor number for the first channel of a sensor config
>>> SensorConfiguration(first=1, second=3, third=2).first 1
- requires_channel_configuration_support()
Check if the sensor configuration requires channel config support
- Return type:
bool- Returns:
True, if the configuration requires hardware that has support for changing the channel configurationFalse, otherwise
Examples
Check if example sensor configs require channel config support
>>> SensorConfiguration(first=1, second=3, third=2 ... ).requires_channel_configuration_support() True
>>> SensorConfiguration(first=1, second=0, third=1 ... ).requires_channel_configuration_support() True
>>> SensorConfiguration(first=1, second=2, third=3 ... ).requires_channel_configuration_support() False
>>> SensorConfiguration().requires_channel_configuration_support() False
- property second: int
Get the sensor for the second channel
- Returns:
The sensor number of the second channel
Examples
Get the sensor number for the second channel of a sensor config
>>> SensorConfiguration(first=1, second=3, third=2).second 3
- streaming_configuration()
Get a streaming configuration that represents this config
- Return type:
- Returns:
A stream configuration where
every channel that is enabled in the sensor configuration is enabled, and
every channel that is disables in the sensor configuration is disabled.
Examples
Get the streaming configuration for some example channel configs
>>> SensorConfiguration(second=1).streaming_configuration() Channel 1 disabled, Channel 2 enabled, Channel 3 disabled
>>> SensorConfiguration(first=10, third=2 ... ).streaming_configuration() Channel 1 enabled, Channel 2 disabled, Channel 3 enabled
- property third: int
Get the sensor for the third channel
- Returns:
The sensor number of the third channel
Examples
Get the sensor number for the third channel of a sensor config
>>> SensorConfiguration(first=1, second=3, third=2).third 2
Examples
For code examples, please check out the examples directory:
Read STH Name: Read the name of the „first“ available STH (sensor node id 0).
Read Data Points: Read five acceleration messages (5 · 3 = 15 values) and print their string representation (value, timestamp and message counter)
Store Data as HDF5 file: Read five seconds of acceleration data and store it as HDF5 file
Concurrent Access to Sensor Node Data: Show how to read the current ADC configuration, while streaming is active