相关文章推荐
APOLLO

こんにちは。アポロ株式会社でデータサイエンティストとしてインターンをしている河相です。
今回は、NVIDIA RAPIDSを使用したラージファイル(大容量データファイル)操作の高速化について、ご紹介させていただきます。

RAPIDSとは

RAPIDSとは、NVIDIAが提供するデータサイエンスと機械学習のためのオープンソースライブラリです。一番の特徴は、CPUコアだけでなく、GPU上でデータの前処理〜機械学習までを一貫して分析が行える点です。

Python,C/C++,Java等に対応したライブラリが存在します(詳しくはドキュメントを参照してください)。今回はPythonを使用するので、Pythonに対応したライブラリを使用します。

データの前処理をする際にpandasライブラリを使用すると思いますが、大規模なデータを使用する場合に「計算実行時間が遅い!」なんて事ありませんか?
私は経験があります!
RAPIDSであればGPU上で計算実行するので処理速度が格段に速くなります。また、Pandasライクなので基本的には操作もしやすいです。(所々、Pandasとは異なるので躓く箇所もありますが、、、、)

今回は、RAPIDSのcuDFライブラリを使用しました。cuDFライブラリは、データの読み込み、結合、集約、その他の操作を行うためのPython GPU DataFrameライブラリです。

データ読込・整形の高速化

ラージファイル操作の高速化を行うにあたり、cuDFライブラリだけでなくDaskライブラリも使用します。Daskとは、Pythonでの並列計算用のライブラリです。つまり、DaskとRAPIDSを併用すると、Pythonの処理速度の向上が期待できます。

まずは、Dask,cuDFを使用するためにinstallを行います。自分の環境に合ったインストールを行なってください。( https://rapids.ai/start.html
私の場合、Google Colab内でminiconda環境を作成し、PythonのVersionを3.9に変更後、Dask,cuDFをインストールしました。

# Update python 3.9
!wget -O mini.sh https://repo.anaconda.com/miniconda/Miniconda3-py39_4.12.0-Linux-x86_64.sh
!chmod +x mini.sh
!bash ./mini.sh -b -f -p /usr/local
!conda install -q -y jupyter
!conda install -q -y google-colab -c conda-forge
!python -m ipykernel install --name "py39" --user
# Install DASK
!pip install "dask[complete]" --upgrade
# Install RAPIDS
import os
os.environ['CUDA_PATH']='/usr/local/cuda-11.2'
os.environ['LD_LIBRARY_PATH']='/usr/local/cuda-11.2/targets/x86_64-linux/lib:$CUDA_PATH/lib64:$LD_LIBRARY_PATH'
!pip install cudf-cu11 dask-cudf-cu11 --extra-index-url=https://pypi.ngc.nvidia.com
!pip install cuml-cu11 --extra-index-url=https://pypi.ngc.nvidia.com --no-deps
!pip install cugraph-cu11 --extra-index-url=https://pypi.ngc.nvidia.com
!pip install cupy-cuda11x -f https://pip.cupy.dev/aarch64

次に、Dask-CUDA計算機クラスターの作成・設定を行います。Dask-CUDAクラスタはLocalCUDACluster・dask-cuda-workerのいずれかを使用して作成することができますが、今回は前者を使用します。(私のように、Google Colab内でminiconda環境を作成し、PythonのVersionを3.9に変更された方は、下記コードを実行する前にランタイムを再起動してから実行してください。)

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import subprocess
cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]
cluster = LocalCUDACluster(ip=IPADDR)
client = Client(cluster)

今回は以下のスクリーンショット画像のようなデータの前処理・ファイル出力をしていきます。スクリーンショット画像のファイル名を「a.csv」とします。そして、他にも「a.csv」ファイルと日付部分が異なる「b.csv」,「c.csv」ファイルが存在することとします(日付部分以外は全く同じ構造)。また、スクリーンショット画像では、簡単のために列数・行数を少なくしていますが、実際のファイルは数千列・数十万行あると仮定します。

a.csv

次に、データの前処理コードを作成します。

from dask.dataframe.io.io import from_map
import dask_cudf
import cudf
import pyarrow as pa
import pandas as pd
class POS():
    def convert_multicolumn_to_singlecolumn(self, df):
        df = df.T.reset_index(drop=False).T
        col_list=['売上日','店舗コード','商品コード','数量','金額']
        col_dic = {df.columns[i]:col_list[i] for i in range(len(col_list))}
        df = df.rename(columns=col_dic)
        df = df.drop(["level_0","level_1"],axis=0)
        return df
    def execute_each_file_in_from_map(self):
        files = ['/content/a.csv', '/content/b.csv', '/content/c.csv']
        # from_map():filesに含まれている各ファイルに対してread関数を適用させる
        df = from_map(self.read, files)
        return df
    def read(self, file):
        dtypes = {'店舗コード':str, '商品コード':str}
        # cudfライブラリでデータの読込
        df = cudf.read_csv(file, dtype=dtypes)
        df = self.preprocessing(df)
        return df
    def preprocessing(self, df):
            date_list = df.columns[2:]
            date_list = [date_list[i:i + 2] for i in range(0, len(date_list), 2)]
            arr_pivot = []
            for i in range(len(date_list)):
                    df_pivot = cudf.concat([df[df.columns[:2]], df[date_list[i]]], axis=1)
                    df_pivot['売上日'] = date_list[i][0]
                    df_pivot = df_pivot.rename(columns = {
                                                                                            date_list[i][0]:'数量'
                                                                                            ,date_list[i][1]:'金額'
                    df_pivot['i'] = 1
                    df_pivot = df_pivot.pivot(values = ['数量','金額']
                                                                        ,index = ['売上日','店舗コード','商品コード']
                                                                        ,columns = ['i'])
                    df_pivot = df_pivot.reset_index()
                    arr_pivot.append(df_pivot)
            df_merge = cudf.concat(arr_pivot)
            df_merge = df_merge.to_pandas()
            df_merge = self.convert_multicolumn_to_singlecolumn(df_merge)
            return df_merge
def main():
    pos = POS()
    df = pos.execute_each_file_in_from_map()
    schema = pa.schema([
        pa.field('売上日', pa.string())
        ,pa.field('店舗コード', pa.string())
        ,pa.field('商品コード', pa.string())
        ,pa.field('数量', pa.string())
        ,pa.field('金額', pa.string())
    path = 'your_path'
    df.to_parquet(path=path+'.parquet',write_index=False, compute=True, compression='lz4', schema=schema)
if __name__ == '__main__':
		main()

上記関数内の簡単な説明をします。
まず、execute_each_file_in_from_map()・read()で複数ファイルのデータ読込を行います。

def execute_each_file_in_from_map(self,shop_name):
		files = ['a.csv', 'b.csv', 'c.csv']
		# from_map():filesに含まれている各ファイルに対してread関数を適用させる
    df = from_map(self.read, files)
def read(self, file):
    dtypes = {'店舗コード':str, '商品コード':int}
		# cudfライブラリでデータの読込
    df = cudf.read_csv(file, dtype=dtypes)
		df = self.preprocessing(df)
    return df

execute_each_file_in_from_map()内のfrom_map()はDaskライブラリで、read()内のcudf.read_csvはcuDFライブラリを使用しています。つまり、cudf dataframeから、dask dataframeに変換しています。データ変換することで、複数のGPUで処理することができます。
(また、cuDFを使用してDataFrameパーティションを処理できるようにDaskを拡張できるDask-cuDFというライブラリも存在しますが、前処理の都合上、cuDF→Daskの変換を行いました。)

次に、preprocessing()でデータの前処理を行います。

def preprocessing(df, shop_name):
		date_list = df.columns[2:]
		date_list = [date_list[i:i + 2] for i in range(0, len(date_list), 2)]
		arr_pivot = []
		for i in range(len(date_list)):
				df_pivot = pd.concat([df[df.columns[:2], df[date_list[i]], axis=1)
				df_pivot['売上日'] = date_list[i][0]
				df_pivot = df_pivot.rename(columns = {
																						columns_list[0]:'数量'
																						,columns_list[1]:'金額'
				df_pivot['i'] = 1
				df_pivot = df_pivot.pivot(values = ['数量','金額']
																	,index = ['店舗コード','商品コード']
																	,columns = ['i'])
				arr_pivot.append(df_pivot)
		df_merge = cudf.concat(arr_pivot)
		df_merge = df_merge.to_pandas()
		df_merge = convert_multicolumn_to_singlecolumn(df_merge)
		return df_merge

前処理として行なったのは以下です。

  1. 各日のピボットテーブル作成

  2. カラム名の変更

  3. 各日のピボットテーブルを結合

今回のデータは横持ちデータなので縦持ちデータに変換して、後に分析しやすいデータに格納することを考えます。売上数量と売上金額は別々の列にしたかったので、stack()ではなくpivot()を用いて整形しました。
ここで、注意しなければならないのは、cuDFライブラリのpivot()では、index,columns列の指定が必須なことです。今回の場合、columns列は不要だったので、適当に列を作成し、指定しました。しかし、columns列を指定すると、以下のスクリーンショット画像のようなマルチカラムになってしまいます。

従って、convert_multicolumn_to_singlecolumn()関数でマルチカラムをシングルカラムに直す処理を行いました。また、convert_multicolumn_to_singlecolumn()関数を実行するために、一回pandas処理ができるようにto_pandas()でpandas変換をしています。

前処理を行なった結果、a.csvファイルは以下のスクリーンショット画像のようなデータに整形できました。

最後に、main()で上記で説明した関数の実行とparquetファイルの出力を行っています。

def main(shop_name):
		df = execute_each_file_in_from_map(shop_name)
		schema = pa.schema([
				pa.field('売上日',pa.string())
				,pa.fileld('店舗コード', pa.string())
				,pa.fileld('商品コード', pa.string())
				,pa.fileld('数量', pa.string())
				,pa.fileld('金額', pa.string())
		path = 'your_path'
		df.to_parquet(path=path+'.parquet', write_index=False, compute=True, compression='lz4', schema=schema)

今回は、列ベースのフォーマットであるparquetファイルを使用して、更なる高速化を目指します。加えて、parquetはデータの圧縮が可能なので、データサイズ容量を抑えることができます。処理速度やデータサイズに関する比較については、後ほど実験します。

上記がデータ読込・整形の高速化でした。
実際に、DaskとRAPIDSを併用したことで、どれくらい高速化したのか比較を行います。
以下に、9つの約10MBファイルのデータの読込・整形を行なった結果を示します。

  • Pandas:730秒

  • Dask:261秒

  • DaskとcuDF:35秒

上から順に、Pandasだけ/Daskだけ/DaskとcuDF で処理を行った結果です。各処理を5回行い、平均を取った結果です。

Pandasだけの時では、前処理を各ファイルに対して逐次的に行うのに対して、Daskだけの時には分散処理を行います。更に、DaskとcuDFの時にはGPU上で分散処理を行なってるので、処理実行時間の速さ順は、DaskとcuDF >Dask>Pandasのような結果になりました。予想通りの結果ではあるのですが、こんなに違うなんて驚きですね!

データ整形後ファイル出力/入力時の高速化

今回は、出力するファイル形式についても検討を行いました。
比較対象は以下です。

  • csv形式

  • parquet形式(圧縮方法:SNAPPY, GZIP, LZ4, ZSTD)

比較項目は以下です。

  • ファイル出力(5回平均)

  • ファイル読込(5回平均)

  • ファイル容量(5ファイル平均)

parquetのファイル圧縮方法について詳細の説明は省きます。
また、デフォルトはsnappyです。

Dask,cuDFを扱う際の注意点

  • cuDFはexcelファイルの読み込みができないので注意が必要です。

  • 複数のファイルをDASKでcompute()(計算実行)する場合、全てのファイルのカラム名を揃える必要があります。(揃えない場合、カラムの推論エラーになります。)

    • 解決策 は、全てのファイルのカラム名を参照し、カラム名を追加・削除する事等があります。

  • Dask,cuDFライブラリでは所々、Pandasと異なる箇所があるので注意が必要です(具体例:今回のcuDFライブラリのpivot()メソッド)。

まとめ

今回はNVIDIA RAPIDSを利用したラージファイル操作の高速化についてまとめました。
Pandasだけの処理の時と比較して、Dask,cuDFを利用することで、処理時間がかなり短縮されました。凄いですよね!

また、今回紹介しなかったRAPIDSの機械学習ライブラリ(cuML)を使用することで、前処理〜機械学習までをGPUを用いて高速に計算処理してくれます。その他、networkXのような機能を持つcuGraphライブラリ等や異なるライブラリ間をコネクトする機能を持つcuxfilterライブラリ等様々なライブラリがあるので、ぜひ見てみてください!

また、本日使用したcsvファイルは以下です。

この記事を読んだ皆さんが、素敵なデータの高速化ライフが贈れることを願っています!


最後まで読んでいただき、ありがとうございます。
アポロならではの技術的課題に対する取り組みやプロダクト開発の試行錯誤で得た学びなどを定期的に発信していきます。
少しでも業界へ貢献できれば嬉しいです。
今後ともよろしくお願いいたします。

アポロでは、一緒に働く仲間を募集中です。
興味のある方は、ぜひ下記の採用サイトをご覧ください。

アポロ株式会社 の全ての求人一覧 アポロ株式会社 の全ての求人一覧です。 herp.careers
ただ構想・戦略を描くだけでもなく、ただ分析を行うだけでもなく、ただモノ作りをするだけでもなく、クライアントと手を取り合い、AIを世の中に加速度的に普及させることを目的としたスタートアップ企業です。 https://apol.co.jp
 
推荐文章