Dask Python: Xử lý Big Data vượt RAM với tính toán song song — Tips từ thực chiến

Python tutorial - IT technology blog
Python tutorial - IT technology blog

Khi pandas không đủ sức — câu chuyện thực tế

Mình hay dùng Python làm automation tool cho hầu hết task hàng ngày, từ deploy script đến monitoring alert. Một lần xử lý log server tích lũy từ hệ thống production — file CSV gần 40GB — chạy pd.read_csv() xong máy treo luôn. RAM 16GB bốc hơi trong vài giây. Đó là lúc mình bắt đầu nghiêm túc tìm hiểu Dask.

Dask là thư viện tính toán song song cho Python, thiết kế để xử lý dữ liệu lớn hơn RAM bằng cách chia nhỏ dữ liệu thành các chunks và tính toán lazy (chỉ thực thi khi thực sự cần kết quả). Điểm hay nhất: API của Dask gần như giống hệt pandas và NumPy, nên không phải học lại từ đầu.

Khi nào nên dùng Dask?

Trước khi cài, cần biết Dask không phải silver bullet. Nó phù hợp khi:

  • Dataset lớn hơn RAM (hoặc chiếm hơn 50% RAM với pandas thông thường)
  • Cần tận dụng multi-core CPU cho xử lý song song
  • Workflow hiện tại dùng pandas/NumPy và muốn scale up mà ít refactor nhất
  • Xử lý nhiều file nhỏ theo batch (log files, sensor data…)

Ngược lại, nếu dataset vừa đủ RAM, pandas thông thường vẫn nhanh hơn vì Dask có overhead từ task scheduling. Đừng over-engineer.

Cài đặt Dask

Cài đặt cơ bản:

# Cài core Dask
pip install dask

# Cài đầy đủ với distributed scheduler và dashboard
pip install "dask[complete]"

# Hoặc chỉ những gì cần thiết
pip install "dask[dataframe]"   # dask.dataframe
pip install "dask[array]"       # dask.array
pip install "dask[distributed]" # distributed scheduler + dashboard

Kiểm tra cài đặt:

import dask
print(dask.__version__)  # 2024.x.x

Cấu hình và sử dụng chi tiết

Dask DataFrame — thay thế pandas cho file lớn

Đây là tính năng mình dùng nhiều nhất. API gần giống pandas đến mức copy paste được phần lớn code cũ:

import dask.dataframe as dd

# Đọc file CSV lớn — Dask chia thành nhiều partitions
df = dd.read_csv('server_logs_*.csv')  # glob pattern để đọc nhiều file

# Các thao tác quen thuộc với pandas
print(df.dtypes)          # Xem ngay — metadata không cần load dữ liệu
print(df.columns.tolist()) # Xem tên cột

# Filter và transform — LAZY: chưa tính toán gì cả
filtered = df[df['status_code'] >= 500]
grouped = filtered.groupby('endpoint')['response_time'].mean()

# .compute() — TẠI ĐÂY mới thực sự tính toán
result = grouped.compute()
print(result)

Mẹo quan trọng: chỉ gọi .compute() một lần ở cuối pipeline, đừng gọi giữa chừng nhiều lần — mỗi lần gọi là một lần đọc lại dữ liệu.

# ❌ Tệ — đọc dữ liệu 2 lần
count = df.shape[0].compute()
avg = df['response_time'].mean().compute()

# ✅ Tốt — dùng dask.compute() để tính cùng lúc
import dask
count, avg = dask.compute(df.shape[0], df['response_time'].mean())

Kiểm soát số lượng partitions

Partition là đơn vị xử lý song song của Dask. Quá ít partition → không tận dụng được CPU. Quá nhiều → overhead task scheduling tăng cao.

# Xem số partition hiện tại
print(df.npartitions)  # Thường Dask tự chia dựa theo kích thước file

# Repartition khi cần — ví dụ trước khi group by để balance tải
df_balanced = df.repartition(npartitions=20)

# Rule of thumb: partition size ~100MB-1GB là hợp lý
# Quá nhỏ (< 10MB): quá nhiều overhead
# Quá lớn (> 2GB): không fit vào worker memory

Dask Array — NumPy cho dữ liệu lớn

Nếu làm machine learning hoặc xử lý ma trận lớn:

import dask.array as da
import numpy as np

# Tạo array lớn — thực ra là lazy array chưa allocate memory
x = da.random.random((100_000, 1000), chunks=(1000, 1000))

# Các phép tính NumPy thông thường
result = (x ** 2).mean(axis=0)

# Thực thi
output = result.compute()
print(output.shape)  # (1000,)

# Đọc từ file .npy hoặc HDF5
arr = da.from_array(np.load('large_matrix.npy', mmap_mode='r'), chunks=10000)

Cấu hình Distributed Scheduler

Scheduler mặc định của Dask dùng thread/process local. Để có dashboard monitor và kiểm soát tốt hơn, dùng distributed:

from dask.distributed import Client

# Khởi động local cluster — tự detect số CPU cores
client = Client()
print(client)  # In thông tin cluster + URL dashboard

# Tùy chỉnh số workers và memory
client = Client(
    n_workers=4,           # Số worker processes
    threads_per_worker=2,  # Thread per worker
    memory_limit='4GB'     # RAM tối đa mỗi worker
)

# Xem thông tin cluster
print(client.scheduler_info())

Một tip từ thực chiến: khi xử lý data pipeline ban đêm trên server, mình thường set memory_limit thấp hơn RAM thực tế một chút để tránh OOM killer can thiệp vào giữa chừng.

Đọc file hiệu quả theo định dạng

import dask.dataframe as dd

# CSV — phổ biến nhất
df = dd.read_csv('data/*.csv', dtype={'id': 'int32', 'value': 'float32'})

# Parquet — format tốt nhất cho big data (columnar, compressed)
df = dd.read_parquet('data/logs.parquet')

# Chuyển CSV sang Parquet để lần sau đọc nhanh hơn
df.to_parquet('data/logs_parquet/', write_index=False)

# JSON Lines
df = dd.read_json('events/*.jsonl', lines=True)

Nếu thường xuyên làm việc với dataset lớn, hãy convert sang Parquet từ sớm. Mình từng giảm thời gian load từ 4 phút xuống còn 20 giây chỉ bằng bước convert này.

Kiểm tra hiệu năng và Monitoring

Dashboard Dask — công cụ không thể bỏ qua

Sau khi khởi động Client(), Dask tự mở dashboard tại http://localhost:8787. Dashboard cho thấy:

  • Task Stream: Từng task đang chạy theo timeline, màu sắc theo loại task
  • Progress: % hoàn thành của computation hiện tại
  • Memory: RAM usage của từng worker theo thời gian thực
  • Workers: Status, CPU, memory của mỗi worker
from dask.distributed import Client

client = Client()

# In URL dashboard ra console
print(f"Dashboard: {client.dashboard_link}")

# Hoặc mở thẳng từ Python (nếu dùng Jupyter)
client  # Hiển thị widget với link clickable

Profiling và debug

import dask

# Visualize task graph — xem Dask sẽ tính gì trước khi chạy
# Cần cài graphviz: pip install graphviz
df_filtered = df[df['status'] == 'error'].groupby('host').count()
df_filtered.visualize(filename='task_graph.png')

# Dùng with performance_report để xuất báo cáo HTML
from dask.distributed import performance_report

with performance_report(filename='dask_report.html'):
    result = df.groupby('endpoint')['latency'].agg(['mean', 'max']).compute()

# Xem sau khi chạy xong
print("Report saved to dask_report.html")

Xử lý lỗi memory và spill to disk

Khi worker hết memory, Dask có thể spill data xuống disk thay vì crash:

from dask.distributed import Client

client = Client(
    n_workers=2,
    memory_limit='8GB',
    # Spill to disk khi memory vượt ngưỡng
    # Cấu hình trong dask config
)

# Hoặc set global config
import dask
dask.config.set({
    'distributed.worker.memory.target': 0.6,   # Bắt đầu spill khi > 60% memory
    'distributed.worker.memory.spill': 0.7,    # Spill aggressively khi > 70%
    'distributed.worker.memory.pause': 0.8,    # Pause nhận task mới khi > 80%
    'distributed.worker.memory.terminate': 0.95 # Kill worker khi > 95%
})

Một số anti-pattern cần tránh

# ❌ Tránh apply() với lambda phức tạp — chậm vì serialize/deserialize
df['new_col'] = df['text'].apply(lambda x: complex_processing(x), meta=('new_col', 'str'))

# ✅ Dùng map_partitions với vectorized operations
def process_partition(partition):
    partition['new_col'] = partition['text'].str.lower().str.strip()
    return partition

df = df.map_partitions(process_partition)

# ❌ Tránh iterrows() — không song song được
for idx, row in df.iterrows():  # Cực kỳ chậm với Dask
    pass

# ✅ Dùng vectorized operations
df['flagged'] = df['latency'] > 1000

Kết hợp Dask với workflow thực tế

Một pipeline hoàn chỉnh mình hay dùng để xử lý log hệ thống:

from dask.distributed import Client
import dask.dataframe as dd

def process_logs(log_dir: str, output_path: str):
    client = Client(n_workers=4, memory_limit='4GB')
    print(f"Dashboard: {client.dashboard_link}")

    # Đọc toàn bộ log files
    df = dd.read_csv(
        f'{log_dir}/*.csv',
        dtype={'status_code': 'int16', 'latency_ms': 'float32'},
        parse_dates=['timestamp']
    )

    # Pipeline xử lý
    result = (
        df
        .assign(is_error=df['status_code'] >= 500)
        .assign(hour=df['timestamp'].dt.hour)
        .groupby(['hour', 'endpoint'])
        .agg({'latency_ms': 'mean', 'is_error': 'sum', 'status_code': 'count'})
        .rename(columns={'status_code': 'total_requests'})
    )

    # Lưu kết quả
    result.to_parquet(output_path)
    print("Done!")
    client.close()

process_logs('/var/log/nginx', './output/daily_stats')

Chạy script này với 40GB log, máy 16GB RAM xử lý xong trong khoảng 8 phút — thay vì crash ngay từ đầu như pandas thuần. Dashboard cho thấy CPU 4 cores chạy gần 100% suốt quá trình, không có worker nào bị OOM.

Dask không thần kỳ — nó vẫn cần tài nguyên, vẫn có overhead. Nhưng với bài toán data vượt RAM, đây là công cụ pragmatic nhất hiện tại trong Python ecosystem, đặc biệt khi team đã quen với pandas.

Share: