Building Your RAG Pipeline

What you'll learn

  • Build a complete document ingestion pipeline from scratch
  • Implement embedding and vector storage with Chroma
  • Write a query engine that retrieves and generates grounded answers
  • Explore LangChain and LlamaIndex as RAG framework options
  • Learn testing and evaluation strategies for RAG quality

Theory is great, but the real learning happens when you start writing code. In this lesson, we are building a fully functional RAG pipeline -- from loading documents to generating answers grounded in your own data. By the end, you will have a working system you can point at any collection of documents and start asking questions.

We will start with a from-scratch approach so you understand every piece, then show how frameworks like LangChain and LlamaIndex can accelerate your work.

Project Setup

Let us set up a clean project for our RAG pipeline. We will use Python, Chroma for vector storage (easy local development), OpenAI's text-embedding-3-small model for embeddings, and gpt-4o-mini for generating answers.

# Create and activate a virtual environment
mkdir rag-pipeline && cd rag-pipeline
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

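# Install dependencies for the from-scratch pipeline
pip install openai chromadb pypdf tiktoken python-dotenv

# Only needed for the framework examples later in this lesson
pip install langchain langchain-community langchain-openai langchain-chroma langchain-text-splitters llama-index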

Create a .env file for your API key:

OPENAI_API_KEY=sk-your-key-here

⚠️ Warning

Never commit your .env file to version control. Add it to your .gitignore immediately. If you accidentally expose an API key, rotate it right away in your provider's dashboard.
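
A small fail-fast check can catch a missing key before the first API call. The sketch below is an illustration only: `require_env` is a hypothetical helper (not part of python-dotenv or the OpenAI SDK), and it assumes the key has already been loaded into the environment, for example by `load_dotenv()`.

```python
import os
from collections.abc import Mapping


def require_env(name: str, env: Mapping[str, str] = os.environ) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = env.get(name, "").strip()
    if not value:
        raise RuntimeError(
            f"{name} is not set. Add it to your .env file or export it in your shell."
        )
    return value


if __name__ == "__main__":
    # Passing a plain dict keeps the demo independent of your real environment.
    demo_env = {"OPENAI_API_KEY": "sk-your-key-here"}
    key = require_env("OPENAI_API_KEY", demo_env)
    print(f"Key loaded ({len(key)} characters)")
```

Calling `require_env("OPENAI_API_KEY")` at the top of a script turns a confusing authentication error later on into an immediate, readable message.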

Part 1: Document Ingestion

Let us build the ingestion pipeline step by step. We will create a system that can handle multiple file types and prepare them for embedding.

Loading Documents

# ingest.py
import os
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()

def load_documents(directory: str) -> list[dict]:
    """Load all supported documents from a directory."""
    documents = []
    supported_extensions = {".txt", ".md", ".pdf"}

    for filepath in Path(directory).rglob("*"):
        if filepath.suffix.lower() not in supported_extensions:
            continue

        print(f"Loading: {filepath}")

        if filepath.suffix.lower() == ".pdf":
            documents.extend(load_pdf(filepath))
        else:
            documents.extend(load_text(filepath))

    print(f"Loaded {len(documents)} documents")
    return documents


def load_text(filepath: Path) -> list[dict]:
    """Load a plain text or markdown file."""
    text = filepath.read_text(encoding="utf-8")
    return [{
        "content": text,
        "metadata": {
            "source": str(filepath),
            "type": filepath.suffix,
        }
    }]


def load_pdf(filepath: Path) -> list[dict]:
    """Load a PDF file, one document per page."""
    from pypdf import PdfReader

    reader = PdfReader(str(filepath))
    documents = []

    for i, page in enumerate(reader.pages):
        text = page.extract_text()
        if text.strip():  # Skip empty pages
            documents.append({
                "content": text,
                "metadata": {
                    "source": str(filepath),
                    "type": ".pdf",
                    "page": i + 1,
                }
            })

    return documents

💡 Start Simple, Add Loaders Later

We are supporting three file types to keep things focused. In a real project, you would add loaders for HTML, DOCX, CSV, Notion exports, and whatever else your data comes in. The pattern is always the same: extract text, attach metadata, return a list of documents.
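
As an illustration of that pattern, here is a minimal sketch of one extra loader for CSV files. It is not part of the lesson's `ingest.py`: the name `load_csv`, the one-document-per-row choice, and the `row` metadata key are all assumptions made for this example.

```python
import csv
from pathlib import Path


def load_csv(filepath: Path) -> list[dict]:
    """Load a CSV file, one document per row (hypothetical extra loader)."""
    documents = []
    with filepath.open(newline="", encoding="utf-8") as f:
        for row_number, row in enumerate(csv.DictReader(f), start=1):
            # Flatten the row into "column: value" lines so it embeds as text.
            text = "\n".join(f"{k}: {v}" for k, v in row.items() if v)
            if text.strip():
                documents.append({
                    "content": text,
                    "metadata": {
                        "source": str(filepath),
                        "type": ".csv",
                        "row": row_number,
                    },
                })
    return documents


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        sample = Path(tmp) / "faq.csv"
        sample.write_text("question,answer\nWhat is RAG?,Retrieval-augmented generation\n", encoding="utf-8")
        for doc in load_csv(sample):
            print(doc["metadata"]["row"], repr(doc["content"]))
```

To wire something like this in, you would add ".csv" to `supported_extensions` and dispatch to the loader in `load_documents`, the same way PDFs are handled.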

Chunking the Documents

Now we split documents into overlapping chunks using a recursive splitter -- the same idea behind LangChain's RecursiveCharacterTextSplitter. The idea: try splitting on paragraph breaks first, then line breaks, then sentences, then words, falling back to a hard token split only when necessary.

# chunker.py
import tiktoken

def chunk_documents(
    documents: list[dict],
    chunk_size: int = 500,
    chunk_overlap: int = 100,
) -> list[dict]:
    """Split documents into overlapping chunks."""
    encoder = tiktoken.encoding_for_model("gpt-4")
    chunks = []

    for doc in documents:
        doc_chunks = recursive_split(
            doc["content"], chunk_size, chunk_overlap, encoder
        )
        for i, chunk_text in enumerate(doc_chunks):
            chunks.append({
                "content": chunk_text,
                "metadata": {
                    **doc["metadata"],
                    "chunk_index": i,
                    "chunk_total": len(doc_chunks),
                }
            })

    print(f"Created {len(chunks)} chunks from {len(documents)} documents")
    return chunks


def recursive_split(text, chunk_size, overlap, encoder, separators=None):
    """Split text recursively, trying larger separators first."""
    if separators is None:
        separators = ["\n\n", "\n", ". ", " ", ""]

    # Base case: the text already fits in one chunk
    if len(encoder.encode(text)) <= chunk_size:
        return [text.strip()] if text.strip() else []

    for idx, sep in enumerate(separators):
        if sep and sep in text:
            parts = text.split(sep)
            chunks, current = [], ""

            # Greedily pack parts into chunks of at most chunk_size tokens
            for part in parts:
                candidate = current + sep + part if current else part
                if len(encoder.encode(candidate)) <= chunk_size:
                    current = candidate
                    continue
                if current:
                    chunks.append(current.strip())
                    current = ""
                if len(encoder.encode(part)) > chunk_size:
                    # A single part is still too large: recurse with the
                    # finer separators (overlap is added once, below)
                    chunks.extend(recursive_split(
                        part, chunk_size, 0, encoder, separators[idx + 1:]
                    ))
                else:
                    current = part
            if current:
                chunks.append(current.strip())
            chunks = [c for c in chunks if c]

            # Add overlap between adjacent chunks. Note that a chunk can
            # grow to roughly chunk_size + overlap tokens after this step.
            if overlap > 0 and len(chunks) > 1:
                result = [chunks[0]]
                for i in range(1, len(chunks)):
                    prev_tokens = encoder.encode(chunks[i - 1])
                    overlap_text = encoder.decode(prev_tokens[-overlap:])
                    result.append(overlap_text + " " + chunks[i])
                chunks = result

            return chunks

    # Fallback: hard split by tokens (step must stay positive)
    tokens = encoder.encode(text)
    step = max(1, chunk_size - overlap)
    return [encoder.decode(tokens[i:i + chunk_size])
            for i in range(0, len(tokens), step)]

💡 Tip

We use tiktoken to count tokens rather than characters. For English text, a chunk of 500 tokens is roughly 375 words -- enough for a solid paragraph or two of context.
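
To see how chunk size and overlap interact without calling tiktoken, here is a minimal sketch of the sliding-window logic used by the hard-split fallback. The function name `token_windows` is hypothetical, and the list of integers stands in for real token IDs.

```python
def token_windows(tokens: list[int], chunk_size: int, overlap: int) -> list[list[int]]:
    """Slice a token list into fixed-size windows that share `overlap` tokens."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")

    # Each window starts `step` tokens after the previous one.
    step = chunk_size - overlap
    return [tokens[i:i + chunk_size] for i in range(0, len(tokens), step)]


if __name__ == "__main__":
    fake_tokens = list(range(10))  # Stand-in for encoder.encode(text)
    for window in token_windows(fake_tokens, chunk_size=4, overlap=1):
        print(window)
```

Each window begins with the last token of the previous one. The final window can be a short tail made up only of tokens the previous window already covered, which is worth knowing when you count chunks.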

Embedding and Storing

Now we embed our chunks and store them in Chroma:

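# embedder.py
import chromadb
from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()  # Make sure OPENAI_API_KEY is available before creating the client
client = OpenAI()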

def embed_texts(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
    """Embed a batch of texts using OpenAI's API."""
    response = client.embeddings.create(model=model, input=texts)
    return [item.embedding for item in response.data]


def store_chunks(chunks: list[dict], collection_name: str = "my_docs"):
    """Embed and store chunks in Chroma."""
    chroma_client = chromadb.PersistentClient(path="./chroma_db")

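    # Start from a clean collection on every run
    try:
        chroma_client.delete_collection(collection_name)
    except Exception:
        # The collection did not exist yet. The exception type raised here
        # differs between Chroma versions, so we catch broadly.
        pass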

    collection = chroma_client.create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"},
    )

    batch_size = 50
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        texts = [c["content"] for c in batch]
        metadatas = [c["metadata"] for c in batch]
        ids = [f"chunk_{i + j}" for j in range(len(batch))]

        collection.add(
            embeddings=embed_texts(texts),
            documents=texts,
            metadatas=metadatas,
            ids=ids,
        )
        print(f"Stored {min(i + batch_size, len(chunks))}/{len(chunks)} chunks")

    return collection

Putting Ingestion Together

# run_ingest.py
from ingest import load_documents
from chunker import chunk_documents
from embedder import store_chunks

# Point this at your documents
documents = load_documents("./data")
chunks = chunk_documents(documents, chunk_size=500, chunk_overlap=100)
collection = store_chunks(chunks)

print(f"Pipeline complete: {collection.count()} chunks in vector store")

🛠️ Run the Ingestion Pipeline

  1. Create a data/ folder in your project directory.
  2. Add 3-5 documents -- markdown files, text files, or PDFs. Use something you actually care about: your notes, documentation for a tool you use, or articles you have saved.
  3. Run the ingestion pipeline: python run_ingest.py
  4. Verify it worked by checking the chunk count and inspecting a few chunks.
  5. Try different chunk sizes (250, 500, 1000) and see how the total chunk count changes.
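
For step 4, one way to inspect the result is to summarize the chunk list returned by `chunk_documents`. The sketch below assumes the chunk dictionaries carry the `content` and `metadata["source"]` keys used in this lesson; `summarize_chunks` is a hypothetical helper name, and the sample chunks are made up for the demo.

```python
from collections import Counter


def summarize_chunks(chunks: list[dict], preview_chars: int = 60) -> dict:
    """Count chunks per source and keep a short preview of each source's first chunk."""
    per_source = Counter(c["metadata"]["source"] for c in chunks)
    previews = {}
    for c in chunks:
        source = c["metadata"]["source"]
        # setdefault keeps only the first chunk seen for each source.
        previews.setdefault(source, c["content"][:preview_chars])
    return {
        "total_chunks": len(chunks),
        "chunks_per_source": dict(per_source),
        "previews": previews,
    }


if __name__ == "__main__":
    sample_chunks = [
        {"content": "Alpha text about retrieval.", "metadata": {"source": "a.md"}},
        {"content": "More alpha.", "metadata": {"source": "a.md"}},
        {"content": "Beta page one.", "metadata": {"source": "b.pdf"}},
    ]
    summary = summarize_chunks(sample_chunks)
    print(summary["total_chunks"], summary["chunks_per_source"])
```

Running this for each chunk size in step 5 gives you a quick side-by-side view of how the counts change per document.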

Part 2: The Query Engine

Now for the fun part -- asking questions and getting grounded answers.

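# query.py
import chromadb
from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()  # query.py runs on its own, so it must load the API key itself
openai_client = OpenAI()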

def query_rag(
    question: str,
    collection_name: str = "my_docs",
    top_k: int = 5,
    model: str = "gpt-4o-mini",
) -> dict:
    """Query the RAG pipeline end-to-end."""

    # 1. Connect to Chroma
    chroma_client = chromadb.PersistentClient(path="./chroma_db")
    collection = chroma_client.get_collection(collection_name)

    # 2. Embed the question (same model that embedded the chunks)
    q_embedding = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=question,
    ).data[0].embedding

    # 3. Retrieve relevant chunks
    results = collection.query(
        query_embeddings=[q_embedding],
        n_results=top_k,
        include=["documents", "metadatas", "distances"],
    )

    # 4. Format context
    context_parts = []
    sources = []

    for doc, meta, distance in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0],
    ):
        similarity = 1 - distance  # Convert cosine distance to similarity
        source_info = meta.get("source", "unknown")

        context_parts.append(
            f"[Source: {source_info} | Similarity: {similarity:.3f}]\n{doc}"
        )
        sources.append({
            "source": source_info,
            "similarity": similarity,
            "preview": doc[:100] + "...",
        })

    context = "\n\n---\n\n".join(context_parts)

    # 5. Generate response
    system_prompt = """You are a helpful assistant that answers questions
based on the provided context. Follow these rules:

1. Only use information from the provided context to answer.
2. If the context does not contain enough information, say so clearly.
3. Cite the source when possible.
4. Be concise but thorough."""

    user_prompt = f"""Context:
{context}

Question: {question}

Answer based on the context above:"""

    response = openai_client.chat.completions.create(
        model=model,  # Must be a chat model served by the OpenAI API
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.2,  # Lower temperature for factual answers
    )

    return {
        "answer": response.choices[0].message.content,
        "sources": sources,
        "chunks_retrieved": len(context_parts),
    }


# Interactive query loop
if __name__ == "__main__":
    print("RAG Pipeline Ready! Type 'quit' to exit.\n")

    while True:
        question = input("Ask a question: ").strip()
        if question.lower() in ("quit", "exit", "q"):
            break

        result = query_rag(question)

        print(f"\nAnswer: {result['answer']}")
        print(f"\nSources ({result['chunks_retrieved']} chunks retrieved):")
        for s in result["sources"]:
            print(f"  - {s['source']} (similarity: {s['similarity']:.3f})")
        print()

🐾 Haku's Pro Tip

Set temperature=0.2 or lower for RAG responses. You want the model to stick closely to the retrieved context, not get creative. Higher temperatures increase the risk of the model "filling in" information that is not in the context.
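
The `similarity = 1 - distance` conversion in `query_rag` relies on the collection using cosine space, where the reported distance is assumed to be one minus the cosine similarity. The following sketch works that relationship out by hand on tiny vectors; the helper names are hypothetical, and real embeddings have far more dimensions.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Distance in cosine space: 0 for identical direction, up to 2 for opposite."""
    return 1.0 - cosine_similarity(a, b)


if __name__ == "__main__":
    query = [1.0, 0.0]
    for name, vec in [("same direction", [2.0, 0.0]),
                      ("orthogonal", [0.0, 3.0]),
                      ("opposite", [-1.0, 0.0])]:
        d = cosine_distance(query, vec)
        print(f"{name}: distance={d:.1f}, similarity={1 - d:.1f}")
```

Because cosine similarity ignores vector length, the vector `[2.0, 0.0]` counts as a perfect match for `[1.0, 0.0]`. Only direction matters.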

Framework Alternatives: LangChain and LlamaIndex

Building from scratch is educational, but in production you will likely use a framework. Both achieve the same load-chunk-embed-query pipeline in far fewer lines:

LangChain Approach

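from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_chroma import Chroma
# RetrievalQA is a legacy chain that newer LangChain releases deprecate;
# this import assumes an installed version that still provides it.
from langchain.chains import RetrievalQA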

docs = DirectoryLoader("./data", glob="**/*.md", loader_cls=TextLoader).load()
# Note: chunk_size and chunk_overlap are counted in characters here, not tokens
chunks = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200).split_documents(docs)
vectorstore = Chroma.from_documents(
    chunks,
    OpenAIEmbeddings(model="text-embedding-3-small"),
    persist_directory="./chroma_db",
)

qa_chain = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model="gpt-4o-mini", temperature=0.2),
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    return_source_documents=True,
)
result = qa_chain.invoke({"query": "What is the main topic?"})

LlamaIndex Approach

from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI

Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0.2)

# LlamaIndex handles chunking automatically
index = VectorStoreIndex.from_documents(SimpleDirectoryReader("./data").load_data())
response = index.as_query_engine(similarity_top_k=5).query("What is the main topic?")

💡 LangChain vs. LlamaIndex: When to Use Which

LangChain gives you more control and composability -- better for custom chains, mixed tools, or integrating RAG into a larger agent system. LlamaIndex is more opinionated and higher-level -- better when your primary focus is document indexing and retrieval. Both are excellent choices.

Evaluating RAG Quality

Building a RAG pipeline is only half the battle. How do you know if it is actually working well? You need to evaluate two things independently:

Retrieval Quality

Are the right chunks being retrieved? Measure this with:

  • Hit Rate -- For a test question, is the correct chunk in the top-K results?
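  • Mean Reciprocal Rank (MRR) -- How high does the correct chunk rank?

from embedder import embed_texts  # Reuse the model the chunks were indexed with


def evaluate_retrieval(test_cases: list[dict], collection, top_k: int = 10) -> dict:
    """
    test_cases: [{"question": "...", "expected_source": "...", "expected_content": "..."}]
    """
    if not test_cases:
        return {"hit_rate": 0.0, "mrr": 0.0}

    hits = 0
    reciprocal_ranks = []

    for case in test_cases:
        # Embed the question ourselves. Passing query_texts would make Chroma
        # use its default embedding function, whose vectors do not match the
        # OpenAI embeddings stored in the collection.
        results = collection.query(
            query_embeddings=embed_texts([case["question"]]),
            n_results=top_k,
        )

        # Ranks are 1-based; stop at the first chunk containing the expected text
        for rank, doc in enumerate(results["documents"][0], 1):
            if case["expected_content"] in doc:
                hits += 1
                reciprocal_ranks.append(1.0 / rank)
                break
        else:
            # No retrieved chunk matched: this question contributes 0 to MRR
            reciprocal_ranks.append(0.0)

    return {
        "hit_rate": hits / len(test_cases),
        "mrr": sum(reciprocal_ranks) / len(reciprocal_ranks),
    }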
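
To make the two metrics concrete, here is a small worked example that computes them from ranks alone, with no vector store involved. The function name `hit_rate_and_mrr` and the sample ranks are made up for illustration.

```python
from typing import Optional


def hit_rate_and_mrr(ranks: list[Optional[int]]) -> dict:
    """ranks[i] is the 1-based rank of the correct chunk for question i, or None if it was missed."""
    if not ranks:
        return {"hit_rate": 0.0, "mrr": 0.0}
    hits = sum(1 for r in ranks if r is not None)
    # A miss contributes 0 to the reciprocal-rank average.
    reciprocal = [1.0 / r if r is not None else 0.0 for r in ranks]
    return {"hit_rate": hits / len(ranks), "mrr": sum(reciprocal) / len(ranks)}


if __name__ == "__main__":
    # Four questions: found at rank 1, rank 2, rank 4, and not found at all.
    print(hit_rate_and_mrr([1, 2, 4, None]))
    # Hit rate: 3 of 4 found = 0.75
    # MRR: (1/1 + 1/2 + 1/4 + 0) / 4 = 0.4375
```

Hit rate only asks whether the chunk showed up at all. MRR also rewards it for ranking near the top, which matters when you pass only a few chunks to the model.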

Answer Quality

Is the generated answer correct and grounded? Check for:

  • Faithfulness -- Does the answer only use information from the context?
  • Relevance -- Does the answer actually address the question?
  • Completeness -- Does the answer cover all relevant information from the context?
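
Faithfulness is usually judged by a human reviewer or a second model, but a crude automated screen can still flag obvious problems. The sketch below is only a rough lexical heuristic, not a standard metric: it flags answer sentences whose words mostly do not appear in the retrieved context. The function name, the 0.6 threshold, and the sample strings are all assumptions.

```python
import re


def _words(text: str) -> set[str]:
    """Lowercased alphanumeric words in a piece of text."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def unsupported_sentences(answer: str, context: str, min_overlap: float = 0.6) -> list[str]:
    """Return answer sentences whose words are mostly absent from the context."""
    context_words = _words(context)
    flagged = []
    # Split on whitespace that follows sentence-ending punctuation.
    for sentence in re.split(r"(?<=[.!?])\s+", answer.strip()):
        words = _words(sentence)
        if not words:
            continue
        overlap = len(words & context_words) / len(words)
        if overlap < min_overlap:
            flagged.append(sentence)
    return flagged


if __name__ == "__main__":
    context = "Chroma stores embeddings on disk. The default chunk size is 500 tokens."
    answer = "The default chunk size is 500 tokens. It was released in 2019 by a team in Berlin."
    for sentence in unsupported_sentences(answer, context):
        print("Possibly unsupported:", sentence)
```

Word overlap misses paraphrases and cannot tell a correct statement from a wrong one that reuses the same words. Treat flagged sentences as candidates for manual review, not as a verdict.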

💡 Tip

Build a test set of 20-30 question-answer pairs early in development. Run your pipeline against them after every change. This simple practice catches regressions before they reach users.

🛠️ Build and Test Your Pipeline

  1. Implement the full pipeline using either the from-scratch approach or a framework (LangChain/LlamaIndex).
  2. Ingest your documents from the previous exercise.
  3. Ask 10 questions -- 5 that should have answers in your documents and 5 that should not.
  4. Evaluate the results:
    • Did it retrieve the right chunks for the answerable questions?
    • Did it correctly say "I don't know" for the unanswerable ones?
    • Were the answers faithful to the source material?
  5. Experiment with parameters: Try changing chunk size, overlap, and top-K. Record what improves and what does not.
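
One way to make this exercise repeatable is a small test harness that you rerun after every parameter change. The sketch below is a hedged illustration: `run_test_set`, the case format, and the refusal phrases are all assumptions, and the demo uses a canned stand-in instead of the real pipeline so it runs offline.

```python
from typing import Callable

# Phrases treated as "the model declined to answer". Adjust to match your prompt.
REFUSAL_MARKERS = ("i don't know", "not enough information", "does not contain")


def looks_like_refusal(answer: str) -> bool:
    lowered = answer.lower()
    return any(marker in lowered for marker in REFUSAL_MARKERS)


def run_test_set(cases: list[dict], answer_fn: Callable[[str], str]) -> dict:
    """cases: [{"question": str, "answerable": bool, "must_include": str (optional)}]"""
    failures = []
    for case in cases:
        answer = answer_fn(case["question"])
        if case["answerable"]:
            # Answerable questions must not be refused and must mention the key fact.
            expected = case.get("must_include", "")
            passed = not looks_like_refusal(answer) and expected.lower() in answer.lower()
        else:
            # Unanswerable questions should be refused instead of guessed at.
            passed = looks_like_refusal(answer)
        if not passed:
            failures.append(case["question"])
    total = len(cases)
    return {"passed": total - len(failures), "total": total, "failures": failures}


def fake_answer(question: str) -> str:
    """Canned stand-in for the real pipeline, used only for this demo."""
    canned = {"What is the default chunk size?": "The default chunk size is 500 tokens."}
    return canned.get(question, "The context does not contain enough information.")


if __name__ == "__main__":
    cases = [
        {"question": "What is the default chunk size?", "answerable": True, "must_include": "500"},
        {"question": "Who founded the company?", "answerable": False},
        {"question": "What is the overlap?", "answerable": True, "must_include": "100"},
    ]
    print(run_test_set(cases, fake_answer))
```

With the real pipeline you could pass `lambda q: query_rag(q)["answer"]` as `answer_fn`. Keeping the returned summary for each parameter setting gives you the record that step 5 asks for.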

Paw Print Check

Before moving on, make sure you can answer these:

  • 🐾 Can you build a document ingestion pipeline that loads, chunks, embeds, and stores documents?
  • 🐾 How does the query flow work from user question to generated answer?
  • 🐾 What are the key differences between LangChain and LlamaIndex for RAG?
  • 🐾 How would you evaluate whether your RAG pipeline is producing good results?
  • 🐾 What parameters would you tune if retrieval quality is poor?

What Comes Next

You now have a working RAG pipeline. In the final lesson, we combine everything -- multi-agent orchestration, RAG, shared memory, and deployment -- into a complete agentic system.

