Julia - 並列計算用のリダクション演算子を作る

ふと「そういえば、Juliaのリダクション (reduction) は分かりづらかったなぁ」と思い出したので複数の変数をリダクションする方法と自前のリダクション演算子(reduction operator, reducer)の作り方について紹介します。

環境

  • OS: ArchLinux
  • Julia 1.1.0

Julia での for 文の並列化

Julia で手っ取り早く for 文を並列化する方法は for の前に @threads または @distributed をつけることです。(望みの結果を得られるかどうかは別にして)

スレッド並列(スレッド並列では並列数を環境変数で指定する必要がある。)

julia> using Base.Threads

julia> @threads for itr in 1:n
          body
       end

プロセス並列 (Julia v0.6 以前は@parallel を使用したが、 Julia v0.7 から @distributed に変更された)

julia> using Distributed

julia> addprocs() # CPUの論理プロセッサ数分プロセスを立ち上げる

julia> @distributed for itr in 1:n
           body
       end

このとき、スレッド並列では出来ませんが、プロセス並列ではリダクションが使えます。

公式ドキュメントの例

julia> nheads = @distributed (+) for i = 1:200000000
           Int(rand(Bool))
       end
100000133

上の例でのリダクション演算子+ です。今回はこのリダクション演算子に注目していきます。

そもそもリダクションって何?

冒頭から散々リダクションという単語を使っていますが、そもそもリダクションって何でしょう? Juliaの公式ドキュメントには次のようにあります。

Many iterations run independently over several processes, and then their results are combined using some function. The combination process is called a reduction, since it is generally tensor-rank-reducing: a vector of numbers is reduced to a single number, or a matrix is reduced to a single row or column, etc.

Parallel Computing · The Julia Language

 n 人が持っていたものを合体させれば全体としては 1 個になって総数が減る(reduction)でしょ、といった感じでしょうか。

どの変数がリダクションされるの?

私が Julia で初めてリダクションを使った時に思ったことは「どの変数がリダクションされるの?」でした。

たとえば、OpenMPの場合では、「変数 sum+ でリダクションしてください」と明示的に宣言するのでわかりやすいです。

#pragma omp parallel for reduction(+:sum)
for(i=0; i<N; i++) {
    sum += a[i] * b[i];
}

OpenMP* 4.5 による新しいレベルの並列プログラミング p.35 より

一方で上記のコードを Julia で書くと以下のようになります。

sum = @distributed (+) for i in 1:N
    a[i] + b[i]
end

この例では for 文の中に1文しかないのでまだいいですが、もっと for 文の中身が複雑になったらどうすればよいのでしょうか? 例えば、並列化される for 文の中にもう一つ for 文を入れて、その途中にリダクションしたい変数が入っているケースとかはどうでしょう。

sum = @distributed (+) for i in 1:N # 並列化のために分割される for 文
    for j in 1:N # こちらの for 文は分割されない
         ︙
        foo = ... # リダクションしたい変数end
end

なかなか難しそうですが、公式ドキュメントの例の直下に書いてある様に、並列化されている for 文の end の上にリダクションしたい変数をもってくればOKです。すなわち、上記の場合だと以下のようにすれば各プロセスで計算した foo を合算できます。

sum = @distributed (+) for i in 1:N
    for j in 1:N
         ︙
        foo = ... # リダクションしたい変数end
    foo # ここにfooと書くだけ
end

使えるリダクション演算子

OpenMPで使えるリダクション演算子は大抵使えます。 https://computing.llnl.gov/tutorials/openMP/#REDUCTION

logical and と logical or に関してはそのままでは使えませんが、以下のようにすることで同じことができます。

# logical and
julia> @distributed (*)  for i in 1:10
           rand(Bool)
       end

# logical or
julia> @distributed (x,y) -> x||y  for i in 1:10
           rand(Bool)
       end

その他にも使えるものはありますが、リダクション演算子が満たすべき条件の詳細は後述の「自前のリダクション演算子を定義する」で紹介します。

複数の変数をリダクションする

2つの変数をリダクションしたい場合、単純に end の上に a, b と書くとエラーが出ます。

julia> @distributed (+)  for i in 1:10
           a = rand()
           b = randn()
           a, b
       end
ERROR: MethodError: no method matching +(::Tuple{Float64,Float64}, ::Tuple{Float64,Float64})
...

ですが、タプルでなく配列にすると通るようになります。

julia> a, b = rand(10), randn(10);

julia> @distributed (+)  for i in 1:10
           [a[i], b[i]]
       end
2-element Array{Float64,1}:
 3.634179493372805 
 0.7752150063169639

julia> sum(a)
3.634179493372805

julia> sum(b)
0.7752150063169639

この例では配列にするだけで上手く行きましたが、一般にどのような関数ならばリダクション演算子として使えるのか、いまいちピンと来ませんね。

自前のリダクション演算子を定義する

公式ドキュメントには載っていない*1ようなので試行錯誤しながらリダクション演算子が満たすべき条件を探っていきます。 今回は簡単のため + とほぼ同じ働きをするリダクション演算子を作ってみます。

docs.julialang.org

julia> using Distributed

julia> addprocs(2); # ここでは例として2並列とする。

julia> @everywhere function reducer end # メソッドを持たない関数を定義。これを最終的にリダクション演算子にする

julia> @distributed (reducer)  for i in 1:10
           i
       end
ERROR: On worker 2:
MethodError: no method matching reducer(::Int64, ::Int64)
...

他のリダクション演算子からなんとなく想像は付いていましたが、どうやらリダクション演算子は二項演算のようですね。 引数に何が入ってくるのか知りたいので @show 使って見てみましょう。

julia> @everywhere function reducer(x::Int64, y::Int64)
           @show x
           @show y
           
           return nothing
       end

julia> @distributed (reducer)  for i in 1:10
           i
       end
      From worker 2:   x = 1
      From worker 2:   y = 2
      From worker 3:   x = 6
      From worker 3:   y = 7
ERROR: On worker 2:
MethodError: no method matching reducer(::Nothing, ::Int64)
...

reducer の返り値は nothing なので、想像するに reducer(reducer(1,2), 3) という計算をしていそうです。 返り値を Int16 にしてリトライ。

julia> @everywhere function reducer(x::Int64, y::Int64)
           @show x
           @show y
                  
           return Int16(x+y)
       end
WARNING: Method definition reducer(Int64, Int64) in module Main at REPL[5]:2 overwritten at REPL[7]:2.

julia> @distributed (reducer)  for i in 1:10
           i
       end
      From worker 2:   x = 1
      From worker 2:   y = 2
      From worker 3:   x = 6
      From worker 3:   y = 7
ERROR: On worker 2:
MethodError: no method matching reducer(::Int16, ::Int64)
Closest candidates are:
  reducer(::Int64, ::Int64) at REPL[7]:2 (method too new to be called from this world context.)
...

想像があたっている気がしないでもない。 第一引数に Int16 を取る reducer を定義してみます。

julia> @everywhere function reducer(x::Int16, y::Int64)
           @show x
           @show y
                  
           return Int16(x+y)
       end

julia> @distributed (reducer)  for i in 1:10
           i
       end
      From worker 3:   x = 6
      From worker 3:   y = 7
      From worker 2:   x = 1
      From worker 2:   y = 2
      From worker 3:   x = 13
      From worker 3:   y = 8
      From worker 3:   x = 21
      From worker 3:   y = 9
      From worker 3:   x = 30
      From worker 3:   y = 10
      From worker 2:   x = 3
      From worker 2:   y = 3
      From worker 2:   x = 6
      From worker 2:   y = 4
      From worker 2:   x = 10
      From worker 2:   y = 5
ERROR: MethodError: no method matching reducer(::Int16, ::Int16)
Closest candidates are:
  reducer(::Int16, ::Int64) at REPL[9]:2
...

だいぶ進みました。 今までは ERROR: On worker 2: と出ていましたが、今回は worker と出ていないので最後の計算はマスタープロセスで実行されているようです。 reducer にメソッドを追加します。

julia> function reducer(x::Int16, y::Int16) # マスタープロセスのみで定義。
           @show x
           @show y
                  
           return Int16(x+y)
       end
reducer (generic function with 3 methods)

julia> @distributed (reducer)  for i in 1:10
           i
       end
      From worker 3:   x = 6
      From worker 3:   y = 7
      From worker 3:   x = 13
      From worker 2:   x = 1
      From worker 2:   y = 2
      From worker 2:   x = 3
      From worker 2:   y = 3
      From worker 2:   x = 6
      From worker 2:   y = 4
      From worker 2:   x = 10
      From worker 3:   y = 8
      From worker 3:   x = 21
      From worker 3:   y = 9
      From worker 3:   x = 30
      From worker 3:   y = 10
x = 15 # マスタープロセスからの出力
y = 40 # マスタープロセスからの出力
      From worker 2:   y = 5
55

結果として 55 が返ってきたので動作としてはよさげです。

今までのことを図にまとめるとこのような感じでしょうか。

f:id:goropikarikun:20190211175311p:plain

結論

以上の結果より、次のような並列計算をするとき

x = @distributed (reducer)  for i in 1:n
    ...
    f(i) # reduced value
end

f(i) の型を X, reducer(f(i), f(j)) の型を Y とすると、リダクション演算子 reducer は以下の3つのメソッドを定義すればよさそうです。

@everywhere reducer(::X, ::X)
@everywhere reducer(::Y, ::X)
reducer(::Y, ::Y)

大抵の場合においては X = Y だと思うので、reducer(::X, ::X) だけ定義すれば十分だと思いますがトリッキーなことをしたい方は上の3つを定義してください。

ちなみに範囲が 1:1 で、もはや逐次の時は reducer を使わないので f(1) が返ってきます。

最後に配列中の最大値と最小値を自作のリダクション演算子を使って求めてみます。

julia> x = rand(100);

julia> @everywhere function reducer(x::Float64, y::Float64)
           return [max(x,y), min(x,y)]
       end

julia> @everywhere function reducer(v::Vector{Float64}, y::Float64)
           return [max(v[1],y), min(v[2],y)]
       end

julia> function reducer(v1::Vector{Float64}, v2::Vector{Float64})
           return [max(v1[1], v2[1]), min(v1[2], v2[2])]
       end
reducer (generic function with 3 methods)

julia> maxval, minval = @distributed (reducer) for item in x
           item
       end
2-element Array{Float64,1}:
 0.992511684241252  
 0.01586818629402975

julia> maximum(x)
0.992511684241252

julia> minimum(x)
0.01586818629402975

出てきた数値を見る限り望み通りに動いていそうです。

終わりに

一応それっぽいリダクション演算子を作ることができましたが、Julia 本体のソースコードをちゃんと読んだわけではないので、本当に望みのリダクション演算子が作れているのか確証はありません。

make_preduce_body という関数の定義を見る限り、今回紹介した作り方で問題ないと思いますが保証はしません。

github.com

参考

*1:正確にはありますけどわかりづらい