This notebook illustrates how to use a Databricks Community Edition account for the Spark big data cloud platform. We use the R package sparklyr to drive Spark computation on Spark data frames.
The methods covered in this notebook are only brief introductions; for more details and additional functions, refer to the sparklyr documentation.
Since both sparklyr and the Databricks Community Edition environment are developing quickly, we have only tested this notebook under a specific configuration: it was last updated and tested on 2021-01-31, running on a 7.2 ML cluster instance of a Community Edition account.
First, we need to install the sparklyr package, which enables the connection between the local node (i.e., the running R instance) and the Spark cluster environment. As it installs a few dependencies, it may take a few minutes to finish.
install.packages("sparklyr")
Load package:
library(sparklyr)
##
## Attaching package: 'sparklyr'
## The following object is masked from 'package:stats':
##
## filter
Once the R package is loaded, we need to create a Spark connection to link the local R node to the Spark environment. Here we set method = "databricks" in the spark_connect() function; in other environments, please consult your IT administrator for details. The created Spark connection (i.e., sc) is the bridge that connects the local R node to the Spark cluster.
sc <- spark_connect(method = "databricks")
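(As an aside, if you want to experiment with sparklyr outside Databricks, it can also connect to a local Spark installation. The sketch below is not run in this notebook and assumes Spark has already been installed locally, e.g. via spark_install().)
# Not run here: connect to a local Spark installation instead of Databricks.
# Assumes a local Spark has been installed, e.g. with sparklyr::spark_install().
sc_local <- spark_connect(master = "local")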
To simplify the learning process, let us use the well-known iris data set, which ships with base R. Let us also load the dplyr package, whose functions we will use throughout the notebook. At this point the iris data frame still lives on the local node where the R notebook is running, and we can see the first few rows of iris below the code after running it:
library(dplyr)
head(iris)
## Sepal.Length Sepal.Width Petal.Length Petal.Width Species
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3.0 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5.0 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
In real big data applications, the data sets are usually too big to fit on a single hard disk, and they are very likely already stored as Spark data frames. However, the brand-new Spark system we just created contains no Spark data frames yet. For illustration purposes, we will first copy the iris R data frame to Spark using the sdf_copy_to() function, as shown below:
iris_tbl <- sdf_copy_to(sc = sc, x = iris, overwrite = T)
The sdf_copy_to() function returns an R object, iris_tbl, that refers to the newly created Spark data frame iris. Later in the notebook, we use iris_tbl to refer to the iris Spark data frame. To list the existing Spark data frames, we can apply the src_tbls() function to the Spark connection (sc):
src_tbls(sc)
## [1] "iris"
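(In a real application, the data would more often be read into Spark directly from distributed storage rather than copied from R. The sketch below is illustrative and not run in this notebook; the file path and table name are hypothetical.)
# Hypothetical example: read a CSV file straight into a Spark data frame,
# bypassing the local R node entirely.
flights_tbl <- spark_read_csv(sc, name = "flights", path = "/data/flights.csv")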
With the sparklyr package, we can apply many dplyr functions to the Spark data frame iris directly through the R object iris_tbl, just as we would apply them to a local R data frame. For example, we can use the %>% operator to pass iris_tbl to the count() function:
iris_tbl %>% count
## # Source: spark<?> [?? x 1]
## n
## <dbl>
## 1 150
head(iris_tbl)
## # Source: spark<?> [?? x 5]
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## <dbl> <dbl> <dbl> <dbl> <chr>
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
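sparklyr also provides its own helpers for inspecting a Spark data frame; for example, sdf_nrow() and sdf_ncol() return its dimensions (a quick sketch, not shown in the original notebook):
# Dimensions of the Spark data frame, computed on the cluster.
sdf_nrow(iris_tbl)   # number of rows: 150
sdf_ncol(iris_tbl)   # number of columns: 5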
Or we can apply more advanced data operations directly to iris_tbl:
iris_tbl %>%
mutate(Sepal_Width = ceiling(Sepal_Width*2)/2) %>%
group_by(Species, Sepal_Width) %>%
summarize(count = n(),
avg_Sepal_Length = mean(Sepal_Length),
std_Sepal_Length = sd(Sepal_Length))
## Warning: Missing values are always removed in SQL.
## Use `mean(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## Warning: Missing values are always removed in SQL.
## Use `sd(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## Warning: Missing values are always removed in SQL.
## Use `sd(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## Warning: Missing values are always removed in SQL.
## Use `sd(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## # Source: spark<?> [?? x 5]
## # Groups: Species
## Species Sepal_Width count avg_Sepal_Length std_Sepal_Length
## <chr> <dbl> <dbl> <dbl> <dbl>
## 1 virginica 3.5 14 6.64 0.292
## 2 versicolor 2.5 12 5.62 0.490
## 3 versicolor 3 29 5.95 0.411
## 4 versicolor 3.5 8 6.49 0.405
## 5 versicolor 2 1 5 NA
## 6 setosa 3.5 26 4.96 0.269
## 7 setosa 4 13 5.22 0.324
## 8 setosa 4.5 3 5.47 0.252
## 9 setosa 2.5 1 4.5 NA
## 10 virginica 3 28 6.58 0.646
## # … with more rows
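Under the hood, these dplyr verbs are translated into Spark SQL rather than executed in R. If you are curious, show_query() prints the generated SQL for a pipeline (a small sketch; the exact SQL text may vary by version):
# Inspect the Spark SQL generated for a dplyr pipeline on a Spark data frame.
iris_tbl %>%
  filter(Species == "setosa") %>%
  select(Species, Petal_Length) %>%
  show_query()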
We can run many dplyr functions on Spark data frames. However, we can NOT apply functions from other packages to Spark data frames through the Spark connection. For functions that only work on local R data frames, we have to copy the processed Spark data frame back to the local node, which can be done with the collect() function. The following code collects the results of the data manipulation on the Spark data frame and assigns them to the iris_summary variable.
iris_summary <- iris_tbl %>%
mutate(Sepal_Width = ceiling(Sepal_Width*2)/2) %>%
group_by(Species, Sepal_Width) %>%
summarize(count = n(),
avg_Sepal_Length = mean(Sepal_Length),
std_Sepal_Length = sd(Sepal_Length)) %>%
collect()
## Warning: Missing values are always removed in SQL.
## Use `sd(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
iris_summary
## # A tibble: 13 × 5
## Species Sepal_Width count avg_Sepal_Length std_Sepal_Length
## <chr> <dbl> <dbl> <dbl> <dbl>
## 1 virginica 3.5 14 6.64 0.292
## 2 versicolor 2.5 12 5.62 0.490
## 3 versicolor 3 29 5.95 0.411
## 4 versicolor 3.5 8 6.49 0.405
## 5 versicolor 2 1 5 NA
## 6 setosa 3.5 26 4.96 0.269
## 7 setosa 4 13 5.22 0.324
## 8 setosa 4.5 3 5.47 0.252
## 9 setosa 2.5 1 4.5 NA
## 10 virginica 3 28 6.58 0.646
## 11 virginica 2.5 5 5.92 0.680
## 12 setosa 3 7 4.66 0.282
## 13 virginica 4 3 7.6 0.361
Here iris_summary is a local variable in the R notebook session, so we can apply any R package or function to it. For example, we can use the ggplot() function:
library(ggplot2)
ggplot(iris_summary,
aes(Sepal_Width, avg_Sepal_Length, color = Species)) +
geom_line(size = 1.2) +
geom_errorbar(aes(ymin = avg_Sepal_Length - std_Sepal_Length,
ymax = avg_Sepal_Length + std_Sepal_Length),
width = 0.05) +
geom_text(aes(label = count),
vjust = -0.2,
hjust = 1.2,
color = "black") +
theme(legend.position="top")
Spark provides many statistical and machine learning algorithms that leverage parallel computing on distributed data. For example, we can easily fit a linear regression on a data set far beyond a single computer's memory limit.
fit1 <- ml_linear_regression(x = iris_tbl,
response = "Sepal_Length",
features = c("Sepal_Width", "Petal_Length", "Petal_Width"))
summary(fit1)
## Deviance Residuals:
## Min 1Q Median 3Q Max
## -0.82816 -0.21989 0.01875 0.19709 0.84570
##
## Coefficients:
## (Intercept) Sepal_Width Petal_Length Petal_Width
## 1.8559975 0.6508372 0.7091320 -0.5564827
##
## R-Squared: 0.8586
## Root Mean Squared Error: 0.3103
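In practice, we would usually fit a model on a training split and hold out a test set. sparklyr's sdf_random_split() partitions a Spark data frame for this purpose; the sketch below uses arbitrary split fractions and a hypothetical seed:
# Split the Spark data frame into training and test partitions (70/30 is arbitrary).
splits <- sdf_random_split(iris_tbl, training = 0.7, test = 0.3, seed = 42)

# Fit the same linear regression on the training partition only.
fit_train <- ml_linear_regression(x = splits$training,
                                  response = "Sepal_Length",
                                  features = c("Sepal_Width", "Petal_Length", "Petal_Width"))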
Through the sparklyr package, we can call Spark's machine learning library (MLlib) to fit many models, such as linear regression, logistic regression, survival regression, generalized linear regression, decision trees, random forests, gradient-boosted trees, principal component analysis, naive Bayes, k-means clustering, and a few other methods. The following code fits a k-means clustering algorithm:
fit2 <- ml_kmeans(x = iris_tbl,
k = 3,
features = c("Petal_Length", "Petal_Width"))
print(fit2)
## K-means clustering with 3 clusters
##
## Cluster centers:
## Petal_Length Petal_Width
## 1 5.626087 2.047826
## 2 1.462000 0.246000
## 3 4.292593 1.359259
##
## Within Set Sum of Squared Errors = not computed.
After fitting the model, we can apply it to other data sets through the ml_predict() function. The code below applies the fitted model to iris_tbl again to predict each row's cluster. The predictions are then collected back into a local variable, prediction, through the collect() function:
prediction = collect(ml_predict(fit2, iris_tbl))
head(prediction)
## # A tibble: 6 × 7
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species features prediction
## <dbl> <dbl> <dbl> <dbl> <chr> <list> <int>
## 1 5.1 3.5 1.4 0.2 setosa <dbl [2]> 1
## 2 4.9 3 1.4 0.2 setosa <dbl [2]> 1
## 3 4.7 3.2 1.3 0.2 setosa <dbl [2]> 1
## 4 4.6 3.1 1.5 0.2 setosa <dbl [2]> 1
## 5 5 3.6 1.4 0.2 setosa <dbl [2]> 1
## 6 5.4 3.9 1.7 0.4 setosa <dbl [2]> 1
As prediction is a local R variable, we can apply functions from any R package to it. For example:
prediction %>%
ggplot(aes(Petal_Length, Petal_Width)) +
geom_point(aes(Petal_Width, Petal_Length, col = factor(prediction + 1)),
size = 2, alpha = 0.5) +
geom_point(data = fit2$centers, aes(Petal_Width, Petal_Length),
col = scales::muted(c("red", "green", "blue")),
pch = 'x', size = 12) +
scale_color_discrete(name = "Predicted Cluster",
labels = paste("Cluster", 1:3)) +
labs(
x = "Petal Width",
y = "Petal Length",
title = "K-Means Clustering",
subtitle = "Use Spark.ML to predict cluster membership with the iris dataset."
)
In this notebook, we illustrated:
- The relationship between a local R node and the Spark environment.
- How to copy a local data frame to a Spark data frame (note that if the data is already in the Spark environment, there is no need to copy; this is likely to be the case for real applications).
- How to perform data operations on a Spark data frame through dplyr functions using the sparklyr package.
- How to fit statistical and machine learning models to a Spark data frame.
- How to collect information from a processed Spark data frame back to a local R data frame for further analysis.
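Finally, when a session is done, the Spark connection can be closed explicitly (on Databricks Community Edition the cluster lifecycle is managed for you, so this step is optional):
# Close the Spark connection when finished.
spark_disconnect(sc)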