Batch Processing¶
Learn how to process many documents efficiently with parallel processing.
The clean_and_split_docs Function¶
Process multiple documents with one function call:
from rs_document import Document, clean_and_split_docs
# Create multiple documents
documents = [
    Document(
        page_content=f"Document {i} content " * 100,
        metadata={"doc_id": str(i)}
    )
    for i in range(100)
]
# Clean and split all at once
chunks = clean_and_split_docs(documents, chunk_size=1000)
print(f"Processed {len(documents)} documents")
print(f"Created {len(chunks)} chunks")
What It Does¶
clean_and_split_docs performs these steps in parallel:
- Runs .clean() on each document
- Splits each document with .recursive_character_splitter(chunk_size)
- Returns a flat list of all chunks
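Each chunk is itself a Document, and the metadata of its source document is carried over to every chunk produced from it. A quick check after the example above (a minimal sketch reusing that chunks list):
first = chunks[0]
print(first.metadata["doc_id"])      # metadata copied from the source document
print(len(first.page_content))       # roughly chunk_size characters or fewer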
Why Use Batch Processing?¶
Performance¶
Processing documents individually:
# Slow - processes sequentially
all_chunks = []
for doc in documents:
    doc.clean()
    chunks = doc.recursive_character_splitter(1000)
    all_chunks.extend(chunks)
Using batch processing:
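# Fast - processes all documents in parallel
chunks = clean_and_split_docs(documents, chunk_size=1000)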
Result: 20-25x faster, processing ~23,000 documents/second on typical hardware.
Simplicity¶
One function call instead of a loop:
# Before
all_chunks = []
for doc in documents:
    doc.clean()
    chunks = doc.recursive_character_splitter(1000)
    all_chunks.extend(chunks)
# After
chunks = clean_and_split_docs(documents, chunk_size=1000)
Complete Example¶
Process a directory of text files:
from pathlib import Path
from rs_document import Document, clean_and_split_docs
# Load all text files
documents = []
for file_path in Path("./documents").glob("*.txt"):
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()
    doc = Document(
        page_content=content,
        metadata={
            "source": file_path.name,
            "path": str(file_path)
        }
    )
    documents.append(doc)
print(f"Loaded {len(documents)} documents")
# Process all at once
chunks = clean_and_split_docs(documents, chunk_size=1000)
print(f"Created {len(chunks)} chunks")
print(f"Average chunks per doc: {len(chunks) / len(documents):.1f}")
Tracking Progress¶
For large batches, show progress:
from rs_document import Document, clean_and_split_docs
import time
documents = [...] # Your documents
print(f"Processing {len(documents)} documents...")
start_time = time.time()
chunks = clean_and_split_docs(documents, chunk_size=1000)
elapsed = time.time() - start_time
docs_per_sec = len(documents) / elapsed
print(f"Done in {elapsed:.2f}s ({docs_per_sec:.0f} docs/sec)")
print(f"Created {len(chunks)} chunks")
Working with Results¶
Group Chunks by Source¶
Track which chunks came from which document:
from collections import defaultdict
# Add unique IDs before processing
for i, doc in enumerate(documents):
    doc.metadata["doc_id"] = str(i)
# Process
chunks = clean_and_split_docs(documents, chunk_size=1000)
# Group by source
chunks_by_doc = defaultdict(list)
for chunk in chunks:
    doc_id = chunk.metadata["doc_id"]
    chunks_by_doc[doc_id].append(chunk)
# See distribution
for doc_id, doc_chunks in chunks_by_doc.items():
    print(f"Document {doc_id}: {len(doc_chunks)} chunks")
Filter Small Chunks¶
Remove chunks below a minimum size:
chunks = clean_and_split_docs(documents, chunk_size=1000)
# Keep only chunks >= 200 characters
min_size = 200
filtered = [c for c in chunks if len(c.page_content) >= min_size]
print(f"Kept {len(filtered)} of {len(chunks)} chunks")
Add Global Metadata¶
Add metadata to all chunks:
chunks = clean_and_split_docs(documents, chunk_size=1000)
# Add processing info to all chunks
batch_id = "batch_001"
for chunk in chunks:
    chunk.metadata["batch_id"] = batch_id
    chunk.metadata["processed_at"] = "2024-01-01"
Performance Considerations¶
Optimal Batch Size¶
For large collections, process documents in fixed-size batches; this keeps each call to clean_and_split_docs to a manageable size and gives you a natural point to report progress:
from rs_document import clean_and_split_docs
def process_in_batches(documents, chunk_size=1000, batch_size=1000):
    """Process documents in batches."""
    all_chunks = []
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        chunks = clean_and_split_docs(batch, chunk_size=chunk_size)
        all_chunks.extend(chunks)
        print(f"Processed batch {i//batch_size + 1}: {len(chunks)} chunks")
    return all_chunks
# Process 10,000 documents in batches of 1,000
documents = [...] # Your 10,000 documents
chunks = process_in_batches(documents, chunk_size=1000, batch_size=1000)
Memory Management¶
For very large datasets:
import json
from rs_document import clean_and_split_docs
def process_and_save(documents, output_file, chunk_size=1000):
    """Process documents in batches and save chunks incrementally to a JSONL file."""
    with open(output_file, "w") as f:
        batch_size = 1000
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            chunks = clean_and_split_docs(batch, chunk_size=chunk_size)
            # Save chunks to file, one JSON object per line
            for chunk in chunks:
                data = {
                    "content": chunk.page_content,
                    "metadata": chunk.metadata
                }
                f.write(json.dumps(data) + "\n")
            print(f"Saved {len(chunks)} chunks from batch {i//batch_size + 1}")
# Use it
documents = [...] # Large list
process_and_save(documents, "chunks.jsonl", chunk_size=1000)
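To load the saved chunks back into Document objects later, reverse the process line by line. A minimal sketch, assuming the file was written by process_and_save above:
import json
from rs_document import Document
loaded_chunks = []
with open("chunks.jsonl", "r") as f:
    for line in f:
        data = json.loads(line)
        # Rebuild each chunk from its saved content and metadata
        loaded_chunks.append(Document(page_content=data["content"], metadata=data["metadata"]))
print(f"Loaded {len(loaded_chunks)} chunks")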
Integration Example¶
Complete pipeline for RAG:
from pathlib import Path
from rs_document import Document, clean_and_split_docs
def prepare_documents_for_rag(directory, chunk_size=1000):
    """Load, clean, split, and prepare documents for RAG."""
    # 1. Load documents
    print("Loading documents...")
    documents = []
    for file_path in Path(directory).glob("**/*.txt"):
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
        doc = Document(
            page_content=content,
            metadata={
                "source": file_path.name,
                "path": str(file_path),
                "category": file_path.parent.name
            }
        )
        documents.append(doc)
    print(f"Loaded {len(documents)} documents")
    # 2. Clean and split
    print("Processing...")
    chunks = clean_and_split_docs(documents, chunk_size=chunk_size)
    # 3. Filter small chunks
    min_size = chunk_size // 4
    filtered = [c for c in chunks if len(c.page_content) >= min_size]
    # 4. Add chunk IDs
    for i, chunk in enumerate(filtered):
        chunk.metadata["chunk_id"] = str(i)
    print(f"Created {len(filtered)} chunks")
    return filtered
# Use it
chunks = prepare_documents_for_rag("./my_documents", chunk_size=1000)
# Ready for embedding
texts = [c.page_content for c in chunks]
metadatas = [c.metadata for c in chunks]
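From here the hand-off to an embedding step is direct: texts go to the model and metadatas stays aligned by index for your vector store. As one illustration outside rs_document, this sketch assumes the third-party sentence-transformers package and the all-MiniLM-L6-v2 model; any embedding provider that accepts a list of strings works the same way:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
# Embed the chunk texts; row i of embeddings corresponds to metadatas[i]
embeddings = model.encode(texts, show_progress_bar=True)
print(embeddings.shape)  # (number of chunks, embedding dimension)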
Benchmarking¶
Measure your processing speed:
import time
from rs_document import Document, clean_and_split_docs
# Create test documents
num_docs = 1000
documents = [
    Document(
        page_content="test content " * 500,
        metadata={"id": str(i)}
    )
    for i in range(num_docs)
]
# Benchmark
start = time.time()
chunks = clean_and_split_docs(documents, chunk_size=1000)
elapsed = time.time() - start
# Results
docs_per_sec = num_docs / elapsed
print(f"Processed {num_docs} documents in {elapsed:.2f}s")
print(f"Speed: {docs_per_sec:.0f} documents/second")
print(f"Created {len(chunks)} chunks")
Summary¶
You've learned:
- ✅ How to use clean_and_split_docs for batch processing
- ✅ Why batch processing is faster (parallel processing)
- ✅ How to track and group chunks by source
- ✅ Performance optimization techniques
- ✅ Complete RAG pipeline example
Next Steps¶
You've completed the tutorial! Here's what to explore next:
- How-To Guides - Solve specific problems
- API Reference - Look up exact method signatures
- Explanation - Understand design decisions
Happy document processing! 🚀