ML_Demo2_Regression.ipynb
This notebook is part of the Spark training delivered by CERN IT¶
Regression with spark.ml¶
Contact: Luca.Canali@cern.ch
This notebook is an implementation of a regression system trained using spark.ml
to predict house prices.
The data used for this exercise is the "California Housing Prices dataset" from the StatLib repository, originally featured in the following paper: Pace, R. Kelley, and Ronald Barry. "Sparse spatial autoregressions." Statistics & Probability Letters 33.3 (1997): 291-297. The code and steps we follow in this notebook are inspired by the book "Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow" by Aurelien Geron, 2nd Edition.
Run this notebook from Jupyter with a Python kernel
- When using CERN SWAN, do not attach the notebook to a Spark cluster, but rather run it locally on the SWAN container
- If running this outside CERN SWAN, please make sure that PySpark is installed:
pip install pyspark
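As an optional sanity check, you can verify that PySpark is importable and see which version is installed:
import pyspark
print(pyspark.__version__)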
Create the Spark session and read the data¶
#
# Local mode: run this when using CERN SWAN not connected to a cluster
# or run it on a private Jupyter notebook instance
# Dependency: PySpark (use SWAN or pip install pyspark)
#
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("ML HandsOn Regression") \
    .config("spark.driver.memory", "4g") \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()
spark
# Local mode: read the data locally from the cloned repo
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("../data/housing.csv.gz")
     )
Split the data into training and test datasets¶
train, test = df.randomSplit([0.8, 0.2], 4242)
# cache the training dataset
train.cache().count()
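As an optional sanity check, the sizes of the two splits can be inspected; the exact counts depend on the random split:
# the split should be roughly 80/20
print("train rows:", train.count())
print("test rows:", test.count())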
Basic data exploration¶
train.printSchema()
# The dataset reports California housing prices from the 1990 census
train.limit(10).toPandas()
train.createOrReplaceTempView("train")
spark.sql("select ocean_proximity, count(*) from train group by ocean_proximity").show()
# there are some missing values in the total_bedrooms feature (i.e. null values)
spark.sql("select count(*) from train where total_bedrooms is null").show()
Feature preparation¶
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorIndexer, Imputer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
# Transform the ocean_proximity feature into a one-hot encoded feature
ocean_index = StringIndexer(inputCol="ocean_proximity", outputCol="indexed_ocean_proximity")
ocean_onehot = OneHotEncoder(inputCol="indexed_ocean_proximity", outputCol="oh_ocean_proximity", dropLast=False)
# Fill in the missing values of the total_bedrooms feature by imputing the median
imputer_tot_br = Imputer(strategy="median", inputCols=["total_bedrooms"], outputCols=["total_bedrooms_filled"])
features = ["longitude", "latitude", "housing_median_age",
"total_rooms", "population", "households",
"median_income", "total_bedrooms_filled"]
Build a pipeline, bundling the feature preparation steps¶
feature_preparation_pipeline = Pipeline(stages=[ocean_index, ocean_onehot, imputer_tot_br])
# fit the feature preparation pipeline on the training data
feature_preparation_transformer = feature_preparation_pipeline.fit(train)
# show a sample of data after feature preparation
feature_preparation_transformer.transform(train).limit(10).toPandas()
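The fitted stages can also be inspected directly; for example, the category-to-index mapping learned by the StringIndexer and the median used by the Imputer (a sketch, assuming the stage order defined above):
# stage 0 is the StringIndexerModel, stage 2 the ImputerModel
indexer_model = feature_preparation_transformer.stages[0]
print(indexer_model.labels)  # categories, ordered by descending frequency
imputer_model = feature_preparation_transformer.stages[2]
imputer_model.surrogateDF.show()  # the median value used to fill total_bedrooms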
Further data preparation¶
VectorAssembler combines the selected feature columns into a single vector column. This step is required by the Spark ML algorithms.
StandardScaler standardizes the features. Note that with Spark's defaults (withStd=True, withMean=False) each feature is scaled to unit standard deviation but not centered; set withMean=True to also subtract the mean.
assembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")
std_scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")
full_feature_preparation_pipeline = Pipeline(stages=[feature_preparation_pipeline, assembler, std_scaler])
# this shows the results of data scaling
full_feature_preparation_transformer = full_feature_preparation_pipeline.fit(train)
full_feature_preparation_transformer.transform(train).select("unscaled_features","features").limit(10).toPandas()
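The fitted StandardScalerModel stores the statistics it applies; a quick way to inspect them (with the default withMean=False, only the standard deviations are used):
# the last stage of the fitted pipeline is the StandardScalerModel
scaler_model = full_feature_preparation_transformer.stages[-1]
print(scaler_model.std)   # per-feature standard deviations (used by default)
print(scaler_model.mean)  # per-feature means (applied only if withMean=True)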
Define the model and assemble a pipeline¶
from pyspark.ml.regression import GBTRegressor
regressor = GBTRegressor(labelCol="median_house_value", maxIter=40)
pipeline = Pipeline(stages=[full_feature_preparation_pipeline, regressor])
# this is equivalent to
# pipeline = Pipeline(stages=[ocean_index, ocean_onehot, imputer_tot_br, assembler, std_scaler, regressor])
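Because the model is just another pipeline stage, a different regressor can be swapped in without changing the rest of the pipeline; for example, a random forest (a sketch for illustration, not used in the rest of this notebook):
# from pyspark.ml.regression import RandomForestRegressor
# regressor = RandomForestRegressor(labelCol="median_house_value", numTrees=40)
# pipeline = Pipeline(stages=[full_feature_preparation_pipeline, regressor])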
Fit the model using the training dataset¶
# model training
# this uses the pipeline built above
# the pipeline puts together transformers and the model and is an estimator
# we are going to fit it to the training data
model = pipeline.fit(train)
# the trained model can be saved on the filesystem
# overwrite() lets the cell be re-run without failing if the path already exists
model.write().overwrite().save("myTrainedModel")
Evaluate the model performance on the test dataset by computing RMSE¶
from pyspark.ml.evaluation import RegressionEvaluator
predictions = model.transform(test)
dt_evaluator = RegressionEvaluator(
    labelCol="median_house_value", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
Correlation Matrix¶
The correlation matrix shows the pairwise linear relationship between features.
Correlation ranges from -1 to +1; values close to zero mean there is no linear trend between the two variables.
It is often displayed using a heatmap.
from pyspark.ml.stat import Correlation
matrix = Correlation.corr(full_feature_preparation_transformer.transform(train).select('features'), 'features')
matrix_np = matrix.collect()[0]["pearson({})".format('features')].values
import seaborn as sns
import matplotlib.pyplot as plt
matrix_np = matrix_np.reshape(len(features), len(features))
fig, ax = plt.subplots(figsize=(12,8))
ax = sns.heatmap(matrix_np, cmap="Blues")
ax.xaxis.set_ticklabels(features, rotation=270)
ax.yaxis.set_ticklabels(features, rotation=0)
ax.set_title("Correlation Matrix")
plt.tight_layout()
plt.show()
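Related to feature relationships, the fitted GBT model also exposes per-feature importances; a sketch of how they can be listed (the model is the last stage of the fitted pipeline):
# feature importances from the trained GBT model (last pipeline stage)
importances = model.stages[-1].featureImportances.toArray()
for name, value in sorted(zip(features, importances), key=lambda x: -x[1]):
    print("%-25s %.3f" % (name, value))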
An example of cross validation and grid search¶
## This cross-validation step takes several minutes, depending on the available cores
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
paramGrid = ParamGridBuilder()\
    .addGrid(regressor.maxIter, [100, 50]) \
    .baseOn({regressor.labelCol: "median_house_value"})\
    .build()
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="median_house_value"),
                          numFolds=4)
cvModel = crossval.fit(train)
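The fold-averaged metric for each parameter combination, and the best model found, can be inspected on the fitted CrossValidatorModel:
# average RMSE (the evaluator's default metric) across the 4 folds,
# for each parameter combination in the grid
for params, metric in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, "-> avg RMSE = %g" % metric)

# the best model, refit on the full training set
print(cvModel.bestModel.stages[-1])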
from pyspark.ml.evaluation import RegressionEvaluator
predictions = cvModel.transform(test)
dt_evaluator = RegressionEvaluator(
    labelCol="median_house_value", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
spark.stop()