Remote package

A key feature of PyMAPE is the ability to decentralize (functionality and data) and distribute loops across multiple devices. These features are provided by the remote package.

Class diagram

PyMAPE tries to give you maximum flexibility in your design choices. You know the pros and cons of communication architectures based on request-response (e.g. polling) compared to publish-subscribe (e.g. data centralization): some domains (and patterns) favor one approach over the other, or a hybrid that exploits the advantages of each.

You can choose between:

  • RESTful implementation, allowing read and write access to the elements of your application
  • Redis as DB and message broker, allowing communication between elements and also acting as shared memory (Knowledge) for distributed nodes.

Remote package

Observable and Observer

In the class diagram some classes have the suffix "Observer" or "Observable", key concepts in the ReactiveX library. The element ports use the same idea: you can think of them as the sink and source of your stream.
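As a rough, library-free sketch of the idea (the real classes come from ReactiveX and the remote package; the names here are purely illustrative):

```python
# Minimal, library-free sketch of the Observable/Observer idea.
# The real classes come from ReactiveX; these names are illustrative only.

class Source:
    """'Observable': emits items to whoever subscribes."""
    def __init__(self):
        self._sinks = []

    def subscribe(self, sink):
        self._sinks.append(sink)

    def emit(self, item):
        # Push the item downstream to every subscribed sink
        for sink in self._sinks:
            sink.on_next(item)

class PrintSink:
    """'Observer': consumes items pushed into it."""
    def on_next(self, item):
        print(f"received: {item}")

source = Source()
source.subscribe(PrintSink())
source.emit("emergency")  # prints "received: emergency"
```

An element's input port behaves like the sink and its output port like the source, which is why remote classes can be plugged directly into `subscribe()`.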

Graphical Notation

Remote notation

REST

To enable REST support you have to provide a host:port for the web server (Uvicorn, used by FastAPI). You can do that with:

mape.init(rest_host_port="0.0.0.0:6060")

or in your YAML config file:

rest:
    host_port: 0.0.0.0:6060

Now you can get information about levels, loops, and elements defined in your app, but most importantly you can push items to any element port through a POST request (POSTObserver class).

API documentation

API documentation and a web UI for testing are available, thanks to the FastAPI library.

Rest swagger ui

  • POSTObserver(base_url, path, port, serializer, session)

    It behaves like a sink, sending the stream to the device selected by host:port and to the element selected by the path.

    base_url: str

    The target REST API web server, in the format host:port.

    path: str

    Target element path (loop_uid.element_uid).

    port: Port = Port.p_in

    Destination port where the stream is injected.

    serializer = None

    session: aiohttp.ClientSession = None

Example

In the following example you see communication between two distributed devices (Car_panda and Ambulance), specifically between the out port of the detect element of car_panda (car_panda.detect) and the in port of the policy element of ambulance (ambulance.policy).

REST example

Translated into Python on the device Car_panda:

import mape
from mape.remote.rest import POSTObserver

car = mape.Loop(uid='car_panda')

@car.monitor
def detect(item, on_next):
    ...

# Create the sink
ambulance_policy = POSTObserver("http://0.0.0.0:6060", "ambulance.policy")
# Connect detect to the sink
detect.subscribe(ambulance_policy)

and on the device Ambulance:

import mape
from reactivex import operators as ops

# Enable REST support
mape.init(rest_host_port="0.0.0.0:6060")

ambulance = mape.Loop(uid='ambulance')
...
@ambulance.plan(ops_in=ops.distinct_until_changed())
def policy(item, on_next):
    ...

Redis

To enable Redis support you have to provide the URL of your instance (e.g. redis://localhost:6379). You can do that with:

mape.init(redis_url="redis://localhost:6379")

or in your YAML config file:

redis:
    url: redis://localhost:6379
Redis instance

PyMAPE doesn't provide (yet) an instance of Redis, so you have to run your own, for example using a Docker container:

$ docker run --name mape-redis -p 6379:6379 -v $(pwd)/docker/redis:/usr/local/etc/redis --rm redis redis-server /usr/local/etc/redis/redis.conf
oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
Redis version=6.2.6, bits=64, commit=00000000, modified=0, pid=1, just started
Configuration loaded
* monotonic clock: POSIX clock_gettime
* Running mode=standalone, port=6379.
Server initialized

Unlike REST, Redis has two classes enabling stream communication (between distributed devices): PubObserver and SubObservable.

This means that you have to choose a channel name and publish/subscribe a stream on it.

Pattern matching

Redis supports channel pattern matching, allowing multi-point communication.

  • car_?.detect: can receive from car_a.detect, car_b.detect, etc...
  • car_[xy].detect: can receive only from car_x.detect, car_y.detect
  • car_*: can receive from car_foo, car_bar, etc...
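These patterns use glob-style wildcards. Python's standard-library fnmatch implements the same ?, [..] and * syntax, so it can be used to check which channel names a pattern would capture (illustrative only, not part of PyMAPE):

```python
from fnmatch import fnmatchcase

# Same glob wildcards as the Redis channel patterns above
assert fnmatchcase("car_a.detect", "car_?.detect")        # ? = exactly one character
assert not fnmatchcase("car_ab.detect", "car_?.detect")   # two characters don't match ?
assert fnmatchcase("car_x.detect", "car_[xy].detect")     # [..] = character set
assert not fnmatchcase("car_z.detect", "car_[xy].detect")
assert fnmatchcase("car_foo", "car_*")                    # * = any sequence
```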

Example

Let's implement the previous example with Redis.

Redis example

Translated into Python on the device Car_panda:

import mape
from mape.remote.redis import PubObserver

# Enable Redis support
mape.init(redis_url="redis://localhost:6379")

car = mape.Loop(uid='car_panda')

@car.monitor
def detect(item, on_next):
    ...

# Publish detect output on the channel named "car_panda.detect"
detect.subscribe(PubObserver(detect.path))

and on the device Ambulance:

import mape
from mape.remote.redis import SubObservable
from reactivex import operators as ops

# Enable Redis support
mape.init(redis_url="redis://localhost:6379")

ambulance = mape.Loop(uid='ambulance')
...
@ambulance.plan(ops_in=ops.distinct_until_changed())
def policy(item, on_next):
    ...

# Subscribe to the other cars
# note: for clarity "policy.port_in" can be used
SubObservable("car_*.detect").subscribe(policy) # (1)

  1. If you have access to the car detect element you can use f"car_*.{detect}" == f"car_*.{detect.uid}".

Knowledge

Redis provides a collection of native data types (Strings, Lists, Sets, Hashes, Sorted Sets), and thanks to the redis-purse library we extend them with Queue (FIFO, LIFO and Priority) and a distributed Lock. Access to these types is implemented with non-blocking I/O operations (async/await).
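To get a feel for the async/await access pattern, here is a sketch using the stdlib asyncio.PriorityQueue as a local stand-in (an assumption for illustration: redis-purse exposes the same await-based style of calls, backed by Redis instead of process memory):

```python
import asyncio

# Stdlib stand-in for the async access pattern; redis-purse offers the
# same await-based style, backed by Redis instead of local memory.
async def next_emergency(events):
    queue = asyncio.PriorityQueue()
    for priority, name in events:
        await queue.put((priority, name))   # non-blocking for the event loop
    return await queue.get()                # lowest number = highest priority

result = asyncio.run(next_emergency([(5, "car_panda"), (1, "ambulance")]))
print(result)  # -> (1, 'ambulance')
```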

Async element definition

You can transparently add async in front of your element definition, allowing the use of await (as needed for redis-purse).

@loop.monitor
async def detect(emergency, on_next):
    await coroutine()
    ...

Example

Create and access the cars Set in the global Knowledge.

# Create a Set in the global Knowledge (items of type str)
k_cars = mape.app.k.create_set("cars", str)
# Clear the Set first
await k_cars.clear()
...

# On a different device and code base
@loop.analyze
async def cars_store(car, on_next, self):
    if not hasattr(self, 'k_cars'):
        # Get access to the same Set
        self.k_cars = self.loop.app.k.create_set("cars", str)

    # Add car.name to the Set
    await self.k_cars.add(car.name)
    # Count the cars in the Set
    car_count = await self.k_cars.len()
    on_next(car_count)

Attach a handler (on_cars_change()) that is called on changes to the cars Set.

def on_cars_change(message): # (1)
  ...

# Register handler for add (sadd) / remove (srem) cars
self.loop.app.k.notifications(on_cars_change, 
                              "cars",
                              cmd_filter=('sadd', 'srem'))
  1. Simply define it as async def on_cars_change(message) if you need to; notifications() is smart enough to handle both.

InfluxDB

As for REST and Redis, you have to configure it before use (configuration through mape.init() is not available; use the YAML config file):

influxdb:
    url: http://localhost:8086
    username: user
    password: qwerty123456
    org: your-organization
    bucket: mape
    token: <GENERATE_OR_TAKE_FROM_CONFIG_YAML>
    debug: false
InfluxDB instance

PyMAPE doesn't provide (yet) an instance of InfluxDB, so you have to run your own, for example using a Docker container:

docker run --name mape-influxdb -p 8086:8086 \
-v $(pwd)/docker/influxdb/data:/var/lib/influxdb2 \
-v $(pwd)/docker/influxdb/conf:/etc/influxdb2 \
-e DOCKER_INFLUXDB_INIT_MODE=setup \
-e DOCKER_INFLUXDB_INIT_USERNAME=user \
-e DOCKER_INFLUXDB_INIT_PASSWORD=qwerty123456 \
-e DOCKER_INFLUXDB_INIT_ORG=univaq \
-e DOCKER_INFLUXDB_INIT_BUCKET=mape \
-e DOCKER_INFLUXDB_INIT_RETENTION=1w \
-e DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=*TOKEN* \
--rm influxdb:2.0


Now you can use the InfluxObserver class, a sink that publishes a stream to InfluxDB.
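For intuition, each item ends up as a point in InfluxDB line protocol (measurement, tags, fields). Below is a simplified, hypothetical sketch of that textual format; the real client builds records through influxdb_client.Point, with proper escaping and timestamps:

```python
# Simplified sketch of InfluxDB line protocol (no escaping or timestamp
# handling); the real client builds records via influxdb_client.Point.
def to_line_protocol(measurement, tags, fields):
    tag_part = "".join(f",{k}={v}" for k, v in tags)
    field_part = ",".join(f"{k}={v}" for k, v in fields)
    return f"{measurement}{tag_part} {field_part}"

line = to_line_protocol("car", [("uid", "car_panda")], [("speed", 42)])
print(line)  # -> car,uid=car_panda speed=42
```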

Example

Influxdb example

from mape.remote.influxdb import InfluxObserver

# Store element "detect" stream output
detect.subscribe(InfluxObserver())

or, customizing the write:

from mape.remote.influxdb import InfluxObserver

detect.subscribe(
  # All args are optional
  InfluxObserver(
    measurement="car",
    tags=("custom-tag", "value"),
    fields_mapper=lambda item: (item.type, item.value)
  )
)

mape.remote.influxdb.InfluxObserver

Bases: Observer

An Observer (sink) whose incoming stream is stored in an InfluxDB instance.

More info on InfluxDB data elements.

Examples:

from mape.remote.influxdb import InfluxObserver

detect.subscribe(
  # All args are optional
  InfluxObserver(
    measurement="car",
    tags=("custom-tag", "value"),
    fields_mapper=lambda item: (item.type, item.value)
  )
)

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| measurement | str \| None | The name of the measurement. | None |
| tags | Iterable \| Iterable[Iterable] \| None | Tags include tag keys and tag values, stored as strings and metadata. If not provided it tries to extract the information on its own. | None |
| fields_mapper | Callable \| None | Function that returns a Tuple or List of field (key, value) pairs given a stream item. The default mapper works with Message, dict with a "value" key, and simple base types. | None |
| bucket | str \| None | Taken from the config when not provided. | None |
| is_raw | bool | If True the stream item must be an influxdb_client.Point. | False |
| write_options | WriteOptions | Configure which type of writes the client uses (refer to https://influxdb-client.readthedocs.io/). | ASYNCHRONOUS |
| client | InfluxDBClient \| None | Leaving None, the InfluxDBClient is instantiated for you. | None |
Source code in mape/remote/influxdb/rx_utils.py
class InfluxObserver(Observer):
    """An Observer (Sink) where sent stream is stored in an InfluxDB instance.

    More info on [InfluxDB data elements](https://docs.influxdata.com/influxdb/v2.5/reference/key-concepts/data-elements/).

    Examples:
        ```python
        from mape.remote.influxdb import InfluxObserver

        detect.subscribe(
          # All args are optional
          InfluxObserver(
            measurement="car",
            tags=("custom-tag", "value"),
            fields_mapper=lambda item: (item.type, item.value)
          )
        )
        ```

    Args:
        measurement: The name of the measurement.
        tags: Tags include tag keys and tag values that are stored as strings and metadata.
            If not provided it tries to extract the information on its own.
        fields_mapper: Function that returns a `Tuple` or `List` of field `(key, value)` pairs given a stream item.
            The default mapper works with `Message`, `dict` with a "value" key, and simple base types.
        bucket: Taken from the config when not provided.
        is_raw: If `True` the stream item must be an `influxdb_client.Point`.
        write_options: Configure which type of writes the client uses (refer to [https://influxdb-client.readthedocs.io/]()).
        client: Leaving `None` the `InfluxDBClient` is instantiated for you.
    """
    def __init__(self,
                 measurement: str | None = None,
                 tags: Iterable | Iterable[Iterable] | None = None,
                 fields_mapper: Callable | None = None,
                 bucket: str | None = None,
                 is_raw: bool = False,
                 write_options: WriteOptions = ASYNCHRONOUS,
                 client: InfluxDBClient | None = None
                 ) -> None:
        from . import _config

        self._measurement = measurement
        self._tags = tags
        self._fields_mapper = fields_mapper or _fields_mapper
        self._bucket = bucket or _config['bucket']
        self._is_raw = is_raw
        self._write_options = write_options
        self._org = _config['org']
        self._client = client or InfluxDBClient(
            **{k: _config[k] for k in ('url', 'token', 'org', 'debug') if k in _config}
        )
        self._write_api = self._client.write_api(write_options=write_options)

        if self._tags and not isinstance(self._tags[0], (Tuple, List)):
            self._tags = (self._tags,)

        super().__init__()

    def _on_next_core(self, item: Any) -> None:
        if self._is_raw:
            self._write_api.write(self._bucket, self._org, record=item)
        else:
            measurement = self._measurement or type(item).__name__
            point = Point(measurement)

            if not self._tags and hasattr(item, '__dict__'):
                value_as_dict = item.__dict__
                tags = [(k, value_as_dict[k]) for k in value_as_dict if k not in ('value', 'timestamp')]
            else:
                tags = self._tags or list()

            for tag, value in tags:
                point.tag(tag, value)

            fields = self._fields_mapper(item)
            if not isinstance(fields[0], (Tuple, List)):
                fields = (fields,)

            for field, value in fields:
                point.field(field, value)

            logger.debug(f"InfluxDB write: {point.to_line_protocol()}")
            self._write_api.write(self._bucket, self._org, record=point)

    def dispose(self) -> None:
        self._client.__del__()
        super().dispose()

    def __del__(self):
        self.dispose()