ヘッダオンリーで簡単なタスクライブラリhwm.taskを作ってみた。

勉強会での発表

先日Sapporo.cppCLR/Hの合同勉強会を開催し、
僕はC++11のスレッドについて紹介をしました。

この発表では、C++11で導入されたスレッドライブラリの中から、

  • thread
  • mutex/lock
  • future/promise
  • condition_variable

について、それぞれがどんなものかを紹介し、また、それらの機能を使用して簡単なタスクライブラリを作ったという話をしました。

実際のセッションでは、発表時間が足りなくなってしまい、ちゃんとこのhwm.taskを紹介できなかったので、ちょっと残念だったのと、
あと、当日発表したときは未完成なところや微妙なところがもあり、それからもう少し手を加えたりしたので、改めてこのブログで紹介したいと思います。

hwm.task

GitHub - hotwatermorning/hwm.task: minimal task library

  • git cloneしてきた後、libs/exampls/SConstructファイルのgcc(4.8以降とかがいいのかも)の設定を変更してsconsで実行すると、libs/examples/bin/task以下にサンプルコードをコンパイルしたバイナリが生成されます。
  • 発表時と異なり、boostは使わなくなりました。
  • C++11対応コンパイラであれば、hwm.taskはヘッダオンリーで使用できます。
概要

さて、このhwm.taskですが、次のように使用できます。

#include <iostream>
#include <hwm/task/task_queue.hpp>

int main()
{
    //! タスクキュー
    //! キューに積まれた関数や関数オブジェクトを別スレッドで随時取り出して実行する。
    //! 実行するスレッドの数をコンストラクタで指定する。
    hwm::task_queue tq(std::thread::hardware_concurrency());

    std::future<int> f =
        tq.enqueue(
            //! タスクキュー内のスレッドで実行する関数
            //! 関数の戻り値(あるいは例外)が、内部のpromiseオブジェクトに設定される。
            [](int x1, int x2) -> int {
                std::cout << (x1 + x2) << std::endl;
                return x1 + x2;
            },
            //! 関数に渡す引数
            10, 20
        );

    //! タスクの実行結果は、enqueue()から返るfutureオブジェクトを通じて取得できる。
    std::cout << "calculated value : " << f.get() << std::endl;
}
解説

hwm::task_queueクラスが、タスクを複数個保持し、適宜取り出して実行するためのタスクキューの実装です。

別スレッドで実行されて欲しい何らかの処理をtask_queueクラスに追加するには、task_queue::equeue()メンバ関数を使用します。
enqueue()メンバ関数は、std::threadクラスなどと同じように、関数のように呼び出し可能な何かと、その何かに適用する引数を渡せるようになっています。
(注:enqueue()メンバ関数は、発表時のスライドではenqueue_syncという名前でした。)

namespace hwm {

struct task_queue {
    //...
    
    template<class F, class... Args>
    std::future<FにArgsを適用させた戻り値の型>
            enqueue(F&& f, Args&&... args);
    //...
};

}

別スレッドで実行されて欲しい関数や関数オブジェクトと、それに適用される引数を渡してenqueue()メンバ関数を呼び出すと、それらを元に内部でタスクを生成し、そのタスクがタスクキューに追加されます。
追加されたタスクは、タスクキューが管理するいずれかのスレッドによってキューから取り出され、実行されます。
取り出されたタスクの実行結果(渡した関数の戻り値、もしくは送出される例外)は、enqueue()メンバ関数の返り値であるstd::future<>オブジェクトを通じて受け取れます。

task_queueは、各タスクを適宜別スレッドで実行しますが、いくつのスレッドを立ち上げるかは、task_queueクラスのコンストラクタで指定できます。

また、コンストラクタでは、キューに保持できるタスク数の上限も設定できます。
この数を超えてタスクが追加されようとした場合、タスクキューのいずれかのスレッドによってタスクがキューから取り出されるまで、enqueue()メンバ関数の呼び出しはブロックされます。
(各タスクは、キューから取り出されてから実行されるため、enqueue()メンバ関数のブロックが解除されたからといって、その時にキューから取り出されたタスクの実行が終了しているかどうかは不定です。)

namespace hwm {

struct task_queue {

    //! @brief コンストラクタ
    //! @detail 引数に指定された値だけスレッドを起動する
    //! @param thread_limit [in] 起動する引数の数
    //! @param queue_limit [in] キューに保持できるタスク数の限界
    task_queue(size_t thread_limit, size_t queue_limit = ((std::numeric_limits<size_t>::max)()));

    //...
};
}

また、enqueue()メンバ関数の呼び出しはマルチスレッドセーフです。
task_queueとは独立している任意のスレッドから、任意のタイミングでenqueue()メンバ関数を呼び出せます。

hwm::task_queue tq;

void thread_process1()
{
    for(int i = 0; i < 100; ++i) {
        tq.enqueue( /*なんらかのタスクを追加*/ );
    }
}

void thread_process2()
{
    for(int i = 0; i < 100; ++i) {
        tq.enqueue( /*なんらかのタスクを追加*/ );
    }
}

task_queueにいくつかタスクが積まれている場合、wait()メンバ関数を呼び出して、タスクの終了を待機できます。

void add_many_tasks(hwm::task_queue &tq)
{
    for(int i = 0; i < 100; ++i) {
        tq.enqueue([]() -> void { /*なんらかの処理*/ });
    }
}

void foo()
{
    hwm::task_queue tq;
    add_many_tasks(tq);

    // なんらかの処理など

    // すべてのタスクが取り出され、実行が終了するのを待機する。
    tq.wait();
}

同様に、指定時間だけ終了待機をするwait_for()、指定時刻まで終了待機をするwait_until()があります。
(これら待機用の関数は、enqueue()によるタスクの追加をブロックしないため、wait()メンバ関数を呼び出している最中にどんどんタスクが追加されたりすると、いつまでたっても処理が戻りません)

task_queueクラスは、デフォルトではデストラクト時にwait()を呼び出しタスクキューに積まれたタスクがすべて実行され完了するのを待機します。
この挙動を変更し、デストラクト時にwait()を呼び出さず、積まれているタスクをすべて破棄したい場合はtask_queue.set_wait_before_destructed(false)を呼び出します。

wait()しないように設定した場合、実行されなかったタスクは、内部で保持しているpromise対して、エラーコードstd::future_errc::broken_promiseを指定したstd::future_error例外を設定します。
(内部で保持しているpromiseが、なにも値が設定されないまま破棄されたため。)
そのため、対応するfutureのget()メンバ関数を呼び出すと、その例外が再送出され、破棄されたことを検出できます。

int main()
{
    std::cout << ">>> don't wait before destructed" << std::endl;
    std::future<int> f_dont_wait;
    {
        hwm::task_queue tq(1, 2);

        tq.enqueue([]() { std::this_thread::sleep_for(std::chrono::seconds(1)); });
        f_dont_wait = tq.enqueue(
            // この関数はほぼ確実に実行されない。
            [](int a, int b) { return a + b; },
            10, 20);

        tq.set_wait_before_destructed(false);
    }
    std::cout << "<<< don't wait before destructed" << std::endl;

    try {
        std::cout << "10 + 20 = " << f_dont_wait.get() << std::endl;
    } catch(std::future_error &e) { // 実行されるより前にtask_queueがデストラクトされるはず。なので例外が設定されているはず。
        std::cout << e.what() << std::endl; //Broken promiseとかが出力されるはず。
    }
}

こんな感じのtask_queueの使用例を示すサンプルコードは、examplesディレクトリ内にあります。

対応環境

とりあえずGCC 4.8.2ではコンパイルできて動いているのを確認しています。
また、Clangでは試していないです。
C++11の機能としてVariadic Templatesを使用していますし、スレッドライブラリも標準のstd::threadを使用しているため、Visual Studio 2012では使えません。
Visual Studio2012に対応する場合、標準の<thread>, <future>などはbuggyで使えないため、boostのものを使用することになります。可変長引数も、Boost.Preprocessorを使用して対応することになります。
ちょっと面倒でまだちゃんとできてないため、公開はまだ先になります。

最後に

hwm.taskは勉強会での発表のためだけではなく、最近自分でも必要になったため作りました。
ただ作ったとはいいつつ、バグや設計上の工夫点はまだまだあるかもしれません。

(たとえば、task_queueクラスのキューを担うコンテナをテンプレート引数で指定できたり、Allocatorを指定できたりとかですかねー。必要性を感じたらやるかも。)

あと、テストとかドキュメントとかまだまだです。
とりあえず、折角作ったのでちょくちょく使っていきたいです。