Utility Functions

Utility functions for processing multiple documents efficiently.

Overview

The clean_and_split_docs() function provides high-performance batch processing for multiple documents using parallel execution.

from rs_document import Document, clean_and_split_docs

# Create many documents
documents = [
    Document(page_content=text, metadata={"id": str(i)})
    for i, text in enumerate(text_list)
]

# Process all at once
chunks = clean_and_split_docs(documents, chunk_size=1000)

Functions

clean_and_split_docs()

Process multiple documents in parallel: clean and split all at once.

Signature:

clean_and_split_docs(
    documents: list[Document],
    chunk_size: int
) -> list[Document]

Description:

Processes a list of documents by cleaning and splitting each one, using parallel execution across all available CPU cores. Returns a flattened list of all resulting chunks.

Parameters:

  • documents (list[Document]): List of Document instances to process
  • chunk_size (int): Target size for chunks in characters (passed to recursive_character_splitter())

Returns:

list[Document] - Flattened list containing all chunks from all input documents

Processing Steps:

For each document in the input list, the function:

  1. Runs document.clean() (applies all cleaning operations)
  2. Splits the cleaned text with document.recursive_character_splitter(chunk_size)

The per-document work is executed in parallel across all available CPU cores, and the resulting chunks are returned as a single flattened list, functionally matching the sequential loop sketched below.
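
A minimal pure-Python sketch of the same behavior (the real function performs this work in parallel in Rust; clean_and_split_sequential is just an illustrative name, not part of the library):

def clean_and_split_sequential(documents, chunk_size):
    """Sequential reference for what clean_and_split_docs() returns."""
    chunks = []
    for doc in documents:
        doc.clean()  # in-place cleaning, as in the Document API
        chunks.extend(doc.recursive_character_splitter(chunk_size))
    return chunks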

Example:

from rs_document import Document, clean_and_split_docs

# Create multiple documents
documents = [
    Document(
        page_content=f"Document {i} content " * 1000,
        metadata={"doc_id": str(i), "source": f"file_{i}.txt"}
    )
    for i in range(100)
]

# Process all documents in parallel
chunks = clean_and_split_docs(documents, chunk_size=1000)

print(f"Processed {len(documents)} documents")
print(f"Created {len(chunks)} total chunks")

# Chunks from all documents are flattened into a single list
for chunk in chunks[:5]:  # First 5 chunks
    print(f"Source: {chunk.metadata['source']}, Length: {len(chunk.page_content)}")

Large Batch Processing:

import time
from rs_document import Document, clean_and_split_docs

# Simulate loading many documents
documents = []
for i in range(10_000):
    doc = Document(
        page_content="Sample text " * 500,  # ~6000 characters
        metadata={"id": str(i), "batch": "1"}
    )
    documents.append(doc)

# Process with timing
start = time.time()
chunks = clean_and_split_docs(documents, chunk_size=1000)
elapsed = time.time() - start

print(f"Processed {len(documents):,} documents")
print(f"Created {len(chunks):,} chunks")
print(f"Time: {elapsed:.2f} seconds")
print(f"Throughput: {len(documents) / elapsed:,.0f} docs/sec")

# Expected output on typical hardware:
# Processed 10,000 documents
# Created 60,000 chunks
# Time: 0.43 seconds
# Throughput: 23,000 docs/sec

Edge Cases:

# Empty list
chunks = clean_and_split_docs([], chunk_size=1000)
print(chunks)  # []

# Single document
doc = Document(page_content="Single doc", metadata={})
chunks = clean_and_split_docs([doc], chunk_size=1000)
print(len(chunks))  # 1

# Documents with varying sizes
documents = [
    Document(page_content="Short", metadata={"id": "1"}),
    Document(page_content="A" * 10000, metadata={"id": "2"}),
    Document(page_content="", metadata={"id": "3"}),  # Empty
]
chunks = clean_and_split_docs(documents, chunk_size=1000)
# Returns varying number of chunks per document

Metadata Preservation:

documents = [
    Document(
        page_content="Long text " * 1000,
        metadata={"source": "doc1.txt", "author": "Alice", "date": "2024-01-01"}
    ),
    Document(
        page_content="More text " * 1000,
        metadata={"source": "doc2.txt", "author": "Bob", "date": "2024-01-02"}
    ),
]

chunks = clean_and_split_docs(documents, chunk_size=500)

# Each chunk retains its parent document's metadata
for chunk in chunks:
    print(f"Source: {chunk.metadata['source']}, Author: {chunk.metadata['author']}")

Comparison with Sequential Processing:

import time
from rs_document import Document, clean_and_split_docs

# Create test documents
documents = [
    Document(page_content="Text " * 1000, metadata={"id": str(i)})
    for i in range(1000)
]

# Method 1: Sequential (manual loop)
start = time.time()
sequential_chunks = []
for doc in documents:
    doc.clean()
    chunks = doc.recursive_character_splitter(1000)
    sequential_chunks.extend(chunks)
sequential_time = time.time() - start

# Method 2: Parallel (clean_and_split_docs)
start = time.time()
parallel_chunks = clean_and_split_docs(documents, chunk_size=1000)
parallel_time = time.time() - start

print(f"Sequential: {sequential_time:.2f}s")
print(f"Parallel: {parallel_time:.2f}s")
print(f"Speedup: {sequential_time / parallel_time:.1f}x")

# Expected results:
# Sequential: 0.50s
# Parallel: 0.02s
# Speedup: 25.0x

Performance Characteristics

Parallel Processing

clean_and_split_docs() uses Rayon (Rust's parallelism library) to process documents in parallel:

  • CPU cores: Automatically uses all available CPU cores
  • Work stealing: Efficient load balancing across threads
  • No GIL: True parallelism (not limited by Python's Global Interpreter Lock)
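
Because the parallelism happens inside the Rust call, a plain call already saturates the CPU; wrapping it in multiprocessing or thread pools is unnecessary. A minimal sketch (the documents here are placeholder data, and os.cpu_count() is only informational):

import os
from rs_document import Document, clean_and_split_docs

documents = [
    Document(page_content="Text " * 1000, metadata={"id": str(i)})
    for i in range(100)
]

# Rayon's default thread pool typically uses one worker per logical core
print(f"Logical CPU cores: {os.cpu_count()}")

# No extra process or thread pool needed around the call
chunks = clean_and_split_docs(documents, chunk_size=1000)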

Benchmarks

Typical performance on modern hardware (8-core CPU):

Documents   Avg Size   Chunk Size   Time     Throughput
100         5 KB       1000         0.004s   25,000 docs/s
1,000       5 KB       1000         0.04s    25,000 docs/s
10,000      5 KB       1000         0.43s    23,000 docs/s
100,000     5 KB       1000         4.3s     23,000 docs/s

Key observations:

  • Near-linear scaling with document count
  • 20-25x faster than pure Python implementations
  • Minimal overhead for small batches
  • Consistent throughput across batch sizes

Memory Usage

Memory consumption is proportional to:

  • Total size of input documents
  • Number of resulting chunks
  • Chunk overlap (~33%)

Example:

# Estimate memory for large batch
num_docs = 100_000
avg_doc_size = 5_000  # bytes
chunk_size = 1_000
chunks_per_doc = 7  # Approximate with overlap
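# (at ~33% overlap, each chunk adds roughly 1,000 * (1 - 0.33) ≈ 670 new characters,
#  so a 5,000-character document yields about 5,000 / 670 ≈ 7 chunks)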

input_memory = num_docs * avg_doc_size  # 500 MB
output_memory = num_docs * chunks_per_doc * chunk_size  # 700 MB
total_memory = input_memory + output_memory  # ~1.2 GB

print(f"Estimated memory: {total_memory / 1e9:.1f} GB")

Memory optimization tips:

  • Process in batches if total memory is a concern
  • Clear input documents after processing if they are no longer needed (see the sketch below)
  • Consider chunk size to control output size
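
For example, a minimal sketch assuming the original documents are not needed once the chunks exist:

from rs_document import Document, clean_and_split_docs

documents = [
    Document(page_content="Text " * 1000, metadata={"id": str(i)})
    for i in range(1000)
]
chunks = clean_and_split_docs(documents, chunk_size=1000)

# Drop the inputs so their text can be garbage-collected
del documents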

Optimization Tips

Batch Size:

# For very large datasets, process in batches
def process_large_dataset(all_documents, batch_size=10_000):
    all_chunks = []

    for i in range(0, len(all_documents), batch_size):
        batch = all_documents[i:i + batch_size]
        chunks = clean_and_split_docs(batch, chunk_size=1000)
        all_chunks.extend(chunks)

        print(f"Processed batch {i // batch_size + 1}")

    return all_chunks

# Process 100,000 documents in batches of 10,000
chunks = process_large_dataset(huge_document_list)

Chunk Size Selection:

# Smaller chunks = more total chunks = more memory
chunks_small = clean_and_split_docs(docs, chunk_size=500)   # More chunks

# Larger chunks = fewer total chunks = less memory
chunks_large = clean_and_split_docs(docs, chunk_size=2000)  # Fewer chunks

print(f"Small chunks: {len(chunks_small)}")
print(f"Large chunks: {len(chunks_large)}")

Metadata Optimization:

# Keep metadata minimal to reduce memory
documents = [
    Document(
        page_content=text,
        metadata={"id": str(i)}  # Only essential metadata
    )
    for i, text in enumerate(texts)
]

chunks = clean_and_split_docs(documents, chunk_size=1000)

Common Patterns

Basic Batch Processing

from rs_document import Document, clean_and_split_docs

# Load documents
documents = [
    Document(page_content=load_file(f), metadata={"source": f})
    for f in file_list
]

# Process all at once
chunks = clean_and_split_docs(documents, chunk_size=1000)

Processing with Progress Tracking

from rs_document import Document, clean_and_split_docs

def process_with_progress(documents, chunk_size, batch_size=1000):
    """Process documents in batches with progress tracking."""
    all_chunks = []
    total = len(documents)

    for i in range(0, total, batch_size):
        batch = documents[i:i + batch_size]
        chunks = clean_and_split_docs(batch, chunk_size)
        all_chunks.extend(chunks)

        processed = min(i + batch_size, total)
        print(f"Progress: {processed}/{total} documents ({100*processed/total:.1f}%)")

    return all_chunks

# Use it
chunks = process_with_progress(huge_list, chunk_size=1000, batch_size=5000)

From Files to Chunks

import os
from rs_document import Document, clean_and_split_docs

def process_directory(directory, chunk_size=1000):
    """Load all text files from directory and process them."""
    documents = []

    for filename in os.listdir(directory):
        if filename.endswith('.txt'):
            path = os.path.join(directory, filename)
            with open(path, 'r', encoding='utf-8') as f:
                content = f.read()

            doc = Document(
                page_content=content,
                metadata={"source": filename, "path": path}
            )
            documents.append(doc)

    print(f"Loaded {len(documents)} documents")
    chunks = clean_and_split_docs(documents, chunk_size)
    print(f"Created {len(chunks)} chunks")

    return chunks

# Process entire directory
chunks = process_directory('/path/to/documents')

Integration with Vector Stores

from rs_document import Document, clean_and_split_docs

def prepare_for_embedding(documents, chunk_size=1000):
    """Prepare documents for embedding in vector store."""
    # Clean and split
    chunks = clean_and_split_docs(documents, chunk_size)

    # Add chunk identifiers
    for i, chunk in enumerate(chunks):
        chunk.metadata["chunk_id"] = str(i)
        chunk.metadata["char_count"] = str(len(chunk.page_content))

    return chunks

# Prepare for vector database
documents = [...]  # Your documents
chunks = prepare_for_embedding(documents)

# Now ready for embedding
# embeddings = embed_model.embed_documents([c.page_content for c in chunks])
# vector_store.add_documents(chunks, embeddings)

See Also