Utility Functions¶
Utility functions for processing multiple documents efficiently.
Overview¶
The clean_and_split_docs() function provides high-performance batch processing for multiple documents using parallel execution.
```python
from rs_document import Document, clean_and_split_docs

# Create many documents
documents = [
    Document(page_content=text, metadata={"id": str(i)})
    for i, text in enumerate(text_list)
]

# Process all at once
chunks = clean_and_split_docs(documents, chunk_size=1000)
```
Functions¶
clean_and_split_docs()¶
Process multiple documents in parallel: clean and split all at once.
Signature:
`clean_and_split_docs(documents: list[Document], chunk_size: int) -> list[Document]`
Description:
Processes a list of documents by cleaning and splitting each one, using parallel execution across all available CPU cores. Returns a flattened list of all resulting chunks.
Parameters:
- documents (list[Document]): List of Document instances to process
- chunk_size (int): Target size for chunks in characters (used for recursive_character_splitter())
Returns:
list[Document] - Flattened list containing all chunks from all input documents
Processing Steps:
For each document in the input list:
- Runs document.clean() (applies all cleaning operations)
- Splits with document.recursive_character_splitter(chunk_size)

Documents are processed in parallel across all available CPU cores, and all resulting chunks are returned as a single flattened list, as the sketch below illustrates.
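Conceptually, the result is the same as cleaning and splitting each document one at a time and concatenating the outputs. A minimal sequential sketch of that behaviour (parallelism aside):

```python
from rs_document import Document

def clean_and_split_sequential(documents: list[Document], chunk_size: int) -> list[Document]:
    """Sequential reference for what clean_and_split_docs returns."""
    all_chunks: list[Document] = []
    for doc in documents:
        doc.clean()  # apply all cleaning operations in place
        all_chunks.extend(doc.recursive_character_splitter(chunk_size))
    return all_chunks
```

In practice, prefer clean_and_split_docs(); this loop runs on a single core and is markedly slower on large batches (see the comparison with sequential processing below).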
Example:
```python
from rs_document import Document, clean_and_split_docs

# Create multiple documents
documents = [
    Document(
        page_content=f"Document {i} content " * 1000,
        metadata={"doc_id": str(i), "source": f"file_{i}.txt"}
    )
    for i in range(100)
]

# Process all documents in parallel
chunks = clean_and_split_docs(documents, chunk_size=1000)

print(f"Processed {len(documents)} documents")
print(f"Created {len(chunks)} total chunks")

# Chunks are flattened - all mixed together
for chunk in chunks[:5]:  # First 5 chunks
    print(f"Source: {chunk.metadata['source']}, Length: {len(chunk.page_content)}")
```
Large Batch Processing:
```python
import time
from rs_document import Document, clean_and_split_docs

# Simulate loading many documents
documents = []
for i in range(10_000):
    doc = Document(
        page_content="Sample text " * 500,  # ~6000 characters
        metadata={"id": str(i), "batch": "1"}
    )
    documents.append(doc)

# Process with timing
start = time.time()
chunks = clean_and_split_docs(documents, chunk_size=1000)
elapsed = time.time() - start

print(f"Processed {len(documents):,} documents")
print(f"Created {len(chunks):,} chunks")
print(f"Time: {elapsed:.2f} seconds")
print(f"Throughput: {len(documents) / elapsed:,.0f} docs/sec")

# Expected output on typical hardware:
# Processed 10,000 documents
# Created 60,000 chunks
# Time: 0.43 seconds
# Throughput: 23,000 docs/sec
```
Edge Cases:
```python
from rs_document import Document, clean_and_split_docs

# Empty list
chunks = clean_and_split_docs([], chunk_size=1000)
print(chunks)  # []

# Single document
doc = Document(page_content="Single doc", metadata={})
chunks = clean_and_split_docs([doc], chunk_size=1000)
print(len(chunks))  # 1

# Documents with varying sizes
documents = [
    Document(page_content="Short", metadata={"id": "1"}),
    Document(page_content="A" * 10000, metadata={"id": "2"}),
    Document(page_content="", metadata={"id": "3"}),  # Empty
]
chunks = clean_and_split_docs(documents, chunk_size=1000)
# Returns a varying number of chunks per document
```
Metadata Preservation:
```python
documents = [
    Document(
        page_content="Long text " * 1000,
        metadata={"source": "doc1.txt", "author": "Alice", "date": "2024-01-01"}
    ),
    Document(
        page_content="More text " * 1000,
        metadata={"source": "doc2.txt", "author": "Bob", "date": "2024-01-02"}
    ),
]

chunks = clean_and_split_docs(documents, chunk_size=500)

# Each chunk retains its parent document's metadata
for chunk in chunks:
    print(f"Source: {chunk.metadata['source']}, Author: {chunk.metadata['author']}")
```
Comparison with Sequential Processing:
```python
import time
from rs_document import Document, clean_and_split_docs

# Create test documents
documents = [
    Document(page_content="Text " * 1000, metadata={"id": str(i)})
    for i in range(1000)
]

# Method 1: Sequential (manual loop)
start = time.time()
sequential_chunks = []
for doc in documents:
    doc.clean()
    chunks = doc.recursive_character_splitter(1000)
    sequential_chunks.extend(chunks)
sequential_time = time.time() - start

# Method 2: Parallel (clean_and_split_docs)
start = time.time()
parallel_chunks = clean_and_split_docs(documents, chunk_size=1000)
parallel_time = time.time() - start

print(f"Sequential: {sequential_time:.2f}s")
print(f"Parallel: {parallel_time:.2f}s")
print(f"Speedup: {sequential_time / parallel_time:.1f}x")

# Expected results:
# Sequential: 0.50s
# Parallel: 0.02s
# Speedup: 25.0x
```
Performance Characteristics¶
Parallel Processing¶
clean_and_split_docs() uses Rayon (a Rust data-parallelism library) to process documents in parallel:
- CPU cores: Automatically uses all available CPU cores
- Work stealing: Efficient load balancing across threads
- No GIL: True parallelism (not limited by Python's Global Interpreter Lock)
Benchmarks¶
Typical performance on modern hardware (8-core CPU):
| Documents | Avg Doc Size | Chunk Size (chars) | Time | Throughput |
|---|---|---|---|---|
| 100 | 5 KB | 1000 | 0.004s | 25,000 docs/s |
| 1,000 | 5 KB | 1000 | 0.04s | 25,000 docs/s |
| 10,000 | 5 KB | 1000 | 0.43s | 23,000 docs/s |
| 100,000 | 5 KB | 1000 | 4.3s | 23,000 docs/s |
Key observations:
- Near-linear scaling with document count (see the reproduction sketch below)
- 20-25x faster than pure Python implementations
- Minimal overhead for small batches
- Consistent throughput across batch sizes
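These figures can be reproduced, with machine-dependent absolute numbers, using a small harness along the following lines; the document contents and sizes here are illustrative:

```python
import time
from rs_document import Document, clean_and_split_docs

# Measure throughput at several batch sizes to check near-linear scaling.
for n in (100, 1_000, 10_000):
    docs = [
        Document(page_content="Sample text " * 500, metadata={"id": str(i)})  # ~6 KB each
        for i in range(n)
    ]
    start = time.perf_counter()
    chunks = clean_and_split_docs(docs, chunk_size=1000)
    elapsed = time.perf_counter() - start
    print(f"{n:>7,} docs -> {len(chunks):,} chunks in {elapsed:.3f}s ({n / elapsed:,.0f} docs/sec)")
```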
Memory Usage¶
Memory consumption is proportional to:
- Total size of input documents
- Number of resulting chunks
- Chunk overlap (~33%)
Example:
```python
# Estimate memory for a large batch
num_docs = 100_000
avg_doc_size = 5_000   # bytes
chunk_size = 1_000
chunks_per_doc = 7     # Approximate, accounting for overlap

input_memory = num_docs * avg_doc_size                  # 500 MB
output_memory = num_docs * chunks_per_doc * chunk_size  # 700 MB
total_memory = input_memory + output_memory             # ~1.2 GB

print(f"Estimated memory: {total_memory / 1e9:.1f} GB")
```
Memory optimization tips:
- Process in batches if total memory is a concern (see the streaming sketch below)
- Clear input documents after processing if they are no longer needed
- Choose the chunk size to control output size
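The first two tips can be combined by streaming: process one batch at a time and yield chunks as they are produced, so the full output never has to be materialized and the input can itself be a lazy generator. A sketch (iter_chunks is a hypothetical helper, not part of rs_document):

```python
from itertools import islice
from typing import Iterable, Iterator

from rs_document import Document, clean_and_split_docs

def iter_chunks(
    documents: Iterable[Document],
    chunk_size: int = 1000,
    batch_size: int = 10_000,
) -> Iterator[Document]:
    """Yield chunks batch by batch, holding only one input batch in memory."""
    docs = iter(documents)
    while True:
        batch = list(islice(docs, batch_size))
        if not batch:
            return
        yield from clean_and_split_docs(batch, chunk_size=chunk_size)
```

Chunks can then be consumed incrementally, for example inserted into a vector store batch by batch, without keeping every chunk in memory at once.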
Optimization Tips¶
Batch Size:
```python
# For very large datasets, process in batches
def process_large_dataset(all_documents, batch_size=10_000):
    all_chunks = []
    for i in range(0, len(all_documents), batch_size):
        batch = all_documents[i:i + batch_size]
        chunks = clean_and_split_docs(batch, chunk_size=1000)
        all_chunks.extend(chunks)
        print(f"Processed batch {i // batch_size + 1}")
    return all_chunks

# Process 100,000 documents in batches of 10,000
chunks = process_large_dataset(huge_document_list)
```
Chunk Size Selection:
```python
# Smaller chunks = more total chunks = more memory
chunks_small = clean_and_split_docs(docs, chunk_size=500)   # More chunks

# Larger chunks = fewer total chunks = less memory
chunks_large = clean_and_split_docs(docs, chunk_size=2000)  # Fewer chunks

print(f"Small chunks: {len(chunks_small)}")
print(f"Large chunks: {len(chunks_large)}")
```
Metadata Optimization:
```python
# Keep metadata minimal to reduce memory
documents = [
    Document(
        page_content=text,
        metadata={"id": str(i)}  # Only essential metadata
    )
    for i, text in enumerate(texts)
]
chunks = clean_and_split_docs(documents, chunk_size=1000)
```
Common Patterns¶
Basic Batch Processing¶
```python
from rs_document import Document, clean_and_split_docs

# Load documents
documents = [
    Document(page_content=load_file(f), metadata={"source": f})
    for f in file_list
]

# Process all at once
chunks = clean_and_split_docs(documents, chunk_size=1000)
```
Processing with Progress Tracking¶
```python
from rs_document import Document, clean_and_split_docs

def process_with_progress(documents, chunk_size, batch_size=1000):
    """Process documents in batches with progress tracking."""
    all_chunks = []
    total = len(documents)
    for i in range(0, total, batch_size):
        batch = documents[i:i + batch_size]
        chunks = clean_and_split_docs(batch, chunk_size)
        all_chunks.extend(chunks)
        processed = min(i + batch_size, total)
        print(f"Progress: {processed}/{total} documents ({100 * processed / total:.1f}%)")
    return all_chunks

# Use it
chunks = process_with_progress(huge_list, chunk_size=1000, batch_size=5000)
```
From Files to Chunks¶
```python
import os
from rs_document import Document, clean_and_split_docs

def process_directory(directory, chunk_size=1000):
    """Load all text files from a directory and process them."""
    documents = []
    for filename in os.listdir(directory):
        if filename.endswith('.txt'):
            path = os.path.join(directory, filename)
            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()
            doc = Document(
                page_content=content,
                metadata={"source": filename, "path": path}
            )
            documents.append(doc)
    print(f"Loaded {len(documents)} documents")
    chunks = clean_and_split_docs(documents, chunk_size)
    print(f"Created {len(chunks)} chunks")
    return chunks

# Process an entire directory
chunks = process_directory('/path/to/documents')
```
Integration with Vector Stores¶
```python
from rs_document import Document, clean_and_split_docs

def prepare_for_embedding(documents, chunk_size=1000):
    """Prepare documents for embedding in a vector store."""
    # Clean and split
    chunks = clean_and_split_docs(documents, chunk_size)

    # Add chunk identifiers
    for i, chunk in enumerate(chunks):
        chunk.metadata["chunk_id"] = str(i)
        chunk.metadata["char_count"] = str(len(chunk.page_content))

    return chunks

# Prepare for a vector database
documents = [...]  # Your documents
chunks = prepare_for_embedding(documents)

# Now ready for embedding
# embeddings = embed_model.embed_documents([c.page_content for c in chunks])
# vector_store.add_documents(chunks, embeddings)
```
See Also¶
- Document Class - Creating Document instances
- Cleaning Methods - Individual cleaning operations
- Splitting Methods - Document splitting details
- Types and Constants - Type signatures and defaults