Milvus is an open-source vector database designed for building high-performance Retrieval-Augmented Generation (RAG) applications. By combining Milvus with Cerebras’s ultra-fast inference, you can create scalable RAG pipelines that efficiently store, search, and retrieve embeddings for your AI applications.

Prerequisites

Before you begin, ensure you have:
  • Cerebras API Key - Get a free API key here
  • Milvus Instance - Either install Milvus locally or use Zilliz Cloud (managed Milvus)
  • Python 3.8 or higher
  • Basic understanding of vector databases and RAG concepts

Configure Milvus with Cerebras

1

Create a virtual environment

First, create and activate a virtual environment to keep your dependencies isolated:
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
2

Install required dependencies

Install the necessary packages for working with Milvus and Cerebras:
pip install openai pymilvus
These packages provide:
  • openai - For connecting to Cerebras API (OpenAI-compatible)
  • pymilvus - Python SDK for Milvus vector database
3

Configure environment variables

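Create a .env file in your project directory to store your API credentials. The scripts in this guide read these values with os.getenv, which only sees variables already present in the process environment, so export them in your shell or load the file (for example with python-dotenv) before running the code: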
CEREBRAS_API_KEY=your-cerebras-api-key-here
MILVUS_URI=http://localhost:19530  # Or your Zilliz Cloud URI
MILVUS_TOKEN=your-milvus-token  # Only needed for Zilliz Cloud
If you’re using Milvus locally with Docker, the default URI is http://localhost:19530. For Zilliz Cloud, you’ll receive a URI and token when you create your cluster.
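Because os.getenv does not read .env files on its own, the values above need to reach the process environment somehow. The following is a minimal sketch of a dependency-free loader; `load_env_file` is a hypothetical helper name, not part of any SDK, and it only handles simple KEY=VALUE lines with optional trailing comments like the ones shown above. The python-dotenv package's `load_dotenv()` is a more complete alternative.

```python
import os
from pathlib import Path


def load_env_file(path=".env"):
    """Load KEY=VALUE pairs from a .env-style file into os.environ.

    Hypothetical helper for illustration. Variables that are already set
    in the environment are left untouched.
    """
    loaded = {}
    env_path = Path(path)
    if not env_path.exists():
        return loaded
    for raw_line in env_path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blank lines, full-line comments, and lines without "="
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop an inline comment that follows a space, e.g. "value  # note"
        value = value.split(" #", 1)[0].strip().strip('"').strip("'")
        key = key.strip()
        os.environ.setdefault(key, value)
        loaded[key] = os.environ[key]
    return loaded
```

Calling `load_env_file()` once at the top of each script, before the first `os.getenv` call, should be enough for the examples in this guide.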
4

Initialize the Cerebras client

Set up the Cerebras client using the OpenAI SDK, pointing base_url at the Cerebras endpoint:
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.getenv("CEREBRAS_API_KEY"),
    base_url="https://api.cerebras.ai/v1"
)
This client will handle all interactions with Cerebras models for generating responses based on retrieved context.
5

Connect to Milvus

Establish a connection to your Milvus instance to begin storing and retrieving vectors:
import os
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# Connect to Milvus
connections.connect(
    alias="default",
    uri=os.getenv("MILVUS_URI", "http://localhost:19530"),
    token=os.getenv("MILVUS_TOKEN", "")  # Empty string for local Milvus
)

print("Successfully connected to Milvus!")
6

Create a collection for your embeddings

Define and create a Milvus collection to store your document embeddings. This example uses a 1024-dimensional embedding space suitable for modern embedding models:
import os
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection

# Connect to Milvus
connections.connect(
    alias="default",
    uri=os.getenv("MILVUS_URI", "http://localhost:19530"),
    token=os.getenv("MILVUS_TOKEN", "")  # Empty string for local Milvus
)

# Define the schema for your collection
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
]

schema = CollectionSchema(fields=fields, description="Document embeddings for RAG")

# Create the collection
collection_name = "cerebras_documents"
collection = Collection(name=collection_name, schema=schema)

print(f"Collection '{collection_name}' created successfully!")
The embedding dimension (1024) should match the output dimension of your embedding model. Common dimensions are 768, 1024, or 1536 depending on your chosen embedding provider.
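A dimension mismatch is easier to diagnose if it is caught before the insert call. The sketch below checks a vector against the schema dimension; `validate_embedding` and `EMBEDDING_DIM` are hypothetical names introduced here for illustration.

```python
import math

EMBEDDING_DIM = 1024  # Assumption: equals the dim= value in the collection schema


def validate_embedding(vector, expected_dim=EMBEDDING_DIM):
    """Return the vector as a list of floats, or raise ValueError."""
    values = [float(x) for x in vector]
    if len(values) != expected_dim:
        raise ValueError(
            f"Expected {expected_dim} dimensions, got {len(values)}"
        )
    # NaN or infinite components usually indicate an upstream bug
    if not all(math.isfinite(x) for x in values):
        raise ValueError("Embedding contains NaN or infinite values")
    return values
```

Running each embedding through a check like this before adding it to the insert payload turns a server-side schema error into a local, descriptive exception.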
7

Generate and store embeddings

Create embeddings for your documents using your preferred embedding provider and store them in Milvus. This example shows the structure for inserting documents:
import os
import numpy as np
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# Connect to Milvus
connections.connect(
    alias="default",
    uri=os.getenv("MILVUS_URI", "http://localhost:19530"),
    token=os.getenv("MILVUS_TOKEN", "")  # Empty string for local Milvus
)

# Define the schema for your collection
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
]

schema = CollectionSchema(fields=fields, description="Document embeddings for RAG")

# Create the collection
collection_name = "cerebras_documents"
collection = Collection(name=collection_name, schema=schema)

def get_embedding(text):
    """
    Generate embedding for text using your embedding provider.
    Replace this with your actual embedding generation logic.
    Example providers: OpenAI, Cohere, Voyage AI, or local models.
    """
    # Placeholder: Replace with actual embedding API call
    # For example, using OpenAI embeddings:
    # response = embedding_client.embeddings.create(
    #     model="text-embedding-3-large",
    #     input=text,
    #     dimensions=1024  # Default is 3072; must match the collection's dim
    # )
    # return response.data[0].embedding
    
    # Placeholder random embedding for demonstration
    return np.random.rand(1024).tolist()

# Sample documents
documents = [
    "Cerebras provides ultra-fast inference for AI applications with industry-leading speed.",
    "Milvus is an open-source vector database designed for scalable RAG systems.",
    "RAG combines retrieval with generation for more accurate and contextual AI responses.",
    "Cerebras Cloud offers models like Llama 3.3 70B with exceptional performance."
]

# Generate embeddings and prepare data
data = []
for doc in documents:
    embedding = get_embedding(doc)
    data.append({"text": doc, "embedding": embedding})

# Insert data into Milvus
collection.insert(data)
collection.flush()

print(f"Inserted {len(documents)} documents into Milvus!")
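Note that the random placeholder returns a different vector every time it is called, so a query embedding has no relationship to the stored ones and retrieval is effectively arbitrary until a real model is plugged in. For local testing, a deterministic stand-in can be more convenient. The sketch below is one such option; `fake_embedding` is a hypothetical helper, and its vectors carry no semantic meaning (only identical strings map to identical vectors).

```python
import hashlib
import random


def fake_embedding(text, dim=1024):
    """Deterministic stand-in: the same text always yields the same vector."""
    # Seed a private RNG from a stable hash of the text
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    seed = int.from_bytes(digest[:8], "big")
    rng = random.Random(seed)
    return [rng.random() for _ in range(dim)]
```

With this in place of the random placeholder, querying with the exact text of a stored document should return that document as the closest match, which makes the pipeline's plumbing testable without an embedding provider.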
8

Create an index for fast retrieval

Create an index on the embedding field to enable fast similarity search. The IVF_FLAT index provides a good balance of speed and accuracy:
import os
from pymilvus import connections, Collection

# Connect to Milvus
connections.connect(
    alias="default",
    uri=os.getenv("MILVUS_URI", "http://localhost:19530"),
    token=os.getenv("MILVUS_TOKEN", "")  # Empty string for local Milvus
)

# Get the existing collection
collection_name = "cerebras_documents"
collection = Collection(name=collection_name)

# Create an IVF_FLAT index for similarity search
index_params = {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 128}
}

collection.create_index(
    field_name="embedding",
    index_params=index_params
)

# Load the collection into memory for searching
collection.load()

print("Index created and collection loaded!")
For production use, consider an HNSW index for better performance on larger datasets. If you stay with an IVF index, adjust nlist to your data size: use higher values (1024-4096) for millions of vectors.
9

Build a RAG query pipeline

Now create a complete RAG pipeline that retrieves relevant documents from Milvus and generates responses using Cerebras:
import os
import numpy as np
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from openai import OpenAI

# Connect to Milvus
connections.connect(
    alias="default",
    uri=os.getenv("MILVUS_URI", "http://localhost:19530"),
    token=os.getenv("MILVUS_TOKEN", "")  # Empty string for local Milvus
)

# Initialize Cerebras client
client = OpenAI(
    api_key=os.getenv("CEREBRAS_API_KEY"),
    base_url="https://api.cerebras.ai/v1"
)

# Define the schema for your collection
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
]

schema = CollectionSchema(fields=fields, description="Document embeddings for RAG")

# Get the collection created earlier and load it into memory for searching
collection_name = "cerebras_documents"
collection = Collection(name=collection_name, schema=schema)
collection.load()

def get_embedding(text):
    """
    Generate embedding for text using your embedding provider.
    Replace this with your actual embedding generation logic.
    Example providers: OpenAI, Cohere, Voyage AI, or local models.
    """
    # Placeholder: Replace with actual embedding API call
    # For example, using OpenAI embeddings:
    # response = embedding_client.embeddings.create(
    #     model="text-embedding-3-large",
    #     input=text,
    #     dimensions=1024  # Default is 3072; must match the collection's dim
    # )
    # return response.data[0].embedding
    
    # Placeholder random embedding for demonstration
    return np.random.rand(1024).tolist()

def rag_query(question, top_k=3):
    """
    Perform RAG query: retrieve relevant documents and generate response.
    
    Args:
        question: User's question
        top_k: Number of documents to retrieve
    
    Returns:
        Dictionary with answer and retrieved documents
    """
    # Generate embedding for the question
    question_embedding = get_embedding(question)
    
    # Search for similar documents in Milvus
    search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
    results = collection.search(
        data=[question_embedding],
        anns_field="embedding",
        param=search_params,
        limit=top_k,
        output_fields=["text"]
    )
    
    # Extract retrieved documents
    retrieved_docs = [hit.entity.get("text") for hit in results[0]]
    context = "\n\n".join(retrieved_docs)
    
    # Generate response using Cerebras
    response = client.chat.completions.create(
        model="llama-3.3-70b",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful assistant. Answer questions based on the provided context. If the context doesn't contain relevant information, say so."
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}"
            }
        ],
        temperature=0.7,
        max_completion_tokens=500
    )
    
    return {
        "answer": response.choices[0].message.content,
        "retrieved_docs": retrieved_docs
    }

# Example query
result = rag_query("What is Cerebras used for?")
print("Answer:", result["answer"])
print("\nRetrieved documents:")
for i, doc in enumerate(result["retrieved_docs"], 1):
    print(f"{i}. {doc}")
This pipeline retrieves the documents whose embeddings are closest to the question embedding in Milvus, then uses Cerebras’s fast inference to generate a context-aware response.
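To make the retrieval step concrete, the sketch below performs the same kind of nearest-neighbor ranking by brute force in plain Python. It is an illustration of the idea, not how Milvus implements search: an index such as IVF_FLAT approximates this ranking without comparing against every vector. The function names are hypothetical, and the ranking is the same whether or not the square root is taken.

```python
import math


def l2_distance(a, b):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def brute_force_top_k(query, vectors, k=3):
    """Return (index, distance) pairs for the k vectors nearest to query."""
    scored = [(i, l2_distance(query, v)) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1])  # Smaller distance = more similar
    return scored[:k]


# Tiny 2-dimensional example
stored = [[0.0, 0.0], [1.0, 1.0], [5.0, 5.0]]
nearest = brute_force_top_k([0.9, 0.9], stored, k=2)
print([index for index, _ in nearest])  # prints [1, 0]
```

A brute-force pass like this is also a handy way to sanity-check search results on a small sample of your data.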
10

Stream responses for better UX

For a better user experience, enable streaming to display responses as they’re generated:
import os
import numpy as np
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from openai import OpenAI

# Connect to Milvus
connections.connect(
    alias="default",
    uri=os.getenv("MILVUS_URI", "http://localhost:19530"),
    token=os.getenv("MILVUS_TOKEN", "")  # Empty string for local Milvus
)

# Initialize Cerebras client
client = OpenAI(
    api_key=os.getenv("CEREBRAS_API_KEY"),
    base_url="https://api.cerebras.ai/v1"
)

# Define the schema for your collection
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
]

schema = CollectionSchema(fields=fields, description="Document embeddings for RAG")

# Get the collection created earlier and load it into memory for searching
collection_name = "cerebras_documents"
collection = Collection(name=collection_name, schema=schema)
collection.load()

def get_embedding(text):
    """
    Generate embedding for text using your embedding provider.
    Replace this with your actual embedding generation logic.
    Example providers: OpenAI, Cohere, Voyage AI, or local models.
    """
    # Placeholder: Replace with actual embedding API call
    # For example, using OpenAI embeddings:
    # response = embedding_client.embeddings.create(
    #     model="text-embedding-3-large",
    #     input=text,
    #     dimensions=1024  # Default is 3072; must match the collection's dim
    # )
    # return response.data[0].embedding
    
    # Placeholder random embedding for demonstration
    return np.random.rand(1024).tolist()

def rag_query_streaming(question, top_k=3):
    """
    Perform RAG query with streaming response.
    """
    # Retrieve context (same as before)
    question_embedding = get_embedding(question)
    search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
    results = collection.search(
        data=[question_embedding],
        anns_field="embedding",
        param=search_params,
        limit=top_k,
        output_fields=["text"]
    )
    
    retrieved_docs = [hit.entity.get("text") for hit in results[0]]
    context = "\n\n".join(retrieved_docs)
    
    # Stream response from Cerebras
    stream = client.chat.completions.create(
        model="llama-3.3-70b",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful assistant. Answer questions based on the provided context."
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}"
            }
        ],
        temperature=0.7,
        max_completion_tokens=500,
        stream=True
    )
    
    print("Answer: ", end="")
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()  # New line after streaming

# Example streaming query
rag_query_streaming("What is Cerebras used for?")
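The streaming function above prints tokens but does not return the answer. If the full text is also needed (for logging or caching, say), the chunks can be accumulated while they are displayed. The sketch below assumes chunks shaped like the ones in the loop above (`chunk.choices[0].delta.content`); `collect_stream` is a hypothetical helper, and skipping chunks with an empty `choices` list is a defensive assumption rather than documented Cerebras behavior.

```python
def collect_stream(stream, on_token=None):
    """Join streamed delta contents into one string.

    on_token, if given, is called with each text fragment as it arrives.
    """
    parts = []
    for chunk in stream:
        # Defensive: skip chunks that carry no choices
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content
        if token:
            parts.append(token)
            if on_token is not None:
                on_token(token)
    return "".join(parts)
```

For example, `answer = collect_stream(stream, on_token=lambda t: print(t, end="", flush=True))` would print tokens as they arrive and still return the complete answer.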

Next Steps

  • Migrate to GLM4.6: Ready to upgrade? Follow our migration guide to start using our latest model
  • Explore Milvus documentation for advanced features like hybrid search and filtering
  • Try different Cerebras models to optimize for your use case
  • Learn about Milvus indexing strategies for better performance
  • Check out Zilliz Cloud for a fully managed Milvus experience
  • Experiment with different embedding models and dimensions for your specific domain
  • Implement hybrid search combining vector and scalar filtering

Troubleshooting

If you’re running Milvus locally with Docker, ensure the container is running:
docker ps | grep milvus
If it’s not running, start Milvus using Docker Compose:
wget https://github.com/milvus-io/milvus/releases/download/v2.3.0/milvus-standalone-docker-compose.yml -O docker-compose.yml
docker-compose up -d
For Zilliz Cloud, verify your URI and token are correct in your .env file. The URI should look like: https://your-cluster.api.gcp-us-west1.zillizcloud.com
A dimension mismatch error occurs when the embedding dimension doesn’t match your collection schema. Ensure:
  1. The dim parameter in your collection schema matches your embedding model’s output dimension
  2. You’re using the correct embedding model consistently throughout your application
  3. If you need to change embedding models, create a new collection with the correct dimension
Common embedding dimensions:
  • OpenAI text-embedding-3-small: 1536
  • OpenAI text-embedding-3-large: 3072 (or 1024 with dimension parameter)
  • Cohere embed-english-v3.0: 1024
  • Voyage AI voyage-2: 1024
To improve search performance:
  1. Choose the right index: Use HNSW for best performance on large datasets:
    index_params = {
        "index_type": "HNSW",
        "metric_type": "L2",
        "params": {"M": 16, "efConstruction": 256}
    }
    
  2. Adjust search parameters: Increase ef for HNSW or nprobe for IVF indexes:
    search_params = {"metric_type": "L2", "params": {"ef": 64}}
    
  3. Ensure the collection is loaded: Always call collection.load() before searching
  4. Use an appropriate nlist: For IVF indexes, set nlist to sqrt(num_entities) as a starting point
  5. Consider GPU acceleration: Milvus supports GPU indexes for even faster search on large datasets
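The sqrt(num_entities) rule of thumb from the list above can be written as a small helper. This is a sketch of that heuristic only; `suggest_nlist` is a hypothetical name, and the clamp bounds are assumptions that should be checked against the limits of your Milvus version.

```python
import math


def suggest_nlist(num_entities, minimum=1, maximum=65536):
    """Starting-point nlist from the sqrt(num_entities) rule of thumb."""
    if num_entities <= 0:
        raise ValueError("num_entities must be positive")
    estimate = round(math.sqrt(num_entities))
    # Clamp to an assumed valid range for nlist
    return max(minimum, min(maximum, estimate))


print(suggest_nlist(1_000_000))  # prints 1000
```

Treat the result as a first guess and tune it together with nprobe by measuring recall and latency on your own data.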
If you encounter memory issues:
  1. Batch your insertions: Insert documents in batches of 1000-10000 instead of all at once:
    batch_size = 1000
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        collection.insert(batch)
    collection.flush()  # Flush once at the end; flushing per batch creates many small segments
    
  2. Use memory-efficient indexes: IVF_SQ8 uses less memory than IVF_FLAT:
    index_params = {
        "index_type": "IVF_SQ8",
        "metric_type": "L2",
        "params": {"nlist": 1024}
    }
    
  3. Adjust Docker memory limits: If running locally, increase Docker’s memory allocation in Docker Desktop settings
  4. Consider Zilliz Cloud: Managed service with automatic scaling and memory management
  5. Release collections: Release collections from memory when not in use:
    collection.release()
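A rough estimate of raw vector storage helps when deciding between these options. The sketch below multiplies vector count, dimension, and bytes per component; it assumes 4 bytes per component for FLOAT_VECTOR data and, as an approximation, about 1 byte per component for IVF_SQ8. It ignores index overhead, scalar fields, and any working memory Milvus needs, so treat the numbers as a lower bound. `estimate_vector_memory_mb` is a hypothetical helper.

```python
def estimate_vector_memory_mb(num_vectors, dim, bytes_per_value=4):
    """Approximate raw vector storage in MiB (no index or metadata overhead)."""
    return num_vectors * dim * bytes_per_value / (1024 ** 2)


# One million 1024-dimensional vectors
print(estimate_vector_memory_mb(1_000_000, 1024))                     # prints 3906.25
print(estimate_vector_memory_mb(1_000_000, 1024, bytes_per_value=1))  # prints 976.5625
```

Under these assumptions, scalar quantization cuts raw vector storage to roughly a quarter, at some cost in recall.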
    
Empty search results can occur due to:
  1. Collection not loaded: Ensure you call collection.load() after creating the index
  2. Wrong metric type: If you used IP (inner product) for indexing but L2 for searching, the search can fail or return misleading results. Keep them consistent:
    # Use same metric_type for both index and search
    index_params = {"metric_type": "L2", ...}
    search_params = {"metric_type": "L2", ...}
    
  3. Embedding mismatch: Ensure you’re using the same embedding model for both indexing and querying
  4. Collection is empty: Verify data was inserted successfully:
    print(f"Number of entities: {collection.num_entities}")
    
  5. Search threshold too strict: Try increasing the limit parameter or relaxing any distance thresholds

Additional Resources