Skip to content

Creating a Custom Source

The primary purpose of ExoEdge is to provide the "plumbing" for getting channel data into ExoSense.

Without a config provided to a running edged process \(the process name for ExoEdge, pronounced edge-dee, as in 'edge daemon'\), nothing happens. The edged process needs channels to do anything. The relationship between channels in edged and a custom source is mainly via the protocol_config.application field of a channel definition. When an ExoSense user selects a given Application \(e.g. CANOpen, Modbus\_TCP, etc.\) during the channel configuration of an Asset, the value is assigned to protocol_config.application. When the channel is added \(saved\) to the Asset in ExoSense, the resulting config_io JSON object is synced down to the edged:ConfigIO thread.

Creating an ExoEdge Source

There are two types of sources for data in ExoEdge: ones that are implemented and supported by Exosite \(e.g. Modbus\_TCP, Modbus\_RTU, ExoSimulator, etc.\), and ones that are provided by installed Python packages and modules.

Creating a "source" is as simple as writing Python package or module.

Classic Source

A "classic" style source is one that just utilizes the built-in import logic of ExoEdge. In a "classic" source, a function is imported from a module and its value is returned as channel data.

The source, below, illustrates that ExoEdge imports the my_source module and calls the function minutes_from_now with parameter 30 every second. This module needs to be installed like any other Python module or package \(i.e. python setup.py install, pip install my_source, etc.\).

ExoEdge Source

# in my_source/__init__.py
import time

def minutes_to_seconds(min):
  return min * 60

def minutes_from_now(minutes=0):
  # Returns the timestamp `minutes` after the current time.
  return time.time() + minutes_to_seconds(minutes)

ExoSense Configuration

{
  "channels": {
    "001": {
      "display_name": "30 Minutes from Now",
      ...,
      "protocol_config": {
        "application": "Custom Application",
        "report_on_change": false,
        "report_rate": 1000,
        "sample_rate": 1000,
        "down_sample": "ACT",
        "app_specific_config": {
          "module": "my_source",
          "function": "minutes_from_now",
          "parameters": {
            "minutes": 30
          },
          "positionals": []
        }
        ...
      }
    }
  }
}

The module that is imported is specified on app_specific_config as is the function and any positional or parametric arguments the the function.

ExoEdgeSource Case

The example, below, utilizes the ExoEdgeSource class provided by ExoEdge.

In order for ExoEdge to effectively use your custom ExoEdgeSource, you must abide by the following rules when creating the new source:

  1. Sources must subclass exoedge.sources.ExoEdgeSource.
  2. Lowercased, the module name must be prefixed by exoedge_ \(e.g. exoedge_canopen, etc.\).
  3. Titlecased, the class name must be suffixed by "ExoEdgeSource" \(e.g. CanopenExoEdgeSource\).
  4. Modbus_TCP and _RTU are the exception, they get stripped to just [Mm]odbus.

NOTE: BACnet might also become an exception and will require a PR against lib_exoedge_python if the protocol_config.application field distinguishes between BACnet_MSTP and BACnet_IP in the same way Modbus does. But this would require a PR against the exoedge.config_io.ConfigIO.add_source() function in the core lib_exoedge_python library.

  1. Source class name must not be ExoEdgeSource.

Examples:

  • protocol_config.application = MySource
  • Python package name = exoedge_mysource.
  • Python class name = MysourceExoEdgeSource
Application Name in ExoSense Python Package Name Python Class Name
CANOpen exoedge_canopen CanopenExoEdgeSource
Modbus_TCP exoedge_modbus ModbusExoEdgeSource
CSVReader exoedge_csvreader CsvreaderExoEdgeSource

ExoEdge Source

# exoedge_example/__init__.py

# -*- coding: utf-8 -*-
# pylint: disable=W1202
import sys
import time
import datetime
from exoedge.sources import ExoEdgeSource
from exoedge import logger

# set logging levels to whatever exoedge logging levels are set to
LOG = logger.getLogger(__name__, level=logging.getLogger('exoedge').getEffectiveLevel())

def sixteen():
    return 16

class ExampleExoEdgeSource(ExoEdgeSource):

    def minutes_from_now(self, minutes=0.0):
        return datetime.datetime.fromtimestamp(float(minutes)*60 + time.time()).strftime('%c')

    def run(self):

        example_source_channels = self.get_channels_by_application('Example')

        LOG.critical("Starting with channels: {}".format(example_source_channels))

        while True:

            for channel in example_source_channels:
                if channel.is_sample_time():
                    func = channel.protocol_config.app_specific_config['function']
                    if hasattr(sys.modules.get(__name__), func):
                        function = getattr(sys.modules[__name__], func)
                        par = channel.protocol_config.app_specific_config['parameters']
                        pos = channel.protocol_config.app_specific_config['positionals']
                        LOG.warning("calling '{}' with: **({})"
                                    .format(function, par))
                        try:
                            channel.put_sample(function(*pos, **par))
                        except Exception as exc: # pylint: disable=W0703
                            LOG.warning("Exception".format(format_exc=exc))
                            channel.put_channel_error(exc)
                    elif hasattr(self, func):
                        function = getattr(self, func)
                        par = channel.protocol_config.app_specific_config['parameters']
                        pos = channel.protocol_config.app_specific_config['positionals']
                        LOG.warning("calling '{}' with: **({})"
                                    .format(function, par))
                        try:
                            channel.put_sample(function(*pos, **par))
                        except Exception as exc: # pylint: disable=W0703
                            LOG.warning("Exception".format(format_exc=exc))
                            channel.put_channel_error(exc)
                    else:
                        channel.put_channel_error(
                            'MySource has no function: {}'.format(func))

            time.sleep(0.01) # throttle a bit to lay off the processor

ExoSense Configuration

{
  "channels": {
    "sixteen": {
      "display_name": "The number 16, according to the example_source.",
      "protocol_config": {
        "report_on_change": false,
        "report_rate": 5000,
        "application": "Classic",
        "app_specific_config": {
          "function": "sixteen",
          "parameters": {},
          "positionals": []
        },
        "sample_rate": 5000,
        "down_sample": "ACT"
      }
    },
    "thirty_minutes_from_now": {
      "display_name": "The time, thirty minutes from now, according to the gateway.",
      "protocol_config": {
        "report_on_change": false,
        "report_rate": 5000,
        "application": "Example",
        "app_specific_config": {
          "function": "minutes_from_now",
          "parameters": {
            "minutes": 30
          },
          "positionals": []
        },
        "sample_rate": 5000,
        "down_sample": "ACT"
      }
    }
  }
}

For more in-depth information on creating Python modules and packages, see the python packages docs.

Passing Parameters

Some application functions take keyword arguments which are passed in the parameters section of the app_specific_config object in config_io. For instance, a function random_integer(lower=0, upper=10) which returns—you guessed it—a random integer between it's lower and upper keyword arguments might have a parameters section like this:

In this case, supply them in protocol_config.app_specific_config['parameters'].

"parameters": {
  "lower": 100,
  "upper": 200
}

Other application functions accept positional arguments. In this case, supply them in protocol_config.app_specific_config['positionals'].

"positionals": ["arg1", "foo", 15]

FAQ

Why Do I Need To Subclass exoedge.sources.ExoEdgeSource?

The primary use-case that drives the requirement on subclassing ExoEdgeSource is that the channel definitions in config_io can change at any time. The way ExoEdge deals with this is that when it gets a new config \(i.e. config_io, config_applications, etc.\) it deletes its list of channels, stops all "sources" \(which are commonly comprised of 1 to N threads or timers\) and creates new ones.

Subclassing ExoEdgeSource provides edged with the methods to start and stop all sources when configs are added/changed.

I Need More Than One Thread In My Source

The only limit to the number of threads you can use when designing an ExoEdgeSource is made by the host system \(hardware\).

If you need multiple threads for any reason, then override the ExoEdgeSource's start_source() and stop_source() methods with the logic for starting them \(if necessary\) and, more importantly, stopping them.

The example below illustrates how one might override the stop and start methods.

class MycustomExoEdgeSource(ExoEdgeSource):
    CONDITION_MET = False
    RESOURCES = {
        "devices":['/dev/ttyC1'],
        "threads": []
    }

    def start_source(self):
        while not self.CONDITION_MET:
            LOG.warning(
                "Waiting for CONDITION before starting source..."
            )
            sleep(0.01)
        for resource in self.RESOURCES['devices']:
            self.RESOURCES["threads"].append(self.start_thread(resource))
        super(self, self).start_source()

    def stop_source(self):
        while self.CONDITION_MET:
            LOG.warning(
                "Waiting for CONDITION before starting source..."
            )
            sleep(0.01)
        for thread in self.RESOURCES['threads']:
            thread.stop()
        super(self, self).stop_source()

    def resource_parser(self, read_object):
        return read_object.split(',')[1]

    def run(self):
        """
            Custom (a)syncronous logic here.
        """
        # method proveded by ExoEdgeSource base-class
        channels = self.get_channels_by_application(
            'MyCustom'
        )
        while not self.RESOURCES['devices'][0].is_ready():
            sleep(0.5)
        while True:
            for channel in channels:
                try:
                    channel.put_sample(
                        self.resource_parser(
                            self.RESOURCES['devices'][0].read()
                        )
                    )
                except Exception as exc:
                    channel.put_channel_error(exc)

            sleep(0.1)

Note

For a thread class that can be "stopped", see the murano_client.client.StoppableThread class.