Jaehyeon Kim
March, 2018
Source: Apache Spark Ecosystem
Source: SparkR: Scaling R Programs with Spark
Launching java with spark-submit command /usr/local/spark-2.2.1/bin/spark-submit
sparkr-shell /tmp/RtmpfVHcxh/backend_port37123e7eb08
docker-compose up -d --scale worker=6   # launch the cluster with 1 master and 6 workers
docker-compose stop                     # stop the containers
docker-compose start                    # restart the stopped containers
docker-compose rm -f                    # remove the containers (after stop)
version: "2"
services:
  master:
    build:
      context: .
      dockerfile: Dockerfile-master
    command: /init
    hostname: master
    ports:
      - "6066:6066"
      - "7077:7077"
      - "8080:8080"
      - "50070:50070"
      - "8787:8787"
  worker:
    build:
      context: .
      dockerfile: Dockerfile-worker
    command: /init
    environment:
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
    links:
      - master
FROM rockerextra/spark:3.4.3
MAINTAINER Jaehyeon Kim <dottami@gmail.com>
# register an s6 service that starts the Spark master
RUN mkdir -p /etc/services.d/spark-master \
  && echo '#!/usr/bin/with-contenv sh \n /opt/util/bin/start-spark master' > /etc/services.d/spark-master/run
# add AWS credentials
COPY ./hadoop-conf/*.xml $HADOOP_CONF_DIR/
FROM rockerextra/spark:3.4.3
MAINTAINER Jaehyeon Kim <dottami@gmail.com>
# drop the RStudio service and register an s6 service that starts a worker against master
RUN rm -rf /etc/services.d/rstudio \
  && mkdir -p /etc/services.d/spark-worker \
  && echo '#!/usr/bin/with-contenv sh \n /opt/util/bin/start-spark worker master' > /etc/services.d/spark-worker/run
# add AWS credentials
COPY ./hadoop-conf/*.xml $HADOOP_CONF_DIR/
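The hadoop-conf files are not shown in the deck; a minimal sketch of what core-site.xml could contain for s3n access (placeholder keys; the fs.s3n.* property names are the standard Hadoop ones):

<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>YOUR_ACCESS_KEY_ID</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>YOUR_SECRET_ACCESS_KEY</value>
  </property>
</configuration>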
Sys.setenv('JAVA_HOME'='/usr/lib/jvm/java-8-openjdk-amd64')
Sys.setenv('HADOOP_HOME'='/usr/local/hadoop-2.8.2')
Sys.setenv('SPARK_HOME'='/usr/local/spark-2.2.1')
library(magrittr); library(tibble); library(dplyr)
library(SparkR, lib.loc=file.path(Sys.getenv('SPARK_HOME'),'R', 'lib'))
sparkR.session(master = 'spark://master:7077', appName = 'titanic demo',
               sparkConfig = list(spark.driver.memory = '2g'))
tdf <- read.csv('titanic.csv', stringsAsFactors = FALSE) %>%
  dplyr::sample_frac(1, replace = FALSE) %>% as.tibble()
rec <- nrow(tdf)
df <- as.DataFrame(tdf)
df %>% head(2)
class age sex survived
1 third adult male no
2 crew adult male no
printSchema(df)
root
|-- class: string (nullable = true)
|-- age: string (nullable = true)
|-- sex: string (nullable = true)
|-- survived: string (nullable = true)
## more functions
str(df)
'SparkDataFrame': 4 variables:
$ class : chr "third" "crew" "first" "first" "second" "crew"
$ age : chr "adult" "adult" "adult" "adult" "adult" "adult"
$ sex : chr "male" "male" "male" "female" "male" "male"
$ survived: chr "no" "no" "yes" "yes" "no" "no"
summary(df) %>% collect()
summary class age sex survived
1 count 2201 2201 2201 2201
2 mean <NA> <NA> <NA> <NA>
3 stddev <NA> <NA> <NA> <NA>
4 min crew adult female no
5 max third child male yes
df %>% collect() # SparkDataFrame to data.frame
## check classes
df %>% class() # SparkDataFrame
df %>% head() %>% class() # data.frame
## column expressions
df$survived # Column survived
column('survived') # Column survived
'survived' # string
expr('survived') # Column survived
## selecting rows, columns
df %>% select(df$survived) %>% head()
df %>% select(column('survived')) %>% head()
df %>% select(expr('survived')) %>% head()
df %>% select('class', 'survived') %>% head()
tdf %>% dplyr::select(class, survived) %>% head()
df %>% filter('survived == "yes" and age == "child"') %>% head()
df %>% filter(df$survived == 'yes' & df$age == 'child') %>% head()
tdf %>% dplyr::filter(survived == 'yes' & age == 'child') %>% head()
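## SparkR masks several dplyr verbs, so the dplyr:: prefix matters for local
## data frames - a minimal sketch (not from the deck):
# filter(tdf, survived == 'yes')   # resolves to SparkR::filter and errors on a data.frame
tdf %>% dplyr::filter(survived == 'yes') %>% head()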
- dplyr functions are masked by SparkR, so the dplyr:: prefix is needed for calling them
- expr() is more expressive - see the ML section
## creating variable
df %>% mutate(age_c = ifelse(expr('age') == 'adult', '1', '0')) %>%
  head(2)
class age sex survived age_c
1 third adult male no 1
2 crew adult male no 1
## grouping, aggregation
df %>% group_by('class', 'age') %>%
  summarize(count = n(expr('survived'))) %>%
  arrange('class', 'age') %>% collect()
class age count
1 crew adult 885
2 first adult 319
3 first child 6
4 second adult 261
5 second child 24
6 third adult 627
7 third child 79
tdf %>% dplyr::group_by(class, age) %>%
  dplyr::summarise(count = n())
rdf <- data.frame(age = c('adult', 'child'), lvl = c('0', '1'), stringsAsFactors = FALSE)
rDF <- as.DataFrame(rdf)
df %>% join(rDF, df$age == rDF$age, 'inner') %>%
  group_by('class', 'lvl') %>%
  summarize(count = n(expr('survived'))) %>%
  arrange('class', 'lvl') %>% collect()
class lvl count
1 crew 0 885
2 first 0 319
3 first 1 6
4 second 0 261
5 second 1 24
6 third 0 627
7 third 1 79
tdf %>% dplyr::inner_join(rdf, by = 'age') %>%
  dplyr::group_by(class, lvl) %>%
  dplyr::summarise(count = n())
tmp <- df %>% group_by('class', 'age') %>%
  summarize(count = n(expr('survived')))
tmp %>% mutate(prop = expr('count') / rec) %>%
  arrange('class', 'age') %>% collect()
tdf %>% dplyr::group_by(class, age) %>%
  dplyr::summarise(count = n()) %>%
  dplyr::mutate(prop = count / rec)
## dapply, dapplyCollect
schema <- structType(
  structField('class', 'string'),
  structField('age', 'string'),
  structField('count', 'double'),  # not integer
  structField('prop', 'double')
)
fn <- function(x) {
  cbind(x, x$count / rec)  # expr() not working
}
# may take more time but no temporary DF
df %>% group_by('class', 'age') %>%
  summarize(count = n(expr('survived'))) %>%
  dapply(fn, schema) %>%
  arrange('class', 'age') %>% collect()
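## dapplyCollect() is named in the heading but not demonstrated above; a minimal
## sketch reusing fn - the result comes back as a local data.frame, so no schema:
df %>% group_by('class', 'age') %>%
  summarize(count = n(expr('survived'))) %>%
  dapplyCollect(fn)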
- dapply() applies a function to each partition of a SparkDataFrame
- expr()/string don't work inside the function
## gapply, gapplyCollect
schema <- structType(
  structField('class', 'string'),
  structField('age', 'string'),
  structField('count', 'integer'),
  structField('prop', 'double')
)
fn <- function(key, x) {
  data.frame(key, nrow(x), nrow(x)/rec, stringsAsFactors = FALSE)
}
df %>% gapply(cols = c('class', 'age'), func = fn, schema = schema) %>%
  arrange('class', 'age') %>% collect()
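## likewise a minimal gapplyCollect() sketch - same grouping, no schema needed
## since the result comes back as a local data.frame:
df %>% gapplyCollect(cols = c('class', 'age'), func = fn)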
- gapply() applies a function to each group of a SparkDataFrame
- the function uses base R's nrow(), as dplyr's n() isn't available inside it
## sql queries
createOrReplaceTempView(df, 'titanic_tbl')
`%++%` <- function(a, b) paste(a, b)
qry <- '
SELECT class, age, count(*) as count, count(*) /' %++%
  format(round(rec, 1), nsmall = 1) %++% 'as prop' %++%
  'FROM titanic_tbl' %++%
  'group by class, age' %++%
  'order by class, age'
sql(qry) %>% collect()
## spark.lapply
discnt <- tdf %>% dplyr::distinct(class, age)
lst <- lapply(1:nrow(discnt), function(i) {
  cls <- discnt[i, 1] %>% unlist()
  ag <- discnt[i, 2] %>% unlist()
  list(dat = tdf %>% dplyr::filter(class == cls & age == ag),
       rec = rec)
})
fn <- function(elem) {
  library(magrittr)
  elem$dat %>% dplyr::group_by(class, age) %>%
    dplyr::summarise(count = n(), prop = count / elem$rec)
}
spark.lapply(lst, fn) %>%
  bind_rows() %>%
  dplyr::arrange(class, age)
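## spark.lapply() behaves like a distributed lapply() over a local list; a
## trivial sketch:
spark.lapply(1:4, function(x) x^2)  # returns a local list: 1, 4, 9, 16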
Sys.setenv('JAVA_HOME'='/usr/lib/jvm/java-8-openjdk-amd64')
Sys.setenv('HADOOP_HOME'='/usr/local/hadoop-2.8.2')
Sys.setenv('SPARK_HOME'='/usr/local/spark-2.2.1')
library(ggplot2)
library(magrittr)
library(tibble)
library(dplyr)
library(SparkR, lib.loc=file.path(Sys.getenv('SPARK_HOME'),'R', 'lib'))
source('utils.R')
seed <- 1237
ext_opts <- '-Dhttp.proxyHost=10.74.1.25 -Dhttp.proxyPort=8080 -Dhttps.proxyHost=10.74.1.25 -Dhttps.proxyPort=8080'
sparkR.session(master = 'spark://master:7077',
               appName = 'ml demo',
               sparkConfig = list(spark.driver.memory = '2g'),
               sparkPackages = 'org.apache.hadoop:hadoop-aws:2.8.2',
               spark.driver.extraJavaOptions = ext_opts)
- the first sparkR.session() call can terminate before the package/dependencies are downloaded
- run sparkR.session() again after downloading completes
dat <- read.df('s3n://sparkr-demo/public-data/flight_2007.csv',
               header = 'true', source = 'csv', inferSchema = 'true')
date dep_time arr_time unique_carrier air_time arr_delay dep_delay origin dest distance cancelled
1 2007/1/1 1232 1341 WN 54 1 7 SMF ONT 389 0
2 2007/1/1 1918 2043 WN 74 8 13 SMF PDX 479 0
3 2007/1/1 2206 2334 WN 73 34 36 SMF PDX 479 0
4 2007/1/1 1230 1356 WN 75 26 30 SMF PDX 479 0
5 2007/1/1 831 957 WN 74 -3 1 SMF PDX 479 0
6 2007/1/1 1430 1553 WN 74 3 10 SMF PDX 479 0
Flight data for 2007, originally from RITA - 7,453,215 records in total (source: Data expo ’09).
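## with 7.45 million records it can help to cache the SparkDataFrame before
## repeated queries - a minimal sketch using SparkR's cache():
# cache(dat)  # mark for in-memory caching
# nrow(dat)   # an action materializes the cache; 7453215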
dat_s <- randomSplit(dat, weights = c(0.1, 0.9), seed)[[1]] %>%
  collect() %>% as.tibble()
dat_s <- dat_s %>%
  dplyr::filter(!is.na(arr_delay) & !is.na(dep_delay)) %>%
  dplyr::mutate(
    month = as.integer(format(as.Date(date, format = '%Y/%m/%d'), '%m')),
    weekday = weekdays(as.Date(date, format = '%Y/%m/%d'), TRUE),
    weekday = factor(weekday, levels = c('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')),
    is_weekend = case_when(
      weekday %in% c('Fri', 'Sat', 'Sun') ~ 1,
      TRUE ~ 0),
    dep_hour = floor(dep_time/100),
    arr_hour = floor(arr_time/100),
    is_delay = if_else(arr_delay > 15, 'yes', 'no')
  ) %>%
  dplyr::filter(cancelled == 0) %>%
  dplyr::select(-date, -cancelled, -dep_time, -arr_time)
- randomSplit() takes a 10% sample for exploratory analysis - 727,530 records
dat_s %>% dplyr::select(-unique_carrier, -origin, -dest) %>% head()
# A tibble: 727,530 x 13
unique_carrier air_time arr_delay dep_delay origin dest distance month weekday is_weekend dep_hour arr_hour is_delay
<chr> <int> <int> <int> <chr> <chr> <int> <int> <fct> <dbl> <dbl> <dbl> <chr>
1 CO 232 - 2 6 PHX EWR 2133 1 Mon 0 0 6.00 no
2 YV 80 16 8 LAS ELP 584 1 Mon 0 0 2.00 yes
3 US 59 16 8 LAS SFO 414 1 Mon 0 0 1.00 yes
4 B6 75 137 153 JFK CMH 483 1 Mon 0 0 2.00 yes
5 US 67 31 19 LAS ABQ 487 1 Mon 0 0 2.00 yes
6 WN 115 189 207 FLL BWI 925 1 Mon 0 0 2.00 yes
7 YV 64 39 26 LAS SLC 368 1 Mon 0 0 2.00 yes
8 UA 199 44 21 SEA ORD 1721 1 Mon 0 0 6.00 yes
9 US 60 22 21 LAS OAK 407 1 Mon 0 0 1.00 yes
10 AA 165 200 204 LGA MIA 1097 1 Mon 0 0 3.00 yes
# ... with 727,520 more rows
# use as is
get_multiplot('weekday')
# not using
get_multiplot('is_weekend')
# regrouping
# '1' if month is 12 or 1-3
# '2' if month is 6-8
# '3' if month is 4-5 or 9-11
get_multiplot('month')
# regrouping
# '1' if dep_hour is 4-12
# '2' if dep_hour is 13-19
# '3' if dep_hour is 0-3 or 20+
get_multiplot('dep_hour')
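## get_multiplot() comes from utils.R, which isn't shown in the deck; a
## hypothetical stand-in sketching the kind of panel it could draw
## (counts by level, split by is_delay - name and layout are assumptions):
get_multiplot_sketch <- function(var) {
  cnt <- dat_s %>%
    dplyr::group_by_at(c(var, 'is_delay')) %>%
    dplyr::summarise(count = n())
  ggplot(cnt, aes_string(x = var, y = 'count', fill = 'is_delay')) +
    geom_bar(stat = 'identity', position = 'dodge')
}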
## only use dep_delay
bind_rows(
summarise_cont(dat_s, 'dep_delay'),
summarise_cont(dat_s, 'distance'),
summarise_cont(dat_s, 'air_time')
)
## col is_delay mean median min max
## 1 dep_delay no -0.4642659 -2 -168 85
## 2 dep_delay yes 49.2538712 34 -49 1831
## 3 distance no 713.9658008 557 21 4962
## 4 distance yes 751.6156694 595 31 4962
## 5 air_time no 100.7641820 83 0 641
## 6 air_time yes 109.7099801 91 0 729
## dep_delay is highly correlated with arr_delay
dat_s %>% dplyr::select(arr_delay, dep_delay) %>% cor()
## arr_delay dep_delay
## arr_delay 1.000000 0.932244
## dep_delay 0.932244 1.000000
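## the same check can run on the full SparkDataFrame without collecting - a
## sketch using SparkR's corr() after dropping NAs:
# corr(dropna(dat, cols = c('arr_delay', 'dep_delay')), 'arr_delay', 'dep_delay')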
`%++%` <- function(a, b) paste(a, b)
month_c_expr <-
  "case when split(date, '/')[1] in ('6', '7', '8') then '2'" %++%
  "when split(date, '/')[1] in ('1', '2', '3', '12') then '1'" %++%
  "else '3' end"
weekday_expr <-
  "case date_format(to_date(date, 'yyyy/MM/dd'), 'E')" %++%
  "when 'Mon' then '1' when 'Tue' then '2'" %++%
  "when 'Wed' then '3' when 'Thu' then '4'" %++%
  "when 'Fri' then '5' when 'Sat' then '6'" %++%
  "else '7' end"
dep_hour_c_expr <-
  "case when cast(floor(dep_time/100) AS integer) <= 3 then '3'" %++%
  "when cast(floor(dep_time/100) AS integer) <= 12 then '1'" %++%
  "when cast(floor(dep_time/100) AS integer) <= 19 then '2'" %++%
  "else '3' end"
dat <- dat %>% dropna(cols = c('arr_delay', 'dep_delay')) %>%
  mutate(
    month_c = expr(month_c_expr),
    weekday = expr(weekday_expr),
    dep_hour_c = expr(dep_hour_c_expr),
    is_delay = ifelse(expr('arr_delay') > 15, 'yes', 'no')
  ) %>%
  filter(expr('cancelled == 0'))
sel_cols <- c('is_delay', 'dep_delay', 'month_c', 'dep_hour_c', 'weekday')
dat %>% select(sel_cols) %>% head()
is_delay dep_delay month_c dep_hour_c weekday
1 no 7 1 1 1
2 no 13 1 2 1
3 yes 36 1 3 1
4 yes 30 1 1 1
5 no 1 1 1 1
6 no 10 1 2 1
dat_split <- randomSplit(dat, weights = c(0.7, 0.3), seed)
train <- dat_split[[1]]
test <- dat_split[[2]]
formula <- 'is_delay ~ dep_delay + month_c + dep_hour_c + weekday' %>%
  as.formula()
model <- spark.randomForest(train, formula, 'classification')
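## defaults are used above; spark.randomForest() also exposes the usual tree
## options - a sketch:
# model <- spark.randomForest(train, formula, 'classification',
#                             numTrees = 50, maxDepth = 8, seed = seed)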
## writing/loading model
# write.ml(model, 's3n://sparkr-demo/model/flight_2007_rf.model')
# model <- read.ml('s3n://sparkr-demo/model/flight_2007_rf.model')
- write.ml()/read.ml() save and load a fitted model
preds <- predict(model, test)
hpreds <- preds %>% head(50) %>%
  dplyr::select(is_delay, prediction, probability, rawPrediction) %>%
  dplyr::rename(prob = probability, raw_pred = rawPrediction)
hpreds %>% head()
is_delay prediction prob raw_pred
1 yes no <environment: 0x5e7dfd8> <environment: 0x6cda460>
2 yes yes <environment: 0x5e769a0> <environment: 0x6cdfe88>
3 yes no <environment: 0x5e6ce68> <environment: 0x6ce78c8>
4 no no <environment: 0x5e65460> <environment: 0x6cee580>
5 no no <environment: 0x5e5dca0> <environment: 0x6cf5280>
6 no no <environment: 0x5e55c28> <environment: 0x6cfaf20>
- probability and rawPrediction are not serializable types (org.apache.spark.mllib.linalg.DenseVector) - further details
## pull the two vector elements out of each Java object via sparkR.callJMethod()
extract_from_jmethod <- function(dat, which = 'prob') {
  envs <- dat[, which]
  do.call(rbind, lapply(envs, function(e) {
    df <- data.frame(
      sparkR.callJMethod(unlist(e), 'apply', as.integer(0)),
      sparkR.callJMethod(unlist(e), 'apply', as.integer(1))
    )
    names(df) <- paste0(which, c('_yes', '_no'))
    df
  }))
}
hpreds_up <- bind_cols(
  hpreds,
  extract_from_jmethod(hpreds, 'prob'),
  extract_from_jmethod(hpreds, 'raw_pred')
) %>% dplyr::select(-prob, -raw_pred)
hpreds_up %>% head()
## is_delay prediction prob_yes prob_no raw_pred_yes raw_pred_no
## 1 yes no 0.7387997 0.2612003 14.775994 5.224006
## 2 yes yes 0.1029412 0.8970588 2.058823 17.941177
## 3 yes no 0.8681795 0.1318205 17.363590 2.636410
## 4 no no 0.8880674 0.1119326 17.761348 2.238652
## 5 no no 0.7650525 0.2349475 15.301049 4.698951
## 6 no no 0.7650525 0.2349475 15.301049 4.698951
cmat <- preds %>% crosstab('is_delay', 'prediction')
cmat
## is_delay_prediction no yes
## 1 yes 154930 365202
## 2 no 1619566 42170
# accuracy
1 - sum(cmat$no[1], cmat$yes[2])/sum(cmat$no, cmat$yes)
## [1] 0.9096646
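## precision and recall for the 'yes' class follow from the same crosstab - a
## quick sketch with the numbers above:
tp <- cmat$yes[1]; fp <- cmat$yes[2]; fneg <- cmat$no[1]
tp / (tp + fp)    # precision: 365202 / 407372 ~ 0.897
tp / (tp + fneg)  # recall: 365202 / 520132 ~ 0.702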
importance <- get_feat_importance(model)
ggplot(importance, aes(x = feature, y = importance)) +
  geom_bar(stat = 'identity', fill = 'steelblue') +
  ggtitle('Feature importance') +
  theme(plot.title = element_text(hjust = 0.5),
        axis.text.x = element_text(angle = 45, hjust = 1))
get_feat_importance <- function(model) {
  s <- summary(model)
  features <- s$features %>% unlist()
  imp_ext <- stringr::str_extract_all(s$featureImportances, '\\[(.*?)\\]') %>%
    unlist()
  importance <- imp_ext[length(imp_ext)] %>%
    stringr::str_replace_all('[\\[|\\]]', '') %>%
    strsplit(',') %>% unlist() %>% as.numeric()
  data.frame(feature = features, importance = importance) %>%
    dplyr::arrange(-importance) %>% as.tibble()
}