RAG Pipeline Architecture: From Zero to Production

A deep dive into RAG pipeline architecture covering naive RAG, advanced RAG, and production patterns. Learn how to build scalable retrieval-augmented generation systems.

13 min read · Keiro Team

Introduction

Retrieval-Augmented Generation (RAG) has evolved from a simple research concept to the backbone of production AI systems. But the gap between a demo RAG pipeline and a production one is enormous. In this guide, we walk through the complete architecture journey — from the simplest possible RAG implementation to a hardened, scalable production system.

Level 0: Naive RAG

The simplest RAG pipeline has three steps: retrieve, augment, and generate.

import requests
from openai import OpenAI

KEIRO_API_KEY = "your-keiro-api-key"
client = OpenAI()

def naive_rag(question: str) -> str:
    # Retrieve
    search = requests.post("https://kierolabs.space/api/search", json={
        "apiKey": KEIRO_API_KEY,
        "query": question
    }).json()

    context = "\n".join([
        f"{r['title']}: {r.get('content', r.get('snippet', ''))}"
        for r in search.get("results", [])[:5]
    ])

    # Augment + Generate
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer using the search results. Cite sources."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
        ]
    )
    return response.choices[0].message.content

This works for demos and simple use cases, but it has clear limitations for production: it fires a single, unmodified query, relies on short snippets for context, never checks whether the answer is actually grounded, and has no error handling. The rest of this guide addresses each of these in turn.

Level 1: Improved Retrieval

The first upgrade is improving retrieval quality. Instead of basic search, use Keiro's /search-pro for re-ranked results and extract full page content for the top results.

def improved_retrieval(question: str, top_n: int = 3) -> list[dict]:
    # Step 1: Pro search for better ranking
    search = requests.post("https://kierolabs.space/api/search-pro", json={
        "apiKey": KEIRO_API_KEY,
        "query": question
    }).json()
    results = search.get("results", [])[:top_n]

    # Step 2: Extract full content for top results
    enriched = []
    for r in results:
        try:
            crawl = requests.post("https://kierolabs.space/api/web-crawler", json={
                "apiKey": KEIRO_API_KEY,
                "url": r["url"]
            }).json()
            r["full_content"] = crawl.get("content", "")[:3000]
        except Exception:
            r["full_content"] = r.get("content", r.get("snippet", ""))
        enriched.append(r)

    return enriched

Level 2: Query Enhancement

Users often ask vague or ambiguous questions. Query enhancement reformulates the question before searching:

import json

def enhance_query(original_query: str) -> list[str]:
    """Generate multiple search queries from a single user question."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": (
                "Given a user question, generate 3 different search queries that would "
                "help find the information needed to answer it. Return a JSON object "
                'with a "queries" array of strings.'
            )},
            {"role": "user", "content": original_query}
        ],
        response_format={"type": "json_object"}
    )
    result = json.loads(response.choices[0].message.content)
    return result.get("queries", [original_query])

Then search with all generated queries and merge the results. Keiro's free batch search makes this cost-effective:

def multi_query_search(original_query: str) -> list[dict]:
    queries = enhance_query(original_query)

    # Use Keiro batch search - FREE
    batch_resp = requests.post("https://kierolabs.space/api/batch-search", json={
        "apiKey": KEIRO_API_KEY,
        "queries": queries
    }).json()

    # Merge and deduplicate results
    seen_urls = set()
    merged = []
    for result_set in batch_resp.get("results", []):
        for r in result_set.get("items", []):
            if r["url"] not in seen_urls:
                seen_urls.add(r["url"])
                merged.append(r)

    return merged
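
Putting Level 2 together is then a single call (a usage sketch; the question is illustrative):

results = multi_query_search("How do vector databases handle metadata filtering?")
print(f"Merged {len(results)} unique results across all sub-queries")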

Level 3: Context Window Management

Production systems need to carefully manage the context window. Too much context wastes tokens and can confuse the model. Too little misses important information.

def manage_context(results: list[dict], max_tokens: int = 4000) -> str:
    """Build a context string that fits within the token budget."""
    context_parts = []
    estimated_tokens = 0

    for i, r in enumerate(results):
        content = r.get("full_content", r.get("content", r.get("snippet", "")))
        # Rough estimate: 1 token ≈ 4 characters
        content_tokens = len(content) // 4

        if estimated_tokens + content_tokens > max_tokens:
            # Truncate this result to fit the remaining budget, then stop
            remaining_chars = (max_tokens - estimated_tokens) * 4
            if remaining_chars > 0:
                content = content[:remaining_chars]
                context_parts.append(f"[{i+1}] {r.get('title', 'N/A')}\nURL: {r.get('url', '')}\n{content}")
            break

        context_parts.append(f"[{i+1}] {r.get('title', 'N/A')}\nURL: {r.get('url', '')}\n{content}")
        estimated_tokens += content_tokens

    return "\n\n---\n\n".join(context_parts)
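
The 1-token-per-4-characters heuristic is intentionally cheap. If you need exact budgets, a tokenizer library such as tiktoken gives real counts. A minimal sketch, assuming a recent tiktoken release that knows the gpt-4o encoding:

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Exact token count using the model's own tokenizer."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

Swapping this in for the character estimate makes the budget exact, at the cost of an extra dependency and a little CPU time.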

Level 4: Answer Validation

In production, you should validate that the generated answer is actually grounded in the sources:

import json

def validate_answer(answer: str, context: str) -> dict:
    """Check if the answer is grounded in the provided sources."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": (
                "You are a fact-checker. Given an answer and its source context, evaluate:\n"
                "1. Is the answer supported by the sources? (yes/no)\n"
                "2. Are there any claims not supported by sources? (list them)\n"
                "3. Confidence score (0-100)\n"
                "Return as JSON."
            )},
            {"role": "user", "content": f"Answer:\n{answer}\n\nSources:\n{context}"}
        ],
        response_format={"type": "json_object"}
    )
    return json.loads(response.choices[0].message.content)
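
What you do with the verdict is a product decision; one common pattern is to gate low-confidence answers behind a caveat rather than suppressing them. A sketch, with an illustrative threshold:

validation = validate_answer(answer, context)
if validation.get("confidence", 0) < 60:  # threshold is illustrative, tune to taste
    answer = "Note: I could not fully verify the following against the sources.\n\n" + answer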

Level 5: Production Architecture

A production RAG system ties all of this together with error handling, caching, logging, and monitoring:

import logging
import time

logger = logging.getLogger(__name__)

class ProductionRAG:
    def __init__(self, keiro_key: str, openai_key: str):
        self.keiro_key = keiro_key
        self.keiro_base = "https://kierolabs.space/api"
        self.llm = OpenAI(api_key=openai_key)

    def answer(self, question: str) -> dict:
        start_time = time.time()

        try:
            # 1. Enhance query
            queries = enhance_query(question)
            logger.info(f"Enhanced query into {len(queries)} sub-queries")

            # 2. Search with multiple queries (batch is free)
            results = self._batch_search(queries)
            logger.info(f"Retrieved {len(results)} unique results")

            if not results:
                return self._fallback_response(question)

            # 3. Enrich top results with full content
            enriched = self._enrich_results(results[:3])

            # 4. Build context
            context = manage_context(enriched, max_tokens=4000)

            # 5. Generate answer
            answer = self._generate(question, context)

            # 6. Validate
            validation = validate_answer(answer, context)

            elapsed = time.time() - start_time
            logger.info(f"RAG pipeline completed in {elapsed:.2f}s, confidence: {validation.get('confidence', 'N/A')}")

            return {
                "answer": answer,
                "sources": [{"title": r.get("title", ""), "url": r.get("url", "")} for r in enriched],
                "confidence": validation.get("confidence", 0),
                "latency_ms": int(elapsed * 1000)
            }

        except Exception as e:
            logger.error(f"RAG pipeline error: {e}")
            return self._fallback_response(question)

    def _batch_search(self, queries: list[str]) -> list[dict]:
        """Batch-search all sub-queries via Keiro and deduplicate by URL (see Level 2)."""
        batch_resp = requests.post(f"{self.keiro_base}/batch-search", json={
            "apiKey": self.keiro_key,
            "queries": queries
        }, timeout=30).json()
        seen_urls, merged = set(), []
        for result_set in batch_resp.get("results", []):
            for r in result_set.get("items", []):
                if r["url"] not in seen_urls:
                    seen_urls.add(r["url"])
                    merged.append(r)
        return merged

    def _enrich_results(self, results: list[dict]) -> list[dict]:
        """Fetch full page content for each result, falling back to the snippet (see Level 1)."""
        for r in results:
            try:
                crawl = requests.post(f"{self.keiro_base}/web-crawler", json={
                    "apiKey": self.keiro_key,
                    "url": r["url"]
                }, timeout=30).json()
                r["full_content"] = crawl.get("content", "")[:3000]
            except Exception:
                r["full_content"] = r.get("content", r.get("snippet", ""))
        return results

    def _generate(self, question: str, context: str) -> str:
        """Generate a grounded answer from the assembled context."""
        response = self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Answer using the search results. Cite sources."},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
            ]
        )
        return response.choices[0].message.content

    def _fallback_response(self, question: str) -> dict:
        """Use Keiro /answer as a fallback."""
        try:
            resp = requests.post(f"{self.keiro_base}/answer", json={
                "apiKey": self.keiro_key,
                "query": question
            }, timeout=15)
            data = resp.json()
            return {
                "answer": data.get("response", "I could not find an answer."),
                "sources": data.get("sources", []),
                "confidence": 50,
                "fallback": True
            }
        except Exception:
            return {
                "answer": "I apologize, but I am unable to search for information right now.",
                "sources": [],
                "confidence": 0,
                "error": True
            }
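
Wiring it all up is then a single call (keys and question are placeholders). Note that answer() reuses enhance_query, manage_context, and validate_answer from the earlier levels, so those definitions (and the module-level client) need to be in scope:

rag = ProductionRAG(keiro_key="your-keiro-api-key", openai_key="your-openai-key")
result = rag.answer("What are the tradeoffs between dense and sparse retrieval?")
print(result["answer"])
print(f"confidence={result['confidence']}, latency={result['latency_ms']}ms")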

Performance Optimization

  • Parallel search (-50% latency): use asyncio to parallelize search and crawl calls (see the sketch below)
  • Keiro cache (-50% search cost): automatic on repeated queries
  • Batch processing (free for bulk jobs): use /batch-search for multi-query pipelines
  • Context truncation (-30% LLM tokens): smart context management (Level 3)
  • Smaller model for validation (-80% validation cost): use gpt-4o-mini instead of gpt-4o
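
Since requests is blocking, one dependency-free way to get the parallelism from the first row is to push each crawl onto a worker thread with asyncio.to_thread and gather the results. A minimal sketch, reusing KEIRO_API_KEY from earlier (the endpoint payload follows the Level 1 example):

import asyncio
import requests

async def parallel_enrich(results: list[dict]) -> list[dict]:
    """Crawl all result URLs concurrently instead of one at a time."""
    async def crawl(r: dict) -> dict:
        try:
            resp = await asyncio.to_thread(
                requests.post, "https://kierolabs.space/api/web-crawler",
                json={"apiKey": KEIRO_API_KEY, "url": r["url"]}, timeout=30
            )
            r["full_content"] = resp.json().get("content", "")[:3000]
        except Exception:
            r["full_content"] = r.get("content", r.get("snippet", ""))
        return r

    return list(await asyncio.gather(*(crawl(r) for r in results)))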

Monitoring Checklist

  • Latency: Track p50, p95, p99 for the full pipeline (a minimal sketch follows this list)
  • Retrieval quality: Sample and manually evaluate search results weekly
  • Answer accuracy: Track validation confidence scores over time
  • Error rate: Monitor search API and LLM failure rates
  • Cost: Track per-query costs broken down by component
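
For the latency item, percentile tracking needs nothing beyond the logged latencies and the standard library. A minimal sketch (in production you would normally lean on your metrics backend instead):

import statistics

def latency_percentiles(samples_ms: list[int]) -> dict:
    """Compute p50/p95/p99 from raw per-request latency samples."""
    cuts = statistics.quantiles(samples_ms, n=100)  # 99 percentile cut points
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}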

Conclusion

Building a production RAG pipeline is an iterative process. Start with naive RAG (Level 0) to validate your use case, then progressively add query enhancement, context management, answer validation, and production hardening as you scale. Keiro's comprehensive API makes this easier by providing search, pro search, web crawling, batch processing, and answer generation in a single platform.

Build your production RAG pipeline with Keiro at kierolabs.space. From $5.99/month.
