KedroのModular Pipelineについて
本記事はMLOps Advent Calendar 2022の12/18の記事です。 (投稿遅れてしまい、申し訳ありません🙇♂。ご指摘・コメント等あればぜひお願いします。)
概要
- Kedroでパイプラインを構成する場合に、モジュール化したパイプラインを定義することができる。
- モジュールパイプラインとして定義することでそのパイプラインを他のプロジェクトにも移植しやすくなる。
- 簡単な例で確かめる。
Modular Pipelineについて
Kedroで機械学習プロジェクトを実装していくと、プロジェクトが大きくなるに連れて、以下のような課題やニーズが発生するのではないでしょうか?
- 一つのパイプラインがどんどん大きくなり、全体の見通しが悪くなり、テストも書きにくくなる
- 同じような一連の処理を複数の場所で行っていて無駄が多くなる
- パイプラインの中の特定の処理に関しては他のKedroを使ったプロジェクトになるべくそのまま利用したい
Kedroではパイプラインをモジュール化した状態で作成することで、パイプラインの肥大化を避け、再利用性を高める事が可能です。この記事ではModular Pipelineの作成方法と使用例を示したいと思います。
Modular Pipelineの作成方法
以下のコマンドで、パイプラインを作成します。
kedro pipeline create <pipeline_name>
コマンドを実行すると、プロジェクトトップディレクトリ直下に以下のboiler plateが作成されます。
src/<project_name>/pipelines/<pipeline_name>
- パイプラインとパイプラインに含むノードを作成するディレクトリ
pipeline.py
,nodes.py
が自動作成される
src/tests/pipelines/<pipeline_name>
- パイプライン用のテストコードを格納するためのディレクトリ
conf/base/parameters/<pipeline_name>.yml
- パイプライン用のパラメータのコンフィグファイル
作成したModular Pipelineの移植
モジュール化されているため、他のプロジェクトに簡単に移行することができますが、公式のドキュメントによれば他のKedroプロジェクトに移植するにあたっては以下の制約があります。
- モジュールパイプラインはメイン側のパイプライン内のモジュールに依存できない
catalog.yml
は移植先のプロジェクトで更新が必要- パラメータのコンフィグファイルについて移植先のプロジェクトの
conf/
以下に再配置が必要
Irisデータセットでの例
Starterにあるpandas-iris
で動作を確認します。今回はpd.DataFrame
のデータを に標準化するs処理をModular Pipelineとして追加してみます。
まずpandas-iris
をスターターにしてプロジェクトを作成します。
kedro new --starter=pandas-iris
続いてプロジェクト内にあるsrc/requirements.txt`に記載された依存パッケージをインストールします。
pip install -r src/requirements.txt
ここでModular Pipelineを作成します。例としてmy_pipeline
という名前で作成してみます。
kedro pipeline create my_pipeline
これでsrc/iris/pipelines/my_pipeline
が作成されるので、my_pipeline
の中のnodes.py
とpipeline.py
に処理を書いていきます。
# iris/pipelines/my_pipeline/nodes.py import pandas as pd def standardize(train_data: pd.DataFrame, test_data: pd.DataFrame) -> pd.DataFrame: def calc(data: pd.DataFrame) -> pd.DataFrame: return (data - data.mean()) / (data.std() + 1e-9) return calc(train_data), calc(test_data)
# iris/pipelines/my_pipeline/pipeline.py from kedro.pipeline import Pipeline, node, pipeline from .nodes import standardize def create_pipeline(**kwargs) -> Pipeline: return Pipeline([ node( func=standardize, inputs=["input_train", "input_test"], outputs=["standardized_train", "standardized_test"], name="standardize_node"), ])
この状態でkedro viz
してパイプラインの構成を可視化すると以下のようになります。
(なおこのままだとモジュール側のパイプラインの入力となる"input_train"、"input_test"が宣言されていないため、catalog.yml
に"example_iris_data"と同じCSVを読み込むように追記しています。)
図から分かる通り、標準化を行うモジュールパイプラインは定義できていますが、メイン側のパイプラインとは独立して存在し、メインの処理に組み込まれていません。
この標準化のモジュールパイプラインの処理をX_train
とX_test
に適用したいと思います。そうするには、メイン側のパイプラインにつなぎ込む必要があり、メイン側のパイプラインを書き換えることで実現できます。
# iris/pipeline.py from kedro.pipeline import Pipeline, node, pipeline from .nodes import make_predictions, report_accuracy, split_data def create_pipeline(**kwargs) -> Pipeline: return pipeline( [ # node( # func=split_data, # inputs=["example_iris_data", "parameters"], # outputs=["X_train", "X_test", "y_train", "y_test"], # name="split", # ), # node( # func=make_predictions, # inputs=["X_train", "X_test", "y_train"], # outputs="y_pred", # name="make_predictions", # ), node( func=split_data, inputs=["example_iris_data", "parameters"], outputs=["input_train", "input_test", "y_train", "y_test"], name="split", ), node( func=make_predictions, inputs=["standardized_train", "standardized_test", "y_train"], outputs="y_pred", name="make_predictions", ), node( func=report_accuracy, inputs=["y_pred", "y_test"], outputs=None, name="report_accuracy", ), ] )
定義したモジュールパイプラインにデータを渡すメイン側のノードの出力名と、反対にモジュールパイプラインからデータを受け取るメイン側のノードの入力名を、モジュールパイプラインに合わせて変更しています。
これでモジュールパイプラインをメイン側のパイプラインにつなぎ込む事ができました。
再びkedro viz
すると以下のようになり、無事定義したモジュールパイプラインがメイン側のパイプラインに接続していることがわかります。
これで、ノードを追加して一つのパイプラインとしてワークフローを定義するのではなく、モジュール化したパイプライン自体を定義して既存のパイプラインに結合することができました。モジュール化したパイプラインはそのまま他のプロジェクトに持っていくことができるので便利です。
この記事で書かなかったこと
この記事では時間の都合で以下の点について触れていません。後で追記できる範囲でしたいと思います。
- モジュールパイプラインをメイン側のパイプラインに繋ぐ場合に、メイン側のノードの入出力を変えないままパイプして繋ぐ方法
- モジュールパイプラインを他のプロジェクトに移植するためにmicro-packagingする方法
参考
公式のドキュメント