8 Spark pipelines

8.1 Build a pipeline

Step-by-step of how to build a new Spark pipeline

  1. Use sdf_partition() to create a sample of 1% training and 1% testing of the flights table.
# Randomly split the flights table into three named partitions:
# 1% for training, 1% for testing, and the remaining 98% set aside.
# The result is a list of Spark tbls, accessed as model_data$training, etc.
model_data <- sdf_partition(
  tbl(sc, "flights"),
  training = 0.01,
  testing = 0.01,
  rest = 0.98
)
  1. Recreate the dplyr code in the cached_flights variable from the previous unit. Assign it to a new variable called pipeline_df.
# Build the modeling data set from the training partition.
# NOTE(review): the delay columns are compared against the literal
# string "NA", which implies they were read as character (the import
# below uses infer_schema = FALSE) -- confirm against the data-import
# chunk from the previous unit.
pipeline_df <- model_data$training %>%
  # Replace missing delay values with 0
  mutate(
    arrdelay = ifelse(arrdelay == "NA", 0, arrdelay),
    depdelay = ifelse(depdelay == "NA", 0, depdelay)
  ) %>%
  # Keep only the columns the pipeline will use
  select(
    month,
    dayofmonth,
    arrtime,
    arrdelay,
    depdelay,
    crsarrtime,
    crsdeptime,
    distance
  ) %>%
  # Cast every remaining column to numeric for modeling
  mutate_all(as.numeric)
  1. Start a new pipeline with ml_pipeline() and dplyr-pipe into ft_dplyr_transformer(). Use pipeline_df as the tbl argument.
# Start an empty Spark ML pipeline and add one SQLTransformer stage
# that replays the dplyr transformations captured in pipeline_df.
ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = pipeline_df
  ) 
## Pipeline (Estimator) with 1 stage
## <pipeline_771d46fa3b12> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d26c841da> 
##   |     (Parameters -- Column Names)
  1. Pipe code into ft_binarizer() to determine if arrdelay is over 15 minutes.
 ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = pipeline_df
  ) %>%
  ft_binarizer(
    input_col = "arrdelay",
    output_col = "delayed",
    threshold = 15
  )
## Pipeline (Estimator) with 2 stages
## <pipeline_771d790c2547> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d7c65f40a> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d1b32e592> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
  1. Pipe code into ft_bucketizer(). Use it to split crsdeptime into six even 4-hour segments, stored in a new dephour column.
# Pipeline with a third stage: bucket crsdeptime (scheduled departure
# in hhmm form, 0-2400) into six 4-hour intervals stored in dephour.
ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = pipeline_df
  ) %>%
  ft_binarizer(
    input_col = "arrdelay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  # splits has 7 boundaries, producing 6 buckets of 400 (4 hours) each
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  )
## Pipeline (Estimator) with 3 stages
## <pipeline_771d55043e5f> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d397eb5a8> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d2ae8baf5> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_771d5bc82de8> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
  1. Add ft_r_formula() with a model that uses arrdelay and dephour to predict the delayed outcome.
# Pipeline with a fourth stage: an RFormula Estimator that assembles
# the model matrix for delayed ~ arrdelay + dephour (features/label).
ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = pipeline_df
  ) %>%
  ft_binarizer(
    input_col = "arrdelay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ arrdelay + dephour)
## Pipeline (Estimator) with 4 stages
## <pipeline_771d461623dc> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d52515c6e> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d6615abc0> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_771de7aa7e3> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormula (Estimator)
##   |    <r_formula_771d7d7168b8> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      formula: delayed ~ arrdelay + dephour
  1. Pipe into a logistic regression model, with ml_logistic_regression()
# Full five-stage pipeline: dplyr transform, binarize, bucketize,
# R formula, and finally a logistic regression Estimator (defaults:
# max_iter = 100, reg_param = 0, threshold = 0.5 per the output below).
ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = pipeline_df
  ) %>%
  ft_binarizer(
    input_col = "arrdelay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ arrdelay + dephour) %>%
  ml_logistic_regression()
## Pipeline (Estimator) with 5 stages
## <pipeline_771d59092a6b> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d68b4188b> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d6bdee23c> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_771d547fef7d> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormula (Estimator)
##   |    <r_formula_771d1ba41a0e> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      formula: delayed ~ arrdelay + dephour
##   |--5 LogisticRegression (Estimator)
##   |    <logistic_regression_771df916e8a> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Parameters)
##   |      elastic_net_param: 0
##   |      fit_intercept: TRUE
##   |      max_iter: 100
##   |      reg_param: 0
##   |      standardization: TRUE
##   |      threshold: 0.5
##   |      tol: 1e-06
  1. Assign the entire piped code to a new variable called flights_pipeline
# Same five-stage pipeline as above, now assigned to a variable so it
# can be fitted, saved, and reloaded in the following sections.
flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = pipeline_df
  ) %>%
  ft_binarizer(
    input_col = "arrdelay",
    output_col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ arrdelay + dephour) %>%
  ml_logistic_regression()

# Print the pipeline definition (an Estimator -- not yet fitted)
flights_pipeline
## Pipeline (Estimator) with 5 stages
## <pipeline_771d78a8fa3c> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d2ad23958> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d51e97378> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_771d2b73307d> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormula (Estimator)
##   |    <r_formula_771d5d53839c> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      formula: delayed ~ arrdelay + dephour
##   |--5 LogisticRegression (Estimator)
##   |    <logistic_regression_771d7f6d576b> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Parameters)
##   |      elastic_net_param: 0
##   |      fit_intercept: TRUE
##   |      max_iter: 100
##   |      reg_param: 0
##   |      standardization: TRUE
##   |      threshold: 0.5
##   |      tol: 1e-06

8.2 Fit, evaluate, save

  1. Fit (train) the flights_pipeline pipeline model using the training data on model_data. The function to use is ml_fit()
# Fit (train) the estimator stages of the pipeline against the
# training partition; returns a PipelineModel whose stages are all
# Transformers (note RFormulaModel / LogisticRegressionModel below).
model <- ml_fit(
  flights_pipeline, 
  model_data$training
  )

# Print the fitted model, including the learned coefficients
model
## PipelineModel (Transformer) with 5 stages
## <pipeline_771d78a8fa3c> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d2ad23958> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d51e97378> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_771d2b73307d> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormulaModel (Transformer)
##   |    <r_formula_771d5d53839c> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Transformer Info)
##   |      formula:  chr "delayed ~ arrdelay + dephour" 
##   |--5 LogisticRegressionModel (Transformer)
##   |    <logistic_regression_771d7f6d576b> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Transformer Info)
##   |      coefficients:  num [1:2] 28.5057 -0.0886 
##   |      intercept:  num -441 
##   |      num_classes:  int 2 
##   |      num_features:  int 2 
##   |      threshold:  num 0.5
  1. Use the newly fitted model to perform predictions using ml_transform(). Use the testing data from model_data
predictions <- ml_transform(
  x = model,
  dataset = model_data$testing
)
  1. Use group_by() and tally() to see how the model performed
# Confusion-style count: rows per (delayed, prediction) pair.
# In the output shown below only the matching pairs (0,0) and (1,1)
# appear, i.e. no misclassified rows surfaced in this sample.
predictions %>%
  group_by(delayed, prediction) %>%
  tally()
## # Source: spark<?> [?? x 3]
## # Groups: delayed
##   delayed prediction     n
##     <dbl>      <dbl> <dbl>
## 1       0          0 55669
## 2       1          1 14559
  1. Save the model into disk using ml_save()
# Persist the fitted PipelineModel to disk
ml_save(model, "saved_model", overwrite = TRUE)
## Model successfully saved.
# Inspect the on-disk layout (Spark ML format: metadata + stages)
list.files("saved_model")
## [1] "metadata" "stages"
  1. Save the pipeline using ml_save()
# Persist the unfitted pipeline (the Estimator) separately, so it can
# be re-fitted on new data later
ml_save(flights_pipeline, "saved_pipeline", overwrite = TRUE)
## Model successfully saved.
list.files("saved_pipeline")
## [1] "metadata" "stages"
  1. Close the Spark session
# Close the Spark connection; the saved artifacts survive on disk
spark_disconnect(sc)
## NULL

8.3 Reload model

Use the saved model inside a different Spark session

  1. Open a new Spark connection and reload the data
library(sparklyr)
# Open a fresh local Spark 2.0.0 session
sc <- spark_connect(master = "local", version = "2.0.0")
# Map the flights CSV folder to a Spark table without caching it
# (memory = FALSE) and without schema inference.
# NOTE(review): file_columns must already exist in the R session
# (defined in a previous unit) -- it is not created in this chunk.
spark_flights <- spark_read_csv(
  sc,
  name = "flights",
  path = "/usr/share/class/flights/data/",
  memory = FALSE,
  columns = file_columns,
  infer_schema = FALSE
)
  1. Use ml_load() to reload the model directly into the Spark session
# Load the saved PipelineModel straight into the new Spark session
reload <- ml_load(sc, "saved_model")
# Print it to confirm all five fitted stages survived the round trip
reload
## PipelineModel (Transformer) with 5 stages
## <pipeline_771d78a8fa3c> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d2ad23958> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d51e97378> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_771d2b73307d> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormulaModel (Transformer)
##   |    <r_formula_771d5d53839c> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |--5 LogisticRegressionModel (Transformer)
##   |    <logistic_regression_771d7f6d576b> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Transformer Info)
##   |      coefficients:  num [1:2] 28.5057 -0.0886 
##   |      intercept:  num -441 
##   |      num_classes:  int 2 
##   |      num_features:  int 2 
##   |      threshold:  num 0.5
  1. Create a new table called current. It needs to pull today’s flights.
library(lubridate)

# Filter flights down to today's date. The !! operator forces
# month(now()) and day(now()) to be evaluated in R first, so literal
# values -- not the function calls -- are embedded in the Spark SQL.
current <- tbl(sc, "flights") %>%
  filter(
    month == !! month(now()),
    dayofmonth == !! day(now())
  )

# Show the SQL that dplyr will send to Spark
show_query(current)
## <SQL>
## SELECT *
## FROM `flights`
## WHERE ((`month` = 1.0) AND (`dayofmonth` = 11))
  1. Run predictions against current using ml_transform().
# Score today's flights with the model reloaded from disk
new_predictions <- ml_transform(
  x = ml_load(sc, "saved_model"),
  dataset = current
)
  1. Get a quick count of expected delayed flights. The field to check on is called prediction
# Count the flights the model expects to be delayed. prediction is 1
# for a predicted delay, so its sum is the delayed-flight count.
# (Fixed: column name typo late_fligths -> late_flights.)
new_predictions %>%
  summarise(late_flights = sum(prediction, na.rm = TRUE))
## # Source: spark<?> [?? x 1]
##   late_flights
##          <dbl>
## 1         3979

8.4 Reload pipeline

Overview of how to use new data to re-fit the pipeline, thus creating a new pipeline model

  1. Use ml_load() to reload the pipeline into the Spark session
# Reload the unfitted pipeline (an Estimator), ready to be re-fit
flights_pipeline <- ml_load(sc, "saved_pipeline")
flights_pipeline
## Pipeline (Estimator) with 5 stages
## <pipeline_771d78a8fa3c> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d2ad23958> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d51e97378> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_771d2b73307d> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormula (Estimator)
##   |    <r_formula_771d5d53839c> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Parameters)
##   |      formula: delayed ~ arrdelay + dephour
##   |--5 LogisticRegression (Estimator)
##   |    <logistic_regression_771d7f6d576b> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Parameters)
##   |      elastic_net_param: 0
##   |      fit_intercept: TRUE
##   |      max_iter: 100
##   |      reg_param: 0
##   |      standardization: TRUE
##   |      threshold: 0.5
##   |      tol: 1e-06
  1. Create a new sample data set using sample_frac(); 0.1% of the total data should be sufficient
# Draw a 0.1% (0.001) random sample of the full flights table.
# NOTE(review): the name `sample` shadows base::sample() -- consider
# renaming (would also require updating the ml_fit() call below).
sample <- tbl(sc, "flights") %>%
  sample_frac(0.001) 
  1. Re-fit the model using ml_fit() and the new sample data
# Re-fit the reloaded pipeline on the fresh sample, producing a new
# PipelineModel with updated coefficients (compare the output below
# with the earlier fit)
new_model <- ml_fit(flights_pipeline, sample)
new_model
## PipelineModel (Transformer) with 5 stages
## <pipeline_771d78a8fa3c> 
##   Stages 
##   |--1 SQLTransformer (Transformer)
##   |    <dplyr_transformer_771d2ad23958> 
##   |     (Parameters -- Column Names)
##   |--2 Binarizer (Transformer)
##   |    <binarizer_771d51e97378> 
##   |     (Parameters -- Column Names)
##   |      input_col: arrdelay
##   |      output_col: delayed
##   |--3 Bucketizer (Transformer)
##   |    <bucketizer_771d2b73307d> 
##   |     (Parameters -- Column Names)
##   |      input_col: crsdeptime
##   |      output_col: dephour
##   |--4 RFormulaModel (Transformer)
##   |    <r_formula_771d5d53839c> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |     (Transformer Info)
##   |      formula:  chr "delayed ~ arrdelay + dephour" 
##   |--5 LogisticRegressionModel (Transformer)
##   |    <logistic_regression_771d7f6d576b> 
##   |     (Parameters -- Column Names)
##   |      features_col: features
##   |      label_col: label
##   |      prediction_col: prediction
##   |      probability_col: probability
##   |      raw_prediction_col: rawPrediction
##   |     (Transformer Info)
##   |      coefficients:  num [1:2] 29.642 -0.447 
##   |      intercept:  num -458 
##   |      num_classes:  int 2 
##   |      num_features:  int 2 
##   |      threshold:  num 0.5
  1. Save the newly fitted model
# Persist the re-fitted model under its own directory
ml_save(new_model, "new_model", overwrite = TRUE)
## Model successfully saved.
list.files("new_model")
## [1] "metadata" "stages"
  1. Disconnect from Spark
# Close the Spark connection
spark_disconnect(sc)
## NULL