
First loop

Let's write your first MAPE loop: a simplified ambulance loop that switches the siren on or off and increases the speed when an emergency is detected.

Ambulance loop

You'll implement this simple ambulance loop, composed of 3 connected elements:

  • Monitor detect
  • Planner custom_policy
  • Executer exec

Figure: bare ambulance MAPE diagram

This graphical notation is explained in the next sections.

import mape

mape.init() # (1)

# Ambulance loop definition, named "ambulance"
loop = mape.Loop(uid='ambulance') # (2)

# Monitor element of ambulance loop
@loop.monitor # (3)
def detect(emergency, on_next): # (4)
    on_next(emergency)
  1. mape.init accepts different params to set up PyMAPE, allowing configuration through a config file and/or directly in the source code.
  2. uid is a unique identifier for the defined loop. If you don't pass a uid, the system provides a random one for you.
  3. The decorator creates the new monitor element and registers it with the ambulance loop.
  4. The function name detect is used as the uid of the element inside the ambulance loop.

At this point, you have a loop (ambulance) made up of a single monitor element called detect.

Loop and Element uid

The loop uid ambulance must be unique in the whole app, whereas detect (the monitor uid) must be unique only within the loop it belongs to. This lets you address an element by its path (e.g. ambulance.detect).

The detect() function is the monitor element. It accepts at least two positional params:

  • stream item (emergency) - the input value of the element. Elements talk to each other through streams, following the ReactiveX philosophy.
  • on_next - the function to call with the value to pass to the next linked elements (the subscribed ones). It can be called 0 or N times, so don't confuse it with the function's return value.
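
To make the difference between on_next and a return value concrete, here is a plain-Python sketch that does not use PyMAPE at all. The names split_words and collect are hypothetical helpers; the sketch only illustrates a callback that may fire zero or many times for a single input item.

```python
def split_words(item, on_next):
    """Hypothetical element-like function: emits one value per word."""
    for word in item.split():
        on_next(word)  # called 0..N times, depending on the input


def collect(func, item):
    """Run func on item and gather everything it passes to on_next."""
    received = []
    func(item, received.append)
    return received


print(collect(split_words, "siren on"))  # ['siren', 'on']
print(collect(split_words, ""))          # []
```

A single return statement could only hand back one value per call, while the callback lets one input produce no output, one output, or several.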
type of detect

detect is no longer a plain function but an object, an instance of the Monitor class.

Next, add a plan element that applies policies when an emergency is detected.

@loop.plan(uid='custom_policy') # (1)
def policy(emergency, on_next, self):
    # Emergency plans
    if emergency is True:
        on_next({'speed': self.emergency_speed}) # (2)
        on_next({'siren': True})
    else:
        on_next({'speed': 80})
        on_next({'siren': False})

policy.emergency_speed = 180
  1. Unlike the monitor, here we choose the element uid explicitly (custom_policy instead of policy).
  2. Uses the object property emergency_speed, which is set externally.

In this case, policy() has a third param, self. When you add it to the function signature, you get direct access to the underlying Element object.

This time, unlike detect(), you pass a dict to the next linked element.
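
The optional self parameter can be pictured with the following plain-Python sketch. It is an assumption about how such a mechanism could work (signature inspection), not a description of PyMAPE's internals, and the Element class below is a toy stand-in rather than mape.Element.

```python
import inspect


class Element:
    """Toy element wrapping a function; not PyMAPE's Element class."""

    def __init__(self, func):
        self._func = func
        # Pass the element itself only if the function asks for `self`.
        self._wants_self = "self" in inspect.signature(func).parameters

    def __call__(self, item, on_next):
        if self._wants_self:
            return self._func(item, on_next, self)
        return self._func(item, on_next)


def policy(emergency, on_next, self):
    on_next({"speed": self.emergency_speed if emergency else 80})


element = Element(policy)
element.emergency_speed = 180  # attribute set from outside, read via `self`

out = []
element(True, out.append)
element(False, out.append)
print(out)  # [{'speed': 180}, {'speed': 80}]
```

Because the attribute lives on the wrapping object, it can be changed at runtime without redefining the function.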

The last element to define and register is the execute element. exec applies the plan to the managed system (i.e. the ambulance); in our case it simply prints an output.

@loop.execute
def exec(item, on_next):
    if 'speed' in item:
        print(f"Speed: {item['speed']} Km/h")
    if 'siren' in item:
        print(f"Siren is {'ON' if item['siren'] else 'OFF'}")

The last step to complete the ambulance loop is to connect (subscribe()) the three elements:

detect.subscribe(policy)
policy.subscribe(exec)
from mape import operators as ops # (1)

# Alternative way
detect.pipe(
  ops.through(policy),
  ops.through(exec)
).subscribe()
  1. ops exposes all the RxPY operators plus some extras.

You are finally ready to try your ambulance emergency system by sending a detected emergency to the monitor element detect:

detect(True) # (1)
  1. Just call the detect() function and pass the item as param.

As you can see, nothing happens!

This is the intended behaviour: you have to start the detect element once you are ready to receive the stream.

Different elements behaviour

Elements can have 3 different behaviors with respect to the stream passing through them:

  • manual start (monitor): the element cannot send or receive items until its .start() method is invoked. This is often tied to the lifetime of an internal resource (e.g. DB connection, socket, ...). It lets you connect all loops and elements before starting to receive the stream.
  • start on subscribe (analyzer, planner): the element is started when someone is interested in it (i.e. subscribes to it).
  • start on init (executer): the element is ready as soon as it is initialized.
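
The manual start behaviour can be pictured with a toy gate written in plain Python. This is only a sketch of the idea: ManualStartGate is a hypothetical class and not part of PyMAPE.

```python
class ManualStartGate:
    """Toy stand-in for a manual-start element: drops items until started."""

    def __init__(self, on_next):
        self._on_next = on_next
        self._started = False

    def start(self):
        self._started = True

    def __call__(self, item):
        if self._started:
            self._on_next(item)


seen = []
gate = ManualStartGate(seen.append)
gate(True)      # ignored: the gate has not been started yet
gate.start()
gate(True)      # forwarded to the next callback
print(seen)     # [True]
```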

So, we start the monitor element and simulate an emergency:

detect.start()
detect(True)

Get the results:

Speed: 180 Km/h
Siren is ON

Now end the emergency (i.e. send False):

detect(False)

Get the results:

Speed: 80 Km/h
Siren is OFF

Operators

Inspired by functional programming, RxPY offers plenty of operators that are directly usable in PyMAPE. They can be chained in a pipeline to transform, combine, filter and group items, handle errors, apply conditions, perform math operations, and so on.

There are more than 140 operators (about 400 variants), and we encourage you to create your own custom operators to fit your needs.
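
As a conceptual model of operator chaining, the sketch below composes two tiny operators over an ordinary Python list. It uses neither RxPY nor PyMAPE: pipe, map_op and filter_op are hypothetical helpers that only mimic the shape of a pipeline.

```python
from functools import reduce


def pipe(source, *operators):
    """Apply each operator to the output of the previous one, left to right."""
    return reduce(lambda stream, op: op(stream), operators, source)


def map_op(func):
    """Operator that transforms every item with func."""
    return lambda stream: (func(item) for item in stream)


def filter_op(predicate):
    """Operator that keeps only the items matching predicate."""
    return lambda stream: (item for item in stream if predicate(item))


speeds_kmh = [50, 180, 80, 200]
result = list(pipe(
    speeds_kmh,
    filter_op(lambda kmh: kmh > 100),      # keep only speeds above 100 km/h
    map_op(lambda kmh: round(kmh / 3.6)),  # convert km/h to m/s
))
print(result)  # [50, 56]
```

Each operator is a function from stream to stream, which is what makes them freely chainable.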

Filters

In the ambulance example there is an issue. If the monitor receives the same emergency state several times in a row, all the following elements (planner and executer) are invoked every time.

detect(True)
# Speed: 180 Km/h
# Siren is ON
detect(True)
# Speed: 180 Km/h
# Siren is ON
# ...
Same on emergency end
detect(False)
# Speed: 80 Km/h
# Siren is OFF
detect(False)
# Speed: 80 Km/h
# Siren is OFF
# ...

To solve the problem, we can put a filter at the input of planner (or the output of monitor):

@loop.plan(uid='custom_policy', ops_in=ops.distinct_until_changed()) # (1)
def policy(emergency, on_next, self):
    # Emergency plans
    if emergency is True:
        on_next({'speed': self.emergency_speed})
        on_next({'siren': True})
    else:
        on_next({'speed': 80})
        on_next({'siren': False})
  1. Use the ops_out param instead to apply the operator to the element's output.
# Alternatively, we can add the filter to the pipe chain
detect.pipe(
    ops.distinct_until_changed(),
    ops.through(policy),
    ops.through(exec)
).subscribe()

distinct_until_changed() emits an item only when it differs from the previous one, so the resulting sequence contains no contiguous duplicates.
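
The filtering rule itself can be sketched as a plain-Python generator. This is an illustration of the behaviour over a list, not the RxPY operator: the function below merely borrows the name.

```python
def distinct_until_changed(items):
    """Yield an item only when it differs from the one just before it."""
    sentinel = object()          # marks "no previous item yet"
    previous = sentinel
    for item in items:
        if previous is sentinel or item != previous:
            yield item
        previous = item


readings = [True, True, False, False, False, True]
print(list(distinct_until_changed(readings)))  # [True, False, True]
```

Note that a value may reappear later in the sequence (True comes back at the end); only contiguous repetitions are dropped.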

A better example

The ambulance may not be the best example for understanding distinct_until_changed(), but imagine a monitor that polls the state of the managed system. The values read can stay the same until the system changes state (e.g. motion detection, ambient temperature, ...).

Loop and Element access

PyMAPE provides simplified access to its main entities. For example, you can iterate:

# Iterate over all loops and elements defined in the app
for loop in mape.app:
    print(f"* {loop.uid}")
    for element in loop:
        print(f" - {element.uid}")

Returns:

* ambulance
  - detect
  - custom_policy
  - exec

...or use a Python list comprehension to retrieve particular elements:

# Get all execute elements
[element.uid for element in mape.app.ambulance if isinstance(element, mape.Execute)]

Returns:

['exec']

You can chain the loop and element uids (dot-notation path) to get direct access to an element:

mape.app.ambulance.detect
# returns the detect monitor object

The same result is obtained with:

mape.app['ambulance.detect'] # (1)

  1. This lets you use a variable as the dot-notation path.
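
How a dot-notation path can be resolved from a string is sketched below with nested dictionaries. The resolve function and the registry layout are hypothetical and say nothing about how mape.app stores loops and elements.

```python
def resolve(registry, path):
    """Follow a dot-notation path (e.g. 'loop.element') through nested dicts."""
    node = registry
    for part in path.split("."):
        node = node[part]  # raises KeyError if a uid is unknown
    return node


# Hypothetical registry: loop uids at the top level, element uids inside.
registry = {
    "ambulance": {
        "detect": "<monitor>",
        "custom_policy": "<planner>",
        "exec": "<executer>",
    }
}

element_path = "ambulance.detect"  # the path can come from a variable
print(resolve(registry, element_path))  # <monitor>
```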

Debug

Some tips for debugging your loops and elements as streams pass through them.

Internal library debug

To enable PyMAPE debug (ie. verbose information), remember to configure your logger. At least:

import logging

logging.basicConfig(level=logging.INFO) # (1)
logging.getLogger('mape').setLevel(logging.DEBUG) # (2)
  1. Set level for "root" module logger
  2. Set level for PyMAPE internal logger

or enable PyMAPE debug, simply:

logger = init_logger(lvl=logging.INFO) # (1)
mape.init(debug=True)
  1. Use this only if you want to try our internal logger settings.

Element

You can print the stream of items entering and/or leaving an element:

element.debug(mape.Element.Debug.IN)
element.debug(mape.Element.Debug.OUT)
from mape import Element
element.debug(Element.Debug.IN | Element.Debug.OUT)
element.debug() # (1)
  1. element.debug(Element.Debug.DISABLE) is equivalent
Sample debug output

detect(True)

Returns:

# detect(True)
Speed: 180 Km/h
(9373) on next: {'speed': 180} | in > [ambulance.exec]
(2472) on next: {'speed': 180} | [ambulance.custom_policy] > out
Siren is ON
(9373) on next: {'siren': True} | in > [ambulance.exec]
(2472) on next: {'siren': True} | [ambulance.custom_policy] > out
(2073) on next: True | in > [ambulance.custom_policy]
(2388) on next: True | stream out of detect()
(2616) on next: True | [ambulance.detect] > out

The order of the log lines is due to the recursive calls on which ReactiveX is based. If necessary, this behaviour can be "fixed" in the next release.
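
One way such an inverted order can arise is shown in the plain-Python sketch below, where every stage forwards the item before it records its own log entry. This is an illustrative assumption about nested callbacks in general, not a reproduction of the PyMAPE or RxPY internals; the stage helper is hypothetical.

```python
log = []


def stage(name, downstream=None):
    """Build a stage that forwards the item first and logs afterwards."""
    def on_next(item):
        if downstream is not None:
            downstream(item)   # nested call into the next stage
        log.append(name)       # runs only after all downstream work returns
    return on_next


execute = stage("exec")
plan = stage("custom_policy", execute)
monitor = stage("detect", plan)

monitor(True)
print(log)  # ['exec', 'custom_policy', 'detect']
```

The innermost stage finishes first, so its entry lands at the top of the log even though it is the last one reached by the item.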

It uses the PyMAPE internal logger.

LogObserver

You can create a fake element (LogObserver) and attach it to a real element. The LogObserver doesn't modify the stream; it simply prints it.
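
The idea of a passive observer that sits next to the real subscribers can be modelled in plain Python as follows. Source and log_observer are hypothetical stand-ins written for this sketch; they are not the PyMAPE classes.

```python
class Source:
    """Toy observable: forwards every item to all subscribers, in order."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def __call__(self, item):
        for callback in self._subscribers:
            callback(item)


def log_observer(label, sink=print):
    """Return a subscriber that only reports what it sees."""
    return lambda item: sink(f"on next: {item!r} | {label}")


lines, planned = [], []
detect = Source()
detect.subscribe(log_observer("stream out detect", sink=lines.append))
detect.subscribe(planned.append)  # stands in for the real next element

detect(True)
print(lines)    # ['on next: True | stream out detect']
print(planned)  # [True]
```

The real subscriber receives exactly the same item whether or not the logging subscriber is attached.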

If you want to print the whole stream output from the monitor, you can modify the loop subscriptions of ambulance like this:

detect.subscribe(LogObserver("stream out detect"))
detect.subscribe(policy)
policy.subscribe(exec)
Sample debug output

detect(True)

Returns:

(5393) on next: True | stream out detect
Speed: 180 Km/h
Siren is ON

It uses the "root" module logger, or print() if that logger does not exist.