Logistic regression will be used to predict low birth weight in infants from the remaining variables in the dataset. The target variable, LOW, equals 1 when birth weight (BWT) <= 2500 g and 0 when BWT > 2500 g.
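As a refresher, logistic regression models the probability of the positive class as P(LOW = 1 | x) = 1 / (1 + exp(-(b0 + b·x))), where x is the feature vector, b0 is the intercept, and b holds the coefficients learned from the training data; a case is predicted as low birth weight when this probability exceeds the default 0.5 threshold.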
Dataset Source: Hosmer, D.W., Lemeshow, S. and Sturdivant, R.X. (2013) Applied Logistic Regression, 3rd ed., New York: Wiley
This dataset is also part of the aplore3 R package.
| # | Variable | Description |
|---|----------|-------------|
| 0 | HT | History of hypertension (1: No, 2: Yes) |
| 1 | PTL | History of premature labor (1: None, 2: One, 3: Two, etc.) |
| 2 | AGE | Mother's age in years |
| 3 | RACE | Race (1: White, 2: Black, 3: Other) |
| 4 | LOW | (target) Indicator of birth weight (0: >= 2500 g, 1: < 2500 g) |
| 5 | UI | Presence of uterine irritability (1: No, 2: Yes) |
| 6 | FTV | Number of physician visits during the first trimester (1: None, 2: One, 3: Two, etc.) |
| 7 | SMOKE | Smoking status during pregnancy (1: No, 2: Yes) |
| 8 | BabyID | Baby's unique identifier |
The Spark and Python libraries that you need are preinstalled in the notebook environment and only need to be loaded.
Run the following cell to load the libraries you will work with in this notebook:
# PySpark Machine Learning Library
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext
import os
import sys
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Library for confusion matrix, precision, test error
from pyspark.mllib.evaluation import MulticlassMetrics
# Library For Area under ROC curve and Area under precision-recall curve
from pyspark.mllib.evaluation import BinaryClassificationMetrics
# Create an SQLContext from the preconfigured SparkContext (sc)
sqlContext = SQLContext(sc)
# packages for data analysis
import numpy as np
import pandas as pd
# This is the summary of the data structure, including the column position and name.
# The first field starts at position 0.
# 0 ID - Identification code
# 1 LOW - Low birth weight (0: >= 2500 g, 1: < 2500 g), target variable
# 2 AGE - Mother's age in years
# 3 RACE - Race (1: White, 2: Black, 3: Other)
# 4 SMOKE - Smoking status during pregnancy (1: No, 2: Yes)
# 5 PTL - History of premature labor (1: None, 2: One, 3: Two, etc)
# 6 HT - History of hypertension (1: No, 2: Yes)
# 7 UI - Presence of Uterine irritability (1: No, 2: Yes)
# 8 FTV - Number of physician visits during the first trimester (1: None, 2: One, 3: Two, etc)
# label is the target variable; PersonInfo is a single string holding the independent variables (the unique identifier is kept separately)
LabeledDocument = Row("ID", "PersonInfo", "label")
# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    # values[1] is the LOW indicator ('0' or '1')
    if values[1] > '0':
        LOW = 1.0
    else:
        LOW = 0.0
    # Concatenate the independent variables into one space-separated string
    # (spaces between all fields so the Tokenizer sees each variable as its own token)
    textValue = str(values[2]) + " " + str(values[3]) + " " + str(values[4]) + " " + str(values[5]) + " " + \
                str(values[6]) + " " + str(values[7]) + " " + str(values[8])
    return LabeledDocument(values[0], textValue, LOW)
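As a quick sanity check, here is how parseDocument transforms a single raw record (the CSV line below is a made-up example, not a row from the dataset):
# Hypothetical record in column order ID, LOW, AGE, RACE, SMOKE, PTL, HT, UI, FTV
sample = "85,1,28,1,1,1,0,1,0"
print(parseDocument(sample))
# Expected output: Row(ID='85', PersonInfo='28 1 1 1 0 1 0', label=1.0)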
To read a file from Object Storage, you must set up the Spark configuration with your Object Storage credentials.
To do this, click the cell below and, in the Files tab, select Insert to code > Insert SparkSession DataFrame below the data file you want to work with.
import ibmos2spark
# @hidden_cell
credentials = {
'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
'service_id': 'iam-ServiceId-7c0fb81a-082a-4b1b-9f2e-eb2eb9bfbb3d',
'iam_service_endpoint': 'https://iam.cloud.ibm.com/oidc/token',
'api_key': '7ad67gk3JVD9viCagj_lTRN8_2Srxi-96QpN7e0PqDSs'
}
configuration_name = 'os_697aafb312964e4db161e65d0a2ee97e_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df_data_1 = spark.read\
.format('csv')\
.option('header', 'true')\
.option('inferSchema', 'true')\
.load(cos.url('lowbwt.csv', 'apachesparktutorial-donotdelete-pr-gzhhj3lfvqnwyl'))
df_data_1.take(5)
print('Number of cases', df_data_1.count())
# Number of babies born with weight below 2.5 kg versus 2.5 kg or above
df_data_1.groupby('LOW').count().show()
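To put these counts in context, you can also compute the proportion of low-weight births directly (a minimal sketch using the same DataFrame):
# Proportion of babies with low birth weight (LOW = 1)
n_total = df_data_1.count()
n_low = df_data_1.filter(df_data_1['LOW'] == 1).count()
print("Proportion of low birth weight cases: %.3f" % (float(n_low) / n_total))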
#Low birth weight grouped by Race
import matplotlib.pyplot as plt
%matplotlib inline
df_data_1.crosstab('RACE', 'LOW').show()
df=df_data_1.crosstab('RACE', 'LOW').toPandas()
df.plot.bar(x="RACE_LOW", legend=True , title="Low Birth Weight by Mother's Race")
# Low birth weight grouped by number of first-trimester physician visits
import matplotlib.pyplot as plt
%matplotlib inline
df_data_1.crosstab('FTV', 'LOW').show()
df=df_data_1.crosstab('FTV', 'LOW').toPandas()
df.plot.bar(x="FTV_LOW", legend=True , title="Low Birth Weight by Mother's regular visit to doctor")
#Low birth weight grouped by smoking status
import matplotlib.pyplot as plt
%matplotlib inline
df_data_1.crosstab('SMOKE', 'LOW').show()
df=df_data_1.crosstab('SMOKE', 'LOW').toPandas()
df.plot.bar(x="SMOKE_LOW", legend=True , title="Low Birth Weight by Smoking Status")
# Average age by LOW outcome
# Convert Age data type from string to numeric.
pdf=df_data_1.toPandas()
pdf["AGE"]=pd.to_numeric(pdf.AGE)
df=sqlContext.createDataFrame(pdf)
PAge=df.groupby(['LOW'])\
.agg({"AGE": "AVG"}).toPandas()
PAge
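Round-tripping through pandas is fine for a dataset this small; on larger data you could cast the column directly in Spark instead, a sketch:
# Alternative: cast AGE to a numeric type without converting to pandas
df_alt = df_data_1.withColumn("AGE", df_data_1["AGE"].cast("double"))
df_alt.groupby("LOW").agg({"AGE": "avg"}).show()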
# Age distribution for all mothers
df.toPandas()["AGE"].plot.hist(title="Age distribution")
# Age distribution for mothers of children with low birth weight
df_low = df.select('AGE', 'LOW').filter(df['LOW'] == '1')
pdf_low = df_low.select('AGE').toPandas()
pdf_low["AGE"].plot.hist(color='m', legend=False, title="Age distribution for mothers of low birth weight children")
Now let's load the data into a Spark RDD and output the number of rows and the first 5 rows.
Each project you create has a bucket in your object storage. You can find the bucket name on the project Settings page. Replace the string BUCKET
with your bucket name.
data = sc.textFile(cos.url('lowbwt.csv', 'apachesparktutorial-donotdelete-pr-gzhhj3lfvqnwyl'))
print ("Total records in the data set:", data.count())
print ("The first 5 rows")
data.take(5)
Create a DataFrame from the RDD
# Parse and load the data into a DataFrame; the code calls the parsing function defined above
documents = data.filter(lambda s: "Name" not in s).map(parseDocument)  # skip any header line (rows containing "Name")
lowbwtData = documents.toDF()  # convert the RDD of Rows to a DataFrame
print ("Number of records: " + str(lowbwtData.count()))
print ( "First 5 records: ")
lowbwtData.take(5)
We divide the data into a training set and a test set. The training set is used to build the model that will be applied to future data, and the test set is used to evaluate the model.
# Divide the data into training and test set, with random seed to reproduce results
(train, test) = lowbwtData.randomSplit([0.6, 0.4], seed = 123)
print ("Number of records in the training set: " + str(train.count()))
print ("Number of records in the test set: " + str(test.count()))
# Output first 20 records in the training set
print ("First 20 records in the training set: ")
train.show()
We use a Spark ML Pipeline to build the logistic regression model.
# set up Logistic Regression using Pipeline of SparkML
tokenizer = Tokenizer(inputCol="PersonInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=50, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
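A note on the stages: Tokenizer splits the space-separated PersonInfo string into word tokens, and HashingTF hashes each token into a sparse feature vector (2^18 buckets by default). Because the vocabulary here is tiny, you could use a much smaller feature space; a sketch, with numFeatures chosen arbitrarily:
# Optional: a smaller hashed feature space (1024 buckets is an arbitrary choice)
hashingTF_small = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features", numFeatures=1024)
pipeline_small = Pipeline(stages=[tokenizer, hashingTF_small, lr])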
# Fit the pipeline to the training data
# the stages are executed in order
model = pipeline.fit(train)
# Inspect the fitted logistic regression (the last pipeline stage)
# [stage.coefficients for stage in model.stages if hasattr(stage, "coefficients")]
# model.stages[2].intercept
model.stages[2].coefficients
# Make predictions for test data and print columns of interest
prediction = model.transform(test)
selected = prediction.select("PersonInfo", "prediction", "probability")
for row in selected.collect():
print (row)
#Tabulate the predicted outcome
prediction.select("prediction").groupBy("prediction").count().show(truncate=False)
#Tabulate the actual outcome
prediction.select("label").groupBy("label").count().show(truncate=False)
# This table shows:
# 1. The number of low birth weight infants predicted as having low birth weight
# 2. The number of low birth weight infants predicted as not having low birth weight
# 3. The number of regular birth weight infants predicted as having low birth weight
# 4. The number of regular birth weight infants predicted as not having low birth weight
prediction.crosstab('label', 'prediction').show()
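The same table can be computed as a matrix with MulticlassMetrics, which we also use below for precision and recall (a minimal sketch):
# Confusion matrix as a 2x2 array (rows: actual label, columns: predicted label)
cm = MulticlassMetrics(prediction.select("prediction", "label").rdd)
print(cm.confusionMatrix().toArray())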
We evaluate the model on both the training set and the test set. The purpose is to measure the model's predictive accuracy, including how well it generalizes to new data.
# Evaluate the Logistic Regression model on the training set
# Select (prediction, true label) and compute training error
pred_lr=model.transform(train).select("prediction", "label")
eval_lr=MulticlassClassificationEvaluator (
labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_lr=eval_lr.evaluate(pred_lr)
# create RDD
predictionAndLabels_lr=pred_lr.rdd
metrics_lr=MulticlassMetrics(predictionAndLabels_lr)
precision_lr=metrics_lr.precision(1.0)
recall_lr=metrics_lr.recall(1.0)
f1Measure_lr = metrics_lr.fMeasure(1.0, 1.0)
print ("Model evaluation for the training data")
print ("Accuracy = %s" %accuracy_lr)
print ("Error = %s" % (1-accuracy_lr))
print ("Precision = %s" %precision_lr)
print ("Recall = %s" %recall_lr)
print("F1 Measure = %s" % f1Measure_lr)
# Evaluate the Logistic Regression model on the test set
# Select (prediction, true label) and compute test error
pred_lr=model.transform(test).select("prediction", "label")
eval_lr=MulticlassClassificationEvaluator (
labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_lr=eval_lr.evaluate(pred_lr)
# create RDD
predictionAndLabels_lr=pred_lr.rdd
metrics_lr=MulticlassMetrics(predictionAndLabels_lr)
precision_lr=metrics_lr.precision(1.0)
recall_lr=metrics_lr.recall(1.0)
f1Measure_lr = metrics_lr.fMeasure(1.0, 1.0)
print ("Model evaluation for the test data")
print ("Accuracy = %s" %accuracy_lr)
print ("Error = %s" % (1-accuracy_lr))
print ("Precision = %s" %precision_lr)
print ("Recall = %s" %recall_lr)
print ("F1 Measure = %s" % f1Measure_lr)
bin_lr=BinaryClassificationMetrics(predictionAndLabels_lr)
# Area under precision-recall curve
print("Area under PR = %s" % bin_lr.areaUnderPR)
# Area under Receiver operating characteristic curve
print("Area under ROC = %s" % bin_lr.areaUnderROC)
Training the logistic regression model produces a pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary object. We can use its roc method to obtain the coordinates of the points on the receiver operating characteristic (ROC) curve. The coordinates are stored in a DataFrame with two fields: FPR (false positive rate) and TPR (true positive rate).
# The first 20 ROC curve points
model.stages[2].summary.roc.show()
import matplotlib.pyplot as plt
%matplotlib inline
ROC=model.stages[2].summary.roc
df=ROC.toPandas()
df.plot(x='FPR', y='TPR', legend=False)
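To make the curve easier to read, you might add axis labels and the chance diagonal:
# Label the axes and draw the diagonal corresponding to a random classifier
ax = df.plot(x='FPR', y='TPR', legend=False)
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('ROC Curve')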