Heavy Watal

parallel — 並行処理 in R

parallel

Rの並列化では snowmulticore が使われてきたが、 バージョン2.14からそれらを統合した parallel が標準ライブラリに入った。

options(mc.cores = parallel::detectCores(logical = FALSE))

timeit = function(expr, digits = 1) {
  start = Sys.time()
  expr
  diff = Sys.time() - start
  as.numeric(round(diff, digits = digits))
}

slow_square = function(x) {
  Sys.sleep(0.3)
  x * x
}

lapply(seq_len(4L), slow_square) |> timeit()
purrr::map(seq_len(4L), slow_square) |> timeit()
parallel::mclapply(seq_len(4L), slow_square) |> timeit()
[1] 1.2
[1] 1.2
[1] 0.3

mclapply()lapply() のお手軽並列化バージョン。 UNIX系OSのforkに依存するためWindows不可。

mclapply(X, FUN, ...,
         mc.preschedule = TRUE, mc.set.seed = TRUE,
         mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
         mc.cleanup = TRUE, mc.allow.recursive = TRUE, affinity.list = NULL)

purrr::map()のように無名関数を渡せる ラッパー関数 mcmap() を書いてみた。 そのへんをもっとしっかりやった future, future.apply, furrr を使っていくのが良さそう。

もっと細かくいろいろ制御したい場合は parLapply() 系の関数や後述の foreach を介して使う。 その場合、makeCluster()/stopCluster() で明示的にクラスタを生成・破棄したり、 子プロセスで使うオブジェクトやパッケージを clusterEvalQ()clusterExport() などで予め宣言したりする必要がある。

cluster = parallel::makeCluster(4L, type = "FORK")
parallel::parLapply(cluster, seq_len(4L), slow_square) |> timeit()
parallel::stopCluster(cluster)
[1] 0.3

makeCluster()

spec
いくつのworkerを立ち上げるか。 物理コア数を取得するには parallel::detectCores(logical = FALSE)
type = "PSOCK"
デフォルト。高コストだけどだいたいどの環境でも使える。 マルチCPUのサーバーで並列化したい場合はこれ。 foreach() で使う場合 .export=.packages= の指定が重要。
type = "FORK"
マルチコアCPUひとつ搭載の普通のパソコンで気楽に並列化したいならこれ。 低コストだし .export=.packages= を指定せず foreach() できる。 Windowsでは使えないらしいけど。
outfile = ""
print()message()などの出力先を標準に戻す。 デフォルトでは/dev/nullに捨てられてしまう。

乱数と再現性

並列化せず親プロセスで実行すれば当然再現性あり:

rint = function(..., n = 65535L, size = 3L) sample.int(n, size)

set.seed(19937)
lapply(seq_len(4L), rint) |> simplify2array()
set.seed(19937)
lapply(seq_len(4L), rint) |> simplify2array()
      [,1]  [,2]  [,3]  [,4]
[1,] 49270 65407  8721 23119
[2,] 19176 60768 65517 62677
[3,] 29429 29809  5210  9959
      [,1]  [,2]  [,3]  [,4]
[1,] 49270 65407  8721 23119
[2,] 19176 60768 65517 62677
[3,] 29429 29809  5210  9959

デフォルト mc.set.seed = TRUE で並列化すると子プロセスがてんでにシードを設定して再現性なし:

set.seed(19937)
parallel::mclapply(seq_len(4L), rint) |> simplify2array()
set.seed(19937)
parallel::mclapply(seq_len(4L), rint) |> simplify2array()
      [,1]  [,2]  [,3]  [,4]
[1,] 62234  3262 12411 62023
[2,] 62793  4436 53581 20557
[3,]  8474 55511  8828 14334
      [,1]  [,2]  [,3]  [,4]
[1,] 44555 57701 37457 48634
[2,] 35817  3149 40988 23286
[3,] 10319 34748 35695 30145

mc.set.seed = FALSE では親プロセスのシードが参照されて再現性こそあるものの、 いくつかの子プロセス(mc.coresの数ずつ?)がセットで同じ乱数列を出してくる:

set.seed(19937)
parallel::mclapply(seq_len(6L), rint, mc.set.seed = FALSE, mc.cores = 2L) |> simplify2array()
      [,1]  [,2]  [,3]  [,4]  [,5]  [,6]
[1,] 49270 49270 65407 65407  8721  8721
[2,] 19176 19176 60768 60768 65517 65517
[3,] 29429 29429 29809 29809  5210  5210

RNGkind("L'Ecuyer-CMRG") を設定する、 もしくは set.seed(19937, "L'Ecuyer-CMRG") のようにシード設定すると mc.set.seed = TRUE の挙動が変わる。 再現性は親プロセスの乱数生成器の状態依存となり、 なおかつ子プロセスがそれぞれ異なる乱数列を出してくれるようになる。 親プロセスの乱数生成器の状態が mclapply() や子プロセスによって更新されないことに注意:

RNGkind("L'Ecuyer-CMRG")
set.seed(19937)
parallel::mclapply(seq_len(4L), rint) |> simplify2array()
parallel::mclapply(seq_len(4L), rint) |> simplify2array()
invisible(runif(1))
parallel::mclapply(seq_len(4L), rint) |> simplify2array()
set.seed(19937)
parallel::mclapply(seq_len(4L), rint) |> simplify2array()
      [,1]  [,2]  [,3]  [,4]
[1,] 33309 19174 22386 46754
[2,] 38519 53475 45877 24809
[3,] 39904 60478 42220 45717
      [,1]  [,2]  [,3]  [,4]
[1,] 33309 19174 22386 46754
[2,] 38519 53475 45877 24809
[3,] 39904 60478 42220 45717
      [,1]  [,2]  [,3]  [,4]
[1,] 17241 10459 58255 18031
[2,] 39348 54919 19631 64554
[3,] 65371 12315 16237  4515
      [,1]  [,2]  [,3]  [,4]
[1,] 33309 19174 22386 46754
[2,] 38519 53475 45877 24809
[3,] 39904 60478 42220 45717

foreach

https://CRAN.R-project.org/package=foreach

カウンター無しでループを書けるようにするパッケージ。 普段は purrr::map() 系関数のほうが使いやすいけど、 並行処理をするために mclapply() で足りない場面では橋渡しライブラリ doParallel とともに助けを借りると少し楽かもしれない。 あとはメモリの制約のために後述の iterators を使うときとか。

library(foreach)
library(doParallel)
cluster = parallel::makeCluster(getOption("mc.cores", 2L), type = "FORK")
registerDoParallel(cluster)

foreach(x = seq_len(4L), .combine = c) %do% {
  slow_square(x)
} |> timeit()

foreach(x = seq_len(4L)) %dopar% {
  slow_square(x)
} |> timeit()

parallel::stopCluster(cluster)
[1] 1.2
[1] 0.3

foreach()

.combine (list)
型が既知でvectorが欲しい場合に c にするなど
.multicombine (FALSE)
結果が出る度に二値関数で結合していくか、まとめてか。 .combine=ccbind を指定すると暗黙TRUE
.maxcombine (100)
まとめる場合の最大個数
.export (NULL)
並列化する場合、処理ブロック内に持ち込みたいオブジェクト
.packages (NULL)
並列化する場合、名前空間省略で使いたいパッケージ
.inorder (TRUE)
並列化する場合、順序を保持したいか

.init, .final, .noexport, .verbose

iterators

https://CRAN.R-project.org/package=iterators

大抵はメモリを一気に確保してしまう方が速いが、 データがRAMを超えるほど大きいときはそうも言ってられない。 最大要求メモリを減らしたり、 並列foreachのノード間通信を減らすためにはiteratorsを利用する。

nextElem(it, ...)
イテレータを進めて値を得る。 デバッグ時にとりあえず全部見たいときは as.list(it) が便利。
icount(n)
イテレータ版 seq_len()
icountn(vn)
自然数限定イテレータ版 expand.grid()
iter(obj, by = c("column", "row"))
イテレータ版 purrrlyr::by_row() のようなもので、 並列foreachで各ノードに巨大data.frameを送りたくない場合に有用。 data.frame以外も適用可。 e.g., iter(diamonds, by = "row")
isplit(x, f, drop = FALSE, ...)
イテレータ版 purrrlyr::slice_rows() のようなもので、 fは列名じゃなくてfactor。 data.frame以外も適用可。 e.g., isplit(diamonds, diamonds$cut)
iread.table(file, ..., verbose = FALSE), ireadLines(con, n = 1, ...)
ファイルを1行ずつ読み込む
irbinom(..., count), irnbinom(), irnorm(), irpois(), irunif()
乱数
idiv(n, ..., chunks, chunkSize)
整数nをいい感じに振り分ける。