Dask Python:RAMを超えるBig Dataを並列処理で攻略 — 実践から学んだTips

Python tutorial - IT technology blog
Python tutorial - IT technology blog

pandasでは力不足だったとき — 実体験

PythonをほぼすべてのデイリータスクのAutomationツールとして使っています。デプロイスクリプトからMonitoringアラートまで。あるとき、本番システムに蓄積されたサーバーログ(約40GBのCSVファイル)を処理しようとして、pd.read_csv()を実行した瞬間にマシンがフリーズしました。16GBのRAMが数秒で消え去ったのです。そこからDaskを真剣に調べ始めました。

DaskはPython向けの並列計算ライブラリで、データを小さなchunksに分割し、lazy評価(結果が実際に必要になったときだけ実行)によってRAMを超える大規模データを処理できるよう設計されています。最大の利点は、DaskのAPIがpandasやNumPyとほぼ同一なため、ゼロから学び直す必要がないことです。

Daskをいつ使うべきか?

インストール前に知っておくべきこと:Daskはシルバーバレットではありません。次のような場合に適しています:

  • データセットがRAMより大きい(または通常のpandasで50%以上のRAMを占有する場合)
  • マルチコアCPUを活かして並列処理したい場合
  • 既存のワークフローでpandas/NumPyを使っており、最小限のリファクタリングでスケールアップしたい場合
  • 多数の小さなファイルをバッチ処理したい場合(ログファイル、センサーデータなど)

逆に、データセットがRAMに収まる場合は、通常のpandasの方が速いです。DaskにはTask Schedulingのオーバーヘッドがあるためです。過剰設計は禁物です。

Daskのインストール

基本的なインストール:

# コアDaskをインストール
pip install dask

# Distributed SchedulerとDashboard込みでフルインストール
pip install "dask[complete]"

# または必要なものだけ
pip install "dask[dataframe]"   # dask.dataframe
pip install "dask[array]"       # dask.array
pip install "dask[distributed]" # distributed scheduler + dashboard

インストール確認:

import dask
print(dask.__version__)  # 2024.x.x

詳細な設定と使い方

Dask DataFrame — 大きなファイルへのpandas代替

これが最もよく使う機能です。APIがpandasにかなり近く、既存のコードの大部分をほぼコピペで流用できます:

import dask.dataframe as dd

# 大きなCSVファイルを読み込む — Daskが複数のpartitionに分割
df = dd.read_csv('server_logs_*.csv')  # 複数ファイルを読み込むglobパターン

# pandasでおなじみの操作
print(df.dtypes)          # すぐ確認できる — データ読み込み不要なメタデータ
print(df.columns.tolist()) # カラム名を確認

# FilterとTransform — LAZY: まだ何も計算していない
filtered = df[df['status_code'] >= 500]
grouped = filtered.groupby('endpoint')['response_time'].mean()

# .compute() — ここで初めて実際の計算が行われる
result = grouped.compute()
print(result)

重要なTips:.compute()はパイプラインの最後に1回だけ呼び出し、途中で何度も呼ばないこと — 毎回呼び出すたびにデータを再読み込みします。

# ❌ 悪い例 — データを2回読み込む
count = df.shape[0].compute()
avg = df['response_time'].mean().compute()

# ✅ 良い例 — dask.compute()を使って同時に計算する
import dask
count, avg = dask.compute(df.shape[0], df['response_time'].mean())

Partition数の制御

PartitionはDaskの並列処理の単位です。少なすぎるとCPUを活かせず、多すぎるとTask Schedulingのオーバーヘッドが増大します。

# 現在のpartition数を確認
print(df.npartitions)  # 通常DaskはファイルサイズをもとにPartitionを自動分割

# 必要に応じてRepartition — 例:グループ化前に負荷を均等化
df_balanced = df.repartition(npartitions=20)

# 目安:partition size は100MB〜1GBが適切
# 小さすぎる(< 10MB):オーバーヘッドが多すぎる
# 大きすぎる(> 2GB):WorkerメモリにFitしない

Dask Array — 大規模データ向けNumPy

機械学習や大きな行列処理をする場合:

import dask.array as da
import numpy as np

# 大きなArrayを作成 — 実際はメモリを確保しないLazy Array
x = da.random.random((100_000, 1000), chunks=(1000, 1000))

# 通常のNumPy演算
result = (x ** 2).mean(axis=0)

# 実行
output = result.compute()
print(output.shape)  # (1000,)

# .npyファイルまたはHDF5から読み込む
arr = da.from_array(np.load('large_matrix.npy', mmap_mode='r'), chunks=10000)

Distributed Schedulerの設定

DaskのデフォルトschedulerはローカルのThread/Processを使用します。Dashboardによる監視やより細かい制御のためには、distributedを使います:

from dask.distributed import Client

# ローカルClusterを起動 — CPUコア数を自動検出
client = Client()
print(client)  # Cluster情報 + Dashboard URLを出力

# WorkerとMemoryをカスタマイズ
client = Client(
    n_workers=4,           # Workerプロセス数
    threads_per_worker=2,  # Worker当たりのThread数
    memory_limit='4GB'     # Worker当たりの最大RAM
)

# Cluster情報を確認
print(client.scheduler_info())

実践からのTips:サーバーで夜間データパイプラインを処理するとき、OOM Killerが処理中に割り込まないよう、memory_limit実際のRAMより少し低めに設定するようにしています。

フォーマット別の効率的なファイル読み込み

import dask.dataframe as dd

# CSV — 最も一般的
df = dd.read_csv('data/*.csv', dtype={'id': 'int32', 'value': 'float32'})

# Parquet — Big Dataに最適なフォーマット(カラム型、圧縮済み)
df = dd.read_parquet('data/logs.parquet')

# CSVをParquetに変換して次回の読み込みを高速化
df.to_parquet('data/logs_parquet/', write_index=False)

# JSON Lines
df = dd.read_json('events/*.jsonl', lines=True)

大規模データセットを頻繁に扱う場合は、早めにParquetに変換することをおすすめします。この変換だけで、ロード時間が4分から20秒に短縮できた経験があります。

パフォーマンス確認とMonitoring

Dask Dashboard — 欠かせないツール

Client()を起動すると、Daskは自動的にhttp://localhost:8787でDashboardを開きます。Dashboardで確認できること:

  • Task Stream:タイムライン上で実行中の各Taskを、種類別に色分けして表示
  • Progress:現在の計算の完了率
  • Memory:各WorkerのリアルタイムRAM使用量
  • Workers:各WorkerのStatus、CPU、Memory
from dask.distributed import Client

client = Client()

# Dashboard URLをコンソールに出力
print(f"Dashboard: {client.dashboard_link}")

# またはPythonから直接開く(Jupyter使用時)
client  # クリック可能なリンク付きWidgetを表示

ProfilingとDebug

import dask

# Task Graphを可視化 — 実行前にDaskが何を計算するか確認
# graphvizが必要:pip install graphviz
df_filtered = df[df['status'] == 'error'].groupby('host').count()
df_filtered.visualize(filename='task_graph.png')

# performance_reportを使ってHTMLレポートを出力
from dask.distributed import performance_report

with performance_report(filename='dask_report.html'):
    result = df.groupby('endpoint')['latency'].agg(['mean', 'max']).compute()

# 実行後に確認
print("Report saved to dask_report.html")

Memoryエラーとspill to diskの処理

WorkerのMemoryが不足したとき、Daskはクラッシュする代わりにディスクにデータをSpillできます:

from dask.distributed import Client

client = Client(
    n_workers=2,
    memory_limit='8GB',
    # Memoryが閾値を超えたらDiskにSpill
    # Dask Configで設定
)

# またはグローバル設定
import dask
dask.config.set({
    'distributed.worker.memory.target': 0.6,   # Memory > 60%でSpill開始
    'distributed.worker.memory.spill': 0.7,    # Memory > 70%でアグレッシブにSpill
    'distributed.worker.memory.pause': 0.8,    # Memory > 80%で新規Task受付を停止
    'distributed.worker.memory.terminate': 0.95 # Memory > 95%でWorkerをKill
})

避けるべきAnti-pattern

# ❌ 複雑なLambdaを使ったapply()を避ける — Serialize/Deserializeで遅い
df['new_col'] = df['text'].apply(lambda x: complex_processing(x), meta=('new_col', 'str'))

# ✅ Vectorized OperationsでMap_Partitionsを使う
def process_partition(partition):
    partition['new_col'] = partition['text'].str.lower().str.strip()
    return partition

df = df.map_partitions(process_partition)

# ❌ iterrows()を避ける — 並列処理不可
for idx, row in df.iterrows():  # Daskでは非常に遅い
    pass

# ✅ Vectorized Operationsを使う
df['flagged'] = df['latency'] > 1000

Daskと実践的なワークフローの組み合わせ

システムログを処理するためによく使う完全なパイプラインはこちらです:

from dask.distributed import Client
import dask.dataframe as dd

def process_logs(log_dir: str, output_path: str):
    client = Client(n_workers=4, memory_limit='4GB')
    print(f"Dashboard: {client.dashboard_link}")

    # すべてのログファイルを読み込む
    df = dd.read_csv(
        f'{log_dir}/*.csv',
        dtype={'status_code': 'int16', 'latency_ms': 'float32'},
        parse_dates=['timestamp']
    )

    # 処理パイプライン
    result = (
        df
        .assign(is_error=df['status_code'] >= 500)
        .assign(hour=df['timestamp'].dt.hour)
        .groupby(['hour', 'endpoint'])
        .agg({'latency_ms': 'mean', 'is_error': 'sum', 'status_code': 'count'})
        .rename(columns={'status_code': 'total_requests'})
    )

    # 結果を保存
    result.to_parquet(output_path)
    print("Done!")
    client.close()

process_logs('/var/log/nginx', './output/daily_stats')

このスクリプトを40GBのログで実行したとき、16GB RAMのマシンで約8分で処理完了しました。純粋なpandasなら最初からクラッシュするところです。Dashboardでは4コアCPUがプロセス全体を通じてほぼ100%稼働し、OOMになったWorkerはゼロでした。

Daskは魔法ではありません。リソースは必要ですし、オーバーヘッドもあります。しかし、RAMを超えるデータの処理という課題に対しては、Pythonエコシステムの中で現時点最も実用的なツールです。特にチームがpandasに慣れている場合はなおさらです。

Share: