Heavy Watal

foreach/parallel — 並行処理 in R

foreach

カウンター無しでループを書けるようにするパッケージ。 普段は purrr::map() とかのほうが使いやすいけど、 後述の並列化の場面ではお世話になる。

library(foreach)

slow_square = function(x) {
  Sys.sleep(0.5)
  x * x
}

foreach(x = seq(1, 4), .combine = c) %do% {
  slow_square(x)
}
# 2.0 sec with 1 core

https://CRAN.R-project.org/package=foreach

foreach()

.combine (list)
型が既知でvectorが欲しい場合に c にするなど
.multicombine (FALSE)
結果が出る度に二値関数で結合していくか、まとめてか。 .combine=ccbind を指定すると暗黙TRUE
.maxcombine (100)
まとめる場合の最大個数
.export (NULL)
並列化する場合、処理ブロック内に持ち込みたいオブジェクト
.packages (NULL)
並列化する場合、名前空間省略で使いたいパッケージ
.inorder (TRUE)
並列化する場合、順序を保持したいか

.init, .final, .noexport, .verbose

parallel

Rの並列化では snowmulticore が使われてきたが、 バージョン2.14からそれらを統合した parallel が標準ライブラリに入った。

library(parallel)
options(mc.cores = detectCores(logical = FALSE))  # 4

mclapply(seq(1, 4), slow_square)     # 0.5 sec with 4 cores
lapply(seq(1, 4), slow_square)       # 2.0 sec with 1 cores
purrr::map(seq(1, 4), slow_square)   # 2.0 sec with 1 cores

mclapply()lapply() のお手軽並列化バージョン。 UNIX系OSのforkに依存するためWindows不可。

mclapply(X, FUN, ...,
         mc.preschedule = TRUE, mc.set.seed = TRUE,
         mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
         mc.cleanup = TRUE, mc.allow.recursive = TRUE, affinity.list = NULL)

purrr::map()のように無名関数を渡せる ラッパー関数 mcmap() を書いてみた

そのへんをもっとしっかりやった future, future.apply, furrr を使っていくのが良さそう。

もっと細かくいろいろ制御したい場合は foreach (とその橋渡しライブラリdoParallel) を介して使う。 この場合、クラスタの生成や破棄なども自前でやることになる。

library(doParallel)
cluster = makeCluster(getOption("mc.cores", 2L), type = "FORK")
registerDoParallel(cluster)

foreach(x = seq(1, 4)) %dopar% {
  slow_square(x)
}

stopCluster(cluster)

makeCluster()

spec
いくつのworkerを立ち上げるか。 物理コア数を取得するには parallel::detectCores(logical = FALSE)
type = "PSOCK"
デフォルト。高コストだけどだいたいどの環境でも使える。 マルチCPUのサーバーで並列化したい場合はこれ。 foreach() で使う場合 .export=.packages= の指定が重要。
type = "FORK"
4コア1CPUとかの普通のデスクトップマシンで気楽に並列化したいならこれ。 低コストだし .export=.packages= を指定せず foreach() できる。 Windowsでは使えないらしいけど。
outfile = ""
print()message()などの出力先を標準に戻す。 デフォルトでは/dev/nullに捨てられてしまう。

iterators

https://CRAN.R-project.org/package=iterators

大抵はメモリを一気に確保してしまう方が速いが、 データがRAMを超えるほど大きいときはそうも言ってられない。 最大要求メモリを減らしたり、 並列foreachのノード間通信を減らすためにはiteratorsを利用する。

nextElem(it, ...)
イテレータを進めて値を得る。 デバッグ時にとりあえず全部見たいときは as.list(it) が便利。
icount(n)
イテレータ版 seq_len()
icountn(vn)
自然数限定イテレータ版 expand.grid()
iter(obj, by = c("column", "row"))
イテレータ版 purrrlyr::by_row() のようなもので、 並列foreachで各ノードに巨大data.frameを送りたくない場合に有用。 data.frame以外も適用可。 e.g., iter(diamonds, by = "row")
isplit(x, f, drop = FALSE, ...)
イテレータ版 purrrlyr::slice_rows() のようなもので、 fは列名じゃなくてfactor。 data.frame以外も適用可。 e.g., isplit(diamonds, diamonds$cut)
iread.table(file, ..., verbose = FALSE), ireadLines(con, n = 1, ...)
ファイルを1行ずつ読み込む
irbinom(..., count), irnbinom(), irnorm(), irpois(), irunif()
乱数
idiv(n, ..., chunks, chunkSize)
整数nをいい感じに振り分ける。

関連書籍