MilvusベクターデータベースをDockerでセットアップ:RAGとAIのナレッジストアを構築する

Database tutorial - IT technology blog
Database tutorial - IT technology blog

Docker ComposeでMilvusを5分で起動する

MilvusはRedisやPostgreSQLとは異なります。最初から一つのことに特化して設計されています:ベクターの保存と検索——AIが意味を「理解する」のに使うデータ型で、通常のキーワードマッチングとは全く異なります。技術文書50万件を処理するRAGプロジェクトにMilvusを採用する前に、いくつかのベクターDBを試しましたが、決め手はデータが大規模になったときのスケーラビリティでした。

docker-compose.ymlファイルを作成してすぐに起動しましょう:

version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ./volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-13T19-46-17Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    volumes:
      - ./volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.4.0
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ./volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530"
      - "9091:9091"
    depends_on:
      - etcd
      - minio
docker compose up -d

# ステータスを確認
docker compose ps

# エラーがある場合はログを確認
docker compose logs standalone --tail=50

Milvusが完全に起動するまで約30秒待ちます(etcdとMinIOが先に準備完了する必要があります)。その後、Python SDKをインストールします:

pip install pymilvus openai  # embeddingの生成にopenaiを使用

Milvusの動作原理を理解する

Milvusはデータを次のモデルで整理します:Database → Collection → Partition → EntityPostgreSQLと対比すると:Database → Table → (相当するものなし) → Rowとなります。核心的な違いは、各entityには少なくとも一つのvectorフィールドが必要という点です——これがMilvusが処理するために設計されたもので、必須です。

検索時は、WHERE content LIKE '%キーワード%'の代わりに、Milvusはベクター間の距離——コサイン類似度またはL2距離——を計算して、クエリと意味的に最も近いentityを見つけます。

最初のCollectionを作成する

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility

# 接続
connections.connect("default", host="localhost", port="19530")

# スキーマを定義
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=500),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=5000),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),  # OpenAI text-embedding-3-small
]

schema = CollectionSchema(fields, description="RAG用のナレッジベース")
collection = Collection("knowledge_base", schema)

# vectorフィールドにインデックスを作成(検索前に必須)
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024},
}
collection.create_index("embedding", index_params)
print("Collectionの作成が完了しました:", collection.name)

MilvusにドキュメントをInsertする

from openai import OpenAI

client = OpenAI()  # 環境変数にOPENAI_API_KEYが必要

def get_embedding(text: str) -> list[float]:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

# データを準備
documents = [
    {"source": "linux_guide.md", "content": "chmodコマンドはLinuxでファイルのアクセス権限を変更します..."},
    {"source": "docker_tips.md", "content": "Docker volumeはcontainerのrestartでもデータを永続化します..."},
    {"source": "git_workflow.md", "content": "Git rebaseはmergeと比べてcommit履歴をすっきりさせます..."},
]

# embeddingを作成してinsert
for doc in documents:
    embedding = get_embedding(doc["content"])
    collection.insert({
        "source": [doc["source"]],
        "content": [doc["content"]],
        "embedding": [embedding],
    })

collection.flush()  # データがディスクに書き込まれることを保証
print(f"{collection.num_entities} entitiesをInsertしました")

完全なRAGパイプラインを構築する

これは私が本番環境で使用しているパターンです。MySQLからPostgreSQLへ100GBをマイグレーションした経験——計画3日、実行1日——から得た教訓は、最初にスキーマを丁寧に設計することで後々の多くの頭痛を避けられるということです。Milvusではさらにその通りで、dimを間違えるとcollectionをdropして最初からやり直しになります。マイグレーションパスはありません。

セマンティック検索関数

def semantic_search(query: str, top_k: int = 5) -> list[dict]:
    # 検索前にcollectionをメモリにLoad
    collection.load()
    
    query_embedding = get_embedding(query)
    
    search_params = {
        "metric_type": "COSINE",
        "params": {"nprobe": 10},  # 精度を上げるには増やし、速度を優先するには下げる
    }
    
    results = collection.search(
        data=[query_embedding],
        anns_field="embedding",
        param=search_params,
        limit=top_k,
        output_fields=["source", "content"],
    )
    
    hits = []
    for hit in results[0]:
        hits.append({
            "source": hit.entity.get("source"),
            "content": hit.entity.get("content"),
            "score": hit.score,
        })
    
    return hits

# すぐに試してみる
results = semantic_search("LinuxのFileパーミッションを変更する方法")
for r in results:
    print(f"[{r['score']:.3f}] {r['source']}: {r['content'][:80]}...")

LLMと連携して質問に回答する

def rag_answer(question: str) -> str:
    # ステップ1: 関連するcontextを検索
    relevant_docs = semantic_search(question, top_k=3)
    context = "\n\n".join([d["content"] for d in relevant_docs])
    
    # ステップ2: contextと一緒にLLMに送信
    prompt = f"""以下のドキュメントをもとに、質問に日本語で答えてください:

ドキュメント:
{context}

質問:{question}

回答:"""
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    
    return response.choices[0].message.content

# テスト
answer = rag_answer("Linuxでファイルのパーミッションを変更するにはどうすればよいですか?")
print(answer)

本番環境向けにMilvusを最適化する

適切なインデックスタイプを選ぶ

これは多くの人が見落として、後でMilvusが遅いと不満を言うポイントです。各インデックスタイプにはトレードオフがあります:

  • IVF_FLAT: 速度と精度のバランスが良い——ほとんどのケースに対応、1000万ベクター以下のデータに適切
  • HNSW: クエリ時に高速だがRAM消費が大きい——低レイテンシが必要でRAMに余裕がある場合に使用
  • IVF_SQ8: ベクターを4倍圧縮しRAMを節約——データが大きくRAMが限られる場合に使用、精度約2%の低下を許容
  • FLAT: ブルートフォース、100%精度だが遅い——テスト用または100万ベクター以下の小規模データのみ
# HNSWインデックス——十分なRAMがあれば本番環境での使用を検討
hnsw_params = {
    "metric_type": "COSINE",
    "index_type": "HNSW",
    "params": {
        "M": 16,           # ノードごとの接続数、Mを増やすと精度向上・RAM消費増加
        "efConstruction": 200,  # インデックス構築時の精度
    },
}

# HNSWで検索する場合
hnsw_search_params = {
    "metric_type": "COSINE",
    "params": {"ef": 64},  # efを増やすと精度向上・若干速度低下
}

データ整理のためのPartition

ナレッジベースに複数のソース——ドキュメント、FAQ、コードスニペット——がある場合、partitionを使って検索を高速化しましょう:

# ドキュメントの種類ごとにpartitionを作成
collection.create_partition("docs")
collection.create_partition("faq")
collection.create_partition("code")

# 特定のpartitionにInsert
collection.insert(
    {"source": ["manual.pdf"], "content": ["..."], "embedding": [[...]]},
    partition_name="docs"
)

# docsパーティション内のみ検索——全体検索より高速
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=5,
    partition_names=["docs"],  # partitionでフィルタリング
    output_fields=["source", "content"],
)

ハイブリッド検索:ベクター+フィルターの組み合わせ

# セマンティック検索+ソースファイルでフィルタリング
results = collection.search(
    data=[query_embedding],
    anns_field="embedding",
    param=search_params,
    limit=5,
    expr='source like "linux%"',  # linux_*ファイルからのドキュメントのみ取得
    output_fields=["source", "content"],
)

Milvus活用の実践的なTips

Chunkingストラテジーは思っている以上に重要

さまざまなチャンキング方法を試した結果、一つのことが明確になりました:100トークン未満のチャンクはコンテキストが失われ、1000トークンを超えるとベクターembeddingが「希薄」になります——何でも少しずつマッチして、本当に際立つものがなくなります。最適な範囲は300〜500トークンで、チャンクの境界で文脈が途切れないよう50〜100トークンのオーバーラップを設けます。

def chunk_text(text: str, chunk_size: int = 400, overlap: int = 80) -> list[str]:
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        chunk = " ".join(words[start:end])
        chunks.append(chunk)
        start += chunk_size - overlap  # 境界でcontextが失われないようにOverlap
    return chunks

MilvusデータのBackup

# 全volumes(etcd + minio)をBackup
docker compose stop
tar -czf milvus-backup-$(date +%Y%m%d).tar.gz ./volumes/
docker compose start

# または公式のMilvus Backupツールを使用
pip install pymilvus[bulk_writer]
# https://github.com/zilliztech/milvus-backup

Milvus Web UIでモニタリング

AttuはMilvusの公式Web UI——コレクションの統計確認、直接クエリ実行、インデックスステータスの追跡がコマンドなしで簡単にできます:

docker run -d \
  --name attu \
  -p 3000:3000 \
  -e MILVUS_URL=localhost:19530 \
  zilliz/attu:v2.4

# アクセス:http://localhost:3000

他のソリューションではなくMilvusを選ぶタイミング

これはよく聞かれる質問です。簡潔に言えば:ベクターデータが100万レコードを超えて分散スケーリングが必要な場合、またはチームがKubernetesエコシステムに慣れている場合にMilvusを選びましょう。データが小規模でシングルノードで動かす場合は、より軽量な選択肢が適しています。Milvusが真価を発揮するのはエンタープライズスケール——数十億のベクター、マルチテナント、高可用性——です。

Share: