Building Your RAG Pipeline
What you'll learn
- ✓ Build a complete document ingestion pipeline from scratch
- ✓ Implement embedding and vector storage with Chroma
- ✓ Write a query engine that retrieves and generates grounded answers
- ✓ Explore LangChain and LlamaIndex as RAG framework options
- ✓ Learn testing and evaluation strategies for RAG quality
Theory is great, but the real learning happens when you start writing code. In this lesson, we are building a fully functional RAG pipeline -- from loading documents to generating answers grounded in your own data. By the end, you will have a working system you can point at any collection of documents and start asking questions.
We will start with a from-scratch approach so you understand every piece, then show how frameworks like LangChain and LlamaIndex can accelerate your work.
Project Setup
Let us set up a clean project for our RAG pipeline. We will use Python, Chroma for vector storage (easy local development), and OpenAI's embedding model.
# Create and activate a virtual environment
mkdir rag-pipeline && cd rag-pipeline
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install openai chromadb langchain langchain-community langchain-openai langchain-chroma
pip install pypdf tiktoken python-dotenv
Create a .env file for your API key:
OPENAI_API_KEY=sk-your-key-here
⚠️ Warning
Never commit your .env file to version control. Add it to your .gitignore immediately. If you accidentally expose an API key, rotate it right away in your provider's dashboard.
Part 1: Document Ingestion
Let us build the ingestion pipeline step by step. We will create a system that can handle multiple file types and prepare them for embedding.
Loading Documents
# ingest.py
import os
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
def load_documents(directory: str) -> list[dict]:
"""Load all supported documents from a directory."""
documents = []
supported_extensions = {".txt", ".md", ".pdf"}
for filepath in Path(directory).rglob("*"):
if filepath.suffix.lower() not in supported_extensions:
continue
print(f"Loading: {filepath}")
if filepath.suffix.lower() == ".pdf":
documents.extend(load_pdf(filepath))
else:
documents.extend(load_text(filepath))
print(f"Loaded {len(documents)} documents")
return documents
def load_text(filepath: Path) -> list[dict]:
"""Load a plain text or markdown file."""
text = filepath.read_text(encoding="utf-8")
return [{
"content": text,
"metadata": {
"source": str(filepath),
"type": filepath.suffix,
}
}]
def load_pdf(filepath: Path) -> list[dict]:
"""Load a PDF file, one document per page."""
from pypdf import PdfReader
reader = PdfReader(str(filepath))
documents = []
for i, page in enumerate(reader.pages):
text = page.extract_text()
if text.strip(): # Skip empty pages
documents.append({
"content": text,
"metadata": {
"source": str(filepath),
"type": ".pdf",
"page": i + 1,
}
})
return documents
💡 Start Simple, Add Loaders Later
We are supporting three file types to keep things focused. In a real project, you would add loaders for HTML, DOCX, CSV, Notion exports, and whatever else your data comes in. The pattern is always the same: extract text, attach metadata, return a list of documents.
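To make the pattern concrete, here is a sketch of what one more loader might look like. The `load_html` name and the stdlib `HTMLParser` approach are illustrative choices, not part of the pipeline above; in practice you might reach for a library like BeautifulSoup instead.

```python
# A hypothetical load_html, following the same pattern:
# extract text, attach metadata, return a list of documents.
from html.parser import HTMLParser
from pathlib import Path


class _TextExtractor(HTMLParser):
    """Collect visible text, skipping script and style tags."""

    def __init__(self):
        super().__init__()
        self.parts = []
        self._skip = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip:
            self._skip -= 1

    def handle_data(self, data):
        if not self._skip and data.strip():
            self.parts.append(data.strip())


def load_html(filepath: Path) -> list[dict]:
    """Load an HTML file as a single document."""
    parser = _TextExtractor()
    parser.feed(filepath.read_text(encoding="utf-8"))
    return [{
        "content": "\n".join(parser.parts),
        "metadata": {"source": str(filepath), "type": ".html"},
    }]
```

Because every loader returns the same `{"content", "metadata"}` shape, the rest of the pipeline never needs to know which file type a chunk came from.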
Chunking the Documents
Now we split documents into overlapping chunks using a recursive character splitter -- the same approach LangChain uses under the hood. The idea: try splitting on paragraph breaks first, then sentences, then words, falling back to harder splits only when necessary.
# chunker.py
import tiktoken
def chunk_documents(
documents: list[dict],
chunk_size: int = 500,
chunk_overlap: int = 100,
) -> list[dict]:
"""Split documents into overlapping chunks."""
encoder = tiktoken.encoding_for_model("gpt-4")
chunks = []
for doc in documents:
doc_chunks = recursive_split(
doc["content"], chunk_size, chunk_overlap, encoder
)
for i, chunk_text in enumerate(doc_chunks):
chunks.append({
"content": chunk_text,
"metadata": {
**doc["metadata"],
"chunk_index": i,
"chunk_total": len(doc_chunks),
}
})
print(f"Created {len(chunks)} chunks from {len(documents)} documents")
return chunks
def recursive_split(text, chunk_size, overlap, encoder, separators=None):
    """Split text recursively, trying larger separators first."""
    if separators is None:
        separators = ["\n\n", "\n", ". ", " ", ""]
    if len(encoder.encode(text)) <= chunk_size:
        return [text.strip()] if text.strip() else []
    for sep_index, sep in enumerate(separators):
        if sep and sep in text:
            parts = text.split(sep)
            chunks, current = [], ""
            for part in parts:
                candidate = current + sep + part if current else part
                if len(encoder.encode(candidate)) > chunk_size:
                    if current:
                        chunks.append(current.strip())
                    if len(encoder.encode(part)) > chunk_size:
                        # A single part can still exceed the limit (e.g. one
                        # huge paragraph) -- recurse with the finer separators
                        chunks.extend(recursive_split(
                            part, chunk_size, overlap, encoder,
                            separators[sep_index + 1:],
                        ))
                        current = ""
                    else:
                        current = part
                else:
                    current = candidate
            if current:
                chunks.append(current.strip())
            # Add overlap between adjacent chunks
            if overlap > 0 and len(chunks) > 1:
                result = [chunks[0]]
                for i in range(1, len(chunks)):
                    prev_tokens = encoder.encode(chunks[i - 1])
                    overlap_text = encoder.decode(prev_tokens[-overlap:])
                    result.append(overlap_text + " " + chunks[i])
                chunks = result
            return [c for c in chunks if c]
    # Fallback: hard split by tokens
    tokens = encoder.encode(text)
    return [encoder.decode(tokens[i:i + chunk_size])
            for i in range(0, len(tokens), chunk_size - overlap)]
💡 Tip
We use tiktoken to count tokens rather than characters. A chunk of 500 tokens is roughly 375 words -- enough for a solid paragraph or two of context.
Embedding and Storing
Now we embed our chunks and store them in Chroma:
# embedder.py
import chromadb
from openai import OpenAI
client = OpenAI()
def embed_texts(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
"""Embed a batch of texts using OpenAI's API."""
response = client.embeddings.create(model=model, input=texts)
return [item.embedding for item in response.data]
def store_chunks(chunks: list[dict], collection_name: str = "my_docs"):
"""Embed and store chunks in Chroma."""
chroma_client = chromadb.PersistentClient(path="./chroma_db")
    try:
        chroma_client.delete_collection(collection_name)
    except Exception:  # Collection may not exist; the error type varies across Chroma versions
        pass
collection = chroma_client.create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"},
)
batch_size = 50
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
texts = [c["content"] for c in batch]
metadatas = [c["metadata"] for c in batch]
ids = [f"chunk_{i + j}" for j in range(len(batch))]
collection.add(
embeddings=embed_texts(texts),
documents=texts,
metadatas=metadatas,
ids=ids,
)
print(f"Stored {min(i + batch_size, len(chunks))}/{len(chunks)} chunks")
return collection
Putting Ingestion Together
# run_ingest.py
from ingest import load_documents
from chunker import chunk_documents
from embedder import store_chunks
# Point this at your documents
documents = load_documents("./data")
chunks = chunk_documents(documents, chunk_size=500, chunk_overlap=100)
collection = store_chunks(chunks)
print(f"Pipeline complete: {collection.count()} chunks in vector store")
Run the Ingestion Pipeline
- Create a data/ folder in your project directory.
- Add 3-5 documents -- markdown files, text files, or PDFs. Use something you actually care about: your notes, documentation for a tool you use, or articles you have saved.
- Run the ingestion pipeline: python run_ingest.py
- Verify it worked by checking the chunk count and inspecting a few chunks.
- Try different chunk sizes (250, 500, 1000) and see how the total chunk count changes.
Part 2: The Query Engine
Now for the fun part -- asking questions and getting grounded answers.
# query.py
import chromadb
from openai import OpenAI
openai_client = OpenAI()
def query_rag(
    question: str,
    collection_name: str = "my_docs",
    top_k: int = 5,
    model: str = "gpt-4o-mini",
) -> dict:
"""Query the RAG pipeline end-to-end."""
# 1. Connect to Chroma
chroma_client = chromadb.PersistentClient(path="./chroma_db")
collection = chroma_client.get_collection(collection_name)
# 2. Embed the question
q_embedding = openai_client.embeddings.create(
model="text-embedding-3-small",
input=question,
).data[0].embedding
# 3. Retrieve relevant chunks
results = collection.query(
query_embeddings=[q_embedding],
n_results=top_k,
include=["documents", "metadatas", "distances"],
)
# 4. Format context
context_parts = []
sources = []
for doc, meta, distance in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0],
):
similarity = 1 - distance # Convert distance to similarity
source_info = meta.get("source", "unknown")
context_parts.append(
f"[Source: {source_info} | Similarity: {similarity:.3f}]\n{doc}"
)
sources.append({
"source": source_info,
"similarity": similarity,
"preview": doc[:100] + "...",
})
context = "\n\n---\n\n".join(context_parts)
# 5. Generate response
system_prompt = """You are a helpful assistant that answers questions
based on the provided context. Follow these rules:
1. Only use information from the provided context to answer.
2. If the context does not contain enough information, say so clearly.
3. Cite the source when possible.
4. Be concise but thorough."""
user_prompt = f"""Context:
{context}
Question: {question}
Answer based on the context above:"""
    response = openai_client.chat.completions.create(
        model=model,  # Swap in an Anthropic client here to use Claude instead
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.2,  # Lower temperature for factual answers
    )
return {
"answer": response.choices[0].message.content,
"sources": sources,
"chunks_retrieved": len(context_parts),
}
# Interactive query loop
if __name__ == "__main__":
print("RAG Pipeline Ready! Type 'quit' to exit.\n")
while True:
question = input("Ask a question: ").strip()
if question.lower() in ("quit", "exit", "q"):
break
result = query_rag(question)
print(f"\nAnswer: {result['answer']}")
print(f"\nSources ({result['chunks_retrieved']} chunks retrieved):")
for s in result["sources"]:
print(f" - {s['source']} (similarity: {s['similarity']:.3f})")
print()
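The `similarity = 1 - distance` conversion in step 4 works because the collection was created with `hnsw:space` set to cosine, where Chroma reports distance as one minus cosine similarity. A quick pure-Python sanity check of that relationship:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


v1 = [1.0, 0.0]
v2 = [1.0, 0.0]
v3 = [0.0, 1.0]

print(cosine_similarity(v1, v2))  # identical direction -> 1.0
print(cosine_similarity(v1, v3))  # orthogonal -> 0.0
# So a cosine distance of 0 means a perfect match, and distance near 1
# means the chunk is unrelated to the query
```

If you switch the collection to a different space (such as `l2`), this conversion no longer holds and the similarity numbers in the output become misleading.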
🐾Haku's Pro Tip
Set temperature=0.2 or lower for RAG responses. You want the model to stick closely to the retrieved context, not get creative. Higher temperatures increase the risk of the model "filling in" information that is not in the context.
Framework Alternatives: LangChain and LlamaIndex
Building from scratch is educational, but in production you will likely use a framework. Both achieve the same load-chunk-embed-query pipeline in far fewer lines:
LangChain Approach
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_chroma import Chroma
from langchain.chains import RetrievalQA
docs = DirectoryLoader("./data", glob="**/*.md", loader_cls=TextLoader).load()
chunks = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200).split_documents(docs)
vectorstore = Chroma.from_documents(chunks, OpenAIEmbeddings(model="text-embedding-3-small"), persist_directory="./chroma_db")
qa_chain = RetrievalQA.from_chain_type(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0.2),
retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
return_source_documents=True,
)
result = qa_chain.invoke({"query": "What is the main topic?"})
LlamaIndex Approach
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0.2)
# LlamaIndex handles chunking automatically
index = VectorStoreIndex.from_documents(SimpleDirectoryReader("./data").load_data())
response = index.as_query_engine(similarity_top_k=5).query("What is the main topic?")
💡 LangChain vs. LlamaIndex: When to Use Which
LangChain gives you more control and composability -- better for custom chains, mixed tools, or integrating RAG into a larger agent system. LlamaIndex is more opinionated and higher-level -- better when your primary focus is document indexing and retrieval. Both are excellent choices.
Evaluating RAG Quality
Building a RAG pipeline is only half the battle. How do you know if it is actually working well? You need to evaluate two things independently:
Retrieval Quality
Are the right chunks being retrieved? Measure this with:
- Hit Rate -- For a test question, is the correct chunk in the top-K results?
- Mean Reciprocal Rank (MRR) -- How high does the correct chunk rank?
from embedder import embed_texts

def evaluate_retrieval(test_cases: list[dict], collection) -> dict:
    """
    test_cases: [{"question": "...", "expected_source": "...", "expected_content": "..."}]
    """
    hits = 0
    reciprocal_ranks = []
    for case in test_cases:
        # Embed with the same model used at ingestion -- passing query_texts
        # would make Chroma use its default embedder, which does not match
        # the OpenAI embeddings we stored
        results = collection.query(
            query_embeddings=embed_texts([case["question"]]),
            n_results=10,
        )
        for rank, doc in enumerate(results["documents"][0], 1):
            if case["expected_content"] in doc:
                hits += 1
                reciprocal_ranks.append(1.0 / rank)
                break
        else:
            reciprocal_ranks.append(0.0)
    return {
        "hit_rate": hits / len(test_cases),
        "mrr": sum(reciprocal_ranks) / len(reciprocal_ranks),
    }
Answer Quality
Is the generated answer correct and grounded? Check for:
- Faithfulness -- Does the answer only use information from the context?
- Relevance -- Does the answer actually address the question?
- Completeness -- Does the answer cover all relevant information from the context?
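A common way to check faithfulness automatically is an LLM-as-judge pass: ask a model whether every claim in the answer is supported by the retrieved context. Here is a sketch of the prompt-building half; the `build_faithfulness_prompt` helper is hypothetical, and you would pair it with whatever chat-completion call your pipeline already uses, parsing the YES/NO verdict from the response.

```python
def build_faithfulness_prompt(context: str, answer: str) -> str:
    """Build a judge prompt asking whether an answer is grounded in the context.

    Hypothetical helper -- send the returned string to any chat model
    and parse the YES/NO verdict from its reply.
    """
    return (
        "You are grading a RAG answer for faithfulness.\n\n"
        "Context:\n"
        f"{context}\n\n"
        "Answer:\n"
        f"{answer}\n\n"
        "Is every factual claim in the answer supported by the context? "
        "Reply with YES or NO, then a one-sentence justification."
    )


prompt = build_faithfulness_prompt(
    context="The Eiffel Tower is 330 metres tall.",
    answer="The tower is 330 metres tall.",
)
print(prompt)
```

Relevance and completeness can be judged the same way with their own prompts; keeping the judge's temperature at 0 makes the grades reproducible across runs.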
💡 Tip
Build a test set of 20-30 question-answer pairs early in development. Run your pipeline against them after every change. This simple practice catches regressions before they reach users.
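That test set can be as simple as a list of dicts checked in next to your code. A minimal sketch, where the `run_regression` helper and the example questions are illustrative, not from the pipeline above:

```python
# A tiny regression set: pair each question with a string the answer must contain
TEST_SET = [
    {"question": "What port does the service listen on?", "must_contain": "8080"},
    {"question": "Who maintains the billing module?", "must_contain": "billing"},
]


def run_regression(test_set: list[dict], answer_fn) -> float:
    """Run each question through answer_fn and return the pass rate."""
    passed = 0
    for case in test_set:
        answer = answer_fn(case["question"])
        if case["must_contain"].lower() in answer.lower():
            passed += 1
    return passed / len(test_set)


# With the real pipeline you would pass: lambda q: query_rag(q)["answer"]
fake_answer_fn = lambda q: "It listens on port 8080, per the billing team docs."
print(f"pass rate: {run_regression(TEST_SET, fake_answer_fn):.0%}")
```

Substring matching is a crude grader, but it is fast, deterministic, and catches the most common regression: a parameter change that silently stops the right chunks from being retrieved.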
Build and Test Your Pipeline
- Implement the full pipeline using either the from-scratch approach or a framework (LangChain/LlamaIndex).
- Ingest your documents from the previous exercise.
- Ask 10 questions -- 5 that should have answers in your documents and 5 that should not.
- Evaluate the results:
- Did it retrieve the right chunks for the answerable questions?
- Did it correctly say "I don't know" for the unanswerable ones?
- Were the answers faithful to the source material?
- Experiment with parameters: Try changing chunk size, overlap, and top-K. Record what improves and what does not.
Paw Print Check
Before moving on, make sure you can answer these:
- 🐾 Can you build a document ingestion pipeline that loads, chunks, embeds, and stores documents?
- 🐾 How does the query flow work from user question to generated answer?
- 🐾 What are the key differences between LangChain and LlamaIndex for RAG?
- 🐾 How would you evaluate whether your RAG pipeline is producing good results?
- 🐾 What parameters would you tune if retrieval quality is poor?
What Comes Next
You now have a working RAG pipeline. In the final lesson, we combine everything -- multi-agent orchestration, RAG, shared memory, and deployment -- into a complete agentic system.
Next Up
Your Agentic System
Combine multi-agent orchestration with RAG to build a complete, deployable AI system