比治山日記

比治山スカイウォーカーです

KedroのModular Pipelineについて

本記事はMLOps Advent Calendar 2022の12/18の記事です。 (投稿遅れてしまい、申し訳ありません🙇‍♂。ご指摘・コメント等あればぜひお願いします。)

概要

  • Kedroでパイプラインを構成する場合に、モジュール化したパイプラインを定義することができる。
  • モジュールパイプラインとして定義することでそのパイプラインを他のプロジェクトにも移植しやすくなる。
  • 簡単な例で確かめる。

Modular Pipelineについて

Kedroで機械学習プロジェクトを実装していくと、プロジェクトが大きくなるに連れて、以下のような課題やニーズが発生するのではないでしょうか?

  • 一つのパイプラインがどんどん大きくなり、全体の見通しが悪くなり、テストも書きにくくなる
  • 同じような一連の処理を複数の場所で行っていて無駄が多くなる
  • パイプラインの中の特定の処理に関しては他のKedroを使ったプロジェクトになるべくそのまま利用したい

Kedroではパイプラインをモジュール化した状態で作成することで、パイプラインの肥大化を避け、再利用性を高める事が可能です。この記事ではModular Pipelineの作成方法と使用例を示したいと思います。

Modular Pipelineの作成方法

以下のコマンドで、パイプラインを作成します。

kedro pipeline create <pipeline_name>

コマンドを実行すると、プロジェクトトップディレクトリ直下に以下のboiler plateが作成されます。

  • src/<project_name>/pipelines/<pipeline_name>
    • パイプラインとパイプラインに含むノードを作成するディレクト
    • pipeline.py, nodes.pyが自動作成される
  • src/tests/pipelines/<pipeline_name>
    • パイプライン用のテストコードを格納するためのディレクト
  • conf/base/parameters/<pipeline_name>.yml
    • パイプライン用のパラメータのコンフィグファイル

作成したModular Pipelineの移植

モジュール化されているため、他のプロジェクトに簡単に移行することができますが、公式のドキュメントによれば他のKedroプロジェクトに移植するにあたっては以下の制約があります。

  • モジュールパイプラインはメイン側のパイプライン内のモジュールに依存できない
  • catalog.yml は移植先のプロジェクトで更新が必要
  • パラメータのコンフィグファイルについて移植先のプロジェクトの conf/以下に再配置が必要

Irisデータセットでの例

Starterにあるpandas-iris で動作を確認します。今回はpd.DataFrameのデータを  \mu=0, {\sigma}^2=1 に標準化するs処理をModular Pipelineとして追加してみます。

まずpandas-irisをスターターにしてプロジェクトを作成します。

kedro new --starter=pandas-iris

続いてプロジェクト内にあるsrc/requirements.txt`に記載された依存パッケージをインストールします。

pip install -r src/requirements.txt

ここでModular Pipelineを作成します。例としてmy_pipelineという名前で作成してみます。

kedro pipeline create my_pipeline

これでsrc/iris/pipelines/my_pipelineが作成されるので、my_pipelineの中のnodes.pypipeline.pyに処理を書いていきます。

# iris/pipelines/my_pipeline/nodes.py
import pandas as pd

def standardize(train_data: pd.DataFrame, test_data: pd.DataFrame) -> pd.DataFrame:
    def calc(data: pd.DataFrame) -> pd.DataFrame:
        return (data - data.mean()) / (data.std() + 1e-9)

    return calc(train_data), calc(test_data)
# iris/pipelines/my_pipeline/pipeline.py
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import standardize


def create_pipeline(**kwargs) -> Pipeline:
    return Pipeline([
        node(
            func=standardize, 
            inputs=["input_train", "input_test"], 
            outputs=["standardized_train", "standardized_test"], 
            name="standardize_node"),
    ])

この状態でkedro vizしてパイプラインの構成を可視化すると以下のようになります。 (なおこのままだとモジュール側のパイプラインの入力となる"input_train"、"input_test"が宣言されていないため、catalog.ymlに"example_iris_data"と同じCSVを読み込むように追記しています。)

図から分かる通り、標準化を行うモジュールパイプラインは定義できていますが、メイン側のパイプラインとは独立して存在し、メインの処理に組み込まれていません。 この標準化のモジュールパイプラインの処理をX_trainX_testに適用したいと思います。そうするには、メイン側のパイプラインにつなぎ込む必要があり、メイン側のパイプラインを書き換えることで実現できます。

# iris/pipeline.py
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import make_predictions, report_accuracy, split_data

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            # node(
            #     func=split_data,
            #     inputs=["example_iris_data", "parameters"],
            #     outputs=["X_train", "X_test", "y_train", "y_test"],
            #     name="split",
            # ),
            # node(
            #     func=make_predictions,
            #     inputs=["X_train", "X_test", "y_train"],
            #     outputs="y_pred",
            #     name="make_predictions",
            # ),
            node(
                func=split_data,
                inputs=["example_iris_data", "parameters"],
                outputs=["input_train", "input_test", "y_train", "y_test"],
                name="split",
            ),
            node(
                func=make_predictions,
                inputs=["standardized_train", "standardized_test", "y_train"],
                outputs="y_pred",
                name="make_predictions",
            ),
            node(
                func=report_accuracy,
                inputs=["y_pred", "y_test"],
                outputs=None,
                name="report_accuracy",
            ),
        ]
    )

定義したモジュールパイプラインにデータを渡すメイン側のノードの出力名と、反対にモジュールパイプラインからデータを受け取るメイン側のノードの入力名を、モジュールパイプラインに合わせて変更しています。

これでモジュールパイプラインをメイン側のパイプラインにつなぎ込む事ができました。
再びkedro vizすると以下のようになり、無事定義したモジュールパイプラインがメイン側のパイプラインに接続していることがわかります。

これで、ノードを追加して一つのパイプラインとしてワークフローを定義するのではなく、モジュール化したパイプライン自体を定義して既存のパイプラインに結合することができました。モジュール化したパイプラインはそのまま他のプロジェクトに持っていくことができるので便利です。

この記事で書かなかったこと

この記事では時間の都合で以下の点について触れていません。後で追記できる範囲でしたいと思います。

  • モジュールパイプラインをメイン側のパイプラインに繋ぐ場合に、メイン側のノードの入出力を変えないままパイプして繋ぐ方法
  • モジュールパイプラインを他のプロジェクトに移植するためにmicro-packagingする方法

参考

公式のドキュメント

kedro.readthedocs.io