induceBackwardを高速化しようとした

Haskell vs F# - Life Goes Onが気になったのでやってみた。

手元にF#の実行環境がないので、元のコードを2倍高速化することを目標にしてみる。環境はLinux x64, GHC 7.0.4.

最初のコード。

import Data.Array.Unboxed

data Node = Node {
  df :: Double,
  branch :: [(Int, Double)]
  }

induceBackward :: Array Int Node -> UArray Int Double -> UArray Int Double
induceBackward nodes values = accumArray (+) 0 (bounds nodes)
  [(j, p * values ! k * df) | (j, Node df branch) <- assocs nodes, (k, p) <- branch]

iteration = 1000

main :: IO()
main = print (maximum [value i | i <- [1..iteration]])
  where
  value i = foldr induceBackward (lastValues i) testTree ! 0
  lastValues i = listArray (-100, 100) (repeat (fromIntegral i))
  testTree = [listArray (-i, i)
    [Node 1.0 [(j-1, 1.0/6.0), (j, 2.0/3.0), (j+1, 1.0/6.0)] | j <- [-i..i]]
    | i <- [0..99]]

実行時間。

% time ./backward
999.9999999999998
./backward  1.12s user 0.01s system 99% cpu 1.126 total

まず気が付いたのは、一番内側のループ(induceBackward内の「(k, p) <- branch」部分)でリストを辿っていること。これを配列上のfoldにしてしまいたい。そのためにはもう一段外のループ(assocs nodeの部分)を配列上のmapにしたい。しかしこれはArrayからUArrayへの変換なので効率的にやるのは面倒。そこでarrayをやめてvectorパッケージを使うことにする。

vectorパッケージを使うと、vがboxed vectorなら次のようにしてunboxed vectorに変換しつつmapできる。

-- import qualified Data.Vector as V
V.convert (V.map f v)

V.convertはboxed vectorをunboxed vectorに変換する関数だが、融合変換の魔法によってV.mapで作られてV.convertで消費される中間のboxed vectorが排除されるので、全体としてmap一回分のコストで済む。

vectorパッケージのvectorはarrayパッケージの配列とちがって添字のオフセットを指定できないので、それを合わせて型ArrとUArrを作ることにした。

import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U

data Node = Node {
  df :: Double,
  branch :: [(Int, Double)]
  }

type Arr a = (Int, V.Vector a)
type UArr a = (Int, U.Vector a)

(!) :: (U.Unbox a) => UArr a -> Int -> a
(offset, vec) ! k = vec U.! (k - offset)

induceBackward :: Arr Node -> UArr Double -> UArr Double
induceBackward (nodesOffset, nodes) values = (nodesOffset, newValues)
  where
    newValues = V.convert $ V.map f nodes
    f (Node df branch) = sum [p * values ! k * df | (k, p) <- branch]

iteration = 1000

main :: IO()
main = print (maximum [value i | i <- [1..iteration]])
  where
  value i = foldr induceBackward (lastValues i) testTree ! 0
  lastValues i = (-100, U.replicate 201 (fromIntegral i))
  testTree = [(-i, V.fromList [Node 1.0 [(j-1, 1.0/6.0), (j, 2.0/3.0), (j+1, 1.0/6.0)] | j <- [-i..i]])
    | i <- [0..99]]
./backward2  3.01s user 0.02s system 99% cpu 3.034 total

かなり遅くなったが、気にしないで次にいく。branchをvector型に変更。

data Node = Node { 
  df :: Double,
  branch :: U.Vector (Int, Double)
  } 
f (Node df branch) = U.sum $ U.map (\(k, p) -> p * values ! k * df) branch

この(U.sum $ U.map ...)もやはり融合変換で中間vectorのない形になっていることを期待している。

./backward3  1.15s user 0.01s system 99% cpu 1.163 total

実行時間は最初のコードとほぼ同じに戻った。

induceBackwardの引数valuesが内側のループで頻繁に使われているので正格にする。

induceBackward (nodesOffset, nodes) values@(!_, !_) = (nodesOffset, newValues)
./backward4  0.76s user 0.01s system 99% cpu 0.766 total

この時点でコアを読んでみるが明かな無駄が見つからなかった。Nodeのdfフィールドを正格にしてUNPACKプラグマを付けてみる。

./backward6  0.73s user 0.01s system 99% cpu 0.743 total

同様にbranchフィールドにもUNPACK指定したいところだが、なぜか型族をUNPACKすることはできない。

LLVMバックエンドが速いようなので試してみる。手元のLLVMが新しすぎる(3.0)ためGHC 7.0では対応していないのでGHC 7.4.1をインストールした。

./backward6-7.4.1  0.74s user 0.01s system 99% cpu 0.751 total
./backward6-7.4.1-llvm  0.71s user 0.01s system 99% cpu 0.724 total

このあたりが限界かと思ったが、branchの長さが3固定であることを利用してループをアンロールし、さらに二箇所ある(U.!)をU.unsafeIndexに置き換えることで劇的な高速化ができることに偶然気づいた。

./backward7-7.4.1-llvm  0.39s user 0.00s system 99% cpu 0.401 total

まとめ

2倍を越える高速化は達成したが釈然としない。なぜアンロールにこれほどの効果があるのか分からないし、unsafeIndexが必須なのも気分が悪い。あとはもっと詳しい人に任せたい。

あとF#速いですね。

最終的なコード。

{-# LANGUAGE BangPatterns #-}
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U

data Node = Node {
  df :: {-# UNPACK #-} !Double,
  branch :: U.Vector (Int, Double)
  }

type Arr a = (Int, V.Vector a)
type UArr a = (Int, U.Vector a)

(!) :: (U.Unbox a) => UArr a -> Int -> a
(!) (offset, vec) k = vec `U.unsafeIndex` (k - offset)

induceBackward :: Arr Node -> UArr Double -> UArr Double
induceBackward (nodesOffset, nodes) values@(!_, !_) = (nodesOffset, newValues)
  where
    newValues = V.convert $ V.map f nodes
    f (Node df branch) = fold3 0 $ \i s -> case branch `U.unsafeIndex` i of
      (k, p) -> p * values ! k * df + s

fold3 :: a -> (Int -> a -> a) -> a
fold3 x f = f 0 $ f 1 $ f 2 x

iteration = 1000

main :: IO()
main = print (maximum [value i | i <- [1..iteration]])
  where
  value i = foldr induceBackward (lastValues i) testTree ! 0
  lastValues i = (-100, U.replicate 201 (fromIntegral i))
  testTree = [(-i, V.fromList [Node 1.0 $ U.fromList [(j-1, 1.0/6.0), (j, 2.0/3.0), (j+1, 1.0/6.0)] | j <- [-i..i]])
    | i <- [0..99]]

おまけ

保守性とか安全性とか無視して速さを追求したらこうなった。

./backward-fast  0.20s user 0.00s system 99% cpu 0.205 total

ここまでしてやっと、適当に書いたC++コードと同等。

./a.out  0.21s user 0.00s system 99% cpu 0.214 total

コードは以下。

{-# LANGUAGE BangPatterns #-}

import Control.Monad
import Control.Monad.ST
import qualified Data.Primitive as P
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U

data Node = Node {
  df :: {-# UNPACK #-} !Double,
  branchIndex :: {-# UNPACK #-} !P.ByteArray{- Int -},
  branchCoefficient :: {-# UNPACK #-} !P.ByteArray{- Double -}
  }

type Arr a = (Int, V.Vector a)
type UArr a = (Int, U.Vector a)

(!) :: (U.Unbox a) => UArr a -> Int -> a
(!) (offset, vec) k = vec `U.unsafeIndex` (k - offset)

induceBackward :: Arr Node -> UArr Double -> UArr Double
induceBackward (nodesOffset, nodes) values@(!_, !_) = (nodesOffset, newValues)
  where
    newValues = V.convert $ V.map f nodes
    f (Node df branchIndex branchCoefficient) = fold3 0 $ \i s ->
      s +
      P.indexByteArray branchCoefficient i *
      values ! P.indexByteArray branchIndex i *
      df

fold3 :: a -> (Int -> a -> a) -> a
fold3 x f = f 0 $ f 1 $ f 2 x

iteration = 1000

main :: IO()
main = print (maximum [value i | i <- [1..iteration]])
  where
  value i = foldr induceBackward (lastValues i) testTree ! 0
  lastValues i = (-100, U.replicate 201 (fromIntegral i))
  testTree = [(-i, V.fromList [Node 1.0 (byteArrayFromList [j-1, j, j+1]) coefficients | j <- [-i..i]])
    | i <- [0..99]]
  coefficients = byteArrayFromList [1.0/6.0, 2.0/3.0, 1.0/6.0::Double]

byteArrayFromList :: (P.Prim a) => [a] -> P.ByteArray
byteArrayFromList xs = runST $ do
  mut <- P.newByteArray (length xs * P.sizeOf (head xs))
  forM_ (zip [0..] xs) $ \(i, v) -> P.writeByteArray mut i v
  P.unsafeFreezeByteArray mut
# include <vector>
# include <utility>
# include <memory>
# include <cstdio>

using namespace std;

struct node
{
  double df;
  vector<pair<int, double> > branch;
};

auto_ptr<vector<double> > induce_backward(const vector<node> &nodes, const vector<double> &values)
{
  const int n_nodes = nodes.size();
  auto_ptr<vector<double> > ret(new vector<double>());
  ret->reserve(n_nodes);
  const int n = values.size() / 2;
  for(int j = 0; j < n_nodes; j++)
  {
    double sum = 0;
    const node &node = nodes[j];
    for(int k = 0; k < 3; k++)
      sum += node.branch[k].second * values[n + node.branch[k].first] * node.df;
    ret->push_back(sum);
  }
  return ret;
}

const int iteration = 1000;

int main()
{
  vector<vector<node> > test_tree;
  for(int i = 0; i < 100; i++)
  {
    test_tree.push_back(vector<node>());
    vector<node> &nodes = test_tree.back();
    for(int j = -i; j <= i; j++)
    {
      nodes.push_back(node());
      node &node = nodes.back();
      node.df = 1.0;
      node.branch.push_back(make_pair(j-1, 1.0/6.0));
      node.branch.push_back(make_pair(j, 2.0/3.0));
      node.branch.push_back(make_pair(j+1, 1.0/6.0));
    }
  }
  double maxval = 0;
  for(int i = 1; i <= iteration; i++)
  {
    auto_ptr<vector<double> > values(new vector<double>(201, (double)i));
    for(int j = 99; j >= 0; j--)
      values = induce_backward(test_tree[j], *values);
    maxval = max(maxval, (*values)[0]);
  }
  printf("%f\n", maxval);
}