SparkR Demo

Jaehyeon Kim

March, 2018

Intro to Spark


Source: Apache Spark Ecosystem

Intro to SparkR


Source: SparkR: Scaling R Programs with Spark

Launching java with spark-submit command /usr/local/spark-2.2.1/bin/spark-submit
  sparkr-shell /tmp/RtmpfVHcxh/backend_port37123e7eb08

Intro to Docker

Intro to development environment

docker-compose.yml

# Compose file (format version 2) defining two services: a "master"
# container and a "worker" container, each built from its own Dockerfile.
version: "2"

services:
  # Built from Dockerfile-master in the current directory.
  master:
    build:
        context: .
        dockerfile: Dockerfile-master
    command: /init
    hostname: master
    # Ports published to the host.
    # NOTE(review): the R code in this deck connects to spark://master:7077,
    # but 7070 (not 7077) is published here - confirm this is intended.
    # Presumably 8080 is the Spark master web UI and 8787 is RStudio Server;
    # verify against the base image.
    ports:
      - "6066:6066"
      - "7070:7070"
      - "8080:8080"
      - "50070:50070"
      - "8787:8787"
  # Built from Dockerfile-worker in the current directory.
  worker:
    build:
        context: .
        dockerfile: Dockerfile-worker
    command: /init
    # Resource settings passed to the worker as environment variables.
    environment:
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
    # Lets the worker reach the master container by the name "master".
    links:
      - master

Dockerfile-master

FROM rockerextra/spark:3.4.3
# LABEL replaces the deprecated MAINTAINER instruction.
LABEL maintainer="Jaehyeon Kim <dottami@gmail.com>"

# Create the spark-master service directory and write its run script.
# printf is used instead of echo because whether echo expands "\n" depends on
# which shell /bin/sh is; printf always writes the two intended lines.
RUN mkdir -p /etc/services.d/spark-master \
    && printf '#!/usr/bin/with-contenv sh \n /opt/util/bin/start-spark master\n' > /etc/services.d/spark-master/run

# add AWS credentials
# NOTE(review): files copied here become part of the image layers - make sure
# an image containing credentials is never pushed to a shared registry.
COPY ./hadoop-conf/*.xml $HADOOP_CONF_DIR/

Dockerfile-worker

FROM rockerextra/spark:3.4.3
# LABEL replaces the deprecated MAINTAINER instruction.
LABEL maintainer="Jaehyeon Kim <dottami@gmail.com>"

# Remove the rstudio service directory, create the spark-worker service
# directory and write its run script (start-spark worker, pointing at "master").
# printf is used instead of echo because whether echo expands "\n" depends on
# which shell /bin/sh is; printf always writes the two intended lines.
RUN rm -rf /etc/services.d/rstudio \
    && mkdir -p /etc/services.d/spark-worker \
    && printf '#!/usr/bin/with-contenv sh \n /opt/util/bin/start-spark worker master\n' > /etc/services.d/spark-worker/run

# add AWS credentials
# NOTE(review): files copied here become part of the image layers - make sure
# an image containing credentials is never pushed to a shared registry.
COPY ./hadoop-conf/*.xml $HADOOP_CONF_DIR/

Intro to data manipulation - load data

# Locations of the Java, Hadoop and Spark installations.
Sys.setenv('JAVA_HOME'='/usr/lib/jvm/java-8-openjdk-amd64')
Sys.setenv('HADOOP_HOME'='/usr/local/hadoop-2.8.2')
Sys.setenv('SPARK_HOME'='/usr/local/spark-2.2.1')

# SparkR is attached last, so its functions mask same-named dplyr functions;
# dplyr verbs are therefore called with an explicit dplyr:: prefix below.
library(magrittr); library(tibble); library(dplyr)
library(SparkR, lib.loc=file.path(Sys.getenv('SPARK_HOME'),'R', 'lib'))
sparkR.session(master = 'spark://master:7077', appName = 'titanic demo',
               sparkConfig = list(spark.driver.memory = '2g'))

# Local copy of the data: read the CSV, shuffle the rows (a 100% sample
# without replacement) and convert to a tibble. as_tibble() replaces the
# deprecated as.tibble().
tdf <- read.csv('titanic.csv', stringsAsFactors = FALSE) %>%
  dplyr::sample_frac(1, replace = FALSE) %>%
  as_tibble()
# Total number of records; used later as the denominator for proportions.
rec <- nrow(tdf)
# The same data as a SparkDataFrame.
df <- as.DataFrame(tdf)

# Show the first two rows of the SparkDataFrame.
df %>% head(2)
  class   age  sex survived
1 third adult male       no
2  crew adult male       no

# Print each column's name, type and nullability.
printSchema(df)
root
 |-- class: string (nullable = true)
 |-- age: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- survived: string (nullable = true)

Intro to data manipulation - check data

## more functions
# Compact overview: one line per column with its type and first few values.
str(df)
'SparkDataFrame': 4 variables:
 $ class   : chr "third" "crew" "first" "first" "second" "crew"
 $ age     : chr "adult" "adult" "adult" "adult" "adult" "adult"
 $ sex     : chr "male" "male" "male" "female" "male" "male"
 $ survived: chr "no" "no" "yes" "yes" "no" "no"

# Per-column summary statistics (count, mean, stddev, min, max), brought back
# as a local data.frame by collect().
summary(df) %>% collect()
  summary class   age    sex survived
1   count  2201  2201   2201     2201
2    mean  <NA>  <NA>   <NA>     <NA>
3  stddev  <NA>  <NA>   <NA>     <NA>
4     min  crew adult female       no
5     max third child   male      yes

# Convert the whole SparkDataFrame into a local data.frame.
df %>% collect() # SparkDataFrame to data.frame

## check classes
# head() already returns a local data.frame, unlike df itself.
df %>% class() # SparkDataFrame
df %>% head() %>% class() # data.frame

Intro to data manipulation - select, filter…

## column expressions
# Four ways of writing 'survived': three produce a Column object, the third
# line is only a plain character string.
df$survived # Column survived
column('survived') # Column survived
'survived' # string
expr('survived') # Column survived

## selecting rows, columns
# select() on a SparkDataFrame accepts Column objects as well as column
# names given as strings.
df %>% select(df$survived) %>% head()
df %>% select(column('survived')) %>% head()
df %>% select(expr('survived')) %>% head()
df %>% select('class', 'survived') %>% head()

# dplyr counterpart on the local tibble (bare column names).
tdf %>% dplyr::select(class, survived) %>% head()

# filter() on a SparkDataFrame accepts a SQL-style condition string or a
# Column expression.
df %>% filter('survived == "yes" and age == "child"') %>% head()
df %>% filter(df$survived == 'yes' & df$age == 'child') %>% head()

# dplyr counterpart on the local tibble.
tdf %>% dplyr::filter(survived == 'yes' & age == 'child') %>% head()

Intro to data manipulation - group_by, mutate …

## creating variable
# Add age_c ("1" for adults, "0" otherwise) and show the first two rows.
head(
  mutate(df, age_c = ifelse(expr("age") == "adult", "1", "0")),
  2
)

  class   age  sex survived age_c
1 third adult male       no     1
2  crew adult male       no     1

## grouping, aggregation
# Row count per class/age combination, sorted and collected locally.
collect(
  arrange(
    summarize(group_by(df, "class", "age"), count = n(expr("survived"))),
    "class", "age"
  )
)

   class   age count                                                            
1   crew adult   885
2  first adult   319
3  first child     6
4 second adult   261
5 second child    24
6  third adult   627
7  third child    79

# dplyr counterpart: row count per class/age on the local tibble.
dplyr::summarise(
  dplyr::group_by(tdf, class, age),
  count = n()
)

Intro to data manipulation - join

# Lookup table mapping each age group to a level code, local and on Spark.
rdf <- data.frame(
  age = c("adult", "child"),
  lvl = c("0", "1"),
  stringsAsFactors = FALSE
)
rDF <- as.DataFrame(rdf)

# Inner join on age, then count rows per class/lvl, sort and collect.
collect(
  arrange(
    summarize(
      group_by(
        join(df, rDF, joinExpr = df$age == rDF$age, joinType = "inner"),
        "class", "lvl"
      ),
      count = n(expr("survived"))
    ),
    "class", "lvl"
  )
)

   class lvl count
1   crew   0   885
2  first   0   319
3  first   1     6
4 second   0   261
5 second   1    24
6  third   0   627
7  third   1    79

# dplyr counterpart: inner join on age, then row count per class/lvl.
dplyr::summarise(
  dplyr::group_by(dplyr::inner_join(tdf, rdf, by = "age"), class, lvl),
  count = n()
)

Data manipulation case study - intro

Data manipulation case study - multiple transformations

# Row count per class/age, kept as an intermediate SparkDataFrame.
tmp <- summarize(
  group_by(df, "class", "age"),
  count = n(expr("survived"))
)
# Add each group's share of all records, then sort and collect.
collect(
  arrange(mutate(tmp, prop = expr("count") / rec), "class", "age")
)


# dplyr counterpart on the local tibble.
dplyr::mutate(
  dplyr::summarise(dplyr::group_by(tdf, class, age), count = n()),
  prop = count / rec
)

Data manipulation case study - dapply

## dapply, dapplyCollect
# Schema passed to dapply() for the data returned by fn: the three columns
# of the grouped summary plus the added proportion.
schema <- structType(
  structField('class', 'string'),
  structField('age', 'string'),
  structField('count', 'double'), # not integer
  structField('prop', 'double')
)

# Appends count / rec as an extra column to the data.frame it receives.
# rec (the row count of tdf) is not an argument; it is read from the
# enclosing environment.
fn <- function(x) {
  cbind(x, x$count / rec) # expr() not working
}

# may take more time but no temporary DF
df %>% group_by('class', 'age') %>%
  summarize(count = n(expr('survived'))) %>%
  dapply(fn, schema) %>%
  arrange('class', 'age') %>% collect()

Data manipulation case study - gapply

## gapply, gapplyCollect
# Schema passed to gapply() for the data returned by fn: the two grouping
# columns, the group size and its proportion.
schema <- structType(
  structField('class', 'string'),
  structField('age', 'string'),
  structField('count', 'integer'),
  structField('prop', 'double')
)

# Receives a group's key and its rows (x); returns a one-row data.frame with
# the key, the number of rows and that number divided by rec. rec is read
# from the enclosing environment.
fn <- function(key, x) {
  data.frame(key, nrow(x), nrow(x)/rec, stringsAsFactors = FALSE)
}

# Group by class and age, apply fn per group, then sort and collect.
df %>% gapply(cols = c('class', 'age'), func = fn, schema = schema) %>%
  arrange('class', 'age') %>% collect()

Data manipulation case study - sql

## sql queries
# Register df under the name titanic_tbl so SQL queries can refer to it.
createOrReplaceTempView(x = df, viewName = "titanic_tbl")

# Infix helper: paste two values together, separated by a single space.
`%++%` <- function(a, b) {
  paste(a, b, sep = " ")
}
# Assemble the aggregate query from space-separated fragments; the record
# total is embedded as a number with one decimal place.
qry <- paste(
  '
  SELECT class, age, count(*) as count, count(*) /',
  format(round(rec, 1), nsmall = 1),
  'as prop',
  'FROM titanic_tbl',
  'group by class, age',
  'order by class, age'
)

# Run the query and bring the result back as a local data.frame.
collect(sql(qry))

Data manipulation case study - spark.lapply

## spark.lapply
# One list element per distinct class/age pair, each holding the matching
# rows of tdf plus the overall record count.
discnt <- tdf %>% dplyr::distinct(class, age)
# seq_len() instead of 1:nrow(): with zero rows, 1:0 would iterate over
# c(1, 0) rather than not at all.
lst <- lapply(seq_len(nrow(discnt)), function(i) {
  cls <- discnt[i, 1] %>% unlist()
  ag <- discnt[i, 2] %>% unlist()
  list(dat = tdf %>% dplyr::filter(class == cls & age == ag),
       rec = rec)
})

# Summarises one list element: row count and its share of elem$rec.
# magrittr is attached inside the function so that %>% is available where
# the function is executed.
fn <- function(elem) {
  library(magrittr)
  elem$dat %>% dplyr::group_by(class, age) %>%
    dplyr::summarise(count = n(), prop = count / elem$rec)
}

# Apply fn to every element through Spark, then stack and sort the results.
spark.lapply(lst, fn) %>%
  bind_rows() %>%
  dplyr::arrange(class, age)

Machine Learning - session initialization

# Locations of the Java, Hadoop and Spark installations.
Sys.setenv(
  JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64",
  HADOOP_HOME = "/usr/local/hadoop-2.8.2",
  SPARK_HOME = "/usr/local/spark-2.2.1"
)

# SparkR is attached last, after dplyr.
library(ggplot2)
library(magrittr)
library(tibble)
library(dplyr)
library(SparkR, lib.loc = file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))

source("utils.R")
seed <- 1237
# JVM options routing HTTP and HTTPS traffic through the proxy.
ext_opts <- paste(
  "-Dhttp.proxyHost=10.74.1.25",
  "-Dhttp.proxyPort=8080",
  "-Dhttps.proxyHost=10.74.1.25",
  "-Dhttps.proxyPort=8080"
)

# Start the session with the hadoop-aws package and the proxy options.
sparkR.session(
  master = "spark://master:7077",
  appName = "ml demo",
  sparkConfig = list(spark.driver.memory = "2g"),
  sparkPackages = "org.apache.hadoop:hadoop-aws:2.8.2",
  spark.driver.extraJavaOptions = ext_opts
)

Machine Learning - load data from S3

# Read the 2007 flight CSV from S3 with a header row and inferred types.
dat <- read.df(
  path = "s3n://sparkr-demo/public-data/flight_2007.csv",
  source = "csv",
  header = "true",
  inferSchema = "true"
)

      date dep_time arr_time unique_carrier air_time arr_delay dep_delay origin dest distance cancelled
1 2007/1/1     1232     1341             WN       54         1         7    SMF  ONT      389         0
2 2007/1/1     1918     2043             WN       74         8        13    SMF  PDX      479         0
3 2007/1/1     2206     2334             WN       73        34        36    SMF  PDX      479         0
4 2007/1/1     1230     1356             WN       75        26        30    SMF  PDX      479         0
5 2007/1/1      831      957             WN       74        -3         1    SMF  PDX      479         0
6 2007/1/1     1430     1553             WN       74         3        10    SMF  PDX      479         0

Flight data for 2007, originally from RITA - 7,453,215 records in total.

Source: Data Expo ’09

Machine Learning - data exploration

# Take the first (10%) part of a random split, bring it to the driver and
# convert it to a tibble. as_tibble() replaces the deprecated as.tibble().
dat_s <- randomSplit(dat, weights = c(0.1, 0.9), seed)[[1]] %>%
  collect() %>%
  as_tibble()
# Drop rows without delay information, derive calendar and hour features and
# the yes/no target, then keep only non-cancelled flights.
# NOTE(review): weekdays() returns names in the session locale; the factor
# levels below assume English abbreviations.
dat_s <- dat_s %>%
  dplyr::filter(!is.na(arr_delay) & !is.na(dep_delay)) %>%
  dplyr::mutate(
    month = as.integer(format(as.Date(date, format = '%Y/%m/%d'), '%m')),
    weekday = weekdays(as.Date(date, format = '%Y/%m/%d'), abbreviate = TRUE),
    weekday = factor(weekday, levels = c('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')),
    is_weekend = dplyr::case_when(
      weekday %in% c('Fri', 'Sat', 'Sun') ~ 1,
      TRUE ~ 0),
    dep_hour = floor(dep_time/100),
    arr_hour = floor(arr_time/100),
    is_delay = dplyr::if_else(arr_delay > 15, 'yes', 'no')
  ) %>%
  dplyr::filter(cancelled == 0) %>%
  dplyr::select(-date, -cancelled, -dep_time, -arr_time)
# select must be dplyr::select here: SparkR is attached after dplyr, so a
# bare select() is SparkR's, which has no method for a local tibble.
dat_s %>% dplyr::select(-unique_carrier, -origin, -dest) %>% head()

# A tibble: 727,530 x 13
   unique_carrier air_time arr_delay dep_delay origin dest  distance month weekday is_weekend dep_hour arr_hour is_delay
   <chr>             <int>     <int>     <int> <chr>  <chr>    <int> <int> <fct>        <dbl>    <dbl>    <dbl> <chr>   
 1 CO                  232      -  2         6 PHX    EWR       2133     1 Mon              0        0     6.00 no      
 2 YV                   80        16         8 LAS    ELP        584     1 Mon              0        0     2.00 yes     
 3 US                   59        16         8 LAS    SFO        414     1 Mon              0        0     1.00 yes     
 4 B6                   75       137       153 JFK    CMH        483     1 Mon              0        0     2.00 yes     
 5 US                   67        31        19 LAS    ABQ        487     1 Mon              0        0     2.00 yes     
 6 WN                  115       189       207 FLL    BWI        925     1 Mon              0        0     2.00 yes     
 7 YV                   64        39        26 LAS    SLC        368     1 Mon              0        0     2.00 yes     
 8 UA                  199        44        21 SEA    ORD       1721     1 Mon              0        0     6.00 yes     
 9 US                   60        22        21 LAS    OAK        407     1 Mon              0        0     1.00 yes     
10 AA                  165       200       204 LGA    MIA       1097     1 Mon              0        0     3.00 yes     
# ... with 727,520 more rows

Machine Learning - data exploration ctd

# get_multiplot() is not defined in this section (presumably it comes from
# utils.R, sourced earlier - verify). The notes below record the decision
# taken for each variable.

# use as is
get_multiplot('weekday')

# not using
get_multiplot('is_weekend')

# regrouping
# '1' if month is 12 or 1-3
# '2' if month is 6-8
# '3' if month is 4-5 or 9-11
get_multiplot('month')

# regrouping
# '1' if dep_hour is 4-12
# '2' if dep_hour is 13-19
# '3' if dep_hour is 0-3 or 20+
get_multiplot('dep_hour')

## only use dep_delay
# Stack the summaries of the three continuous variables; summarise_cont() is
# not defined in this section (presumably from utils.R - verify).
bind_rows(
  summarise_cont(dat_s, 'dep_delay'),
  summarise_cont(dat_s, 'distance'),
  summarise_cont(dat_s, 'air_time')
)
##         col is_delay        mean median  min  max
## 1 dep_delay       no  -0.4642659     -2 -168   85
## 2 dep_delay      yes  49.2538712     34  -49 1831
## 3  distance       no 713.9658008    557   21 4962
## 4  distance      yes 751.6156694    595   31 4962
## 5  air_time       no 100.7641820     83    0  641
## 6  air_time      yes 109.7099801     91    0  729
## dep_delay is highly correlated with arr_delay
# Correlation matrix of the two delay columns.
dat_s %>% dplyr::select(arr_delay, dep_delay) %>% cor()
##           arr_delay dep_delay
## arr_delay  1.000000  0.932244
## dep_delay  0.932244  1.000000

Machine Learning - feature generation

# Infix helper: paste two values together, separated by a single space.
`%++%` <- function(a, b) {
  paste(a, b, sep = " ")
}
# SQL CASE expressions, assembled as strings and evaluated through expr().
# Month group: '2' for months 6-8, '1' for months 12 and 1-3, '3' otherwise.
month_c_expr <- 
  "case when split(date, '/')[1] in ('6', '7', '8') then '2'" %++%
    "when split(date, '/')[1] in ('1', '2', '3', '12') then '1'" %++%
  "else '3' end"
# Day of week as '1' (Mon) to '7' (Sun).
# The date pattern must use 'M' for month: lowercase 'm' means minutes, so
# the previous 'yyyy/mm/dd' read the month field as minutes and every date
# was parsed as a day in January. 'yyyy/M/d' matches the unpadded dates in
# this data (e.g. 2007/1/1).
weekday_expr <- 
  "case date_format(to_date(date, 'yyyy/M/d'), 'E')" %++%
    "when 'Mon' then '1' when 'Tue' then '2'" %++%
    "when 'Wed' then '3' when 'Thu' then '4'" %++%
    "when 'Fri' then '5' when 'Sat' then '6'" %++% 
  "else '7' end"
# Departure hour group: '3' for hours 0-3, '1' for 4-12, '2' for 13-19,
# '3' for 20 and later.
dep_hour_c_expr <- 
  "case when cast(floor(dep_time/100) AS integer) <= 3 then '3'" %++%
    "when cast(floor(dep_time/100) AS integer) <= 12 then '1'" %++%
    "when cast(floor(dep_time/100) AS integer) <= 19 then '2'" %++%
  "else '3' end"

# Drop rows without delay information, add the derived features and the
# yes/no target (arrival delay above 15), and keep non-cancelled flights.
dat <- dat %>% dropna(cols = c('arr_delay', 'dep_delay')) %>%
  mutate(
    month_c = expr(month_c_expr),
    weekday = expr(weekday_expr),
    dep_hour_c = expr(dep_hour_c_expr),
    is_delay = ifelse(expr('arr_delay') > 15, 'yes', 'no')
) %>% 
  filter(expr('cancelled == 0'))
# Target plus the features used for modelling.
sel_cols <- c('is_delay', 'dep_delay', 'month_c', 'dep_hour_c', 'weekday')
dat %>% select(sel_cols) %>% head()

  is_delay dep_delay month_c dep_hour_c weekday
1       no         7       1          1       1
2       no        13       1          2       1
3      yes        36       1          3       1
4      yes        30       1          1       1
5       no         1       1          1       1
6       no        10       1          2       1

Machine Learning - model fitting

# 70/30 random split into training and test data, using the fixed seed.
dat_split <- randomSplit(dat, c(0.7, 0.3), seed)
train <- dat_split[[1]]
test <- dat_split[[2]]

# Model formula: the target against the four selected features.
formula <- paste(
  "is_delay ~",
  paste(c("dep_delay", "month_c", "dep_hour_c", "weekday"), collapse = " + ")
) %>%
  as.formula()

# Fit a random forest classifier on the training data.
model <- spark.randomForest(train, formula, type = "classification")
## writing/loading model
# write.ml(model, 's3n://sparkr-demo/model/flight_2007_rf.model')
# model <- read.ml('s3n://sparkr-demo/model/flight_2007_rf.model')

Machine Learning - model evaluation

# Score the test data with the fitted model.
preds <- predict(model, test)

# First 50 predictions as local data, keeping the actual and predicted
# labels and the two vector columns under shorter names.
hpreds <- dplyr::rename(
  dplyr::select(
    head(preds, 50),
    is_delay, prediction, probability, rawPrediction
  ),
  prob = probability,
  raw_pred = rawPrediction
)

head(hpreds)

  is_delay prediction                     prob                 raw_pred
1      yes         no <environment: 0x5e7dfd8> <environment: 0x6cda460>
2      yes        yes <environment: 0x5e769a0> <environment: 0x6cdfe88>
3      yes         no <environment: 0x5e6ce68> <environment: 0x6ce78c8>
4       no         no <environment: 0x5e65460> <environment: 0x6cee580>
5       no         no <environment: 0x5e5dca0> <environment: 0x6cf5280>
6       no         no <environment: 0x5e55c28> <environment: 0x6cfaf20>

Machine Learning - model evaluation ctd

# Expand a column of Spark vector handles into one numeric column per class.
#
# dat:    local data.frame whose column `which` holds the vector handles
#         (shown as <environment> when printed).
# which:  name of that column; also used as the prefix of the new columns.
# labels: class labels in vector-index order (element 0 first). The default
#         is c('no', 'yes'): in the predictions shown for this model, rows
#         predicted 'no' carry the larger value in element 0, so element 0
#         belongs to 'no'. The previous naming (_yes, _no) had them swapped.
#
# Returns a data.frame with one row per row of dat and one column per label,
# named <which>_<label>; NULL when dat has no rows.
extract_from_jmethod <- function(dat, which='prob', labels = c('no', 'yes')) {
  # [[ ]] extracts the column itself for data.frames and tibbles alike.
  envs <- dat[[which]]
  do.call(rbind, lapply(envs, function(e) {
    # Element i of the vector is read with its zero-based apply(i) method.
    vals <- vapply(seq_along(labels), function(i) {
      sparkR.callJMethod(unlist(e), 'apply', as.integer(i - 1))
    }, numeric(1))
    df <- as.data.frame(as.list(vals))
    names(df) <- paste0(which, '_', labels)
    df
  }))
}

# Replace the two handle columns by their per-class numeric values.
hpreds_up <- bind_cols(hpreds,
                       extract_from_jmethod(hpreds, 'prob'),
                       extract_from_jmethod(hpreds, 'raw_pred')
) %>% dplyr::select(-prob, -raw_pred)
hpreds_up %>% head()
##   is_delay prediction   prob_no  prob_yes raw_pred_no raw_pred_yes
## 1      yes         no 0.7387997 0.2612003   14.775994     5.224006
## 2      yes        yes 0.1029412 0.8970588    2.058823    17.941177
## 3      yes         no 0.8681795 0.1318205   17.363590     2.636410
## 4       no         no 0.8880674 0.1119326   17.761348     2.238652
## 5       no         no 0.7650525 0.2349475   15.301049     4.698951
## 6       no         no 0.7650525 0.2349475   15.301049     4.698951

Machine Learning - model evaluation ctd

# Confusion matrix: one row per actual is_delay value (first column), one
# column per predicted value.
cmat <- preds %>% crosstab('is_delay', 'prediction')
cmat
##   is_delay_prediction      no    yes
## 1                 yes  154930 365202
## 2                  no 1619566  42170
# accuracy
# Correct predictions (actual 'no' predicted 'no', actual 'yes' predicted
# 'yes') over all predictions. Rows are picked by label rather than by
# position, so the result does not depend on the row order of the table.
with(
  cmat,
  sum(no[is_delay_prediction == 'no'], yes[is_delay_prediction == 'yes']) /
    sum(no, yes)
)
## [1] 0.9096646

Machine Learning - model evaluation ctd

# Build the feature/importance table that the plot below needs. This line was
# previously commented out (with an unbalanced parenthesis), leaving
# `importance` undefined.
# NOTE(review): get_feat_importance() must already be defined when this runs;
# its definition is shown further below and is presumably also provided by
# utils.R - verify.
importance <- get_feat_importance(model)
# Bar chart of importance per feature with a centred title and slanted
# x-axis labels.
ggplot(importance, aes(x=feature, y=importance)) +
  geom_bar(stat = 'identity', fill = 'steel blue') +
  ggtitle('Feature importance') +
  theme(plot.title = element_text(hjust = 0.5), 
        axis.text.x = element_text(angle = 45, hjust = 1))

# Extract feature importances from a fitted model's summary.
#
# model: a fitted model whose summary() has `features` (feature names) and
#        `featureImportances` (a string containing bracketed number lists).
#
# Returns a tibble with columns `feature` and `importance`, sorted by
# decreasing importance.
#
# The last bracketed list in featureImportances holds the importance values.
# When a second-to-last bracketed list is present it is treated as the
# zero-based positions of those values (presumably Spark's sparse vector
# notation "(n,[indices],[values])" - verify); features not listed there get
# importance 0. Previously the positions were ignored, so the values were
# misaligned or caused a length mismatch whenever a feature was missing.
get_feat_importance <- function(model) {
  s <- summary(model)
  features <- s$features %>% unlist()
  # Every "[...]" group, stripped of brackets and parsed into numbers.
  groups <- stringr::str_extract_all(s$featureImportances, '\\[(.*?)\\]') %>% 
    unlist() %>%
    stringr::str_replace_all('[\\[\\]]', '') %>%
    strsplit(',') %>%
    lapply(as.numeric)
  if (length(groups) == 0) {
    stop('no importance values found in featureImportances', call. = FALSE)
  }
  values <- groups[[length(groups)]]
  if (length(groups) > 1) {
    # Place each value at its (one-based) feature position; the rest stay 0.
    importance <- numeric(length(features))
    importance[groups[[length(groups) - 1]] + 1] <- values
  } else {
    importance <- values
  }
  data.frame(feature = features, importance = importance) %>%
    dplyr::arrange(-importance) %>% as_tibble()
}

Further topics