8 Spark pipelines
8.1 Build a pipeline
Step-by-step of how to build a new Spark pipeline
- Use
sdf_partition()
to create a sample of 1% training and 1% testing of the flights table.
# Split the flights table into three partitions by weight:
# 1% training, 1% testing, and the remaining 98% in `rest`.
model_data <- tbl(sc, "flights") %>%
  sdf_partition(training = 0.01, testing = 0.01, rest = 0.98)
- Recreate the `dplyr` code in the cached_flights variable from the previous unit. Assign it to a new variable called `pipeline_df`.
# Data-preparation steps applied to the training partition.
# NOTE(review): the comparison against the literal string "NA" presumably
# relies on the columns having been read as character -- confirm against
# how the flights table was loaded.
pipeline_df <- model_data$training %>%
  mutate(
    arrdelay = ifelse(arrdelay == "NA", 0, arrdelay),
    depdelay = ifelse(depdelay == "NA", 0, depdelay)
  ) %>%
  # Keep only the fields used downstream
  select(
    month, dayofmonth,
    arrtime, arrdelay, depdelay,
    crsarrtime, crsdeptime,
    distance
  ) %>%
  # Coerce every remaining column to numeric
  mutate_all(as.numeric)
- Start a new pipeline with `ml_pipeline()` and `dplyr`-pipe it into `ft_dplyr_transformer()`. Use `pipeline_df` as the `tbl` argument.
# Stage 1: turn the dplyr steps behind pipeline_df into a pipeline stage
ml_pipeline(sc) %>%
  ft_dplyr_transformer(tbl = pipeline_df)
## Pipeline (Estimator) with 1 stage
## <pipeline_771d46fa3b12>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d26c841da>
## | (Parameters -- Column Names)
- Pipe code into
ft_binarizer()
to determine if arrdelay is over 15 minutes.
ml_pipeline(sc) %>%
  ft_dplyr_transformer(tbl = pipeline_df) %>%
  # Stage 2: binarize arrdelay at a threshold of 15 into `delayed`
  ft_binarizer(input_col = "arrdelay", output_col = "delayed", threshold = 15)
## Pipeline (Estimator) with 2 stages
## <pipeline_771d790c2547>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d7c65f40a>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d1b32e592>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
- Pipe code into `ft_bucketizer()`. Use it to split crsdeptime into six even segments of 4 hours, stored in a new dephour column.
ml_pipeline(sc) %>%
  ft_dplyr_transformer(tbl = pipeline_df) %>%
  ft_binarizer(input_col = "arrdelay", output_col = "delayed", threshold = 15) %>%
  # Stage 3: seven split points define six buckets of crsdeptime
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  )
## Pipeline (Estimator) with 3 stages
## <pipeline_771d55043e5f>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d397eb5a8>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d2ae8baf5>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_771d5bc82de8>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
- Add `ft_r_formula()` with a model that uses arrdelay and dephour to predict delayed.
ml_pipeline(sc) %>%
  ft_dplyr_transformer(tbl = pipeline_df) %>%
  ft_binarizer(input_col = "arrdelay", output_col = "delayed", threshold = 15) %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  # Stage 4: `delayed` is the response; arrdelay and dephour are the features
  ft_r_formula(delayed ~ arrdelay + dephour)
## Pipeline (Estimator) with 4 stages
## <pipeline_771d461623dc>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d52515c6e>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d6615abc0>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_771de7aa7e3>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormula (Estimator)
## | <r_formula_771d7d7168b8>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Parameters)
## | formula: delayed ~ arrdelay + dephour
- Pipe into a logistic regression model, with
ml_logistic_regression()
ml_pipeline(sc) %>%
  ft_dplyr_transformer(tbl = pipeline_df) %>%
  ft_binarizer(input_col = "arrdelay", output_col = "delayed", threshold = 15) %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ arrdelay + dephour) %>%
  # Stage 5: logistic regression estimator with default parameters
  ml_logistic_regression()
## Pipeline (Estimator) with 5 stages
## <pipeline_771d59092a6b>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d68b4188b>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d6bdee23c>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_771d547fef7d>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormula (Estimator)
## | <r_formula_771d1ba41a0e>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Parameters)
## | formula: delayed ~ arrdelay + dephour
## |--5 LogisticRegression (Estimator)
## | <logistic_regression_771df916e8a>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Parameters)
## | elastic_net_param: 0
## | fit_intercept: TRUE
## | max_iter: 100
## | reg_param: 0
## | standardization: TRUE
## | threshold: 0.5
## | tol: 1e-06
- Assign the entire piped code to a new variable called
flights_pipeline
# Full five-stage pipeline: dplyr prep -> binarizer -> bucketizer ->
# R formula -> logistic regression. Nothing is fitted at this point.
flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(tbl = pipeline_df) %>%
  ft_binarizer(input_col = "arrdelay", output_col = "delayed", threshold = 15) %>%
  ft_bucketizer(
    input_col = "crsdeptime",
    output_col = "dephour",
    splits = c(0, 400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ arrdelay + dephour) %>%
  ml_logistic_regression()

# Print the assembled pipeline
flights_pipeline
## Pipeline (Estimator) with 5 stages
## <pipeline_771d78a8fa3c>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d2ad23958>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d51e97378>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_771d2b73307d>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormula (Estimator)
## | <r_formula_771d5d53839c>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Parameters)
## | formula: delayed ~ arrdelay + dephour
## |--5 LogisticRegression (Estimator)
## | <logistic_regression_771d7f6d576b>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Parameters)
## | elastic_net_param: 0
## | fit_intercept: TRUE
## | max_iter: 100
## | reg_param: 0
## | standardization: TRUE
## | threshold: 0.5
## | tol: 1e-06
8.2 Fit, evaluate, save
- Fit (train) the `flights_pipeline` pipeline model using the training data on `model_data`. The function to use is `ml_fit()`
# Fit the pipeline on the training partition
model <- flights_pipeline %>%
  ml_fit(model_data$training)

# Print the fitted pipeline model
model
## PipelineModel (Transformer) with 5 stages
## <pipeline_771d78a8fa3c>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d2ad23958>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d51e97378>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_771d2b73307d>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormulaModel (Transformer)
## | <r_formula_771d5d53839c>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Transformer Info)
## | formula: chr "delayed ~ arrdelay + dephour"
## |--5 LogisticRegressionModel (Transformer)
## | <logistic_regression_771d7f6d576b>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Transformer Info)
## | coefficients: num [1:2] 28.5057 -0.0886
## | intercept: num -441
## | num_classes: int 2
## | num_features: int 2
## | threshold: num 0.5
- Use the newly fitted model to perform predictions using `ml_transform()`. Use the testing data from `model_data`
# Apply the fitted pipeline model to the testing partition
predictions <- ml_transform(model, model_data$testing)
- Use
group_by()
/tally()
to see how the model performed
# Count rows for each (actual, predicted) combination
tally(group_by(predictions, delayed, prediction))
## # Source: spark<?> [?? x 3]
## # Groups: delayed
## delayed prediction n
## <dbl> <dbl> <dbl>
## 1 0 0 55669
## 2 1 1 14559
- Save the model to disk using `ml_save()`
# Write the fitted model to ./saved_model, replacing any earlier copy
ml_save(model, path = "saved_model", overwrite = TRUE)
## Model successfully saved.
# Inspect what was written
list.files(path = "saved_model")
## [1] "metadata" "stages"
- Save the pipeline using
ml_save()
# Write the unfitted pipeline to ./saved_pipeline, replacing any earlier copy
ml_save(flights_pipeline, path = "saved_pipeline", overwrite = TRUE)
## Model successfully saved.
# Inspect what was written
list.files(path = "saved_pipeline")
## [1] "metadata" "stages"
- Close the Spark session
# Close the connection held in `sc`; the next section opens a new one
spark_disconnect(sc)
## NULL
8.3 Reload model
Use the saved model inside a different Spark session
- Open a new Spark connection and reload the data
library(sparklyr)

# Open a fresh local Spark connection
sc <- spark_connect(master = "local", version = "2.0.0")

# Register the CSV files as the "flights" table.
# NOTE(review): `file_columns` is not defined in this section; it is
# presumably the column specification created in an earlier unit -- confirm.
spark_flights <- spark_read_csv(
  sc,
  name = "flights",
  path = "/usr/share/class/flights/data/",
  columns = file_columns,
  infer_schema = FALSE,
  memory = FALSE
)
- Use
ml_load()
to reload the model directly into the Spark session
# Load the fitted model saved earlier into the current Spark connection
reload <- ml_load(sc, path = "saved_model")

# Print the reloaded pipeline model
reload
## PipelineModel (Transformer) with 5 stages
## <pipeline_771d78a8fa3c>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d2ad23958>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d51e97378>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_771d2b73307d>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormulaModel (Transformer)
## | <r_formula_771d5d53839c>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## |--5 LogisticRegressionModel (Transformer)
## | <logistic_regression_771d7f6d576b>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Transformer Info)
## | coefficients: num [1:2] 28.5057 -0.0886
## | intercept: num -441
## | num_classes: int 2
## | num_features: int 2
## | threshold: num 0.5
- Create a new table called current. It needs to pull today’s flights.
library(lubridate)

# Capture the timestamp once so that month and day are taken from the same
# instant; two separate now() calls could straddle midnight.
right_now <- now()

# Flights whose month and day-of-month match today's date. The `!!` operator
# evaluates the lubridate calls locally, so only the resulting values are
# sent to Spark.
current <- tbl(sc, "flights") %>%
  filter(
    month == !!month(right_now),
    dayofmonth == !!day(right_now)
  )

# Show the SQL that the filter translates to
show_query(current)
## <SQL>
## SELECT *
## FROM `flights`
## WHERE ((`month` = 1.0) AND (`dayofmonth` = 11))
- Run predictions against `current` using `ml_transform()`.
# Load the saved model from disk and score today's flights with it
new_predictions <- ml_transform(ml_load(sc, "saved_model"), current)
- Get a quick count of expected delayed flights. The field to check on is called
prediction
# Sum the `prediction` column (ignoring missing values) to count the
# flights expected to be delayed
new_predictions %>%
  summarise(late_flights = sum(prediction, na.rm = TRUE))
## # Source: spark<?> [?? x 1]
## late_flights
## <dbl>
## 1 3979
8.4 Reload pipeline
Overview of how to use new data to re-fit the pipeline, thus creating a new pipeline model
- Use
ml_load()
to reload the pipeline into the Spark session
# Load the unfitted pipeline saved earlier
flights_pipeline <- ml_load(sc, path = "saved_pipeline")

# Print the reloaded pipeline
flights_pipeline
## Pipeline (Estimator) with 5 stages
## <pipeline_771d78a8fa3c>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d2ad23958>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d51e97378>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_771d2b73307d>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormula (Estimator)
## | <r_formula_771d5d53839c>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Parameters)
## | formula: delayed ~ arrdelay + dephour
## |--5 LogisticRegression (Estimator)
## | <logistic_regression_771d7f6d576b>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Parameters)
## | elastic_net_param: 0
## | fit_intercept: TRUE
## | max_iter: 100
## | reg_param: 0
## | standardization: TRUE
## | threshold: 0.5
## | tol: 1e-06
- Create a new sample data set using `sample_frac()`; 0.1% of the total data should be sufficient
# Draw a 0.001 fraction of the flights table.
# Note: the name `sample` masks base::sample() for the rest of the session.
sample <- sample_frac(tbl(sc, "flights"), 0.001)
- Re-fit the model using
ml_fit()
and the new sample data
# Re-fit the reloaded pipeline on the new sample
new_model <- ml_fit(flights_pipeline, dataset = sample)

# Print the newly fitted pipeline model
new_model
## PipelineModel (Transformer) with 5 stages
## <pipeline_771d78a8fa3c>
## Stages
## |--1 SQLTransformer (Transformer)
## | <dplyr_transformer_771d2ad23958>
## | (Parameters -- Column Names)
## |--2 Binarizer (Transformer)
## | <binarizer_771d51e97378>
## | (Parameters -- Column Names)
## | input_col: arrdelay
## | output_col: delayed
## |--3 Bucketizer (Transformer)
## | <bucketizer_771d2b73307d>
## | (Parameters -- Column Names)
## | input_col: crsdeptime
## | output_col: dephour
## |--4 RFormulaModel (Transformer)
## | <r_formula_771d5d53839c>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | (Transformer Info)
## | formula: chr "delayed ~ arrdelay + dephour"
## |--5 LogisticRegressionModel (Transformer)
## | <logistic_regression_771d7f6d576b>
## | (Parameters -- Column Names)
## | features_col: features
## | label_col: label
## | prediction_col: prediction
## | probability_col: probability
## | raw_prediction_col: rawPrediction
## | (Transformer Info)
## | coefficients: num [1:2] 29.642 -0.447
## | intercept: num -458
## | num_classes: int 2
## | num_features: int 2
## | threshold: num 0.5
- Save the newly fitted model
# Write the re-fitted model to ./new_model, replacing any earlier copy
ml_save(new_model, path = "new_model", overwrite = TRUE)
## Model successfully saved.
# Inspect what was written
list.files(path = "new_model")
## [1] "metadata" "stages"
- Disconnect from Spark
# Close the connection held in `sc`
spark_disconnect(sc)
## NULL