BF-STREAMS - Stream Processing
Ver. 1.3.0 (2023-11-17)
This module provides classes for standardized stream processing.
- class mlpro.bf.streams.models.Feature(p_name_short, p_base_set='R', p_name_long='', p_name_latex='', p_unit='', p_unit_latex='', p_boundaries: list = [], p_description='', p_symmetrical: bool = False, p_logging=False, **p_kwargs)
Bases:
Dimension
- class mlpro.bf.streams.models.Label(p_name_short, p_base_set='R', p_name_long='', p_name_latex='', p_unit='', p_unit_latex='', p_boundaries: list = [], p_description='', p_symmetrical: bool = False, p_logging=False, **p_kwargs)
Bases:
Dimension
- class mlpro.bf.streams.models.Instance(p_feature_data: Element, p_label_data: Element = None, p_tstamp: datetime = None, **p_kwargs)
Bases:
TStamp
Instance class that stores the feature data of a stream instance together with the corresponding label data.
- Parameters:
- C_TYPE = 'Instance'
- get_id()
- set_id(p_id: int)
- get_kwargs()
- copy()
Bases:
Shared
Template class for shared objects in the context of stream processing.
Dictionary of new/deleted instances per task. At the beginning of a cycle it contains the incoming instances of a stream. The dictionary evolves as the stream tasks manipulate the instances.
- Type:
dict
Resets the shared object and prepares the processing of the given set of new instances.
- Parameters:
p_inst_new (List[Instance]) – List of new instances to be processed.
Provides the result instances of all given task ids.
- Parameters:
p_task_ids (list) – List of task ids.
- Returns:
inst_new (list) – List of new instances of all given task ids.
inst_del (list) – List of instances to be deleted of all given task ids.
Stores result instances of a task in the shared object.
- Parameters:
p_task_id – Id of related task.
p_inst_new (list) – List of new instances.
p_inst_del (list) – List of instances to be deleted.
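The bookkeeping described above can be illustrated with a minimal, self-contained sketch. It does not use MLPro itself: the class SharedSketch, its method names reset, add_instances and get_instances, the pseudo task id 'wf', and the use of plain strings as instances are all assumptions made for illustration.

```python
from typing import Dict, Tuple


class SharedSketch:
    """Hypothetical stand-in for a shared object; not the MLPro class."""

    def __init__(self):
        # task id -> (new instances, instances to be deleted)
        self._instances: Dict[object, Tuple[list, list]] = {}

    def reset(self, p_inst_new: list):
        # Start a new cycle: only the incoming instances are known.
        # They are stored under the assumed pseudo task id 'wf'.
        self._instances = {'wf': (list(p_inst_new), [])}

    def add_instances(self, p_task_id, p_inst_new: list, p_inst_del: list):
        # Store the result instances of one task.
        self._instances[p_task_id] = (list(p_inst_new), list(p_inst_del))

    def get_instances(self, p_task_ids: list):
        # Collect the result instances of all given task ids.
        inst_new, inst_del = [], []
        for task_id in p_task_ids:
            new, deleted = self._instances.get(task_id, ([], []))
            inst_new.extend(new)
            inst_del.extend(deleted)
        return inst_new, inst_del


shared = SharedSketch()
shared.reset(['i1', 'i2'])
shared.add_instances('t1', ['i1'], ['i0'])
result_new, result_del = shared.get_instances(['wf', 't1'])
```

In this sketch unknown task ids simply contribute nothing; a real shared object may handle that case differently.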
- class mlpro.bf.streams.models.Sampler(p_num_instances: int = 0, **p_kwargs)
Bases:
ScientificObject
Template class for data stream samplers. Objects of this type can be used in class Stream.
- Parameters:
p_num_instances (int) – Number of instances.
p_kwargs (dict) – Further sampler specific parameters.
- C_TYPE = 'Sampler'
- reset()
A method to reset the sampler’s settings. Please redefine this method!
- get_num_instances() int
A method to get the number of instances that is being processed by the sampler.
- Returns:
Number of instances.
- Return type:
int
- set_num_instances(p_num_instances: int)
A method to set the number of instances that is going to be processed by the sampler.
- Parameters:
p_num_instances (int) – Number of instances.
- omit_instance(p_inst: Instance) bool
A method to filter any incoming instances.
- Parameters:
p_inst (Instance) – An input instance to be filtered.
- Returns:
False means the input instance is not omitted, otherwise True.
- Return type:
bool
- _omit_instance(p_inst: Instance) bool
A custom method to filter any incoming instances. It is called by method omit_instance(). Please redefine this method!
- Parameters:
p_inst (Instance) – An input instance to be filtered.
- Returns:
False means the input instance is not omitted, otherwise True.
- Return type:
bool
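The split between the public method omit_instance() and the custom method _omit_instance() can be sketched as follows. This is a standalone illustration, not MLPro code: the classes SamplerSketch and EveryNthSampler, the parameter p_step, and the policy of keeping every n-th instance are assumptions.

```python
class SamplerSketch:
    """Hypothetical stand-in mirroring the documented sampler interface."""

    def __init__(self, p_num_instances: int = 0, **p_kwargs):
        self._num_instances = p_num_instances
        self._kwargs = p_kwargs

    def reset(self):
        # To be redefined by child classes.
        pass

    def get_num_instances(self) -> int:
        return self._num_instances

    def omit_instance(self, p_inst) -> bool:
        # Public entry point that delegates to the custom method.
        return self._omit_instance(p_inst)

    def _omit_instance(self, p_inst) -> bool:
        raise NotImplementedError


class EveryNthSampler(SamplerSketch):
    """Keeps every n-th instance and omits all others (assumed policy)."""

    def __init__(self, p_num_instances: int = 0, p_step: int = 2):
        super().__init__(p_num_instances, p_step=p_step)
        self._step = p_step
        self._counter = 0

    def reset(self):
        self._counter = 0

    def _omit_instance(self, p_inst) -> bool:
        keep = (self._counter % self._step == 0)
        self._counter += 1
        return not keep   # True means the instance is omitted


sampler = EveryNthSampler(p_num_instances=6, p_step=3)
kept = [x for x in range(sampler.get_num_instances())
        if not sampler.omit_instance(x)]
```

Here the integers 0 to 5 stand in for instances; only those for which omit_instance() returns False end up in kept.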
- class mlpro.bf.streams.models.Stream(p_id=None, p_name: str = '', p_num_instances: int = 0, p_version: str = '', p_feature_space: MSpace = None, p_label_space: MSpace = None, p_sampler: Sampler = None, p_mode=0, p_logging=True, **p_kwargs)
Bases:
Mode, Id, ScientificObject
Template class for data streams. Objects of this type can be used as iterators.
- Parameters:
p_id – Optional id of the stream. Default = None.
p_name (str) – Optional name of the stream. Default = ‘’.
p_num_instances (int) – Optional number of instances in the stream. Default = 0.
p_version (str) – Optional version of the stream. Default = ‘’.
p_feature_space (MSpace) – Optional feature space. Default = None.
p_label_space (MSpace) – Optional label space. Default = None.
p_sampler – Optional sampler. Default: None.
p_mode – Operation mode. Default: Mode.C_MODE_SIM.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.
p_kwargs (dict) – Further stream specific parameters.
- C_TYPE = 'Stream'
- get_name() str
Returns the name of the stream.
- Returns:
stream_name – Name of the stream.
- Return type:
str
- get_url() str
Returns the URL of the scientific source/reference.
- Returns:
url – URL of the scientific source/reference.
- Return type:
str
- get_num_instances() int
Returns the number of instances of the stream.
- Returns:
num_inst – Number of instances of the stream. If 0 the number is unknown.
- Return type:
int
- get_feature_space() MSpace
Returns the feature space of the stream.
- Returns:
feature_space – Feature space of the stream.
- Return type:
MSpace
- _setup_feature_space() MSpace
Custom method to set up the feature space of the stream. It is called by method get_feature_space().
- Returns:
feature_space – Feature space of the stream.
- Return type:
MSpace
- get_label_space() MSpace
Returns the label space of the stream.
- Returns:
label_space – Label space of the stream.
- Return type:
MSpace
- _setup_label_space() MSpace
Custom method to set up the label space of the stream. It is called by method get_label_space().
- Returns:
label_space – Label space of the stream.
- Return type:
MSpace
- set_options(**p_kwargs)
Method to set specific options for the stream. The possible options depend on the stream provider and stream itself.
- set_random_seed(p_seed=None)
Resets the internal random generator using the given seed.
- _reset()
Custom reset method for data stream. See method __iter__() for more details.
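How a stream can act as an iterator while delegating to custom methods might look like the sketch below. It is a standalone stand-in, not the MLPro implementation: the class StreamSketch, the caching of the feature space on first access, the call of _reset() from __iter__(), and the representation of instances as lists of random floats are assumptions.

```python
import random


class StreamSketch:
    """Hypothetical stand-in for a stream; not the MLPro class."""

    def __init__(self, p_num_instances: int = 0):
        self._num_instances = p_num_instances
        self._feature_space = None
        self._random = random.Random()
        self._index = 0

    def set_random_seed(self, p_seed=None):
        # Replace the internal random generator by a freshly seeded one.
        self._random = random.Random(p_seed)

    def get_feature_space(self):
        # Assumed behaviour: set up once via the custom method, then cache.
        if self._feature_space is None:
            self._feature_space = self._setup_feature_space()
        return self._feature_space

    def _setup_feature_space(self):
        return ['x1', 'x2']   # assumed: feature names as a plain list

    def _reset(self):
        self._index = 0

    def __iter__(self):
        self._reset()
        return self

    def __next__(self):
        if self._index >= self._num_instances:
            raise StopIteration
        self._index += 1
        # One random value per feature.
        return [self._random.random() for _ in self.get_feature_space()]


stream = StreamSketch(p_num_instances=3)
stream.set_random_seed(42)
first_pass = list(stream)
stream.set_random_seed(42)
second_pass = list(stream)
```

Seeding before each pass makes both passes yield the same values, which is the usual reason for exposing a seed on a data source.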
- class mlpro.bf.streams.models.StreamProvider(p_logging=True)
Bases:
Log, ScientificObject
Template class for stream providers.
- Parameters:
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL
- C_TYPE = 'Stream Provider'
- get_stream_list(p_mode=0, p_logging=True, **p_kwargs) list
Gets a list of provided streams by calling custom method _get_stream_list().
- Parameters:
p_mode – Operation mode. Default: Mode.C_MODE_SIM.
p_logging – Log level of stream objects (see constants of class Log). Default: Log.C_LOG_ALL.
p_kwargs (dict) – Further stream specific parameters.
- Returns:
stream_list – List of provided streams.
- Return type:
list
- _get_stream_list(p_mode=0, p_logging=True, **p_kwargs) list
Custom method to get the list of provided streams. See method get_stream_list() for further details.
- Parameters:
p_mode – Operation mode. Default: Mode.C_MODE_SIM.
p_logging – Log level of stream objects (see constants of class Log). Default: Log.C_LOG_ALL.
p_kwargs (dict) – Further stream specific parameters.
- Returns:
stream_list – List of provided streams.
- Return type:
list
- get_stream(p_id: str = None, p_name: str = None, p_mode=0, p_logging=True, **p_kwargs) Stream
Returns the stream with the specified id or name by calling custom method _get_stream().
- Parameters:
p_id (str) – Optional Id of the requested stream. Default = None.
p_name (str) – Optional name of the requested stream. Default = None.
p_mode – Operation mode. Default: Mode.C_MODE_SIM.
p_logging – Log level of stream object (see constants of class Log). Default: Log.C_LOG_ALL.
p_kwargs (dict) – Further stream specific parameters.
- Returns:
s – Stream object or None in case of an error.
- Return type:
Stream
- _get_stream(p_id: str = None, p_name: str = None, p_mode=0, p_logging=True, **p_kwargs) Stream
Custom method to get the specified stream. See method get_stream() for further details.
- Parameters:
p_id (str) – Optional Id of the requested stream. Default = None.
p_name (str) – Optional name of the requested stream. Default = None.
p_mode – Operation mode. Default: Mode.C_MODE_SIM.
p_logging – Log level of stream object (see constants of class Log). Default: Log.C_LOG_ALL.
p_kwargs (dict) – Further stream specific parameters.
- Returns:
s – Stream object or None in case of an error.
- Return type:
Stream
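The pairing of public methods with custom methods in a stream provider can be sketched as follows. The code is a standalone illustration and does not use MLPro: the classes ProviderSketch and DictProvider, the representation of streams as dicts, and the rule that the id takes precedence over the name are assumptions.

```python
class ProviderSketch:
    """Hypothetical stand-in for a stream provider; not the MLPro class."""

    def get_stream_list(self, **p_kwargs) -> list:
        return self._get_stream_list(**p_kwargs)

    def _get_stream_list(self, **p_kwargs) -> list:
        raise NotImplementedError

    def get_stream(self, p_id=None, p_name=None, **p_kwargs):
        # Without id and name there is nothing to look up.
        if p_id is None and p_name is None:
            return None
        return self._get_stream(p_id=p_id, p_name=p_name, **p_kwargs)

    def _get_stream(self, p_id=None, p_name=None, **p_kwargs):
        raise NotImplementedError


class DictProvider(ProviderSketch):
    """Provides two streams, each represented as a plain dict (assumed)."""

    def __init__(self):
        self._streams = [{'id': '1', 'name': 'random'},
                         {'id': '2', 'name': 'sine'}]

    def _get_stream_list(self, **p_kwargs) -> list:
        return list(self._streams)

    def _get_stream(self, p_id=None, p_name=None, **p_kwargs):
        for stream in self._streams:
            # Assumed rule: the id takes precedence if it is given.
            if p_id is not None and stream['id'] == p_id:
                return stream
            if p_id is None and stream['name'] == p_name:
                return stream
        return None   # no matching stream


provider = DictProvider()
by_id = provider.get_stream(p_id='2')
by_name = provider.get_stream(p_name='random')
missing = provider.get_stream(p_id='9')
```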
- class mlpro.bf.streams.models.StreamTask(p_name: str = None, p_range_max=1, p_duplicate_data: bool = False, p_visualize: bool = False, p_logging=True, **p_kwargs)
Bases:
Task
Template class for stream-based tasks.
- Parameters:
p_name (str) – Optional name of the task. Default is None.
p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_THREAD.
p_duplicate_data (bool) – If True, instances will be duplicated before processing. Default = False.
p_visualize (bool) – Boolean switch for visualisation. Default = False.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL
p_kwargs (dict) – Further optional named parameters.
- C_TYPE = 'Stream-Task'
- C_PLOT_ACTIVE: bool = True
- C_PLOT_STANDALONE: bool = True
- C_PLOT_VALID_VIEWS: list = ['2D', '3D', 'ND']
- C_PLOT_DEFAULT_VIEW: str = 'ND'
- C_PLOT_ND_XLABEL_INST = 'Instance index'
- C_PLOT_ND_XLABEL_TIME = 'Time index'
- C_PLOT_ND_YLABEL = 'Feature Data'
- _get_custom_run_method()
- run(p_range: int = None, p_wait: bool = False, p_inst_new: List[Instance] = None, p_inst_del: List[Instance] = None)
Executes the specific actions of the task implemented in custom method _run(). At the end, event C_EVENT_FINISHED is raised to start subsequent actions.
- Parameters:
p_range (int) – Optional deviating range of asynchronicity. See class Range. The default None means that the maximum range defined during instantiation is used. Otherwise, the smaller of the two ranges is used.
p_wait (bool) – If True, the method waits until all (a)synchronous tasks are finished.
p_inst_new (list) – Optional list of new stream instances to be processed. If None, the list of the shared object is used instead. Default = None.
p_inst_del (list) – List of obsolete stream instances to be removed. If None, the list of the shared object is used instead. Default = None.
- _run(p_inst_new: List[Instance], p_inst_del: List[Instance])
Custom method that is called by method run().
- Parameters:
p_inst_new (list) – List of new stream instances to be processed.
p_inst_del (list) – List of obsolete stream instances to be removed.
- init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None)
Initializes the plot functionalities of the class.
- Parameters:
p_figure (Matplotlib.figure.Figure, optional) – Optional Matplotlib host figure, where the plot shall be embedded. The default is None.
p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).
- _init_plot_2d(p_figure: Figure, p_settings: PlotSettings)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- _init_plot_3d(p_figure: Figure, p_settings: PlotSettings)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- _init_plot_nd(p_figure: Figure, p_settings: PlotSettings)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- update_plot(p_inst_new: List[Instance] = None, p_inst_del: List[Instance] = None, **p_kwargs)
Specialized definition of method update_plot() of class mlpro.bf.plot.Plottable.
- _update_plot_2d(p_settings: PlotSettings, p_inst_new: list, p_inst_del: list, **p_kwargs)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- Parameters:
p_settings (PlotSettings) – Object with further plot settings.
p_inst_new (list) – List of new stream instances to be plotted.
p_inst_del (list) – List of obsolete stream instances to be removed.
p_kwargs (dict) – Further optional plot parameters.
- _update_plot_3d(p_settings: PlotSettings, p_inst_new: list, p_inst_del: list, **p_kwargs)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- Parameters:
p_settings (PlotSettings) – Object with further plot settings.
p_inst_new (list) – List of new stream instances to be plotted.
p_inst_del (list) – List of obsolete stream instances to be removed.
p_kwargs (dict) – Further optional plot parameters.
- _update_plot_nd(p_settings: PlotSettings, p_inst_new: list, p_inst_del: list, **p_kwargs)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- Parameters:
p_settings (PlotSettings) – Object with further plot settings.
p_inst_new (list) – List of new stream instances to be plotted.
p_inst_del (list) – List of obsolete stream instances to be removed.
p_kwargs (dict) – Further optional plot parameters.
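Two rules from the description of run() lend themselves to a short sketch: the resolution of the effective range and the fallback to the instance lists of the shared object. The code below is a standalone illustration, not MLPro code: the function effective_range, the class TaskSketch, and the parameters p_shared_new and p_shared_del (which stand in for the shared object) are assumptions.

```python
from typing import Optional


def effective_range(p_range_max: int, p_range: Optional[int] = None) -> int:
    # None: use the maximum range; otherwise the smaller of both values.
    if p_range is None:
        return p_range_max
    return min(p_range, p_range_max)


class TaskSketch:
    """Hypothetical stand-in for a stream task; not the MLPro class."""

    def __init__(self, p_range_max: int = 1, p_shared_new=None, p_shared_del=None):
        self._range_max = p_range_max
        # Stand-ins for the instance lists of a shared object.
        self._shared_new = p_shared_new or []
        self._shared_del = p_shared_del or []
        self.processed = []

    def run(self, p_range=None, p_inst_new=None, p_inst_del=None) -> int:
        rng = effective_range(self._range_max, p_range)
        # Fall back to the shared lists if no instances are handed over.
        inst_new = self._shared_new if p_inst_new is None else p_inst_new
        inst_del = self._shared_del if p_inst_del is None else p_inst_del
        self._run(p_inst_new=inst_new, p_inst_del=inst_del)
        return rng

    def _run(self, p_inst_new: list, p_inst_del: list):
        # Custom method: here it only records what it was given.
        self.processed.append((list(p_inst_new), list(p_inst_del)))


task = TaskSketch(p_range_max=1, p_shared_new=['a'])
range_default = task.run()
range_explicit = task.run(p_range=0, p_inst_new=['b'], p_inst_del=['a'])
```

The sketch runs everything synchronously and returns the resolved range only to make it observable; it does not start threads or processes and raises no events.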
- class mlpro.bf.streams.models.StreamWorkflow(p_name: str = None, p_range_max=1, p_class_shared=<class 'mlpro.bf.streams.models.StreamShared'>, p_visualize: bool = False, p_logging=True, **p_kwargs)
Bases:
StreamTask, Workflow
Workflow for stream processing. See class bf.mt.Workflow for further details.
- Parameters:
p_name (str) – Optional name of the task. Default is None.
p_range_max (int) – Range of asynchronicity. See class Range. Default is Range.C_RANGE_THREAD.
p_class_shared – Optional class for a shared object (class StreamShared or a child class of StreamShared). Default = StreamShared
p_visualize (bool) – Boolean switch for visualisation. Default = False.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL
p_kwargs (dict) – Further optional named parameters handed over to every task within.
- C_TYPE = 'Stream-Workflow'
- C_PLOT_ACTIVE: bool = True
- run(p_range: int = None, p_wait: bool = False, p_inst_new: list = None, p_inst_del: list = None)
Runs all stream tasks according to their predecessor relations.
- Parameters:
p_range (int) – Optional deviating range of asynchronicity. See class Range. The default None means that the maximum range defined during instantiation is used. Otherwise, the smaller of the two ranges is used.
p_wait (bool) – If True, the method waits until all (a)synchronous tasks are finished.
p_inst_new (list) – Optional list of new stream instances to be processed. If None, the list of the shared object is used instead. Default = None.
p_inst_del (list) – List of obsolete stream instances to be removed. If None, the list of the shared object is used instead. Default = None.
- init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None)
Initializes the plot of a workflow. The method creates a host figure for all tasks if no external host figure is parameterized. The sub-plots of the tasks are automatically arranged within the host figure.
See method init_plot() of class mlpro.bf.plot.Plottable for further details.
- Parameters:
p_figure (Matplotlib.figure.Figure, optional) – Optional Matplotlib host figure, where the plot shall be embedded. The default is None.
p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).
- _init_plot_2d(p_figure: Figure, p_settings: PlotSettings)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- _init_plot_3d(p_figure: Figure, p_settings: PlotSettings)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- _init_plot_nd(p_figure: Figure, p_settings: PlotSettings)
Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.
- update_plot(p_inst_new: list = None, p_inst_del: list = None, **p_kwargs)
Specialized definition of method update_plot() of class mlpro.bf.plot.Plottable.
- Parameters:
p_inst_new (list) – List of new stream instances to be plotted.
p_inst_del (list) – List of obsolete stream instances to be removed.
p_kwargs (dict) – Further optional plot parameters.
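Running tasks "according to their predecessor relations" amounts to processing them in a topological order. The following standalone sketch illustrates the idea with the standard library module graphlib (Python 3.9 or later). It is not the MLPro implementation: the task names, the lambda tasks, the dict predecessors, and the function run_workflow are assumptions, and everything runs sequentially.

```python
from graphlib import TopologicalSorter

# Hypothetical tasks: each maps a list of numbers to a new list.
tasks = {
    'scale': lambda inst: [2 * x for x in inst],
    'shift': lambda inst: [x + 1 for x in inst],
    'total': lambda inst: [sum(inst)],
}

# Task name -> names of its predecessor tasks.
predecessors = {'scale': set(), 'shift': {'scale'}, 'total': {'shift'}}


def run_workflow(p_inst_new: list) -> dict:
    results = {}
    # static_order() yields every task after all of its predecessors.
    for name in TopologicalSorter(predecessors).static_order():
        preds = predecessors[name]
        if preds:
            # Input: the results of all predecessor tasks.
            inputs = [x for p in sorted(preds) for x in results[p]]
        else:
            # Entry task: input is the list of incoming instances.
            inputs = list(p_inst_new)
        results[name] = tasks[name](inputs)
    return results


results = run_workflow([1, 2, 3])
```

Keeping the results per task in a dict plays the role that the shared object plays in the workflow described above.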
- class mlpro.bf.streams.models.StreamScenario(p_mode, p_cycle_limit=0, p_visualize: bool = False, p_logging=True)
Bases:
ScenarioBase
Template class for stream-based scenarios.
- Parameters:
p_mode – Operation mode. See Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.
p_cycle_limit (int) – Maximum number of cycles. Default = 0 (no limit).
p_visualize (bool) – Boolean switch for visualisation. Default = False.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.
- C_TYPE = 'Stream-Scenario'
- C_PLOT_ACTIVE: bool = True
- setup()
Specialized method to set up a stream scenario. It is automatically called by the constructor and in turn calls the custom method _setup().
- _setup(p_mode, p_visualize: bool, p_logging)
Custom method to set up a stream scenario consisting of a stream and a processing stream workflow.
- Parameters:
p_mode – Operation mode. See Mode.C_VALID_MODES for valid values. Default = Mode.C_MODE_SIM.
p_visualize (bool) – Boolean switch for visualisation.
p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.
- Returns:
stream (Stream) – A stream object.
workflow (StreamWorkflow) – A stream workflow object.
- _set_mode(p_mode)
Custom method to set the operation mode of components of the scenario. See method set_mode() for further details.
- Parameters:
p_mode – Operation mode. See class bf.ops.Mode for further details.
- _reset(p_seed)
Custom method to reset the components of the scenario and to set the given random seed value. See method reset() for further details.
- Parameters:
p_seed (int) – Seed value for internal random generator
- get_latency() timedelta
Returns the latency of the scenario. To be implemented in child class.
- _run_cycle()
Gets the next instance from the stream and has it processed by the stream workflow.
- Returns:
success (bool) – True on success. False otherwise.
error (bool) – True on error. False otherwise.
adapted (bool) – True, if something within the scenario has adapted something in this cycle. False otherwise.
end_of_data (bool) – True, if the end of the related data source has been reached. False otherwise.
- _init_figure() Figure
Custom method to initialize a suitable standalone Matplotlib figure.
- Returns:
figure – Matplotlib figure object to host the subplot(s)
- Return type:
Matplotlib.figure.Figure
- init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None)
Initializes the plot functionalities of the class.
- Parameters:
p_figure (Matplotlib.figure.Figure, optional) – Optional Matplotlib host figure, where the plot shall be embedded. The default is None.
p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).
- update_plot(**p_kwargs)
Plot updates take place during workflow/task processing and are disabled here…
- get_workflow() StreamWorkflow
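The interplay of _run_cycle() and the cycle limit can be sketched with a simple loop. The code is a standalone illustration, not MLPro code: the class ScenarioSketch, its method run(), the parameter p_data, and the stop conditions (error or end of data) are assumptions. Only the four-value return of _run_cycle() and the meaning of a cycle limit of 0 follow the description above.

```python
class ScenarioSketch:
    """Hypothetical stand-in for a stream scenario; not the MLPro class."""

    def __init__(self, p_data: list, p_cycle_limit: int = 0):
        self._iterator = iter(p_data)       # stands in for the stream
        self._cycle_limit = p_cycle_limit   # 0 means no limit
        self.processed = []                 # stands in for the workflow

    def _run_cycle(self):
        # Returns (success, error, adapted, end_of_data).
        try:
            inst = next(self._iterator)
        except StopIteration:
            return False, False, False, True
        self.processed.append(inst)
        return False, False, False, False

    def run(self) -> int:
        # Runs cycles until the limit or the end of data is reached.
        cycles = 0
        while self._cycle_limit == 0 or cycles < self._cycle_limit:
            success, error, adapted, end_of_data = self._run_cycle()
            if error or end_of_data:
                break
            cycles += 1
        return cycles


unlimited = ScenarioSketch([1, 2, 3])
cycles_unlimited = unlimited.run()
limited = ScenarioSketch([1, 2, 3], p_cycle_limit=2)
cycles_limited = limited.run()
```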