メメメモモ

プログラミング、筋トレ、ゲーム、etc

qudoのワーカーの起動スクリプトについて調べたこと

以下で紹介されているサンプルを元にワーカーの起動スクリプトを作ったので色々調べてみた。
https://github.com/nekokak/qudo/blob/master/sample/init.d/qudo-worker-sample.pl
(Qudo::Parallel::Managerというものがあることは最近知った。。。)

このスクリプトで使われているいくつかのモジュールについて調べてみた。

Parallel::Prefork

Kazuho@Cybozu Labs: Parallel::Prefork - Perl でマルチプロセスなサーバを書く方法

Parallel::ForkManagerのように、
$pm->start and next; で子プロセスを作り、
$pm->finish; で子プロセスを終了し、
$pm->wait_all_children();ですべての子プロセスの終了を待つ、
という形でマルチプロセスの処理を書ける。

Parallel::Preforkの場合は、シグナル処理前提の処理が書けるモジュール。
以下のコードは使用例。kazuhoさんのサンプルのまんまだけど、コメントを付け足してます。

use strict;
use warnings;
use utf8;
use Parallel::Prefork;
use IO::Socket::INET;


# 受け付けるリクエスト数
sub MaxRequestsPerChild () { 100 }


my $listen_sock = IO::Socket::INET->new(
    Listen => 5,
    LocalAddr => '0.0.0.0:80',
    Proto => 'tcp',
) or die $!;


my $pm = Parallel::Prefork->new({

    # ワーカープロセスの個数
    max_workers => 10,

    # シグナル
    trap_signals => {

        # SIGTERMを受信したらワーカープロセスをSIGTERM
        TERM => 'TERM',

        # SIGHUPを受信したらワーカープロセスをSIGHUP
        HUP => 'TERM',
    }
});

# メインループ、SIGTERMを受信するまでループ
while ($pm->signal_received ne 'TERM') {

    # ワーカープロセス生成処理
    $pm->start and next;

    # 受け付けるリクエスト数
    my $reqs_before_exit = MaxRequestsPerChild;

    # SIGTERMを受信したら、受け付けるリクエスト数を0にしてループを抜ける
    $SIG{TERM} = sub { $reqs_before_exit = 0 };

    # リクエスト受け付けループ
    # reqs_before_exit分だけリクエストを受け取ったら、ループを抜けて終了する
    while ($reqs_before_exit-- > 0) {

        # リクエスト処理
        my $s = $listen_sock->accept();
    }

    # ワーカープロセスの終了処理
    $pm->finish;
}

$pm->wait_all_children;

なんか聞いたことのある処理だなと思ったらStarletだった。
案の定、Plack::Handler::Starletの中身を覗いてみたら、Parallel::Preforkが使われていた。

Sub::Throttle

Kazuho@Cybozu Labs: 実行時間を抑制するモジュール Sub::Throttle を書いた

use strict;
use warnings;
use Sub::Throttle qw(throttle);


my $i = 0;
while (1) {
    throttle(0.0001, sub { warn "test"; });
}

負荷がかかりすぎないように抑制するモジュール。
指定した割合に負荷が抑えられるらしい。
throttle関数の第一引数が低いほど、遅く実行される様子がわかる。

Sys::Syslog

perlからsyslogにログを出力するモジュール。
perl5からのコアモジュールらしい。
syslogについては別途。

use strict;
use warnings;
use Sys::Syslog;


# Syslogオープン
openlog(
    "Project::Worker($$)", # ident
    'cons,pid',            # logopt
    'local6'               # facility
);


syslog(
    'info',                       # priority
    "start project's qudo @ARGV"  # format
);

qudoの起動スクリプト

以上をふまえて、起動スクリプトにコメントを付けていった。

#! /usr/bin/perl
use strict;
use warnings;
use lib qw(/path/to/lib);
use UNIVERSAL::require;
use Sys::Syslog;
use Parallel::Prefork;
use Perl6::Say;
use Carp;
use Sub::Throttle qw/throttle/;
use Module::Find;

# 1ワーカーが受け付けるリクエスト数
sub MaxRequestsPerChild () { 30 }

# syslogをオープン
openlog("Project::Worker($$)", 'cons,pid', 'local6');

# syslogに出力
syslog('info', "start project's qudo @ARGV");

# 引数から受け取ったワーカー名を格納
my @workers = @ARGV;

# 引数がなければProject::Worker::Qudo以下にあるワーカーモジュールを探して格納
unless (@workers) {
    @workers = Module::Find::findallmod('Project::Worker::Qudo');
}
for my $worker (@workers) {
    print "Setting up the $worker\n";
    $worker->use or die $@;
}

say "START WORKING : $$";

# プリフォークマネージャを作成
my $pm = Parallel::Prefork->new({

    # ワーカーの数
    # QUDO_TESTが設定されてた場合は1にする
    max_workers  => $ENV{QUDO_TEST} ? 1 : 5,

    # 現在ないオプション?
    fork_delay   => 1,

    # シグナル処理
    trap_signals => {

        # SIGTERMを受け取ったらワーカープロセスにSIGTERM
        TERM => 'TERM',

        # SIGHUPを受け取ったらワーカープロセスにSIGTERM
        HUP  => 'TERM',
    },
});

# メインループ
# SIGTERMを受け取るまでループは止まらない
while ($pm->signal_received ne 'TERM') {

    # 子プロセスを生成
    $pm->start and next;

    # 子プロセスが生まれた
    say "spawn $$";

    {
        # Qudoマネージャを生成
        require Project::Qudo;
        my $manager = Project::Qudo->new->manager;
        for my $worker (@workers) {
            $manager->can_do($worker);
        }

        # 1ワーカー当たりで処理するリクエスト数をコピー
        my $reqs_before_exit = MaxRequestsPerChild;

        # SIGTERMを受け取ったら、reqs_before_exitを0にして、ループを抜けるようにする
        $SIG{TERM} = sub { $reqs_before_exit = 0 };

        # リクエスト処理ループ
        while ($reqs_before_exit > 0) {

            # 0.5の負荷でワーカー処理が走るようにする
            # work_onceでジョブがあるまでブロック
            if (throttle(0.5, sub { $manager->work_once })) {
                # 処理した
                say "work $$";
                --$reqs_before_exit
            } else {
                # 未処理
                sleep 3
            }
        }
    }

    # 設定されたリクエスト数分だけ処理したか、
    # SIGTERMを受け取ったか、
    # いずれかで子プロセスが終了
    say "FINISHED $$";
    $pm->finish;
}

# 子プロセス終了待ち
$pm->wait_all_children;

warn "should not reach to here ;-)";