4.4 Leverage Spark Using R Notebook
R is a powerful tool for data analysis, provided the data can fit into memory. Because of this memory constraint, R by itself cannot be used directly for big data analysis, where the data is likely stored in a Hadoop/Spark system. By leveraging the sparklyr package created by RStudio, we can use Databricks' R notebook to analyze data stored in the Spark system. As the data are stored across different nodes, Spark enables parallel computation using the collective memory and CPUs of all nodes. The fundamental data element in the Spark system is the Spark DataFrame (SDF). In this section, we will illustrate how to use Databricks' R notebook for big data analysis on top of the Spark environment through the sparklyr package.
Install Package
First, we need to install the sparklyr package, which enables the connection between the local node and the Spark cluster environment. Since it installs more than 10 dependencies, it may take a few minutes to finish. Be patient while it is installing! Once the installation finishes, load the sparklyr package as illustrated by the following code:
# Install sparklyr if it is not already available
if (!require("sparklyr")) {
  install.packages("sparklyr")
}
# Load sparklyr package
library(sparklyr)
Create a Spark Connection
Once the library is loaded, we need to create a Spark Connection to link the computing node (i.e. the local node) running the R notebook to the Spark environment. Here we use the "databricks" option for the method parameter, which is specific to Databricks' cloud system. In other enterprise environments, please consult your administrator for details. The Spark Connection (i.e. sc) is the pipe that connects the R notebook on the local node with the Spark cluster. We can think of the R notebook as running on a local node with its own memory and CPU, while the Spark system is a cluster of connected computation nodes; the Spark Connection creates a mechanism to connect the two. The Spark Connection can be established with:
# create a sparklyr connection
sc <- spark_connect(method = "databricks")
To simplify the learning process, let us use the familiar small iris dataset, which is built into R. We also load the dplyr library, whose functions we will use to manipulate the data. At this point, the iris dataset is still on the local node where the R notebook is running. We can check the first few lines of the iris dataset using the code below:
library(dplyr)
head(iris)
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## 1 5.1 3.5 1.4 0.2
## 2 4.9 3.0 1.4 0.2
## 3 4.7 3.2 1.3 0.2
## 4 4.6 3.1 1.5 0.2
## 5 5.0 3.6 1.4 0.2
## 6 5.4 3.9 1.7 0.4
## Species
## 1 setosa
## 2 setosa
## 3 setosa
## 4 setosa
## 5 setosa
## 6 setosa
IMPORTANT - Copy Data to Spark Environment
In real applications, the dataset may be massive, too large to fit on a single hard disk, and most likely already stored in the Spark system. If the data is already in the Hadoop/Spark ecosystem in the form of an SDF, we can create a local R object that links to the SDF using the tbl() function, where my_sdf is the SDF in the Spark system and my_sdf_tbl is the local R object referring to my_sdf:
my_sdf_tbl <- tbl(sc, my_sdf)
As we just created a brand new Spark computing environment, there is no SDF in the system yet, so we need to copy a local dataset to the Spark environment. Since we have already created the Spark Connection sc, it is easy to copy data to the Spark system using the sdf_copy_to() function as below:
iris_tbl <- sdf_copy_to(sc = sc, x = iris, overwrite = TRUE)
The above one-line code copies the iris dataset from the local node to the Spark cluster environment. sc is the Spark Connection we just created; x is the data frame that we want to copy; overwrite indicates whether to overwrite the target object if an SDF with the same name already exists in the Spark environment. Finally, the sdf_copy_to() function returns an R object representing the copied SDF (i.e. it creates a "pointer" to the SDF so that we can refer to iris_tbl in the R notebook to operate on the iris SDF). Note that the dots in the original column names are replaced with underscores in the SDF (e.g. Sepal.Length becomes Sepal_Length), which is how the columns are referenced in the rest of this section. Now iris_tbl in the local R environment can be used to refer to the iris SDF in the Spark system.
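To see this, we can inspect the schema of the copied SDF. This is a minimal sketch using sparklyr's sdf_schema() function, which lists each column's name and Spark data type:
## inspect the column names and Spark data types of the iris SDF
sdf_schema(iris_tbl)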
To check whether the iris data was copied to the Spark environment successfully, we can apply the src_tbls() function to the Spark Connection (sc):
## return the names of all the data frames (tables) associated with sc
src_tbls(sc)
Analyzing the Data
Now we have successfully copied the iris dataset to the Spark environment as an SDF. This means that iris_tbl is an R object representing the iris SDF, and we can use iris_tbl in R to refer to the iris dataset in the Spark system (i.e. the iris SDF). With the sparklyr package, we can apply nearly all of the dplyr functions directly to a Spark DataFrame through iris_tbl, just as we would apply dplyr functions to a local R data frame on our laptop. For example, we can use the %>% operator to pass iris_tbl to the count() function:
iris_tbl %>% count()
or use the head() function to return the first few rows of iris_tbl:
head(iris_tbl)
or apply more advanced data manipulation directly to iris_tbl:
iris_tbl %>%
  mutate(Sepal_Add = Sepal_Length + Sepal_Width) %>%
  group_by(Species) %>%
  summarize(count = n(), Sepal_Add_Avg = mean(Sepal_Add))
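Behind the scenes, these dplyr verbs are translated into Spark SQL and executed in the cluster. As a minimal sketch, dplyr's show_query() can be appended to the pipeline to display the generated SQL instead of running it:
## show the Spark SQL generated by the dplyr pipeline
iris_tbl %>%
  mutate(Sepal_Add = Sepal_Length + Sepal_Width) %>%
  group_by(Species) %>%
  summarize(count = n(), Sepal_Add_Avg = mean(Sepal_Add)) %>%
  show_query()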
Collect Results Back to Local Node
Even though we can run nearly all of the dplyr functions on an SDF, we cannot apply functions from other packages directly to an SDF (such as ggplot()). For functions that only work on local R data frames, we must copy the SDF back to the local node as an R data frame, which is done with the collect() function. The following code uses collect() to gather the results of a few operations and assign the collected data to iris_summary, a local R data frame:
iris_summary <- iris_tbl %>%
  mutate(Sepal_Width_round = round(Sepal_Width * 2) / 2) %>%
  group_by(Species, Sepal_Width_round) %>%
  summarize(count = n(), Sepal_Length_avg = mean(Sepal_Length),
            Sepal_Length_stdev = sd(Sepal_Length)) %>%
  collect()
Now iris_summary is a local R object in the R notebook, and we can apply any R package or function to it. In the following code, we apply ggplot() to it, exactly as we would in a standalone R console:
library(ggplot2)
ggplot(iris_summary, aes(Sepal_Width_round,
                         Sepal_Length_avg, color = Species)) +
  geom_line(size = 1.2) +
  geom_errorbar(aes(ymin = Sepal_Length_avg - Sepal_Length_stdev,
                    ymax = Sepal_Length_avg + Sepal_Length_stdev),
                width = 0.05) +
  geom_text(aes(label = count),
            vjust = -0.2,
            hjust = 1.2,
            color = "black") +
  theme(legend.position = "top")
In most cases, the heavy-duty data preprocessing and aggregation are done in Spark using dplyr functions. Once the data is aggregated, its size is usually dramatically reduced, and the reduced data can be collected into a local R object for downstream analysis.
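To see how much the aggregation shrinks the data, we can compare row counts on both sides. This is a minimal sketch: sdf_nrow() counts the rows of an SDF on the Spark side, while nrow() works on the collected local data frame:
## number of rows of the iris SDF in the Spark cluster (150 for iris)
sdf_nrow(iris_tbl)
## number of rows of the aggregated result collected to the local node
nrow(iris_summary)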
Fit Regression to SDF
One of the advantages of the Spark system is its parallel machine learning algorithms: many statistical and machine learning algorithms have been implemented for SDFs to run in parallel across many CPUs, with the data distributed across many memory units. In this example, we have already uploaded the iris data to the Spark system, and the data in the SDF can be referred to through iris_tbl as in the last section. The linear regression algorithm implemented in the Spark system can be called through the ml_linear_regression() function. To call the function, we specify the local R object representing the SDF (i.e. iris_tbl for the iris SDF), the response variable (i.e. the y variable of the linear regression in the SDF), and the features (i.e. the x variables of the linear regression in the SDF). We can now easily fit a linear regression to a dataset far beyond the memory limit of a single computer; it is truly scalable and only constrained by the resources of the Spark cluster. Below is an illustration of how to fit a linear regression to an SDF using the R notebook:
fit1 <- ml_linear_regression(x = iris_tbl,
                             response = "Sepal_Length",
                             features = c("Sepal_Width", "Petal_Length", "Petal_Width"))
summary(fit1)
In the above code, x is the R object pointing to the SDF, response is the y-variable, and features is the collection of explanatory variables. For this function, both the data and the computation are in the Spark cluster, which leverages multiple CPUs, distributed memory, and parallel computing.
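Because iris is small enough to fit in local memory, we can sanity-check the Spark fit against base R's lm() on the local copy of the data. This is a minimal sketch (fit1_local is our own object name); note that the local data frame keeps the original dot-separated column names, while the SDF uses underscores:
## fit the same model locally with base R for comparison
fit1_local <- lm(Sepal.Length ~ Sepal.Width + Petal.Length + Petal.Width,
                 data = iris)
summary(fit1_local)
The coefficient estimates from the two fits should agree closely; the difference is that ml_linear_regression() runs inside the Spark cluster and therefore scales to data far larger than local memory.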
Fit a K-means Cluster
Through the sparklyr package, we can use an R notebook to access many Spark Machine Learning Library (MLlib) algorithms, such as Linear Regression, Logistic Regression, Survival Regression, Generalized Linear Regression, Decision Trees, Random Forests, Gradient-Boosted Trees, Principal Component Analysis, Naive Bayes, K-Means Clustering, and a few other methods. The code below fits a k-means clustering model:
## Now fit a k-means clustering using iris_tbl data
## with only two out of four features in iris_tbl
fit2 <- ml_kmeans(x = iris_tbl, k = 3,
                  features = c("Petal_Length", "Petal_Width"))
# print our model fit
print(fit2)
After fitting the k-means model, we can apply the model to predict the cluster membership of other datasets through the ml_predict() function. The following code applies the model to iris_tbl again to predict the clusters and collects the results as a local R object (i.e. prediction) using the collect() function:
prediction <- collect(ml_predict(fit2, iris_tbl))
As prediction is a local R object, we can apply any R function from any library to it. For example:
prediction %>%
  ggplot(aes(Petal_Length, Petal_Width)) +
  geom_point(aes(Petal_Width, Petal_Length,
                 col = factor(prediction + 1)),
             size = 2, alpha = 0.5) +
  geom_point(data = fit2$centers, aes(Petal_Width, Petal_Length),
             col = scales::muted(c("red", "green", "blue")),
             pch = 'x', size = 12) +
  scale_color_discrete(name = "Predicted Cluster",
                       labels = paste("Cluster", 1:3)) +
  labs(x = "Petal Length",
       y = "Petal Width",
       title = "K-Means Clustering",
       subtitle = "Use Spark ML to predict cluster membership with the iris dataset")
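The other MLlib algorithms listed above follow the same pattern: fit the model on the SDF through its local handle, then call ml_predict(). As a hedged sketch (the object names rf_fit and rf_pred are ours, not part of the original notebook), a random forest classifier predicting Species from the two petal measurements could look like this:
## sketch: fit a random forest classifier on the iris SDF
rf_fit <- ml_random_forest(iris_tbl, Species ~ Petal_Length + Petal_Width,
                           type = "classification")
## predict on the same SDF and bring the first rows back to the local node
rf_pred <- collect(head(ml_predict(rf_fit, iris_tbl)))
rf_pred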
So far, we have illustrated:
- the relationship between a local node (i.e. where the R notebook is running) and the Spark cluster (i.e. where the data are stored and the computation is done);
- how to copy a local data frame to a Spark DataFrame (please note that if your data is already in the Spark environment, there is no need to copy it and we only need to build the connection; this is likely to be the case in an enterprise environment);
- how to manipulate Spark DataFrames for data cleaning and preprocessing through dplyr functions, enabled by the sparklyr package;
- how to fit statistical and machine learning models to Spark DataFrames in a truly parallel manner;
- how to collect information from Spark DataFrames back to a local R object (i.e. a local R data frame) for further analysis.
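One housekeeping note: in a Databricks notebook the cluster connection is usually managed for you, but in a self-managed sparklyr session the connection can be closed explicitly once the analysis is done:
## close the Spark connection when finished
spark_disconnect(sc)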
These procedures cover the basics of big data analysis that a beginning data scientist needs to know. An R notebook containing the contents of this chapter is available on the book website, along with a Python notebook.