Pythonでジョブキューシステムを作った

Pythonで標準ライブラリのみを使ってシンプルなジョブキューシステムを作りました。キューはRedisを使わずにインメモリに保存します。

capture

この記事ではQueickの特徴と使い方、ジョブキューシステムについての説明、よく使われているシステムでの実装、Queickのアーキテクチャと実装について説明します。

0. きっかけ

以前Raspberry PiNFCリーダーとSlackで研究室の入退室管理(打刻)システムを作ったのですが、Raspberry Piネットが途切れてSlackに打刻されないことが1-2週間に1回ありました。

moriokalab.com

そこでジョブキューシステムを導入して非同期にリトライする仕組みを組み込もうと思いいくつか探したのですが、基本的にキューとしてredisを使っているようでした。

Raspberry Piの中にRedis入れて運用するのが嫌でインメモリで閉じたものが欲しくなったため作りました。

正直、論文終わって少し時間があったのと何となく作れそうな気がして作ってみたので習作の面が大きいです。興が乗ってしまった。

f:id:asmsuechan:20201105153215j:plain
元気に動いている様子

1. できたもの

queickという名前で作りました。queueとquickを足して2で割った名前です。何もquickじゃない気がするけど。強いて言えば私の開発がquickだった。

github.com

1.1 できること

Queickには以下の特徴があります。

  • redisを使わない(データはインメモリ保存)
  • 標準ライブラリだけで作られている
  • リトライ機構を作る
  • 日時指定のスケジュール実行をできるようにする
  • 定期実行(cron)機能
  • ネットワーク復帰時に失敗したジョブを流せるようにする
  • ジョブは新しく生成したスレッドで実行される
  • 永続化無し
  • Raspberry Piなどでの運用、IoTなシステムを想定
  • 大規模システムは想定しない。エッジ端末向け。さほど多くない軽量なタスクを実行する

マシンスペックが高くなく、移動などによりネットワーク接続が保証されていないような環境を意識しています。

1.2 使い方

pipでインストールできて、インストールしたらqueickというコマンドが実行できるようになります。(Python3.6以上必須)

$ pip install queick
$ queick
[INFO] 2020-10-31 14:23:10,105 - Welcome to queick!

まずジョブの本体であるjobfunc.pyを作成します。なおジョブとなる関数は別ファイルでなければなりません。

def function(arg1, arg2):
    print(arg1, "+", arg2, "=", arg1 + arg2)

そしてjobrunner.pyを以下のように作ります。

from queick import JobQueue
from jobfunc import function
import time

q = JobQueue()
q.enqueue(function, args=(1, 2,))
q.enqueue_at(time.time() + 5, function, args=(3, 4,)) # 5秒後に1回実行

st = SchedulingTime()
st.every(minutes=1).starting_from(time.time() + 10)
q.cron(st, function, args=(1, 2,)) # 10秒後から1分おきに実行

そしてjobrunner.pyを実行するとジョブがqueickのキューに追加されスレッドで実行されます。

$ python jobrunner.py

f:id:asmsuechan:20201105154758p:plain

実行できていますね。

2. ジョブキューシステム

ジョブキューシステムについて簡単に説明します。ジョブキューシステムの基礎の部分は非常に簡単です。基本的に(1) アプリケーションからキューにジョブを投入するモジュール、(2) キュー、(3) ワーカーの3つの要素で構成されています。

まずアプリケーションがメソッドを呼び出してキューにジョブを追加します。そしてジョブキューシステムがキューに追加されたジョブを取り出してそれをスレッドや別プロセスにて実行します。基本はこれだけです。

ですのでジョブキューシステムを(標準ライブラリのみで)作ろうと思ったら基本的には並行プログラミングをしていくことになります。

そして一般的なジョブキューシステムは非同期に実行する必要がある処理を安全に行うことを目的として作られています。ここで非同期に実行する必要がある処理とは、大量の処理や時間がかかる処理、メール送信など別のサーバーで実行するような処理などバックグラウンドで動作しておいた方がいい処理のことです。

3. 既存システムのアーキテクチャ

さて、実装を始めるにあたりまずは調査です。主に以前少し使ったことがあるSidekiqと、Python用ジョブキューシステムのRQのコードを読みながらアーキテクチャを理解していきました。

3.1 RQのアーキテクチャ

Python用ジョブキューシステムのRQは以下のようなアーキテクチャでした。

RQ本体はワーカープロセスとスケジューラープロセスの2つのプロセスが動作しており、それぞれがキューから取り出したジョブをforkして実行します。スケジューラーは1秒ごとにキューの中身を見て開始予定時間を過ぎたジョブを実行します。また、リトライはスケジューラーを活用しており、失敗したプロセスはN秒後にスケジュールされます。

RQがschedではなくスケジューラーを自前実装なのはPython2系とのコンパチのためでしょうかね。

3.2 Sidekiq

Pythonではないのですが以前少し使ったことがあるSidekiqについても調べました。Sidekiqは図にまとめるのが面倒になったので箇条書きします。

  • スレッドでジョブを実行
  • リトライ処理できる

4. アーキテクチャ

そして既存システムのアーキテクチャと要件を考慮し、以下のようなアーキテクチャで作ることにしました。構成に他との違いはさほどありません。

アプリケーションがTCPサーバーを介してジョブキューにジョブをenqueueし、ワーカープロセスがこれを受け取って開始時間がセットされたジョブならスケジューラーにジョブを移動します。そして即時開始できるジョブはスレッドを作ってそこで実行します。ジョブ失敗時はN秒後に実行するよう指定してスケジューラーに直接ジョブを投げます。

4.1 ネットワーク復帰時リトライのアーキテクチャ

ネットワーク復帰時に失敗ジョブを実行する仕組みも作っています。図がゴチャつくのを防ぐために別の図にしていますが基本のアーキテクチャは上のものと同じです。

この機能を使うにはqueick起動時に--ping-hostオプションを指定しなければなりません。このオプションにはネットワーク疎通確認用のサーバーがあるIPアドレスやホスト名を指定します。

$ queick --ping-host asmsuechan.com

そしてジョブのenqueue時にq.enqueue(function, args=(1, 2,), retry_on_network_available=True)のようにretry_on_network_availableをTrueにセットしたらそのジョブでネットワーク復帰時リトライが使えます。

ネットワークチェッカーは1秒1回ネットワークの疎通確認をして切断状態->接続状態への変化を検知した時に失敗キューを全てdequeueしてジョブを再実行するものです。ワーカープロセスとは別のサイクルで動作するので別プロセスで起動します。ですのでqueickに--ping-hostオプションが指定されていないときはネットワークチェッカーは起動しません。

5. 実装

ではようやく実装の話です。上のアーキテクチャを実装に落とし込んでいきます。(実際の順序は逆で、まずとりあえず最小限動くものを作ってそれから他システムのソースコード読みながらアーキテクチャを改善していったのですが)

こんなインターフェイスでアプリケーションから使うことを想定し実装しました。

# job.py
from queick import JobQueue, RETRY_TYPE
from jobfunc import function
q = JobQueue()
q.enqueue(function, args=(1, 2,))

# jobfunc.py (別ファイル)
def function():
    print(arg1, "+", arg2, "=", arg1 + arg2)

これでjob.pyを実行するとキューにジョブがenqueueされてワーカープロセスが生成したスレッドで処理が実行されます。

なおインターフェイスはrqに影響を受けています。Queue.enqueue(func)って直感的で分かりやすいですね。

5.1 ワーカー

ワーカーの役割は「ジョブをキューから受け取って別スレッドで実行する」です。この実装にはconcurrent.futuresを使いました。ジョブの受け取りはpollingではなくてイベント通知によって行っています。

また、ワーカーは「実行するジョブの関数を見つける」ことも必要です。ここはimportlibを使っておりimportlib.import_module(module_name)のようにして関数名から動的に関数を探し出してきています。

なおキューには辞書型として以下のような形式でジョブが入っています。

f:id:asmsuechan:20201105161014p:plain

スレッドを使用するとPythonGILによってマルチコアを活用できないという問題があります。しかしこのシステムの理想的なユースケースがたまにネットワークが切断される環境でのジョブ実行だと想定しています。つまりジョブ自体は軽くて別ホストのサーバーと通信するものレスポンス待ちの時間が長くなるはずなのでマルチプロセスのオーバーヘッドを避けてスレッドで実行するようにしました。

ここの検証は下の性能評価で行っています。

5.2 TCPサーバー

アプリケーションからワーカーへのメッセージの送信部分はTCPサーバーにしました。HTTPサーバーでも良かったのですが大したことはしないのでTCPソケット使うようにしました。ポートは9999に開いてジョブを待ち受けます。

ちなみにここは実装サボってて、スレッドやforkを使ったコネクションの多重化をしていないので1コネクションしか同時に受けることができません。興が乗るか必要に迫られたら作り込もうかなと思ってます。

5.3 スケジューリング、リトライ、定期実行

スケジューリングとリトライは基本的に同じ仕組みで作っていて、schedモジュールを使っています。

キューは1本で、一度ワーカープロセスがジョブを受け取って開始時間がセットされていたらスケジューラーにセットするようにしています。

リトライはジョブが失敗した時スケジューリングキューに未来の時間を設定したジョブを新しく投入します。デフォルトはExponential Backoffを採用しています(最大リトライ時間は3600秒後)。

定期実行もスケジューリング機能に乗っています。定期実行ジョブの場合はジョブ実行時に次のジョブをスケジュールする実装です。

スケジュール時間はジョブ投入時に指定できて、上のExponential Backoff、10秒後に固定のもの、1秒後, 2秒後, 3秒後, ...と失敗回数が増えるに連れて1秒ずつ増えていくもの、5秒後, 10秒後, 15秒後, ...のようにN秒ずつ増えていくものを準備しました。

5.4 自動テスト

Pythonのテストフレームワークはpytestがデファクトっぽいのですが標準ライブラリ縛りでやっているのでユニットテストにはunittestモジュールを使います。別にアプリケーションコードじゃないので気にする必要が無いと言えばそうなのですがどうせなら貫きたかったので。つまり趣味です。

結合テストはDockerで環境作ってその中でファイル書き込みを行うジョブを実行してファイルの行数をテストするようにしました。苦肉の策な風もありますがないより100倍マシです。イベント駆動でマルチプロセスなシステム、自動テスト非常に難しいですね。

6. 性能評価? とりあえずジョブ100万件投げる

とても雑で性能評価とは言いにくいですが簡単なジョブを100万件投げてみて不具合が出ないか確認します。

マシンスペックはCPU: Intel® Core™ i7-8565U CPU @ 1.80GHz × 8, RAM: 32GB, OS: Pop!_OS 20.04 LTSです。なおThreadPoolExecutorのmax_workersは8に設定しています。

実際のジョブは以下です。100万回ループするだけの簡単なCPUバウンドの処理です。これを100万回実行します。

def function(arg1, arg2):
    start_time = time.time()
    for i in range(0, 1000000):
        i = i
    end_time = time.time()

    print("Time:", end_time - start_time)

実際に計測を実行してみたところ、まずジョブの投入時間は70sでした。

$ time python testclient.py
python testclient.py  70.11s user 68.13s system 44% cpu 5:14.06 total

なおジョブは1つあたり約30msで終了し、合計でかかった時間は8.5時間程度でした。

f:id:asmsuechan:20201105145845p:plain

そしてジョブ実行中の最大メモリ使用量は1.1GBでした。

f:id:asmsuechan:20201105150342p:plain

30msくらいかかる処理を100万回実行するくらいでは特に問題なく実行できました。concurrent.futuresさんが偉いです。本当に落ちるまでやろうと思ったらハードリミットまでやれそうです。

7. CPUバウンドな処理とI/Oバウンドな処理の並列性比較

上でGILの話をしましたが、ここではその実際の性能を計測します。といってもマルチスレッド部分で込み入ったことはしていないので結果は自明っちゃ自明なんですけどね。

7.1 100000000回ループ(CPUバウンドな処理)

まずは単純に100000000回ループするだけのジョブを実行します。プログラムは以下です。

# jobfunc.py
import time
def function():
    start_time = time.time()
    for i in range(0, 100000000):
        i = i
    end_time = time.time()

    print("Time:", end_time - start_time)

# testclient.py
from queick import JobQueue
from jobfunc import function

q = JobQueue()
print(q.enqueue(function, args=(,)))
[INFO] 2020-11-05 11:24:06,648 - Job received -> data: {'func_name': 'jobfunc.function', 'args': (1, 2), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': True, 'max_workers': 10}, addr: ('127.0.0.1', 49808)
Time: 1.9901025295257568

単体ジョブの実行時間は1.99sです。

次に、上のジョブを2つenqueueした時の実行時間です。

[INFO] 2020-11-05 11:24:29,554 - Job received -> data: {'func_name': 'jobfunc.function', 'args': (1, 2), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': True, 'max_workers': 10}, addr: ('127.0.0.1', 49816)
[INFO] 2020-11-05 11:24:29,555 - Job received -> data: {'func_name': 'jobfunc.function', 'args': (1, 2), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': True, 'max_workers': 10}, addr: ('127.0.0.1', 49818)
Time: 4.395071744918823
Time: 4.393498420715332

2つのジョブが終了する時間は4.39sとなっています。これは単体のジョブの2倍以上の時間がかかっています。よってCPUバウンドな処理は向いていません。

7.2 外部への通信を行う(I/Oバウンドな処理)

次に通信のレスポンス待ちが発生するケースです。計測用コードは以下になります。https://moriokalab.com (弊研究室)にGETリクエストを投げるだけです。

# jobfunc.py
import time
import urllib.request

def function():
    start_time = time.time()
    with urllib.request.urlopen('https://moriokalab.com') as f:
        pass
    end_time = time.time()

    print("Time:", end_time - start_time)

# testclient.py
from queick import JobQueue
from jobfunc import function

q = JobQueue()
print(q.enqueue(function, args=(,)))

まず単体の実行時間を計測します。

[INFO] 2020-11-05 10:49:43,941 - Job received -> data: {'func_name': 'jobfunc.function2', 'args': (1,), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': False, 'max_workers': 10}, addr: ('127.0.0.1', 49002)
Time: 1.1877124309539795

単体での実行時間は1.19sであることが分かりました。

次に2つに増やした場合の実行時間を計測します。testclient.pyのprint(q.enqueue(function, args=(,)))を2行に増やして実行します。

[INFO] 2020-11-05 10:47:13,975 - Job received -> data: {'func_name': 'jobfunc.function', 'args': (,), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': False, 'max_workers': 10}, addr: ('127.0.0.1', 48968)
[INFO] 2020-11-05 10:47:13,976 - Job received -> data: {'func_name': 'jobfunc.function2, 'args': (,), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': False, 'max_workers': 10}, addr: ('127.0.0.1', 48970)
Time: 1.3101766109466553
Time: 1.8106589317321777

2つ目のジョブの終了時間が1.81sということで、単体の実行時間の約1.5倍です。2つのジョブを実行しているのにも関わらず実行時間は明らかに2倍より少ないのでI/Oバウンドな処理での有用性が分かりました。

8. まとめ

私は自律移動ロボット関係の研究をしているのですが、最近の自律移動ロボットのソフトウェア部分はPythonで書かれる場合が多いしロボットはセンサーから生成され続けるデータを扱うのでクラウド連携等でこちらの方にも応用できたら良いかなと考えながら作りました。

とにかく非同期ジョブ実行+スケジューリング+リトライを手軽に行えるものが欲しくて勢いで作ってしまいましたが作ってる間とても楽しかったです。標準ライブラリ縛りプログラミングは楽しい(競プロは苦手だけど)。

難しくない割に動くものができるので新しい言語を使い始めるときの練習としても良いテーマかもしれません。

機能的には他にもワーカープロセス多重化したりキューを複数作ったりとかいろいろ実装のしがいはありそうだと思ったのですがひとまずいらないので作ってないです。

dev.toに英語記事を書いてRedditに投稿してみたところ少しだけ反響があってOSSやってる感じがして楽しいです。

9. 参考