(1) データの展開とファイル分割
例として「wls_day-07.bz2」331MBのデータを使ってみます。まずはbz2を展開していきます。僕はLinuxまたはMacのターミナルで実行しました。bunzip2 コマンドを使います。結構時間がかかります。僕の環境では3分程度です。
$ bunzip2 wls_day-07.bz2
展開したファイル名は「wls_day-07」展開後の容量は10.1GBでした。このファイルの中身はjson形式となっています。このファイルを直接開こうとしても、通常のPCではメモリエラーになってしまい開くことができません。そこで500万行ずつ程度にこのファイルを分割します。
split -l 5000000 wls_day-07 wls_day-07_5000000
完了すると、今回は10個のファイルになり、末尾にaaなどがつきます。
(2) jsonデータからcsvに変換
jsonデータ形式は情報量も多く扱いやすいのですが、今回のようなデータではcsvに変換して、データを開きやすくした方が全体を扱えるのでいいと思います。一つずつファイルを取り込み、一つのcsvファイルにします。分割されたファイルが10個あるので、MPIで10プロセス発生させ、それぞれのプロセスで取り込んだものをrank0のプロセスが集約するようにします。一つのプロセスで時間かけてやってもよいと思います。
この処理には相当大きなメモリが必要となります。最低100GB程度あるサーバで実行して下さい。今回jsonで取り込んで、pandasでデータフレーム形式に変換して、そのあとcsvに保存するようにしました。
ソースコード(HostEvents_CSV_MPI.py)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mpi4py import MPI | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import time | |
import sys | |
import json | |
if __name__ == '__main__': | |
comm = MPI.COMM_WORLD | |
rank = comm.Get_rank() | |
size = comm.Get_size() | |
file_list = ['wls_day-07_5000000aa', 'wls_day-07_5000000ab', 'wls_day-07_5000000ac', 'wls_day-07_5000000ad', 'wls_day-07_5000000ae', 'wls_day-07_5000000af', 'wls_day-07_5000000ag', 'wls_day-07_5000000ah', 'wls_day-07_5000000ai', 'wls_day-07_5000000aj'] #file_listの要素数だけプロセスを発生するようにする(今回は10) | |
#(1)各プロセスでrank数のfile_listを取り込み | |
json_file = open(file_list[rank]) | |
df_jsonl = pd.read_json(json_file, orient='records', lines=True) | |
#(2)各プロセスから取り込んだ内容を集約して、一つのデータフレームにする | |
df_all = None | |
#データフレームの結合 | |
if rank == 0: | |
df_all = df_jsonl #自分のデータフレームを代入 | |
for i in range(1, size): #他プロセスのデータフレームを受け取り結合 | |
df_rank = comm.recv(source=i, tag=11) | |
df_all = pd.concat([df_all, df_rank], ignore_index=True) | |
print('結合完了') | |
else : | |
comm.send(df_jsonl, dest=0, tag=11) | |
if rank == 0: | |
df_all.to_csv('./wls_day-07_all.csv', index=False) | |
#mpiexec -n 10 python3 HostEvents_CSV_MPI.py |
集約ができると「wls_day-07_all.csv」が作成されます。僕の環境だと10分程度かかりました。容量は4.3GBとなり10.1GBから半減しています。これを試しに取り込んでみます。
少し時間はかかりますが、jupyterの環境でも無事取り込むことができました。今回の1日分は4500万行程度でした。ここまでで1日分のHost Events Dataを取り込むことが出来ましたので、次はこれの基本分析を行なっていきます。
0 件のコメント:
コメントを投稿