BF-SYSTEMS - State-based Systems

[Figure: MLPro-BF-Systems class diagram]

Ver. 2.0.0 (2023-05-dd)

This module provides models and templates for state-based systems.

class mlpro.bf.systems.basics.State(p_state_space: MSpace, p_initial: bool = False, p_terminal: bool = False, p_success: bool = False, p_broken: bool = False, p_timeout: bool = False, **p_kwargs)

Bases: Instance, Element, TStamp

State of a system as an element of a given state space. Additionally, the state can be labeled with various properties.

Parameters:
  • p_state_space (MSpace) – State space of the related system.

  • p_initial (bool) – This optional flag signals that the state is the first one after a reset. Default=False.

  • p_terminal (bool) – This optional flag labels the state as a terminal state. Default=False.

  • p_success (bool) – This optional flag labels the state as an objective state. Default=False.

  • p_broken (bool) – This optional flag labels the state as a final error state. Default=False.

  • p_timeout (bool) – This optional flag signals that the cycle limit of an episode has been reached. Default=False.

get_initial() bool
set_initial(p_initial: bool)
get_success() bool
set_success(p_success: bool)
get_broken() bool
set_broken(p_broken: bool)
get_timeout() bool
set_timeout(p_timeout: bool)
get_terminal() bool
set_terminal(p_terminal: bool)
get_feature_data() Element
set_feature_data(p_feature_data: Element)
copy()

Returns a copy of the state element

Returns:

copied_state – The copy of original state object.

Return type:

State
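The flag accessors and copy() described above can be illustrated with a small stand-alone sketch. DemoState is a hypothetical stand-in written for this example and does not import MLPro; only the method names get_success(), set_success() and copy() are taken from the documentation above, while get_values() and the deep-copy behaviour are assumptions.

```python
import copy


class DemoState:
    """Hypothetical stand-in for State: feature values plus status flags."""

    def __init__(self, p_values, p_initial=False, p_terminal=False,
                 p_success=False, p_broken=False, p_timeout=False):
        self._values = list(p_values)
        self._flags = {
            'initial': p_initial, 'terminal': p_terminal,
            'success': p_success, 'broken': p_broken, 'timeout': p_timeout,
        }

    def get_success(self):
        return self._flags['success']

    def set_success(self, p_success):
        self._flags['success'] = p_success

    def get_values(self):
        # Hypothetical helper for this sketch, not part of the documented API.
        return list(self._values)

    def copy(self):
        # Assumption: a deep copy, so later changes to the original
        # do not leak into the copy.
        return copy.deepcopy(self)


original = DemoState([0.5, 1.5], p_initial=True)
snapshot = original.copy()
original.set_success(True)   # only the original is relabeled
```

In this sketch the snapshot keeps the flags it had at copy time, which is the property a caller typically relies on when storing a previous state.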

class mlpro.bf.systems.basics.ActionElement(p_action_space: Set, p_weight: float = 1.0)

Bases: Element

Single entry of an action. See class Action for further details.

Parameters:
  • p_action_space (Set) – Related action space.

  • p_weight (float) – Weight of action element. Default = 1.0.

get_weight()
set_weight(p_weight)
class mlpro.bf.systems.basics.Action(p_agent_id=0, p_action_space: Set = None, p_values: ndarray = None)

Bases: ElementList, TStamp

Objects of this class represent actions of (multi-)agents. Every element of the internal list is related to an agent and its partial subsection. Action values for the first agent can be added during object instantiation. Action values of further agents can be added by using method self.add_elem().

Parameters:
  • p_agent_id – Unique id of (first) agent to be added

  • p_action_space (Set) – Action space of (first) agent to be added

  • p_values (np.ndarray) – Action values of (first) agent to be added

get_agent_ids()
get_sorted_values() ndarray
class mlpro.bf.systems.basics.FctSTrans(p_logging=True)

Bases: Log

Template class for state transition functions.

Parameters:

p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.

C_TYPE = 'Fct STrans'
simulate_reaction(p_state: State, p_action: Action, p_t_step: timedelta = None) State

Simulates a state transition based on a state and action. Custom method _simulate_reaction() is called.

Parameters:
  • p_state (State) – System state.

  • p_action (Action) – Action to be processed.

Returns:

new_state – Result state after state transition.

Return type:

State

_simulate_reaction(p_state: State, p_action: Action, p_t_step: timedelta = None) State

Custom method for a simulated state transition. See method simulate_reaction() for further details.
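The split between the public method simulate_reaction() and the custom method _simulate_reaction() follows the template method pattern. The sketch below mimics that pattern without importing MLPro: TransitionTemplate and Integrator are hypothetical names, states and actions are plain lists instead of State and Action objects, and the fallback of one second for a missing time step is an assumption.

```python
from datetime import timedelta


class TransitionTemplate:
    """Hypothetical stand-in for a template class such as FctSTrans."""

    def simulate_reaction(self, p_state, p_action, p_t_step=None):
        # Public entry point: delegates to the custom method.
        return self._simulate_reaction(p_state, p_action, p_t_step)

    def _simulate_reaction(self, p_state, p_action, p_t_step=None):
        raise NotImplementedError


class Integrator(TransitionTemplate):
    """Custom transition: new_state = state + action * elapsed seconds."""

    def _simulate_reaction(self, p_state, p_action, p_t_step=None):
        # Assumption for this sketch: a missing time step means one second.
        seconds = 1.0 if p_t_step is None else p_t_step.total_seconds()
        return [s + a * seconds for s, a in zip(p_state, p_action)]


new_state = Integrator().simulate_reaction(
    [0.0, 1.0], [2.0, -1.0], timedelta(seconds=0.5))
```

Callers only ever use simulate_reaction(); a child class changes the behaviour by redefining the underscore-prefixed method.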

class mlpro.bf.systems.basics.FctSuccess(p_logging=True)

Bases: Log

Template class for functions that determine whether or not a state is a success state.

Parameters:

p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.

C_TYPE = 'Fct Success'
compute_success(p_state: State) bool

Assesses the given state regarding success criteria. Custom method _compute_success() is called.

Parameters:

p_state (State) – System state.

Returns:

success – True, if given state is a success state. False otherwise.

Return type:

bool

_compute_success(p_state: State) bool

Custom method for assessment for success. See method compute_success() for further details.

class mlpro.bf.systems.basics.FctBroken(p_logging=True)

Bases: Log

Template class for functions that determine whether or not a state is a broken state.

Parameters:

p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.

C_TYPE = 'Fct Broken'
compute_broken(p_state: State) bool

Assesses the given state regarding breakdown criteria. Custom method _compute_broken() is called.

Parameters:

p_state (State) – System state.

Returns:

broken – True, if given state is a breakdown state. False otherwise.

Return type:

bool

_compute_broken(p_state: State) bool

Custom method for assessment for breakdown. See method compute_broken() for further details.
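The success and breakdown templates share the same shape: a public compute_*() method that calls a custom _compute_*() method. The following stand-alone sketch shows one possible pair of custom assessments. All class names are hypothetical, MLPro is not imported, and the state is reduced to a single float for brevity.

```python
class SuccessTemplate:
    """Hypothetical stand-in for a template such as FctSuccess."""

    def compute_success(self, p_state):
        return self._compute_success(p_state)

    def _compute_success(self, p_state):
        raise NotImplementedError


class BrokenTemplate:
    """Hypothetical stand-in for a template such as FctBroken."""

    def compute_broken(self, p_state):
        return self._compute_broken(p_state)

    def _compute_broken(self, p_state):
        raise NotImplementedError


class NearTarget(SuccessTemplate):
    """Success if the state is within a tolerance of a target value."""

    def __init__(self, p_target, p_tolerance):
        self._target = p_target
        self._tolerance = p_tolerance

    def _compute_success(self, p_state):
        return abs(p_state - self._target) <= self._tolerance


class OutOfRange(BrokenTemplate):
    """Broken if the state leaves the permitted interval."""

    def __init__(self, p_low, p_high):
        self._low = p_low
        self._high = p_high

    def _compute_broken(self, p_state):
        return not (self._low <= p_state <= self._high)


fct_success = NearTarget(p_target=10.0, p_tolerance=0.5)
fct_broken = OutOfRange(p_low=0.0, p_high=20.0)
```

Keeping both assessments in separate objects is what allows them to be passed to a system as external functions, as described for p_fct_success and p_fct_broken.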

class mlpro.bf.systems.basics.Sensor(p_name_short, p_base_set='R', p_name_long='', p_name_latex='', p_unit='', p_unit_latex='', p_boundaries: list = [], p_description='', p_symmetrical: bool = False, p_logging=False, **p_kwargs)

Bases: Dimension

Template for a sensor.

C_TYPE = 'Sensor'
class mlpro.bf.systems.basics.Actuator(p_name_short, p_base_set='R', p_name_long='', p_name_latex='', p_unit='', p_unit_latex='', p_boundaries: list = [], p_description='', p_symmetrical: bool = False, p_logging=False, **p_kwargs)

Bases: Dimension

Template for an actuator.

C_TYPE = 'Actuator'
class mlpro.bf.systems.basics.Controller(p_id, p_name: str = '', p_logging: bool = True, **p_kwargs)

Bases: EventManager

Template for a controller that enables access to sensors and actuators.

Parameters:
  • p_id – Unique id of the controller.

  • p_name (str) – Optional name of the controller.

  • p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.

  • p_kwargs (dict) – Further keyword arguments specific to the controller.

C_EVENT_COMM_ERROR

Event that is raised on a communication error

C_TYPE = 'Controller'
C_EVENT_COMM_ERROR = 'COMM_ERROR'
reset() bool

Resets the controller by calling custom method _reset().

Returns:

result – True, if successful. False otherwise. In that case, event C_EVENT_COMM_ERROR is raised additionally.

Return type:

bool

_reset() bool

Custom reset method.

Returns:

result – True, if successful. False otherwise.

Return type:

bool

add_sensor(p_sensor: Sensor)

Adds a sensor to the controller.

Parameters:

p_sensor (Sensor) – Sensor object to be added.

get_sensors() Set

Returns the internal set of sensors.

Returns:

sensors – Set of sensors.

Return type:

Set

get_sensor(p_id) Sensor

Returns a sensor.

get_sensor_value(p_id)

Determines the value of a sensor by calling custom method _get_sensor_value().

Parameters:

p_id – Id of the sensor.

Returns:

Current value of the sensor or None on a communication error. In that case, event C_EVENT_COMM_ERROR is raised additionally.

Return type:

value

_get_sensor_value(p_id)

Custom method to get a sensor value. See method get_sensor_value() for further details.

Parameters:

p_id – Id of the sensor.

Returns:

Current value of the sensor or None on a communication error.

Return type:

value

add_actuator(p_actuator: Actuator)

Adds an actuator to the controller.

Parameters:

p_actuator (Actuator) – Actuator object to be added.

get_actuators() Set

Returns the internal set of actuators.

Returns:

actuators – Set of actuators.

Return type:

Set

get_actuator(p_id) Actuator

Returns an actuator.

set_actuator_value(p_id, p_value) bool

Sets the value of an actuator by calling custom method _set_actuator_value().

Parameters:
  • p_id – Id of the actuator.

  • p_value – New actuator value.

Returns:

successful – True, if successful. False otherwise. In that case, event C_EVENT_COMM_ERROR is raised additionally.

Return type:

bool

_set_actuator_value(p_id, p_value) bool

Custom method to set an actuator value. See method set_actuator_value() for further details.

Parameters:
  • p_id – Id of the actuator.

  • p_value – New actuator value.

Returns:

successful – True, if successful. False otherwise.

Return type:

bool
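The error handling described for get_sensor_value() (return None and raise C_EVENT_COMM_ERROR) can be sketched as follows. DemoController is a hypothetical stand-in that does not import MLPro; in particular, the subscription helper on_event() and the handler signature are inventions of this example and not the EventManager API.

```python
class DemoController:
    """Hypothetical stand-in for a controller; not the MLPro class."""

    C_EVENT_COMM_ERROR = 'COMM_ERROR'

    def __init__(self, p_readings):
        self._readings = dict(p_readings)   # simulated hardware registers
        self._handlers = []

    def on_event(self, p_handler):
        # Hypothetical subscription helper for this sketch.
        self._handlers.append(p_handler)

    def get_sensor_value(self, p_id):
        # Public method: calls the custom method and reports failures.
        value = self._get_sensor_value(p_id)
        if value is None:
            for handler in self._handlers:
                handler(self.C_EVENT_COMM_ERROR, p_id)
        return value

    def _get_sensor_value(self, p_id):
        # Custom part: None signals a communication error.
        return self._readings.get(p_id)


errors = []
controller = DemoController({'temp': 21.5})
controller.on_event(lambda p_event, p_id: errors.append((p_event, p_id)))
temperature = controller.get_sensor_value('temp')
missing = controller.get_sensor_value('pressure')
```

The custom method only has to return None on failure; raising the event stays the responsibility of the public method.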

class mlpro.bf.systems.basics.SystemShared(p_range: int = 0)

Bases: Shared

A specialised shared object for managing IPC between MultiSystems.

TODO:

Entry Systems to be handled yet, that get action from outside.

Parameters:

p_range – The multiprocessing range for the specific process. Default = 0.

_spaces

Spaces of all the systems registered in the Shared Object.

_states

States of all the systems registered in the Shared Object.

_actions

Corresponding Actions for all the systems.

_action_dimensions

All the dimensions present in the Shared Object.

_mappings

Mapping configurations for system to action mapping.

C_NAME = 'System Shared'
reset(p_seed: int = None)

Resets the shared object.

Parameters:

p_seed (int) – Seed for reproducibility.

update_state(p_sys_id, p_state: State) bool

Updates the states in the Shared Object.

Parameters:
  • p_sys_id – Id of the system whose state is to be updated.

  • p_state (State) – New state of the system.

Return type:

bool

_map_values(p_state: State = None, p_action: Action = None)

Updates the action values based on a new state, in a MultiSystem Context.

Parameters:
  • p_sys_id – Id of the system from which the state is received.

  • p_state (State) – The State of the system which affects the action.

get_actions()
get_action(p_sys_id) Action

Fetches the corresponding action for a particular system.

Parameters:

p_sys_id – The id of the system, for which action is to be fetched.

Returns:

action – The corresponding action for the system.

Return type:

Action

get_states()

Fetches the states of all internal systems.

Returns:

states – State of each system registered on the shared object.

Return type:

dict

get_state(p_sys_id) State

Fetches the state of a particular system from the Shared Object.

Parameters:

p_sys_id – The id of the system whose state is to be fetched.

Returns:

state – The corresponding state of the system.

Return type:

State

_map(p_input_dim=None)

Maps a dimension to output dimension with info about output sys, output dim type and output dim.

Parameters:
  • p_sys_id – Id of the system for which action is to be mapped into the mappings.

  • p_state (State) – The State to be mapped into the ‘states’ dictionary.

Returns:

mapping – A tuple of tuples of System id and dimension id mappings from State to Action respectively.

Return type:

(output_sys, output_dim_type, output_dim)

register_system(p_sys_id=None, p_state_space: MSpace = None, p_action_space: MSpace = None, p_mappings=None)

Registers the system in the Shared Object and sets up the dimension to dimension mapping.

Parameters:
  • p_system (System) – The system to be registered.

  • p_mappings – Mappings corresponding the system in the form: ( (ip dim_type, op dim_type), (input_sys_id, input_dim_id) , (op_sys_id, op_dimension_id) )

class mlpro.bf.systems.basics.System(p_id=None, p_name: str = None, p_range_max: int = 0, p_autorun=0, p_class_shared=None, p_mode=0, p_latency: timedelta = None, p_t_step: timedelta = None, p_fct_strans: FctSTrans = None, p_fct_success: FctSuccess = None, p_fct_broken: FctBroken = None, p_mujoco_file=None, p_frame_skip: int = 1, p_state_mapping=None, p_action_mapping=None, p_camera_conf: tuple = (None, None, None), p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: FctSTrans, FctSuccess, FctBroken, Task, Mode, Plottable, Persistent, ScientificObject

Base class for state based systems.

Parameters:
  • p_mode – Mode of the system. Possible values are Mode.C_MODE_SIM (default) or Mode.C_MODE_REAL.

  • p_latency (timedelta) – Optional latency of the system. If not provided, the internal value of constant C_LATENCY is used by default.

  • p_fct_strans (FctSTrans) – Optional external function for state transition.

  • p_fct_success (FctSuccess) – Optional external function for state evaluation ‘success’.

  • p_fct_broken (FctBroken) – Optional external function for state evaluation ‘broken’.

  • p_mujoco_file – Path to XML file for MuJoCo model.

  • p_frame_skip (int) – Number of frames to be skipped every step. Default = 1.

  • p_state_mapping – State mapping if the MLPro state and MuJoCo state have different naming.

  • p_action_mapping – Action mapping if the MLPro action and MuJoCo action have different naming.

  • p_use_radian (bool) – Use radian if the action and the state based on radian unit. Default = True.

  • p_camera_conf (tuple) – Default camera configuration on MuJoCo Simulation (xyz position, elevation, distance).

  • p_visualize (bool) – Boolean switch for env/agent visualisation. Default = False.

  • p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.

_latency

Latency of the system.

Type:

timedelta

_state

Current state of system.

Type:

State

_prev_state

Previous state of system.

Type:

State

_last_action

Last action.

Type:

Action

_fct_strans

Internal state transition function.

Type:

FctSTrans

_fct_success

Internal function for state evaluation ‘success’.

Type:

FctSuccess

_fct_broken

Internal function for state evaluation ‘broken’.

Type:

FctBroken

C_TYPE = 'System'
C_LATENCY = datetime.timedelta(seconds=1)
C_PLOT_ACTIVE: bool = True
static setup_spaces()

Static template method to set up and return state and action space of environment.

Returns:

  • state_space (MSpace) – State space object

  • action_space (MSpace) – Action space object

_reduce_state(p_state: dict, p_path: str, p_os_sep: str, p_filename_stub: str)

An embedded MuJoCo system cannot be pickled and needs to be removed from the pickle stream.

_complete_state(p_path: str, p_os_sep: str, p_filename_stub: str)

Custom method to complete the object state (=self) from external data sources. This method is called by standard method __setstate__() during unpickling the object from an external file.

Parameters:
  • p_path (str) – Path of the object pickle file (and further optional related files)

  • p_os_sep (str) – OS-specific path separator.

  • p_filename_stub (str) – Filename stub to be used for further optional custom data files

switch_logging(p_logging)

Sets new log level.

Parameters:

p_logging – Log level (constant C_LOG_LEVELS contains valid values)

get_latency() timedelta

Returns latency of the system.

set_latency(p_latency: timedelta = None) None

Sets latency of the system. If p_latency is None latency will be reset to internal value of attribute C_LATENCY.

Parameters:

p_latency (timedelta) – New latency value
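The fallback rule for the latency (None resets to the class constant C_LATENCY) can be sketched in a few lines. LatencyHolder is a hypothetical class for this example and does not import MLPro; only the method names and the constant are taken from the documentation above.

```python
from datetime import timedelta


class LatencyHolder:
    """Hypothetical stand-in showing the C_LATENCY fallback."""

    C_LATENCY = timedelta(seconds=1)

    def __init__(self, p_latency=None):
        self.set_latency(p_latency)

    def get_latency(self):
        return self._latency

    def set_latency(self, p_latency=None):
        # None means: fall back to the class-level default.
        self._latency = self.C_LATENCY if p_latency is None else p_latency


holder = LatencyHolder()
default_latency = holder.get_latency()
holder.set_latency(timedelta(milliseconds=50))
custom_latency = holder.get_latency()
holder.set_latency()
reset_latency = holder.get_latency()
```

Because the fallback reads the class attribute, a child class can change the default simply by overriding C_LATENCY.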

get_state_space() MSpace
get_action_space() MSpace
get_fct_strans()

Returns the state transition function of the system if it exists, otherwise the system itself.

Returns:

fct_strans – State transition function of the system if it exists, otherwise the system itself.

Return type:

FctSTrans

get_fct_broken()

Returns the broken computation function of the system if it exists, otherwise the system itself.

Returns:

fct_broken – Broken computation function of the system if it exists, otherwise the system itself.

Return type:

FctBroken

get_fct_success()

Returns the success computation function of the system if it exists, otherwise the system itself.

Returns:

fct_success – Success computation function of the system if it exists, otherwise the system itself.

Return type:

FctSuccess

set_random_seed(p_seed=None)

Resets the internal random generator using the given seed.

Parameters:

p_seed (int) – Seed parameter for an internal random generator

reset(p_seed=None) None

Resets the system to an initial state. If MuJoCo is not used, the custom method _reset() is called.

Parameters:

p_seed (int) – Seed parameter for an internal random generator

_reset(p_seed=None) None

Custom method to reset the system to an initial/defined state. Use method _set_state() to set the state.

Parameters:

p_seed (int) – Seed parameter for an internal random generator

add_controller(p_controller: Controller, p_mapping: list) bool

Adds a controller and a related mapping of states and actions to sensors and actuators.

Parameters:
  • p_controller (Controller) – Controller object to be added.

  • p_mapping (list) – A list of mapping tuples following the syntax ( [Type = ‘S’ or ‘A’], [Name of state/action], [Name of sensor/actuator] )

Returns:

successful – True, if controller and related mapping was added successfully. False otherwise.

Return type:

bool
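To make the mapping syntax of add_controller() concrete, the sketch below splits such a list into a state-to-sensor and an action-to-actuator dictionary. The function split_mapping() and all names in the example data are hypothetical; the sketch does not import MLPro and makes no claim about how the library stores the mapping internally.

```python
def split_mapping(p_mapping):
    """Split ('S'|'A', name, device) tuples into two lookup dictionaries."""
    sensors = {}     # state name  -> sensor name
    actuators = {}   # action name -> actuator name
    for kind, name, device in p_mapping:
        if kind == 'S':
            sensors[name] = device
        elif kind == 'A':
            actuators[name] = device
        else:
            raise ValueError(f"Unknown mapping type: {kind!r}")
    return sensors, actuators


# Hypothetical state, action, sensor and actuator names.
mapping = [
    ('S', 'temperature', 'sensor_t1'),
    ('S', 'pressure', 'sensor_p1'),
    ('A', 'heating', 'actuator_h1'),
]
state_to_sensor, action_to_actuator = split_mapping(mapping)
```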

get_state() State

Returns current state of the system.

_set_state(p_state: State)

Explicitly sets the current state of the system. Internal use only.

get_so() SystemShared

Returns the associated shared object.

Returns:

so – Shared object of type Shared (or inherited)

Return type:

Shared

_run(p_action: Action, p_t_step: timedelta = None)

Run method that runs the system as a task. It runs the process_action() method of the system with action as a parameter.

Parameters:

p_t_step (timedelta) – Time for which the system must be simulated.

process_action(p_action: Action, p_t_step: timedelta = None) bool

Processes a state transition based on the current state and a given action. The state transition itself is implemented in child classes in the custom method _process_action().

Parameters:
  • p_action (Action) – Action to be processed

  • p_t_step (timedelta) – The timestep for which the system is to be simulated

Returns:

success – True, if action processing was successful. False otherwise.

Return type:

bool

_process_action(p_action: Action, p_t_step: timedelta = None) bool

Internal custom method for state transition with default implementation. To be redefined in a child class on demand. See method process_action() for further details.

simulate_reaction(p_state: State = None, p_action: Action = None, p_t_step: timedelta = None) State

Simulates a state transition based on a state and an action. The simulation step itself is carried out either by an internal custom implementation in method _simulate_reaction() or by an embedded external function.

Parameters:
  • p_state (State) – Current state.

  • p_action (Action) – Action.

Returns:

Subsequent state after transition

Return type:

State

_simulate_reaction(p_state: State, p_action: Action, p_step: timedelta = None) State

Custom method for a simulated state transition. Implement this method if no external state transition function is used. See method simulate_reaction() for further details.
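The choice between an embedded external function and the internal custom method can be sketched as below. DemoSystem and Doubler are hypothetical classes written for this example; they do not import MLPro, states and actions are plain floats, and the delegation logic is an assumption derived from the descriptions of simulate_reaction() and get_fct_strans() above.

```python
class Doubler:
    """Hypothetical external transition function."""

    def simulate_reaction(self, p_state, p_action):
        return p_state + 2 * p_action


class DemoSystem:
    """Hypothetical stand-in for a system with an optional external function."""

    def __init__(self, p_fct_strans=None):
        self._fct_strans = p_fct_strans

    def get_fct_strans(self):
        # External function if one was provided, otherwise the system itself.
        return self if self._fct_strans is None else self._fct_strans

    def simulate_reaction(self, p_state, p_action):
        if self._fct_strans is not None:
            return self._fct_strans.simulate_reaction(p_state, p_action)
        return self._simulate_reaction(p_state, p_action)

    def _simulate_reaction(self, p_state, p_action):
        # Internal custom implementation, used when no external function is set.
        return p_state + p_action


internal = DemoSystem()
external = DemoSystem(p_fct_strans=Doubler())
result_internal = internal.simulate_reaction(1.0, 3.0)
result_external = external.simulate_reaction(1.0, 3.0)
```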

action_to_mujoco(p_mlpro_action)

Converts an MLPro action to a MuJoCo action.

_action_to_mujoco(p_mlpro_action)

Custom method to convert an MLPro action to a MuJoCo action. Implement this method if the MLPro action has a different dimension from the MuJoCo action.

Parameters:

p_mlpro_action (Numpy) – MLPro action.

Returns:

Modified MLPro action

Return type:

Numpy

state_from_mujoco(p_mujoco_state)

Converts a MuJoCo state to an MLPro state.

_state_from_mujoco(p_mujoco_state)

Custom method to convert a MuJoCo state to an MLPro state. Implement this method if the MLPro state has a different dimension from the MuJoCo state.

Parameters:

p_mujoco_state (Numpy) – MuJoCo state.

Returns:

Modified MuJoCo state

Return type:

Numpy

_import_state() bool
_export_action(p_action: Action) bool
compute_success(p_state: State) bool

Assesses the given state whether it is a ‘success’ state. Assessment is carried out either by a custom implementation in method _compute_success() or by an embedded external function.

Parameters:

p_state (State) – State to be assessed.

Returns:

success – True, if the given state is a ‘success’ state. False otherwise.

Return type:

bool

_compute_success(p_state: State) bool

Custom method for assessment for success. Implement this method if no external function is used. See method compute_success() for further details.

get_success() bool
compute_broken(p_state: State) bool

Assesses the given state whether it is a ‘broken’ state. Assessment is carried out either by a custom implementation in method _compute_broken() or by an embedded external function.

Parameters:

p_state (State) – State to be assessed.

Returns:

broken – True, if the given state is a ‘broken’ state. False otherwise.

Return type:

bool

_compute_broken(p_state: State) bool

Custom method for assessment for breakdown. Implement this method if no external function is used. See method compute_broken() for further details.

get_broken() bool
init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None, **p_kwargs)

Initializes the plot functionalities of the class.

Parameters:
  • p_figure (Matplotlib.figure.Figure, optional) – Optional MatPlotLib host figure, where the plot shall be embedded. The default is None.

  • p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).

update_plot(**p_kwargs)

Updates the plot.

Parameters:

**p_kwargs – Implementation-specific plot data and/or parameters.

class mlpro.bf.systems.basics.MultiSystem(p_name: str = None, p_id=None, p_range_max=0, p_autorun=0, p_class_shared=<class 'mlpro.bf.systems.basics.SystemShared'>, p_mode=0, p_latency: ~datetime.timedelta = None, p_t_step: ~datetime.timedelta = None, p_fct_strans: ~mlpro.bf.systems.basics.FctSTrans = None, p_fct_success: ~mlpro.bf.systems.basics.FctSuccess = None, p_fct_broken: ~mlpro.bf.systems.basics.FctBroken = None, p_mujoco_file=None, p_frame_skip: int = 1, p_state_mapping=None, p_action_mapping=None, p_camera_conf: tuple = (None, None, None), p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: Workflow, System

A complex system of systems.

Parameters:
  • p_name

  • p_id

  • p_range_max

  • p_autorun

  • p_class_shared

  • p_mode

  • p_latency

  • p_t_step

  • p_fct_strans

  • p_fct_success

  • p_fct_broken

  • p_mujoco_file

  • p_frame_skip

  • p_state_mapping

  • p_action_mapping

  • p_camera_conf

  • p_visualize

  • p_logging

  • p_kwargs

C_TYPE = 'Multi-System'
add_system(p_system: System, p_mappings)

Adds a subsystem to the MultiSystem.

Parameters:
  • p_system (System) – The system to be added.

  • p_mappings (list) – The mappings corresponding the system in the form : [ ((ip dim_type, op dim_type), (input_sys_id, input_dim_id), (op_sys_id, op_dimension_id)), … ]
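The mapping tuples can be read as "this input dimension of this system feeds that output dimension of that system". The sketch below indexes such a list by its input side. The helper build_lookup(), the system and dimension ids, and the use of 'S' and 'A' as dimension types are assumptions made for this example; MLPro is not imported.

```python
def build_lookup(p_mappings):
    """Index mapping tuples by their input (system id, dimension id)."""
    lookup = {}
    for (in_type, out_type), (in_sys, in_dim), (out_sys, out_dim) in p_mappings:
        # One input dimension may feed several output dimensions.
        lookup.setdefault((in_sys, in_dim), []).append(
            (out_sys, out_type, out_dim))
    return lookup


# Hypothetical ids; 'S' and 'A' as dimension types are an assumption.
mappings = [
    (('S', 'A'), ('sys1', 'level'), ('sys2', 'inflow')),
    (('S', 'A'), ('sys1', 'level'), ('sys3', 'valve')),
]
lookup = build_lookup(mappings)
```

Each value in the lookup follows the order (output system, output dimension type, output dimension), which matches the return type documented for _map().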

_reset(p_seed=None) None

Resets the MultiSystem and all the sub-systems inside.

Parameters:

p_seed (int) – Seed for the purpose of reproducibility

get_subsystem_ids()
get_subsystems()
get_subsystem(p_system_id) System

Returns a subsystem from the MultiSystem.

Parameters:

p_system_id – Id of the system to be returned

Returns:

sub_system – The system to be returned by ID.

Return type:

System

get_states()

Returns the states of all subsystems in the MultiSystem.

Returns:

states – States of all the subsystems.

Return type:

dict

simulate_reaction(p_state: State = None, p_action: Action = None, p_t_step: timedelta = None) State

Simulates the multisystem, based on the action and time step.

Parameters:
  • p_state (State) – State of the system.

  • p_action (Action) – Action provided externally for the simulation of the system.

Returns:

current_state – The new state of the system after simulation.

Return type:

State

compute_broken(p_state: State) bool

Returns True if the system is broken.

compute_success(p_state: State) bool

Returns true if the system has reached success.

Parameters:

p_state

Returns:

TODO (open in the source): shall True be returned if any of the subsystems returns True?

Return type:

bool

class mlpro.bf.systems.basics.DemoScenario(p_system: System, p_mode, p_action_pattern: str = 'random', p_action: list = None, p_id=None, p_cycle_limit=0, p_auto_setup: bool = True, p_visualize: bool = True, p_logging=True)

Bases: ScenarioBase

Demo Scenario Class to demonstrate systems, inherits from the ScenarioBase.

Parameters:
  • p_system (System) – Mandatory parameter, takes the system for the scenario.

  • p_mode – Operation mode. See Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.

  • p_action_pattern (str) – The action pattern to be used for demonstration. Default is C_ACTION_RANDOM

  • p_action (list) – The action to be executed in constant mode. Mandatory when the mode is constant.

  • p_id – Optional external id

  • p_cycle_limit (int) – Maximum number of cycles. Default = 0 (no limit).

  • p_auto_setup (bool) – If True custom method setup() is called after initialization.

  • p_visualize (bool) – Boolean switch for visualisation. Default = True.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.

C_NAME = 'Demo System Scenario'
C_ACTION_RANDOM = 'random'
C_ACTION_CONSTANT = 'constant'
setup()

Sets up the system spaces.

get_latency() timedelta

Returns the latency of the system.

_reset(p_seed)

Resets the Scenario and the system. Sets up the action and state spaces of the system.

Parameters:

p_seed – Seed for the purpose of reproducibility.

_run_cycle()

Runs the custom scenario cycle through the run method of the scenario base. Checks and returns the broken state, False otherwise.

_get_next_action()

Generates new action based on the pattern provided by the user.
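The two action patterns, C_ACTION_RANDOM and C_ACTION_CONSTANT, can be sketched as a small stand-alone function. The function next_action(), its parameters and the use of per-dimension (low, high) boundaries are assumptions for this example; the sketch does not import MLPro and does not reproduce the actual implementation of _get_next_action().

```python
import random

C_ACTION_RANDOM = 'random'
C_ACTION_CONSTANT = 'constant'


def next_action(p_pattern, p_boundaries, p_constant=None, p_rng=None):
    """Return action values for the given pattern (hypothetical helper)."""
    if p_pattern == C_ACTION_CONSTANT:
        if p_constant is None:
            raise ValueError("A constant action is mandatory in constant mode.")
        return list(p_constant)
    if p_pattern == C_ACTION_RANDOM:
        rng = random.Random() if p_rng is None else p_rng
        # One uniformly sampled value per action dimension.
        return [rng.uniform(low, high) for low, high in p_boundaries]
    raise ValueError(f"Unknown action pattern: {p_pattern!r}")


boundaries = [(-1.0, 1.0), (0.0, 10.0)]
constant_action = next_action(C_ACTION_CONSTANT, boundaries, p_constant=[0.5, 2.0])
random_action = next_action(C_ACTION_RANDOM, boundaries, p_rng=random.Random(42))
```

Passing a seeded random generator keeps the random pattern reproducible, which fits the seed parameter accepted by _reset().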

update_plot(**p_kwargs)

Updates the plot.

Parameters:

**p_kwargs – Implementation-specific plot data and/or parameters.