Feature Selection Using Feature Importance Score - Creating a PySpark Estimator

In this post I discuss how to create a new PySpark estimator to integrate into an existing machine learning pipeline. This is an extension of my previous post, where I discussed how to create a custom cross validation function. Recently, I have been looking at integrating existing code into the PySpark ML pipeline framework. A pipeline is a fantastic abstraction since it allows the analyst to focus on the main tasks that need to be carried out and makes the entire piece of work reusable.

As a fun and useful example, I will show how feature selection using the feature importance score can be coded into a pipeline. I find PySpark's MLlib native feature selection functions relatively limited, so this is also part of an effort to extend the available feature selection methods. Here, I use the feature importance score as estimated from a model (decision tree / random forest / gradient boosted trees) to extract the variables that are plausibly the most important.

First, let's set up the Jupyter notebook and import the relevant functions. I use a local version of Spark to illustrate how this works, but one can easily use a YARN cluster instead.

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

import numpy as np
import pandas as pd
pd.options.display.max_columns = None

import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
spark = SQLContext(sc)
from pyspark.sql.functions import *
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

Bank Marketing Data Set

To show the usefulness of feature selection and to sort of validate the script, I used the Bank Marketing Data Set from UCI Machine Learning Repository as an example throughout this post. This comes from Moro et al., 2014 paper on A Data-Driven Approach to Predict the Success of Bank Telemarketing. As the name of the paper suggests, the goal of this dataset is to predict which bank customers would subscribe to a term deposit product as a result of a phone marketing campaign.

Let us read in the file and take a look at the variables of the dataset.

df = spark.read.option("delimiter", ";").csv("../data/bank-additional/bank-additional-full.csv", header=True, inferSchema = True)
df.dtypes
[('age', 'int'),
('job', 'string'),
('marital', 'string'),
('education', 'string'),
('default', 'string'),
('housing', 'string'),
('loan', 'string'),
('contact', 'string'),
('month', 'string'),
('day_of_week', 'string'),
('duration', 'int'),
('campaign', 'int'),
('pdays', 'int'),
('previous', 'int'),
('poutcome', 'string'),
('emp.var.rate', 'double'),
('cons.price.idx', 'double'),
('cons.conf.idx', 'double'),
('euribor3m', 'double'),
('nr.employed', 'double'),
('y', 'string')]

There are some problematic variable names, and we should replace the dot separator with an underscore.

df = df.toDF(*(c.replace('.', '_') for c in df.columns))
df.limit(5).toPandas()

It's always nice to take a look at the distribution of the variables.

df.describe().toPandas()

| summary | age | job | marital | education | default | housing | loan | contact | month | day_of_week | duration |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| count | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 |
| mean | 40.02406040594348 | None | None | None | None | None | None | None | None | None | 258.2850101971448 |
| stddev | 10.421249980934057 | None | None | None | None | None | None | None | None | None | 259.27924883646455 |
| min | 17 | admin. | divorced | basic.4y | no | no | no | cellular | apr | fri | 0 |
| max | 98 | unknown | unknown | unknown | yes | yes | yes | telephone | sep | wed | 4918 |

Continued...

| summary | campaign | pdays | previous | poutcome | emp_var_rate | cons_price_idx | cons_conf_idx | euribor3m | nr_employed | y |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| count | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 | 41188 |
| mean | 2.567592502670681 | 962.4754540157328 | 0.17296299893172767 | None | 0.08188550063178392 | 93.57566436828918 | -40.50260027191787 | 3.6212908128585366 | 5167.035910944004 | None |
| stddev | 2.77001354290233 | 186.9109073447414 | 0.49490107983928927 | None | 1.57095974051703 | 0.5788400489541355 | 4.628197856174595 | 1.7344474048512557 | 72.25152766825924 | None |
| min | 1 | 0 | 0 | failure | -3.4 | 92.201 | -50.8 | 0.634 | 4963.6 | no |
| max | 56 | 999 | 7 | success | 1.4 | 94.767 | -26.9 | 5.045 | 5228.1 | yes |

There are quite a few variables that are encoded as strings in this dataset. Converting each string variable to a set of binary indicator / dummy variables uses up quite a few degrees of freedom; in machine learning speak, it might also lead to the model being overfitted. Let us take a look at the levels represented by each variable of string type.

for i in df.dtypes:
    if i[1]=='string':
        df.groupBy(i[0]).count().orderBy('count', ascending=False).toPandas()

| job | count |
| --- | --- |
| admin. | 10422 |
| blue-collar | 9254 |
| technician | 6743 |
| services | 3969 |
| management | 2924 |
| retired | 1720 |
| entrepreneur | 1456 |
| self-employed | 1421 |
| housemaid | 1060 |
| unemployed | 1014 |
| student | 875 |
| unknown | 330 |

| marital | count |
| --- | --- |
| married | 24928 |
| single | 11568 |
| divorced | 4612 |
| unknown | 80 |

| education | count |
| --- | --- |
| university.degree | 12168 |
| high.school | 9515 |
| basic.9y | 6045 |
| professional.course | 5243 |
| basic.4y | 4176 |
| basic.6y | 2292 |
| unknown | 1731 |
| illiterate | 18 |

| default | count |
| --- | --- |
| no | 32588 |
| unknown | 8597 |
| yes | 3 |

| housing | count |
| --- | --- |
| yes | 21576 |
| no | 18622 |
| unknown | 990 |

| loan | count |
| --- | --- |
| no | 33950 |
| yes | 6248 |
| unknown | 990 |

| contact | count |
| --- | --- |
| cellular | 26144 |
| telephone | 15044 |

| month | count |
| --- | --- |
| may | 13769 |
| jul | 7174 |
| aug | 6178 |
| jun | 5318 |
| nov | 4101 |
| apr | 2632 |
| oct | 718 |
| sep | 570 |
| mar | 546 |
| dec | 182 |

| day_of_week | count |
| --- | --- |
| thu | 8623 |
| mon | 8514 |
| wed | 8134 |
| tue | 8090 |
| fri | 7827 |

| poutcome | count |
| --- | --- |
| nonexistent | 35563 |
| failure | 4252 |
| success | 1373 |

| y | count |
| --- | --- |
| no | 36548 |
| yes | 4640 |

The number of categories for each string variable is relatively small, which makes creating binary indicator variables / one-hot encoding a suitable pre-processing step. Let us take a look at how to do feature selection using the feature importance score the manual way before coding it as an estimator that fits into a PySpark pipeline.

Data Preprocessing

Before we run the model on the most relevant features, we first need to encode the string variables as binary vectors and run a random forest model on the whole feature set to get the feature importance scores. Here I just run most of these tasks as part of a pipeline.

# one hot encoding and assembling
encoding_var = [i[0] for i in df.dtypes if (i[1]=='string') & (i[0]!='y')]
num_var = [i[0] for i in df.dtypes if ((i[1]=='int') | (i[1]=='double')) & (i[0]!='y')]

string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep') for c in encoding_var]
onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]
label_indexes = StringIndexer(inputCol = 'y', outputCol = 'label', handleInvalid = 'keep')
assembler = VectorAssembler(inputCols = num_var + ['OHE_' + c for c in encoding_var], outputCol = "features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed = 8464,
                            numTrees=10, cacheNodeIds = True, subsamplingRate = 0.7)

pipe = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes, rf])
mod = pipe.fit(df)
df2 = mod.transform(df)

The feature importance scores are returned in the form of a sparse vector. This is not very human readable, and we need to map the values to the actual variable names for some insights. I wrote a little function that returns the variable names sorted by importance score as a pandas data frame. It was inspired by a post on Stack Overflow.

mod.stages[-1].featureImportances
SparseVector(63, {0: 0.0257, 1: 0.1596, 2: 0.0037, 3: 0.2212, 4: 0.0305, 5: 0.0389, 6: 0.0762, 7: 0.0423, 8: 0.1869, 9: 0.063, 10: 0.0002, 12: 0.0003, 13: 0.0002, 14: 0.0003, 15: 0.0005, 16: 0.0002, 18: 0.0006, 19: 0.0003, 20: 0.0002, 21: 0.0, 22: 0.001, 23: 0.0003, 24: 0.0005, 26: 0.0005, 27: 0.0007, 28: 0.0008, 29: 0.0003, 30: 0.0, 31: 0.0001, 34: 0.0002, 35: 0.0021, 37: 0.0001, 38: 0.0003, 39: 0.0003, 40: 0.0003, 41: 0.0001, 42: 0.0002, 43: 0.0284, 44: 0.0167, 45: 0.0038, 46: 0.0007, 47: 0.0008, 48: 0.0132, 49: 0.0003, 50: 0.0014, 51: 0.0159, 52: 0.0114, 53: 0.0103, 54: 0.0036, 55: 0.0002, 56: 0.0021, 57: 0.0002, 58: 0.0006, 59: 0.0005, 60: 0.0158, 61: 0.0038, 62: 0.0121})
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    # map the feature importance scores back to column names using the
    # metadata that VectorAssembler attaches to the features column
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))
ExtractFeatureImp(mod.stages[-1].featureImportances, df2, "features").head(10)

| idx | name | score |
| --- | --- | --- |
| 3 | pdays | 0.221203 |
| 8 | euribor3m | 0.186892 |
| 1 | duration | 0.159579 |
| 6 | cons_price_idx | 0.076177 |
| 9 | nr_employed | 0.063016 |
| 7 | cons_conf_idx | 0.042298 |
| 5 | emp_var_rate | 0.038875 |
| 4 | previous | 0.030470 |
| 43 | OHE_contact_cellular | 0.028401 |
| 0 | age | 0.025732 |

Now that we have the most important features in a nicely formatted list, we can extract the top 10 features and create a new input vector column with only these variables. PySpark has a VectorSlicer function that does exactly that. A new model can then be trained just on these 10 variables.

varlist = ExtractFeatureImp(mod.stages[-1].featureImportances, df2, "features")
varidx = [x for x in varlist['idx'][0:10]]
varidx
[3, 8, 1, 6, 9, 7, 5, 4, 43, 0]
slicer = VectorSlicer(inputCol="features", outputCol="features2", indices=varidx)
df3 = slicer.transform(df2)
df3 = df3.drop('rawPrediction', 'probability', 'prediction')
rf2 = RandomForestClassifier(labelCol="label", featuresCol="features2", seed = 8464,
                            numTrees=10, cacheNodeIds = True, subsamplingRate = 0.7)
mod2 = rf2.fit(df3)
df4 = mod2.transform(df3)

Building the estimator function

Now let us learn to build a new pipeline object that makes the above task easy!

First, a bit of theory, taken from the ML pipeline documentation:

DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.

Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

The important thing to remember is that the pipeline object has two components: the estimator, which returns a model, and the model/transformer, which returns a dataframe.
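
As a bare-bones illustration (the class names here are made up for this sketch), a custom stage pair only needs to subclass the pyspark.ml base classes: the estimator implements _fit and returns a transformer, and the transformer implements _transform and returns a dataframe.

from pyspark.ml import Estimator, Transformer

class MyEstimator(Estimator):
    def _fit(self, dataset):
        # learn whatever state the transformer needs from the dataframe ...
        return MyTransformer()

class MyTransformer(Transformer):
    def _transform(self, dataset):
        # ... and use it to return a new dataframe
        return dataset

The feature selection estimator below follows the same pattern, except that its _fit returns an off-the-shelf VectorSlicer instead of a hand-written transformer.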

We begin by coding up the estimator object. The cross-validation function in the previous post provides a thorough walk-through on creating the estimator object and the params needed. In this case, I wanted the function to select either the top n features or those above a certain score cut-off, so these parameters are included as arguments to the function. An estimator (either a decision tree, random forest or gradient boosted trees model) is also required as an input.

def __init__(self, estimator = None, selectorType = "numTopFeatures",
                 numTopFeatures = 20, threshold = 0.01, outputCol = "features"):

Given a dataset, we can write a fit function that extracts the feature importance scores:

mod = est.fit(dataset)
dataset2 = mod.transform(dataset)
varlist = ExtractFeatureImp(mod.featureImportances, dataset2, est.getFeaturesCol())

Some conditional statements select the correct indices corresponding to the features we want to extract. This gives us the output of the model - a list of features we want to keep.

if (selectorType == "numTopFeatures"):
    varidx = [x for x in varlist['idx'][0:numTopFeatures]]
elif (selectorType == "threshold"):
    varidx = [x for x in varlist[varlist['score'] > threshold]['idx']]

Now for the second part of the problem: we want to take this list of features and create a transform function that returns the dataset with a new column containing only our most relevant features. Sound familiar? This is exactly what the VectorSlicer transformer does, so there is no need to re-invent the wheel; we can just return a VectorSlicer with the correct indices to slice.

return VectorSlicer(inputCol = est.getFeaturesCol(),
                           outputCol = outputCol,
                           indices = varidx)

That concludes our new feature selection estimator! The full code can be obtained here.
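
Before moving on, here is a simplified outline of how the snippets above fit together. To keep it short, it stores the constructor arguments as plain attributes instead of proper pyspark Params (the previous post walks through that boilerplate), and it relies on the ExtractFeatureImp helper defined earlier, so treat it as a sketch rather than the full implementation linked above.

from pyspark.ml import Estimator
from pyspark.ml.feature import VectorSlicer

class FeatureImpSelector(Estimator):

    def __init__(self, estimator = None, selectorType = "numTopFeatures",
                 numTopFeatures = 20, threshold = 0.01, outputCol = "features"):
        super(FeatureImpSelector, self).__init__()
        self.estimator = estimator          # tree-based estimator exposing featureImportances
        self.selectorType = selectorType    # "numTopFeatures" or "threshold"
        self.numTopFeatures = numTopFeatures
        self.threshold = threshold
        self.outputCol = outputCol

    def _fit(self, dataset):
        # fit the supplied model and rank the assembled features by importance
        est = self.estimator
        mod = est.fit(dataset)
        dataset2 = mod.transform(dataset)
        varlist = ExtractFeatureImp(mod.featureImportances, dataset2, est.getFeaturesCol())
        # pick the indices to keep, either by count or by score cut-off
        if (self.selectorType == "numTopFeatures"):
            varidx = [int(x) for x in varlist['idx'][0:self.numTopFeatures]]
        elif (self.selectorType == "threshold"):
            varidx = [int(x) for x in varlist[varlist['score'] > self.threshold]['idx']]
        # the returned "model" is just a VectorSlicer over those indices
        return VectorSlicer(inputCol = est.getFeaturesCol(),
                            outputCol = self.outputCol,
                            indices = varidx)

The next section drops an instance of this class straight into a pipeline.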

Putting the new function to the test

Let's try out the new function. I saved it in a file called FeatureImportanceSelector.py. Notice that there is a new pipeline stage called fis (a FeatureImpSelector). It takes in the first random forest model and uses its feature importance scores to extract the top 10 variables.

from FeatureImportanceSelector import ExtractFeatureImp, FeatureImpSelector
# one hot encoding and assembling
encoding_var = [i[0] for i in df.dtypes if (i[1]=='string') & (i[0]!='y')]
num_var = [i[0] for i in df.dtypes if ((i[1]=='int') | (i[1]=='double')) & (i[0]!='y')]

string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep') for c in encoding_var]
onehot_indexes = [OneHotEncoderEstimator(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]
label_indexes = StringIndexer(inputCol = 'y', outputCol = 'label', handleInvalid = 'keep')
assembler = VectorAssembler(inputCols = num_var + ['OHE_' + c for c in encoding_var], outputCol = "features")

rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed = 8464,
                            numTrees=10, cacheNodeIds = True, subsamplingRate = 0.7)
fis = FeatureImpSelector(estimator = rf, selectorType = "numTopFeatures",
                         numTopFeatures = 10, outputCol = "features_subset")
rf2 = RandomForestClassifier(labelCol="label", featuresCol="features_subset", seed = 8464,
                            numTrees=10, cacheNodeIds = True, subsamplingRate = 0.7)

pipe = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes, fis, rf2])
pipeline_mod = pipe.fit(df)
df2 = pipeline_mod.transform(df)
ExtractFeatureImp(mod.stages[-1].featureImportances, df2, "features_subset")

| idx | name | score |
| --- | --- | --- |
| 3 | cons_price_idx | 0.221203 |
| 8 | OHE_contact_cellular | 0.186892 |
| 1 | euribor3m | 0.159579 |
| 6 | emp_var_rate | 0.076177 |
| 9 | age | 0.063016 |
| 7 | previous | 0.042298 |
| 5 | cons_conf_idx | 0.038875 |
| 4 | nr_employed | 0.030470 |
| 0 | pdays | 0.025732 |
| 2 | duration | 0.003744 |

We get 10 features as intended and, not surprisingly, they match the top 10 features generated by our previous non-pipeline method.

I hope you found the tutorial useful, and maybe it will inspire you to create more extensions for PySpark.