Pipelines
A pipeline is a container for a series of data processing steps. The pipeline object allows you to get an existing pipeline or create a new one with a given name. With this object, you can set input data sources, add transforms, feature generators, feature selectors, feature transforms and classifiers. An example pipeline is shown below.
Examples:
client.pipeline = 'my_pipeline'
client.pipeline.set_input_query('ExampleQuery')
client.pipeline.add_transform('Windowing')
# Feature Generation
client.pipeline.add_feature_generator(
[
{'subtype_call':'Time', 'params':{'sample_rate':100}},
{'subtype_call':'Rate of Change'},
{'subtype_call':'Statistical'},
{'subtype_call':'Energy'},
{'subtype_call':'Amplitude', 'params':{'smoothing_factor':9}}
],
function_defaults={'columns':sensor_columns},
)
# Scale to 8 bit representation for classification
client.pipeline.add_transform('Min Max Scale')
# Perform Feature Selection to remove highly correlated features, features
with high variance and finally recursive feature eliminate.
# (Note: Recursive feature elimination can be very slow for large data sets and large number of parameters,
# it is recommended to use other feature selection algorithms to first reduce the number of features)
client.pipeline.add_feature_selector(
[
{"name": "Correlation Threshold","params":{'threshold':0.85}},
{"name": "Variance Threshold",
"params":{
'threshold':0.05
}
},
{"name":"Recursive Feature Elimination",
"params":{
"method":"Log R",
"number_of_features":20
}
},
],
params = {"number_of_features":20}
)
client.pipeline.set_validation_method('Stratified K-Fold Cross-Validation',
params={'number_of_folds':3})
client.pipeline.set_classifier('PME',
params={'classification_mode':'RBF',
'distance_mode':'L1'
})
client.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization',
params = {'number_of_neurons':10})
client.pipeline.set_tvo({'validation_seed':0})
results, stats = client.pipeline.execute()
The final step is where the pipeline is sent to the SensiML Cloud Engine for execution, once the job is completed the results will be returned to you as a model object if classification has occurred, or a DataFrame if you are at an intermediary step.
client.pipeline.execute()
- class sensiml.pipeline.Pipeline(client, name=None)
Base class of a pipeline object
- add_augmentation(augmentation: list[dict], params: Optional[dict] = None, overwrite: bool = True)
Add augmentation set to the pipeline.
- Parameters
augmentation (List) – List of dictionaries containing time series augmentations
params (dict, {}) – Parameters of the augmentation set.
Examples
>>> client.pipeline.add_augmentation([{"name": "Add Noise", "params": { >>> "background_scale_range": [100, 1000], >>> "target_labels": ['idle','walking'], >>> "target_sensors": ['AccelerometerX', 'GyroscopeZ'], >>> "fraction": 1.2, >>> "noise_types": ["pink", "white"], >>> "filter": {"Set": ["Train"]} >>> } >>> }])
- add_feature_generator(feature_generators, params: Optional[dict] = None, function_defaults: Optional[dict] = None, return_generator_set: bool = False)
Add a feature generator set to the pipeline.
- Parameters
feature_generators (list) – List of feature generators. As names or dictionaries.
params (dict, {}}) – Parameters to apply to the feature generator set.
function_defaults (dict,{}}) – Parameters to apply to all individual feature generators.
Examples
>>> # Add a single feature generator >>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5}}, {'name': 'Mean'}], function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions by name when they use the same same function defaults >>> client.pipeline.add_feature_generator(['Mean', 'Standard Deviation', 'Skewness', 'Kurtosis', '25th Percentile', '75th Percentile', '100th Percentile', 'Zero Crossing Rate'], function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions using function defaults >>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5}}, {'name': 'Mean'}], function_defaults = {'columns': sensor_columns})
>>> # Call multiple functions by subtype which use different parameters; note all subtypes will take the same inputs >>> client.pipeline.add_feature_generator([{'subtype_call': 'Area', 'params': {'sample_rate': 100, 'smoothing_factor': 9}}, {'subtype_call': 'Time', 'params': {'sample_rate': 100}}, {'subtype_call': 'Rate of Change'}, {'subtype_call': 'Statistical'}, {'subtype_call': 'Energy'}, {'subtype_call': 'Amplitude', 'params': {'smoothing_factor': 9}}, {'subtype_call': 'Physical', 'params': {'sample_rate': 100}} ], function_defaults={'columns': sensor_columns}, )
>>> # Mix subtype and specify additional feature generators >>> client.pipeline.add_feature_generator([{'subtype_call': 'Statistical'}, {'name': 'Downsample', 'params': {'new_length': 5}}, ], function_defaults={'columns': sensor_columns}, )
>>> # Call the same feature generators multiple times with different parameters >>> client.pipeline.add_feature_generator([{'name': 'Downsample', 'params': {'new_length': 5, 'columns': sensor_columns[0]}}, {'name': 'Downsample', 'params': {'new_length': 12}}, ], function_defaults={'columns': sensor_columns}, )
- add_feature_selector(feature_selectors: list[dict], params: Optional[dict] = None)
Add a feature selection set to the pipeline.
- Parameters
feature_selectors (List) – List of dictionaries containing feature selectors
params (dict, {}) – Parameters of the feature selector set.
Examples
>>> client.pipeline.add_feature_selector([{"name":"Recursive Feature Elimination", "params":{"method":"Log R"}}], >>> params = {"number_of_features":20})
- add_linear_step(func)
Add a step to the pipeline. Automatically tie the previous step and current step.
- Parameters
func (function) – A sensiml function method call
- add_segmenter(name: str, params: Optional[dict] = None)
Add a Segmenter to the pipeline.
- Parameters
name (str) – Name of the segmenter method to add.
params (dict, optional) – Dictionary containing the parameters of the transform.
- add_transform(name: str, params: Optional[dict] = None, overwrite: bool = True)
Add a Transform to the pipeline.
- Parameters
name (str) – Name of the transform method to add.
params (dict, optional) – Dictionary containing the parameters of the transform.
overwrite (boolean) – when adding multiple instances of the same transform, set overwrite to False for the additional steps and the first instance will not be overwritten (default is True)
- auto(auto_params: dict, run_parallel: bool = True, lock: bool = True, silent: bool = True, renderer: Optional[Callable] = None, version: int = 2) Tuple[DataFrame, dict]
Execute automated pipeline asynchronously.
The automated pipeline is used to find optimal parameters with a genetic algorithm. The genetic algorithm starts with a user-defined randomized population (pipelines) and generates models from them and tests them, keeps a subset of high-performing combinations, and then recombines those “survivors” and repeats the process over again. The offspring of good parameter combinations are usually also good and sometimes are significantly better than their parents. As the algorithm iterates each successive generation, it often finds a near-optimal model without trying as many configurations. The algorithm terminates when the desired number of iteration is completed.
The created pipelines are run in parallel in SensiML servers and results are ranked by the fitness score which takes into account the model’s F1 score, precision, sensitivity, and other metrics. The automation options and definition of the performance metrics are given below.
- Parameters
params (dict) –
Parameters of the genetic algorithm to optimize the pipelines. Definition and options of the parameters are given below. - search_steps, (list, [‘selectorset’, ‘tvo’]): it is used to define which libraries in the pipelines will be optimized.
population_size, (int, 10): Initial number of randomly created pipelines.
iterations, (int, 1): Repetition number of optimization process
mutation_rate, (float, 0.1): Random changes from the previous population.
recreation_rate, (float, 0.1): Rate of randomly created pipelines for next generation.
survivor_rate, (float, 0.5): Ratio of the population that will be transferred to next generation.
number_of_models_to_return, (int, 5): Number of pipelines that will return to user.
allow_unknown, (bool, False): Allows creating unknown prediction results for the vectors. A vector is classified as an unknown if it
cannot be recognized by any neurons.
fitness (dict): Fitness parameters are combination of statistical and cost variables. It is used to evaluate the performance of the models found by the algorithm. The user will define the coefficient scores for the parameters to define the priority of the parameters in the fitness score. Definition and options of the parameters are given below.
- statistical variables:
accuracy: The degree of correctness of all vectors
f1_score: Measures of the test’s accuracy
precision: Proportion of positive identifications that is actually correct
sensitivity: Measures of the proportion of actual positives that are correctly identified
specificity: Measures of the proportion of actual negatives that are correctly identified
positive_predictive_rate: Ratio of “true positive” is the event that the test makes a positive prediction
- cost variables:
neurons: Number of neurons that used in the model
features: Number of features that used in the model,
lock (bool, False) – Ping for results every 30 seconds until the process finishes.
silent (bool, True) – Silence status updates.
Example
activity_data = client.datasets.load_activity_raw() client.upload_dataframe('activity_data.csv', activity_data, force=True) client.pipeline.reset() client.pipeline.set_input_data('activity_data.csv', data_columns = ['accelx', 'accely', 'accelz', 'gyrox', 'gyroy', 'gyroz'], group_columns = ['Subject','Class', 'Rep'], label_column = 'Class') client.pipeline.add_transform("Windowing", params={"window_size": 100, "delta": 100 }) results, summary = client.pipeline.auto({ 'params':{"search_steps": ['selectorset', 'tvo'], "population_size": 10, "iterations": 1, "mutation_rate": 0.1, "recreation_rate": 0.1, "survivor_rate": 0.5, "number_of_models_to_return": 5, "run_parallel": True, "allow_unknown": False, "fitness": {'accuracy': 1.0, 'f1_score': 0.0, 'features': 0.3, 'precision': 0.0, 'sensitivity': 0.7, 'specificity': 0.0, 'positive_predictive_rate': 0.0}, }}) results.summarize()
- autosegment_search(params: dict, run_parallel: bool = True, lock: bool = True, silent: bool = True) Tuple[DataFrame, dict]
Execute auto segment search pipeline asynchronously.
- Parameters
params (dict) – Automation parameters for segment search.
run_parallel (bool, True) – Run in parallel in KB cloud.
lock (bool, False) – Ping for results every 30 seconds until the process finishes.
silent (bool, True) – Silence status updates.
- clear_cache() Response
Deletes the cache on KB cloud for this pipeline.
- data(pipeline_step: int, page_index: int = 0, convert_datasegments_to_dataframe=True) DataFrame
Retrieves results from a specific pipeline step in the pipeline from stored values in the pipeline cache after execution has been performed.
- Parameters
pipeline_step (int) – Pipeline step to retrieve results from.
page_index (int) – Index of data to get.
datasegments (bool) – Keep results in datasegments format instead flattening to DataFrame format
- Returns
A ModelResultSet if the selected pipeline step is TVO step, otherwise the output of the pipeline step is returned as a DataFrame.
- delete_sandbox() Response
Clears the local pipeline steps, and delete the sandbox from the KB cloud.
- describe(show_params: bool = True, show_set_params: bool = False)
Prints out a description of the pipeline steps and parameters
- Parameters
show_params (bool, True) – Include the parameters in the pipeline description
- execute(wait_time: int = 15, silent: bool = True, describe: bool = True, **kwargs) Tuple[Union[DataFrame, ModelResultSet], dict]
Execute pipeline asynchronously and check for results.
- Parameters
wait_time (int, 10) – Time to wait in between status checks.
silent (bool, True) – Silence status updates.
- features_to_tensor(feature_vectors, validate=0.1, test=0.1, label=None, feature_columns=None, class_map=None, dtype=<class 'numpy.int32'>, int8_input_type=True, shape=None, verbose=True)
Converts a SensiML feature vector Dataframe into a train, test, validate set of Numpy arrays to use in training a tensorflow model
- Parameters
feature_vectors (DataFrame) – DataFrame of feature vectors returned from SensiML Pipeline
validate (float, optional) – percentage of data to use in validation set. Defaults to 0.1.
test (float, optional) – percentage of data to use in test set. Defaults to 0.1.
label (str, optional) – the label column for the feature vectors. Defaults to None.
feature_columns (list, optional) – the coluns of the feature vector DataFrame containing features. Defaults to None.
class_map (dict, optional) – a class map describing the mapping of labels to int values. Defaults to None.
dtype (_type_, optional) – the dtype of the tensorflow object to create. Defaults to np.int32.
int8_input_type (bool, optional) – Scales the SensiML Features to int8 from uint8. Defaults to True.
shape (tuple, optional) – Reshape the dataframe before loading it into the numpy array. Defaults to None.
- Returns
A tuple containing numpy arrays for traning a TensorFlow model along with the class map x_train, x_test, x_validate, y_train, y_test, y_validate, class_map
- get_automl_iteration_metrics() DataFrame
Results of automl stats for each iteration
- Returns
AutoML results stats
- Return type
DataFrame
- get_cached_data(pipeline_step: int, page_index: int = 0) DataFrame
Retrieves results from a specific pipeline step in the pipeline from stored values in the pipeline cache after execution has been performed.
- Parameters
pipeline_step (int) – Pipeline step to retrieve results from.
page_index (int) – Index of data to get.
datasegments (bool) – Keep results in datasegments format instead flattening to DataFrame format
- Returns
A datasegments dictionary if the pipeline step is prior to the feature extraction step
A feature vector DataFrame if the result is before the Model train step
A ModelResultSet if the selected pipeline step is TVO step, otherwise the output of the pipeline
- get_function_type(name: str) str
- Returns
The type of a function.
- Return type
str
- get_knowledgepack(uuid: str) KnowledgePack
retrieve knowledgepack by uuid from the server
- Parameters
uuid (str) – unique identifier for knowledgepack
- Returns
a knowledgepack object
- Return type
TYPE
- get_pipeline_length() int
- Returns
The current length of the pipeline.
- Return type
int
- get_results(lock: bool = False, wait_time: int = 15, silent: bool = False, page_index: int = 0, renderer=None, **kwargs) Tuple[DataFrame, dict]
Retrieve status, results from the kb cloud for the current pipeline.
- Parameters
run. (results to retrieve. The default is the last type that was) –
lock (bool, False) – This will lock the process and continuously ping the KB cloud
process. (for the status of the pipeline) –
wait_time (int, 30) – The time to wait between individual status checks.
silent (bool, False) – This will silence updates to every 4th update check.
page_index (int, 0) – The page desired if the result is multi-paged (1-based)
- Returns
This is the result of the last executed pipeline step. stats (dictionary): A dictionary containing the execution summary, features and other summary statistics
- Return type
results (DataFrame or model result)
- grid_search(grid_params: dict, run_parallel: bool = True, lock: bool = True, silent: bool = True) Tuple[DataFrame, dict]
Grid search is a parameter optimization method that exhaustively searches over a gridded parameter space. Grid search returns will return the score each parameter combination for f1, precision and sensitivity so that you can choose the best performing combination to build a knowledge pack with.
- Parameters
grid_params (dict) – Grid search parameters.
run_parallel (bool, True) – Run grid search in parallel in KB cloud.
lock (bool, False) – Ping for results every 30 seconds until the process finishes.
grid_params is a nested python dictionary object.
grid_params = {“Name Of Function”:{“Name of Parameter”:[ A, B, C]}}
Where A, B and C are the parameters to search over. Additionally, for each step you may want to search over more than one of a functions configurable parameters. To do this simply add another element to the functions dictionary.
grid_params = {“Name Of Function”:{“Name of Parameter 1”:[ A, B, C], “Name of Parameter 2”:[ D, E]}}
This will tell grid search to search over 6 different parameter spaces.
You can also specify more than one step to search over in grid params. This is done by adding another element to the function level of the grid_params dictionary.
grid_params = {“Name Of Function”:{“Name of Parameter 1”:[ A, B, C], “Name of Parameter 2”:[ D, E]}, “Name of Function 2”:{“Name of Parameter”:[1, 2, 3, 4, 5, 6]}}
Example
grid_params = {‘Windowing’:{“window_size”: [100,200],’delta’:[100]}, ‘selector_set’: {“Recursive Feature Elimination”:{‘number_of_features’:[10, 20]}}, ‘Hierarchical Clustering with Neuron Optimization’: {‘number_of_neurons’:[10,20]}}
results, stats = client.pipeline.grid_search(grid_params)
- iterate_columns(fg, n_columns=None, columns=None)
Builds Multiple Feature generators by iterating over the input columns
- Parameters
fg (dict) – Single input feature generator folowing the standard format
n_columns (int) – The number of columns to return as a combination.
columns (list) – if None, will use the columns in the input feature generator, otherwise will use this list provided
- Example
>>> fg = {'name':'Downsample with Min Max Scaling', 'params':{"columns": ['gyrZ','gyrX'] , "new_length": 15}} >>> fg_new = client.pipeline.iterate_columns(fg, n_columns=1)
>>> print(fg_new)
Output: [{‘name’: ‘Downsample with Min Max Scaling’, ‘params’: {‘columns’: [‘gyrX’], ‘new_length’: 15}}, {‘name’: ‘Downsample with Min Max Scaling’, ‘params’: {‘columns’: [‘gyrX’], ‘new_length’: 15}}]
- list_knowledgepacks() DataFrame
Lists all of the projects on kb cloud associated with current pipeline
- Returns
projects on kb cloud
- Return type
DataFrame
- rehydrate(model: Optional[KnowledgePack] = None, replace: Optional[bool] = True, kp_summary: Optional[list] = False, uuid: Optional[str] = None)
Replace the executing cell with pipeline code for either a model or pipeline.
- Parameters
model (model, knowledgepack, None) – pass in a model to build a pipeline from that
replace (boolean, True) – replace the executing cell with pipeline code
- rehydrate_knowledgepack(model: Optional[KnowledgePack] = None, uuid: Optional[str] = None, replace: bool = True)
Replace the executing cell with pipeline code for a knowledge Pack
- Parameters
model (model, knowledgepack, None) – pass in a model to build a pipeline from that
- rehydrate_pipeline(model: Optional[KnowledgePack] = None, uuid: Optional[str] = None, replace: bool = True)
Replace the executing cell with pipeline code for the current pipeline or pipeline that generated the model
- Parameters
model (model, knowledgepack, None) – pass in a model to build a pipeline from that
- reset(delete_cache: bool = False, update: bool = False)
Reset the current pipeline steps.
- Parameters
delete_cache (bool, False) – Delete the cache from KB cloud.
update (bool, False) – Push cleared pipeline to the server
- set_classifier(name: str, params: Optional[dict] = None)
Classification method for the TVO step to use.
- Parameters
name (str) – Name of the classification method.
params (dict, optional) – Parameters of the classification method.
- set_columns(data_columns: Optional[list[str]] = None, group_columns: Optional[list[str]] = None, label_column: Optional[str] = None)
- Sets the columns for group_columns, data_columns and the label column
to be used in the pipeline. This will automatically handle label column, ignore columns, group columns and passthrough columns for the majority of pipelines. For pipelines that need individually specified column attributes, set them in the step command.
- Parameters
data_columns (None, list) – List of sensor data streams to use.
group_columns (None, list) – List of columns to use when applying aggregate functions and defining unique subsets on which to operate.
label_column (None, str) – The column name containing the ground truth label.
- set_input_capture(names: list[str], group_columns: Optional[list[str]] = None)
Use a data file that has been uploaded as your data source.
- Parameters
name (str,list) – single capture or list of captures file names to use in SensiML cloud.
- set_input_data(name: str, data_columns: Optional[list[str]] = None, group_columns: Optional[list[str]] = None, label_column: Optional[str] = None)
Use a data file that has been uploaded as your data source.
- Parameters
name (str, list) – The name of the data file or list of datafiles in SensiML cloud.
data_columns (list, required) – Array of data streams to use in model building.
group_columns (list, required) – The List of columns to use when applying aggregate functions and defining unique subsets on which to operate.
label_column (str, required) – The column with the true classification.
- set_input_features(name: str, feature_columns: Optional[list[str]] = None, group_columns: Optional[list[str]] = None, label_column: Optional[str] = None)
Use a feature file as input to the pipeline (features have already been generated in this case)
- Parameters
name (str, list) – The name of the data file or list of feature files in SensiML cloud.
feature_columns (list, required) – Array of data streams to use in model building.
group_columns (list, required) – The List of columns to use when applying aggregate functions and defining unique subsets on which to operate.
label_column (str, required) – The column with the true classification.
- set_input_query(name: str, use_session_preprocessor: bool = True)
Set the input data to be a stored query.
- Parameters
name (str) – The name of the saved query.
use_session_preprocessor (bool) – Use the autosegmentation algorithms for this pipeline (if there is one). When this is true, the segmentation algorithm will not show up in the pipeline but will be included in the firmware generation.
- set_regression(name: str, params: Optional[dict] = None)
Regression method for the TVO step to use.
- Parameters
name (str) – Name of the classification method.
params (dict, optional) – Parameters of the classification method.
- set_training_algorithm(name: str, params: Optional[dict] = None)
Training algorithm for the TVO step to use.
- Parameters
name (str) – Name of the training algorithm.
params (dict, optional) – Parameters of the training algorithm.
- set_tvo(params: Optional[dict] = None)
Description of the train, validate optimize step, which consists of a training algorithm, validation method and classifier.
- Parameters
params (dict, optional) – Parameters of the TVO step.
Example
>>> client.pipeline.set_validation_method('Stratified K-Fold Cross-Validation', params={'number_of_folds':3}) >>> client.pipeline.set_classifier('PME', params={"classification_mode":'RBF','distance_mode':'L1'}) >>> client.pipeline.set_training_algorithm('Hierarchical Clustering with Neuron Optimization', params = {'number_of_neurons':10}) >>> client.pipeline.set_tvo({'label_column':'Label', 'ignore_columns': ['Subject', 'Rep']})
- set_validation_method(name: str, params: Optional[dict] = None)
Set the validation method for the tvo step.
- Parameters
name (str) – Name of the validation method to use.
params (dict, optional) – Parameters for the validation method.
- stop_pipeline() Response
Kills a pipeline that is running on KB cloud.
- submit() bool
Submit a pipeline asynchronously to SensiML Cloud for execution.
- to_list() list[dict]
Converts the current pipeline into its List form
- Return type
List
- visualize_features(feature_vector: DataFrame, label_column: Optional[str] = None)
Makes a plot of feature vectors by class to aid in understanding your model
- Parameters
feature_vector (DataFrame) – Dataframe containing feature vectors and label column
- visualize_neuron_array(model, feature_vector, featureX=None, featureY=None, neuron_alpha=0.2)
Makes a plot of feature vectors by class to aid in understanding your model
- Parameters
model (model/knowledgepack) – The model or knowledgepack to use for plotting the neurons
feature_vector (DataFrame) – Dataframe containing feature vectors and label column
featureX (str) – The name of the feature for the x axis
featureY (str) – The name of the feature for the y axis