Pipelines

A pipeline is a container for a series of data processing steps. The pipeline object allows you to get an existing pipeline or create a new one with a given name. With this object, you can set input data sources, add transforms, feature generators, feature selectors, feature transforms and classifiers. An example pipeline is shown below.

Examples:

client.pipeline = 'my_pipeline'

client.pipeline.set_input_query('ExampleQuery')

client.pipeline.add_transform('Windowing')

# Feature Generation
client.pipeline.add_feature_generator(
    [
        {'subtype_call':'Time', 'params':{'sample_rate':100}},
        {'subtype_call':'Rate of Change'},
        {'subtype_call':'Statistical'},
        {'subtype_call':'Energy'},
        {'subtype_call':'Amplitude', 'params':{'smoothing_factor':9}}
    ],
        function_defaults={'columns':sensor_columns},
    )

# Scale to 8 bit representation for classification
client.pipeline.add_transform('Min Max Scale')

# Perform Feature Selection to remove highly correlated features, features
 with high variance and finally recursive feature eliminate.
# (Note: Recursive feature elimination can be very slow for large data sets and large number of parameters,
#        it is recommended to use other feature selection algorithms to first reduce the number of features)

client.pipeline.add_feature_selector(
    [
        {"name": "Correlation Threshold","params":{'threshold':0.85}},
        {"name": "Variance Threshold",
                            "params":{
                                'threshold':0.05
                                    }
        },
        {"name":"Recursive Feature Elimination",
                            "params":{
                                    "method":"Log R",
                                    "number_of_features":20
                                    }
        },
    ],
        params = {"number_of_features":20}
    )

client.pipeline.set_validation_method('Stratified K-Fold Cross-Validation',
                                        params={'number_of_folds':3})

client.pipeline.set_classifier('PME',
            params={'classification_mode':'RBF',
                    'distance_mode':'L1'
                    })

client.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization',
                    params = {'number_of_neurons':10})

client.pipeline.set_tvo({'validation_seed':0})

results, stats = client.pipeline.execute()

The final step is where the pipeline is sent to the SensiML Cloud Engine for execution, once the job is completed the results will be returned to you as a model object if classification has occurred, or a DataFrame if you are at an intermediary step.

client.pipeline.execute()

Copyright 2017-2024 SensiML Corporation

This file is part of SensiML™ Piccolo AI™.

SensiML Piccolo AI is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

SensiML Piccolo AI is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License along with SensiML Piccolo AI. If not, see <https://www.gnu.org/licenses/>.

class sensiml.pipeline.Pipeline(client, name=None)

Base class of a pipeline object

add_augmentation(augmentation: list[dict], params: Optional[dict] = None, overwrite: bool = True)

Add augmentation set to the pipeline.

Parameters
  • augmentation (List) – List of dictionaries containing time series augmentations

  • params (dict, {}) – Parameters of the augmentation set.

Examples

>>> client.pipeline.add_augmentation([{"name": "Add Noise", "params": {
>>>                                                                 "background_scale_range": [100, 1000],
>>>                                                                 "target_labels": ['idle','walking'],
>>>                                                                 "target_sensors": ['AccelerometerX', 'GyroscopeZ'],
>>>                                                                 "fraction": 1.2,
>>>                                                                 "noise_types": ["pink", "white"],
>>>                                                                 "filter": {"Set": ["Train"]}
>>>                                                                 }
>>>                                   }])
add_feature_generator(feature_generators, params: Optional[dict] = None, function_defaults: Optional[dict] = None, return_generator_set: bool = False)

Add a feature generator set to the pipeline.

Parameters
  • feature_generators (list) – List of feature generators. As names or dictionaries.

  • params (dict, {}}) – Parameters to apply to the feature generator set.

  • function_defaults (dict,{}}) – Parameters to apply to all individual feature generators.

Examples

>>> # Add a single feature generator
>>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5}}, {'name': 'Mean'}],
                                       function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions by name when they use the same same function defaults
>>> client.pipeline.add_feature_generator(['Mean', 'Standard Deviation', 'Skewness', 'Kurtosis', '25th Percentile',
                                       '75th Percentile', '100th Percentile', 'Zero Crossing Rate'],
                                       function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions using function defaults
>>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5}}, {'name': 'Mean'}],
                                        function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions by subtype which use different parameters; note all subtypes will take the same inputs
>>> client.pipeline.add_feature_generator([{'subtype_call': 'Area', 'params': {'sample_rate': 100, 'smoothing_factor': 9}},
                                        {'subtype_call': 'Time', 'params': {'sample_rate': 100}},
                                        {'subtype_call': 'Rate of Change'},
                                        {'subtype_call': 'Statistical'},
                                        {'subtype_call': 'Energy'},
                                        {'subtype_call': 'Amplitude', 'params': {'smoothing_factor': 9}},
                                        {'subtype_call': 'Physical', 'params': {'sample_rate': 100}}
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                       )
>>> # Mix subtype and specify additional feature generators
>>> client.pipeline.add_feature_generator([{'subtype_call': 'Statistical'},
                                        {'name': 'Downsample', 'params': {'new_length': 5}},
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                        )
>>> # Call the same feature generators multiple times with different parameters
>>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5, 'columns': sensor_columns[0]}},
                                        {'name': 'Downsample', 'params': {'new_length': 12}},
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                        )
add_feature_selector(feature_selectors: list[dict], params: Optional[dict] = None)

Add a feature selection set to the pipeline.

Parameters
  • feature_selectors (List) – List of dictionaries containing feature selectors

  • params (dict, {}) – Parameters of the feature selector set.

Examples

>>> client.pipeline.add_feature_selector([{"name":"Recursive Feature Elimination", "params":{"method":"Log R"}}],
>>>                                    params = {"number_of_features":20})
add_linear_step(func)

Add a step to the pipeline. Automatically tie the previous step and current step.

Parameters

func (function) – A sensiml function method call

add_segmenter(name: str, params: Optional[dict] = None)

Add a Segmenter to the pipeline.

Parameters
  • name (str) – Name of the segmenter method to add.

  • params (dict, optional) – Dictionary containing the parameters of the transform.

add_transform(name: str, params: Optional[dict] = None, overwrite: bool = True)

Add a Transform to the pipeline.

Parameters
  • name (str) – Name of the transform method to add.

  • params (dict, optional) – Dictionary containing the parameters of the transform.

  • overwrite (boolean) – when adding multiple instances of the same transform, set overwrite to False for the additional steps and the first instance will not be overwritten (default is True)

auto(auto_params: dict, run_parallel: bool = True, lock: bool = True, silent: bool = True, renderer: Optional[Callable] = None, version: int = 2) Tuple[DataFrame, dict]

Execute automated pipeline asynchronously.

The automated pipeline is used to find optimal parameters with a genetic algorithm. The genetic algorithm starts with a user-defined randomized population (pipelines) and generates models from them and tests them, keeps a subset of high-performing combinations, and then recombines those “survivors” and repeats the process over again. The offspring of good parameter combinations are usually also good and sometimes are significantly better than their parents. As the algorithm iterates each successive generation, it often finds a near-optimal model without trying as many configurations. The algorithm terminates when the desired number of iteration is completed.

The created pipelines are run in parallel in SensiML servers and results are ranked by the fitness score which takes into account the model’s F1 score, precision, sensitivity, and other metrics. The automation options and definition of the performance metrics are given below.

Parameters
  • params (dict) –

    Parameters of the genetic algorithm to optimize the pipelines. Definition and options of the parameters are given below. - search_steps, (list, [‘selectorset’, ‘tvo’]): it is used to define which libraries in the pipelines will be optimized.

    • population_size, (int, 10): Initial number of randomly created pipelines.

    • iterations, (int, 1): Repetition number of optimization process

    • mutation_rate, (float, 0.1): Random changes from the previous population.

    • recreation_rate, (float, 0.1): Rate of randomly created pipelines for next generation.

    • survivor_rate, (float, 0.5): Ratio of the population that will be transferred to next generation.

    • number_of_models_to_return, (int, 5): Number of pipelines that will return to user.

    • allow_unknown, (bool, False): Allows creating unknown prediction results for the vectors. A vector is classified as an unknown if it

      cannot be recognized by any neurons.

    • fitness (dict): Fitness parameters are combination of statistical and cost variables. It is used to evaluate the performance of the models found by the algorithm. The user will define the coefficient scores for the parameters to define the priority of the parameters in the fitness score. Definition and options of the parameters are given below.

      • statistical variables:
        • accuracy: The degree of correctness of all vectors

        • f1_score: Measures of the test’s accuracy

        • precision: Proportion of positive identifications that is actually correct

        • sensitivity: Measures of the proportion of actual positives that are correctly identified

        • specificity: Measures of the proportion of actual negatives that are correctly identified

        • positive_predictive_rate: Ratio of “true positive” is the event that the test makes a positive prediction

      • cost variables:
        • neurons: Number of neurons that used in the model

        • features: Number of features that used in the model,

  • lock (bool, False) – Ping for results every 30 seconds until the process finishes.

  • silent (bool, True) – Silence status updates.

Example

activity_data = client.datasets.load_activity_raw()
client.upload_dataframe('activity_data.csv', activity_data, force=True)

client.pipeline.reset()
client.pipeline.set_input_data('activity_data.csv',
                            data_columns = ['accelx', 'accely', 'accelz', 'gyrox', 'gyroy', 'gyroz'],
                            group_columns = ['Subject','Class', 'Rep'],
                            label_column = 'Class')

client.pipeline.add_transform("Windowing", params={"window_size": 100, "delta": 100 })

results, summary = client.pipeline.auto({ 'params':{"search_steps": ['selectorset', 'tvo'],
                                                "population_size": 10,
                                                "iterations": 1,
                                                "mutation_rate": 0.1,
                                                "recreation_rate": 0.1,
                                                "survivor_rate": 0.5,
                                                "number_of_models_to_return": 5,
                                                "run_parallel": True,
                                                "allow_unknown": False,
                                                "fitness": {'accuracy': 1.0,
                                                            'f1_score': 0.0,
                                                            'features': 0.3,
                                                            'precision': 0.0,
                                                            'sensitivity': 0.7,
                                                            'specificity': 0.0,
                                                            'positive_predictive_rate': 0.0},

                                                }})

results.summarize()

Execute auto segment search pipeline asynchronously.

Parameters
  • params (dict) – Automation parameters for segment search.

  • run_parallel (bool, True) – Run in parallel in KB cloud.

  • lock (bool, False) – Ping for results every 30 seconds until the process finishes.

  • silent (bool, True) – Silence status updates.

clear_cache() Response

Deletes the cache on KB cloud for this pipeline.

data(pipeline_step: int, page_index: int = 0) DataFrame

Retrieves results from a specific pipeline step in the pipeline from stored values in kbcloud after execution has been performed.

Parameters
  • pipeline_step (int) – Pipeline step to retrieve results from.

  • page_index (int) – Index of data to get.

Returns

A ModelResultSet if the selected pipeline step is TVO step, otherwise the output of the pipeline step is returned as a DataFrame.

delete_sandbox() Response

Clears the local pipeline steps, and delete the sandbox from the KB cloud.

describe(show_params: bool = True, show_set_params: bool = False)

Prints out a description of the pipeline steps and parameters

Parameters

show_params (bool, True) – Include the parameters in the pipeline description

execute(wait_time: int = 15, silent: bool = True, describe: bool = True, **kwargs) Tuple[Union[DataFrame, ModelResultSet], dict]

Execute pipeline asynchronously and check for results.

Parameters
  • wait_time (int, 10) – Time to wait in between status checks.

  • silent (bool, True) – Silence status updates.

features_to_tensor(feature_vectors, validate=0.1, test=0.1, label=None, feature_columns=None, class_map=None, dtype=<class 'numpy.int32'>, int8_input_type=True, shape=None, verbose=True)

Converts a SensiML feature vector Dataframe into a train, test, validate set of Numpy arrays to use in training a tensorflow model

Parameters
  • feature_vectors (DataFrame) – DataFrame of feature vectors returned from SensiML Pipeline

  • validate (float, optional) – percentage of data to use in validation set. Defaults to 0.1.

  • test (float, optional) – percentage of data to use in test set. Defaults to 0.1.

  • label (str, optional) – the label column for the feature vectors. Defaults to None.

  • feature_columns (list, optional) – the coluns of the feature vector DataFrame containing features. Defaults to None.

  • class_map (dict, optional) – a class map describing the mapping of labels to int values. Defaults to None.

  • dtype (_type_, optional) – the dtype of the tensorflow object to create. Defaults to np.int32.

  • int8_input_type (bool, optional) – Scales the SensiML Features to int8 from uint8. Defaults to True.

  • shape (tuple, optional) – Reshape the dataframe before loading it into the numpy array. Defaults to None.

Returns

A tuple containing numpy arrays for traning a TensorFlow model along with the class map x_train, x_test, x_validate, y_train, y_test, y_validate, class_map

get_automl_iteration_metrics() DataFrame

Results of automl stats for each iteration

Returns

AutoML results stats

Return type

DataFrame

get_function_type(name: str) str
Returns

The type of a function.

Return type

str

get_knowledgepack(uuid: str) KnowledgePack

retrieve knowledgepack by uuid from the server

Parameters

uuid (str) – unique identifier for knowledgepack

Returns

a knowledgepack object

Return type

TYPE

get_pipeline_length() int
Returns

The current length of the pipeline.

Return type

int

get_results(lock: bool = False, wait_time: int = 15, silent: bool = False, page_index: int = 0, renderer=None, **kwargs) Tuple[DataFrame, dict]

Retrieve status, results from the kb cloud for the current pipeline.

Parameters
  • run. (results to retrieve. The default is the last type that was) –

  • lock (bool, False) – This will lock the process and continuously ping the KB cloud

  • process. (for the status of the pipeline) –

  • wait_time (int, 30) – The time to wait between individual status checks.

  • silent (bool, False) – This will silence updates to every 4th update check.

  • page_index (int, 0) – The page desired if the result is multi-paged (1-based)

Returns

This is the result of the last executed pipeline step. stats (dictionary): A dictionary containing the execution summary, features and other summary statistics

Return type

results (DataFrame or model result)

Grid search is a parameter optimization method that exhaustively searches over a gridded parameter space. Grid search returns will return the score each parameter combination for f1, precision and sensitivity so that you can choose the best performing combination to build a knowledge pack with.

Parameters
  • grid_params (dict) – Grid search parameters.

  • run_parallel (bool, True) – Run grid search in parallel in KB cloud.

  • lock (bool, False) – Ping for results every 30 seconds until the process finishes.

grid_params is a nested python dictionary object.

grid_params = {“Name Of Function”:{“Name of Parameter”:[ A, B, C]}}

Where A, B and C are the parameters to search over. Additionally, for each step you may want to search over more than one of a functions configurable parameters. To do this simply add another element to the functions dictionary.

grid_params = {“Name Of Function”:{“Name of Parameter 1”:[ A, B, C], “Name of Parameter 2”:[ D, E]}}

This will tell grid search to search over 6 different parameter spaces.

You can also specify more than one step to search over in grid params. This is done by adding another element to the function level of the grid_params dictionary.

grid_params = {“Name Of Function”:{“Name of Parameter 1”:[ A, B, C], “Name of Parameter 2”:[ D, E]}, “Name of Function 2”:{“Name of Parameter”:[1, 2, 3, 4, 5, 6]}}

Example

grid_params = {‘Windowing’:{“window_size”: [100,200],’delta’:[100]}, ‘selector_set’: {“Recursive Feature Elimination”:{‘number_of_features’:[10, 20]}}, ‘Hierarchical Clustering with Neuron Optimization’: {‘number_of_neurons’:[10,20]}}

results, stats = client.pipeline.grid_search(grid_params)

iterate_columns(fg, n_columns=None, columns=None)

Builds Multiple Feature generators by iterating over the input columns

Parameters
  • fg (dict) – Single input feature generator folowing the standard format

  • n_columns (int) – The number of columns to return as a combination.

  • columns (list) – if None, will use the columns in the input feature generator, otherwise will use this list provided

Example
>>> fg =  {'name':'Downsample with Min Max Scaling', 'params':{"columns": ['gyrZ','gyrX'] , "new_length": 15}}
>>> fg_new = client.pipeline.iterate_columns(fg, n_columns=1)
>>> print(fg_new)

Output: [{‘name’: ‘Downsample with Min Max Scaling’, ‘params’: {‘columns’: [‘gyrX’], ‘new_length’: 15}}, {‘name’: ‘Downsample with Min Max Scaling’, ‘params’: {‘columns’: [‘gyrX’], ‘new_length’: 15}}]

list_knowledgepacks() DataFrame

Lists all of the projects on kb cloud associated with current pipeline

Returns

projects on kb cloud

Return type

DataFrame

rehydrate(model: Optional[KnowledgePack] = None, replace: Optional[bool] = True, kp_summary: Optional[list] = False, uuid: Optional[str] = None)

Replace the executing cell with pipeline code for either a model or pipeline.

Parameters
  • model (model, knowledgepack, None) – pass in a model to build a pipeline from that

  • replace (boolean, True) – replace the executing cell with pipeline code

rehydrate_knowledgepack(model: Optional[KnowledgePack] = None, uuid: Optional[str] = None, replace: bool = True)

Replace the executing cell with pipeline code for a knowledge Pack

Parameters

model (model, knowledgepack, None) – pass in a model to build a pipeline from that

rehydrate_pipeline(model: Optional[KnowledgePack] = None, uuid: Optional[str] = None, replace: bool = True)

Replace the executing cell with pipeline code for the current pipeline or pipeline that generated the model

Parameters

model (model, knowledgepack, None) – pass in a model to build a pipeline from that

reset(delete_cache: bool = False, update: bool = False)

Reset the current pipeline steps.

Parameters
  • delete_cache (bool, False) – Delete the cache from KB cloud.

  • update (bool, False) – Push cleared pipeline to the server

set_classifier(name: str, params: Optional[dict] = None)

Classification method for the TVO step to use.

Parameters
  • name (str) – Name of the classification method.

  • params (dict, optional) – Parameters of the classification method.

set_columns(data_columns: Optional[list[str]] = None, group_columns: Optional[list[str]] = None, label_column: Optional[str] = None)
Sets the columns for group_columns, data_columns and the label column

to be used in the pipeline. This will automatically handle label column, ignore columns, group columns and passthrough columns for the majority of pipelines. For pipelines that need individually specified column attributes, set them in the step command.

Parameters
  • data_columns (None, list) – List of sensor data streams to use.

  • group_columns (None, list) – List of columns to use when applying aggregate functions and defining unique subsets on which to operate.

  • label_column (None, str) – The column name containing the ground truth label.

set_input_capture(names: list[str], group_columns: Optional[list[str]] = None)

Use a data file that has been uploaded as your data source.

Parameters

name (str,list) – single capture or list of captures file names to use in SensiML cloud.

set_input_data(name: str, data_columns: Optional[list[str]] = None, group_columns: Optional[list[str]] = None, label_column: Optional[str] = None)

Use a data file that has been uploaded as your data source.

Parameters
  • name (str, list) – The name of the data file or list of datafiles in SensiML cloud.

  • data_columns (list, required) – Array of data streams to use in model building.

  • group_columns (list, required) – The List of columns to use when applying aggregate functions and defining unique subsets on which to operate.

  • label_column (str, required) – The column with the true classification.

set_input_features(name: str, feature_columns: Optional[list[str]] = None, group_columns: Optional[list[str]] = None, label_column: Optional[str] = None)

Use a feature file as input to the pipeline (features have already been generated in this case)

Parameters
  • name (str, list) – The name of the data file or list of feature files in SensiML cloud.

  • feature_columns (list, required) – Array of data streams to use in model building.

  • group_columns (list, required) – The List of columns to use when applying aggregate functions and defining unique subsets on which to operate.

  • label_column (str, required) – The column with the true classification.

set_input_query(name: str, use_session_preprocessor: bool = True)

Set the input data to be a stored query.

Parameters
  • name (str) – The name of the saved query.

  • use_session_preprocessor (bool) – Use the autosegmentation algorithms for this pipeline (if there is one). When this is true, the segmentation algorithm will not show up in the pipeline but will be included in the firmware generation.

set_regression(name: str, params: Optional[dict] = None)

Regression method for the TVO step to use.

Parameters
  • name (str) – Name of the classification method.

  • params (dict, optional) – Parameters of the classification method.

set_training_algorithm(name: str, params: Optional[dict] = None)

Training algorithm for the TVO step to use.

Parameters
  • name (str) – Name of the training algorithm.

  • params (dict, optional) – Parameters of the training algorithm.

set_tvo(params: Optional[dict] = None)

Description of the train, validate optimize step, which consists of a training algorithm, validation method and classifier.

Parameters

params (dict, optional) – Parameters of the TVO step.

Example

>>> client.pipeline.set_validation_method('Stratified K-Fold Cross-Validation', params={'number_of_folds':3})
>>> client.pipeline.set_classifier('PME', params={"classification_mode":'RBF','distance_mode':'L1'})
>>> client.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization', params = {'number_of_neurons':10})
>>> client.pipeline.set_tvo({'label_column':'Label', 'ignore_columns': ['Subject', 'Rep']})
set_validation_method(name: str, params: Optional[dict] = None)

Set the validation method for the tvo step.

Parameters
  • name (str) – Name of the validation method to use.

  • params (dict, optional) – Parameters for the validation method.

stop_pipeline() Response

Kills a pipeline that is running on KB cloud.

submit() bool

Submit a pipeline asynchronously to SensiML Cloud for execution.

to_list() list[dict]

Converts the current pipeline into its List form

Return type

List

visualize_features(feature_vector: DataFrame, label_column: Optional[str] = None)

Makes a plot of feature vectors by class to aid in understanding your model

Parameters

feature_vector (DataFrame) – Dataframe containing feature vectors and label column

visualize_neuron_array(model, feature_vector, featureX=None, featureY=None, neuron_alpha=0.2)

Makes a plot of feature vectors by class to aid in understanding your model

Parameters
  • model (model/knowledgepack) – The model or knowledgepack to use for plotting the neurons

  • feature_vector (DataFrame) – Dataframe containing feature vectors and label column

  • featureX (str) – The name of the feature for the x axis

  • featureY (str) – The name of the feature for the y axis