Python で標準ライブラリのみを使ってシンプルなジョブキューシステムを作りました。キューはRedisを使わずにインメモリに保存します。
この記事ではQueickの特徴と使い方、ジョブキューシステムについての説明、よく使われているシステムでの実装、Queickのアーキテクチャ と実装について説明します。
0. きっかけ
以前Raspberry Pi とNFC リーダーとSlackで研究室の入退室管理(打刻)システム を作ったのですが、Raspberry Pi のネットが途切れてSlackに打刻されない ことが1-2週間に1回ありました。
moriokalab.com
そこでジョブキューシステムを導入して非同期にリトライ する仕組みを組み込もうと思いいくつか探したのですが、基本的にキューとしてredisを使っているようでした。
Raspberry Pi の中にRedis入れて運用するのが嫌でインメモリで閉じたものが欲しくなった ため作りました。
正直、論文終わって少し時間があった のと何となく作れそう な気がして作ってみたので習作 の面が大きいです。興が乗ってしまった。
元気に動いている様子
1. できたもの
queick という名前で作りました。queueとquickを足して2で割った名前です。何もquickじゃない気がするけど。強いて言えば私の開発がquickだった。
github.com
1.1 できること
Queickには以下の特徴があります。
redisを使わない (データはインメモリ保存)
標準ライブラリだけで作られている
リトライ機構 を作る
日時指定のスケジュール実行 をできるようにする
定期実行(cron)機能
ネットワーク復帰時 に失敗したジョブを流せるようにする
ジョブは新しく生成したスレッドで実行 される
永続化無し
Raspberry Pi などでの運用、IoTなシステムを想定
大規模システムは想定しない。エッジ端末向け。さほど多くない軽量なタスクを実行する
マシンスペックが高くなく、移動などによりネットワーク接続が保証されていないような環境 を意識しています。
1.2 使い方
pipでインストールできて、インストールしたらqueickというコマンドが実行できるようになります。(Python3.6以上必須)
$ pip install queick
$ queick
[INFO] 2020-10-31 14:23:10,105 - Welcome to queick!
まずジョブの本体であるjobfunc.pyを作成します。なおジョブとなる関数は別ファイルでなければなりません。
def function (arg1, arg2):
print (arg1, "+" , arg2, "=" , arg1 + arg2)
そしてjobrunner.pyを以下のように作ります。
from queick import JobQueue
from jobfunc import function
import time
q = JobQueue()
q.enqueue(function, args=(1 , 2 ,))
q.enqueue_at(time.time() + 5 , function, args=(3 , 4 ,))
st = SchedulingTime()
st.every(minutes=1 ).starting_from(time.time() + 10 )
q.cron(st, function, args=(1 , 2 ,))
そしてjobrunner.pyを実行するとジョブがqueickのキューに追加されスレッドで実行されます。
$ python jobrunner.py
実行できていますね。
2. ジョブキューシステム
ジョブキューシステムについて簡単に説明します。ジョブキューシステムの基礎の部分は非常に簡単です。基本的に(1) アプリケーションからキューにジョブを投入するモジュール、(2) キュー、(3) ワーカー の3つの要素で構成されています。
まずアプリケーションがメソッドを呼び出してキューにジョブを追加します。そしてジョブキューシステムがキューに追加されたジョブを取り出してそれをスレッドや別プロセスにて実行します。基本はこれだけです。
ですのでジョブキューシステムを(標準ライブラリのみで)作ろうと思ったら基本的には並行プログラミング をしていくことになります。
そして一般的なジョブキューシステムは非同期に実行する必要がある処理を安全に行う ことを目的として作られています。ここで非同期に実行する必要がある処理とは、大量の処理や時間がかかる処理、メール送信など別のサーバーで実行するような処理などバックグラウンドで動作しておいた方がいい処理のことです。
さて、実装を始めるにあたりまずは調査です。主に以前少し使ったことがあるSidekiq と、Python 用ジョブキューシステムのRQ のコードを読みながらアーキテクチャ を理解していきました。
Python 用ジョブキューシステムのRQは以下のようなアーキテクチャ でした。
RQ本体はワーカープ ロセスとスケジューラープロセス の2つのプロセスが動作しており、それぞれがキューから取り出したジョブをforkして実行 します。スケジューラーは1秒ごとにキューの中身を見て 開始予定時間を過ぎたジョブを実行します。また、リトライはスケジューラーを活用しており、失敗したプロセスはN秒後にスケジュールされます。
RQがschedではなくスケジューラーを自前実装なのはPython2系とのコンパチのためでしょうかね。
3.2 Sidekiq
Python ではないのですが以前少し使ったことがあるSidekiqについても調べました。Sidekiqは図にまとめるのが面倒になったので箇条書きします。
そして既存システムのアーキテクチャ と要件を考慮し、以下のようなアーキテクチャ で作ることにしました。構成に他との違いはさほどありません。
アプリケーションがTCP サーバーを介してジョブキューにジョブをenqueueし、ワーカープ ロセスがこれを受け取って開始時間がセットされたジョブならスケジューラーにジョブを移動します。そして即時開始できるジョブはスレッドを作ってそこで実行します。ジョブ失敗時はN秒後に実行するよう指定してスケジューラーに直接ジョブを投げます。
4.1 ネットワーク復帰時リトライのアーキテクチャ
ネットワーク復帰時に失敗ジョブを実行する仕組みも作っています。図がゴチャつくのを防ぐために別の図にしていますが基本のアーキテクチャ は上のものと同じです。
この機能を使うにはqueick起動時に--ping-host
オプションを指定しなければなりません。このオプションにはネットワーク疎通確認用のサーバーがあるIPアドレス やホスト名を指定します。
$ queick --ping-host asmsuechan.com
そしてジョブのenqueue時にq.enqueue(function, args=(1, 2,), retry_on_network_available=True)
のようにretry_on_ network_availableをTrueにセットしたらそのジョブでネットワーク復帰時リトライが使えます。
ネットワークチェッカーは1秒1回ネットワークの疎通確認をして切断状態->接続状態への変化を検知した時に失敗キューを全てdequeueしてジョブを再実行するものです。ワーカープ ロセスとは別のサイクルで動作するので別プロセスで起動します。ですのでqueickに--ping -hostオプションが指定されていないときはネットワークチェッカーは起動しません。
5. 実装
ではようやく実装の話です。上のアーキテクチャ を実装に落とし込んでいきます。(実際の順序は逆で、まずとりあえず最小限動くものを作ってそれから他システムのソースコード 読みながらアーキテクチャ を改善していったのですが)
こんなインターフェイス でアプリケーションから使うことを想定し実装しました。
from queick import JobQueue, RETRY_TYPE
from jobfunc import function
q = JobQueue()
q.enqueue(function, args=(1 , 2 ,))
def function ():
print (arg1, "+" , arg2, "=" , arg1 + arg2)
これでjob.pyを実行するとキューにジョブがenqueueされてワーカープ ロセスが生成したスレッドで処理が実行されます。
なおインターフェイス はrqに影響を受けています。Queue.enqueue(func)
って直感的で分かりやすいですね。
5.1 ワーカー
ワーカーの役割は「ジョブをキューから受け取って別スレッドで実行する」です。この実装にはconcurrent.futures を使いました。ジョブの受け取りはpollingではなくてイベント通知によって行っています。
また、ワーカーは「実行するジョブの関数を見つける 」ことも必要です。ここはimportlib
を使っておりimportlib.import_module(module_name)
のようにして関数名から動的に関数を探し出してきています。
なおキューには辞書型として以下のような形式でジョブが入っています。
スレッドを使用するとPython のGIL によってマルチコアを活用できないという問題があります。しかしこのシステムの理想的なユースケース がたまにネットワークが切断される環境でのジョブ実行だと想定しています。つまりジョブ自体は軽くて別ホストのサーバーと通信するもの でレスポンス待ちの時間が長くなる はずなのでマルチプロセスのオーバーヘッドを避け てスレッドで実行するようにしました。
ここの検証は下の性能評価で行っています。
5.2 TCP サーバー
アプリケーションからワーカーへのメッセージの送信部分はTCP サーバーにしました。HTTPサーバーでも良かったのですが大したことはしないのでTCP ソケット使うようにしました。ポートは9999に開いてジョブを待ち受けます。
ちなみにここは実装サボってて、スレッドやforkを使ったコネクションの多重化をしていないので1コネクションしか同時に受けることができません。興が乗るか必要に迫られたら作り込もうかなと思ってます。
5.3 スケジューリング、リトライ、定期実行
スケジューリングとリトライは基本的に同じ仕組みで作っていて、schedモジュール を使っています。
キューは1本で、一度ワーカープ ロセスがジョブを受け取って開始時間がセットされていたらスケジューラーにセットするようにしています。
リトライはジョブが失敗した時スケジューリングキューに未来の時間を設定したジョブを新しく投入 します。デフォルトはExponential Backoff を採用しています(最大リトライ時間は3600秒後)。
定期実行もスケジューリング機能に乗っています。 定期実行ジョブの場合はジョブ実行時に次のジョブをスケジュールする実装です。
スケジュール時間はジョブ投入時に指定できて、上のExponential Backoff、10秒後に固定のもの、1秒後, 2秒後, 3秒後, ...と失敗回数が増えるに連れて1秒ずつ増えていくもの、5秒後, 10秒後, 15秒後, ...のようにN秒ずつ増えていくものを準備しました。
5.4 自動テスト
Python のテストフレームワーク はpytestがデファクト っぽいのですが標準ライブラリ縛りでやっているのでユニットテスト にはunittestモジュール を使います。別にアプリケーションコードじゃないので気にする必要が無いと言えばそうなのですがどうせなら貫きたかったので。つまり趣味です。
結合テスト はDockerで環境作ってその中でファイル書き込みを行うジョブを実行してファイルの行数をテストするようにしました。苦肉の策な風もありますがないより100倍マシです。イベント駆動でマルチプロセスなシステム、自動テスト非常に難しいですね。
6. 性能評価? とりあえずジョブ100万件投げる
とても雑で性能評価とは言いにくいですが簡単なジョブを100万件投げてみて不具合が出ないか確認します。
マシンスペックはCPU: Intel ® Core™ i7-8565U CPU @ 1.80GHz × 8, RAM: 32GB, OS: Pop!_OS 20.04 LTSです。なおThreadPoolExecutorのmax_workersは8に設定しています。
実際のジョブは以下です。100万回ループするだけの簡単なCPUバウンドの処理です。これを100万回実行します。
def function (arg1, arg2):
start_time = time.time()
for i in range (0 , 1000000 ):
i = i
end_time = time.time()
print ("Time:" , end_time - start_time)
実際に計測を実行してみたところ、まずジョブの投入時間は70sでした。
$ time python testclient.py
python testclient.py 70.11s user 68.13s system 44% cpu 5:14.06 total
なおジョブは1つあたり約30msで終了し、合計でかかった時間は8.5時間程度でした。
そしてジョブ実行中の最大メモリ使用量は1.1GBでした。
30msくらいかかる処理を100万回実行するくらいでは特に問題なく実行できました。concurrent.futuresさんが偉い です。本当に落ちるまでやろうと思ったらハードリミットまでやれそうです。
7. CPUバウンドな処理とI/Oバウンドな処理の並列性比較
上でGILの話をしましたが、ここではその実際の性能を計測します。といってもマルチスレッド部分で込み入ったことはしていないので結果は自明 っちゃ自明なんですけどね。
7.1 100000000回ループ(CPUバウンドな処理)
まずは単純に100000000回ループするだけのジョブを実行します。プログラムは以下です。
import time
def function ():
start_time = time.time()
for i in range (0 , 100000000 ):
i = i
end_time = time.time()
print ("Time:" , end_time - start_time)
from queick import JobQueue
from jobfunc import function
q = JobQueue()
print (q.enqueue(function, args=(,)))
[INFO] 2020-11-05 11:24:06,648 - Job received -> data: {'func_name': 'jobfunc.function', 'args': (1, 2), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': True, 'max_workers': 10}, addr: ('127.0.0.1', 49808)
Time: 1.9901025295257568
単体ジョブの実行時間は1.99s です。
次に、上のジョブを2つenqueueした時の実行時間です。
[INFO] 2020-11-05 11:24:29,554 - Job received -> data: {'func_name': 'jobfunc.function', 'args': (1, 2), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': True, 'max_workers': 10}, addr: ('127.0.0.1', 49816)
[INFO] 2020-11-05 11:24:29,555 - Job received -> data: {'func_name': 'jobfunc.function', 'args': (1, 2), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': True, 'max_workers': 10}, addr: ('127.0.0.1', 49818)
Time: 4.395071744918823
Time: 4.393498420715332
2つのジョブが終了する時間は4.39s となっています。これは単体のジョブの2倍以上の時間がかかっています。よってCPUバウンドな処理は向いていません。
7.2 外部への通信を行う(I/Oバウンドな処理)
次に通信のレスポンス待ちが発生するケースです。計測用コードは以下になります。https://moriokalab.com (弊研究室)にGETリクエス トを投げるだけです。
import time
import urllib.request
def function ():
start_time = time.time()
with urllib.request.urlopen('https://moriokalab.com' ) as f:
pass
end_time = time.time()
print ("Time:" , end_time - start_time)
from queick import JobQueue
from jobfunc import function
q = JobQueue()
print (q.enqueue(function, args=(,)))
まず単体の実行時間を計測します。
[INFO] 2020-11-05 10:49:43,941 - Job received -> data: {'func_name': 'jobfunc.function2', 'args': (1,), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': False, 'max_workers': 10}, addr: ('127.0.0.1', 49002)
Time: 1.1877124309539795
単体での実行時間は1.19s であることが分かりました。
次に2つに増やした場合の実行時間を計測します。testclient.pyのprint(q.enqueue(function, args=(,)))
を2行に増やして実行します。
[INFO] 2020-11-05 10:47:13,975 - Job received -> data: {'func_name': 'jobfunc.function', 'args': (,), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': False, 'max_workers': 10}, addr: ('127.0.0.1', 48968)
[INFO] 2020-11-05 10:47:13,976 - Job received -> data: {'func_name': 'jobfunc.function2, 'args': (,), 'retry': False, 'retry_interval': 10, 'retry_type': 'constant', 'max_retry_interval': 600, 'retry_on_network_available': False, 'max_workers': 10}, addr: ('127.0.0.1', 48970)
Time: 1.3101766109466553
Time: 1.8106589317321777
2つ目のジョブの終了時間が1.81s ということで、単体の実行時間の約1.5倍です。2つのジョブを実行しているのにも関わらず実行時間は明らかに2倍より少ないのでI/Oバウンドな処理での有用性 が分かりました。
8. まとめ
私は自律移動ロボット 関係の研究をしているのですが、最近の自律移動ロボットのソフトウェア部分はPython で書かれる場合が多いしロボットはセンサーから生成され続ける データを扱うのでクラウド 連携等でこちらの方にも応用できたら良いかなと考えながら作りました。
とにかく非同期ジョブ実行+スケジューリング+リトライを手軽に行えるものが欲しくて勢いで作ってしまいましたが作ってる間とても楽しかったです。標準ライブラリ縛りプログラミングは楽しい (競プロは苦手だけど)。
難しくない割に動くものができるので新しい言語を使い始めるときの練習としても良いテーマかもしれません。
機能的には他にもワーカープ ロセス多重化したりキューを複数作ったりとかいろいろ実装のしがいはありそうだと思ったのですがひとまずいらないので作ってないです。
dev.toに英語記事を書いて Reddit に投稿してみたところ少しだけ反響があってOSS やってる感じがして楽しいです。
9. 参考