BF-SYSTEMS - State-based Systems
Ver. 2.0.0 (2023-05-dd)
This module provides models and templates for state based systems.
- class mlpro.bf.systems.basics.State(p_state_space: MSpace, p_initial: bool = False, p_terminal: bool = False, p_success: bool = False, p_broken: bool = False, p_timeout: bool = False, **p_kwargs)
Bases:
Instance, Element, TStamp
State of a system as an element of a given state space. Additionally, the state can be labeled with various properties.
- Parameters:
p_state_space (MSpace) – State space of the related system.
p_initial (bool) – This optional flag signals that the state is the first one after a reset. Default=False.
p_terminal (bool) – This optional flag labels the state as a terminal state. Default=False.
p_success (bool) – This optional flag labels the state as an objective state. Default=False.
p_broken (bool) – This optional flag labels the state as a final error state. Default=False.
p_timeout (bool) – This optional flag signals that the cycle limit of an episode has been reached. Default=False.
- get_initial() bool
- set_initial(p_initial: bool)
- get_success() bool
- set_success(p_success: bool)
- get_broken() bool
- set_broken(p_broken: bool)
- get_timeout() bool
- set_timeout(p_timeout: bool)
- get_terminal() bool
- set_terminal(p_terminal: bool)
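The flag accessors above can be illustrated with a minimal sketch. `MiniState` is a hypothetical stand-in, not the real `State` class (which also needs an `MSpace` object); only the getter and setter names mirror the listing. The `labels()` helper is an addition for illustration. The listing does not say whether the terminal flag is derived from the other flags, so it is set explicitly here.

```python
class MiniState:
    """Hypothetical stand-in mirroring only the flag accessors of State."""

    FLAGS = ("initial", "terminal", "success", "broken", "timeout")

    def __init__(self, p_initial=False, p_terminal=False, p_success=False,
                 p_broken=False, p_timeout=False):
        self._flags = {
            "initial": p_initial,
            "terminal": p_terminal,
            "success": p_success,
            "broken": p_broken,
            "timeout": p_timeout,
        }

    def get_initial(self) -> bool:
        return self._flags["initial"]

    def set_initial(self, p_initial: bool):
        self._flags["initial"] = p_initial

    def get_terminal(self) -> bool:
        return self._flags["terminal"]

    def set_terminal(self, p_terminal: bool):
        self._flags["terminal"] = p_terminal

    def get_success(self) -> bool:
        return self._flags["success"]

    def set_success(self, p_success: bool):
        self._flags["success"] = p_success

    def get_broken(self) -> bool:
        return self._flags["broken"]

    def set_broken(self, p_broken: bool):
        self._flags["broken"] = p_broken

    def get_timeout(self) -> bool:
        return self._flags["timeout"]

    def set_timeout(self, p_timeout: bool):
        self._flags["timeout"] = p_timeout

    def labels(self):
        """Hypothetical helper: names of all flags that are currently set."""
        return [name for name in self.FLAGS if self._flags[name]]


# A state right after a reset carries only the 'initial' label.
state = MiniState(p_initial=True)
first_labels = state.labels()

# Later the objective is reached: the flags are updated explicitly
# (assumption: 'terminal' is not derived automatically from 'success').
state.set_initial(False)
state.set_success(True)
state.set_terminal(True)
final_labels = state.labels()
```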
- class mlpro.bf.systems.basics.ActionElement(p_action_space: Set, p_weight: float = 1.0)
Bases:
Element
Single entry of an action. See class Action for further details.
- Parameters:
p_action_space (Set) – Related action space.
p_weight (float) – Weight of action element. Default = 1.0.
- get_weight()
- set_weight(p_weight)
- class mlpro.bf.systems.basics.Action(p_agent_id=0, p_action_space: Set = None, p_values: ndarray = None)
Bases:
ElementList, TStamp
Objects of this class represent actions of (multi-)agents. Every element of the internal list is related to one agent and its part of the overall action. Action values for the first agent can be added during object instantiation. Action values of further agents can be added by using method self.add_elem().
- Parameters:
p_agent_id – Unique id of (first) agent to be added
p_action_space (Set) – Action space of (first) agent to be added
p_values (np.ndarray) – Action values of (first) agent to be added
- get_agent_ids()
- get_sorted_values() ndarray
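The idea of one action object holding the partial actions of several agents can be sketched as follows. `MiniAction` is a hypothetical stand-in built on plain lists instead of `ndarray`, and its `add_elem()` signature is simplified. The ordering used by `get_sorted_values()` (ascending agent id) is an assumption, since the listing does not describe it.

```python
class MiniAction:
    """Hypothetical stand-in for a multi-agent action container."""

    def __init__(self, p_agent_id=0, p_values=None):
        self._elems = {}  # agent id -> list of action values
        if p_values is not None:
            self.add_elem(p_agent_id, p_values)

    def add_elem(self, p_agent_id, p_values):
        # Simplified signature: stores the raw values of one agent.
        self._elems[p_agent_id] = list(p_values)

    def get_agent_ids(self):
        # Ids in the order in which the agents were added.
        return list(self._elems.keys())

    def get_sorted_values(self):
        # Assumption: values are concatenated in ascending agent id order.
        values = []
        for agent_id in sorted(self._elems):
            values.extend(self._elems[agent_id])
        return values


# The first agent is added at instantiation, a second one afterwards.
action = MiniAction(p_agent_id=2, p_values=[0.5, -0.5])
action.add_elem(1, [1.0])

agent_ids = action.get_agent_ids()
sorted_values = action.get_sorted_values()
```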
- class mlpro.bf.systems.basics.FctSTrans(p_logging=True)
Bases:
Log
Template class for state transition functions.
- Parameters:
p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.
- C_TYPE = 'Fct STrans'
- class mlpro.bf.systems.basics.FctSuccess(p_logging=True)
Bases:
Log
Template class for functions that determine whether or not a state is a success state.
- Parameters:
p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.
- C_TYPE = 'Fct Success'
- class mlpro.bf.systems.basics.FctBroken(p_logging=True)
Bases:
Log
Template class for functions that determine whether or not a state is a broken state.
- Parameters:
p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.
- C_TYPE = 'Fct Broken'
- class mlpro.bf.systems.basics.Sensor(p_name_short, p_base_set='R', p_name_long='', p_name_latex='', p_unit='', p_unit_latex='', p_boundaries: list = [], p_description='', p_symmetrical: bool = False, p_logging=False, **p_kwargs)
Bases:
Dimension
Template for a sensor.
- C_TYPE = 'Sensor'
- class mlpro.bf.systems.basics.Actuator(p_name_short, p_base_set='R', p_name_long='', p_name_latex='', p_unit='', p_unit_latex='', p_boundaries: list = [], p_description='', p_symmetrical: bool = False, p_logging=False, **p_kwargs)
Bases:
Dimension
Template for an actuator.
- C_TYPE = 'Actuator'
- class mlpro.bf.systems.basics.Controller(p_id, p_name: str = '', p_logging: bool = True, **p_kwargs)
Bases:
EventManager
Template for a controller that enables access to sensors and actuators.
- Parameters:
p_id – Unique id of the controller.
p_name (str) – Optional name of the controller.
p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.
p_kwargs (dict) – Further keyword arguments specific to the controller.
- C_EVENT_COMM_ERROR
Event that is raised on a communication error
- C_TYPE = 'Controller'
- C_EVENT_COMM_ERROR = 'COMM_ERROR'
- reset() bool
Resets the controller by calling custom method _reset().
- Returns:
result – True, if successful. False otherwise. In that case, event C_EVENT_COMM_ERROR is raised additionally.
- Return type:
bool
- _reset() bool
Custom reset method.
- Returns:
result – True, if successful. False otherwise.
- Return type:
bool
- add_sensor(p_sensor: Sensor)
Adds a sensor to the controller.
- Parameters:
p_sensor (Sensor) – Sensor object to be added.
- get_sensors() Set
Returns the internal set of sensors.
- Returns:
sensors – Set of sensors.
- Return type:
Set
- get_sensor_value(p_id)
Determines the value of a sensor by calling custom method _get_sensor_value().
- Parameters:
p_id – Id of the sensor.
- Returns:
Current value of the sensor or None on a communication error. In that case, event C_EVENT_COMM_ERROR is raised additionally.
- Return type:
value
- _get_sensor_value(p_id)
Custom method to get a sensor value. See method get_sensor_value() for further details.
- Parameters:
p_id – Id of the sensor.
- Returns:
Current value of the sensor or None on a communication error.
- Return type:
value
- add_actuator(p_actuator: Actuator)
Adds an actuator to the controller.
- Parameters:
p_actuator (Actuator) – Actuator object to be added.
- get_actuators() Set
Returns the internal set of actuators.
- Returns:
actuators – Set of actuators.
- Return type:
Set
- set_actuator_value(p_id, p_value) bool
Sets the value of an actuator by calling custom method _set_actuator_value().
- Parameters:
p_id – Id of the actuator.
p_value – New actuator value.
- Returns:
successful – True, if successful. False otherwise. In that case, event C_EVENT_COMM_ERROR is raised additionally.
- Return type:
bool
- _set_actuator_value(p_id, p_value) bool
Custom method to set an actuator value. See method set_actuator_value() for further details.
- Parameters:
p_id – Id of the actuator.
p_value – New actuator value.
- Returns:
successful – True, if successful. False otherwise.
- Return type:
bool
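The Controller template follows a common pattern: public methods call a custom underscore method and raise C_EVENT_COMM_ERROR when that method reports a failure. The sketch below illustrates this with hypothetical stand-ins (`MiniController`, `DictController`, `register_handler()`); it does not use the real `EventManager` API, whose method names are not shown in this listing.

```python
class MiniController:
    """Hypothetical stand-in showing the public/custom method split."""

    C_EVENT_COMM_ERROR = 'COMM_ERROR'

    def __init__(self):
        self._handlers = []

    def register_handler(self, p_handler):
        # Hypothetical replacement for the event handling of EventManager.
        self._handlers.append(p_handler)

    def _raise_event(self, p_event_id):
        for handler in self._handlers:
            handler(p_event_id)

    def get_sensor_value(self, p_id):
        value = self._get_sensor_value(p_id)
        if value is None:
            # None signals a communication error.
            self._raise_event(self.C_EVENT_COMM_ERROR)
        return value

    def set_actuator_value(self, p_id, p_value) -> bool:
        successful = self._set_actuator_value(p_id, p_value)
        if not successful:
            self._raise_event(self.C_EVENT_COMM_ERROR)
        return successful

    def _get_sensor_value(self, p_id):
        raise NotImplementedError

    def _set_actuator_value(self, p_id, p_value) -> bool:
        raise NotImplementedError


class DictController(MiniController):
    """Toy 'hardware': sensor readings and actuator settings live in dicts."""

    def __init__(self, p_sensor_values, p_actuator_ids):
        super().__init__()
        self._sensor_values = dict(p_sensor_values)
        self._actuator_values = {a_id: None for a_id in p_actuator_ids}

    def _get_sensor_value(self, p_id):
        # An unknown sensor id models a communication error.
        return self._sensor_values.get(p_id)

    def _set_actuator_value(self, p_id, p_value) -> bool:
        if p_id not in self._actuator_values:
            return False
        self._actuator_values[p_id] = p_value
        return True


events = []
ctrl = DictController({'temp': 21.5}, ['valve'])
ctrl.register_handler(events.append)

known_value = ctrl.get_sensor_value('temp')       # regular read
missing_value = ctrl.get_sensor_value('pressure') # triggers the event
valve_ok = ctrl.set_actuator_value('valve', 0.3)  # regular write
```

Keeping the error handling in the public methods means a custom controller only has to implement the two underscore methods.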
- class mlpro.bf.systems.basics.SystemShared(p_range=None)
Bases:
Shared
A specialised shared object for managing IPC between MultiSystems.
- TODO:
Entry systems that receive their actions from outside are not handled yet.
- Parameters:
p_range – The multiprocessing range for the specific process. Default is None.
Spaces of all the systems registered in the Shared Object.
States of all the systems registered in the Shared Object.
Corresponding Actions for all the systems.
All the dimensions present in the Shared Object.
Mapping configurations for system to action mapping.
Resets the shared object.
- Parameters:
p_seed (int) – Seed for reproducibility.
Updates the states in the Shared Object.
- Parameters:
p_state (State) – The state to be stored in the Shared Object.
- Return type:
bool
Updates the action values based on a new state, in a MultiSystem Context.
- Parameters:
p_sys_id – Id of the system from which the state is received.
p_state (State) – The State of the system which affects the action.
Fetches the corresponding action for a particular system.
- Parameters:
p_sys_id – The id of the system, for which action is to be fetched.
- Returns:
action – The corresponding action for the system.
- Return type:
Fetches the states of all the internal systems.
- Returns:
states – The state of each system registered in the Shared Object.
- Return type:
dict
Fetches the state of a particular system from the Shared Object.
- Parameters:
p_sys_id – The id of the system whose state is to be fetched.
- Returns:
state – The corresponding state of the system.
- Return type:
Maps a dimension to output dimension with info about output sys, output dim type and output dim.
- Parameters:
p_sys_id – Id of the system for which action is to be mapped into the mappings.
p_state (State) – The State to be mapped into the ‘states’ dictionary.
- Returns:
mapping – A tuple of tuples of System id and dimension id mappings from State to Action respectively.
- Return type:
(output_sys, output_dim_type, output_dim)
Registers the system in the Shared Object and sets up the dimension to dimension mapping.
- Parameters:
p_system (System) – The system to be registered.
p_mappings – Mappings corresponding to the system, in the form: ( (ip dim_type, op dim_type), (input_sys_id, input_dim_id), (op_sys_id, op_dimension_id) )
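The dimension-to-dimension mapping can be sketched with plain dictionaries. Everything below is a hypothetical illustration: `MiniShared` and its method names are placeholders (this listing does not show the method names of the shared object), states and actions are simple dicts, and the dimension type codes 'S' (state) and 'A' (action) are an assumption borrowed from the controller mapping syntax.

```python
class MiniShared:
    """Hypothetical sketch of a shared object routing states to actions."""

    def __init__(self):
        self._states = {}    # system id -> {dimension id: value}
        self._actions = {}   # system id -> {dimension id: value}
        self._mappings = []  # ((in_type, out_type), (in_sys, in_dim), (out_sys, out_dim))

    def register_system(self, p_sys_id, p_mappings):
        self._states[p_sys_id] = {}
        self._actions.setdefault(p_sys_id, {})
        self._mappings.extend(p_mappings)

    def update_state(self, p_sys_id, p_state) -> bool:
        self._states[p_sys_id] = dict(p_state)
        # Forward every mapped state dimension to the target action dimension.
        for (in_type, out_type), (in_sys, in_dim), (out_sys, out_dim) in self._mappings:
            if (in_type, out_type) != ('S', 'A'):
                continue  # this sketch only handles state -> action mappings
            if in_sys == p_sys_id and in_dim in p_state:
                self._actions.setdefault(out_sys, {})[out_dim] = p_state[in_dim]
        return True

    def get_action(self, p_sys_id):
        return dict(self._actions.get(p_sys_id, {}))

    def get_states(self):
        return {sys_id: dict(state) for sys_id, state in self._states.items()}


so = MiniShared()
# The level of the tank drives the speed of the pump.
so.register_system('tank', [(('S', 'A'), ('tank', 'level'), ('pump', 'speed'))])
so.register_system('pump', [])

so.update_state('tank', {'level': 0.8})
pump_action = so.get_action('pump')
all_states = so.get_states()
```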
- class mlpro.bf.systems.basics.System(p_id=None, p_name: str = None, p_range_max: int = 0, p_autorun=0, p_class_shared=None, p_mode=0, p_latency: timedelta = None, p_t_step: timedelta = None, p_fct_strans: FctSTrans = None, p_fct_success: FctSuccess = None, p_fct_broken: FctBroken = None, p_mujoco_file=None, p_frame_skip: int = 1, p_state_mapping=None, p_action_mapping=None, p_camera_conf: tuple = (None, None, None), p_visualize: bool = False, p_logging=True, **p_kwargs)
Bases:
FctSTrans, FctSuccess, FctBroken, Task, Mode, Plottable, Persistent, ScientificObject
Base class for state based systems.
- Parameters:
p_mode – Mode of the system. Possible values are Mode.C_MODE_SIM (default) or Mode.C_MODE_REAL.
p_latency (timedelta) – Optional latency of the system. If not provided, the internal value of constant C_LATENCY is used by default.
p_fct_strans (FctSTrans) – Optional external function for state transition.
p_fct_success (FctSuccess) – Optional external function for state evaluation ‘success’.
p_fct_broken (FctBroken) – Optional external function for state evaluation ‘broken’.
p_mujoco_file – Path to XML file for MuJoCo model.
p_frame_skip (int) – Number of frames to be skipped at every step. Default = 1.
p_state_mapping – State mapping if the MLPro state and MuJoCo state have different naming.
p_action_mapping – Action mapping if the MLPro action and MuJoCo action have different naming.
p_use_radian (bool) – Use radians if the action and the state are based on radian units. Default = True.
p_camera_conf (tuple) – Default camera configuration on MuJoCo Simulation (xyz position, elevation, distance).
p_visualize (bool) – Boolean switch for env/agent visualisation. Default = False.
p_logging – Log level (see class Log for more details). Default = Log.C_LOG_ALL.
- _latency
Latency of the system.
- Type:
timedelta
- _fct_success
Internal function for state evaluation ‘success’.
- Type:
FctSuccess
- C_TYPE = 'System'
- C_LATENCY = datetime.timedelta(seconds=1)
- C_PLOT_ACTIVE: bool = True
- static setup_spaces()
Static template method to set up and return state and action space of environment.
- Returns:
state_space (MSpace) – State space object
action_space (MSpace) – Action space object
- _reduce_state(p_state: dict, p_path: str, p_os_sep: str, p_filename_stub: str)
An embedded MuJoCo system cannot be pickled and needs to be removed from the pickle stream.
- _complete_state(p_path: str, p_os_sep: str, p_filename_stub: str)
Custom method to complete the object state (=self) from external data sources. This method is called by standard method __setstate__() during unpickling the object from an external file.
- Parameters:
p_path (str) – Path of the object pickle file (and further optional related files)
p_os_sep (str) – OS-specific path separator.
p_filename_stub (str) – Filename stub to be used for further optional custom data files
- switch_logging(p_logging)
Sets new log level.
- Parameters:
p_logging – Log level (constant C_LOG_LEVELS contains valid values)
- get_latency() timedelta
Returns latency of the system.
- set_latency(p_latency: timedelta = None) None
Sets the latency of the system. If p_latency is None, the latency is reset to the value of class attribute C_LATENCY.
- Parameters:
p_latency (timedelta) – New latency value
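The fallback to C_LATENCY can be sketched in a few lines. `MiniSystem` and `FastSystem` are hypothetical stand-ins that reproduce only the latency handling described above.

```python
from datetime import timedelta


class MiniSystem:
    """Hypothetical stand-in covering only the latency handling."""

    C_LATENCY = timedelta(seconds=1)

    def __init__(self, p_latency: timedelta = None):
        self.set_latency(p_latency)

    def get_latency(self) -> timedelta:
        return self._latency

    def set_latency(self, p_latency: timedelta = None) -> None:
        # None resets the latency to the class-level default.
        self._latency = self.C_LATENCY if p_latency is None else p_latency


class FastSystem(MiniSystem):
    # A child class can redefine the default latency.
    C_LATENCY = timedelta(milliseconds=50)


fast = FastSystem()
default_latency = fast.get_latency()

fast.set_latency(timedelta(seconds=2))
custom_latency = fast.get_latency()

fast.set_latency()  # back to the class default
restored_latency = fast.get_latency()
```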
- get_fct_strans()
Returns the state transition function of the system if it exists; otherwise, the system itself.
- Returns:
fct_strans – State transition function of the system, if it exists. Otherwise, the system itself.
- Return type:
FctSTrans
- get_fct_broken()
Returns the function for the 'broken' evaluation of the system if it exists; otherwise, the system itself.
- Returns:
fct_broken – Function for the 'broken' evaluation of the system, if it exists. Otherwise, the system itself.
- Return type:
FctBroken
- get_fct_success()
Returns the function for the 'success' evaluation of the system if it exists; otherwise, the system itself.
- Returns:
fct_success – Function for the 'success' evaluation of the system, if it exists. Otherwise, the system itself.
- Return type:
FctSuccess
- set_random_seed(p_seed=None)
Resets the internal random generator using the given seed.
- Parameters:
p_seed (int) – Seed parameter for an internal random generator
- reset(p_seed=None) None
Resets the system to an initial state. If MuJoCo is not used, the custom method _reset() is called.
- Parameters:
p_seed (int) – Seed parameter for an internal random generator
- _reset(p_seed=None) None
Custom method to reset the system to an initial/defined state. Use method _set_status() to set the state.
- Parameters:
p_seed (int) – Seed parameter for an internal random generator
- add_controller(p_controller: Controller, p_mapping: list) bool
Adds a controller and a related mapping of states and actions to sensors and actuators.
- Parameters:
p_controller (Controller) – Controller object to be added.
p_mapping (list) – A list of mapping tuples following the syntax ( [Type = ‘S’ or ‘A’], [Name of state/action], [Name of sensor/actuator] )
- Returns:
successful – True, if the controller and the related mapping were added successfully. False otherwise.
- Return type:
bool
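A mapping list in the syntax described above might be split into a state-to-sensor table and an action-to-actuator table as sketched below. `split_mapping()` is a hypothetical helper, and rejecting unknown type codes is an assumption; the listing does not specify how add_controller() validates its input.

```python
def split_mapping(p_mapping):
    """
    Splits a list of ('S' | 'A', state/action name, sensor/actuator name)
    tuples into two lookup tables. Returns None if an entry has an
    unknown type code (assumed behaviour).
    """
    state_to_sensor = {}
    action_to_actuator = {}

    for entry_type, name, device in p_mapping:
        if entry_type == 'S':
            state_to_sensor[name] = device
        elif entry_type == 'A':
            action_to_actuator[name] = device
        else:
            return None

    return state_to_sensor, action_to_actuator


mapping = [
    ('S', 'temperature', 'temp_sensor'),  # state dimension -> sensor
    ('A', 'heating', 'heater'),           # action dimension -> actuator
]
tables = split_mapping(mapping)
invalid = split_mapping([('X', 'foo', 'bar')])
```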
- get_so() SystemShared
Returns the associated shared object.
- Returns:
so – Shared object of type Shared (or inherited)
- Return type:
SystemShared
- _run(p_action: Action, p_t_step: timedelta = None)
Run method that runs the system as a task. It runs the process_action() method of the system with action as a parameter.
- Parameters:
p_t_step (timedelta) – Time for which the system must be simulated.
- process_action(p_action: Action, p_t_step: timedelta = None) bool
Processes a state transition based on the current state and a given action. The state transition itself is implemented in child classes in the custom method _process_action().
- Parameters:
p_action (Action) – Action to be processed
p_t_step (timedelta) – The timestep for which the system is to be simulated
- Returns:
success – True, if action processing was successful. False otherwise.
- Return type:
bool
- _process_action(p_action: Action, p_t_step: timedelta = None) bool
Internal custom method for state transition with default implementation. To be redefined in a child class on demand. See method process_action() for further details.
- simulate_reaction(p_state: State = None, p_action: Action = None, p_t_step: timedelta = None) State
Simulates a state transition based on a state and an action. The simulation step itself is carried out either by an internal custom implementation in method _simulate_reaction() or by an embedded external function.
- _simulate_reaction(p_state: State, p_action: Action, p_step: timedelta = None) State
Custom method for a simulated state transition. Implement this method if no external state transition function is used. See method simulate_reaction() for further details.
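The choice between an embedded external function and the custom method can be sketched as follows. All classes are hypothetical stand-ins, states and actions are plain floats, and the time step parameter is omitted; the real classes work with `State`, `Action` and `FctSTrans` objects.

```python
class MiniSystem:
    """Hypothetical stand-in showing the delegation of the state transition."""

    def __init__(self, p_fct_strans=None):
        self._fct_strans = p_fct_strans

    def get_fct_strans(self):
        # External function if present, otherwise the system itself.
        return self._fct_strans if self._fct_strans is not None else self

    def simulate_reaction(self, p_state, p_action):
        if self._fct_strans is not None:
            return self._fct_strans.simulate_reaction(p_state, p_action)
        return self._simulate_reaction(p_state, p_action)

    def _simulate_reaction(self, p_state, p_action):
        raise NotImplementedError


class Integrator(MiniSystem):
    """Custom implementation: the action is added to the state."""

    def _simulate_reaction(self, p_state, p_action):
        return p_state + p_action


class Doubler:
    """External transition function (stand-in for an FctSTrans child class)."""

    def simulate_reaction(self, p_state, p_action):
        return 2 * p_state + p_action


internal = Integrator()
external = Integrator(p_fct_strans=Doubler())

internal_result = internal.simulate_reaction(1.0, 0.5)  # uses _simulate_reaction()
external_result = external.simulate_reaction(1.0, 0.5)  # uses the Doubler object
```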
- action_to_mujoco(p_mlpro_action)
Action conversion method for converting an MLPro action to a MuJoCo action.
- _action_to_mujoco(p_mlpro_action)
Custom method to convert an MLPro action to a MuJoCo action. Implement this method if the MLPro action has a different dimension from the MuJoCo action.
- Parameters:
p_mlpro_action (Numpy) – MLPro action.
- Returns:
Modified MLPro action
- Return type:
Numpy
- state_from_mujoco(p_mujoco_state)
State conversion method for converting a MuJoCo state to an MLPro state.
- _state_from_mujoco(p_mujoco_state)
Custom method to convert a MuJoCo state to an MLPro state. Implement this method if the MLPro state has a different dimension from the MuJoCo state.
- Parameters:
p_mujoco_state (Numpy) – MuJoCo state.
- Returns:
Modified MuJoCo state
- Return type:
Numpy
- _import_state() bool
- compute_success(p_state: State) bool
Assesses whether the given state is a ‘success’ state. Assessment is carried out either by a custom implementation in method _compute_success() or by an embedded external function.
- Parameters:
p_state (State) – State to be assessed.
- Returns:
success – True, if the given state is a ‘success’ state. False otherwise.
- Return type:
bool
- _compute_success(p_state: State) bool
Custom method for assessment for success. Implement this method if no external function is used. See method compute_success() for further details.
- get_success() bool
- compute_broken(p_state: State) bool
Assesses whether the given state is a ‘broken’ state. Assessment is carried out either by a custom implementation in method _compute_broken() or by an embedded external function.
- Parameters:
p_state (State) – State to be assessed.
- Returns:
broken – True, if the given state is a ‘broken’ state. False otherwise.
- Return type:
bool
- _compute_broken(p_state: State) bool
Custom method for assessment for breakdown. Implement this method if no external function is used. See method compute_broken() for further details.
- get_broken() bool
- init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None, **p_kwargs)
Initializes the plot functionalities of the class.
- Parameters:
p_figure (Matplotlib.figure.Figure, optional) – Optional MatPlotLib host figure, where the plot shall be embedded. The default is None.
p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).
- update_plot(**p_kwargs)
Updates the plot.
- Parameters:
**p_kwargs – Implementation-specific plot data and/or parameters.
- class mlpro.bf.systems.basics.MultiSystem(p_name: str = None, p_id=None, p_range_max=0, p_autorun=0, p_class_shared=<class 'mlpro.bf.systems.basics.SystemShared'>, p_mode=0, p_latency: ~datetime.timedelta = None, p_t_step: ~datetime.timedelta = None, p_fct_strans: ~mlpro.bf.systems.basics.FctSTrans = None, p_fct_success: ~mlpro.bf.systems.basics.FctSuccess = None, p_fct_broken: ~mlpro.bf.systems.basics.FctBroken = None, p_mujoco_file=None, p_frame_skip: int = 1, p_state_mapping=None, p_action_mapping=None, p_camera_conf: tuple = (None, None, None), p_visualize: bool = False, p_logging=True, **p_kwargs)
A complex system of systems.
- Parameters:
p_name
p_id
p_range_max
p_autorun
p_class_shared
p_mode
p_latency
p_t_step
p_fct_strans
p_fct_success
p_fct_broken
p_mujoco_file
p_frame_skip
p_state_mapping
p_action_mapping
p_camera_conf
p_visualize
p_logging
p_kwargs
- C_TYPE = 'Multi-System'
- add_system(p_system: System, p_mappings)
Adds a subsystem to the MultiSystem.
- Parameters:
p_system (System) – The system to be added.
p_mappings (list) – The mappings corresponding to the system, in the form: [ ((ip dim_type, op dim_type), (input_sys_id, input_dim_id), (op_sys_id, op_dimension_id)), … ]
- _reset(p_seed=None) None
Resets the MultiSystem and all the sub-systems inside.
- Parameters:
p_seed (int) – Seed for the purpose of reproducibility
- get_subsystem_ids()
- get_subsystems()
- get_subsystem(p_system_id) System
Returns a subsystem from the MultiSystem.
- Parameters:
p_system_id – Id of the system to be returned
- Returns:
sub_system – The system to be returned by ID.
- Return type:
System
- get_states()
Returns the states of all the subsystems in the MultiSystem.
- Returns:
states – States of all the subsystems.
- Return type:
dict
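How a system of systems might keep track of its subsystems can be sketched with hypothetical stand-ins. `MiniSub` and `MiniMultiSystem` mirror only the method names listed above; the mappings passed to add_system() are left out, and forwarding the seed unchanged to every subsystem is an assumption.

```python
class MiniSub:
    """Hypothetical subsystem with an id and a scalar state."""

    def __init__(self, p_id):
        self._id = p_id
        self._state = None
        self.seed = None

    def get_id(self):
        return self._id

    def reset(self, p_seed=None):
        self.seed = p_seed
        self._state = 0.0

    def get_state(self):
        return self._state


class MiniMultiSystem:
    """Hypothetical container mirroring the subsystem accessors."""

    def __init__(self):
        self._subsystems = {}  # system id -> subsystem

    def add_system(self, p_system):
        self._subsystems[p_system.get_id()] = p_system

    def reset(self, p_seed=None):
        # Assumption: the same seed is forwarded to every subsystem.
        for system in self._subsystems.values():
            system.reset(p_seed)

    def get_subsystem_ids(self):
        return list(self._subsystems.keys())

    def get_subsystem(self, p_system_id):
        return self._subsystems[p_system_id]

    def get_states(self):
        return {sys_id: system.get_state()
                for sys_id, system in self._subsystems.items()}


multi = MiniMultiSystem()
multi.add_system(MiniSub('a'))
multi.add_system(MiniSub('b'))
multi.reset(p_seed=7)

subsystem_ids = multi.get_subsystem_ids()
states = multi.get_states()
```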
- class mlpro.bf.systems.basics.DemoScenario(p_system: System, p_mode, p_action_pattern: str = 'random', p_action: list = None, p_id=None, p_cycle_limit=0, p_auto_setup: bool = True, p_visualize: bool = True, p_logging=True)
Bases:
ScenarioBase
Demo scenario class for demonstrating systems. Inherits from ScenarioBase.
- Parameters:
p_system (System) – Mandatory parameter, takes the system for the scenario.
p_mode – Operation mode. See Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.
p_action_pattern (str) – The action pattern to be used for demonstration. Default is C_ACTION_RANDOM
p_action (list) – The action to be executed in constant mode. Mandatory when the action pattern is C_ACTION_CONSTANT.
p_id – Optional external id
p_cycle_limit (int) – Maximum number of cycles. Default = 0 (no limit).
p_auto_setup (bool) – If True custom method setup() is called after initialization.
p_visualize (bool) – Boolean switch for visualisation. Default = True.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.
- C_NAME = 'Demo System Scenario'
- C_ACTION_RANDOM = 'random'
- C_ACTION_CONSTANT = 'constant'
- setup()
Sets up the system spaces.
- get_latency() timedelta
Returns the latency of the system.
- _reset(p_seed)
Resets the Scenario and the system. Sets up the action and state spaces of the system.
- Parameters:
p_seed – Seed for the purpose of reproducibility.
- _run_cycle()
Runs the custom scenario cycle through the run method of the scenario base. Checks and returns the broken state, False otherwise.
- _get_next_action()
Generates new action based on the pattern provided by the user.
- update_plot(**p_kwargs)
Updates the plot.
- Parameters:
**p_kwargs – Implementation-specific plot data and/or parameters.
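The two action patterns of the demo scenario can be sketched with a small helper. `next_action()` and its parameters `p_boundaries` and `p_rng` are hypothetical; the real _get_next_action() works on the action space of the system, and uniform sampling within the boundaries is an assumption.

```python
import random

C_ACTION_RANDOM = 'random'
C_ACTION_CONSTANT = 'constant'


def next_action(p_action_pattern, p_boundaries, p_action=None, p_rng=None):
    """
    Returns a list of action values, one per action dimension.

    p_boundaries : list of (low, high) tuples, one per dimension (hypothetical).
    p_action     : constant action values, mandatory in constant mode.
    p_rng        : optional random.Random instance for reproducibility.
    """
    if p_action_pattern == C_ACTION_CONSTANT:
        if p_action is None:
            raise ValueError('p_action is mandatory for the constant pattern')
        return list(p_action)

    if p_action_pattern == C_ACTION_RANDOM:
        rng = random.Random() if p_rng is None else p_rng
        # Assumption: values are drawn uniformly within the boundaries.
        return [rng.uniform(low, high) for low, high in p_boundaries]

    raise ValueError(f'Unknown action pattern: {p_action_pattern}')


boundaries = [(-1.0, 1.0), (0.0, 10.0)]

constant_action = next_action(C_ACTION_CONSTANT, boundaries, p_action=[0.5, 2.0])
random_action = next_action(C_ACTION_RANDOM, boundaries, p_rng=random.Random(42))
repeated_action = next_action(C_ACTION_RANDOM, boundaries, p_rng=random.Random(42))
```

Passing a seeded generator makes the random pattern reproducible, which matches the role of the seed in _reset().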