Pipelines

A pipeline is a container for a series of data processing steps. The pipeline object allows you to get an existing pipeline or create a new one with a given name. With this object, you can set input data sources, add transforms, feature generators, feature selectors, feature transforms and classifiers. An example pipeline is shown below.

Examples:

dsk.pipeline = 'my_pipeline'

dsk.pipeline.set_input_query('ExampleQuery')

dsk.pipeline.add_transform('Windowing')

# Feature Generation
dsk.pipeline.add_feature_generator(
    [
        {'subtype_call':'Time', 'params':{'sample_rate':100}},
        {'subtype_call':'Rate of Change'},
        {'subtype_call':'Statistical'},
        {'subtype_call':'Energy'},
        {'subtype_call':'Amplitude', 'params':{'smoothing_factor':9}}
    ],
    function_defaults={'columns': sensor_columns},
)

# Scale to 8 bit representation for classification
dsk.pipeline.add_transform('Min Max Scale')

# Perform feature selection to remove highly correlated features, apply a
# variance threshold, and finally run recursive feature elimination.
# (Note: Recursive feature elimination can be very slow for large data sets with a large
#        number of parameters; it is recommended to use other feature selection algorithms
#        first to reduce the number of features.)

dsk.pipeline.add_feature_selector(
    [
        {"name": "Correlation Threshold", "params": {"threshold": 0.85}},
        {"name": "Variance Threshold", "params": {"threshold": 0.05}},
        {"name": "Recursive Feature Elimination", "params": {"method": "Log R",
                                                              "number_of_features": 20}},
    ],
    params={"number_of_features": 20},
)

dsk.pipeline.set_validation_method('Stratified K-Fold Cross-Validation',
                                        params={'number_of_folds':3})

dsk.pipeline.set_classifier('PME',
            params={'classification_mode':'RBF',
                    'distance_mode':'L1'
                    })

dsk.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization',
                    params = {'number_of_neurons':10})

dsk.pipeline.set_tvo({'validation_seed':0})

results, stats = dsk.pipeline.execute()

The final step sends the pipeline to the SensiML Cloud Engine for execution. Once the job is completed, the results are returned as a model object if classification has occurred, or as a DataFrame if you are at an intermediate step.

dsk.pipeline.execute()

class sensiml.pipeline.Pipeline(kb, name=None)

Base class of a pipeline object

add_feature_generator(feature_generators, params={}, function_defaults={}, return_generator_set=False)

Add a feature generator set to the pipeline.

Parameters
  • feature_generators (list) – List of feature generators. As names or dictionaries.

  • params (dict, {}) – Parameters to apply to the feature generator set.

  • function_defaults (dict, {}) – Parameters to apply to all individual feature generators.

Examples

>>> # Add a single feature generator
>>> dsk.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5}}],
                                       function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions by name when they use the same function defaults
>>> dsk.pipeline.add_feature_generator(['Mean', 'Standard Deviation', 'Skewness', 'Kurtosis', '25th Percentile',
                                       '75th Percentile', '100th Percentile', 'Zero Crossing Rate'],
                                       function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions using function defaults
>>> dsk.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5}}, {'name': 'Mean'}],
                                        function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions by subtype which use different parameters; note all subtypes will take the same inputs
>>> dsk.pipeline.add_feature_generator([{'subtype_call': 'Area', 'params': {'sample_rate': 100, 'smoothing_factor': 9}},
                                        {'subtype_call': 'Time', 'params': {'sample_rate': 100}},
                                        {'subtype_call': 'Rate of Change'},
                                        {'subtype_call': 'Statistical'},
                                        {'subtype_call': 'Energy'},
                                        {'subtype_call': 'Amplitude', 'params': {'smoothing_factor': 9}},
                                        {'subtype_call': 'Physical', 'params': {'sample_rate': 100}}
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                       )
>>> # Mix subtype and specify additional feature generators
>>> dsk.pipeline.add_feature_generator([{'subtype_call': 'Statistical'},
                                        {'name': 'Downsample', 'params': {'new_length': 5}},
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                        )
>>> # Call the same feature generators multiple times with different parameters
>>> dsk.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5, 'columns': sensor_columns[0]}},
                                        {'name': 'Downsample', 'params': {'new_length': 12}},
                                        ],
                                        function_defaults={'columns': sensor_columns},
                                        )
add_feature_selector(feature_selectors, params={})

Add a feature selection set to the pipeline.

Parameters
  • feature_selectors (List) – List of dictionaries containing feature selectors

  • params (dict, {}) – Parameters of the feature selector set.

Examples

>>> dsk.pipeline.add_feature_selector([{"name":"Recursive Feature Elimination", "params":{"method":"Log R"}}],
>>>                                    params = {"number_of_features":20})
add_linear_step(func)

Add a step to the pipeline, automatically tying the previous step to the current step.

Parameters

func (function) – A sensiml function method call

add_segmenter(name, params={})

Add a Segmenter to the pipeline.

Parameters
  • name (str) – Name of the segmenter method to add.

  • params (dict, optional) – Dictionary containing the parameters of the transform.
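
Example

A minimal sketch; the segmenter name and parameter values here are illustrative and depend on the segmenters available on your server:

>>> dsk.pipeline.add_segmenter("Windowing", params={"window_size": 100, "delta": 100})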

add_transform(name, params={}, overwrite=True)

Add a Transform to the pipeline.

Parameters
  • name (str) – Name of the transform method to add.

  • params (dict, optional) – Dictionary containing the parameters of the transform.

  • overwrite (boolean) – When adding multiple instances of the same transform, set overwrite to False for the additional steps so that the first instance is not overwritten (default is True).
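
Example

A minimal sketch using transform names from the pipeline example above; the parameter values are illustrative:

>>> dsk.pipeline.add_transform('Windowing', params={'window_size': 100, 'delta': 100})
>>> # Add a second instance of the same transform; overwrite=False keeps the first step
>>> dsk.pipeline.add_transform('Min Max Scale')
>>> dsk.pipeline.add_transform('Min Max Scale', overwrite=False)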

auto(auto_params, run_parallel=True, lock=True, silent=True, renderer=None)

Execute automated pipeline asynchronously.

The automated pipeline is used to find optimal parameters with a genetic algorithm. The genetic algorithm starts with a user-defined randomized population (pipelines), generates models from them and tests them, keeps a subset of high-performing combinations, and then recombines those “survivors” and repeats the process over again. The offspring of good parameter combinations are usually also good and sometimes significantly better than their parents. As the algorithm iterates through successive generations, it often finds a near-optimal model without trying as many configurations. The algorithm terminates when the desired number of iterations is completed.

The created pipelines are run in parallel in SensiML servers and results are ranked by the fitness score which takes into account the model’s F1 score, precision, sensitivity, and other metrics. The automation options and definition of the performance metrics are given below.

Parameters
  • seed (str) –

    Pipeline templates that focus on different feature libraries. Options and definitions are given below.

    Basic Features: Generates a set of all-purpose, high-performance features using statistical, energy, and rate of change feature generators. The seed then performs feature selection and model generation algorithms with a genetic algorithm to optimize pipeline parameters.

    Advanced Features: Generates a comprehensive set of features using statistical, energy, amplitude, shape, time, and rate of change feature generators. The seed then performs feature selection and model generation algorithms with a genetic algorithm to optimize pipeline parameters.

    Histogram_Features: Generates a set of histogram features and then performs feature selection and model generation algorithms with a genetic algorithm to optimize pipeline parameters.

    Downsampled_Features: Generates a set of downsampled features and then performs feature selection and model generation algorithms with a genetic algorithm to optimize pipeline parameters.

    Custom: Uses the user-defined pipeline to extract features and then searches for optimal parameters for the feature selection step, if defined, and the model generation step (required) using a genetic algorithm.

  • params (dict) –

    Parameters of the genetic algorithm used to optimize the pipelines. Definitions and options of the parameters are given below.

    search_steps (list, ['selectorset', 'tvo']): Defines which steps of the pipeline will be optimized.
    population_size (int, 10): Initial number of randomly created pipelines.
    iterations (int, 1): Number of repetitions of the optimization process.
    mutation_rate (float, 0.1): Rate of random changes from the previous population.
    recreation_rate (float, 0.1): Rate of randomly created pipelines for the next generation.
    survivor_rate (float, 0.5): Ratio of the population that will be transferred to the next generation.
    number_of_models_to_return (int, 5): Number of pipelines that will be returned to the user.
    allow_unknown (bool, False): Allows unknown prediction results for vectors. A vector is classified as unknown if it cannot be recognized by any neurons.
    demean_segments (bool, False): Removes the mean from the input data before extracting features.
    validation_method (Stratified Shuffle Split): Validation method that will be used in optimization.
    balance_data (bool, False): Use undersampling of the majority classes to balance the data prior to model building.
    outlier_filter (bool, False): Filter outliers using an Isolation Forest filter with a filter percent of 5.
    fitness (dict): Fitness parameters are a combination of statistical and cost variables, used to evaluate the performance of the models found by the algorithm. The user defines coefficient scores for these parameters to set their priority in the fitness score. Definitions and options are given below.

    statistical variables:
        accuracy: The degree of correctness of all vectors
        f1_score: Measure of the test's accuracy
        precision: Proportion of positive identifications that are actually correct
        sensitivity: Measure of the proportion of actual positives that are correctly identified
        specificity: Measure of the proportion of actual negatives that are correctly identified
        positive_predictive_rate: Proportion of positive predictions that are true positives

    cost variables:
        neurons: Number of neurons used in the model
        features: Number of features used in the model

  • lock (bool, True) – Ping for results every 30 seconds until the process finishes.

  • silent (bool, True) – Silence status updates.

Example

activity_data = dsk.datasets.load_activity_raw()
dsk.upload_dataframe('activity_data.csv', activity_data, force=True)

dsk.pipeline.reset()
dsk.pipeline.set_input_data('activity_data.csv',
                            data_columns=['accelx', 'accely', 'accelz', 'gyrox', 'gyroy', 'gyroz'],
                            group_columns=['Subject', 'Class', 'Rep'],
                            label_column='Class')

dsk.pipeline.add_transform("Windowing", params={"window_size": 100, "delta": 100})

results, summary = dsk.pipeline.auto({'seed': 'Basic Features',
                                      'params': {"search_steps": ['selectorset', 'tvo'],
                                                 "population_size": 10,
                                                 "iterations": 1,
                                                 "mutation_rate": 0.1,
                                                 "recreation_rate": 0.1,
                                                 "survivor_rate": 0.5,
                                                 "number_of_models_to_return": 5,
                                                 "run_parallel": True,
                                                 "allow_unknown": False,
                                                 "validation_method": "Stratified Shuffle Split",
                                                 "fitness": {'neurons': 0.5,
                                                             'accuracy': 1.0,
                                                             'f1_score': 0.0,
                                                             'features': 0.3,
                                                             'precision': 0.0,
                                                             'sensitivity': 0.7,
                                                             'specificity': 0.0,
                                                             'positive_predictive_rate': 0.0},
                                                 }})

results.summarize()

Execute auto segment search pipeline asynchronously.

Parameters
  • params (dict) – Automation parameters for segment search.

  • run_parallel (bool, True) – Run in parallel in KB cloud.

  • lock (bool, False) – Ping for results every 30 seconds until the process finishes.

  • silent (bool, True) – Silence status updates.

clear_cache()

Deletes the cache on KB cloud for this pipeline.

data(pipeline_step, page_index=0)

Retrieves results from a specific pipeline step, using the values stored in KB cloud after execution has been performed.

Parameters
  • pipeline_step (int) – Pipeline step to retrieve results from.

  • page_index (int) – Index of data to get.

Returns

A ModelResultSet if the selected pipeline step is TVO step, otherwise the output of the pipeline step is returned as a DataFrame.
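
Example

A minimal sketch; the step index and page index shown are illustrative:

>>> # Retrieve the output of the third pipeline step (index 2) after execution
>>> step_output = dsk.pipeline.data(2, page_index=0)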

delete_sandbox()

Clears the local pipeline steps and deletes the sandbox from the KB cloud.

describe(show_params=True, show_set_params=False)

Prints out a description of the pipeline steps and parameters

Parameters

show_params (bool, True) – Include the parameters in the pipeline description
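
Example

For instance, to print the pipeline steps together with their parameters:

>>> dsk.pipeline.describe(show_params=True)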

execute(wait_time=15, silent=True, describe=True, **kwargs)

Execute pipeline asynchronously and check for results.

Parameters
  • wait_time (int, 15) – Time to wait in between status checks.

  • silent (bool, True) – Silence status updates.

get_function_type(name)

Parameters

name (str) – Name of the function.

Returns

The type of the function.

Return type

str

get_knowledgepack(uuid)

Retrieve a knowledgepack by uuid from the server.

Parameters

uuid (str) – unique identifier for knowledgepack

Returns

A knowledgepack object

Return type

KnowledgePack
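
Example

A minimal sketch; the UUID shown is a placeholder:

>>> kp = dsk.pipeline.get_knowledgepack('00000000-0000-0000-0000-000000000000')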

get_pipeline_length()
Returns

The current length of the pipeline.

Return type

int

get_results(lock=False, wait_time=15, silent=False, page_index=0, renderer=None, **kwargs)

Retrieve status, results from the kb cloud for the current pipeline.

Parameters
  • results – The type of results to retrieve. The default is the last type that was run.

  • lock (bool, False) – This will lock the process and continuously ping the KB cloud for the status of the pipeline process.

  • wait_time (int, 15) – The time to wait between individual status checks.

  • silent (bool, False) – This will silence updates to every 4th update check.

  • page_index (int, 0) – The page desired if the result is multi-paged (1-based).

Returns

results (DataFrame or model result): The result of the last executed pipeline step.

stats (dictionary): A dictionary containing the execution summary, features, and other summary statistics.
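
Example

A minimal sketch; lock=True polls the server until the pipeline finishes:

>>> results, stats = dsk.pipeline.get_results(lock=True)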

grid_search(grid_params, run_parallel=True, lock=False)

Grid search is a parameter optimization method that exhaustively searches over a gridded parameter space. Grid search returns the scores of each parameter combination for f1, precision, and sensitivity so that you can choose the best performing combination to build a knowledge pack with.

Parameters
  • grid_params (dict) – Grid search parameters.

  • run_parallel (bool, True) – Run grid search in parallel in KB cloud.

  • lock (bool, False) – Ping for results every 30 seconds until the process finishes.

grid_params is a nested Python dictionary object.

grid_params = {"Name Of Function": {"Name of Parameter": [A, B, C]}}

Where A, B, and C are the parameters to search over. Additionally, for each step you may want to search over more than one of a function's configurable parameters. To do this, simply add another element to the function's dictionary.

grid_params = {"Name Of Function": {"Name of Parameter 1": [A, B, C],
                                    "Name of Parameter 2": [D, E]}}

This will tell grid search to search over 6 different parameter combinations.

You can also specify more than one step to search over in grid params. This is done by adding another element to the function level of the grid_params dictionary.

grid_params = {"Name Of Function": {"Name of Parameter 1": [A, B, C],
                                    "Name of Parameter 2": [D, E]},
               "Name of Function 2": {"Name of Parameter": [1, 2, 3, 4, 5, 6]}}

Example

grid_params = {'Windowing': {"window_size": [100, 200], 'delta': [100]},
               'selector_set': {"Recursive Feature Elimination": {'number_of_features': [10, 20]}},
               'Hierarchical Clustering with Neuron Optimization': {'number_of_neurons': [10, 20]}}

results, stats = dsk.pipeline.grid_search(grid_params)

iterate_columns(fg, n_columns=None, columns=None)

Builds multiple feature generators by iterating over the input columns.

Parameters
  • fg (dict) – Single input feature generator following the standard format.

  • n_columns (int) – The number of columns to return as a combination.

  • columns (list) – If None, the columns in the input feature generator are used; otherwise this list is used.

fg = {'name': 'Downsample with Min Max Scaling', 'params': {"columns": ['gyrZ', 'gyrX'], "new_length": 15}}

fg_new = dsk.pipeline.iterate_columns(fg, n_columns=1)

>>> print(fg_new)
[{'name': 'Downsample with Min Max Scaling', 'params': {'columns': ['gyrZ'], 'new_length': 15}},
 {'name': 'Downsample with Min Max Scaling', 'params': {'columns': ['gyrX'], 'new_length': 15}}]

list_knowledgepacks()

Lists all of the knowledge packs on KB cloud associated with the current pipeline.

Returns

knowledge packs on KB cloud

Return type

DataFrame

rehydrate(model=None, replace=True, kp_summary=False, uuid=None)

Replace the executing cell with pipeline code for either a model or pipeline.

Parameters
  • model (model, knowledgepack, None) – Pass in a model or knowledge pack to build the pipeline code from.

  • replace (boolean, True) – Replace the executing cell with the pipeline code.
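
Example

A minimal sketch, assuming my_model is a model object returned by a previous pipeline execution:

>>> dsk.pipeline.rehydrate(model=my_model, replace=True)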

rehydrate_knowledgepack(model=None, uuid=None, replace=True)

Replace the executing cell with pipeline code for a Knowledge Pack.

Parameters

model (model, knowledgepack, None) – Pass in a model or knowledge pack to build the pipeline code from.

rehydrate_pipeline(model=None, uuid=None, replace=True)

Replace the executing cell with pipeline code for the current pipeline or pipeline that generated the model

Parameters

model (model, knowledgepack, None) – Pass in a model or knowledge pack to build the pipeline code from.

reset(delete_cache=False)

Reset the current pipeline steps.

Parameters

delete_cache (bool, False) – Delete the cache from KB cloud.

set_classifier(name, params={})

Classification method for the TVO step to use.

Parameters
  • name (str) – Name of the classification method.

  • params (dict, optional) – Parameters of the classification method.

set_columns(data_columns=None, group_columns=None, label_column=None)

Sets the data_columns, group_columns, and label_column to be used in the pipeline. This will automatically handle the label column, ignore columns, group columns, and passthrough columns for the majority of pipelines. For pipelines that need individually specified column attributes, set them in the step command.

Parameters
  • data_columns (None, list) – List of sensor data streams to use.

  • group_columns (None, list) – List of columns to use when applying aggregate functions and defining unique subsets on which to operate.

  • label_column (None, str) – The column name containing the ground truth label.
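
Example

A minimal sketch reusing the column names from the auto() example above:

>>> dsk.pipeline.set_columns(data_columns=['accelx', 'accely', 'accelz'],
                             group_columns=['Subject', 'Class', 'Rep'],
                             label_column='Class')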

set_input_capture(names)

Use capture files that have been uploaded as your data source.

Parameters

names (str, list) – A single capture file name or a list of capture file names to use in SensiML cloud.
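
Example

A minimal sketch; the capture file names are illustrative placeholders:

>>> dsk.pipeline.set_input_capture(['capture_01.csv', 'capture_02.csv'])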

set_input_data(name, data_columns=None, group_columns=None, label_column=None)

Use a data file that has been uploaded as your data source.

Parameters
  • name (str, list) – The name of the data file or list of datafiles in SensiML cloud.

  • data_columns (list, required) – Array of data streams to use in model building.

  • group_columns (list, required) – The List of columns to use when applying aggregate functions and defining unique subsets on which to operate.

  • label_column (str, required) – The column with the true classification.
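
Example

A minimal sketch reusing the file and column names from the auto() example above:

>>> dsk.pipeline.set_input_data('activity_data.csv',
                                data_columns=['accelx', 'accely', 'accelz', 'gyrox', 'gyroy', 'gyroz'],
                                group_columns=['Subject', 'Class', 'Rep'],
                                label_column='Class')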

set_input_query(name)

Set the input data to be a stored query.

Parameters

name (str) – The name of the saved query.

set_knowledgepack_platform(*args, **kwargs)

Backwards compatible call to set_device_configuration

set_training_algorithm(name, params={})

Training algorithm for the TVO step to use.

Parameters
  • name (str) – Name of the training algorithm.

  • params (dict, optional) – Parameters of the training algorithm.

set_tvo(params={})

Description of the train, validate, optimize (TVO) step, which consists of a training algorithm, validation method, and classifier.

Parameters

params (dict, optional) – Parameters of the TVO step.

Example

>>> dsk.pipeline.set_validation_method('Stratified K-Fold Cross-Validation', params={'number_of_folds':3})
>>> dsk.pipeline.set_classifier('PME', params={"classification_mode":'RBF','distance_mode':'L1'})
>>> dsk.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization', params = {'number_of_neurons':10})
>>> dsk.pipeline.set_tvo({'label_column':'Label', 'ignore_columns': ['Subject', 'Rep']})
set_validation_method(name, params={})

Set the validation method for the TVO step.

Parameters
  • name (str) – Name of the validation method to use.

  • params (dict, optional) – Parameters for the validation method.

stop_pipeline()

Kills a pipeline that is running on KB cloud.

submit(lock=False)

Submit a pipeline asynchronously to SensiML Cloud for execution.

visualize_features(feature_vector, label_column=None)

Makes a plot of feature vectors by class to aid in understanding your model

Parameters

feature_vector (DataFrame) – Dataframe containing feature vectors and label column
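
Example

A minimal sketch, assuming feature_vectors is a DataFrame returned by an intermediate pipeline step and 'Label' is its label column:

>>> feature_vectors, stats = dsk.pipeline.execute()
>>> dsk.pipeline.visualize_features(feature_vectors, label_column='Label')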

visualize_neuron_array(model, feature_vector, featureX=None, featureY=None, neuron_alpha=0.2)

Makes a plot of feature vectors and the model's neuron array to aid in understanding your model

Parameters
  • model (model/knowledgepack) – The model or knowledgepack to use for plotting the neurons

  • feature_vector (DataFrame) – Dataframe containing feature vectors and label column

  • featureX (str) – The name of the feature for the x axis

  • featureY (str) – The name of the feature for the y axis
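
Example

A minimal sketch, assuming model is a model object from a TVO step and feature_vectors is a DataFrame of features with a label column; the feature names passed to featureX and featureY are placeholders:

>>> dsk.pipeline.visualize_neuron_array(model, feature_vectors,
                                        featureX='feature_1',
                                        featureY='feature_2')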