n_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
@dataclass
class PipelineConfig:
    """Tunable settings shared by every stage of the market-intelligence pipeline.

    Raises:
        ValueError: If a value is outside the range the pipeline can use
            (checked once, at construction time).
    """

    # Chunk size handed to the text splitter.
    chunk_size: int = 800
    # Overlap between consecutive chunks; must not exceed ``chunk_size``.
    chunk_overlap: int = 150
    # Embedding model name; must be the same for indexing and querying.
    embedding_model: str = "text-embedding-3-small"
    # Maximum number of chunks returned per query.
    retrieval_k: int = 5
    # Minimum relevance score (0..1) a chunk needs to be returned.
    similarity_threshold: float = 0.65
    # Directory where the Chroma index is persisted.
    persist_dir: str = "./chroma_market_db"

    def __post_init__(self) -> None:
        # Fail fast here instead of deep inside the splitter or retriever.
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not 0 <= self.chunk_overlap <= self.chunk_size:
            raise ValueError("chunk_overlap must be between 0 and chunk_size")
        if self.retrieval_k < 1:
            raise ValueError("retrieval_k must be at least 1")
        if not 0.0 <= self.similarity_threshold <= 1.0:
            raise ValueError("similarity_threshold must be between 0 and 1")
class MarketIntelligencePipeline:
    """Retrieval-augmented analysis pipeline over text, CSV and web sources.

    Typical flow: call the ``ingest_*`` methods to obtain ``Document``
    objects, index them with ``build_vector_store`` (or reopen an existing
    index with ``load_vector_store``), then call ``generate_analysis``.
    """

    # Prompt sent to the chat model; ``{context}`` and ``{query}`` are filled
    # in by ``generate_analysis``.
    _ANALYSIS_TEMPLATE = (
        "\n"
        "You are a market intelligence analyst. Analyze the user query using ONLY the provided context.\n"
        "Rules:\n"
        "- Cite specific data points from the context.\n"
        '- If the context lacks information, state "Data not available in context."\n'
        "- Do not hallucinate metrics or values.\n"
        "- Structure the response with clear headings.\n"
        "Context:\n"
        "{context}\n"
        "Query:\n"
        "{query}\n"
        "Analysis:\n"
    )

    def __init__(self, config: "PipelineConfig"):
        """Create the embedding client, text splitter and chat model.

        Args:
            config: Chunking, embedding, retrieval and persistence settings.
        """
        self.config = config
        self.embeddings = OpenAIEmbeddings(model=config.embedding_model)
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.chunk_size,
            chunk_overlap=config.chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""],
        )
        # Set by build_vector_store() or load_vector_store().
        self.vector_store = None
        self.llm = ChatOpenAI(temperature=0.2, model="gpt-4o-mini")

    @staticmethod
    def _sanitize_metadata(metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *metadata* holding only scalar values.

        ``None`` values are dropped, lists/tuples are joined into one
        comma-separated string, and any other non-scalar value is stringified.
        """
        cleaned: Dict[str, Any] = {}
        for key, value in metadata.items():
            if value is None:
                continue
            if isinstance(value, (str, int, float, bool)):
                cleaned[key] = value
            elif isinstance(value, (list, tuple)):
                cleaned[key] = ", ".join(str(item) for item in value)
            else:
                cleaned[key] = str(value)
        return cleaned

    def ingest_text_source(self, file_path: str, metadata: Dict[str, Any] = None) -> "List[Document]":
        """Load a UTF-8 text file, tag it and split it into chunks.

        Args:
            file_path: Path of the text file to load.
            metadata: Optional extra metadata merged into every document.

        Returns:
            The chunks produced by the configured splitter.

        Raises:
            FileNotFoundError: If *file_path* does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Text source not found: {file_path}")
        raw_docs = TextLoader(file_path, encoding="utf-8").load()
        extra = metadata or {}
        # Attach source metadata for traceability.
        for doc in raw_docs:
            doc.metadata.update(extra)
            doc.metadata["source_type"] = "text"
        return self.splitter.split_documents(raw_docs)

    def ingest_csv_source(self, file_path: str, id_column: str = "id") -> "List[Document]":
        """Turn every CSV row into one document of ``column: value`` lines.

        Args:
            file_path: Path of the CSV file to read.
            id_column: Column whose value is stored as ``row_id``; an empty
                string is stored when the column is absent.

        Returns:
            One document per row (rows are not split further). Cells that
            are NaN/missing are left out of the content.
        """
        df = pd.read_csv(file_path)
        schema_columns = list(df.columns)  # same schema for every row
        documents = []
        for _, row in df.iterrows():
            content = "\n".join(
                f"{col}: {val}" for col, val in row.items() if pd.notna(val)
            )
            # Preserve row ID and schema in metadata for filtering.
            row_metadata = {
                "source_type": "csv",
                "row_id": str(row.get(id_column, "")),
                "schema_columns": list(schema_columns),
            }
            documents.append(Document(page_content=content, metadata=row_metadata))
        return documents

    def ingest_web_source(self, urls: List[str], metadata: Dict[str, Any] = None) -> "List[Document]":
        """Load each URL, tag the results and split them into chunks.

        A URL that fails to load is reported with a printed warning and
        skipped, so one bad page does not abort the whole batch.

        Args:
            urls: Pages to fetch.
            metadata: Optional extra metadata merged into every document.

        Returns:
            The chunks produced by the configured splitter.
        """
        extra = metadata or {}
        all_docs = []
        for url in urls:
            try:
                # NOTE(review): `post_processors_kwargs` is forwarded as-is;
                # confirm the installed loader version honours it.
                loader = UnstructuredURLLoader(
                    urls=[url],
                    mode="elements",  # Extracts structured elements
                    post_processors_kwargs={"raise_on_failure": False},
                )
                docs = loader.load()
                for doc in docs:
                    doc.metadata.update(extra)
                    doc.metadata["source_type"] = "web"
                    doc.metadata["url"] = url
                all_docs.extend(docs)
            except Exception as e:  # deliberate best-effort per URL
                print(f"Warning: Failed to ingest {url}: {e}")
        return self.splitter.split_documents(all_docs)

    def build_vector_store(self, documents: "List[Document]") -> None:
        """Embed *documents* into a Chroma index persisted at ``config.persist_dir``.

        Metadata is flattened to scalar values on copies of the documents
        before indexing, because Chroma expects str/int/float/bool metadata
        values; the caller's documents are left untouched.

        Raises:
            ValueError: If *documents* is empty.
        """
        if not documents:
            raise ValueError("No documents to index.")
        indexable = [
            Document(
                page_content=doc.page_content,
                metadata=self._sanitize_metadata(doc.metadata),
            )
            for doc in documents
        ]
        self.vector_store = Chroma.from_documents(
            documents=indexable,
            embedding=self.embeddings,
            persist_directory=self.config.persist_dir,
        )
        print(f"Vector store persisted to {self.config.persist_dir}")

    def load_vector_store(self) -> None:
        """Open the Chroma index stored at ``config.persist_dir``.

        Raises:
            ValueError: If the persistence directory does not exist.
        """
        if not os.path.exists(self.config.persist_dir):
            raise ValueError("Vector store not found. Run build_vector_store first.")
        self.vector_store = Chroma(
            persist_directory=self.config.persist_dir,
            embedding_function=self.embeddings,
        )

    def retrieve_context(self, query: str) -> "List[Document]":
        """Return up to ``retrieval_k`` documents scoring at least ``similarity_threshold``.

        Raises:
            RuntimeError: If no vector store has been built or loaded.
        """
        # Compare against None explicitly so the check never depends on the
        # truthiness of the store object itself.
        if self.vector_store is None:
            raise RuntimeError("Vector store not initialized.")
        retriever = self.vector_store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={
                "k": self.config.retrieval_k,
                "score_threshold": self.config.similarity_threshold,
            },
        )
        return retriever.invoke(query)

    def generate_analysis(self, query: str) -> str:
        """Retrieve context for *query* and ask the chat model for an analysis.

        Returns:
            The model's answer, or a fixed fallback message when no document
            passes the similarity threshold (the model is not called then).

        Raises:
            RuntimeError: If no vector store has been built or loaded.
        """
        context_docs = self.retrieve_context(query)
        if not context_docs:
            return "No relevant context found for the query."
        context_text = "\n\n---\n\n".join(doc.page_content for doc in context_docs)
        prompt = ChatPromptTemplate.from_template(self._ANALYSIS_TEMPLATE)
        chain = prompt | self.llm | StrOutputParser()
        return chain.invoke({"context": context_text, "query": query})
### Usage Example
```python
# Override a few defaults; everything else comes from PipelineConfig
config = PipelineConfig(chunk_size=1000, similarity_threshold=0.7, retrieval_k=4)
pipeline = MarketIntelligencePipeline(config)

# 1. Collect chunks from a text report, a CSV catalog and two web pages
text_docs = pipeline.ingest_text_source("competitor_report.txt", {"report_date": "2024-01"})
csv_docs = pipeline.ingest_csv_source("product_catalog.csv", id_column="sku")
web_docs = pipeline.ingest_web_source(
    urls=["https://example.com/market-trends", "https://example.com/competitor-news"],
    metadata={"category": "news"},
)
docs = [*text_docs, *csv_docs, *web_docs]

# 2. Embed and persist everything in one index
pipeline.build_vector_store(docs)

# 3. Ask a question that is answered from the indexed context
question = "Compare the pricing strategy of Product X against competitors and identify market risks."
result = pipeline.generate_analysis(question)
print(result)
```

## Pitfall Guide
Production RAG systems fail due to subtle implementation errors. The following pitfalls are derived from deployment experience.
- **Semantic Fragmentation in Chunking**
  - Explanation: Splitting text at arbitrary character counts breaks sentences or tables, so chunks lose their meaning and the LLM receives incomplete context.
  - Fix: Use `RecursiveCharacterTextSplitter` with appropriate separators. For tables or code, use specialized splitters that preserve structure. Keep `chunk_overlap` at roughly 10-20% of `chunk_size` (the defaults above use 150 of 800) to maintain context continuity across boundaries.
- **Embedding Model Mismatch**
  - Explanation: Indexing with one embedding model and querying with another misaligns the vector spaces. Retrieval returns irrelevant documents because the vectors are not comparable.
  - Fix: Enforce model consistency via configuration management. Store the embedding model name in the vector store metadata and validate it when the retriever is initialized.
- **Context Window Overflow in Generation**
  - Explanation: Retrieving too many chunks (`k` too high) or chunks that are too large can exceed the LLM's context window, causing truncation or API errors.
  - Fix: Calculate the token budget available for retrieved context: model context window − prompt-template tokens − tokens reserved for the output (`max_tokens`). Limit `k` and `chunk_size` accordingly. Implement dynamic chunk selection based on similarity scores rather than a fixed `k`.
- **CSV Schema Drift and LLM Confusion**
  - Explanation: Passing raw CSV text to an LLM without structure causes the model to misinterpret columns, especially when headers are missing or data types vary.
  - Fix: Convert CSV rows to structured text with explicit column labels. Inject schema metadata into the prompt. For complex queries, pre-filter with pandas before LLM analysis.
- **Web Scraping Fragility**
  - Explanation: Scrapers that rely on static HTML selectors break when websites change their markup. They may also capture navigation, ads, and footers, polluting the context.
  - Fix: Use robust loaders like `UnstructuredURLLoader` that extract semantic elements. Implement retry logic with exponential backoff. Filter content by element type (e.g., exclude nav and footer elements).
- **Vector Store Persistence Loss**
  - Explanation: In-memory vector stores lose their data on restart. Re-indexing large datasets on every run is inefficient and costly.
  - Fix: Always use persistent storage (e.g., `Chroma` with `persist_directory`). Implement incremental indexing so that only changed documents are re-embedded.
- **Hallucination via Low-Similarity Retrieval**
  - Explanation: When retrieved documents have low relevance scores, the context is insufficient and the LLM is pushed to guess or hallucinate.
  - Fix: Implement `similarity_score_threshold` filtering. If no documents meet the threshold, return a fallback response indicating insufficient data rather than proceeding with weak context.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Static Knowledge Base | Vector Retrieval with Chroma | Efficient for read-heavy, infrequently updated data. | Low storage, moderate embedding cost. |
| Dynamic Web Data | Hybrid: Web Loader + Vector DB | Handles frequent updates; vector DB enables fast retrieval. | Higher ingestion cost due to scraping/embedding. |
| Structured CSV Analysis | Pandas Pre-filter + LLM | Preserves schema integrity; reduces noise. | Minimal embedding cost; higher compute for pandas. |
| Real-time Streaming | Sliding Window Vector Store | Maintains recent context; discards stale data. | Continuous embedding cost; optimized retrieval. |
Configuration Template
Use this template to standardize pipeline configuration across environments.
# pipeline_config.yaml
# Settings are grouped by pipeline stage under the top-level `pipeline` key.
pipeline:
  chunking:
    size: 800
    overlap: 150
    separators: ["\n\n", "\n", ". ", " ", ""]
  embedding:
    # Must be the same model for indexing and querying.
    model: "text-embedding-3-small"
    dimensions: 1536
  retrieval:
    k: 5
    score_threshold: 0.65
    search_type: "similarity_score_threshold"
  storage:
    type: "chroma"
    persist_directory: "./data/vector_store"
    collection_name: "market_intelligence"
  generation:
    model: "gpt-4o-mini"
    temperature: 0.2
    max_tokens: 1000
Quick Start Guide
1. **Install dependencies:**

   ```bash
   pip install langchain langchain-community langchain-openai langchain-chroma pandas unstructured
   ```

2. **Set environment variables:**

   ```bash
   export OPENAI_API_KEY="your-api-key"
   ```

3. **Initialize the pipeline** (assuming the code above is saved as `pipeline.py`):

   ```python
   from pipeline import MarketIntelligencePipeline, PipelineConfig

   config = PipelineConfig()
   pipeline = MarketIntelligencePipeline(config)
   ```

4. **Ingest and index:**

   ```python
   docs = pipeline.ingest_text_source("data/report.txt")
   pipeline.build_vector_store(docs)
   ```

5. **Query:**

   ```python
   response = pipeline.generate_analysis("Summarize key risks in the report.")
   print(response)
   ```

This architecture provides a scalable foundation for data-driven LLM applications. By separating ingestion from retrieval and enforcing strict relevance thresholds, systems maintain performance and accuracy as data volumes grow.