aiohttp: Bí kíp ‘x10’ tốc độ Crawl dữ liệu và Gọi API với Python

Python tutorial - IT technology blog
Python tutorial - IT technology blog

Tại sao bạn nên ngừng dùng Requests cho các tác vụ lớn?

Nếu làm việc với Python, chắc hẳn requests là người bạn thân thiết nhất của bạn khi cần gọi API. Nó đơn giản và cực kỳ ổn định. Tuy nhiên, rắc rối sẽ ập đến khi bạn cần xử lý khối lượng công việc khổng lồ, chẳng hạn như cào dữ liệu của 5.000 sản phẩm trên sàn TMĐT hoặc check lỗi đồng loạt 10.000 liên kết.

Vấn đề nằm ở chỗ requests hoạt động theo cơ chế synchronous (đồng bộ). Khi gửi một yêu cầu, toàn bộ chương trình sẽ “đứng hình” để chờ phản hồi từ server. Giả sử mỗi request mất 0.5 giây, việc xử lý 1.000 trang web sẽ ngốn của bạn hơn 8 phút. Trong môi trường production, đây là một sự lãng phí tài nguyên không thể chấp nhận được.

Sau khi tối ưu hệ thống thu thập tin tức cho một dự án thực tế, mình đã chuyển hoàn toàn sang aiohttp. Kết quả thật kinh ngạc: thời gian thực thi giảm từ 15 phút xuống còn chưa đầy 40 giây. Nhờ tận dụng sức mạnh của asyncio, aiohttp giúp bạn gửi hàng loạt request mà không cần chờ đợi cái trước kết thúc mới làm cái sau.

Bắt đầu nhanh trong 5 phút

Để bắt đầu, hãy cài đặt thư viện cùng với trình phân giải DNS tốc độ cao:

pip install aiohttp aiodns

Dưới đây là cách triển khai một hàm lấy dữ liệu cơ bản theo phong cách bất đồng bộ:

import aiohttp
import asyncio

async def fetch_status(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            status = response.status
            print(f"Trạng thái: {status} từ {url}")
            return await response.text()

if __name__ == "__main__":
    # Chạy hàm async trong event loop
    asyncio.run(fetch_status('https://google.com'))

Lưu ý nhỏ: Bạn bắt buộc phải dùng cặp từ khóa asyncawait. Đây là tín hiệu báo cho Python biết: “Trong lúc chờ mạng phản hồi, hãy đi làm việc khác, đừng ngồi chơi xào nước!”.

Bí mật nằm ở ClientSession

Nhiều bạn mới thường mắc lỗi tạo một ClientSession mới cho mỗi lần gọi hàm. Trong thực tế, đây là cách nhanh nhất để làm tràn bộ nhớ và gây lỗi “socket leak”.

Hãy coi ClientSession như một đường ống dẫn nước xuyên suốt. Bạn nên mở nó một lần và dùng chung cho tất cả các request để tận dụng connection pooling. Việc giữ lại các kết nối TCP đã thiết lập giúp tiết kiệm tới 30% thời gian xử lý vì không phải thực hiện lại quá trình bắt tay (handshake) liên tục.

Cách xử lý hàng loạt API cùng lúc

Đây là lúc aiohttp thực sự phô diễn sức mạnh. Chúng ta sẽ gom các tác vụ vào một danh sách và kích hoạt chúng đồng thời bằng asyncio.gather.

import aiohttp
import asyncio
import time

async def get_data(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    urls = ['https://jsonplaceholder.typicode.com/posts/1'] * 50
    
    async with aiohttp.ClientSession() as session:
        tasks = [get_data(session, url) for url in urls]
        # Kích hoạt 50 request cùng một lúc
        results = await asyncio.gather(*tasks)
        print(f"Đã xử lý xong {len(results)} kết quả")

start = time.perf_counter()
asyncio.run(main())
print(f"Hoàn thành sau: {time.perf_counter() - start:.2f} giây")

Kiểm soát tốc độ để tránh bị khóa IP

Gửi quá nhiều yêu cầu trong một giây có thể khiến server coi bạn là kẻ tấn công DDoS. Để không bị “ăn gậy” vào IP, mình thường dùng asyncio.Semaphore để giới hạn số lượng request chạy song song.

Semaphore giống như một cảnh sát giao thông, chỉ cho phép một lượng xe nhất định đi qua hầm tại một thời điểm.

# Chỉ cho phép tối đa 10 request chạy đồng thời
limit = asyncio.Semaphore(10)

async def safe_fetch(session, url):
    async with limit:
        async with session.get(url) as response:
            return await response.read()

Trong quá trình bóc tách dữ liệu, nếu bạn gặp khó khăn với các chuỗi Regex phức tạp, hãy thử dùng công cụ test Regex tại toolcraft.app. Nó giúp bạn kiểm tra nhanh các pattern ngay trên trình duyệt, tránh việc phải sửa code và chạy lại script nhiều lần cực kỳ mất thời gian.

Xử lý lỗi và Timeout chuyên nghiệp

Mạng chập chờn là chuyện cơm bữa. Nếu không cài đặt timeout, script của bạn có thể bị treo vĩnh viễn.

# Cấu hình timeout tổng thể là 10 giây
timeout = aiohttp.ClientTimeout(total=10) 
async with aiohttp.ClientSession(timeout=timeout) as session:
    try:
        async with session.get(url) as resp:
            data = await resp.json()
    except asyncio.TimeoutError:
        print("Lỗi: Server phản hồi quá lâu!")
    except aiohttp.ClientError as e:
        print(f"Lỗi kết nối mạng: {e}")

Vài lưu ý nhỏ nhưng đáng giá

Sau nhiều năm vận hành các hệ thống cào dữ liệu lớn, mình rút ra 4 kinh nghiệm then chốt:

  1. Luôn dùng Context Manager: Hãy luôn dùng async with để đảm bảo tài nguyên được giải phóng ngay cả khi có lỗi xảy ra.
  2. Nâng cấp lên ujson: Nếu parse file JSON nặng, hãy cài ujson. Nó nhanh hơn thư viện mặc định khoảng 3-5 lần, giúp giảm tải đáng kể cho CPU.
  3. Tận dụng DNS Cache: Cài đặt aiodns giúp việc phân giải tên miền nhanh hơn, cực kỳ hữu ích khi bạn gọi đến hàng trăm domain khác nhau.
  4. Biết khi nào nên dừng: Nếu script chỉ gọi 1-2 API đơn giản, hãy dùng requests cho nhẹ người. Đừng phức tạp hóa vấn đề nếu không thực sự cần hiệu năng cao.

Lúc đầu, việc làm quen với tư duy bất đồng bộ có thể khiến bạn hơi rối. Tuy nhiên, một khi đã nắm vững aiohttp, bạn sẽ sở hữu một công cụ cực kỳ mạnh mẽ để xử lý dữ liệu ở quy mô lớn.

Share: