Perlスクリプトとの組み合わせ

perl言語でカレント・ディレクトリ内にあるファイルを検索しながらバッチ処理的に Rで計算を行うためのサンプルコードを記載する. ディレクトリには各日次におけるティック価格がsample-20101202.dat と同じ形式で格納されていると仮定する. 1行目は日付(YYYY/mm/dd), 2行目は時間(HH:MM:SS), 3行目はティック価格を示す. またperlスクリプトはmean.pl(このサンプルコードはMacおよびLinuxで実行することを仮定している. Windowsで実行する場合には/bin/ls を dir /s /bに置き換えられたい.), Rスクリプトはmean.Rというファイル名で保存されていると仮定する.


perl mean.pl

をコマンドラインから実行することにより, mean.plからRスクリプトmean.Rがファイル名を引数として実行されカレント・ ディレクトリにある全てのデータの対数価格の平均値がresult.outに格納できる. perlスクリプトではカレントディレクトリにあるファイル名がsample-????????.dat というパターンに合致したファイルを取り出し (ここで?は任意の一文字にマッチするというunixコマンドにおける正規表現を表す), R言語で平均値を計算するmean.Rにファイル名とともに受け渡されている. mean.Rでは指定されたファイルから対数収益率を計算して, その平均値を出力することがなされる.

2010/12/0200:02:31100.32
2010/12/0200:03:31100.35
2010/12/0200:05:21100.39
2010/12/0200:09:03100.41
2010/12/0200:13:21100.90
2010/12/0200:15:21101.01

対数収益率の平均値を計算するRのサンプル(mean.pl)

#!/usr/bin/perl
#ファイルリスト作成
open(LS,"/bin/ls sample-????????.dat |");
open(OUTDATA,"> result.out"); #結果出力ハンドラ作成
while(){
 chop();
 # Rコマンドの実行
 open(R,"R --vanilla --slave --args $_ < ./mean.R |");
 while($r = ){
   printf("%s",$r);
   printf OUTDATA ("%s",$r); #結果の出力
 }
 close(R);
}
close(LS);
close(OUTDATA);

対数収益率の平均値を計算するRのサンプル(mean.R)

#!/usr/bin/R -f
args <- commandArgs()
if(length(args)!=5){
  cat(sprintf("file"))
  q()
}
file <- args[5]
data <- read.table(file)
price <- na.omit(data)
lprice <- diff(log(price[,3]))
cat(sprintf("%s %f\n",file,mean(lprice)))

インデックスに戻る


snowを用いた並列計算の手順

上記で紹介したRスクリプト(mean.R)とperlスクリプト(mean.pl)を改良し, 並列的にプロセスを実行することにより計算のパフォーマンスを上げる方法を紹介する. この準備として, まず並列計算を簡単にRで実装することができる並列計算用ライブラリsnowを紹介する. 執筆現在snowのはCRANのレポジトリからインストールを行うことが可能である.


install.package("snow")

によりパッケージのインストールを行う.

snowではプロセス間通信にMPI, SOCK, PVM, NWSを選択することができる. SOCK通信は特別なライブラリなしで単一コンピュータ上の複数コアでの並列計算に使用することができる. さらに, MPI(Message Passing Interface)を用いることにより, MPIライブラリを用いて大規模なクラスタ並列計算も可能である. このとき, Rmpisnowを組み合わせて使うことになる. Rmpiをインストールするためには計算機にOpenMPIまたはMPICHを別途インストールし RmpiRへインストールすることが必要である.

並列計算を実装するためのsnowの関数群を紹介する (Rossini et al., 2003; Tierney et al., 2004). snowを使って並列計算を実装するコードの手順は以下のとおりである. このように, 並列計算ライブラリsnowによる並列計算ではクラスタを作成し計算準備を行った後, 並列計算を実行するといういたって単純なものである.

 [文献]
Rossini, A. and Tierney, L. and Li, N. (2003) "Simple parallel statistical computing", in R. UW Biostatistics working paper series, Paper 193, University of Washington.
Tierney, L. and Rossini, A. and Li, N. and Sevcikova, H. (2004) "The snow Package: Simple Network of Workstations", Version 0.2-1.

インデックスに戻る


並列計算ライブラリsnow

次にsnowに含まれる主要な関数を簡単なサンプルコードとともに紹介する.

makeCluster()

クラスタ数を設定してクラスタを生成する. クラスタの種類は第二引数typeで選択する. typeとして''SOCK'', ''PVM'', ''MPI'', ''NWS''を選択することができる. その他同様の関数としてmakeSOCKcluster(), makePVMcluster(), makeMPIcluster(), makeNWScluster()がある. マルチコアのパソコンで並列計算を行ないたい場合には, SOCKクラスタを用いる. 以下の例ではSOCK通信を用いて4つのコアからなるクラスタを生成し, sclにクラスタハンドルを格納している.

scl <- makeCluster(4,type="SOCK")
また, スーパーコンピュータシステムなどでは一般にネットワーク・キューイング・システム (Network Queuing System; NQS)を通じて並列計算のバッチジョブを投入する必要がある. Rmpiとsnowを用いた並列計算を実行するためのRスクリプトをバッチジョブとして投入する場合には, makeCluster()をオプションなしで使用し, クラスタハンドルとクラスタ数を以下のように取得する.


scl <- makeCluster()
corenum <- length(clusterCall(scl,function() Sys.info()))

さらに, バッチジョブの投入には R/libs/snow/RMPISNOWを用い, ジョブスクリプト内でRMPISNOWをmpirunを通じて実行することによりバッチジョブを投入する. 例えば, Rscript.rをクラスタ数100で並列実行したい場合には, 以下のようにジョブスクリプトに記載する.


mpirun -np 100 RMPISNOW --vanilla --no-save < Rscript.r > logfile.log

以下, snowを用いた並列計算のための関数の一部を紹介する. 次の例では共通に scl にクラスタハンドルが定義されていると仮定する.

clusterCall()

ユーザが設定した関数が出す返り値が各ノードでどのようになっているかを見る. 初期値を与えるのにも使える.


f1 <- function(n){ return(mean(rnorm(n)))}
clusterCall(scl, f, 100)

clusterExport(scl, list)

リストで示されたマスタのグローバル環境の変数値を各ノードのグローバル環 境に配置する.


myvar <- 666
clusterExport(scl, "myvar")

clusterEvalQ()

各ノードにコマンドを送り返り値を評価する. この例では環境変数HOMEをクラスタ内のコアごとに評価している.


clusterEvalQ(scl,Sys.getenv("HOME"))

clusterApply(scl, seq, function, ...)

seqを順番に1ずつ増やしつつ関数{\bf function}の第一引数に与えながら{\bf function}を並列実行する.


f2 <- function(n,d){return(mean(d*rnorm(n)))}
clusterApply(scl, 1:10, f2, d=0.3)

この例では関数平均0, 標準偏差$d=0.3$の正規乱数を$n$個$(n=1,\ldots,10)$ を生成し平均値を計算している.

clusterApplyLB(scl, seq, function, ...)

clusterApplyLBclusterApplyと同様であるが, よりバランス良くジョブを各ノードに割り当てる. ただしプロセス間通信が多いときは遅くなる.


f2 <- function(n,d){return(mean(d*rnorm(n)))}
clusterApply(scl, 1:10, f2, d=0.3)

parLapply(scl, list, function, ...)

リストで指定された値を割り振りながら関数を実行する. 結果がリストで返ってくる.


f2 <- function(n,d){return(mean(d*rnorm(n)))}
parLapply(scl, 1:10, f2, d=0.3)

parSapply(scl, list, function, ...)

リストで指定された値を割り振りながら関数を実行する. 結果がベクトルか行列で返ってくる.


f2 <- function(n,d){return(mean(d*rnorm(n)))}
parSapply(scl, 1:10, f2, d=0.3)

parRapply(scl, x, function,...)

行列xの行に対して並列化する


x <- matrix(1:10, 2, 5)
f3 <- function(n,d){return(n*d)}
parRapply(scl, x, f3 d=0.3)

この例では, 0.3 x n (n=1,...,10)を行に対して並列的に計算している.

parCapply(scl, x, function, ...)

行列xの列に対して並列化する


x <- matrix(1:10, 2, 5)
f3 <- function(n,d){return(n*d)}
parCapply(scl, x, f3, d=0.3)

この例では, 0.3 x n (n=1,...,10)を列に対して並列的に計算している.

stopCluster()

クラスタハンドルsclを解放する.


stopCluster(scl)

ここで紹介した関数はsnowの一部である. より詳しくはsnowのマニュアル http://cran.r-project.org/web/packages/snow/snow.pdfを参照されたい.

インデックスに戻る


並列計算の例

次に実際にsnowRmpiを使って並列計算を行うためのサンプルコード parallel-mean.Rとparallel-mean.plを紹介する (Rmpiが利用できない環境においてもMPIクラスタを構成する代わりに SOCKクラスタを構成することにより並列計算を行うことができる). このサンプルコードはsnowRmpi を用いて平均値を並列計算で求めるプログラムのサンプルである. parallel-mean.Rでは4コアのMPIクラスタを作成し平均値を計算する関数 nopt.mean()clusterApplyLB()関数を用いて並列的に実行している.

parallel-mean.pl(このサンプルコードはMacおよびLinuxで実行することを仮定している. Windowsで実行する場合には/bin/ls を dir /s /bに置き換えられたい) ではカレント・ディレクトリにあるデータ・ファイルのリストを作りRスクリプトの引数に与えて実行している. parallel-mean.plを実行すると平均値が表示されresult.outに計算結果が格納されることを確認してほしい. また, clusterApplyLB()関数を用いるとクラスタサイズに応じて適切なプログラムの並列実行がなされる. このサンプルコード内の計算負荷nopt.mean()関数を書き換えるだけでプッシュ・プル型の多くの並列計算を行うための雛形としてparallel-mean.Rは利用できる. snowを使うことにより並列計算の実行は極めて単純なコードのみで可能となる.

parallel-mean.pl

#!/usr/bin/perl
#ファイルリスト作成
open(OUTDATA,"> result.out"); #結果出力ハンドラ作成
open(LS,"/bin/ls sample-????????.dat |");
$fl="";
while(){
 chop();
 $fl=$fl." ".$_;
}
close(LS);
# Rコマンドの実行
open(R,"R --vanilla --slave --args$fl<./parallel-mean.R |");
$r=;
while($r=){ # 結果を書き出す
    print OUTDATA $r;
    print $r;
}
close(OUTDATA);

parallel-mean.R

args <- commandArgs()
if(length(args)<5){
  cat(sprintf("file"))
  q()
}
#クラスタのコア数を設定 
corenum <- 4
#
library(snow)
library(Rmpi)
#ファイルリストをコマンド引数から得る
filelist <- c()
for(i in 5:length(args)){
      filelist <- c(filelist,args[i])
}
# MPIクラスタの初期化
scl <- makeCluster(corenum,"MPI")
.Last <- function() { stopCluster(scl) }
# 平均値を計算する関数を定義
nopt.mean <- function(file){
  if(!is.null(file)){
    data <- read.table(file)
    price <- na.omit(data)
    lprice <- diff(log(price[,3]))
    nopt.mean <- sprintf("%s %f",file,mean(lprice))
  }
  else{
    nopt.mean <- NULL
  }
}
#平均値を並列計算で得る
l<-clusterApplyLB(scl,filelist,nopt.mean)
#得られた結果を表示する
for(j in 1:length(l)){
    cat(sprintf("%d %s\n",j,l[j]))
}

インデックスに戻る


(C)2016 Takaki Hayashi and Aki-Hiro Sato. All rights reserved. 無断転用を禁ず