インデントで悩まないための単純な指針

Haskellコードを書いていて、インデントを揃えるのが面倒だとか、インデントが揃っているか判別しにくいということがあるかもしれない。以前は私にもよくあったのだが、二つの簡単な指針に従うことに決めてからこの種の問題に悩まされることはなくなった。ので紹介する。

  1. インデントを揃える際は、そこより左には空白しか置かない
  2. インデントは常にkの倍数個の半角スペースで行う。ここでkはプロジェクトごとの定数(以下ではk=2)

具体例

foo x y =
  do runThis
     runThat

と書かずに、

foo x y = do
  runThis
  runThat

と書く。

foo = bar * 2
  where bar = 1 + quux
        quux = log 3

と書かずに、

foo = bar * 2
  where
    bar = 1 + quux
    quux = log 3

と書く。

data Foo a = Foo { fooId :: !Int
                 , fooName :: !String
                 , fooRef :: !(IORef a)
                 }

と書かずに、

data Foo a = Foo
  { fooId :: !Int
  , fooName :: !String
  , fooRef :: !(IORef a)
  }

と書く。

build = runBuilder $ fromInt foo
                     <> fromInt bar
                     <> fromString baz

と書かずに、

build = runBuilder $
  fromInt foo
     <> fromInt bar
     <> fromString baz

と書く。

何が良いか

行頭にスペース以外を置かないので、

  • 識別子の名前変更や、引数の追加/削除によってインデントを変更する必要がない
  • ほとんど必要最小限のインデント量になるので、コードが無駄に横に長くなりにくい。whereの三段ネストくらいなら平気でできる*1
  • Unicodeの記号を使ってもずれない。プロポーショナルフォントですら問題ない

常にkの倍数個のスペースでインデントするので、

  • インデント水準の変更が簡単*2
  • 異なるインデント水準は最低でもk桁違うので見分けやすい

何が悪いか

doの中でletを使うときに微妙に格好悪い。

do
  let
    foo = 1
    bar = 2
  go foo bar

以下のように書きたくなる。(とくにk=4の場合)

do
  let foo = 1
      bar = 2
  go foo bar

また、人によってはwhere節の内部がインデントされすぎると感じる。原則を曲げてwhereをk/2だけインデントする方が好きな人もいるかもしれない。

foo = bar * 2
 where
  bar = 1 + quux
  quux = log 3

*1:whereのネストは読み難いという人もいるみたいだけど

*2:vimなら範囲選択して>とか

オブジェクト指向とは何か、何が良いのか

Haskellオブジェクト指向言語ではないが、コードを書く上でオブジェクト指向の考え方を利用するのが便利なこともあると思うので紹介する。

オブジェクト指向とは何か

オブジェクト指向という言葉に共通定義がないのは共通認識だと思う。気をつけないと議論が発散しがちなので、この記事ではオブジェクト指向の理念については扱わず、オブジェクト指向プログラミングで用いられるテクニックと、オブジェクト指向言語が提供する言語機能について専ら話題にする。オブジェクト指向の特徴として良く言われるのは次のようなものだと思う。

多態
インタフェースが同じだが異なる振る舞いをする異なる種類のオブジェクトを一つのコードで扱う機能。Haskellでは「オブジェクトを操作する関数一式」を受け渡しすることで簡単に実現できる。型クラスを使っても良い。
隠蔽
インタフェースと実装を分離し、実装を外部から見えないようにする。Haskellならモジュールを使えばいい。
継承
良く分かっていないのでパス。
メッセージ渡し
全然分かっていないのでパス。
アイデンティティのあるモノを使ったプログラミング
Haskellでどうやるかがそんなに自明じゃない。この記事ではこれを扱う。

アイデンティティのあるオブジェクトを作成する

Haskellの普通の値、たとえばIntや、dataで定義した型の値にはアイデンティティがない。Eqで比較できるのは内容が一致しているかどうかだけであり、同一の実体かどうかを区別することはできない。

ではHaskellにはアイデンティティのあるオブジェクトが存在しないのだろうか。そんなことはない。たとえばHandleが例になっている。Handle同士の(==)は、二つのHandleが同一の実体であるときにのみTrueになる。もうすこし正確に言うと、Handle型の値自体はアイデンティティのない普通のHaskellの値(ポインタみたいなもの)だが、それが「指し」ているハンドルの実体にはアイデンティティがある。だから、Handle型の値を保存しておけば、そのハンドルが別の場所から操作されて内部状態が変わったとしても、保存しておいたHandleを使って変更後の実体にアクセスすることができる。

Handleのようにアイデンティティのあるものを自分で設計したいときはどうするか。これは簡単で、参照を表現する型がライブラリに用意されているのでそれを使う。

MVar
マルチスレッド下での所有権奪い合いの対応付き。Handleは内部でこれを使っている
IORef
read, write, modifyがアトミックにできるのみだが、比較的速い
TVar
STMモナド上のIORef。STM上なのでなんでもアトミック
STRef
STモナド上のIORef

たとえばIORefを使うなら、オブジェクトの内部状態の型をTとすればIORef Tがオブジェクトを指すポインタのように振る舞う。必要に応じてnewtypeでラップするなどしてカプセル化することができ、そうするとHandleのように抽象型としてのインタフェースを与えることができる。

module MyObject(MyObject, newMyObject, doSomething) where

newtype MyObject = My (IORef MyObjectState)
  -- 構築子Myはエクスポートしない
data MyObjectState = ...内部状態の定義...

-- | MyObjectのインスタンスを作成する
newMyObject :: Foo -> IO MyObject
newMyObject foo = ...

-- | MyObjectを使って何かする
doSomething :: MyObject -> Bar -> IO ()
doSomething (My ref) bar = ...

これは静的型のあるオブジェクト指向言語のクラス定義にだいたい対応している。継承やらstatic変数やらはないが。多態が必要なら簡単に付け足すことができる。(型クラスを使うにせよ関数を直接渡すにせよ、アイデンティティを持つことと多態的に使われることは直交しているので特別な配慮は必要ない)

オブジェクト指向を明示的にサポートしている言語に比べると記述が面倒なので、プログラム全体をこのスタイルで書こうとすると苦しいが、普通はそんなことをする必要はない(大抵、もっと楽な方法がある)。それでも、このスタイルが一番書き易いような場面はたまにあると思う。

アイデンティティを拡大解釈する

「もっと楽な方法」の一つを見ていこう。

オブジェクトにはアイデンティティが必要としても、別に(==)で同一性比較ができることが絶対に必要な訳ではない。例えばHandleが(==)で比較できなかったとしてもそんなに困らない。むしろ本質的なのは、実体と参照の分離だろう。言い換えると、オブジェクトAを参照しているものX(別のオブジェクトでも、スレッドでも)があったとして、Aの内部状態が変ってもXがAを見失わず、変更後のAを観察できることが重要だ。

このことを念頭におくと、オブジェクトへのインタフェースを簡略化することができる。上の例で、MyObjectを使ってできることがdoSomethingしかないのなら、それ相当の関数さえあればMyObjectの全機能をカバーしている。つまり、(Bar -> IO ())という関数そのものをMyObjectの代わりに使うことができる。インタフェースは次のようになる。

-- | MyObjectのインスタンスを作成する
newMyObject :: Foo -> IO (Bar -> IO ())
newMyObject foo = ...必要に応じてIORefを作ったりする...

型を新たに定義する必要も、モジュールを使って実装を隠蔽する必要もない。型が実装に依存していないのでこのままで多態に対応している。欠点は、インタフェースの複雑さがそのまま型の複雑さに反映されるので、複雑なインタフェースを持つオブジェクトを扱いにくい方法であること。

アイデンティティをもっと拡大解釈する

もっと前提を疑ってみよう。観察者たちは初めAの内部状態S0を観察している(隠蔽されているなら間接的に)。ここでなんらかの変化が起きて、その後、全ての観察者がAの内部状態S1を観察するようになる。これが必要な挙動であった。これさえ満たせばAの「実体」なるものが存在する必要もない。

実体なしでどうするかというと、内部状態を表現する型を定義し、それを更新しながら関数間を引き回すだけである。オブジェクトのアイデンティティは、そのオブジェクトを扱う関数の心の中にのみ存在する。実例を挙げる。

module StddevCalculator(Stddevcalculator, initial, addSample, getValue) where

-- | 実数の集まりの標準偏差を計算するオブジェクトの内部状態
data StddevCalculator = SC !Double !Double !Int
  -- 構築子SCはエクスポートしない

-- | 初期状態
initial :: StddevCalculator

-- | 標本を食わせる
addSample :: StddevCalculator -> Double -> StddevCalculator
addSample = ...

-- | 標準偏差を得る
getValue :: StddevCalculator -> Double
getValue = ...

これを例えば次のように使う。

-- | chanから読んだ値を表示していく。Nothingが来たら標準偏差を表示して終了
consume :: Chan (Maybe Double) -> IO ()
consume chan = loop initial -- 初期状態はinitial
  where
    loop !calc = do -- calcが「現在の」オブジェクトの状態だと思う
      mv <- readChan chan
      case mv of
        Just val -> do
          print val
          loop (addSample calc val) -- 以降、addSample calc valを「現在の」状態だと思いなおす
        Nothing -> print $ getValue calc

このスタイルの良さは、IOやSTに依存しないことだ。一方で、観察者の「心の中」をアップデートする必要があるので、観察者がたくさんいたり分散している場合には使えない。

また、この方法はStateモナドと組み合せることで特に効果を発揮する。次のような状況を考える。

-- | アプリのメインモナド
type MyApp a = State MyAppState a
data MyAppState = MyAppState{ ...フィールドたくさん... }

-- アプリを構成する関数群

foo :: Foo -> MyApp Int
foo = ...

bar = ...
baz = ...
...

ここで、fooで得られるDoubleの標準偏差を計算する必要が発生したとしよう。fooはアプリの実行を通して繰り返し呼ばれ、そのたびにDoubleの標本を一個得るとする。必然的に、計算の途中状態はMyAppState内のどこかに保存することになる。いちばん素朴には、MyAppStateに標準偏差計算用のフィールド(Double二個とか)を追加し、fooにそれを更新するコードを含めることで実現できる。

data MyAppState = MyAppState
  { sdSum :: !Double
  , sdSquaredSum :: !Double
  ...フィールドたくさん...
  }
foo = ... 状態更新コード ...

しかしこの設計には改善の余地がある。標準偏差計算用のデータとコードをまとめてStddevCalculatorオブジェクト(上記)として括り出すことで、標準偏差計算のコード+データをそれ以外から分離できる。これは古典的に説かれるオブジェクト指向の恩恵(処理単位で関数を分けるだけでは分離できないものを分離する)と同じものである。

import StddevCalculator

data MyAppState = MyAppState
  { sdCalc :: !Stddevcalculator
  ...フィールドたくさん...
  }
foo = ... addSample ...

もっと

アイデンティティのあるオブジェクト間の相互作用によるプログラミングは、オブジェクト間ネットワークが複雑になってくるとすぐに面倒なことになる。もっと、もっと楽に扱いたい。コンビネータパターンに従って、単純なネットワークを組み合わせて徐々に複雑なネットワークを構成することはできないだろうか。

普通にやるとこれはうまくいかない。オブジェクトのアイデンティティが邪魔をする。たとえばオブジェクトAとオブジェクトBを(何らかの方法で)連結すると、それ以降Aに言及するたびに、それが既にBと連結されていることに注意する必要がある。ネットワークXとYを連結する際には、XとYの両方に含まれているオブジェクトがあると不味いことになるかもしれない。などなど。コンビネータパターンは、個性がなく無限にコピーが可能な値を組み合わせるのには便利だが、状態を持つものを扱うのは得意でない。

そこで、オブジェクトやネットワークを直接扱うのではなく、それらの「設計書」を扱うことにする。設計書は無限にコピー可能であり、アイデンティティはない。例えば、Aを作る設計書をそれ自身と連結すると、Aのコピーを二つ作って連結するような設計書が生まれる。これによって、複雑な回路を矛盾なく作ることが理論上、可能になる。

具体的にどうやるか。関数scanlを考える。scanlにリストを与えて評価すると、入力を読みながら状態を更新し、同時に出力を生成してゆく。がんばって拡大解釈すれば、入力の要素を一個読むごとに内部状態を更新するオートマトンあるいはオブジェクトを作る設計書だと思うことができる。(scanlは特定の状態と結び付いていないので、scanl自体はオブジェクトではない)。同様に、無限リストをとって無限リストを返す関数は全て、一入力一出力で内部状態を持つかもしれないオブジェクトの設計書とみなせる。そして、関数合成(.)は、このような設計書同士を結合するコンビネータだと思うことができる。

しかしこの枠組はあまりに制限がきつい。外界とのIOができないし、オブジェクトが一入力一出力に限定されるのでは複雑なことはなにもできない。これらの制限を解消したのがiteratee、正確にはEnumerateeである。Enumerateeは外界とのIOを扱え、一入力多出力である。Enumerateeは、関数合成に対応する(<><)で結合できる他に、一入力から複数の出力を生成する能力を持ったzip関数を使って組み合わせられる*1。結果として、Enumerateeからなるネットワークは木構造をなし、唯一の入力が根となる。

この一般化をしても一部の単純なネットワークを扱えるに過ぎない。一般のアプリを構成するには多入力多出力のグラフ、最低でもDAGが必要になる。これを提供するのがFRPだ。任意のDAGと、(場合によっては遅延付きの)フィードバックループを扱える。しかし実際に使ってみるとこれでもまだ足りない。単純な例から一歩踏み出すと動的にネットワークを構築する必要が出てくる。この要求に応えるのが高階FRPであり、ネットワークの配線そのものを値として扱うことで動的に変化するネットワークを構築できる。これなら十分に柔軟なのだろうか。これについては私はまだ結論を出せていない。一見するとまだ力が足りないように見えるが、少し工夫すると思わぬことができたりする。気になる人はぜひ自分で実験してみて頂きたい*2

まとめ

いくつか異なる方法を紹介しましたが、得手不得手があるので使い分けると良いのではないかと思います。*3

*1:関数名はiterateeパッケージのもの

*2:FRPパッケージとしてelereaをおすすめします。唯一まともな高階FRPライブラリではないかと思う

*3:FRPの研究が進んで、全てを簡単に書けるようになれば別だが

iterateeとは何か、何が良いのか

iterateeって良く聞くけど何が良いの、と思ってるHaskellユーザのためのメモ。iterateeについては既に日本語の紹介が複数あるが、この記事では実装の詳細に立ち入らず、何が嬉しくてあんな奇妙なインタフェースになっているかについてだけ説明する。具体的なライブラリは使わず、出てくるHaskell風のコードは全て疑似コード。

データ源と処理の分離

iterateeは何をするものかを一言で言うと、データを取得しながら回すループを簡単に書くためのものだ。典型的には、ファイルやソケットからデータを受け取り、それを加工して、画面に出力したり統計を取ったりする。これを素朴に書くと、readやrecvをして、EOFを判定し、加工し、最終的な処理をするまでを一つのループ内で行うことになる。ループが大きくなってくるとこれは嫌なので、ループを分解して、データの取得、加工、最終処理をそれぞれ別々に書いて後で組み合わせる、というのが、iterateeの基本的な考え方だ。各過程を分離することでモジュール性を高めれば、コードが読みやすくなり、変更しやすくなり、単体テストが簡単になり、あわよくば再利用を狙える、という期待である。

データ源と処理を分離するという考えは別に新しくない。最近の言語の多くがイテレータと呼ばれるものを持っている。イテレータはデータ源の抽象化で、リクエストを受けて一個づつ要素を返すもの(pull方式/外部イテレータ)であったり、データを次々読んで指定されたコールバックに渡すもの(push方式/内部イテレータ)だったりする。Google検索によればC++JavaPythonなどが外部イテレータを持ち、Rubyなどが内部イテレータを持つらしい。

Haskellでのiterateeプログラミングは、基本的には内部イテレータ方式と同じものである(ここのトレードオフには後で触れる)。より詳細には、例えばRubyのものと比べて以下のような特徴がある。

  • データ源(iterator)だけでなく処理(iteratee)も明示的に扱う。むしろこちらが主。
  • データ源と処理を分離するのに加えて、これらを加工したり複数合成するといった、コンビネータ的な操作を重視する。これにより、場合によっては既存の部品の組み合せだけでループが完結する。

インタフェースの導出

最も素朴には、処理/iterateeはデータを受け取って何かするもの(e -> IO ())でモデル化できる。

-- | e型の要素を扱う処理
type Iteratee e = e -> IO ()

このインタフェースに合わせてデータ源を定義し、使ってみる。なおHaskellではデータ源をイテレータと呼ばずにenumeratorと呼ぶ習慣なので、以降そのようにする。

-- | e型の要素を生産するデータ源
type Enumerator e = Iteratee e -> IO () -- 処理を受けとり、データを与えつつ繰り返し実行する

-- | Enumeratorの例。ファイルからバイト列を読む
enumBinaryFile :: FilePath -> Enumerator Word8
enumBinaryFile = ...

-- test.pngの各バイトを標準出力に表示する
main = enumBinaryFile "test.png" print -- Prelude.printはそのままIteratee Word8として使える

これで一応、データ源と処理の分離は達成できた。しかしこのままでは不便な点が多すぎる。ループからの途中脱出に対応すべきだし、iterateeはローカルな状態を持てるべきだし、EOFに達したことがiteratee側から分かる仕組みも欲しい。これらは全て、Iteratee型を複雑化させることで対応できる。例えば、途中脱出のためには、Iterateeが継続の意思をBoolで返すようにする、など。

-- | e型の要素を扱う処理
data Iteratee e = ... -- 複雑なデータ構造

これに伴ってprintをそのままIterateeとして扱うことができなくなったので、変換関数を用意する。

-- | 入力の全ての要素についてfを実行するようなiteratee
foreach :: (e -> IO ()) -> Iteratee e
foreach = ...

main = enumBinaryFile "test.png" (foreach print) -- foreachを使ってiterateeを作る

内部状態を持つiterateeも作れる。

-- | 内部状態bを更新しつつ処理するiteratee
foldI :: (b -> e -> IO b) -> b -> Iteratee e
foldI updater initialState = ...

ここまでの拡張で、普通のアプリケーションで必要になるループはほとんど全て、データ源と処理を分離した形で書けるようになった。

Iterateeの逐次合成

いよいよ合成に入る。「xというiterateeを実行してからyというiterateeを実行する」という処理はやはりiterateeになるべきである。

-- | 二つのiterateeの逐次合成。まず左辺を実行し、左辺が終了(つまり途中脱出)したら続けて右辺を実行する
(>>) :: Iteratee e -> Iteratee e -> Iteratee e

これは次のように使う。

-- | n個の要素を読み捨て、その後終了するiteratee
dropI :: Int -> Iteratee e

-- PNGのファイルヘッダ(先頭8バイト)を無視し、それ以降を表示する。
main = enumBinaryFile "test.png" (dropI 8 >> foreach print)

iterateeが終了に際して値を返せるようになれば、逐次合成はずっと柔軟になる。前のiterateeの結果に応じて次のiterateeを決めることができ、これは要するにモナドである。

-- | e型の要素を扱い、最終的にaを生成する処理
data Iteratee e a = ...

-- iterateeは逐次合成に関してモナドをなす
instance Monad (Iteratee e) where
  -- 入力を消費せず、xを最終結果として即座に終了するiteratee
  return x = ...
  -- xを実行し、それが結果vで終了したなら続けてf vを実行するiteratee
  x >>= f = ...

instance MonadIO (Iteratee e) where
  -- 入力を消費せず、アクションaを実行し、
  -- その返り値を最終結果として即座に終了するiteratee
  liftIO a = ...

iterateeが値を返せるようになったので、これを活かすために次のような小さなiteratee群を用意しておく。

-- | 先頭要素を読み、それを返すiteratee。EOFが来たら例外でも投げておく
haedI :: Iteratee e e
headI = ...

-- | 先頭n要素を読む
takeI :: Int -> Iteratee e [e]
takeI n = replicateM n headI

-- | 内部状態bを更新しつつ処理するiteratee。最終状態を返す
foldI :: (b -> e -> IO b) -> b -> Iteratee e b
foldI updater initialState = ...

これを使うと、たとえば次のようなことができる。

-- | PNGの画像サイズを取得するiteratee
pngSizeI :: Iteratee Word8 (Int, Int)
pngSizeI = do
  dropI 8 -- ファイルヘッダを読み飛ばす
  dropI 8 -- IHDRチャンクヘッダを読み飛ばす
  !width <- int32beI
  !height <- int32beI
  return (width, height)

-- | 32bit, big-endian整数を読むiteratee
int32beI :: Iteratee Word8 Int
int32beI = foldl' make 0 <$> takeI 4
  where
    make acc byte = acc * 256 + fromIntegral byte

-- test.pngのサイズを表示する
main = enumBinaryFile "test.png" (pngSizeI >>= liftIO . print)

つまり、headIやtakeIなどの小さなiterateeをモナディックに連結することで、逐次的・手続き的スタイルでiterateeを記述できる。これはforeachやfoldIを使った一括処理スタイルと自由に混合できる。

Enumeratorの逐次合成

iterateeが値を返すことを考慮すると、Enumeratorは次のような定義になる。

-- | e型の要素を生産し、aを最終結果とするiterateeに食わせるデータ源
type Enumerator e a = Iteratee e a -> IO a

次にEnumerator同士の合成を考える。このEnumeratorの定義のままだとちょっとやりにくい(できないことはないが)ので、Enumerator自体がEOFを生成しないように変更する。Enumeratorがデータを生成し終わったら、iterateeにEOFを与えてループを完結させることをせず、その時点のiterateeを返す。

-- | e型の要素を生産し、aを最終結果とするiterateeに食わせるデータ源
type Enumerator e a = Iteratee e a -> IO (Iteratee e a)

実際にループを完結させて最終結果を取りだす関数が必要になる。

-- | iにEOFを送り、結果を取得する
run :: Iteratee e a -> IO a
run i = ...

これは次のように使う。

-- 最終的な実行にはrunが必要
main = enumBinaryFile "test.png" (foreach print) >>= run

Enumeratorの定義を変更したことで、Enumeratorの逐次合成が可能になる。

-- | aからデータを取り出し、それが終わったらbからデータを取り出すEnumerator
cat :: Enumerator e a -> Enumerator e a -> Enumerator e a
cat a b = \iter -> a iter >>= b

これで、小さいEnumeratorを定義し、それを結合するスタイルが可能になる。

-- | 0個の要素を生成する
enumZero :: Enumerator e a
enumZero = return

-- | 一個の要素を生成する
enumOne :: e -> Enumerator e a
enumOne = ...

-- | 指定されたリストを生成する
enumList :: [e] -> Enumerator e a
enumList xs = foldr cat enumZero $ map enumOne xs

ストリームの変換

データ源と処理の二層に分割するだけでなく、データを加工する中間層を持ちたいこともある。これは上の枠組の範囲内で導入できる。データの加工なので直感的にはEnumerator -> Enumeratorの関数にしたくなるが、Iteratee -> Iterateeにした方が単純になる。アイディアは、「加工済みのデータを要求するiteratee、A」を「未加工のデータを要求してAと同じことをするiteratee」に変換できれば、実質的にデータを加工しているのと同じ、というものだ。

-- | output型の入力を要求するiterateeをinput型の入力で駆動できるようにする変換
-- 気分としては、input型のストリームをoutput型のストリームに変換している
type Enumeratee input output a = Iteratee output a -> Iteratee input a
  -- Enumerateeという名称は謎だが、これが慣習

代表的なEnumerateeとしてmapやfilterが書ける。

mapE :: (a -> b) -> Enumeratee a b z
mapE = ...

filterE :: (a -> Bool) -> Enumeratee a a z
filterE = ..

他にも、バイト列を行の列に変換したり、zlib圧縮されたデータを伸長したりといった、ありとあらゆるストリーム変換がEnumerateeの形で書ける。

Enumerateeは変換を表すデータなので、関数的合成が定義できる。

-- | gで変換したストリームをさらにfで変換するような変換
(<><) :: Enumeratee b c z -> Enumeratee a b z -> Enumeratee a c z
f <>< g = g . f

これで、任意の数のデータ加工レイヤを使うことができる。

さらにEnumerateeの逐次合成も考えられる。これはEnumeratorの逐次合成と同じアイディアで、内側の(加工済みデータを要求する)iterateeにEOFを与えず、それを返すようにすることで実現される。

-- | output型の入力を要求するiterateeをinput型の入力で駆動し、EOFを送らずに返す
type Enumeratee input output a = Iteratee output a -> Iteratee input (Iteratee output a)

-- | Enumerateeを使って単純にiterateeを変換する
(=$) :: Enumeratee input output a -> Iteratee output a -> Iteratee input a
enee =$ iter = enee iter >>= liftIO . run

iterateeの並置合成

iterateeの合成にはもう一つある。複数の独立なiterateeを、同一の入力に対して並行して実行する。

-- | 入力をxでもyでも処理するような処理。xとyの両方が終了した時点で終了する。
pairI :: Iteratee e a -> Iteratee e b -> Iteratee e (a, b)
pairI x y = ...

これを使うと、たとえばDouble列の平均を計算するiterateeは次のように書ける。

meanI :: Iteratee Double Double
meanI = make <$> pairI sumI lengthI -- 和と長さを別々に計算し、最後にまとめる
  where
    make (total, size) = total / fromIntegral size

sumI :: (Num a) => Iteratee a a
sumI = foldI (+) 0

lengthI :: Iteratee a Int
lengthI = foldI (\c _ -> c + 1) 0

もっと沢山のiterateeを並置合成することもできる。

-- | 入力をxsのすべてで処理するような処理。xsがすべて終了した時点で終了する。
listI :: [Iteratee e a] -> Iteratee e [a]
listI xs = ...

個人的には、この並置合成を簡単に書ける点こそがHaskell式のiterateeの最大の旨味だと思う。

実際のライブラリインタフェース

http://hackage.haskell.org/package/iterateehttp://hackage.haskell.org/package/enumeratorhttp://hackage.haskell.org/package/iterIOのインタフェースについて簡単に述べる。

これらのライブラリでは、iterateeはIOに限らず任意のモナド上で動作するようになっている。このため、Iterateeがもう一つの型引数mをとる。

-- | モナドm上でe型の要素を扱い、最終的にaを生成する処理
data Iteratee e m a = ...

instance MonadTrans (Iteratee e) where ...

iterateeとiterIOでは、効率のためにiterateeはチャンク単位でデータを受け取る。チャンクは要素のコレクションで、ListLikeクラスのインスタンスであることが要求される。[a]やByteStringがチャンクの型としてよく使われる。

-- | sは要素がelである列を表現するデータである
class ListLike s el | s -> el where ...

-- | モナドm上でs型のチャンクを扱い、最終的にaを生成する処理
data Iteratee s m a = ...

-- | 先頭要素を読み、それを返すiteratee
headI :: (ListLike s el) => Iteratee s m el
headI = ...

ライブラリによってはさらに機能がある。たとえばiterateeが例外を投げられたり、iterateeからenumeratorへ制御メッセージ(たとえばseek request)を送ることが可能だったり、iterateeを別スレッドで並行に実行することが可能だったりする。しかしこれはインタフェースに大して影響しない。

設計のトレードオフ

ここからは個人的な考察。

他の言語のイテレータ機構と比べてHaskellのものはかなり複雑。これはたぶん、コンテナ走査の手段としてよりもIOの手段として考えられていて、アプリケーションのかなり大きな部分を一つのiteratee-enumeratorの枠組で書くことが想定されているので、そのぶん柔軟性が求められているからだろう。コンテナ走査については、Haskellには既にfold(内部イテレータ相当)とtoList(外部イテレータ相当)がある。

内部イテレータ形式、つまりループがiteratee側でなくenumerator側にあるので、enumeratorに結び付いたリソース(ファイルハンドルとか)の管理が簡単。逆にiterateeに結び付いたリソースがあるならちょっと面倒。

iterateeの並置合成が可能な反面、enumerateeの並置合成、つまり複数の独立なデータ源から一度に一個ずつ要素を取って来るようなenumerateeが書けない。外部イテレータ形式だとこれは逆になる。

結局どうなのか

Hackageだとiterateeスタイルのプログラミングはほぼ事実標準と言ってよいと思う。少なくともenumeratorとiterateeの両パッケージが既に実用レベルに達していることは間違いない。しかし改善の余地はまだあると思っている。

一つのデータ源を独立に二通りの方法で加工し、その両方を一つの処理に渡すような形は実現できない。これについては外部イテレータ形式でもできない。実際にこれができなくて困ることが結構あるので、なんとかするべき。

インタフェースが非常に複雑で嫌になる。仕方ない部分もあるが、Enumerator a m bのbパラメタとか要らないような気がする。

Lambda: the Gathering をやった

今年もICFP Programming Contestに参加していた。今回は職場のチームで出てみた。といっても開催日時が悪かったせいでたった二人のチームになってしまったが。相棒のpacakはHaskellerで、本業はISPの経営者らしい。開始前、サーバが欲しいなあと呟いたら、メモリ32Gバイト搭載のマシンをあっさり用意してくれた。結局Gitサーバにしか使わなかったが。チーム名は乱数でyazDual5osになった。

今年の問題はあるカードゲーム*1をプレイするプログラムの作成。面白そうなゲームなのでテンションが上がる。開始+1:00で入出力ライブラリが完成。まともなプレーヤは内部でシミュレーションをするのが必須*2だということが分かったので、シミュレータライブラリを書く。+7:00で一応の完成。+8:00に最初のプレーヤ"nop"(シミュレーションだけして何もしない)を練習サーバに登録。

シミュレータを書いているうちにゲームの仕組みがなんとなく見えてくる。勝利のためには敵のスロットを殺すことが必要で、それにはダメージを与えないといけない。ダメージソースになり得るのは「dec」「attack」「zombie」。手始めに最も扱いが簡単なdecを使ってみることにした。decの与えるダメージは1で、2ターンに一回撃ったとしてもスロット一個(体力1万)を殺すのに2万ターンが必要。ゲームが10万ターンで終了することを考えると遅すぎると思ったので、無限ループを作ることにした。素材はwebで見つけた"S I I (S I I)"。問題文に無限ループのサンプルがあることには気づいていなかった。

任意の関数適用「a b」を作る直接の方法がないことに気づいて絶望しかけるが、すぐに「a (x y)」(ただしxとyはカード)を「S (K a) x y」で表現できることに気づいた。任意の「a b」は、bをスロット0に入れた上で「S (K a) get zero」と書ける。これを自動化して、木構造を入力すると命令列を出力する関数を書く。+12:00あたりでプレーヤ"fill"を提出。無限ループを利用して敵のスロット#255にdecを連発し、このスロットを殺したら次は#254、という単純な動作の上、効率が非常に悪く、8ターンあたり83ダメージしか与えられなかった。さらに練習サーバ上では頻繁にinvalid outputで負けていた。+12:00。

この時点で、スロット#0の重要性を認識する。それどころか過大評価していて、#0が使えないと上述の関数適用ができないと思い込んでいた。(実際には多少のオーバーヘッドを我慢すれば可能)。さらに整数255を作るのに大変なターン数(128を用意してから127回succ)が掛かると誤解していて、最速で敵#0を殺すのはzombie戦術だと考えた。ここで寝る。

"zombie"を提出。基本は"fill"と同じで、#255を殺してから#254に移る前に#255にzombieを仕掛けてhelp 0 0 8192を撃つだけ。やはりinvalid outputでの負けが多い。+22:00ごろ。

zombie戦術の実装中にシミュレータのバグを複数見つけたので、テストすることに。ランダムな式を構築するプレーヤ同士を、公式シミュレータと自作シミュレータでそれぞれ対戦させてゲーム内容をdiffで比較。似たようなことはマーガレットのシミュレータを書いたときにもやったのであっさりできた。見つけたバグを取る。+23:00。

環境を知るために"scout"を提出。普段は何もせず、自分の#0が殺された瞬間にinvalid outputで自殺するだけのプログラム。これがチーム「Wile E.」に45ターンで殺されて衝撃を受ける。やはりdecでは遅すぎでattackを使うべきだということを学ぶ。+24:00。

pacakにinvalid outputについて相談したところ、バッファリングではないかと言われる。標準入出力をNoBufferingにした修正"zombie"は初めての「ちゃんと戦えるプレーヤ」になった。成績は17勝9敗。+26:00。

ここまでのプレーヤは全て、事前に計算された手の列を機械的になぞるだけだったが、reviveを入れるにはメインロジックを一旦中断→revive→復帰、のような動作が必要だった。心機一転して新しいフレームワークを書くことにする。各戦術はMakefileのように依存関係を持ち、準備が整っていれば実行、そうでなければ準備をしなければならない。依存関係を毎ターン再計算することで、理論的には、敵の攻撃でスロット内容が破壊されても復帰できることになる。これをStateモナドをベースにしたTaskモナドとして実装した。しかしTaskモナド上で任意の関数式を効率良く用意するロジック(prepare関数と呼んだ)を書くのには相当苦労して、+32:00で"zombie"をTaskモナドに移植した"zombie1"を提出。

reviveより効率の悪い攻めは撃つだけ不利なので、なるべく高速な攻撃を探したところ、k回help→一回attackを対象をずらしながらループするのを思い付いた。1ターンに30スロットを殺せ、再充填16ターンと試算した。これは当時の水準からすると圧倒的な火力であった。しかしこれには引数を持つ再帰関数の実装が必要。値呼びで使えるSKI上の不動点演算子をひたすらググったりしたが、LtGで実装するにはどれもあまりに複雑だった。途方に暮れていた時、昔見たUnlambdaの公式サイトにループの作り方が載っていたのを突然思い出したので参照してみると、不動点演算子を使わずにself引数を明示的に与えるような実装が紹介してあった*3。これでattack戦術が現実味を帯びてきた。なお、getを使って再帰関数を効率的に表現できることが問題文に書いてあったことに気付いたのはコンテスト終了後。

実装の手が重くなってきたので戦略について夢想する。効率の悪い攻めはreviveされて終わりだが、逆に相手が効率の悪い攻めをしてきたらreviveをしないといけない。他にも自己複製するゾンビを使って毎ターン広範囲reviveやら、指数関数的な大きさの式を構築して敵をメモリ不足orタイムアウトさせる戦術を考えた。就寝。

+48:00ごろ、プレーヤ"attack"のメインロジックが完成した。n=32768のhelpを6回行った後にn=16834のattackで攻撃する。これを反復することで1ターンに最大16スロットを殺せる。詠唱時間は約230ターン。"nop"と対戦すれば600ターンほどで勝利できるはずだった。

しかし、Taskモナド上の未来予測では正しく動作しているものの、実戦のように毎回再計算をさせると無限ループに入って異常動作することが分かる。ここから一日がデバッグに費やされた。prepareのバグを修正し、それによって見つけたバグを修正し、たくさんのアドホックな変更のせいでメンテ不能になったprepareを捨てて「高速堅牢」を目指すqprepareを実装した。qprepareも当然のようにバグっていて、もっと悪いことに小さな設計ミスと大きな設計ミスが一つずつあった。相当やる気を失ったが頑張ってさらにアドホックな回避策(Taskモナドの意味をねじまげる)を実装し、+61:00でついに"attack"を提出。

revive未実装にしては"attack"はそこそこ強かった(9勝3敗)が、対戦結果を見ると単純に速度の差で圧倒されているケースが多いように思った。なんとか詠唱時間を短縮できないか考えたところ、任意の二分木はpushとapplyの列で表現できることに思い当たり、スタックを内臓するオートマトンを作ろうと思い立つ。Haskell風に書くと以下のような感じで、任意のapplyを2ターンで実行できるのが売り。

data Command = Push | Apply
type Machine = Command -> Machine
machine :: [Tree] -> Machine
machine stack Push = machine (get 0 : stack)
machine (x:y:stack) Apply = machine (apply x y : stack)

この定義は大きすぎて、これ自体の詠唱に時間が掛かりすぎることがすぐに分かった。しかし魔法のオートマトンを自作するという発想がとても気に入ったので、これの簡略版を実装したが、結局ほとんど圧縮効果がなく、使いものにはならなかった。

時間切れが迫っていたので、確実に効果があると思われるreviveの実装に取りかかる。深夜3時ごろの回らない頭でゴミ捨て場のようなコードを書いた。なんとか間に合ったので、最終プレーヤ"attack.revive.opt"を公式に提出した。最終的に、"nop"に対して最初のスロットを殺すのに191ターン、勝利に541ターンを要した。おそらく上位30チームでは"nop"を200〜300ターンで殺せるプレーヤが主流なので、"attack.revive.opt"は全く歯が立っていないと思われる。

反省点はたくさんあるが、重大なのは、

  • デバッグに時間を取られすぎ。しかしどうしたら改善できるか良く分からん。
  • 問題文付属のサンプルをちゃんと理解すべきだった。特にgetを介した再帰を思い付かなかったのは痛い。
  • チームワークの欠如。当初は、「最悪、互いに独立にプレーヤを実装して、勝った方を提出すれば良い」と考えていたが、その域にすら達しなかった。提出したコードは100%私が書いたもの。

今年の問題はすごく良かったと思う。実際、参加中は異様に楽しかった(カフェイン摂取の影響かもしれないが)。個人的な好みにも合致していた。それだけに優勝争いにすら加われなかったのは残念。

*1:カードをシャッフルしたり裏返したりしない、デッキを組む必要もなく使ってもなくならない、論理的にはカードである必要はなさそう

*2:そうしないと現在のゲーム状態が分からない

*3:このテクニックはGrassで再帰したいときに便利

GHCのプロファイル出力を読むツールを書いた

(追記(2011-07-30): '記号を含むprofファイルの解析に失敗するバグを修正したものをGitHub - mkotha/viewprof: A viewer of GHC .prof files with curses interfaceに上げた)

+RTS -pで出力される.profファイルには呼び出しグラフが含まれていて大変役に立つが、大きなプログラムだと木構造が一見して分かり辛い。特に、「この関数に時間が掛かっているのは分かったが、コストが高いのはどこから呼ばれる場合か」のようなことを知るのが面倒だ。そこで.profファイルを対話的に閲覧できるツールを書いた。

使い方

  • j: 一行下へ
  • k: 一行上へ
  • Ctrl-d: 半ページ下へ
  • Ctrl-u: 半ページ上へ
  • スペース: 木構造の展開/畳み
  • l: 右へスクロール
  • h: 左へスクロール
  • r: ボトムアップモードへ
  • q: 終了

適当な行にカーソルを合わせてrキーを押すと木構造の親子関係が逆転する。選択されたコスト集約点が根となり、それを呼んでいる各地点が子になる。この状態では、選択されたコスト集約点の時間/空間コストのうち、それぞれの呼び出し地点の寄与分が「contribution」列に表示される。もう一度rキーを押すと元に戻る。

検索機能は未実装。これがないと使い物にならないかもしれないが、とりあえず今日はここまで。

コード

実行にはSBCLとcl-ncursesが必要。初心者なので変なことをしていたら教えてくれると有難い。

#!/usr/bin/sbcl --script

(require 'asdf)
(require 'cl-ncurses)

(defpackage viewprof
  (:use common-lisp)
  (:use cl-ncurses))

(in-package viewprof)

;;;; rose tree

(defstruct rtree
  value
  children) ; list of rtrees

(defun maptree (fn tree)
  (make-rtree
    :value (funcall fn (rtree-value tree))
    :children (mapcar (lambda (sub) (maptree fn sub))
                      (rtree-children tree))))

(defun tree-from-list (list)
  (destructuring-bind (val . children) list
    (make-rtree
      :value val
      :children (mapcar #'tree-from-list children))))

;;;; tree cursor

(defstruct cursor
  current
  parents) ; list of (node . index) conses, deepest first

(defun head-cursor (tree)
  (make-cursor
    :current tree
    :parents nil))

; returns the next visitable node in the tree, or nil
; if there is none
(defun cursor-next (cursor visitable)
  (cursor-next-upwards
    (cons (cons (cursor-current cursor) -1)
          (cursor-parents cursor))
    visitable))

(defun cursor-next-upwards (context visitable)
  (and context
    (destructuring-bind ((node . index) . rest) context
      (let* ((parents (mapcar #'car context))
             (next (position-if
                     (lambda (child)
                       (funcall visitable child parents))
                     (rtree-children node)
                     :start (1+ index))))
        (if next
          (make-cursor
            :current (elt (rtree-children node) next)
            :parents (cons (cons node next) rest))
          (cursor-next-upwards rest visitable))))))

(defun cursor-prev (cursor visitable)
  (let ((context (cursor-parents cursor)))
    (and context
       (destructuring-bind ((node . index) . rest) context
         (cursor-prev-downwards node index rest visitable)))))

(defun cursor-prev-downwards (node end context visitable)
   (let* ((parents (cons node (mapcar #'car context)))
          (prev (position-if
                  (lambda (child)
                    (funcall visitable child parents))
                  (rtree-children node)
                  :end end
                  :from-end t)))
     (if prev
       (cursor-prev-downwards
         (elt (rtree-children node) prev)
         nil
         (cons (cons node prev) context)
         visitable)
       (make-cursor
         :current node
         :parents context))))

(defun cursor-move (cursor count visitable)
  (loop while (< 0 count)
    do (decf count)
       (setf cursor (or (cursor-next cursor visitable) cursor)))
  (loop while (< count 0)
    do (incf count)
       (setf cursor (or (cursor-prev cursor visitable) cursor)))
  cursor)

;;;; .prof parser

(defun unindent (str)
  (let ((pos (or (position-if (lambda (x) (not (eql x #\ ))) str)
                  0)))
    (cons pos (subseq str pos))))

(defun get-unindented (bufstream)
  (if (cdr bufstream)
    (let ((putback (cdr bufstream)))
      (setf (cdr bufstream) nil)
      putback)
    (let ((line (read-line (car bufstream) nil)))
      (when line (unindent line)))))

(defun unget-unindented (bufstream value)
  (setf (cdr bufstream) value)
  nil)

(defun read-trees (input)
  (let ((buf (cons input nil)))
    (loop for tree = (read-subtree buf 0)
          while tree
          collect (car tree))))

(defun read-subtree (input minimum-level)
  (let ((head (get-unindented input)))
    (when head
      (destructuring-bind (head-level . head-str) head
        (if (< head-level minimum-level)
          (unget-unindented input head)
          (let* ((children (read-children input head-level))
                 (tree (make-rtree :value head-str :children children)))
            (cons tree head-level)))))))

(defun read-children (input parent-level)
  (let ((fst (read-subtree input (+ 1 parent-level))))
    (when fst
      (destructuring-bind (fst-subtree . level) fst
        (cons fst-subtree
          (loop for item = (read-subtree input level)
                while item 
                collect (car item)))))))

(defun read-trees-from-prof (input)
  (let ((cost-centre-count 0))
    (loop for line = (read-line input)
          do (when (search "COST CENTRE" line)
               (when (< 1 (incf cost-centre-count))
                 (read-line input); skip an empty line
                 (return (read-trees input)))))))

;;;; internal representation of an entire profile

(defstruct line
  name
  module
  idnum
  count
  individual
  inherited)

(defparameter *case-sensitive-readtable* (copy-readtable nil))
(setf (readtable-case *case-sensitive-readtable*) :preserve)

(defun parse-line (line)
  (let* ((*read-eval* nil)
         (*readtable* *case-sensitive-readtable*)
         (list-str (concatenate 'string "(" line ")"))
         (list (read-from-string list-str)))
    (destructuring-bind
        (name module idnum count idtime idalloc ihtime ihalloc) list
      (make-line
        :name name
        :module module
        :idnum idnum
        :count count
        :individual (cons idtime idalloc)
        :inherited (cons ihtime ihalloc)))))

(defun trees-from-file (file)
  (mapcar (lambda (tree) (maptree #'parse-line tree))
    (with-open-file (input file)
      (read-trees-from-prof input))))

; info on a cost center
(defstruct cc
  name
  module
  key
  occurrences)

; an occurrence in a call tree
(defstruct occurrence
  cc
  count
  parent ; occurrence
  children ; list of occurrences
  individual
  inherited)

; i store the whole information from a profiling report as a graph.
; a graph is represented by a hashtable from keys to ccs
; a key is a symbol of the form:
;   <name> <module>

; build a graph from a set of trees containing lines.
(defun build-graph (trees)
  (let ((table (make-hash-table)))
    (dolist (tree trees)
      (add-tree table tree))
    table))

(defun add-tree (table tree &optional parent)
  (let* ((root (rtree-value tree))
         (rootkey (key-from-line root))
         (cc (or (gethash rootkey table)
                 (setf (gethash rootkey table)
                   (make-cc
                     :name (line-name root)
                     :module (line-module root)
                     :key rootkey
                     :occurrences nil))))
         (occ (new-occurrence parent cc root)))
    (setf (cc-occurrences cc)
          (cons occ (cc-occurrences cc)))
    (dolist (child (rtree-children tree))
      (add-tree table child occ))))

(defun key-from-line (line)
  (make-key
    (line-name line)
    (line-module line)))

(defun make-key (name module)
  (intern (concatenate 'string
            (symbol-name name)
            " "
            (symbol-name module))))

(defun new-occurrence (parent cc line)
  (let ((occ (make-occurrence
               :cc cc
               :count (line-count line)
               :parent parent
               :children nil
               :individual (line-individual line)
               :inherited (line-inherited line))))
    (when parent
      (setf (occurrence-children parent)
            (cons occ (occurrence-children parent))))
    occ))

;;;; tree reversal

(defun parent-tree (root)
  (parent-tree-from
    (mapcar (lambda (occ)
              (cons occ
                (occurrence-inherited occ)))
            (cc-occurrences root))))

(defun parent-tree-from (heads)
  (make-rtree
    :value
      (cons (occurrence-cc (car (car heads)))
            (reduce #'add-timealloc
              (mapcar #'cdr heads)))
    :children
      (let* ((next-heads
               (remove-if (lambda (x) (not (car x)))
                 (mapcar (lambda (x)
                           (cons (occurrence-parent (car x))
                                 (cdr x)))
                   heads)))
             (g (group-on
                  (lambda (x) (occurrence-cc (car x)))
                  next-heads)))
        (mapcar #'parent-tree-from g))))

(defun add-timealloc (x y)
  (destructuring-bind ((a . b) (c . d)) (list x y)
    (cons (+ a c) (+ b d))))

(defun show-parent-tree-item (item)
  (destructuring-bind (cc . timealloc) item
    (format nil "(~A% ~A%) ~A ~A"
      (car timealloc)
      (cdr timealloc)
      (cc-name cc)
      (cc-module cc))))

;;;; curses interface

(defun init-curses ()
  (initscr)
  (start-color)
  (init-pair 1 COLOR_WHITE COLOR_BLACK)
  (init-pair 2 COLOR_BLACK COLOR_WHITE)
  (init-pair 3 COLOR_CYAN COLOR_BLACK)
  (noecho)
  (curs-set 0)
  (bkgd (color-pair 1))
  (clear)
  (move 0 0))

(defun render-tree-part (visitable height cursor cursor-pos)
  (let*
      ((endpoint (cursor-move cursor (- height cursor-pos 1) visitable))
       (startpoint (cursor-move endpoint (- 1 height) visitable)))
    (loop for c = startpoint then (cursor-next c visitable)
          for i from 1 to height
          while c
          collect (cursor-current c))))

(defun fix-cursor-pos (height requested-cursor-pos)
   (cond ((< height 5) 0)
         ((< requested-cursor-pos 2) 2)
         ((< (- height 3) requested-cursor-pos) (- height 3))
         (t requested-cursor-pos)))

(defun tree-view-filtering (view)
  (lambda (node context)
    (declare (ignore node))
    (or (not context)
        (gethash (car context) (tree-view-unfold-table view)))))

(defstruct tree-view
  heading
  ypos
  current
  unfold-table
  height
  width
  cursor-pos)

(defmacro with-color (w color-id &rest body)
  `(progn
     (wattron ,w (color-pair ,color-id))
     ,@body
     (wattroff ,w (color-pair ,color-id))))

(defun tree-view-render (w view)
  (let ((lines
          (render-tree-part
            (tree-view-filtering view)
            (1- (tree-view-height view))
            (tree-view-current view)
            (tree-view-cursor-pos view))))
    (wmove w 0 0)
    (werase w)
    (with-color w 3
      (wprintw w
        (fixed-width-line "" (tree-view-heading view) (tree-view-width view))))
    (dolist (node lines)
      (let ((line (format-tree-view-item
                    (rtree-value node)
                    (gethash node (tree-view-unfold-table view))
                    (consp (rtree-children node))
                    (tree-view-ypos view)
                    (tree-view-width view))))
        (if (eq node (cursor-current (tree-view-current view)))
          (with-color w 2 (wprintw w line))
          (wprintw w line))))))

(defstruct tree-view-item
  name
  module
  infostr
  indentation-level)

(defun format-tree-view-item
    (item unfolded has-children &optional (offset 0) (width 80))
  (let* ((idstr (format nil "~v,0T~A ~A         ~0,13T~A"
                        (tree-view-item-indentation-level item)
                        (cond ((not has-children) " ")
                              (unfolded "-")
                              (t "+"))
                        (tree-view-item-name item)
                        (tree-view-item-module item)))
         (head (subseq idstr (min offset (length idstr))))
         (info (tree-view-item-infostr item)))
    (fixed-width-line head info width)))

(defun fixed-width-line (x y width)
  (let* ((x-length (length x))
         (x-width (- width 1 (length y))))
    (if (<= x-length x-width)
      (format nil "~A~v,0T ~A~%" x x-width y)
      (format nil "~A... ~A~%" (subseq x 0 (- x-width 3)) y))))

(defun make-tree-view-tree (item-maker tree &optional (level 0))
  (make-rtree
    :value (funcall item-maker level (rtree-value tree))
    :children (mapcar
                (lambda (child) (make-tree-view-tree item-maker child (1+ level)))
                (rtree-children tree))))

(defun tree-view-item-from-line (level line)
  (make-tree-view-item
    :name (line-name line)
    :module (line-module line)
    :infostr (format nil "~A  ~6,2F ~6,2F  ~6,2F ~6,2F"
                     (line-count line)
                     (car (line-individual line))
                     (cdr (line-individual line))
                     (car (line-inherited line))
                     (cdr (line-inherited line)))
    :indentation-level level))

(defun tree-view-item-from-parent-tree-item (level pitem)
  (destructuring-bind (cc . (time . alloc)) pitem
    (let ((total-inher (cc-sum #'add-timealloc #'occurrence-inherited cc))
          (total-indiv (cc-sum #'add-timealloc #'occurrence-individual cc)))
    (make-tree-view-item
      :name (cc-name cc)
      :module (cc-module cc)
      :infostr (format nil "~A ~6,2F ~6,2F  ~6,2F ~6,2F  ~6,2F ~6,2F"
                       (cc-sum #'+ #'occurrence-count cc)
                       time
                       alloc
                       (car total-indiv)
                       (cdr total-indiv)
                       (car total-inher)
                       (cdr total-inher))
      :indentation-level level))))

(defun cc-sum (reducer mapper cc)
  (reduce reducer (mapcar mapper (cc-occurrences cc))))

(defun tree-view (tree heading height width)
  (make-tree-view
    :heading heading
    :ypos 0
    :current (head-cursor tree)
    :unfold-table (make-hash-table)
    :height height
    :width width
    :cursor-pos 0))

(defun ui-loop (tree window width height)
  (let* ((base-view (tree-view
                      (make-tree-view-tree #'tree-view-item-from-line tree)
                      "count     individual      inherited"
                      height
                      width))
         (view base-view)
         (graph (build-graph (list tree))))
    (flet ((reverse-tree-view ()
             (let ((cc (gethash (tree-view-current-key view) graph)))
               (when cc
                 (tree-view
                   (make-tree-view-tree
                     #'tree-view-item-from-parent-tree-item
                     (parent-tree cc))
                   "count  contribution     individual      inherited"
                   height
                   width)))))
      (loop do
        (tree-view-render window view)
        (let ((ch (getch)))
          (cond
            ((= ch (char-code #\j)) (tree-view-dn view))
            ((= ch (char-code #\k)) (tree-view-up view))
            ((= ch (ctrl-code #\D)) (tree-view-dn-halfwindow view))
            ((= ch (ctrl-code #\U)) (tree-view-up-halfwindow view))
            ((= ch (char-code #\l)) (tree-view-right view))
            ((= ch (char-code #\h)) (tree-view-left view))
            ((= ch (char-code #\ )) (tree-view-toggle view))
            ((= ch (char-code #\r))
              (setf view
                (if (eq view base-view)
                  (or (reverse-tree-view) view)
                  base-view)))
            ((= ch (char-code #\q)) (return)))))
      nil)))

(defun ctrl-code (char)
  (logandc1 64 (char-code char)))

(defun ui-main (tree)
  (init-curses)
  (ui-loop
    tree
    *stdscr*
    (min 120 (1- *cols*))
    *lines*)
  (endwin))

(defun tree-view-set-cursor-pos (view pos)
  (setf (tree-view-cursor-pos view)
        (fix-cursor-pos (tree-view-height view) pos)))

(defun tree-view-up (view)
  (let ((p (cursor-prev (tree-view-current view) (tree-view-filtering view))))
    (when p
      (setf (tree-view-current view) p)
      (tree-view-set-cursor-pos view (1- (tree-view-cursor-pos view))))))

(defun tree-view-dn (view)
  (let ((p (cursor-next (tree-view-current view) (tree-view-filtering view))))
    (when p
      (setf (tree-view-current view) p)
      (tree-view-set-cursor-pos view (1+ (tree-view-cursor-pos view))))))

(defun tree-view-up-halfwindow (view)
  (tree-view-move-halfwindow #'tree-view-up view))

(defun tree-view-dn-halfwindow (view)
  (tree-view-move-halfwindow #'tree-view-dn view))

(defun tree-view-move-halfwindow (mv view)
  (dotimes (k (floor (/ (tree-view-height view) 2)))
    (funcall mv view)))

(defun tree-view-right (view)
  (setf (tree-view-ypos view)
        (+ 2 (tree-view-ypos view))))

(defun tree-view-left (view)
  (setf (tree-view-ypos view)
        (max 0
             (- (tree-view-ypos view) 2))))

(defun tree-view-toggle (view)
  (tree-view-toggle-unfold
    view
    (cursor-current (tree-view-current view))))

(defun tree-view-toggle-unfold (view node)
  (setf (gethash node (tree-view-unfold-table view))
        (not (gethash node (tree-view-unfold-table view)))))

(defun tree-view-current-key (view)
  (let ((item (rtree-value (cursor-current (tree-view-current view)))))
    (make-key
      (tree-view-item-name item)
      (tree-view-item-module item))))

;;;; utilities

(defun group-on (make-key list)
  (let ((table (make-hash-table)))
    (dolist (item list)
      (let ((key (funcall make-key item)))
        (setf (gethash key table)
              (cons item (gethash key table)))))
    (loop for x being the hash-values in table
          collect x)))

;;;; driver

(defun main (args)
  (if (= (length args) 1)
    (let ((trees (trees-from-file (car args))))
      (if trees
        (ui-main (car trees))
        (format t "viewprof: Failed to read ~A" (car args))))
    (format t "Usage: viewprof.lisp PROG.prof")))

(main (cdr sb-ext:*posix-argv*))

Stricter Haskell

なにこれ

Haskellは素敵な言語だと思うが、デフォルトが遅延評価であることに起因する欠点のせいで魅力を50%くらい損なっているように見える。具体的にはサンク構築/評価のオーバーヘッドとメモリリークで、特にメモリリークの実害は大きい。ならば必要な所以外で評価を遅延させないコーディングをすれば、うっかりメモリリークを作ってしまう確率を減らせるに違いないというのがこの記事の主旨。

既に発生してしまったメモリリークに対処するのは別の問題で、有効な方法も全然違うだろう。

原則

特にそうしない必要のない限り、サンクを作ったらすぐ壊す。つまり、

let x = f y

と書く代わりに、

let !x = f y

と書き、

return (f x)

と書く代わりに

return $! f x

と書く。

評価の責任

基本的に、サンクを作った者がそれを壊す責任を負うことにする。つまり、関数引数は呼び出し側が評価し、関数の結果(に入れ子になった値)は関数側が評価する。

foo :: (Int, Int) -> Maybe Int
foo (arg1, arg2) = Just $! arg2 + 1

bar :: Int -> Int
bar n = fromMaybe 0 $ foo (x, y)
  where
    !x = n * n
    !y = x * x

ただし引数を受け取る側を正格にした方が楽なこともあるので臨機応変に。特に再帰関数の場合など。

loop !x = do
  n <- readLn
  loop (x + n)

また、「作った者が壊す」原則に従うなら構築子に!を付けて正格にする必要はないが、付けた方が後で楽になる場合が多い。

f, g :: Int -> (Int, Int)

-- サンクを返している
f n = (n - 1, n + 1)

-- サンクを明示的に潰す。面倒
g n = (x, y)
  where
    !x = n - 1
    !y = n + 1

-- 正格なデータ構造を使えば…
data Pair a b = Pair !a !b

-- サンクを楽に潰せる
h :: Int -> Pair Int Int
h n = Pair (n - 1) (n + 1)

サンクを返す関数

標準ライブラリを含めてほとんどのHaskell関数は以上の約束に従わないので、サンクを含む値を返してくることがある。良くあるのはIOに関するfmapやap。

do
  x <- negate <$> return 4
  -- ここでxは-4という値ではなく(negate 4)というサンク
  ...

そういう場合は呼び出し元でいちいち潰してやるしかない。

do
  !x <- neagte <$> return 4
  ...

また、一部の関数は潰しにくいサンクを含むデータを返す。筆頭はData.Map.mapだろう。こういうのはそもそも使わずに済ませることができると楽。この場合はData.Map.map fの代わりにData.Map.mapMaybe ((Just$!) . f)を使うという方法がある(これはバッドノウハウの類だろうけど)。

おまけ

レコードからフィールドの値を取り出す際には、パターンマッチを使った方が、フィールド抽出関数を使った場合に比べて潰さなければならないサンクが減ることが多い。しかしレコードのパターンマッチはタイプ量が多くなりがちでつい避けたくなってしまう。そういう場合RecordWildcards拡張を使うとほどよい堕落ができる。

data Record = Record { recFoo :: !Int, recBar :: !Int, recBaz :: !String }

f, g, h :: Record -> (Int, String)
f Record{ recFoo = foo, recBaz = baz } = (foo, baz) -- パターンマッチを使う。面倒
g rec = (foo, baz) -- 抽出関数を使う。サンク潰しが面倒
  where
    !foo = recFoo rec
    !baz = recBaz foo
h Record{..} = (recFoo, recBaz) -- 素敵!

ただしこれは実際に堕落であって、フィールド抽出関数を黙って覆い隠すのでバグの原因になり得る。

foldrが効率的に動作する条件

eagerな言語ではfoldrはリストの長さに比例するスタックを消費する。GHCでも(+)のような正格な関数で畳み込もうとすれば同様にO(n)の振る舞いになる。

{-# OPTIONS_GHC -O0 #-}
import Data.List(foldl')
main = do
  n <- readLn
  print $ foldr (+) 0 [1..n] -- O(n)スタック、O(1)ヒープ
  print $ foldl (+) 0 [1..n] -- O(n)スタック、O(n)ヒープ
  print $ foldl' (+) 0 [1..n] -- O(1)スタック、O(1)ヒープ

一方、foldrを使って効率的に実装できる関数もHaskellには存在する。例えば以下はconcatの立派な定義であり、O(1)空間で動作する*1

concat :: [[a]] -> [a]
concat = foldr (++) []

このようなことが可能な条件は何か。それは、畳み込みに使用する関数(この例では(++))が、第二引数をすぐに評価しようとしないことである。具体的には、それを捨てるか、そのまま返すか、非正格なデータ構造に入れて返すかのどれかを選ぶことになる。

foldrに渡される関数の立場から見てみる。

myfunc = foldr f z [x0, x1, x2...]
  where
    f x y = ...

fがyとして受け取るのは通常の値ではなく、f x1 (f x2 (f x3...))のような未評価の式で、これは「foldrの残りの部分」を表す。つまりyは(無引数の)継続だと思うことができる。この観点から見ると、foldrは継続渡し形式を使ったループを一般化した関数だ。

このように考えると、「第二引数をすぐに評価しようとしない」という制約の意味がはっきりする。

  • yを捨てるのは、すなわち継続を捨てることで、ループからの途中脱出に対応する。
  • yをそのまま返すのは、継続を末尾呼び出しすることで、ループを続けることに対応する。
  • yをテータ構造に入れて返すのは、継続を保存することで、ループの一時中断に対応する。
  • yを加工して返すのは、継続の非末尾呼び出しに対応し、スタックを消費する。

なお、「データ構造に入れて返す」だけでなく「そのまま返す」「捨てる」が許されているので、例えばfindもO(1)空間で動く。

find :: (a -> Bool) -> [a] -> Maybe a
find cond = foldr f Nothing
  where
    f x cont = if cond x then Just x else cont

値を取る継続

fの第二引数を継続だと思った場合、すぐ思い付く一般化は、それが値を取るようにすることだろう。

loop :: (acc -> a -> (acc -> r) -> r) -> acc -> r -> [a] -> r
loop _ _ z [] = z
loop f acc z (x:xs) = f acc x $ \acc' -> loop f acc' z xs

これで、foldlのように値を蓄積しながらループすることが可能になった。「値の蓄積」「途中脱出」の両方が可能という点で、典型的な手続き型言語のループ構文に相当する能力を持っている。

加えて「一時中断」が可能なので、これを利用すると例えばtakeを実装できる。

take :: Int -> [a] -> [a]
take len = loop f len []
  where
    f n x cont
      | n <= 0 = []
      | otherwise = x : cont (n - 1)

難点は複雑なことだが、一ヶ月くらい使っていたら慣れるような気もする。

モナド上のfoldr

Control.Monadには、foldlのモナド版であるfoldMという関数が定義されている。

foldM :: (Monad m) => (a -> b -> m a) -> a -> [b] -> m a
foldM _ acc [] = return acc
foldM f acc (x:xs) = f acc x >>= \acc' -> foldM f acc' xs

これに倣って、foldrのモナド版を定義できる。Data.Foldableをインポートすれば実際に使用可能。

foldrM :: (Monad m) => (a -> b -> m b) -> b -> [a] -> m b
foldrM _ z [] = return z
foldrM f z (x:xs) = foldrM f z xs >>= f x

しかしこの定義は正格な言語のものに近く、IOなどの正格なモナドではfにかかわらずO(n)スタックを消費する。そこで、ここまでの流れに沿ってfの第二引数を継続だと思ってみる。するとこの引数の型はbではなく未実行の計算m bであるべきだろう。

foldrM' :: (Monad m) => (a -> m b -> m b) -> m b -> [a] -> m b
foldrM' _ z [] = z
foldrM' f z (x:xs) = f x (foldrM' f z xs)

foldrの定義と同じになってしまった。つまり、foldrはそのままでモナド上の(途中脱出可能な)ループを表現できる。例として、探索パスのリストから指定された名前の実行可能ファイルを探すIO関数は次のように書ける。

import Control.Applicative
import Control.Exception (tryJust)
import Control.Monad (guard)
import System.Directory hiding (findExecutable)
import System.FilePath
import System.IO.Error (isDoesNotExistError)

findExecutable :: [FilePath] -> String -> IO (Maybe FilePath)
findExecutable dirs name = foldr f (return Nothing) dirs
  where
    f dir cont = do
      found <- isExec file
      if found then return (Just file) else cont
      where file = dir </> name

    isExec file = either (const False) executable <$>
      tryJust (guard . isDoesNotExistError) (getPermissions file)

同様に上で定義したloopもモナド上で使える。たぶんloopが便利なのはモナドが絡んでいる場合だけだろう。

悪ノリ

リストいらなくね?

namedlet :: acc -> (acc -> (acc -> r) -> r) -> r
namedlet acc f = f acc $ \acc' -> namedlet acc' f

main = namedlet 0 $ \n cont ->
  if n > 100
    then return ()
    else do
      ln <- getLine
      putStrLn ln
      cont $ n + length ln

(この記事は2010-08-24を参考にして書きました)

*1:結果のリストを先頭から読み捨てていくという前提で