BF-STREAMS - Stream Processing

[Figure: MLPro BF-Streams class diagram]

Ver. 1.3.0 (2023-11-17)

This module provides classes for standardized stream processing.

class mlpro.bf.streams.models.Feature(p_name_short, p_base_set='R', p_name_long='', p_name_latex='', p_unit='', p_unit_latex='', p_boundaries: list = [], p_description='', p_symmetrical: bool = False, p_logging=False, **p_kwargs)

Bases: Dimension

class mlpro.bf.streams.models.Label(p_name_short, p_base_set='R', p_name_long='', p_name_latex='', p_unit='', p_unit_latex='', p_boundaries: list = [], p_description='', p_symmetrical: bool = False, p_logging=False, **p_kwargs)

Bases: Dimension

class mlpro.bf.streams.models.Instance(p_feature_data: Element, p_label_data: Element = None, p_tstamp: datetime = None, **p_kwargs)

Bases: TStamp

Instance class that stores the feature data of a single stream instance together with its optional label data and time stamp.

Parameters:
  • p_feature_data (Element) – Feature data of the instance.

  • p_label_data (Element) – Optional label data of the instance.

  • p_tstamp (datetime) – Optional time stamp of the instance.

  • p_kwargs (dict) – Further optional named parameters.

C_TYPE = 'Instance'
get_id()
set_id(p_id: int)
get_feature_data() Element
set_feature_data(p_feature_data: Element)
get_label_data() Element
set_label_data(p_label_data: Element)
get_kwargs()
copy()
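Here is a minimal, self-contained sketch of an instance container with the accessors listed above. It is not the MLPro implementation. Plain Python lists stand in for Element, and the class name InstanceSketch is hypothetical. The sketch also assumes that copy() returns an independent deep copy.

```python
from copy import deepcopy
from datetime import datetime


class InstanceSketch:
    """Hypothetical stand-in for Instance: feature data, optional label data, optional time stamp."""

    def __init__(self, p_feature_data, p_label_data=None, p_tstamp: datetime = None, **p_kwargs):
        self._feature_data = p_feature_data
        self._label_data = p_label_data
        self._tstamp = p_tstamp
        self._kwargs = p_kwargs
        self._id = None

    def get_id(self):
        return self._id

    def set_id(self, p_id: int):
        self._id = p_id

    def get_feature_data(self):
        return self._feature_data

    def set_feature_data(self, p_feature_data):
        self._feature_data = p_feature_data

    def get_label_data(self):
        return self._label_data

    def set_label_data(self, p_label_data):
        self._label_data = p_label_data

    def get_kwargs(self):
        return self._kwargs

    def copy(self):
        # Assumption: a deep copy, so the duplicate can be modified independently
        return deepcopy(self)


inst = InstanceSketch([1.0, 2.0], p_label_data=[0], p_tstamp=datetime(2023, 11, 17))
inst.set_id(7)
dup = inst.copy()
dup.get_feature_data()[0] = 99.0
```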
class mlpro.bf.streams.models.StreamShared(p_range: int = 2)

Bases: Shared

Template class for shared objects in the context of stream processing.

_instances

Dictionary of new/deleted instances per task. At the beginning of a cycle, it contains the incoming instance(s) of the stream. The dictionary then evolves as the stream tasks manipulate the instances.

Type:

dict

reset(p_inst_new: List[Instance])

Resets the shared object and prepares the processing of the given set of new instances.

Parameters:

p_inst_new (List[Instance]) – List of new instances to be processed.

get_instances(p_task_ids: list)

Provides the result instances of all given task ids.

Parameters:

p_task_ids (list) – List of task ids.

Returns:

  • inst_new (list) – List of new instances of all given task ids.

  • inst_del (list) – List of instances to be deleted of all given task ids.

set_instances(p_task_id, p_inst_new: List[Instance], p_inst_del: List[Instance])

Stores result instances of a task in the shared object.

Parameters:
  • p_task_id – Id of related task.

  • p_inst_new (list) – List of new instances.

  • p_inst_del (list) – List of instances to be deleted.
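The per-task bookkeeping described for _instances can be sketched with a plain dictionary. This is a simplified, single-threaded illustration that rests on several assumptions. The class name StreamSharedSketch and the key 'stream' for the incoming instances are hypothetical. The sketch ignores p_range and any synchronization. It also assumes that get_instances() simply concatenates the results of the given task ids.

```python
class StreamSharedSketch:
    """Hypothetical stand-in for StreamShared: stores new/deleted instances per task id."""

    # Hypothetical key under which the incoming stream instances are stored by reset()
    C_KEY_STREAM = 'stream'

    def __init__(self):
        self._instances = {}

    def reset(self, p_inst_new):
        # Start a new cycle: only the incoming stream instances are known
        self._instances = {self.C_KEY_STREAM: (list(p_inst_new), [])}

    def set_instances(self, p_task_id, p_inst_new, p_inst_del):
        self._instances[p_task_id] = (list(p_inst_new), list(p_inst_del))

    def get_instances(self, p_task_ids):
        # Concatenate the new and deleted instances of all given task ids
        inst_new, inst_del = [], []
        for task_id in p_task_ids:
            new, deleted = self._instances.get(task_id, ([], []))
            inst_new.extend(new)
            inst_del.extend(deleted)
        return inst_new, inst_del


shared = StreamSharedSketch()
shared.reset(['i1'])
shared.set_instances('t1', ['i1', 'i2'], [])
shared.set_instances('t2', [], ['i0'])
new, deleted = shared.get_instances(['t1', 't2'])
```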

class mlpro.bf.streams.models.Sampler(p_num_instances: int = 0, **p_kwargs)

Bases: ScientificObject

Template class for data stream samplers. Objects of this type can be passed to a Stream via parameter p_sampler.

Parameters:
  • p_num_instances (int) – Number of instances.

  • p_kwargs (dict) – Further sampler specific parameters.

C_TYPE = 'Sampler'
reset()

Resets the sampler’s settings. Please redefine this method!

get_num_instances() int

Returns the number of instances processed by the sampler.

Returns:

Number of instances.

Return type:

int

set_num_instances(p_num_instances: int)

Sets the number of instances to be processed by the sampler.

Parameters:

p_num_instances (int) – Number of instances.

omit_instance(p_inst: Instance) bool

Checks whether an incoming instance is to be omitted (filtered out).

Parameters:

p_inst (Instance) – An input instance to be filtered.

Returns:

True if the input instance is to be omitted, False otherwise.

Return type:

bool

_omit_instance(p_inst: Instance) bool

Custom filter method for incoming instances, called by method omit_instance(). Please redefine this method!

Parameters:

p_inst (Instance) – An input instance to be filtered.

Returns:

True if the input instance is to be omitted, False otherwise.

Return type:

bool
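The split between omit_instance() and the custom method _omit_instance() follows a template-method pattern. Here is a minimal sketch, assuming that omit_instance() simply delegates to _omit_instance(). The classes SamplerSketch and EveryNthSampler are hypothetical and not part of MLPro.

```python
class SamplerSketch:
    """Hypothetical stand-in for Sampler illustrating the omit_instance()/_omit_instance() split."""

    def __init__(self, p_num_instances: int = 0):
        self._num_instances = p_num_instances

    def get_num_instances(self) -> int:
        return self._num_instances

    def set_num_instances(self, p_num_instances: int):
        self._num_instances = p_num_instances

    def reset(self):
        raise NotImplementedError('Please redefine this method!')

    def omit_instance(self, p_inst) -> bool:
        # Public entry point delegates the decision to the custom method (assumption)
        return self._omit_instance(p_inst)

    def _omit_instance(self, p_inst) -> bool:
        raise NotImplementedError('Please redefine this method!')


class EveryNthSampler(SamplerSketch):
    """Hypothetical child class: keeps only every n-th incoming instance."""

    def __init__(self, p_n: int, p_num_instances: int = 0):
        super().__init__(p_num_instances)
        self._n = p_n
        self.reset()

    def reset(self):
        self._counter = 0

    def _omit_instance(self, p_inst) -> bool:
        self._counter += 1
        # True means "omit": only positions n, 2n, 3n, ... are kept
        return self._counter % self._n != 0


sampler = EveryNthSampler(p_n=3)
kept = [x for x in range(1, 10) if not sampler.omit_instance(x)]
```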

class mlpro.bf.streams.models.Stream(p_id=None, p_name: str = '', p_num_instances: int = 0, p_version: str = '', p_feature_space: MSpace = None, p_label_space: MSpace = None, p_sampler: Sampler = None, p_mode=0, p_logging=True, **p_kwargs)

Bases: Mode, Id, ScientificObject

Template class for data streams. Objects of this type can be used as iterators.

Parameters:
  • p_id – Optional id of the stream. Default = None.

  • p_name (str) – Optional name of the stream. Default = ''.

  • p_num_instances (int) – Optional number of instances in the stream. Default = 0.

  • p_version (str) – Optional version of the stream. Default = ''.

  • p_feature_space (MSpace) – Optional feature space. Default = None.

  • p_label_space (MSpace) – Optional label space. Default = None.

  • p_sampler – Optional sampler. Default: None.

  • p_mode – Operation mode. Default: Mode.C_MODE_SIM.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.

  • p_kwargs (dict) – Further stream specific parameters.

C_TYPE = 'Stream'
get_name() str

Returns the name of the stream.

Returns:

stream_name – Name of the stream.

Return type:

str

get_url() str

Returns the URL of the scientific source/reference.

Returns:

url – URL of the scientific source/reference.

Return type:

str

get_num_instances() int

Returns the number of instances of the stream.

Returns:

num_inst – Number of instances of the stream. A value of 0 means that the number is unknown.

Return type:

int

get_feature_space() MSpace

Returns the feature space of the stream.

Returns:

feature_space – Feature space of the stream.

Return type:

MSpace

_setup_feature_space() MSpace

Custom method to set up the feature space of the stream. It is called by method get_feature_space().

Returns:

feature_space – Feature space of the stream.

Return type:

MSpace

get_label_space() MSpace

Returns the label space of the stream.

Returns:

label_space – Label space of the stream.

Return type:

MSpace

_setup_label_space() MSpace

Custom method to set up the label space of the stream. It is called by method get_label_space().

Returns:

label_space – Label space of the stream.

Return type:

MSpace

set_options(**p_kwargs)

Sets stream-specific options. The available options depend on the stream provider and on the stream itself.

set_random_seed(p_seed=None)

Resets the internal random generator using the given seed.

_reset()

Custom reset method for the data stream. See method __iter__() for more details.

setup_sampler() Sampler

Static method to set up a sampler. It allows a sampler to be set after the stream has been instantiated.

Returns:

An instantiated sampler.

Return type:

Sampler

_get_next() Instance

Custom method to determine the next data stream instance. At the end of the stream, exception StopIteration must be raised. See method __next__() for more details.

Returns:

instance – Next instance of data stream or None.

Return type:

Instance
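Stream objects are iterators whose __iter__() and __next__() delegate to _reset() and _get_next(). A child class therefore mainly has to implement those two custom methods. The sketch below illustrates that contract with hypothetical stand-in classes (StreamSketch, ListStream). How the real Stream consults its sampler is an assumption here: the sketch skips any instance for which omit_instance() returns True.

```python
class StreamSketch:
    """Hypothetical stand-in for Stream: __iter__/__next__ delegate to _reset/_get_next."""

    def __init__(self, p_sampler=None):
        self._sampler = p_sampler

    def __iter__(self):
        # Restart the stream each time an iteration begins
        self._reset()
        return self

    def __next__(self):
        while True:
            # _get_next() raises StopIteration at the end of the stream
            inst = self._get_next()
            # Assumption: instances rejected by the sampler are skipped
            if self._sampler is None or not self._sampler.omit_instance(inst):
                return inst

    def _reset(self):
        raise NotImplementedError

    def _get_next(self):
        raise NotImplementedError


class ListStream(StreamSketch):
    """Hypothetical child class that streams the elements of a Python list."""

    def __init__(self, p_data, p_sampler=None):
        super().__init__(p_sampler)
        self._data = list(p_data)

    def _reset(self):
        self._index = 0

    def _get_next(self):
        if self._index >= len(self._data):
            raise StopIteration
        inst = self._data[self._index]
        self._index += 1
        return inst


stream = ListStream([10, 20, 30])
first_pass = list(stream)
second_pass = list(stream)
```

Because __iter__() calls _reset(), the same stream object can be iterated more than once.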

class mlpro.bf.streams.models.StreamProvider(p_logging=True)

Bases: Log, ScientificObject

Template class for stream providers.

Parameters:

p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

C_TYPE = 'Stream Provider'
get_stream_list(p_mode=0, p_logging=True, **p_kwargs) list

Gets a list of provided streams by calling custom method _get_stream_list().

Parameters:
  • p_mode – Operation mode. Default: Mode.C_MODE_SIM.

  • p_logging – Log level of stream objects (see constants of class Log). Default: Log.C_LOG_ALL.

  • p_kwargs (dict) – Further stream specific parameters.

Returns:

stream_list – List of provided streams.

Return type:

list

_get_stream_list(p_mode=0, p_logging=True, **p_kwargs) list

Custom method to get the list of provided streams. See method get_stream_list() for further details.

Parameters:
  • p_mode – Operation mode. Default: Mode.C_MODE_SIM.

  • p_logging – Log level of stream objects (see constants of class Log). Default: Log.C_LOG_ALL.

  • p_kwargs (dict) – Further stream specific parameters.

Returns:

stream_list – List of provided streams.

Return type:

list

get_stream(p_id: str = None, p_name: str = None, p_mode=0, p_logging=True, **p_kwargs) Stream

Returns the stream specified by id or name by calling custom method _get_stream().

Parameters:
  • p_id (str) – Optional Id of the requested stream. Default = None.

  • p_name (str) – Optional name of the requested stream. Default = None.

  • p_mode – Operation mode. Default: Mode.C_MODE_SIM.

  • p_logging – Log level of stream object (see constants of class Log). Default: Log.C_LOG_ALL.

  • p_kwargs (dict) – Further stream specific parameters.

Returns:

s – Stream object or None in case of an error.

Return type:

Stream

_get_stream(p_id: str = None, p_name: str = None, p_mode=0, p_logging=True, **p_kwargs) Stream

Custom method to get the specified stream. See method get_stream() for further details.

Parameters:
  • p_id (str) – Optional Id of the requested stream. Default = None.

  • p_name (str) – Optional name of the requested stream. Default = None.

  • p_mode – Operation mode. Default: Mode.C_MODE_SIM.

  • p_logging – Log level of stream object (see constants of class Log). Default: Log.C_LOG_ALL.

  • p_kwargs (dict) – Further stream specific parameters.

Returns:

s – Stream object or None in case of an error.

Return type:

Stream
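A provider only has to implement the two custom methods, because the public methods delegate to them. In this hypothetical sketch, streams are represented by (id, name) tuples held in a dictionary, and a stream can be found by either id or name. The class names are illustrative, and the lookup rule is an assumption rather than MLPro's behavior.

```python
class StreamProviderSketch:
    """Hypothetical stand-in for StreamProvider: public methods delegate to custom methods."""

    def get_stream_list(self, p_mode=0, p_logging=True, **p_kwargs) -> list:
        return self._get_stream_list(p_mode=p_mode, p_logging=p_logging, **p_kwargs)

    def get_stream(self, p_id: str = None, p_name: str = None, p_mode=0, p_logging=True, **p_kwargs):
        return self._get_stream(p_id=p_id, p_name=p_name, p_mode=p_mode, p_logging=p_logging, **p_kwargs)

    def _get_stream_list(self, p_mode=0, p_logging=True, **p_kwargs) -> list:
        raise NotImplementedError

    def _get_stream(self, p_id=None, p_name=None, p_mode=0, p_logging=True, **p_kwargs):
        raise NotImplementedError


class DictStreamProvider(StreamProviderSketch):
    """Hypothetical provider: streams are (id, name) tuples built from a {id: name} dict."""

    def __init__(self, p_streams: dict):
        self._streams = p_streams

    def _get_stream_list(self, p_mode=0, p_logging=True, **p_kwargs) -> list:
        return list(self._streams.items())

    def _get_stream(self, p_id=None, p_name=None, p_mode=0, p_logging=True, **p_kwargs):
        for stream_id, name in self._streams.items():
            if (p_id is not None and stream_id == p_id) or (p_name is not None and name == p_name):
                return (stream_id, name)
        # Mirrors the documented contract: None in case of an error
        return None


provider = DictStreamProvider({'s1': 'Clouds', 's2': 'Rings'})
```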

class mlpro.bf.streams.models.StreamTask(p_name: str = None, p_range_max=1, p_duplicate_data: bool = False, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: Task

Template class for stream-based tasks.

Parameters:
  • p_name (str) – Optional name of the task. Default is None.

  • p_range_max (int) – Maximum range of asynchronicity. See class Range. Default is Range.C_RANGE_THREAD.

  • p_duplicate_data (bool) – If True, instances will be duplicated before processing. Default = False.

  • p_visualize (bool) – Boolean switch for visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

  • p_kwargs (dict) – Further optional named parameters.

C_TYPE = 'Stream-Task'
C_PLOT_ACTIVE: bool = True
C_PLOT_STANDALONE: bool = True
C_PLOT_VALID_VIEWS: list = ['2D', '3D', 'ND']
C_PLOT_DEFAULT_VIEW: str = 'ND'
C_PLOT_ND_XLABEL_INST = 'Instance index'
C_PLOT_ND_XLABEL_TIME = 'Time index'
C_PLOT_ND_YLABEL = 'Feature Data'
_get_custom_run_method()
run(p_range: int = None, p_wait: bool = False, p_inst_new: List[Instance] = None, p_inst_del: List[Instance] = None)

Executes the specific actions of the task implemented in custom method _run(). At the end, event C_EVENT_FINISHED is raised to trigger subsequent actions.

Parameters:
  • p_range (int) – Optional deviating range of asynchronicity. See class Range. Default is None, which means that the maximum range defined during instantiation is taken. Otherwise, the minimum of both ranges is taken.

  • p_wait (bool) – If True, the method waits until all (a)synchronous tasks are finished.

  • p_inst_new (list) – Optional list of new stream instances to be processed. If None, the list of the shared object is used instead. Default = None.

  • p_inst_del (list) – List of obsolete stream instances to be removed. If None, the list of the shared object is used instead. Default = None.
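The rule for resolving p_range reduces to a single decision. This sketch assumes the Range constants are ordered integers (0 = none, 1 = thread, 2 = process). The constants are redefined locally so that the code runs without MLPro, and effective_range() is a hypothetical helper.

```python
# Assumed values mirroring the Range constants; defined locally so the sketch is self-contained
C_RANGE_NONE = 0
C_RANGE_THREAD = 1
C_RANGE_PROCESS = 2


def effective_range(p_range_max: int, p_range: int = None) -> int:
    """Hypothetical helper: the range a run() call would actually use."""
    if p_range is None:
        # No deviating range given: use the maximum range from instantiation
        return p_range_max
    # Otherwise the smaller of both ranges wins, so the maximum is never exceeded
    return min(p_range, p_range_max)
```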

_run_wrapper(p_inst_new: List[Instance], p_inst_del: List[Instance])

Internal use.

_run(p_inst_new: List[Instance], p_inst_del: List[Instance])

Custom method that is called by method run().

Parameters:
  • p_inst_new (List[Instance]) – List of new stream instances to be processed.

  • p_inst_del (List[Instance]) – List of obsolete stream instances to be removed.
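The interplay of run() and the custom method _run() can be sketched as follows. This is a single-threaded illustration, not MLPro's code. StreamTaskSketch and ScaleTask are hypothetical, the shared object is reduced to a tuple of two lists, and instances are plain lists of floats. Range handling and the event C_EVENT_FINISHED are left out.

```python
from copy import deepcopy


class StreamTaskSketch:
    """Hypothetical stand-in for StreamTask: run() resolves its inputs and calls the custom _run()."""

    def __init__(self, p_duplicate_data: bool = False, p_shared=None):
        self._duplicate_data = p_duplicate_data
        # Simplified shared object: a tuple (inst_new, inst_del) or None
        self._shared = p_shared

    def run(self, p_inst_new=None, p_inst_del=None):
        shared_new, shared_del = self._shared if self._shared is not None else ([], [])
        # Fall back to the shared object's lists when no explicit lists are given
        inst_new = shared_new if p_inst_new is None else p_inst_new
        inst_del = shared_del if p_inst_del is None else p_inst_del
        if self._duplicate_data:
            # Work on copies so that the caller's instances stay untouched
            inst_new, inst_del = deepcopy(inst_new), deepcopy(inst_del)
        self._run(inst_new, inst_del)
        return inst_new, inst_del

    def _run(self, p_inst_new, p_inst_del):
        raise NotImplementedError('Please redefine this method!')


class ScaleTask(StreamTaskSketch):
    """Hypothetical custom task: multiplies each feature value of the new instances in place."""

    def __init__(self, p_factor: float, **p_kwargs):
        super().__init__(**p_kwargs)
        self._factor = p_factor

    def _run(self, p_inst_new, p_inst_del):
        for inst in p_inst_new:
            for i, value in enumerate(inst):
                inst[i] = value * self._factor
```

With p_duplicate_data=True the task works on copies, so the caller's instances remain unchanged. Without it, _run() modifies them in place.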

init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None)

Initializes the plot functionalities of the class.

Parameters:
  • p_figure (Matplotlib.figure.Figure, optional) – Optional Matplotlib host figure, where the plot shall be embedded. The default is None.

  • p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).

_init_plot_2d(p_figure: Figure, p_settings: PlotSettings)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

_init_plot_3d(p_figure: Figure, p_settings: PlotSettings)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

_init_plot_nd(p_figure: Figure, p_settings: PlotSettings)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

_finalize_plot_view(p_inst_ref: Instance)
update_plot(p_inst_new: List[Instance] = None, p_inst_del: List[Instance] = None, **p_kwargs)

Specialized definition of method update_plot() of class mlpro.bf.plot.Plottable.

Parameters:
  • p_inst_new (List[Instance]) – List of new stream instances to be plotted.

  • p_inst_del (List[Instance]) – List of obsolete stream instances to be removed.

  • p_kwargs (dict) – Further optional plot parameters.

_update_plot_2d(p_settings: PlotSettings, p_inst_new: list, p_inst_del: list, **p_kwargs)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

Parameters:
  • p_settings (PlotSettings) – Object with further plot settings.

  • p_inst_new (list) – List of new stream instances to be plotted.

  • p_inst_del (list) – List of obsolete stream instances to be removed.

  • p_kwargs (dict) – Further optional plot parameters.

_update_plot_3d(p_settings: PlotSettings, p_inst_new: list, p_inst_del: list, **p_kwargs)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

Parameters:
  • p_settings (PlotSettings) – Object with further plot settings.

  • p_inst_new (list) – List of new stream instances to be plotted.

  • p_inst_del (list) – List of obsolete stream instances to be removed.

  • p_kwargs (dict) – Further optional plot parameters.

_update_plot_nd(p_settings: PlotSettings, p_inst_new: list, p_inst_del: list, **p_kwargs)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

Parameters:
  • p_settings (PlotSettings) – Object with further plot settings.

  • p_inst_new (list) – List of new stream instances to be plotted.

  • p_inst_del (list) – List of obsolete stream instances to be removed.

  • p_kwargs (dict) – Further optional plot parameters.

class mlpro.bf.streams.models.StreamWorkflow(p_name: str = None, p_range_max=1, p_class_shared=<class 'mlpro.bf.streams.models.StreamShared'>, p_visualize: bool = False, p_logging=True, **p_kwargs)

Bases: StreamTask, Workflow

Workflow for stream processing. See class bf.mt.Workflow for further details.

Parameters:
  • p_name (str) – Optional name of the task. Default is None.

  • p_range_max (int) – Range of asynchronicity. See class Range. Default is Range.C_RANGE_THREAD.

  • p_class_shared – Optional class for a shared object (class StreamShared or a child class of StreamShared). Default = StreamShared

  • p_visualize (bool) – Boolean switch for visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL

  • p_kwargs (dict) – Further optional named parameters handed over to every task within.

C_TYPE = 'Stream-Workflow'
C_PLOT_ACTIVE: bool = True
run(p_range: int = None, p_wait: bool = False, p_inst_new: list = None, p_inst_del: list = None)

Runs all stream tasks according to their predecessor relations.

Parameters:
  • p_range (int) – Optional deviating range of asynchronicity. See class Range. Default is None, which means that the maximum range defined during instantiation is taken. Otherwise, the minimum of both ranges is taken.

  • p_wait (bool) – If True, the method waits until all (a)synchronous tasks are finished.

  • p_inst_new (list) – Optional list of new stream instances to be processed. If None, the list of the shared object is used instead. Default = None.

  • p_inst_del (list) – List of obsolete stream instances to be removed. If None, the list of the shared object is used instead. Default = None.
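Running tasks according to their predecessor relations amounts to executing them in a topological order. Here is a sequential sketch using Python's standard graphlib module (Python 3.9+). The function run_workflow() and its dictionary-based task model are hypothetical. A real workflow may run tasks asynchronously within the configured range.

```python
from graphlib import TopologicalSorter


def run_workflow(p_tasks: dict, p_predecessors: dict, p_inst_new: list) -> dict:
    """Hypothetical sequential workflow: runs each task after all of its predecessors.

    p_tasks maps a task name to a callable that takes and returns a list of instances;
    p_predecessors maps a task name to the names of its predecessor tasks.
    """
    graph = {name: p_predecessors.get(name, ()) for name in p_tasks}
    results = {}
    for name in TopologicalSorter(graph).static_order():
        preds = p_predecessors.get(name, ())
        # Tasks without predecessors consume the incoming stream instances,
        # all others consume the concatenated results of their predecessors
        inputs = list(p_inst_new) if not preds else [i for p in preds for i in results[p]]
        results[name] = p_tasks[name](inputs)
    return results


tasks = {
    'scale': lambda xs: [x * 2 for x in xs],
    'shift': lambda xs: [x + 1 for x in xs],
    'merge': lambda xs: sorted(xs),
}
predecessors = {'shift': ['scale'], 'merge': ['scale', 'shift']}
results = run_workflow(tasks, predecessors, [1, 2])
```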

init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None)

Initializes the plot of a workflow. The method creates a host figure for all tasks if no external host figure is parameterized. The sub-plots of the tasks are automatically arranged within the host figure.

See method init_plot() of class mlpro.bf.plot.Plottable for further details.

Parameters:
  • p_figure (Matplotlib.figure.Figure, optional) – Optional Matplotlib host figure, where the plot shall be embedded. The default is None.

  • p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).

_init_plot_2d(p_figure: Figure, p_settings: PlotSettings)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

_init_plot_3d(p_figure: Figure, p_settings: PlotSettings)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

_init_plot_nd(p_figure: Figure, p_settings: PlotSettings)

Default implementation for stream tasks. See class mlpro.bf.plot.Plottable for more details.

update_plot(p_inst_new: list = None, p_inst_del: list = None, **p_kwargs)

Specialized definition of method update_plot() of class mlpro.bf.plot.Plottable.

Parameters:
  • p_inst_new (list) – List of new stream instances to be plotted.

  • p_inst_del (list) – List of obsolete stream instances to be removed.

  • p_kwargs (dict) – Further optional plot parameters.

class mlpro.bf.streams.models.StreamScenario(p_mode, p_cycle_limit=0, p_visualize: bool = False, p_logging=True)

Bases: ScenarioBase

Template class for stream based scenarios.

Parameters:
  • p_mode – Operation mode (mandatory). See Mode.C_VALID_MODES for valid values.

  • p_cycle_limit (int) – Maximum number of cycles. Default = 0 (no limit).

  • p_visualize (bool) – Boolean switch for visualisation. Default = False.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.

C_TYPE = 'Stream-Scenario'
C_PLOT_ACTIVE: bool = True
setup()

Specialized method to set up a stream scenario. It is automatically called by the constructor and in turn calls the custom method _setup().

_setup(p_mode, p_visualize: bool, p_logging)

Custom method to set up a stream scenario consisting of a stream and a processing stream workflow.

Parameters:
  • p_mode – Operation mode. See Mode.C_VALID_MODES for valid values.

  • p_visualize (bool) – Boolean switch for visualisation.

  • p_logging – Log level (see constants of class Log). Default: Log.C_LOG_ALL.

Returns:

  • stream (Stream) – A stream object.

  • workflow (StreamWorkflow) – A stream workflow object.

_set_mode(p_mode)

Custom method to set the operation mode of components of the scenario. See method set_mode() for further details.

Parameters:

p_mode – Operation mode. See class bf.ops.Mode for further details.

_reset(p_seed)

Custom method to reset the components of the scenario and to set the given random seed value. See method reset() for further details.

Parameters:

p_seed (int) – Seed value for internal random generator

get_latency() timedelta

Returns the latency of the scenario. To be implemented in a child class.

_run_cycle()

Gets the next instance from the stream and has it processed by the stream workflow.

Returns:

  • success (bool) – True on success. False otherwise.

  • error (bool) – True on error. False otherwise.

  • adapted (bool) – True if any component of the scenario has adapted in this cycle. False otherwise.

  • end_of_data (bool) – True, if the end of the related data source has been reached. False otherwise.
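The cycle logic of a stream scenario can be sketched as a loop around a function that returns the four documented flags. The function run_scenario() and its parameters are hypothetical. Treating a cycle limit of 0 as unlimited follows the documentation of p_cycle_limit. Reporting success = False at the end of the data is an assumption.

```python
def run_scenario(p_stream, p_process, p_cycle_limit: int = 0) -> int:
    """Hypothetical driver: one instance per cycle until end of data or cycle limit (0 = no limit).

    p_process takes a list of instances and returns True if something has adapted.
    Returns the number of completed cycles.
    """
    iterator = iter(p_stream)
    cycles = 0

    def run_cycle():
        # Returns (success, error, adapted, end_of_data) like the documented _run_cycle()
        try:
            inst = next(iterator)
        except StopIteration:
            # Assumption: no instance processed, so the cycle is not reported as a success
            return False, False, False, True
        adapted = p_process([inst])
        return True, False, adapted, False

    while p_cycle_limit == 0 or cycles < p_cycle_limit:
        success, error, adapted, end_of_data = run_cycle()
        if end_of_data or error:
            break
        cycles += 1
    return cycles
```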

_init_figure() Figure

Custom method to initialize a suitable standalone Matplotlib figure.

Returns:

figure – Matplotlib figure object to host the subplot(s)

Return type:

Matplotlib.figure.Figure

init_plot(p_figure: Figure = None, p_plot_settings: PlotSettings = None)

Initializes the plot functionalities of the class.

Parameters:
  • p_figure (Matplotlib.figure.Figure, optional) – Optional Matplotlib host figure, where the plot shall be embedded. The default is None.

  • p_plot_settings (PlotSettings) – Optional plot settings. If None, the default view is plotted (see attribute C_PLOT_DEFAULT_VIEW).

update_plot(**p_kwargs)

Plot updates take place during workflow/task processing and are therefore disabled here.

get_stream() Stream
get_workflow() StreamWorkflow