Skip to content

Batch Processing

Process multiple documents efficiently with COSMIC's batch processing capabilities.

Python API

Basic Batch Processing

from cosmic import BatchProcessor, Document, COSMICConfig

# Create processor
processor = BatchProcessor(
    config=COSMICConfig(),
    max_workers=4,
)

# Prepare documents
documents = [
    Document.from_text(text, doc_id=f"doc_{i}")
    for i, text in enumerate(texts)
]

# Process with progress bar
result = processor.process(
    documents,
    strategy="auto",
    show_progress=True,
)

# Access results
print(f"Processed: {result.documents_processed}")
print(f"Failed: {result.documents_failed}")
print(f"Total chunks: {result.total_chunks}")
print(f"Success rate: {result.success_rate:.1%}")

Accessing Results

# Get chunks by document
for doc_id, chunks in result.chunks_by_document.items():
    print(f"{doc_id}: {len(chunks)} chunks")

# Get all chunks
all_chunks = result.get_all_chunks()

# Get failed documents
for doc_id, error in result.errors.items():
    print(f"{doc_id} failed: {error}")

Streaming Processing

For memory-efficient processing of large document sets:

from cosmic import StreamingProcessor, Document

processor = StreamingProcessor()

# Process as generator
for doc_id, chunks in processor.process_stream(documents):
    print(f"Processed {doc_id}: {len(chunks)} chunks")
    # Process chunks immediately
    save_to_database(chunks)
    # Memory is freed after each iteration

Async Processing

import asyncio
from cosmic import BatchProcessor

async def process_async():
    processor = BatchProcessor(max_workers=4)
    result = await processor.process_async(documents, strategy="auto")
    return result

result = asyncio.run(process_async())

CLI Batch Processing

Using Shell Loops

# Process multiple files
for f in documents/*.txt; do
    cosmic chunk "$f" -o "output/$(basename "$f" .txt).json"
done

Parallel Processing with xargs

# Process 4 files in parallel
ls documents/*.txt | xargs -P4 -I{} cosmic chunk {} -o output/{}.json

Using GNU Parallel

# More control over parallelism
parallel -j4 cosmic chunk {} -o output/{/.}.json ::: documents/*.txt

Configuration

Worker Count

# More workers for I/O-bound tasks
processor = BatchProcessor(max_workers=8)

# Fewer workers for memory-constrained systems
processor = BatchProcessor(max_workers=2)

Error Handling

# Continue on errors (default)
result = processor.process(documents, fail_fast=False)

# Stop on first error
result = processor.process(documents, fail_fast=True)

Progress Reporting

# With progress bar (tqdm)
result = processor.process(documents, show_progress=True)

# Custom callback
def on_complete(doc_id, chunks, error):
    if error:
        log.error(f"{doc_id}: {error}")
    else:
        log.info(f"{doc_id}: {len(chunks)} chunks")

result = processor.process(documents, callback=on_complete)

Performance Tips

Memory Management

# Stream results to avoid memory buildup
for doc_id, chunks in processor.process_stream(documents):
    save_chunks(chunks)
    # Chunks are garbage collected after each iteration

GPU Utilization

# Batch embeddings for better GPU utilization
config = COSMICConfig()
config.embedding.batch_size = 128  # Increase for more GPU memory

processor = BatchProcessor(config=config)

Strategy Selection

# Use faster strategy for large batches
result = processor.process(documents, strategy="semantic")  # Faster

# Use auto for varied documents
result = processor.process(documents, strategy="auto")  # Adaptive

Result Object

The BatchResult object contains:

@dataclass
class BatchResult:
    documents_processed: int      # Successfully processed count
    documents_failed: int         # Failed count
    total_chunks: int            # Total chunks created
    chunks_by_document: dict     # doc_id -> list[COSMICChunk]
    errors: dict                 # doc_id -> error message
    processing_time: float       # Total time in seconds

    @property
    def success_rate(self) -> float:
        """Percentage of documents successfully processed."""

    def get_all_chunks(self) -> list[COSMICChunk]:
        """Get flat list of all chunks."""

Example: Processing a Directory

from pathlib import Path
from cosmic import BatchProcessor, Document

def process_directory(input_dir: Path, output_dir: Path):
    # Load all documents
    documents = []
    for file_path in input_dir.glob("*.txt"):
        doc = Document.from_file(file_path)
        documents.append(doc)

    # Process
    processor = BatchProcessor(max_workers=4)
    result = processor.process(documents, strategy="auto", show_progress=True)

    # Save results
    output_dir.mkdir(exist_ok=True)
    for doc_id, chunks in result.chunks_by_document.items():
        output_path = output_dir / f"{doc_id}.json"
        save_chunks_to_json(chunks, output_path)

    print(f"Processed {result.documents_processed} documents")
    print(f"Created {result.total_chunks} chunks")
    print(f"Success rate: {result.success_rate:.1%}")

# Usage
process_directory(Path("input"), Path("output"))