7 Intro to sparklyr
7.1 New Spark session
Learn to open a new Spark session
- Use spark_connect() to create a new local Spark session
sc <- spark_connect(master = "local")
## * Using Spark: 2.0.0
- Click on the SparkUI button to view the current Spark session’s UI
- Click on the Log button to see the message history
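Both buttons have code equivalents if you are working outside the RStudio IDE; spark_web() opens the Spark UI in a browser and spark_log() prints recent log entries (both are sparklyr functions, and the n argument below is optional):
spark_web(sc)
spark_log(sc, n = 20)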
7.2 Data transfer
Practice uploading data to Spark
- Copy the mtcars dataset into the session
spark_mtcars <- sdf_copy_to(sc, mtcars, "my_mtcars")
- In the Connections pane, expand the my_mtcars table
- Go to the Spark UI, note the new jobs
- In the UI, click the Storage button, note the new table
- Click on the In-memory table my_mtcars link
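The new table can also be confirmed from code; src_tbls() (a dplyr generic that works on sparklyr connections) lists the tables registered with the session:
src_tbls(sc)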
7.3 Simple dplyr example
See how Spark handles dplyr commands
- Run the following code snippet
spark_mtcars %>%
group_by(am) %>%
summarise(avg_wt = mean(wt, na.rm = TRUE))
## # Source: spark<?> [?? x 2]
## am avg_wt
## <dbl> <dbl>
## 1 0 3.77
## 2 1 2.41
- Go to the Spark UI and click the SQL button
- Click on the top item inside the Completed Queries table
- At the bottom of the diagram, expand Details
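To see the SQL without leaving R, pipe the same commands into show_query() (from dplyr); it prints the statement that the pipeline translates to:
spark_mtcars %>%
  group_by(am) %>%
  summarise(avg_wt = mean(wt, na.rm = TRUE)) %>%
  show_query()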
7.4 Map data
See the mechanics of how Spark is able to use files as a data source
- Examine the contents of the /usr/share/class/flights/data folder
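A quick way to do this from R is base R’s list.files() (just a convenience; a file browser works as well):
list.files("/usr/share/class/flights/data")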
- Read the top 5 rows of the flight_2008_1 CSV file. It is located under /usr/share/class/flights/data
library(readr)
top_rows <- read_csv("/usr/share/class/flights/data/flight_2008_1.csv", n_max = 5)
## Parsed with column specification:
## cols(
## .default = col_double(),
## uniquecarrier = col_character(),
## tailnum = col_character(),
## origin = col_character(),
## dest = col_character(),
## cancellationcode = col_logical(),
## score = col_logical()
## )
## See spec(...) for full column specifications.
- Create a named list based on the column names, with “character” as the value of each item
library(purrr)
file_columns <- top_rows %>%
rename_all(tolower) %>%
map(function(x) "character")
head(file_columns)
## $flightid
## [1] "character"
##
## $year
## [1] "character"
##
## $month
## [1] "character"
##
## $dayofmonth
## [1] "character"
##
## $dayofweek
## [1] "character"
##
## $deptime
## [1] "character"
- Use spark_read_csv() to “map” the file’s structure and location to the Spark context. Setting memory = FALSE tells Spark not to cache the data; it only registers the files’ location and schema.
spark_flights <- spark_read_csv(
sc,
name = "flights",
path = "/usr/share/class/flights/data/",
memory = FALSE,
columns = file_columns,
infer_schema = FALSE
)
- In the Connections pane, click on the table icon by the flights variable
- Verify that the new variable pointer works by using tally()
spark_flights %>%
tally()
## # Source: spark<?> [?? x 1]
## n
## <dbl>
## 1 7009728
7.5 Caching data
Learn how to cache a subset of the data in Spark
- Create a subset of the flights table object
cached_flights <- spark_flights %>%
mutate(
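    # the columns were imported as character, so missing values are the literal string "NA"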
arrdelay = ifelse(arrdelay == "NA", 0, arrdelay),
depdelay = ifelse(depdelay == "NA", 0, depdelay)
) %>%
select(
month,
dayofmonth,
arrtime,
arrdelay,
depdelay,
crsarrtime,
crsdeptime,
distance
) %>%
mutate_all(as.numeric)
- Use compute() to extract the data into Spark memory
cached_flights <- compute(cached_flights, "sub_flights")
- Confirm that the new variable pointer works
cached_flights %>%
tally()
## # Source: spark<?> [?? x 1]
## n
## <dbl>
## 1 7009728
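When the cached table is no longer needed, tbl_uncache() frees the memory (a sparklyr function; shown for reference only, since later sections still use this data):
tbl_uncache(sc, "sub_flights")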
7.6 sdf Functions
Overview of a few sdf_ functions: http://spark.rstudio.com/reference/#section-spark-dataframes
- Use sdf_pivot() to create a column for each value in dayofmonth
cached_flights %>%
arrange(month) %>%
sdf_pivot(month ~ dayofmonth)
## # Source: spark<?> [?? x 32]
## month `1.0` `2.0` `3.0` `4.0` `5.0` `6.0` `7.0` `8.0` `9.0` `10.0`
## <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1 1 1 2 3 4 5 6 7 8 9 10
## 2 2 1 2 3 4 5 6 7 8 9 10
## 3 3 1 2 3 4 5 6 7 8 9 10
## 4 4 1 2 3 4 5 6 7 8 9 10
## 5 5 1 2 3 4 5 6 7 8 9 10
## 6 6 1 2 3 4 5 6 7 8 9 10
## 7 7 1 2 3 4 5 6 7 8 9 10
## 8 8 1 2 3 4 5 6 7 8 9 10
## 9 9 1 2 3 4 5 6 7 8 9 10
## 10 10 1 2 3 4 5 6 7 8 9 10
## # … with more rows, and 21 more variables: `11.0` <dbl>, `12.0` <dbl>,
## # `13.0` <dbl>, `14.0` <dbl>, `15.0` <dbl>, `16.0` <dbl>, `17.0` <dbl>,
## # `18.0` <dbl>, `19.0` <dbl>, `20.0` <dbl>, `21.0` <dbl>, `22.0` <dbl>,
## # `23.0` <dbl>, `24.0` <dbl>, `25.0` <dbl>, `26.0` <dbl>, `27.0` <dbl>,
## # `28.0` <dbl>, `29.0` <dbl>, `30.0` <dbl>, `31.0` <dbl>
- Use sdf_partition() to separate the data into discrete groups
partition <- cached_flights %>%
sdf_partition(training = 0.01, testing = 0.09, other = 0.9)
tally(partition$training)
## # Source: spark<?> [?? x 1]
## n
## <dbl>
## 1 70102
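The partition is random, so these counts will vary between runs; sdf_partition() accepts a seed argument when a reproducible split is needed (the value 100 below is arbitrary):
partition <- cached_flights %>%
  sdf_partition(training = 0.01, testing = 0.09, other = 0.9, seed = 100)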
7.7 Feature transformers
See how to use Spark’s feature transformers: http://spark.rstudio.com/reference/#section-spark-feature-transformers
- Use ft_binarizer() to identify “delayed” flights. Values above the threshold become 1; everything else becomes 0.
cached_flights %>%
ft_binarizer(
input_col = "depdelay",
output_col = "delayed",
threshold = 15
) %>%
select(
depdelay,
delayed
) %>%
head(100)
## # Source: spark<?> [?? x 2]
## depdelay delayed
## <dbl> <dbl>
## 1 -4 0
## 2 -1 0
## 3 15 0
## 4 -2 0
## 5 2 0
## 6 -4 0
## 7 19 1
## 8 1 0
## 9 0 0
## 10 -3 0
## # … with more rows
- Use ft_bucketizer() to split the data into groups. dephour holds the zero-based index of the bucket that each crsdeptime falls into.
cached_flights %>%
ft_bucketizer(
input_col = "crsdeptime",
output_col = "dephour",
splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
) %>%
select(
crsdeptime,
dephour
) %>%
head(100)
## # Source: spark<?> [?? x 2]
## crsdeptime dephour
## <dbl> <dbl>
## 1 910 2
## 2 835 2
## 3 1555 3
## 4 730 1
## 5 2045 5
## 6 1135 2
## 7 1310 3
## 8 1220 3
## 9 1515 3
## 10 630 1
## # … with more rows
7.8 Fit a model with sparklyr
Build on the recently learned transformation techniques to feed data into a model
- Combine the ft_ and sdf_ functions to prepare the data
sample_data <- cached_flights %>%
filter(!is.na(arrdelay)) %>%
ft_binarizer(
input_col = "arrdelay",
output_col = "delayed",
threshold = 15
) %>%
ft_bucketizer(
input_col = "crsdeptime",
output_col = "dephour",
splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
) %>%
mutate(dephour = paste0("h", as.integer(dephour))) %>%
sdf_partition(training = 0.01, testing = 0.09, other = 0.9)
- Cache the training data
training <- sdf_register(sample_data$training, "training")
tbl_cache(sc, "training")
- Run a logistic regression model in Spark
delayed_model <- training %>%
ml_logistic_regression(delayed ~ depdelay + dephour)
- View the model results
summary(delayed_model)
## Coefficients:
## (Intercept) depdelay dephour_h2 dephour_h3 dephour_h4 dephour_h1
## -3.5139214 0.1375437 0.9346765 0.7824030 0.8512516 1.0539023
## dephour_h5
## 0.7359309
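sparklyr also supplies broom methods for Spark ML models, so the same coefficients can be collected into a tidy data frame (this assumes the broom package is available):
library(broom)
tidy(delayed_model)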
7.9 Run predictions in Spark
Quick review of running predictions and reviewing accuracy
- Use sdf_predict() against the test dataset
delayed_testing <- sdf_predict(delayed_model, sample_data$testing)
## Warning in sdf_predict.ml_model(delayed_model, sample_data$testing): The
## signature sdf_predict(model, dataset) is deprecated and will be removed
## in a future version. Use sdf_predict(dataset, model) or ml_predict(model,
## dataset) instead.
delayed_testing %>%
head()
## # Source: spark<?> [?? x 17]
## month dayofmonth arrtime arrdelay depdelay crsarrtime crsdeptime distance
## <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1 7 1 NaN 0 -1 1359 1235 282
## 2 7 1 NaN 0 0 551 527 316
## 3 7 1 NaN 0 0 850 720 416
## 4 7 1 NaN 0 0 955 730 1846
## 5 7 1 NaN 0 0 1019 900 678
## 6 7 1 NaN 0 0 1116 959 647
## # … with 9 more variables: delayed <dbl>, dephour <chr>, features <list>,
## # label <dbl>, rawPrediction <list>, probability <list>,
## # prediction <dbl>, probability_0 <dbl>, probability_1 <dbl>
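As the warning above notes, this signature of sdf_predict() is deprecated; ml_predict() is the recommended replacement and produces the same scored table:
delayed_testing <- ml_predict(delayed_model, sample_data$testing)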
- Use group_by() to see how effective the new model is
delayed_testing %>%
group_by(delayed, prediction) %>%
tally() %>%
mutate(percent = n / sum(n))
## Warning: Missing values are always removed in SQL.
## Use `sum(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `sum(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `sum(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `sum(x, na.rm = TRUE)` to silence this warning
## # Source: spark<?> [?? x 4]
## # Groups: delayed
## delayed prediction n percent
## <dbl> <dbl> <dbl> <dbl>
## 1 0 1 10482 0.0211
## 2 0 0 487204 0.979
## 3 1 1 91058 0.690
## 4 1 0 40900 0.310
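For a single summary metric, sparklyr wraps Spark’s evaluators; ml_binary_classification_evaluator() computes the area under the ROC curve from the label and rawPrediction columns shown in the scored table above:
delayed_testing %>%
  ml_binary_classification_evaluator(
    label_col = "label",
    raw_prediction_col = "rawPrediction"
  )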