4.4 Leverage Spark Using R Notebook

R is a powerful tool for data analysis, provided the data can fit into memory. Because of this memory constraint, R by itself cannot be used directly for big data analysis where the data is likely stored in a Hadoop and Spark system. By leveraging the sparklyr package created by RStudio, we can use Databricks’ R notebooks to analyze data stored in the Spark system. As the data are stored across different nodes, Spark enables parallel computation using the collective memory and CPUs of all nodes. The fundamental data element in the Spark system is called a Spark DataFrame (SDF). In this section, we illustrate how to use Databricks’ R notebooks for big data analysis on top of the Spark environment through the sparklyr package.

Install Package

First, we need to install the sparklyr package, which enables the connection between the local node and the Spark cluster environment. As it installs more than 10 dependencies, it may take a few minutes to finish. Be patient while it is installing! Once the installation finishes, load the sparklyr package as illustrated by the following code:

# Install sparklyr if it is not already available
if (!require("sparklyr")) {
    install.packages("sparklyr")
}
# Load sparklyr package
library(sparklyr)

Create a Spark Connection

Once the library is loaded, we need to create a Spark Connection to link the computing node (i.e. the local node) running the R notebook to the Spark environment. Here we use the "databricks" option for the method parameter, which is specific to Databricks’ cloud system. In other enterprise environments, please consult your administrator for details. The Spark Connection (i.e. sc) is the pipe that connects the R notebook on the local node with the Spark cluster. We can think of the R notebook as running on a local node with its own memory and CPU; the Spark system has a cluster of connected computation nodes, and the Spark Connection creates a mechanism to connect the two. The Spark Connection can be established with:

# create a sparklyr connection
sc <- spark_connect(method = "databricks")
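
Once sc is created, a quick sanity check is to query the Spark version through the connection (a minimal sketch; the exact version string depends on your Databricks runtime):

```r
# Query the Spark version through the connection sc created above.
# Requires an active Spark connection, so this only runs inside the
# Databricks notebook environment.
spark_version(sc)
```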

To simplify the learning process, let us use the familiar small iris dataset. It is part of the dplyr library, and we can load that library to use the iris data frame. At this point, the iris dataset still resides on the local node where the R notebook is running. We can check the first few lines of the iris dataset using the code below:

library(dplyr)
head(iris)

##   Sepal.Length Sepal.Width Petal.Length Petal.Width
## 1          5.1         3.5          1.4         0.2
## 2          4.9         3.0          1.4         0.2
## 3          4.7         3.2          1.3         0.2
## 4          4.6         3.1          1.5         0.2
## 5          5.0         3.6          1.4         0.2
## 6          5.4         3.9          1.7         0.4
##   Species
## 1  setosa
## 2  setosa
## 3  setosa
## 4  setosa
## 5  setosa
## 6  setosa

IMPORTANT - Copy Data to Spark Environment

In real applications, the dataset may be massive and unable to fit on a single hard disk, and most likely such data are already stored in the Spark system. If the data is already in the Hadoop/Spark ecosystem in the form of an SDF, we can create a local R object that links to the SDF using the tbl() function, where "my_sdf" is the name of the SDF in the Spark system and my_sdf_tbl is the local R object that refers to my_sdf:

my_sdf_tbl <- tbl(sc, "my_sdf")

As we just created a brand new Spark computing environment, there is no SDF in the system yet, so we need to copy a local dataset to the Spark environment. Since we have already created the Spark Connection sc, it is easy to copy data to the Spark system using the sdf_copy_to() function as below:

iris_tbl <- sdf_copy_to(sc = sc, x = iris, overwrite = TRUE)

The above one-line code copies the iris dataset from the local node to the Spark cluster environment. sc is the Spark Connection we just created; x is the data frame that we want to copy; overwrite controls whether to overwrite the target object if an SDF with the same name already exists in the Spark environment. Finally, the sdf_copy_to() function returns an R object representing the copied SDF (i.e. it creates a “pointer” to the SDF such that we can refer to iris_tbl in the R notebook to operate on the iris SDF). Now iris_tbl in the local R environment can be used to refer to the iris SDF in the Spark system. Note that the copy also replaces the dots in the column names (e.g. Sepal.Length) with underscores (e.g. Sepal_Length).

To check whether the iris data was copied to the Spark environment successfully, we can apply the src_tbls() function to the Spark Connection (sc):

# show all the data frames associated with sc
src_tbls(sc)

Analyzing the Data

Now we have successfully copied the iris dataset to the Spark environment as an SDF. This means that iris_tbl is an R object representing the iris SDF, and we can use iris_tbl in R to refer to the iris dataset in the Spark system (i.e. the iris SDF). With the sparklyr package, we can apply nearly all of the dplyr functions to a Spark DataFrame directly through iris_tbl, the same as applying dplyr functions to a local R data frame on our laptop. For example, we can use the %>% operator to pass iris_tbl to the count() function:

iris_tbl %>% count()

or use the head() function to return the first few rows of iris_tbl:

iris_tbl %>% head()

or apply more advanced data manipulation directly to iris_tbl:

iris_tbl %>%
    mutate(Sepal_Add = Sepal_Length + Sepal_Width) %>%
    group_by(Species) %>%
    summarize(count = n(), Sepal_Add_Avg = mean(Sepal_Add))
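
Because sparklyr translates dplyr verbs into Spark operations, the same pipeline can be prototyped on the local iris data frame first. One caveat: the local column names use dots (e.g. Sepal.Length), while the copy in the Spark environment uses underscores (e.g. Sepal_Length):

```r
library(dplyr)

# Same aggregation on the local iris data frame; note the dotted
# column names (Sepal.Length) instead of the underscored names
# (Sepal_Length) used in the SDF.
local_summary <- iris %>%
    mutate(Sepal_Add = Sepal.Length + Sepal.Width) %>%
    group_by(Species) %>%
    summarize(count = n(), Sepal_Add_Avg = mean(Sepal_Add))
local_summary
```

The result has one row per species; running the corresponding pipeline on iris_tbl performs the same aggregation inside the Spark cluster instead of on the local node.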

Collect Results Back to Local Node

Even though we can run nearly all of the dplyr functions on an SDF, we cannot apply functions from other packages to an SDF directly (such as ggplot()). For functions that only work on local R data frames, we must copy the SDF back to the local node as an R data frame. To do that, we use the collect() function. The following code collects the results of a few operations and assigns the collected data to iris_summary, a local R data frame:

iris_summary <- iris_tbl %>%
    mutate(Sepal_Width_round = round(Sepal_Width * 2) / 2) %>%
    group_by(Species, Sepal_Width_round) %>%
    summarize(count = n(), Sepal_Length_avg = mean(Sepal_Length),
              Sepal_Length_stdev = sd(Sepal_Length)) %>%
    collect()
Now iris_summary is a local R object in the R notebook, and we can apply any R packages and functions to it. In the following code, we apply ggplot() to it, exactly the same as in a standalone R console:

ggplot(iris_summary, aes(Sepal_Width_round,
                         Sepal_Length_avg,
                         color = Species)) +
    geom_line(size = 1.2) +
    geom_errorbar(aes(ymin = Sepal_Length_avg - Sepal_Length_stdev,
                      ymax = Sepal_Length_avg + Sepal_Length_stdev),
                  width = 0.05) +
    geom_text(aes(label = count),
              vjust = -0.2,
              hjust = 1.2,
              color = "black")
In most cases, the heavy-duty data preprocessing and aggregation is done in Spark using dplyr functions. Once the data is aggregated, its size is usually dramatically reduced, and the reduced data can be collected into a local R object for downstream analysis.

Fit Regression to SDF

One of the advantages of the Spark system is parallel machine learning. Many statistical and machine learning algorithms have been implemented for SDFs to run in parallel across many CPUs, with the data distributed across many memory units. In this example, we have already uploaded the iris data to the Spark system, and the data in the SDF can be referred to through iris_tbl as in the last section. The linear regression algorithm implemented in the Spark system can be called through the ml_linear_regression() function. To call the function, we supply the local R object representing the SDF (i.e. iris_tbl (local R object) for iris (SDF)), the response variable (i.e. the y variable of the linear regression in the SDF), and the features (i.e. the x variables of the linear regression in the SDF). With this, we can easily fit a linear regression to a large dataset far beyond the memory limit of a single computer; it is truly scalable and constrained only by the resources of the Spark cluster. Below is an illustration of how to fit a linear regression to an SDF using the R notebook:

fit1 <- ml_linear_regression(x = iris_tbl, 
                response = "Sepal_Length",
                features = c("Sepal_Width", "Petal_Length", "Petal_Width"))

In the above code, x is the R object pointing to the SDF; response is the y variable; features is the collection of explanatory variables. For this function, both the data and the computation are in the Spark cluster, which leverages multiple CPUs, distributed memory, and parallel computing.
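
The fitted model can then be examined with summary(), similar to a local lm() fit (a quick sketch, assuming fit1 from the code above was created successfully):

```r
# Print coefficient estimates and fit statistics of the Spark
# linear regression model; requires the fit1 object created above,
# so this only runs with an active Spark connection.
summary(fit1)
```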

Fit a K-means Cluster

Through the sparklyr package, we can use the R notebook to access many Spark Machine Learning Library (MLlib) algorithms, such as Linear Regression, Logistic Regression, Survival Regression, Generalized Linear Regression, Decision Trees, Random Forests, Gradient-Boosted Trees, Principal Component Analysis, Naive Bayes, K-Means Clustering, and a few other methods. The code below fits a k-means clustering algorithm:

## Now fit a k-means clustering using iris_tbl data
## with only two out of four features in iris_tbl
fit2 <- ml_kmeans(x = iris_tbl, k = 3,
                  features = c("Petal_Length", "Petal_Width"))
# print our model fit
print(fit2)

After fitting the k-means model, we can apply the model to predict on other datasets through the ml_predict() function. The following code applies the model to iris_tbl again to predict the cluster assignments and collects the results as a local R object (i.e. prediction) using the collect() function:

prediction <- collect(ml_predict(fit2, iris_tbl))

As prediction is a local R object, we can apply any R functions from any libraries to it. For example:

prediction %>%
  ggplot(aes(Petal_Length, Petal_Width)) +
  geom_point(aes(col = factor(prediction + 1)),
             size = 2, alpha = 0.5) +
  geom_point(data = fit2$centers, aes(Petal_Length, Petal_Width),
             col = scales::muted(c("red", "green", "blue")),
             pch = 'x', size = 12) +
  scale_color_discrete(name = "Predicted Cluster",
                       labels = paste("Cluster", 1:3)) +
  labs(x = "Petal Length",
       y = "Petal Width",
       title = "K-Means Clustering",
       subtitle = "Use Spark ML to predict cluster membership with the iris dataset")

So far, we have illustrated:

  1. the relationship between a local node (i.e. where the R notebook is running) and the Spark cluster (i.e. where data are stored and computations are done);
  2. how to copy a local data frame to a Spark DataFrame (note that if your data is already in the Spark environment, there is no need to copy it and we only need to build the connection; this is likely to be the case in an enterprise environment);
  3. how to manipulate Spark DataFrames for data cleaning and preprocessing through dplyr functions provided by the sparklyr package;
  4. how to fit statistical and machine learning models to Spark DataFrames in a truly parallel manner;
  5. how to collect information from a Spark DataFrame back to a local R object (i.e. a local R data frame) for further analysis.
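
One last housekeeping step: when the analysis is finished, it is good practice to close the Spark Connection (assuming sc is the connection created at the beginning of this section):

```r
# Release the connection between the local node and the Spark cluster
spark_disconnect(sc)
```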

These procedures cover the basics of big data analysis that a beginning data scientist needs to know. The book website contains an R notebook with the contents of this chapter, as well as a Python notebook.