induceBackwardを高速化しようとした
Haskell vs F# - Life Goes Onが気になったのでやってみた。
手元にF#の実行環境がないので、元のコードを2倍高速化することを目標にしてみる。環境はLinux x64, GHC 7.0.4.
最初のコード。
import Data.Array.Unboxed data Node = Node { df :: Double, branch :: [(Int, Double)] } induceBackward :: Array Int Node -> UArray Int Double -> UArray Int Double induceBackward nodes values = accumArray (+) 0 (bounds nodes) [(j, p * values ! k * df) | (j, Node df branch) <- assocs nodes, (k, p) <- branch] iteration = 1000 main :: IO() main = print (maximum [value i | i <- [1..iteration]]) where value i = foldr induceBackward (lastValues i) testTree ! 0 lastValues i = listArray (-100, 100) (repeat (fromIntegral i)) testTree = [listArray (-i, i) [Node 1.0 [(j-1, 1.0/6.0), (j, 2.0/3.0), (j+1, 1.0/6.0)] | j <- [-i..i]] | i <- [0..99]]
実行時間。
% time ./backward 999.9999999999998 ./backward 1.12s user 0.01s system 99% cpu 1.126 total
まず気が付いたのは、一番内側のループ(induceBackward内の「(k, p) <- branch」部分)でリストを辿っていること。これを配列上のfoldにしてしまいたい。そのためにはもう一段外のループ(assocs nodeの部分)を配列上のmapにしたい。しかしこれはArrayからUArrayへの変換なので効率的にやるのは面倒。そこでarrayをやめてvectorパッケージを使うことにする。
vectorパッケージを使うと、vがboxed vectorなら次のようにしてunboxed vectorに変換しつつmapできる。
-- import qualified Data.Vector as V
V.convert (V.map f v)
V.convertはboxed vectorをunboxed vectorに変換する関数だが、融合変換の魔法によってV.mapで作られてV.convertで消費される中間のboxed vectorが排除されるので、全体としてmap一回分のコストで済む。
vectorパッケージのvectorはarrayパッケージの配列とちがって添字のオフセットを指定できないので、それを合わせて型ArrとUArrを作ることにした。
import qualified Data.Vector as V import qualified Data.Vector.Unboxed as U data Node = Node { df :: Double, branch :: [(Int, Double)] } type Arr a = (Int, V.Vector a) type UArr a = (Int, U.Vector a) (!) :: (U.Unbox a) => UArr a -> Int -> a (offset, vec) ! k = vec U.! (k - offset) induceBackward :: Arr Node -> UArr Double -> UArr Double induceBackward (nodesOffset, nodes) values = (nodesOffset, newValues) where newValues = V.convert $ V.map f nodes f (Node df branch) = sum [p * values ! k * df | (k, p) <- branch] iteration = 1000 main :: IO() main = print (maximum [value i | i <- [1..iteration]]) where value i = foldr induceBackward (lastValues i) testTree ! 0 lastValues i = (-100, U.replicate 201 (fromIntegral i)) testTree = [(-i, V.fromList [Node 1.0 [(j-1, 1.0/6.0), (j, 2.0/3.0), (j+1, 1.0/6.0)] | j <- [-i..i]]) | i <- [0..99]]
./backward2 3.01s user 0.02s system 99% cpu 3.034 total
かなり遅くなったが、気にしないで次にいく。branchをvector型に変更。
data Node = Node { df :: Double, branch :: U.Vector (Int, Double) }
f (Node df branch) = U.sum $ U.map (\(k, p) -> p * values ! k * df) branch
この(U.sum $ U.map ...)もやはり融合変換で中間vectorのない形になっていることを期待している。
./backward3 1.15s user 0.01s system 99% cpu 1.163 total
実行時間は最初のコードとほぼ同じに戻った。
induceBackwardの引数valuesが内側のループで頻繁に使われているので正格にする。
induceBackward (nodesOffset, nodes) values@(!_, !_) = (nodesOffset, newValues)
./backward4 0.76s user 0.01s system 99% cpu 0.766 total
この時点でコアを読んでみるが明かな無駄が見つからなかった。Nodeのdfフィールドを正格にしてUNPACKプラグマを付けてみる。
./backward6 0.73s user 0.01s system 99% cpu 0.743 total
同様にbranchフィールドにもUNPACK指定したいところだが、なぜか型族をUNPACKすることはできない。
LLVMバックエンドが速いようなので試してみる。手元のLLVMが新しすぎる(3.0)ためGHC 7.0では対応していないのでGHC 7.4.1をインストールした。
./backward6-7.4.1 0.74s user 0.01s system 99% cpu 0.751 total ./backward6-7.4.1-llvm 0.71s user 0.01s system 99% cpu 0.724 total
このあたりが限界かと思ったが、branchの長さが3固定であることを利用してループをアンロールし、さらに二箇所ある(U.!)をU.unsafeIndexに置き換えることで劇的な高速化ができることに偶然気づいた。
./backward7-7.4.1-llvm 0.39s user 0.00s system 99% cpu 0.401 total
まとめ
2倍を越える高速化は達成したが釈然としない。なぜアンロールにこれほどの効果があるのか分からないし、unsafeIndexが必須なのも気分が悪い。あとはもっと詳しい人に任せたい。
あとF#速いですね。
最終的なコード。
{-# LANGUAGE BangPatterns #-} import qualified Data.Vector as V import qualified Data.Vector.Unboxed as U data Node = Node { df :: {-# UNPACK #-} !Double, branch :: U.Vector (Int, Double) } type Arr a = (Int, V.Vector a) type UArr a = (Int, U.Vector a) (!) :: (U.Unbox a) => UArr a -> Int -> a (!) (offset, vec) k = vec `U.unsafeIndex` (k - offset) induceBackward :: Arr Node -> UArr Double -> UArr Double induceBackward (nodesOffset, nodes) values@(!_, !_) = (nodesOffset, newValues) where newValues = V.convert $ V.map f nodes f (Node df branch) = fold3 0 $ \i s -> case branch `U.unsafeIndex` i of (k, p) -> p * values ! k * df + s fold3 :: a -> (Int -> a -> a) -> a fold3 x f = f 0 $ f 1 $ f 2 x iteration = 1000 main :: IO() main = print (maximum [value i | i <- [1..iteration]]) where value i = foldr induceBackward (lastValues i) testTree ! 0 lastValues i = (-100, U.replicate 201 (fromIntegral i)) testTree = [(-i, V.fromList [Node 1.0 $ U.fromList [(j-1, 1.0/6.0), (j, 2.0/3.0), (j+1, 1.0/6.0)] | j <- [-i..i]]) | i <- [0..99]]
おまけ
保守性とか安全性とか無視して速さを追求したらこうなった。
./backward-fast 0.20s user 0.00s system 99% cpu 0.205 total
ここまでしてやっと、適当に書いたC++コードと同等。
./a.out 0.21s user 0.00s system 99% cpu 0.214 total
コードは以下。
{-# LANGUAGE BangPatterns #-} import Control.Monad import Control.Monad.ST import qualified Data.Primitive as P import qualified Data.Vector as V import qualified Data.Vector.Unboxed as U data Node = Node { df :: {-# UNPACK #-} !Double, branchIndex :: {-# UNPACK #-} !P.ByteArray{- Int -}, branchCoefficient :: {-# UNPACK #-} !P.ByteArray{- Double -} } type Arr a = (Int, V.Vector a) type UArr a = (Int, U.Vector a) (!) :: (U.Unbox a) => UArr a -> Int -> a (!) (offset, vec) k = vec `U.unsafeIndex` (k - offset) induceBackward :: Arr Node -> UArr Double -> UArr Double induceBackward (nodesOffset, nodes) values@(!_, !_) = (nodesOffset, newValues) where newValues = V.convert $ V.map f nodes f (Node df branchIndex branchCoefficient) = fold3 0 $ \i s -> s + P.indexByteArray branchCoefficient i * values ! P.indexByteArray branchIndex i * df fold3 :: a -> (Int -> a -> a) -> a fold3 x f = f 0 $ f 1 $ f 2 x iteration = 1000 main :: IO() main = print (maximum [value i | i <- [1..iteration]]) where value i = foldr induceBackward (lastValues i) testTree ! 0 lastValues i = (-100, U.replicate 201 (fromIntegral i)) testTree = [(-i, V.fromList [Node 1.0 (byteArrayFromList [j-1, j, j+1]) coefficients | j <- [-i..i]]) | i <- [0..99]] coefficients = byteArrayFromList [1.0/6.0, 2.0/3.0, 1.0/6.0::Double] byteArrayFromList :: (P.Prim a) => [a] -> P.ByteArray byteArrayFromList xs = runST $ do mut <- P.newByteArray (length xs * P.sizeOf (head xs)) forM_ (zip [0..] xs) $ \(i, v) -> P.writeByteArray mut i v P.unsafeFreezeByteArray mut
# include <vector> # include <utility> # include <memory> # include <cstdio> using namespace std; struct node { double df; vector<pair<int, double> > branch; }; auto_ptr<vector<double> > induce_backward(const vector<node> &nodes, const vector<double> &values) { const int n_nodes = nodes.size(); auto_ptr<vector<double> > ret(new vector<double>()); ret->reserve(n_nodes); const int n = values.size() / 2; for(int j = 0; j < n_nodes; j++) { double sum = 0; const node &node = nodes[j]; for(int k = 0; k < 3; k++) sum += node.branch[k].second * values[n + node.branch[k].first] * node.df; ret->push_back(sum); } return ret; } const int iteration = 1000; int main() { vector<vector<node> > test_tree; for(int i = 0; i < 100; i++) { test_tree.push_back(vector<node>()); vector<node> &nodes = test_tree.back(); for(int j = -i; j <= i; j++) { nodes.push_back(node()); node &node = nodes.back(); node.df = 1.0; node.branch.push_back(make_pair(j-1, 1.0/6.0)); node.branch.push_back(make_pair(j, 2.0/3.0)); node.branch.push_back(make_pair(j+1, 1.0/6.0)); } } double maxval = 0; for(int i = 1; i <= iteration; i++) { auto_ptr<vector<double> > values(new vector<double>(201, (double)i)); for(int j = 99; j >= 0; j--) values = induce_backward(test_tree[j], *values); maxval = max(maxval, (*values)[0]); } printf("%f\n", maxval); }