etsavの日記: 非同期でーたふろー型えらとすてねすの篩・改良版 2
日記 by
etsav
// 【変更点】
//
// ○スレッド間のキューを別クラスに切り出しました。
// そもそもそれが欲しかったからこの実験コード書いてたわけでして。こ
// のまま使うわけじゃないけどね。最低限、pthread_cond_timedwait() 使っ
// てタイムアウト処理するだろうし。
//
// ○スレッド数多すぎて作れなくなっても assert() で止まらなくなりました。
// 当然その先の素数は見つけられないけどさ。
//
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>
#include <queue>
using namespace std;
////////////////////////////////////////////////////////////////////////
// 排他キュークラス
//
// キューの入口と出口が別スレッドなのが前提なのです。シングルスレッ
// ドで使ったら直ぐにブロックしちゃうのです。
////////////////////////////////////////////////////////////////////////
template<class T> class Queue
{
// こんすとらくた・ですとらくた ////////////////////////////////////////
public:
//
// 唯一のこんすとらくた
// length: キューの最大長
//
Queue(int length) :
_length(length)
{
int ret; // 戻り値チェック用
// Mutex とか条件変数とか準備
ret = pthread_mutex_init(&_mutex, NULL);
assert(ret == 0);
ret = pthread_cond_init(&_less, NULL);
assert(ret == 0);
ret = pthread_cond_init(&_more, NULL);
assert(ret == 0);
}
//
// ですとらくた
//
virtual ~Queue()
{
// Mutex とか条件変数とか後片付け
(void) pthread_mutex_destroy(&_mutex);
(void) pthread_cond_destroy(&_less);
(void) pthread_cond_destroy(&_more);
}
// 公開めそっど ////////////////////////////////////////////////////////
public:
//
// キューに値を入れる
// data: キューに入れる値
//
void Enqueue(T data)
{
// キューをロックしまーす
pthread_mutex_lock(&_mutex);
{
// キューに空きが出来るまで待つのです
while (_queue.size() >= _length)
pthread_cond_wait(&_less, &_mutex);
assert(_queue.size() < _length);
// キューに数を入れるのです
_queue.push_back(data);
// 「キューに値が入りましたよ~」と待ってるスレッドに通知
pthread_cond_signal(&_more);
}
// キューのロックを解除しまーす
pthread_mutex_unlock(&_mutex);
}
//
// キューから値を取り出す
// 戻り値: キューから取り出した値
T Dequeue(void)
{
T data; // 取り出す値
// キューをロックしまーす
pthread_mutex_lock(&_mutex);
{
// キューに値が入るまで待つのです
while (_queue.size() == 0)
pthread_cond_wait(&_more, &_mutex);
assert(_queue.size() > 0);
// キューから値を取り出すのです
data = _queue.front();
_queue.pop_front();
// 「キューに空きができましたよ~」と待ってるスレッドに通知
pthread_cond_signal(&_less);
}
// キューのロックを解除しまーす
pthread_mutex_unlock(&_mutex);
// 取り出した値を返すのです
return data;
}
// 非公開でーためんば //////////////////////////////////////////////////
private:
int _length; // キューの最大長
deque<T> _queue; // キュー
pthread_mutex_t _mutex; // キューのロック用 mutex
pthread_cond_t _less; // キューの空き待ち条件変数
pthread_cond_t _more; // キューの値待ち条件変数
};
////////////////////////////////////////////////////////////////////////
// 非同期篩い機能つき素数クラス
//
// 自分自身の倍数を篩い落とすスレッドを持ってる素数クラスなのです。
// 新しい素数が見付かると勝手にインスタンスが増殖するのです。増殖す
// るとスレッドも増えちゃって、もぉ大変な事に……
////////////////////////////////////////////////////////////////////////
class Prime
{
// こんすとらくた・ですとらくた ////////////////////////////////////////
public:
//
// 唯一のこんすとらくた
// n: 自分自身の値(ちゃんと使えば素数が入る)
//
// 引数なしならデフォルト引数で最初の素数 2 が出来るのです。
//
Prime(int n = 2) :
_n(n),
_next(NULL),
_energized(false),
_queue(QUEUE_MAX)
{ /* なにもしない */ }
//
// ですとらくた
//
virtual ~Prime()
{
// 篩いスレッドの終了待ち
if (_energized) pthread_join(_thread, NULL);
// 断末魔で自分自身の値を叫ぶ
cout << _n;
// 次の素数が無いなら改行
if (_next == NULL) { cout << endl; }
// 有るなら空白区切り
else { cout << " "; }
// 次の素数を道連れにする
delete _next;
// delete NULL; ゎ何もしないのよん
}
// 公開めそっど ////////////////////////////////////////////////////////
public:
//
// 篩いスレッド起動
//
// 戻り値:
// true スレッドは起動している(既にしていた場合も含む)
// false スレッド起動失敗
//
bool Energize(void)
{
// まだスレッドが起動していないならスレッド起動
if (!_energized)
_energized = (pthread_create(&_thread, NULL, ThreadCtrlRelay, this) == 0);
// 成否を返す
return _energized;
}
//
// 篩いスレッドに停止要求を出す
//
void Terminate(void)
{
// 自分自身の篩い待ちキューに停止要求コードを渡す
Receive(TERM);
}
//
// 篩い待ちキューに数を渡す
// d: 篩い待ちキューに渡す数
//
void Receive(int d)
{
// 篩いスレッドが起動していなければ何もしない
if (!_energized) return;
// つまり、スレッド数の限界に達するとそれ以降の素数は見つけら
// れないわけ。
// 篩い待ちキューに値を渡す
_queue.Enqueue(d);
}
// すたてぃっくな非公開めそっど ////////////////////////////////////////
private:
//
// スレッド制御リレー関数
// instance: リレー先インスタンスへのポインタ
//
static void* ThreadCtrlRelay(void* instance)
{
// 与えられたインスタンスにリレーするのです
return static_cast<Prime*>(instance)->ThreadCtrl();
// pthread_create() はスレッド制御関数として非スタティックメ
// ンバー関数を直接取れないからねぃ。
}
// 非公開めそっど //////////////////////////////////////////////////////
private:
//
// スレッド制御関数
//
void* ThreadCtrl(void)
{
// <<< 停止要求コードを受け取るまで、受け取った数について >>>
int d;
while ((d = _queue.Dequeue()) != TERM)
{
// 受け取った数が自分の値で割り切れちゃうなら篩い落とし
// 念の為自分の値以下のも
if ((d <= _n) || (d % _n == 0)) continue;
// で、割り切れないなら……
// 次の素数がまだ無ければ
if (_next == NULL)
{
try
{
// 受け取った値は素数だから新しく作るのさ
_next = new Prime(d);
// 作った新しい素数の篩いスレッドを起動
_next->Energize();
}
catch (bad_alloc)
{
// 新しい素数が作れなきゃほっとけ
_next = NULL;
// 無けりゃ無いでその先何もしないだけだし。
}
}
// 既に次の素数が有るなら
else
{
// 受け取った値を渡す
_next->Receive(d);
}
}
// >>> 停止要求コードを受け取るまで、受け取った数について <<<
// 次の素数があったらその篩いスレッドに停止要求を出す
if (_next != NULL) _next->Terminate();
// スレッドおしまい
return NULL;
}
// すたてぃっくな非公開メンバ //////////////////////////////////////////
private:
static const int TERM = -1; // 停止要求コード
static const int QUEUE_MAX = 5; // 篩い待ちキュー最大長
// 非公開メンバ ////////////////////////////////////////////////////////
private:
int _n; // 自分自身の値(素数)
Prime* _next; // 次の素数へのポインタ
bool _energized; // 篩いスレッド起動フラグ
Queue<int> _queue; // 篩い待ちキュー
pthread_t _thread; // 篩いスレッド
};
////////////////////////////////////////////////////////////////////////
// めいん
////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
const int MAX = 1000; // MAX までの素数を求めようとする
// あんまし大きくしてもスレッドが作れなくなるので、それ以降の素数
// を見つける事は出来ないのです。たとえばあたしの試した環境だと
// 313 まで(65 個)しか見つけられなかったし。
Prime prime; // 最初の素数
// 最初の素数の篩いスレッドを起動
prime.Energize();
// 篩に小さい順に値を送る
for (int i = 1; i <= MAX; i++) prime.Receive(i);
// 篩いスレッドに停止要求を送る
prime.Terminate();
// おしまい
return 0;
}
//
// ○スレッド間のキューを別クラスに切り出しました。
// そもそもそれが欲しかったからこの実験コード書いてたわけでして。こ
// のまま使うわけじゃないけどね。最低限、pthread_cond_timedwait() 使っ
// てタイムアウト処理するだろうし。
//
// ○スレッド数多すぎて作れなくなっても assert() で止まらなくなりました。
// 当然その先の素数は見つけられないけどさ。
//
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>
#include <queue>
using namespace std;
////////////////////////////////////////////////////////////////////////
// 排他キュークラス
//
// キューの入口と出口が別スレッドなのが前提なのです。シングルスレッ
// ドで使ったら直ぐにブロックしちゃうのです。
////////////////////////////////////////////////////////////////////////
template<class T> class Queue
{
// こんすとらくた・ですとらくた ////////////////////////////////////////
public:
//
// 唯一のこんすとらくた
// length: キューの最大長
//
Queue(int length) :
_length(length)
{
int ret; // 戻り値チェック用
// Mutex とか条件変数とか準備
ret = pthread_mutex_init(&_mutex, NULL);
assert(ret == 0);
ret = pthread_cond_init(&_less, NULL);
assert(ret == 0);
ret = pthread_cond_init(&_more, NULL);
assert(ret == 0);
}
//
// ですとらくた
//
virtual ~Queue()
{
// Mutex とか条件変数とか後片付け
(void) pthread_mutex_destroy(&_mutex);
(void) pthread_cond_destroy(&_less);
(void) pthread_cond_destroy(&_more);
}
// 公開めそっど ////////////////////////////////////////////////////////
public:
//
// キューに値を入れる
// data: キューに入れる値
//
void Enqueue(T data)
{
// キューをロックしまーす
pthread_mutex_lock(&_mutex);
{
// キューに空きが出来るまで待つのです
while (_queue.size() >= _length)
pthread_cond_wait(&_less, &_mutex);
assert(_queue.size() < _length);
// キューに数を入れるのです
_queue.push_back(data);
// 「キューに値が入りましたよ~」と待ってるスレッドに通知
pthread_cond_signal(&_more);
}
// キューのロックを解除しまーす
pthread_mutex_unlock(&_mutex);
}
//
// キューから値を取り出す
// 戻り値: キューから取り出した値
T Dequeue(void)
{
T data; // 取り出す値
// キューをロックしまーす
pthread_mutex_lock(&_mutex);
{
// キューに値が入るまで待つのです
while (_queue.size() == 0)
pthread_cond_wait(&_more, &_mutex);
assert(_queue.size() > 0);
// キューから値を取り出すのです
data = _queue.front();
_queue.pop_front();
// 「キューに空きができましたよ~」と待ってるスレッドに通知
pthread_cond_signal(&_less);
}
// キューのロックを解除しまーす
pthread_mutex_unlock(&_mutex);
// 取り出した値を返すのです
return data;
}
// 非公開でーためんば //////////////////////////////////////////////////
private:
int _length; // キューの最大長
deque<T> _queue; // キュー
pthread_mutex_t _mutex; // キューのロック用 mutex
pthread_cond_t _less; // キューの空き待ち条件変数
pthread_cond_t _more; // キューの値待ち条件変数
};
////////////////////////////////////////////////////////////////////////
// 非同期篩い機能つき素数クラス
//
// 自分自身の倍数を篩い落とすスレッドを持ってる素数クラスなのです。
// 新しい素数が見付かると勝手にインスタンスが増殖するのです。増殖す
// るとスレッドも増えちゃって、もぉ大変な事に……
////////////////////////////////////////////////////////////////////////
class Prime
{
// こんすとらくた・ですとらくた ////////////////////////////////////////
public:
//
// 唯一のこんすとらくた
// n: 自分自身の値(ちゃんと使えば素数が入る)
//
// 引数なしならデフォルト引数で最初の素数 2 が出来るのです。
//
Prime(int n = 2) :
_n(n),
_next(NULL),
_energized(false),
_queue(QUEUE_MAX)
{ /* なにもしない */ }
//
// ですとらくた
//
virtual ~Prime()
{
// 篩いスレッドの終了待ち
if (_energized) pthread_join(_thread, NULL);
// 断末魔で自分自身の値を叫ぶ
cout << _n;
// 次の素数が無いなら改行
if (_next == NULL) { cout << endl; }
// 有るなら空白区切り
else { cout << " "; }
// 次の素数を道連れにする
delete _next;
// delete NULL; ゎ何もしないのよん
}
// 公開めそっど ////////////////////////////////////////////////////////
public:
//
// 篩いスレッド起動
//
// 戻り値:
// true スレッドは起動している(既にしていた場合も含む)
// false スレッド起動失敗
//
bool Energize(void)
{
// まだスレッドが起動していないならスレッド起動
if (!_energized)
_energized = (pthread_create(&_thread, NULL, ThreadCtrlRelay, this) == 0);
// 成否を返す
return _energized;
}
//
// 篩いスレッドに停止要求を出す
//
void Terminate(void)
{
// 自分自身の篩い待ちキューに停止要求コードを渡す
Receive(TERM);
}
//
// 篩い待ちキューに数を渡す
// d: 篩い待ちキューに渡す数
//
void Receive(int d)
{
// 篩いスレッドが起動していなければ何もしない
if (!_energized) return;
// つまり、スレッド数の限界に達するとそれ以降の素数は見つけら
// れないわけ。
// 篩い待ちキューに値を渡す
_queue.Enqueue(d);
}
// すたてぃっくな非公開めそっど ////////////////////////////////////////
private:
//
// スレッド制御リレー関数
// instance: リレー先インスタンスへのポインタ
//
static void* ThreadCtrlRelay(void* instance)
{
// 与えられたインスタンスにリレーするのです
return static_cast<Prime*>(instance)->ThreadCtrl();
// pthread_create() はスレッド制御関数として非スタティックメ
// ンバー関数を直接取れないからねぃ。
}
// 非公開めそっど //////////////////////////////////////////////////////
private:
//
// スレッド制御関数
//
void* ThreadCtrl(void)
{
// <<< 停止要求コードを受け取るまで、受け取った数について >>>
int d;
while ((d = _queue.Dequeue()) != TERM)
{
// 受け取った数が自分の値で割り切れちゃうなら篩い落とし
// 念の為自分の値以下のも
if ((d <= _n) || (d % _n == 0)) continue;
// で、割り切れないなら……
// 次の素数がまだ無ければ
if (_next == NULL)
{
try
{
// 受け取った値は素数だから新しく作るのさ
_next = new Prime(d);
// 作った新しい素数の篩いスレッドを起動
_next->Energize();
}
catch (bad_alloc)
{
// 新しい素数が作れなきゃほっとけ
_next = NULL;
// 無けりゃ無いでその先何もしないだけだし。
}
}
// 既に次の素数が有るなら
else
{
// 受け取った値を渡す
_next->Receive(d);
}
}
// >>> 停止要求コードを受け取るまで、受け取った数について <<<
// 次の素数があったらその篩いスレッドに停止要求を出す
if (_next != NULL) _next->Terminate();
// スレッドおしまい
return NULL;
}
// すたてぃっくな非公開メンバ //////////////////////////////////////////
private:
static const int TERM = -1; // 停止要求コード
static const int QUEUE_MAX = 5; // 篩い待ちキュー最大長
// 非公開メンバ ////////////////////////////////////////////////////////
private:
int _n; // 自分自身の値(素数)
Prime* _next; // 次の素数へのポインタ
bool _energized; // 篩いスレッド起動フラグ
Queue<int> _queue; // 篩い待ちキュー
pthread_t _thread; // 篩いスレッド
};
////////////////////////////////////////////////////////////////////////
// めいん
////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
const int MAX = 1000; // MAX までの素数を求めようとする
// あんまし大きくしてもスレッドが作れなくなるので、それ以降の素数
// を見つける事は出来ないのです。たとえばあたしの試した環境だと
// 313 まで(65 個)しか見つけられなかったし。
Prime prime; // 最初の素数
// 最初の素数の篩いスレッドを起動
prime.Energize();
// 篩に小さい順に値を送る
for (int i = 1; i <= MAX; i++) prime.Receive(i);
// 篩いスレッドに停止要求を送る
prime.Terminate();
// おしまい
return 0;
}
Re:なぜおれは(ガッ) (スコア:1)
日記ネタだから日本語コメントにしましたけど、 あたしは普段は英語でコメント付けてますよん。 頻繁に日本語入力に切り替えるのが面倒(Mac の Cmd + Space に慣れちゃったから)、 ソースコード認識性に優れた日本語フォントが無い、 CVS との相性が悪い、 等の理由で。
Mishawaka 風の日本語フォントがあれば日本語コメントも考えるんだけどなぁ……