Ubuntuでdask-xgboost の例を動作させる

主記憶メモリに収まりきらないデータの教師あり学習ができるようにするライブラリを調査している。XGBoostを利用できる vaex を調べてきたが、別のライブラリでOut of coreにデータを置いても学習できないかを調べた。Daskを併用することを中心にして調べてみた。ただし、daskの分散機能ではなく、Out of coreのデータフレームや機械学習のサポートに関心をもって調べた。

調べた範囲では、データをメモリ外においてXGBoostを利用する方法としては大きく以下のものがあるようだ。

  1. dask-xgboost
  2. dask-mlのXGBoost
  3. vaex.ml.xgboost
  4. Xgboostの External Memory の機能を利用

1. と 2. について混同して関係がわからず、理解に手間取った。3.と4. はdaskは関係がない。

ここでは、1. のサンプルを PCのUbuntuで動作させるまでの記録を残しておく。

対象と環境

Dask and Pandas and XGBoost: Playing nicely between distributed systemshttps://github.com/mrocklin/dask-xgboost に説明が記載されているJupyter Notebook の動作確認をすることにした。

1台のPCで動作させる。

データのダウンロード

Notebook のコードでは次のように AWS S3からデータを読み込むようにしている。

# Create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/20*.csv', usecols=cols,
                  storage_options={'anon': True})

AWS の EC2 でコードを実行するのならそれで良いが、データ量も多いことからダウンロードしてローカルで処理することにした。

AWSにログインする必要があるが、https://s3.console.aws.amazon.com/s3/buckets/dask-data/ からデータにアクセスできる。

複数のファイルをまとめてダウンロードしたいので、AWS コマンドラインインターフェイス(CLI) の cp コマンドを使用して airline-data をコピーした。

ローカルでは、次のようなコードでローカルファイルシステムから読み込むように変更した。

df = dd.read_csv('airline-data/200*.csv', usecols=cols,
                  storage_options={'anon': True})

変更点

ローカルで実行するためには一部変更する必要のあるところがあった。

クライアントの生成

1台のPCで動作させたいので、プロセスを別にする必要がないと考えて、processes=False を設定して試してみた。

from dask import compute, persist
from dask.distributed import Client, progress

client = Client(processes=False)
client

この設定では、ClientのSchedulerが inproc://192.168.1.125/25828/1 などのようになり、プロセス内で動作しているようである。しかし、これで進めると dask_xgboost.xgboost.trainを実行した際に、socket.gaierror: [Errno -2] Name or service not known のようなエラーとなった。

distributed.worker - WARNING - Run Failed
Function: start_tracker
args:     ('192.168.1.125/25828/1', 1)
kwargs:   {}

inprocのアドレスの表記を適切に解釈できないようである。バグと思うが、daskを利用する人は1台のPCで動作させることがないので、修正されないのであろう。

なんとか、processes=Falseにしたまま動作させることができなかを試したところ、手元の環境では、以下のようにip=にTCPを指定することで dask_xgboost.xgboost.train が実行できるようになった。

from dask import compute, persist
from dask.distributed import Client, progress

client = Client(processes=False,  ip='tcp://127.0.0.1:8989')
client

ただし、ip=を指定すると2回目からは、既に指定したアドレスが使われているという  OSError: [Errno 98] Address already in use のようなエラーが発生する。

プロセスが生成されても良いのであれば、processes=Trueにするか、省略する。

ちなみに、Google Colabでも試してみたが、processes = True にすると処理が進まない。制限が設けられているようである。また、processes = False にするとdask_xgboost.xgboost.trainを実行した際に上記と同じエラーとなる。そこで in=tcp://… を加えてみたが、インストールされているパッケージのバージョンが異なるためか、Colabの制限なのか、ローカルPCのUbuntuとは違って処理が進まない。現時点では、Google Colabでdask-xgboostを利用した学習はできていない。

メモリ管理

計算を行うWorkerのメモリ管理の設定をしないで、データ量を増やしてtrainを実行したところ以下の警告が出て、Workerが終了し、再起動して計算が終わらない。

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker

メモリ管理のしきい値などを設定して終了しないようにする必要がある。もちろん、対象とする問題が必要とするメモリ量(仮想記憶を含む)を超えるものは正常に処理が終わらないが、問題が必要とするメモリ量が範囲内なら正常に終了するようにしたい。

Jupyter Notebook内では次のように設定を確認できる。

import dask dask.config.config
以下のように表示される。
{'distributed': {'worker': {'memory': {'target': 0.6,
    'spill': 0.7,
    'pause': 0.8,
    'terminate': 0.95}}},
 'temporary-directory': None,
 'dataframe': {'shuffle-compression': None},
 'array': {'svg': {'size': 120}}}

Dask の Configuration およびWorker の Memory Management の記述に従って ~/.config/dask にある YAMLファイルで設定を記述するか、プログラムで指定する。

YAMLファイルでは、Worker の Memory Management に示されているとおり次のように記述する。terminateに Falseを設定することで、メモリ不足になってもプロセスが終了して、再起動しなくなる。最後に示す実行例でもこの設定で実行している。

distributed:
  worker:
    # Fractions of worker memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    memory:
      target: 0.60  # target fraction to stay below
      spill: 0.70  # fraction at which we spill to disk
      pause: 0.80  # fraction at which we pause worker threads
      terminate: False  # fraction at which we terminate the worker

プログラムでは、以下のように記述できる。ただし、執筆時点では、Workerを終了させるか否かおよび終了させる場合にはそのしきい値を設定するための terminateはプログラムでは設定できない。

from dask import compute, persist
from dask.distributed import Client, LocalCluster, progress

worker_kwargs = {
    'memory_limit': '24G',
    'memory_target_fraction': 0.6,
    'memory_spill_fraction': 0.7,
    'memory_pause_fraction': 0.8,
#    'memory_terminate_fraction': 0.95,
}
cluster = LocalCluster(processes=False, n_workers=1, threads_per_worker=8, **worker_kwargs)
client = Client(cluster)
client

仮想記憶を使わない範囲での memory_limitを設定し、terminate をFalseにして再起動しないようにすることで、前述の Restarting worker の警告が出ないようになった。ただし、終了しないだけで、必要なメモリは利用していくようであり、動作確認に利用したノートブックのファイル airline-data/20*.csv をすべて読み込むようにして手元のPC(主記憶32GB、スワップ47GB程度)で実行した場合には、主記憶を超えて、スワップ領域も使われて仮想記憶の限界に達した。

YAMLファイルでは、以下の設定を行った。

distributed:
  worker:
    # Fractions of worker memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    memory:
      target: 0.60  # target fraction to stay below
      spill: 0.70  # fraction at which we spill to disk
      pause: 0.80  # fraction at which we pause worker threads
      terminate: False  # fraction at which we terminate the worker

訓練データ量が増えた際の木構造などのパラメータ領域の増加による使用メモリ量の増大が、現時点で観測されているものが最小なのかははっきりしない。増加の割合をもっと減らせるようにも思われるので、できればもう少し調査していきたい。

実行例

実行したノートブックの.ipynbファイルを Gistにアップロードした。

https://gist.github.com/kunsen-an/334a52b929ad5ffc69134f8f91cc8ce9