OA-STREAMS - Online-adaptive Stream Processing
Ver. 0.8.0 (2023-12-20)
Core classes for online adaptive stream processing.
Bases:
StreamShared
Template class for shared objects in the context of online adaptive stream processing.
- class mlpro.oa.streams.basics.OATask(p_name: str = None, p_range_max=1, p_ada: bool = True, p_buffer_size: int = 0, p_duplicate_data: bool = False, p_visualize: bool = False, p_logging=True, **p_kwargs)
Bases:
StreamTask, Model
Template class for online adaptive ML tasks.
- Parameters:
p_name (str) – Optional name of the task. Default is None.
p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_PROCESS.
p_ada (bool) – Boolean switch for adaptivity. Default = True.
p_buffer_size (int) – Initial size of internal data buffer. Default = 0 (no buffering).
p_duplicate_data (bool) – If True, instances will be duplicated before processing. Default = False.
p_visualize (bool) – Boolean switch for visualisation. Default = False.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL
p_kwargs (dict) – Further optional named parameters.
- C_TYPE = 'OA-Task'
- C_PLOT_ACTIVE: bool = True
- C_PLOT_STANDALONE: bool = True
- C_PLOT_VALID_VIEWS: list = ['2D', '3D', 'ND']
- C_PLOT_DEFAULT_VIEW: str = 'ND'
- adapt(p_inst_new: List[Instance], p_inst_del: List[Instance]) bool
Adapts the model by calling the custom method _adapt().
- Parameters:
p_inst_new (list) – List of new stream instances to be processed.
p_inst_del (list) – List of obsolete stream instances to be removed.
- Returns:
adapted – True, if something has been adapted. False otherwise.
- Return type:
bool
- _adapt(p_inst_new: List[Instance]) bool
Obligatory custom method for adaptations during regular operation.
- Parameters:
p_inst_new (list) – List of new stream instances to be processed.
- Returns:
adapted – True, if something has been adapted. False otherwise.
- Return type:
bool
- _adapt_reverse(p_inst_del: List[Instance]) bool
Optional custom method for reverse adaptations during regular operation.
- Parameters:
p_inst_del (list) – List of obsolete stream instances to be removed.
- Returns:
adapted – True, if something has been adapted. False otherwise.
- Return type:
bool
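The relationship between adapt() and the custom methods _adapt() and _adapt_reverse() can be illustrated with a minimal sketch. It does not import MLPro: SketchTask and RunningMean are hypothetical stand-ins, plain floats replace Instance objects, and the dispatch logic inside adapt() (including the check of the adaptivity switch and the call order) is an assumption about how such a template method might behave, not the library's implementation.

```python
from typing import List


class SketchTask:
    """Hypothetical stand-in for an online adaptive task (NOT the MLPro class)."""

    def __init__(self, p_ada: bool = True):
        self._ada = p_ada

    def adapt(self, p_inst_new: List[float], p_inst_del: List[float]) -> bool:
        # Assumption: nothing is adapted while adaptivity is switched off
        if not self._ada:
            return False

        adapted = False
        if p_inst_new:
            adapted = self._adapt(p_inst_new) or adapted
        if p_inst_del:
            adapted = self._adapt_reverse(p_inst_del) or adapted
        return adapted

    def _adapt(self, p_inst_new: List[float]) -> bool:
        # Obligatory custom method
        raise NotImplementedError

    def _adapt_reverse(self, p_inst_del: List[float]) -> bool:
        # Optional custom method; by default nothing is adapted
        return False


class RunningMean(SketchTask):
    """Example task that maintains a mean over the currently valid instances."""

    def __init__(self, p_ada: bool = True):
        super().__init__(p_ada)
        self.count = 0
        self.total = 0.0

    def _adapt(self, p_inst_new: List[float]) -> bool:
        self.count += len(p_inst_new)
        self.total += sum(p_inst_new)
        return True

    def _adapt_reverse(self, p_inst_del: List[float]) -> bool:
        self.count -= len(p_inst_del)
        self.total -= sum(p_inst_del)
        return True

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0
```

In this sketch, new instances extend the statistics and obsolete instances are subtracted again, so the model always reflects the instances that are still valid.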
- _renormalize(p_normalizer: Normalizer)
Custom method to renormalize internally buffered data using the given normalizer object. This is necessary after an adaptation of a related predecessor normalizer task. See method renormalize_on_event() for further details.
- Parameters:
p_normalizer (Normalizer) – Normalizer object to be applied to the task-specific internally buffered data.
- renormalize_on_event(p_event_id: str, p_event_object: Event)
Event handler method to be registered on event Model.C_EVENT_ADAPTED of an online adaptive normalizer task. It carries out the task-specific renormalization of internally buffered data by calling the custom method _renormalize().
- Parameters:
p_event_id (str) – Unique event id
p_event_object (Event) – Event object with further context information
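The interplay between an adapting normalizer and a task that buffers normalized data can be sketched as follows. This is a simplified illustration under stated assumptions: SketchNormalizer and SketchBufferTask are hypothetical stand-ins that do not use MLPro, the event id string 'ADAPTED' is made up, and the normalizer itself is passed as the event object instead of a real Event.

```python
class SketchNormalizer:
    """Hypothetical min-max normalizer (NOT the MLPro Normalizer)."""

    def __init__(self, p_min: float, p_max: float):
        self._old = (p_min, p_max)   # boundaries before the last adaptation
        self._new = (p_min, p_max)   # current boundaries
        self._handlers = []

    def register_handler(self, p_handler):
        self._handlers.append(p_handler)

    def normalize(self, p_value: float) -> float:
        lo, hi = self._new
        return (p_value - lo) / (hi - lo)

    def renormalize(self, p_value: float) -> float:
        # Undo the old normalization, then apply the current one
        lo_old, hi_old = self._old
        raw = p_value * (hi_old - lo_old) + lo_old
        return self.normalize(raw)

    def update(self, p_min: float, p_max: float):
        # Adapt the boundaries and notify all registered handlers
        self._old = self._new
        self._new = (p_min, p_max)
        for handler in self._handlers:
            handler('ADAPTED', self)


class SketchBufferTask:
    """Hypothetical task that buffers normalized values."""

    def __init__(self):
        self.buffer = []

    def renormalize_on_event(self, p_event_id: str, p_event_object):
        # Simplification: the event object is the normalizer itself
        self._renormalize(p_event_object)

    def _renormalize(self, p_normalizer: SketchNormalizer):
        self.buffer = [p_normalizer.renormalize(v) for v in self.buffer]


normalizer = SketchNormalizer(0.0, 10.0)
task = SketchBufferTask()
normalizer.register_handler(task.renormalize_on_event)

task.buffer.append(normalizer.normalize(5.0))   # stored with boundaries [0, 10]
normalizer.update(0.0, 20.0)                    # triggers the renormalization
```

Without the handler, the buffered value would still refer to the old boundaries after the update and would no longer be comparable with newly normalized data.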
- class mlpro.oa.streams.basics.OAWorkflow(p_name: str = None, p_range_max=1, p_class_shared=<class 'mlpro.oa.streams.basics.OAShared'>, p_ada: bool = True, p_visualize: bool = False, p_logging=True, **p_kwargs)
Bases:
StreamWorkflow, AWorkflow
Online adaptive workflow based on a stream-workflow and an adaptive workflow.
- Parameters:
p_name (str) – Optional name of the workflow. Default is None.
p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_PROCESS.
p_class_shared – Optional class for a shared object (class OAShared or a child class of OAShared)
p_ada (bool) – Boolean switch for adaptivity. Default = True.
p_visualize (bool) – Boolean switch for visualisation. Default = False.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL
p_kwargs (dict) – Further optional named parameters.
- C_TYPE = 'OA-Workflow'
- add_task(p_task: StreamTask, p_pred_tasks: list = None)
Adds a task to the workflow.
- Parameters:
p_task (Task) – Task object to be added.
p_pred_tasks (list) – Optional list of predecessor task objects
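How predecessor lists shape the processing order can be sketched with a small stand-in. SketchWorkflow is hypothetical and unrelated to MLPro's implementation: tasks are represented by plain names, and the rule that a predecessor must be added before its successors is an assumption made for this illustration.

```python
class SketchWorkflow:
    """Hypothetical stand-in workflow (NOT MLPro's OAWorkflow)."""

    def __init__(self):
        self._tasks = []   # task names in insertion order
        self._preds = {}   # task name -> list of predecessor names

    def add_task(self, p_task: str, p_pred_tasks: list = None):
        preds = p_pred_tasks or []
        for pred in preds:
            # Assumption: predecessors have to be part of the workflow already
            if pred not in self._preds:
                raise ValueError(f"Predecessor '{pred}' must be added first")
        self._tasks.append(p_task)
        self._preds[p_task] = list(preds)

    def predecessors(self, p_task: str) -> list:
        return list(self._preds[p_task])

    def execution_order(self) -> list:
        # Insertion order respects all dependencies because every
        # predecessor was added before its successors
        return list(self._tasks)


workflow = SketchWorkflow()
workflow.add_task('normalizer')
workflow.add_task('clusterer', p_pred_tasks=['normalizer'])
```

A task without predecessors, such as 'normalizer' here, acts as an entry point of the workflow in this sketch.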
- class mlpro.oa.streams.basics.OAScenario(p_mode=0, p_ada: bool = True, p_cycle_limit=0, p_visualize: bool = False, p_logging=True)
Bases:
StreamScenario
Template class for stream based scenarios with online adaptive workflows.
- Parameters:
p_mode – Operation mode. See bf.ops.Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.
p_ada (bool) – Boolean switch for adaptivity. Default = True.
p_cycle_limit (int) – Maximum number of cycles (0=no limit, -1=get from env). Default = 0.
p_visualize (bool) – Boolean switch for env/agent visualisation. Default = False.
p_logging – Log level (see constants of class mlpro.bf.various.Log). Default = Log.C_LOG_WE.
- C_TYPE = 'OA-Scenario'
- setup()
Specialized method to set up an OA stream scenario. It is called automatically by the constructor and in turn calls the custom method _setup().
- _setup(p_mode, p_ada: bool, p_visualize: bool, p_logging)
Custom method to set up a stream scenario consisting of a stream and a processing stream workflow.
- Parameters:
p_mode – Operation mode. See Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.
p_ada (bool) – Boolean switch for adaptivity. Default = True.
p_visualize (bool) – Boolean switch for visualisation.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.
- Returns:
stream (Stream) – A stream object.
workflow (OAWorkflow) – An online adaptive stream workflow object.
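The division of labour between setup() and the custom method _setup() can be sketched as follows. SketchScenario and DemoScenario are hypothetical stand-ins that do not use MLPro. The attribute names, the iterator used as a stream, and the dictionary used as a workflow are assumptions chosen only to keep the example self-contained.

```python
class SketchScenario:
    """Hypothetical stand-in scenario (NOT MLPro's OAScenario)."""

    def __init__(self, p_mode=0, p_ada: bool = True,
                 p_visualize: bool = False, p_logging=True):
        self._mode = p_mode
        self._ada = p_ada
        self._visualize = p_visualize
        self._logging = p_logging
        self.setup()   # the constructor triggers the setup automatically

    def setup(self):
        # Delegates to the custom method and stores both returned objects
        self._stream, self._workflow = self._setup(p_mode=self._mode,
                                                   p_ada=self._ada,
                                                   p_visualize=self._visualize,
                                                   p_logging=self._logging)

    def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
        # Custom method to be implemented by child classes
        raise NotImplementedError


class DemoScenario(SketchScenario):

    def _setup(self, p_mode, p_ada: bool, p_visualize: bool, p_logging):
        stream = iter([1.0, 2.0, 3.0])                # placeholder for a stream
        workflow = {'name': 'demo', 'ada': p_ada}     # placeholder for a workflow
        return stream, workflow


scenario = DemoScenario(p_ada=False)
```

A child class only provides the stream and the workflow, while the surrounding bookkeeping stays in the parent class.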
- class mlpro.oa.streams.basics.OATrainingResults(p_scenario: Scenario, p_run, p_cycle_id, p_logging='W')
Bases:
TrainingResults
…