A Project in PySpark

COVID-19 Global Forecasting

Talha Hanif Butt
13 min read · Apr 18, 2020

So, as every course ends and some require a project from the students, I had to choose mine. I first thought about implementing Alpha Zero in PySpark, as I had previously done in Python:

So I wrote a proposal and submitted it, but after some days our supervisor told us:

If some group is not very happy with their project or dataset then they can explore COVID datasets on kaggle. Some of them are big.

And I thought it was better to work on something new rather than implement the same algorithm in two different frameworks, so I changed my project to COVID-19 Global Forecasting.

Source: https://www.medicalnewstoday.com/articles/coronavirus-myths-explored

The challenge involved forecasting confirmed cases and fatalities between April 1 and April 30 by region. The primary goal wasn’t only to produce accurate forecasts; it was also to identify factors that appeared to impact the transmission rate of COVID-19.

When I started, the competition was in Week 3. The following is what I did in the project:

Methodology

Visualization

Pre-Processing

Linear Regression

Decision Tree

Random Forest

Gradient Boosted Tree

Higher Dimension Data Projection + Gradient Boosted Tree

Post-Processing

In PySpark, it is not possible to train a regression model with multiple outputs. As a result, separate training and testing needs to be performed for Confirmed Cases and Fatalities, followed by joining their individual outputs into a single file for submission.
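
A minimal sketch of that two-model workflow, assuming train_df already carries an assembled features column and test_df a ForecastId column (as in the competition’s submission format):

from pyspark.ml.regression import LinearRegression

# Train one single-output model per target on the same feature vector
cc_model = LinearRegression(featuresCol="features", labelCol="ConfirmedCases").fit(train_df)
ft_model = LinearRegression(featuresCol="features", labelCol="Fatalities").fit(train_df)

# Predict separately, then join the two outputs on ForecastId for submission
cc = cc_model.transform(test_df).selectExpr("ForecastId", "prediction AS ConfirmedCases")
ft = ft_model.transform(test_df).selectExpr("ForecastId", "prediction AS Fatalities")
submission = cc.join(ft, "ForecastId")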

Visualization

The data set provided had 22,032 training records and 13,158 test records, with the following schemas:

(Figures: Train schema, Test schema, some training records, some test records)
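
All of the queries below run against temp views named train_df and test_df. A minimal setup sketch, assuming a local SparkSession and that the Week 3 Kaggle CSVs sit next to the notebook (file paths are assumptions):

from pyspark.sql import SparkSession
import matplotlib.pyplot as plt

spark = SparkSession.builder.appName("covid19-global-forecasting").getOrCreate()

# File names are assumptions; point these at wherever the Kaggle CSVs live
train = spark.read.csv("train.csv", header=True, inferSchema=True)
test = spark.read.csv("test.csv", header=True, inferSchema=True)

train.printSchema()  # inspect the schemas shown above

# Register temp views so the spark.sql(...) queries can reference them by name
train.createOrReplaceTempView("train_df")
test.createOrReplaceTempView("test_df")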

Confirmed Cases but no Fatalities Sorted

sqlDF = spark.sql("SELECT Date, count(ConfirmedCases) FROM train_df WHERE ConfirmedCases>0 AND Fatalities=0 GROUP BY Date ORDER BY Date ASC")
sqlDF.show()
plt.xlabel("Days")
plt.ylabel("Count")
plt.plot(sqlDF.toPandas()["count(ConfirmedCases)"])
sqlDF.toPandas()["count(ConfirmedCases)"].hist()
sqlDF.toPandas()["count(ConfirmedCases)"].describe()

Countrywise Top 5

sqlDF = spark.sql("SELECT Country_Region, count(ConfirmedCases) FROM train_df WHERE ConfirmedCases>0 AND Fatalities=0 GROUP BY Country_Region ORDER BY count(ConfirmedCases) DESC LIMIT 5")
sqlDF.show()

Countrywise Bottom 5

sqlDF = spark.sql("SELECT Country_Region, count(ConfirmedCases) FROM train_df WHERE ConfirmedCases>0 AND Fatalities=0 GROUP BY Country_Region ORDER BY count(ConfirmedCases) ASC LIMIT 5")
sqlDF.show()

Per Day Confirmed Cases Sorted

sqlDF = spark.sql("SELECT Date, count(ConfirmedCases) FROM train_df WHERE ConfirmedCases>0 GROUP BY Date ORDER BY Date ASC")
sqlDF.show()
plt.xlabel("Days")
plt.ylabel("Count")
plt.plot(sqlDF.toPandas()["count(ConfirmedCases)"])
sqlDF.toPandas()["count(ConfirmedCases)"].hist()
sqlDF.toPandas()["count(ConfirmedCases)"].describe()

Per Day Fatalities Sorted

sqlDF = spark.sql("SELECT Date, count(Fatalities) FROM train_df WHERE Fatalities>0 GROUP BY Date ORDER BY Date ASC")
sqlDF.show()
plt.xlabel("Days")
plt.ylabel("Count")
plt.plot(sqlDF.toPandas()["count(Fatalities)"])
sqlDF.toPandas()["count(Fatalities)"].hist()
sqlDF.toPandas()["count(Fatalities)"].describe()

Countrywise Confirmed Cases

sqlDF = spark.sql("SELECT Country_Region, count(ConfirmedCases) FROM train_df WHERE ConfirmedCases>0 GROUP BY Country_Region")
sqlDF.toPandas()["count(ConfirmedCases)"].describe()

Top 5

sqlDF = spark.sql("SELECT Country_Region, count(ConfirmedCases) FROM train_df WHERE ConfirmedCases>0 GROUP BY Country_Region ORDER BY count(ConfirmedCases) DESC LIMIT 5").show()

Bottom 5

sqlDF = spark.sql("SELECT Country_Region, count(ConfirmedCases) FROM train_df WHERE ConfirmedCases>0 GROUP BY Country_Region ORDER BY count(ConfirmedCases) ASC LIMIT 5").show()

Countrywise Fatalities

sqlDF = spark.sql("SELECT Country_Region, count(Fatalities) FROM train_df WHERE Fatalities>0 GROUP BY Country_Region")
sqlDF.toPandas()["count(Fatalities)"].describe()

Top 5

sqlDF = spark.sql("SELECT Country_Region, count(Fatalities) FROM train_df WHERE Fatalities>0 GROUP BY Country_Region ORDER BY count(Fatalities) DESC LIMIT 5").show()

Bottom 5

sqlDF = spark.sql("SELECT Country_Region, count(Fatalities) FROM train_df WHERE Fatalities>0 GROUP BY Country_Region ORDER BY count(Fatalities) ASC LIMIT 5").show()

Province_State had NULL values, so I tried both removing the records that had NULL values and removing Province_State as a feature so that all records could be kept. I got better results by removing Province_State as a whole.
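
A quick sanity check of how much data each option keeps (a sketch):

total = spark.sql("SELECT count(*) FROM train_df").first()[0]
with_state = spark.sql("SELECT count(*) FROM train_df WHERE Province_State IS NOT NULL").first()[0]
print(total, with_state)  # dropping the column keeps all rows; the NULL filter does not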

Pre-Processing

Remove NULL values

N_NULL_DF = spark.sql("SELECT * FROM train_df WHERE Province_State IS NOT NULL")

Convert Timestamp to UnixTimestamp

A sketch of the conversion, assuming Date arrives as a yyyy-MM-dd string:

from pyspark.sql.functions import unix_timestamp
# Replace the Date string with its Unix timestamp so it can serve as a numeric feature
N_NULL_DF = N_NULL_DF.withColumn("Date", unix_timestamp("Date", "yyyy-MM-dd"))
N_NULL_DF.show()

Convert Categorical Attributes to Numeric Indices

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Province_State", outputCol="Province_StateIndex")
indexed = indexer.fit(N_NULL_DF).transform(N_NULL_DF)
indexed.show()
indexer = StringIndexer(inputCol="Country_Region", outputCol="Country_RegionIndex")
indexed = indexer.fit(indexed).transform(indexed)
indexed.show()

Linear Regression

Linear regression is a basic and commonly used type of predictive analysis. The overall idea of regression is to examine two things: (1) does a set of predictor variables do a good job of predicting an outcome (dependent) variable? (2) Which variables in particular are significant predictors of the outcome variable, and in what way (indicated by the magnitude and sign of the beta estimates) do they impact it? These regression estimates are used to explain the relationship between one dependent variable and one or more independent variables. The simplest form of the regression equation, with one dependent and one independent variable, is y = c + b*x, where y is the estimated dependent variable score, c the constant, b the regression coefficient, and x the score on the independent variable.

Linear Regression including Province_State

Confirmed Cases

Train

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Province_StateIndex', 'Country_RegionIndex', 'Date'], outputCol="features")
train_df = assembler.transform(indexed)
# Fit a linear regression model for ConfirmedCases
lr = LinearRegression(featuresCol='features', labelCol='ConfirmedCases', maxIter=100, tol=1e-6, fitIntercept=False, standardization=True, solver="auto", aggregationDepth=2, loss="huber", epsilon=1.35)
lr_model = lr.fit(train_df)
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %s" % str(lr_model.intercept))
# Summarize the model over the training set and print out some metrics
trainingSummary = lr_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
#trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("MAE: %f" % trainingSummary.meanAbsoluteError)
print("MSE: %f" % trainingSummary.meanSquaredError)
#print("r2: %f" % trainingSummary.r2)

Test
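
The one-line test steps in the linear regression and decision tree sections assume test_df already carries the same features column used in training. A sketch for this section’s three features, assuming test_indexed comes from running the same StringIndexer steps on the test set (the later sections do the analogous two-feature assembly, as shown in the Random Forest test steps):

# Assemble the identical feature vector on the indexed test set
assembler = VectorAssembler(inputCols=['Province_StateIndex', 'Country_RegionIndex', 'Date'], outputCol="features")
test_df = assembler.transform(test_indexed)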

confirmedcases_lr_predictions = lr_model.transform(test_df)

Fatalities

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Province_StateIndex', 'Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a linear regression model
lr = LinearRegression(featuresCol = 'features', labelCol='Fatalities', maxIter=100, tol=1e-6, fitIntercept=False,standardization=False, solver="auto", aggregationDepth=2,loss="huber", epsilon=1.35)
lr_model = lr.fit(train_df)
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %s" % str(lr_model.intercept))
# Summarize the model over the training set and print out some metrics
trainingSummary = lr_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
#trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("MAE: %f" % trainingSummary.meanAbsoluteError)
print("MSE: %f" % trainingSummary.meanSquaredError)
#print("r2: %f" % trainingSummary.r2)

Test

fatalities_lr_predictions = lr_model.transform(test_df)

Linear Regression without including Province_State

Exclude Province_State

N_NULL_DF = spark.sql("SELECT Country_Region, Date, ConfirmedCases, Fatalities FROM train_df")

Confirmed Cases

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a linear regression model
lr = LinearRegression(featuresCol = 'features', labelCol='ConfirmedCases', maxIter=100, tol=1e-6, fitIntercept=False,standardization=True, solver="auto", aggregationDepth=2,loss="huber", epsilon=1.35)
lr_model = lr.fit(train_df)
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %s" % str(lr_model.intercept))
# Summarize the model over the training set and print out some metrics
trainingSummary = lr_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
#trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("MAE: %f" % trainingSummary.meanAbsoluteError)
print("MSE: %f" % trainingSummary.meanSquaredError)
#print("r2: %f" % trainingSummary.r2)

Test

confirmedcases_lr_predictions = lr_model.transform(test_df)

Fatalities

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a linear regression model
lr = LinearRegression(featuresCol = 'features', labelCol='Fatalities', maxIter=100, tol=1e-6, fitIntercept=True,standardization=True, solver="auto", aggregationDepth=2,loss="huber", epsilon=1.35)
lr_model = lr.fit(train_df)
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %s" % str(lr_model.intercept))
# Summarize the model over the training set and print out some metrics
trainingSummary = lr_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
#trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("MAE: %f" % trainingSummary.meanAbsoluteError)
print("MSE: %f" % trainingSummary.meanSquaredError)
#print("r2: %f" % trainingSummary.r2)

Test

fatalities_lr_predictions = lr_model.transform(test_df)

Decision Tree

A tree can be “learned” by splitting the source set into subsets based on an attribute value test. This process is repeated on each derived subset in a recursive manner called recursive partitioning. The recursion is complete when the subset at a node all has the same value of the target variable, or when splitting no longer adds value to the predictions. The construction of a decision tree classifier does not require any domain knowledge or parameter setting, and is therefore appropriate for exploratory knowledge discovery. Decision trees can handle high-dimensional data and in general achieve good accuracy. Decision tree induction is a typical inductive approach to learning classification knowledge.

Decision trees classify instances by sorting them down the tree from the root to some leaf node, which provides the classification of the instance. An instance is classified by starting at the root node of the tree, testing the attribute specified by this node, then moving down the tree branch corresponding to the value of the attribute. This process is then repeated for the subtree rooted at the new node. Here the tree is used as a regressor rather than a classifier, so each leaf predicts a continuous value (the mean of the training targets that reach it) instead of a class label.

Decision Tree without including Province_State

Confirmed Cases

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
from pyspark.ml.regression import DecisionTreeRegressor

# Fit a decision tree regressor
dt = DecisionTreeRegressor(featuresCol='features', labelCol='ConfirmedCases', maxDepth=3, maxBins=1000)
dt_model = dt.fit(train_df)

Test

confirmedcases_dt_predictions = dt_model.transform(test_df)

Fatalities

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a decision tree regressor
dt = DecisionTreeRegressor(featuresCol='features', labelCol='Fatalities', maxDepth=3, maxBins=1000)
dt_model = dt.fit(train_df)

Test

fatalities_dt_predictions = dt_model.transform(test_df)

Random Forest

Random forest, as its name implies, consists of a large number of individual decision trees that operate as an ensemble. For classification, each tree casts a vote and the class with the most votes becomes the model’s prediction; for regression, as here, the individual tree predictions are averaged instead.

Random Forest without including Province_State

Confirmed Cases

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
from pyspark.ml.regression import RandomForestRegressor

# Fit a random forest regressor
rf = RandomForestRegressor(featuresCol='features', labelCol='ConfirmedCases', maxDepth=3, maxBins=1000)
rf_model = rf.fit(train_df)

Test

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
test_df = assembler.transform(test_indexed)
confirmedcases_rf_predictions = rf_model.transform(test_df)

Fatalities

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a random forest regressor
rf = RandomForestRegressor(featuresCol='features', labelCol='Fatalities', maxDepth=3, maxBins=1000)
rf_model = rf.fit(train_df)

Test

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
test_df = assembler.transform(test_indexed)
fatalities_rf_predictions = rf_model.transform(test_df)

Gradient Boosted Tree

A gradient boosted model is an ensemble of either regression or classification tree models. Both are forward-learning ensemble methods that obtain predictive results through gradually improved estimations. Boosting is a flexible nonlinear regression procedure that helps improve the accuracy of trees. By sequentially applying weak classification algorithms to the incrementally changed data, a series of decision trees is created that produces an ensemble of weak prediction models. While boosting trees increases their accuracy, it also decreases speed and human interpretability. The gradient boosting method generalizes tree boosting to minimize these issues.

Confirmed Cases

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
from pyspark.ml.regression import GBTRegressor

# Fit a gradient boosted tree regressor
gbt = GBTRegressor(featuresCol='features', labelCol='ConfirmedCases', maxDepth=3, maxBins=1000)
gbt_model = gbt.fit(train_df)

Test

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
test_df = assembler.transform(test_indexed)
confirmedcases_gbt_predictions = gbt_model.transform(test_df)

Fatalities

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a gradient boosted tree regressor
gbt = GBTRegressor(featuresCol='features', labelCol='Fatalities', maxDepth=3, maxBins=1000)
gbt_model = gbt.fit(train_df)

Test

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date'],outputCol="features")
test_df = assembler.transform(test_indexed)
fatalities_gbt_predictions = gbt_model.transform(test_df)

Higher Dimension Data Projection + Gradient Boosted Tree

In this experiment, I tried projecting the data into higher dimensions by adding first the square and then also the cube of Country_RegionIndex as extra features, followed by a Gradient Boosted Tree.
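
For comparison, Spark ML also ships a PolynomialExpansion transformer that generates all polynomial combinations of an assembled vector automatically; a minimal sketch of the degree-3 case (column names are illustrative):

from pyspark.ml.feature import PolynomialExpansion, VectorAssembler

# Expand the two base features with all polynomial terms up to degree 3
base = VectorAssembler(inputCols=["Country_RegionIndex", "Date"], outputCol="base_features")
poly = PolynomialExpansion(degree=3, inputCol="base_features", outputCol="poly_features")
expanded = poly.transform(base.transform(indexed))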

Square

Country_RegionIndex

Train

indexed = indexed.withColumn("sq_cri", indexed["Country_RegionIndex"]**2)

Test

test_indexed = test_indexed.withColumn("sq_cri", test_indexed["Country_RegionIndex"]**2)

Confirmed Cases

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date', 'sq_cri'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a gradient boosted tree regressor
gbt = GBTRegressor(featuresCol='features', labelCol='ConfirmedCases', maxDepth=30, maxBins=1000)
gbt_model = gbt.fit(train_df)

Test

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date', 'sq_cri'],outputCol="features")
test_df = assembler.transform(test_indexed)
confirmedcases_gbt_predictions = gbt_model.transform(test_df)

Fatalities

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date', 'sq_cri'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a gradient boosted tree regressor
gbt = GBTRegressor(featuresCol='features', labelCol='Fatalities', maxDepth=30, maxBins=1000)
gbt_model = gbt.fit(train_df)

Test

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date','sq_cri'],outputCol="features")
test_df = assembler.transform(test_indexed)
fatalities_gbt_predictions = gbt_model.transform(test_df)

Cube

Country_RegionIndex

Train

indexed = indexed.withColumn("cube_cri", indexed["Country_RegionIndex"]**3)

Test

test_indexed = test_indexed.withColumn("cube_cri", test_indexed["Country_RegionIndex"]**3)

Confirmed Cases

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date', 'sq_cri', 'cube_cri'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a gradient boosted tree regressor
gbt = GBTRegressor(featuresCol='features', labelCol='ConfirmedCases', maxDepth=30, maxBins=1000)
gbt_model = gbt.fit(train_df)

Test

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date', 'sq_cri', 'cube_cri'],outputCol="features")
test_df = assembler.transform(test_indexed)
confirmedcases_gbt_predictions = gbt_model.transform(test_df)

Fatalities

Train

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date', 'sq_cri', 'cube_cri'],outputCol="features")
train_df = assembler.transform(indexed)
# Fit a gradient boosted tree regressor
gbt = GBTRegressor(featuresCol='features', labelCol='Fatalities', maxDepth=30, maxBins=1000)
gbt_model = gbt.fit(train_df)

Test

# Create a vector representation for features
assembler = VectorAssembler(inputCols=['Country_RegionIndex', 'Date','sq_cri', 'cube_cri'],outputCol="features")
test_df = assembler.transform(test_indexed)
fatalities_gbt_predictions = gbt_model.transform(test_df)

Post-Processing

Create Submission File

from csv import writer
from csv import reader

def add_column_in_csv(input_file, output_file, transform_row):
    """Append a column to an existing csv using csv.reader / csv.writer."""
    # Open the input_file in read mode and output_file in write mode
    with open(input_file, 'r') as read_obj, \
            open(output_file, 'w', newline='') as write_obj:
        # Create a csv.reader object from the input file object
        csv_reader = reader(read_obj)
        # Create a csv.writer object from the output file object
        csv_writer = writer(write_obj)
        # Read each row of the input csv file as a list
        for row in csv_reader:
            # Pass the row to the transform function, which appends the new column's value
            transform_row(row, csv_reader.line_num)
            # Write the updated row to the output file
            csv_writer.writerow(row)

Confirmed Cases

import numpy as np

indexes = confirmedcases_lr_predictions.toPandas()["ForecastId"]
pred = confirmedcases_lr_predictions.toPandas()["prediction"]
test_rec = 13159  # one more than the 13,158 test records, so the 1-based ForecastId can index rows directly
data = np.zeros((test_rec, 1))
data[indexes, 0] = round(pred)
header_of_new_col = 'ConfirmedCases'
# Add the column to the csv file, with a header in the first row
add_column_in_csv('sample_submission.csv', 'new_submission.csv',
                  lambda row, line_num: row.append(header_of_new_col) if line_num == 1
                  else row.append(data[line_num - 1, 0]))

Fatalities

indexes = fatalities_lr_predictions.toPandas()["ForecastId"]
pred = fatalities_lr_predictions.toPandas()["prediction"]
test_rec = 13159
data = np.zeros((test_rec, 1))
data[indexes, 0] = round(pred * 100000000)  # rescale the predictions before rounding
header_of_new_col = 'Fatalities'
# Add the column to the csv file, with a header in the first row
add_column_in_csv('new_submission.csv', 'new_submission2.csv',
                  lambda row, line_num: row.append(header_of_new_col) if line_num == 1
                  else row.append(data[line_num - 1, 0]))
import pandas as pd

# Keep only the submission columns, in order
f = pd.read_csv("new_submission2.csv")
keep_col = ['ForecastId', 'ConfirmedCases', 'Fatalities']
new_f = f[keep_col]
new_f.to_csv("submission.csv", index=False)
