7 Intro to sparklyr

7.1 New Spark session

Learn to open a new Spark session

  1. Use spark_connect() to create a new local Spark session
sc <- spark_connect(master = "local")
## * Using Spark: 2.0.0
  2. Click on the SparkUI button to view the current Spark session’s UI

  3. Click on the Log button to see the message history
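
If the SparkUI and Log buttons are not available in your IDE, the same views can be opened from R. A minimal sketch using sparklyr's helper functions:
# Open the Spark web UI in a browser (same view as the SparkUI button)
spark_web(sc)

# Print recent entries from the session log (same content as the Log button)
spark_log(sc, n = 20)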

7.2 Data transfer

Practice uploading data to Spark

  1. Copy the mtcars dataset into the session
spark_mtcars <- sdf_copy_to(sc, mtcars, "my_mtcars")
  2. In the Connections pane, expand the my_mtcars table

  3. Go to the Spark UI and note the new jobs

  4. In the UI, click the Storage button and note the new table

  5. Click on the In-memory table my_mtcars link
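
As an optional check from R, you can confirm that the table was registered with the session. A quick sketch using dplyr's src_tbls():
# List the tables registered in the Spark session; "my_mtcars" should appear
src_tbls(sc)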

7.3 Simple dplyr example

See how Spark handles dplyr commands

  1. Run the following code snippet
spark_mtcars %>%
  group_by(am) %>%
  summarise(avg_wt = mean(wt, na.rm = TRUE))
## # Source: spark<?> [?? x 2]
##      am avg_wt
##   <dbl>  <dbl>
## 1     0   3.77
## 2     1   2.41
  2. Go to the Spark UI and click the SQL button

  3. Click on the top item inside the Completed Queries table

  4. At the bottom of the diagram, expand Details
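
To relate the dplyr pipeline to the query shown in the SQL tab, you can also print the SQL that dplyr generates. A small sketch using show_query():
# Show the SQL translation of the dplyr pipeline without executing it
spark_mtcars %>%
  group_by(am) %>%
  summarise(avg_wt = mean(wt, na.rm = TRUE)) %>%
  show_query()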

7.4 Map data

See the mechanics of how Spark is able to use files as a data source

  1. Examine the contents of the /usr/share/class/flights/data folder
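
One way to do this from R, assuming the folder exists at that path on your machine, is with base R's list.files():
# List the CSV files that make up the flights data
list.files("/usr/share/class/flights/data")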

  2. Read the top 5 rows of the flight_2008_1 CSV file. It is located under /usr/share/class/flights

library(readr)
top_rows <- read_csv("/usr/share/class/flights/data/flight_2008_1.csv", n_max = 5)
## Parsed with column specification:
## cols(
##   .default = col_double(),
##   uniquecarrier = col_character(),
##   tailnum = col_character(),
##   origin = col_character(),
##   dest = col_character(),
##   cancellationcode = col_logical(),
##   score = col_logical()
## )
## See spec(...) for full column specifications.
  3. Create a named list based on the column names, setting each element’s value to “character”.
library(purrr)
file_columns <- top_rows %>%
  rename_all(tolower) %>%
  map(function(x) "character")
head(file_columns)
## $flightid
## [1] "character"
## 
## $year
## [1] "character"
## 
## $month
## [1] "character"
## 
## $dayofmonth
## [1] "character"
## 
## $dayofweek
## [1] "character"
## 
## $deptime
## [1] "character"
  4. Use spark_read_csv() to “map” the file’s structure and location to the Spark context
spark_flights <- spark_read_csv(
  sc,
  name = "flights",
  path = "/usr/share/class/flights/data/",
  memory = FALSE,
  columns = file_columns,
  infer_schema = FALSE
)
  5. In the Connections pane, click on the table icon by the flights variable

  6. Verify that the new variable pointer works by using tally()

spark_flights %>%
  tally()
## # Source: spark<?> [?? x 1]
##         n
##     <dbl>
## 1 7009728

7.5 Caching data

Learn how to cache a subset of the data in Spark

  1. Create a subset of the flights table object
cached_flights <- spark_flights %>%
  mutate(
    arrdelay = ifelse(arrdelay == "NA", 0, arrdelay),
    depdelay = ifelse(depdelay == "NA", 0, depdelay)
  ) %>%
  select(
    month,
    dayofmonth,
    arrtime,
    arrdelay,
    depdelay,
    crsarrtime,
    crsdeptime,
    distance
  ) %>%
  mutate_all(as.numeric)
  2. Use compute() to extract the data into Spark memory
cached_flights <- compute(cached_flights, "sub_flights")
  3. Confirm that the new variable pointer works
cached_flights %>%
  tally()
## # Source: spark<?> [?? x 1]
##         n
##     <dbl>
## 1 7009728
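
When the cached subset is no longer needed, the memory can be released again. A sketch using tbl_uncache(), assuming the table was registered as "sub_flights" in the compute() step above:
# Optional: drop the cached table from Spark memory once you are done with it
tbl_uncache(sc, "sub_flights")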

7.6 sdf Functions

Overview of a few sdf_ functions: http://spark.rstudio.com/reference/#section-spark-dataframes

  1. Use sdf_pivot() to create a column for each value in month
cached_flights %>%
  arrange(month) %>%
  sdf_pivot(month ~ dayofmonth)
## # Source: spark<?> [?? x 32]
##    month `1.0` `2.0` `3.0` `4.0` `5.0` `6.0` `7.0` `8.0` `9.0` `10.0`
##    <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>  <dbl>
##  1     1     1     2     3     4     5     6     7     8     9     10
##  2     2     1     2     3     4     5     6     7     8     9     10
##  3     3     1     2     3     4     5     6     7     8     9     10
##  4     4     1     2     3     4     5     6     7     8     9     10
##  5     5     1     2     3     4     5     6     7     8     9     10
##  6     6     1     2     3     4     5     6     7     8     9     10
##  7     7     1     2     3     4     5     6     7     8     9     10
##  8     8     1     2     3     4     5     6     7     8     9     10
##  9     9     1     2     3     4     5     6     7     8     9     10
## 10    10     1     2     3     4     5     6     7     8     9     10
## # … with more rows, and 21 more variables: `11.0` <dbl>, `12.0` <dbl>,
## #   `13.0` <dbl>, `14.0` <dbl>, `15.0` <dbl>, `16.0` <dbl>, `17.0` <dbl>,
## #   `18.0` <dbl>, `19.0` <dbl>, `20.0` <dbl>, `21.0` <dbl>, `22.0` <dbl>,
## #   `23.0` <dbl>, `24.0` <dbl>, `25.0` <dbl>, `26.0` <dbl>, `27.0` <dbl>,
## #   `28.0` <dbl>, `29.0` <dbl>, `30.0` <dbl>, `31.0` <dbl>
  2. Use sdf_partition() to separate the data into discrete groups
partition <- cached_flights %>%
  sdf_partition(training = 0.01, testing = 0.09, other = 0.9)

tally(partition$training)
## # Source: spark<?> [?? x 1]
##       n
##   <dbl>
## 1 70102
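
As an optional sanity check, the size of each partition can be compared to the requested proportions. A sketch using sdf_nrow():
# Row counts per partition; these should be roughly 1%, 9% and 90% of the data
sapply(partition, sdf_nrow)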

7.7 Feature transformers

See how to use Spark’s feature transformers: http://spark.rstudio.com/reference/#section-spark-feature-transformers

  1. Use ft_binarizer() to identify “delayed” flights
cached_flights %>%
  ft_binarizer(
    input_col = "depdelay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  select(
    depdelay,
    delayed
  ) %>%
  head(100)
## # Source: spark<?> [?? x 2]
##    depdelay delayed
##       <dbl>   <dbl>
##  1       -4       0
##  2       -1       0
##  3       15       0
##  4       -2       0
##  5        2       0
##  6       -4       0
##  7       19       1
##  8        1       0
##  9        0       0
## 10       -3       0
## # … with more rows
  2. Use ft_bucketizer() to split the data into groups
cached_flights %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  select(
    crsdeptime,
    dephour
  ) %>%
  head(100)
## # Source: spark<?> [?? x 2]
##    crsdeptime dephour
##         <dbl>   <dbl>
##  1        910       2
##  2        835       2
##  3       1555       3
##  4        730       1
##  5       2045       5
##  6       1135       2
##  7       1310       3
##  8       1220       3
##  9       1515       3
## 10        630       1
## # … with more rows
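
As an optional follow-up, you can check how the flights are distributed across the new buckets. A sketch that counts rows per dephour group:
# Count how many flights fall into each departure-hour bucket
cached_flights %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  count(dephour)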

7.8 Fit a model with sparklyr

Build on the recently learned transformation techniques to feed data into a model

  1. Combine the ft_ and sdf_ functions to prepare the data
sample_data <- cached_flights %>%
  filter(!is.na(arrdelay)) %>%
  ft_binarizer(
    input_col = "arrdelay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  mutate(dephour = paste0("h", as.integer(dephour))) %>%
  sdf_partition(training = 0.01, testing = 0.09, other = 0.9)
  2. Cache the training data
training <- sdf_register(sample_data$training, "training")
tbl_cache(sc, "training")
  3. Run a logistic regression model in Spark
delayed_model <- training %>%
  ml_logistic_regression(delayed ~ depdelay + dephour)
  4. View the model results
summary(delayed_model)
## Coefficients:
## (Intercept)    depdelay  dephour_h2  dephour_h3  dephour_h4  dephour_h1 
##  -3.5139214   0.1375437   0.9346765   0.7824030   0.8512516   1.0539023 
##  dephour_h5 
##   0.7359309

7.9 Run predictions in Spark

Quick review of running predictions and checking accuracy

  1. Use sdf_predict() against the test dataset
delayed_testing <- sdf_predict(delayed_model, sample_data$testing)
## Warning in sdf_predict.ml_model(delayed_model, sample_data$testing): The
## signature sdf_predict(model, dataset) is deprecated and will be removed
## in a future version. Use sdf_predict(dataset, model) or ml_predict(model,
## dataset) instead.
delayed_testing %>%
  head()
## # Source: spark<?> [?? x 17]
##   month dayofmonth arrtime arrdelay depdelay crsarrtime crsdeptime distance
##   <dbl>      <dbl>   <dbl>    <dbl>    <dbl>      <dbl>      <dbl>    <dbl>
## 1     7          1     NaN        0       -1       1359       1235      282
## 2     7          1     NaN        0        0        551        527      316
## 3     7          1     NaN        0        0        850        720      416
## 4     7          1     NaN        0        0        955        730     1846
## 5     7          1     NaN        0        0       1019        900      678
## 6     7          1     NaN        0        0       1116        959      647
## # … with 9 more variables: delayed <dbl>, dephour <chr>, features <list>,
## #   label <dbl>, rawPrediction <list>, probability <list>,
## #   prediction <dbl>, probability_0 <dbl>, probability_1 <dbl>
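
The warning above notes that this signature of sdf_predict() is deprecated. An equivalent call with ml_predict(), as the warning suggests:
# Equivalent, non-deprecated way to score the test partition
delayed_testing <- ml_predict(delayed_model, sample_data$testing)
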
  2. Use group_by() to see how effective the new model is
delayed_testing %>%
  group_by(delayed, prediction) %>%
  tally() %>% 
  mutate(percent = n / sum(n))
## Warning: Missing values are always removed in SQL.
## Use `sum(x, na.rm = TRUE)` to silence this warning
## # Source: spark<?> [?? x 4]
## # Groups: delayed
##   delayed prediction      n percent
##     <dbl>      <dbl>  <dbl>   <dbl>
## 1       0          1  10482  0.0211
## 2       0          0 487204  0.979 
## 3       1          1  91058  0.690 
## 4       1          0  40900  0.310
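
As an optional follow-up, an overall performance metric can also be computed in Spark. A sketch using ml_binary_classification_evaluator(), assuming the default label and rawPrediction columns produced by the prediction step:
# Area under the ROC curve for the scored test data
ml_binary_classification_evaluator(delayed_testing)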