• Install and Load Library
  • Create Spark Connection
  • Sample Dataset
  • IMPORTANT - Copy Data to Spark Environment
  • Analyzing the Data
  • Collect Results Back to R Local Node
  • Fit A Regression Model to Spark Data Frame
  • Fit Other Models
  • Summary

This notebook illustrates how to use a Databricks Community Edition account, a cloud platform for Spark big data computing. We use the R package sparklyr to drive Spark computation on Spark data frames.

The methods mentioned in this notebook are only a brief introduction. For more details and additional functions, refer to the sparklyr documentation.

Since both sparklyr and the Databricks Community environment are developing quickly, we have only tested the notebook under a specific version. It was last updated and tested on 2021-01-31, running on a 7.2 ML cluster instance of a Community Edition account.

Install and Load Library

First, we need to install the sparklyr package, which enables the connection from the local node (i.e., the running R instance) to the Spark cluster environment. Because it installs a few dependencies, this may take a few minutes to finish.

install.packages("sparklyr")

Load package:

library(sparklyr)
## 
## Attaching package: 'sparklyr'
## The following object is masked from 'package:stats':
## 
##     filter

Create Spark Connection

Once the package is loaded, we need to create a Spark connection to link the local R node to the Spark environment. Here we set method = "databricks" in the spark_connect() function; in other environments, please consult your IT administrator for the appropriate settings. The created Spark connection (i.e., sc) is the bridge that connects the local R node to the Spark cluster.

sc <- spark_connect(method = "databricks")
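
As a quick sanity check (not part of the original workflow), we can confirm the connection works by asking the cluster for its Spark version with sparklyr's spark_version() function, and print the installed package version:

# Ask the cluster which Spark version it is running
spark_version(sc)

# The installed sparklyr package version (useful when reporting issues)
packageVersion("sparklyr")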

Sample Dataset

To simplify the learning process, let us use the well-known iris data, which ships with base R. We also load the dplyr package, which we will use for data manipulation. At this point the iris data frame is still on the local node where the R notebook is running, and we can see its first few rows below the code after running:

library(dplyr)
head(iris)
##   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
## 1          5.1         3.5          1.4         0.2  setosa
## 2          4.9         3.0          1.4         0.2  setosa
## 3          4.7         3.2          1.3         0.2  setosa
## 4          4.6         3.1          1.5         0.2  setosa
## 5          5.0         3.6          1.4         0.2  setosa
## 6          5.4         3.9          1.7         0.4  setosa

IMPORTANT - Copy Data to Spark Environment

In real big data applications, the data sets are usually too big to fit on a single machine, and they are most likely already stored as Spark data frames. However, the brand-new Spark system we just created contains no Spark data frames yet. For illustration purposes, we first copy the iris R data frame to Spark using the sdf_copy_to() function:

iris_tbl <- sdf_copy_to(sc = sc, x = iris, overwrite = T)

The sdf_copy_to() function returns an R object, iris_tbl, that refers to the newly created Spark data frame iris. Later in the notebook, we use iris_tbl to refer to this Spark data frame. To list the existing Spark data frames, we apply the src_tbls() function to the Spark connection (sc):

src_tbls(sc)
## [1] "iris"

Analyzing the Data

With the sparklyr package, we can apply many dplyr functions to the Spark data frame iris directly through the R object iris_tbl, just as we would apply them to a local R data frame. For example, we can use the %>% operator to pass iris_tbl to the count() function:

iris_tbl %>% count 
## # Source: spark<?> [?? x 1]
##       n
##   <dbl>
## 1   150
head(iris_tbl)
## # Source: spark<?> [?? x 5]
##   Sepal_Length Sepal_Width Petal_Length Petal_Width Species
##          <dbl>       <dbl>        <dbl>       <dbl> <chr>  
## 1          5.1         3.5          1.4         0.2 setosa 
## 2          4.9         3            1.4         0.2 setosa 
## 3          4.7         3.2          1.3         0.2 setosa 
## 4          4.6         3.1          1.5         0.2 setosa 
## 5          5           3.6          1.4         0.2 setosa 
## 6          5.4         3.9          1.7         0.4 setosa

Or we can apply more advanced data operations directly to iris_tbl:

iris_tbl %>% 
  mutate(Sepal_Width = ceiling(Sepal_Width*2)/2) %>%
  group_by(Species, Sepal_Width) %>% 
  summarize(count = n(), 
            avg_Sepal_Length = mean(Sepal_Length), 
            std_Sepal_Length = sd(Sepal_Length)) 
## Warning: Missing values are always removed in SQL.
## Use `mean(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## Warning: Missing values are always removed in SQL.
## Use `sd(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.

## Warning: Missing values are always removed in SQL.
## Use `sd(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.

## Warning: Missing values are always removed in SQL.
## Use `sd(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## # Source: spark<?> [?? x 5]
## # Groups: Species
##    Species    Sepal_Width count avg_Sepal_Length std_Sepal_Length
##    <chr>            <dbl> <dbl>            <dbl>            <dbl>
##  1 virginica          3.5    14             6.64            0.292
##  2 versicolor         2.5    12             5.62            0.490
##  3 versicolor         3      29             5.95            0.411
##  4 versicolor         3.5     8             6.49            0.405
##  5 versicolor         2       1             5              NA    
##  6 setosa             3.5    26             4.96            0.269
##  7 setosa             4      13             5.22            0.324
##  8 setosa             4.5     3             5.47            0.252
##  9 setosa             2.5     1             4.5            NA    
## 10 virginica          3      28             6.58            0.646
## # … with more rows
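
Behind the scenes, sparklyr translates these dplyr verbs into Spark SQL that runs on the cluster. If you are curious, show_query() (re-exported by dplyr for remote tables) prints the generated SQL; this is a small illustration added here, not part of the original notebook:

# Inspect the SQL that Spark will execute for a simple pipeline
iris_tbl %>%
  filter(Species == "setosa") %>%
  select(Species, Sepal_Length) %>%
  show_query()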

Collect Results Back to R Local Node

We can run many dplyr functions on Spark data frames. However, we can NOT apply functions from arbitrary R packages to Spark data frames through the Spark connection. For functions that only work on local R data frames, we have to bring the processed Spark data frame back to the local node, which can be done with the collect() function. The following code collects the result of the data manipulation on the Spark data frame and assigns it to the iris_summary variable.

iris_summary <- iris_tbl %>% 
  mutate(Sepal_Width = ceiling(Sepal_Width*2)/2) %>% 
  group_by(Species, Sepal_Width) %>% 
  summarize(count = n(), 
            avg_Sepal_Length = mean(Sepal_Length),
            std_Sepal_Length = sd(Sepal_Length)) %>% 
  collect()
## Warning: Missing values are always removed in SQL.
## Use `sd(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
iris_summary
## # A tibble: 13 × 5
##    Species    Sepal_Width count avg_Sepal_Length std_Sepal_Length
##    <chr>            <dbl> <dbl>            <dbl>            <dbl>
##  1 virginica          3.5    14             6.64            0.292
##  2 versicolor         2.5    12             5.62            0.490
##  3 versicolor         3      29             5.95            0.411
##  4 versicolor         3.5     8             6.49            0.405
##  5 versicolor         2       1             5              NA    
##  6 setosa             3.5    26             4.96            0.269
##  7 setosa             4      13             5.22            0.324
##  8 setosa             4.5     3             5.47            0.252
##  9 setosa             2.5     1             4.5            NA    
## 10 virginica          3      28             6.58            0.646
## 11 virginica          2.5     5             5.92            0.680
## 12 setosa             3       7             4.66            0.282
## 13 virginica          4       3             7.6             0.361
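
To see the difference between the remote and local objects, we can compare their classes (a quick check added for illustration):

# iris_tbl is a remote Spark table; iris_summary is an ordinary local tibble
class(iris_tbl)
class(iris_summary)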

Here iris_summary is a local R variable in the notebook, so we can apply any R packages and functions to it. For example, we can use the ggplot() function:

library(ggplot2)
ggplot(iris_summary, 
       aes(Sepal_Width, avg_Sepal_Length, color = Species)) + 
  geom_line(size = 1.2) +
  geom_errorbar(aes(ymin = avg_Sepal_Length - std_Sepal_Length, 
                    ymax = avg_Sepal_Length + std_Sepal_Length), 
                width = 0.05) +
  geom_text(aes(label = count), 
            vjust = -0.2, 
            hjust = 1.2, 
            color = "black") +
  theme(legend.position="top")

Fit A Regression Model to Spark Data Frame

Spark provides distributed implementations of many statistical and machine learning algorithms that leverage parallel computing on distributed data. For example, we can easily fit a linear regression to a big dataset far beyond a single computer's memory limit.

fit1 <-  ml_linear_regression(x = iris_tbl, 
                              response = "Sepal_Length", 
                              features = c("Sepal_Width", "Petal_Length", "Petal_Width"))
summary(fit1)
## Deviance Residuals:
##      Min       1Q   Median       3Q      Max 
## -0.82816 -0.21989  0.01875  0.19709  0.84570 
## 
## Coefficients:
##  (Intercept)  Sepal_Width Petal_Length  Petal_Width 
##    1.8559975    0.6508372    0.7091320   -0.5564827 
## 
## R-Squared: 0.8586
## Root Mean Squared Error: 0.3103
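
In practice we usually evaluate a model on data it was not trained on. A minimal sketch, assuming the standard sparklyr functions sdf_random_split() and ml_predict(), looks like this:

# Split the Spark data frame into training and test sets (75% / 25%)
splits <- sdf_random_split(iris_tbl, training = 0.75, test = 0.25, seed = 2021)

# Fit on the training partition only
fit_train <- ml_linear_regression(splits$training,
                                  response = "Sepal_Length",
                                  features = c("Sepal_Width", "Petal_Length", "Petal_Width"))

# Score the held-out test partition; predictions stay in Spark until collected
test_pred <- ml_predict(fit_train, splits$test)
head(collect(test_pred))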

Fit Other Models

Through the sparklyr package, we can call Spark's machine learning library (MLlib) to fit many models, such as linear regression, logistic regression, survival regression, generalized linear regression, decision trees, random forests, gradient-boosted trees, principal components analysis, naive Bayes, k-means clustering, and a few other methods. The following code fits a k-means clustering algorithm:

fit2 <- ml_kmeans(x = iris_tbl, 
                  k = 3, 
                  features = c("Petal_Length", "Petal_Width"))
print(fit2)
## K-means clustering with 3 clusters
## 
## Cluster centers:
##   Petal_Length Petal_Width
## 1     5.626087    2.047826
## 2     1.462000    0.246000
## 3     4.292593    1.359259
## 
## Within Set Sum of Squared Errors =  not computed.

After fitting the model, we can apply it to other datasets through the ml_predict() function. The code below applies the fitted model to iris_tbl again to predict each row's cluster. The predictions are collected back into a local variable, prediction, through the collect() function:

prediction <- collect(ml_predict(fit2, iris_tbl))
head(prediction)
## # A tibble: 6 × 7
##   Sepal_Length Sepal_Width Petal_Length Petal_Width Species features  prediction
##          <dbl>       <dbl>        <dbl>       <dbl> <chr>   <list>         <int>
## 1          5.1         3.5          1.4         0.2 setosa  <dbl [2]>          1
## 2          4.9         3            1.4         0.2 setosa  <dbl [2]>          1
## 3          4.7         3.2          1.3         0.2 setosa  <dbl [2]>          1
## 4          4.6         3.1          1.5         0.2 setosa  <dbl [2]>          1
## 5          5           3.6          1.4         0.2 setosa  <dbl [2]>          1
## 6          5.4         3.9          1.7         0.4 setosa  <dbl [2]>          1

Since prediction is a local R variable, we can apply R functions from any package to it. For example:

prediction  %>%
  ggplot(aes(Petal_Length, Petal_Width)) +
  geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
             size = 2, alpha = 0.5) + 
  geom_point(data = fit2$centers, aes(Petal_Width, Petal_Length),
             col = scales::muted(c("red", "green", "blue")),
             pch = 'x', size = 12) +
  scale_color_discrete(name = "Predicted Cluster",
                       labels = paste("Cluster", 1:3)) +
  labs(
    x = "Petal Width",
    y = "Petal Length",
    title = "K-Means Clustering",
    subtitle = "Use Spark.ML to predict cluster membership with the iris dataset."
  )
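
When we are done, it is good practice to release cluster resources by closing the Spark connection with spark_disconnect() (on Databricks the connection is managed for you, so this step is optional):

# Close the connection to the Spark cluster when finished
spark_disconnect(sc)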

Summary

In this notebook, we illustrated:

- The relationship between a local R node and the Spark environment.
- How to copy a local data frame to a Spark data frame (note that if the data is already in the Spark environment, there is no need to copy; this is likely the case in real applications).
- How to perform data operations on a Spark data frame through dplyr functions using the sparklyr package.
- How to fit statistical and machine learning models to a Spark data frame.
- How to collect information from a processed Spark data frame back to a local R data frame for further analysis.