Creating a Custom Source¶
The primary purpose of ExoEdge is to provide the "plumbing" for getting channel data into ExoSense.
Without a config provided to a running edged process (the process name for ExoEdge, pronounced edge-dee, as in 'edge daemon'), nothing happens. The edged process needs channels to do anything. The relationship between channels in edged and a custom source is mainly via the protocol_config.application field of a channel definition. When an ExoSense user selects a given Application (e.g. CANOpen, Modbus_TCP, etc.) during the channel configuration of an Asset, the value is assigned to protocol_config.application. When the channel is added (saved) to the Asset in ExoSense, the resulting config_io JSON object is synced down to the edged:ConfigIO thread.
Creating an ExoEdge Source¶
There are two types of sources for data in ExoEdge: ones that are implemented and supported by Exosite (e.g. Modbus_TCP, Modbus_RTU, ExoSimulator, etc.), and ones that are provided by installed Python packages and modules.
Creating a "source" is as simple as writing a Python package or module.
Classic Source¶
A "classic" style source is one that just utilizes the built-in import logic of ExoEdge. In a "classic" source, a function is imported from a module and its value is returned as channel data.
The source below illustrates that ExoEdge imports the my_source module and calls the function minutes_from_now with parameter 30 every second. This module needs to be installed like any other Python module or package (i.e. python setup.py install, pip install my_source, etc.).
ExoEdge Source¶
# in my_source/__init__.py
import time


def minutes_to_seconds(minutes):
    # Converts a number of minutes to seconds.
    return minutes * 60


def minutes_from_now(minutes=0):
    # Returns the timestamp `minutes` after the current time.
    return time.time() + minutes_to_seconds(minutes)
ExoSense Configuration
{
  "channels": {
    "001": {
      "display_name": "30 Minutes from Now",
      ...,
      "protocol_config": {
        "application": "Custom Application",
        "report_on_change": false,
        "report_rate": 1000,
        "sample_rate": 1000,
        "down_sample": "ACT",
        "app_specific_config": {
          "module": "my_source",
          "function": "minutes_from_now",
          "parameters": {
            "minutes": 30
          },
          "positionals": []
        }
        ...
      }
    }
  }
}
The module to import is specified in app_specific_config, as are the function and any positional or keyword arguments to pass to the function.
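The lookup described above can be sketched roughly as follows. This is only an illustration of the idea, not the actual ExoEdge import logic: call_classic_source is a hypothetical helper, and the my_source module is faked in memory so the sketch runs without installing anything.

```python
import importlib
import sys
import time
import types

# Stand-in for an installed my_source package (assumption: normally this
# would come from `pip install my_source`, not be built in memory).
my_source = types.ModuleType("my_source")


def minutes_from_now(minutes=0):
    return time.time() + minutes * 60


my_source.minutes_from_now = minutes_from_now
sys.modules["my_source"] = my_source


def call_classic_source(app_specific_config):
    """Hypothetical helper: import the module, look up the function, call it."""
    module = importlib.import_module(app_specific_config["module"])
    function = getattr(module, app_specific_config["function"])
    positionals = app_specific_config.get("positionals", [])
    parameters = app_specific_config.get("parameters", {})
    return function(*positionals, **parameters)


config = {
    "module": "my_source",
    "function": "minutes_from_now",
    "parameters": {"minutes": 30},
    "positionals": [],
}
sample = call_classic_source(config)  # a timestamp 30 minutes in the future
```

In a real deployment the returned value would be reported as the channel's data at each sample_rate interval.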
ExoEdgeSource Case¶
The example below utilizes the ExoEdgeSource class provided by ExoEdge.
In order for ExoEdge to effectively use your custom ExoEdgeSource, you must abide by the following rules when creating the new source:
- Sources must subclass exoedge.sources.ExoEdgeSource.
- Lowercased, the module name must be prefixed by exoedge_ (e.g. exoedge_canopen, etc.).
- Titlecased, the class name must be suffixed by "ExoEdgeSource" (e.g. CanopenExoEdgeSource).
- Modbus_TCP and Modbus_RTU are the exception: they get stripped to just [Mm]odbus.
  NOTE: BACnet might also become an exception if the protocol_config.application field distinguishes between BACnet_MSTP and BACnet_IP in the same way Modbus does. That would require a PR against the exoedge.config_io.ConfigIO.add_source() function in the core lib_exoedge_python library.
- Source class name must not be ExoEdgeSource.
Examples:
protocol_config.application = MySource
- Python package name = exoedge_mysource
- Python class name = MysourceExoEdgeSource
Application Name in ExoSense | Python Package Name | Python Class Name |
---|---|---|
CANOpen | exoedge_canopen | CanopenExoEdgeSource |
Modbus_TCP | exoedge_modbus | ModbusExoEdgeSource |
CSVReader | exoedge_csvreader | CsvreaderExoEdgeSource |
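The naming rules and the table above can be expressed as a small function. This is a sketch of the convention only; source_names is a hypothetical helper and is not the logic used by exoedge.config_io.ConfigIO.add_source().

```python
def source_names(application):
    """Return (package_name, class_name) for an ExoSense application name.

    Hypothetical helper that mirrors the naming rules in this section.
    """
    base = application.lower()
    # Modbus_TCP and Modbus_RTU both collapse to plain "modbus".
    if base.startswith("modbus_"):
        base = "modbus"
    package_name = "exoedge_" + base
    class_name = base.title() + "ExoEdgeSource"
    return package_name, class_name


for app in ("CANOpen", "Modbus_TCP", "CSVReader", "MySource"):
    print(app, "->", source_names(app))
```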
ExoEdge Source
# exoedge_example/__init__.py
# -*- coding: utf-8 -*-
# pylint: disable=W1202
import sys
import time
import datetime
import logging
from exoedge.sources import ExoEdgeSource
from exoedge import logger

# set logging levels to whatever exoedge logging levels are set to
LOG = logger.getLogger(__name__, level=logging.getLogger('exoedge').getEffectiveLevel())


def sixteen():
    return 16


class ExampleExoEdgeSource(ExoEdgeSource):

    def minutes_from_now(self, minutes=0.0):
        return datetime.datetime.fromtimestamp(float(minutes)*60 + time.time()).strftime('%c')

    def run(self):
        example_source_channels = self.get_channels_by_application('Example')
        LOG.critical("Starting with channels: {}".format(example_source_channels))
        while True:
            for channel in example_source_channels:
                if channel.is_sample_time():
                    func = channel.protocol_config.app_specific_config['function']
                    # first look for a module-level function with that name
                    if hasattr(sys.modules.get(__name__), func):
                        function = getattr(sys.modules[__name__], func)
                        par = channel.protocol_config.app_specific_config['parameters']
                        pos = channel.protocol_config.app_specific_config['positionals']
                        LOG.warning("calling '{}' with: **({})"
                                    .format(function, par))
                        try:
                            channel.put_sample(function(*pos, **par))
                        except Exception as exc:  # pylint: disable=W0703
                            LOG.warning("Exception: {}".format(exc))
                            channel.put_channel_error(exc)
                    # otherwise fall back to a method on this source
                    elif hasattr(self, func):
                        function = getattr(self, func)
                        par = channel.protocol_config.app_specific_config['parameters']
                        pos = channel.protocol_config.app_specific_config['positionals']
                        LOG.warning("calling '{}' with: **({})"
                                    .format(function, par))
                        try:
                            channel.put_sample(function(*pos, **par))
                        except Exception as exc:  # pylint: disable=W0703
                            LOG.warning("Exception: {}".format(exc))
                            channel.put_channel_error(exc)
                    else:
                        channel.put_channel_error(
                            'ExampleExoEdgeSource has no function: {}'.format(func))
            time.sleep(0.01)  # throttle a bit to lay off the processor
ExoSense Configuration
{
  "channels": {
    "sixteen": {
      "display_name": "The number 16, according to the example_source.",
      "protocol_config": {
        "report_on_change": false,
        "report_rate": 5000,
        "application": "Classic",
        "app_specific_config": {
          "function": "sixteen",
          "parameters": {},
          "positionals": []
        },
        "sample_rate": 5000,
        "down_sample": "ACT"
      }
    },
    "thirty_minutes_from_now": {
      "display_name": "The time, thirty minutes from now, according to the gateway.",
      "protocol_config": {
        "report_on_change": false,
        "report_rate": 5000,
        "application": "Example",
        "app_specific_config": {
          "function": "minutes_from_now",
          "parameters": {
            "minutes": 30
          },
          "positionals": []
        },
        "sample_rate": 5000,
        "down_sample": "ACT"
      }
    }
  }
}
For more in-depth information on creating Python modules and packages, see the Python packaging docs.
Passing Parameters¶
Some application functions take keyword arguments. Supply them in protocol_config.app_specific_config['parameters'], the parameters section of the app_specific_config object in config_io. For instance, a function random_integer(lower=0, upper=10), which returns (you guessed it) a random integer between its lower and upper keyword arguments, might have a parameters section like this:
"parameters": {
"lower": 100,
"upper": 200
}
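To make the mapping concrete, here is a minimal sketch of how a parameters section like the one above ends up as keyword arguments. It mirrors the function(*pos, **par) call used in the example source earlier on this page. The body of random_integer is an assumed implementation written for illustration only.

```python
import random


def random_integer(lower=0, upper=10):
    # Assumed implementation: any integer between lower and upper, inclusive.
    return random.randint(lower, upper)


app_specific_config = {
    "function": "random_integer",
    "parameters": {"lower": 100, "upper": 200},
    "positionals": [],
}

# Positionals are unpacked first, then parameters as keyword arguments.
value = random_integer(
    *app_specific_config["positionals"],
    **app_specific_config["parameters"]
)
```

Because the keys of the parameters object are used as keyword names, they must match the function's argument names exactly.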
Other application functions accept positional arguments. In this case, supply them in protocol_config.app_specific_config['positionals'].
"positionals": ["arg1", "foo", 15]
FAQ¶
Why Do I Need To Subclass exoedge.sources.ExoEdgeSource?¶
The primary use-case that drives the requirement to subclass ExoEdgeSource is that the channel definitions in config_io can change at any time. When ExoEdge gets a new config (i.e. config_io, config_applications, etc.), it deletes its list of channels, stops all "sources" (which commonly consist of 1 to N threads or timers), and creates new ones.
Subclassing ExoEdgeSource provides edged with the methods to start and stop all sources when configs are added/changed.
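The stop-and-recreate cycle can be pictured with the sketch below. It is not ExoEdge's implementation: FakeSource and reload_sources are hypothetical names, and creating one source per application is an assumption made to keep the example small. The point is only that the reload logic can treat every source the same way because each one exposes start_source() and stop_source().

```python
class FakeSource:
    """Stand-in for an ExoEdgeSource subclass; it only tracks running state."""

    def __init__(self, application):
        self.application = application
        self.running = False

    def start_source(self):
        self.running = True

    def stop_source(self):
        self.running = False


def reload_sources(old_sources, config_io):
    """Stop every running source, then build and start sources for the new config."""
    for source in old_sources:
        source.stop_source()
    applications = sorted({
        channel["protocol_config"]["application"]
        for channel in config_io["channels"].values()
    })
    new_sources = [FakeSource(application) for application in applications]
    for source in new_sources:
        source.start_source()
    return new_sources


first = reload_sources([], {"channels": {
    "001": {"protocol_config": {"application": "Example"}},
}})
second = reload_sources(first, {"channels": {
    "001": {"protocol_config": {"application": "Example"}},
    "002": {"protocol_config": {"application": "MyCustom"}},
}})
```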
I Need More Than One Thread In My Source¶
The only limit on the number of threads you can use when designing an ExoEdgeSource is the one set by the host system (hardware).
If you need multiple threads for any reason, override the start_source() and stop_source() methods of ExoEdgeSource with the logic for starting them (if necessary) and, more importantly, stopping them.
The example below illustrates how one might override the stop and start methods.
from time import sleep

from exoedge.sources import ExoEdgeSource

# LOG is a module-level logger, set up as in the example source above.


class MycustomExoEdgeSource(ExoEdgeSource):
    CONDITION_MET = False
    RESOURCES = {
        "devices": ['/dev/ttyC1'],
        "threads": []
    }

    def start_source(self):
        while not self.CONDITION_MET:
            LOG.warning(
                "Waiting for CONDITION before starting source..."
            )
            sleep(0.01)
        for resource in self.RESOURCES['devices']:
            # start_thread() is not shown here; it must return a thread with a stop() method
            self.RESOURCES["threads"].append(self.start_thread(resource))
        super(MycustomExoEdgeSource, self).start_source()

    def stop_source(self):
        while self.CONDITION_MET:
            LOG.warning(
                "Waiting for CONDITION before stopping source..."
            )
            sleep(0.01)
        for thread in self.RESOURCES['threads']:
            thread.stop()
        super(MycustomExoEdgeSource, self).stop_source()

    def resource_parser(self, read_object):
        # keep the second comma-separated field of whatever was read
        return read_object.split(',')[1]

    def run(self):
        """
        Custom (a)synchronous logic here.
        """
        # method provided by ExoEdgeSource base-class
        channels = self.get_channels_by_application(
            'MyCustom'
        )
        # is_ready() and read() assume the device entries are opened device
        # objects rather than bare path strings
        while not self.RESOURCES['devices'][0].is_ready():
            sleep(0.5)
        while True:
            for channel in channels:
                try:
                    channel.put_sample(
                        self.resource_parser(
                            self.RESOURCES['devices'][0].read()
                        )
                    )
                except Exception as exc:
                    channel.put_channel_error(exc)
            sleep(0.1)
Note
For a thread class that can be "stopped", see the murano_client.client.StoppableThread class.
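For readers who want to see the general pattern, a stoppable thread can be built on threading.Event as sketched below. SketchStoppableThread is a hypothetical class written for this page. It is not the murano_client.client.StoppableThread implementation, whose interface may differ.

```python
import threading
import time


class SketchStoppableThread(threading.Thread):
    """Runs `work` repeatedly until stop() is called."""

    def __init__(self, work, interval=0.01):
        super().__init__(daemon=True)
        self._work = work
        self._interval = interval
        self._stop_event = threading.Event()

    def stop(self):
        # Ask the loop in run() to exit at its next check.
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

    def run(self):
        # wait() returns True once the event is set, which ends the loop.
        while not self._stop_event.wait(self._interval):
            self._work()


samples = []
thread = SketchStoppableThread(lambda: samples.append(time.time()))
thread.start()
time.sleep(0.1)
thread.stop()
thread.join(timeout=1)
```

Waiting on the event instead of calling sleep() lets the thread react to stop() without having to finish a full sleep interval first.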