Batch Processing¶
Process multiple documents efficiently with COSMIC's batch processing capabilities.
Python API¶
Basic Batch Processing¶
from cosmic import BatchProcessor, Document, COSMICConfig

# Create a processor with the default configuration and 4 workers
processor = BatchProcessor(
    config=COSMICConfig(),
    max_workers=4,
)

# Prepare documents.
# NOTE: `texts` is assumed to be a sequence of raw document strings defined
# earlier; each document gets the ID "doc_0", "doc_1", ... by position.
documents = [
    Document.from_text(text, doc_id=f"doc_{i}")
    for i, text in enumerate(texts)
]

# Process with progress bar
result = processor.process(
    documents,
    strategy="auto",
    show_progress=True,
)

# Access results
print(f"Processed: {result.documents_processed}")
print(f"Failed: {result.documents_failed}")
print(f"Total chunks: {result.total_chunks}")
# success_rate is a fraction; the ":.1%" format renders e.g. 0.95 as "95.0%"
print(f"Success rate: {result.success_rate:.1%}")
Accessing Results¶
# Get chunks by document: chunks_by_document maps doc_id -> list of chunks
for doc_id, chunks in result.chunks_by_document.items():
    print(f"{doc_id}: {len(chunks)} chunks")

# Get all chunks as one flat list across every document
all_chunks = result.get_all_chunks()

# Get failed documents: errors maps doc_id -> error message
for doc_id, error in result.errors.items():
    print(f"{doc_id} failed: {error}")
Streaming Processing¶
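For memory-efficient processing of large document sets, `StreamingProcessor.process_stream()` yields `(doc_id, chunks)` pairs that you handle one at a time, instead of returning a single result object: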
from cosmic import StreamingProcessor, Document

processor = StreamingProcessor()

# Process as generator: process_stream yields (doc_id, chunks) pairs
for doc_id, chunks in processor.process_stream(documents):
    print(f"Processed {doc_id}: {len(chunks)} chunks")
    # Process chunks immediately (save_to_database is a placeholder for
    # your own storage code)
    save_to_database(chunks)
    # This document's chunks can be reclaimed after the iteration, provided
    # nothing (e.g. save_to_database) keeps a reference to them.
    # NOTE(review): `documents` is a list here, so all inputs stay in memory;
    # confirm whether process_stream also accepts a lazy generator.
Async Processing¶
import asyncio
from cosmic import BatchProcessor

async def process_async():
    """Chunk `documents` via BatchProcessor.process_async and return the result."""
    batch_processor = BatchProcessor(max_workers=4)
    return await batch_processor.process_async(documents, strategy="auto")

# asyncio.run starts a new event loop; if a loop is already running
# (e.g. in a notebook), write `result = await process_async()` instead.
result = asyncio.run(process_async())
CLI Batch Processing¶
Using Shell Loops¶
# Process multiple files, one at a time
mkdir -p output
for f in documents/*.txt; do
    # If nothing matches, the glob stays literal ("documents/*.txt"); skip it
    [ -e "$f" ] || continue
    cosmic chunk "$f" -o "output/$(basename "$f" .txt).json"
done
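Parallel Processing with xargs¶
Run up to four `cosmic chunk` processes at once with `xargs -P`. The NUL-separated list (`printf '%s\0'` with `xargs -0`) keeps filenames containing spaces intact:
mkdir -p output
printf '%s\0' documents/*.txt | xargs -0 -n 1 -P 4 sh -c 'cosmic chunk "$1" -o "output/$(basename "$1" .txt).json"' _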
Using GNU Parallel¶
# More control over parallelism: -j4 runs up to 4 jobs at once, and
# {/.} expands to the input's basename without its extension
mkdir -p output
parallel -j4 cosmic chunk {} -o output/{/.}.json ::: documents/*.txt
Configuration¶
Worker Count¶
# Alternatives — pick one; each assignment replaces the previous processor.

# More workers for I/O-bound tasks
processor = BatchProcessor(max_workers=8)

# Fewer workers for memory-constrained systems
processor = BatchProcessor(max_workers=2)
Error Handling¶
# Continue on errors (default): failed documents are counted in
# result.documents_failed and their messages collected in result.errors
result = processor.process(documents, fail_fast=False)

# Stop on first error
# NOTE(review): confirm whether this raises or returns a partial result
result = processor.process(documents, fail_fast=True)
Progress Reporting¶
import logging

# Module-level logger used by the callback below
log = logging.getLogger(__name__)

# With progress bar (tqdm)
result = processor.process(documents, show_progress=True)


# Custom callback, invoked with (doc_id, chunks, error) for each document.
# A truthy `error` marks a failure; otherwise `chunks` holds that document's chunks.
def on_complete(doc_id, chunks, error):
    if error:
        log.error("%s: %s", doc_id, error)
    else:
        log.info("%s: %d chunks", doc_id, len(chunks))


result = processor.process(documents, callback=on_complete)
Performance Tips¶
Memory Management¶
from cosmic import StreamingProcessor

# Stream results to avoid memory buildup. process_stream is a
# StreamingProcessor method, so create one explicitly rather than
# reusing a BatchProcessor.
stream_processor = StreamingProcessor()
for doc_id, chunks in stream_processor.process_stream(documents):
    save_chunks(chunks)  # placeholder for your own persistence code
    # This document's chunks become collectable after the iteration,
    # provided save_chunks does not keep a reference to them
GPU Utilization¶
# Batch embeddings for better GPU utilization
config = COSMICConfig()
# Larger embedding batches need more GPU memory: raise this value if your
# GPU has memory to spare, lower it if you run out of GPU memory
config.embedding.batch_size = 128
processor = BatchProcessor(config=config)
Strategy Selection¶
# Use a faster, fixed strategy for large batches
result = processor.process(documents, strategy="semantic")  # Faster

# Use auto for varied documents: the strategy is chosen adaptively
result = processor.process(documents, strategy="auto")  # Adaptive
Result Object¶
The `BatchResult` object returned by `processor.process()` contains:
@dataclass
class BatchResult:
    """Summary of one BatchProcessor.process() run."""

    documents_processed: int  # Number of documents processed successfully
    documents_failed: int  # Number of documents that failed
    total_chunks: int  # Total chunks created across all documents
    chunks_by_document: dict[str, list[COSMICChunk]]  # doc_id -> its chunks
    errors: dict[str, str]  # doc_id -> error message, for failed documents
    processing_time: float  # Total time in seconds

    @property
    def success_rate(self) -> float:
        """Fraction of documents processed successfully, from 0.0 to 1.0.

        This is a fraction, not a percentage: format it with ``:.1%``
        (or multiply by 100) to display a percentage.
        """
        ...

    def get_all_chunks(self) -> list[COSMICChunk]:
        """Return the chunks of every document as one flat list."""
        ...
Example: Processing a Directory¶
from pathlib import Path
from cosmic import BatchProcessor, Document


def process_directory(input_dir: Path, output_dir: Path, pattern: str = "*.txt"):
    """Chunk every file in *input_dir* matching *pattern*; save one JSON per document.

    Args:
        input_dir: Directory whose matching files are loaded as documents.
        output_dir: Destination for ``<doc_id>.json`` files; created
            (including missing parent directories) if needed.
        pattern: Glob pattern, relative to *input_dir*, selecting the input
            files. Defaults to ``"*.txt"``.

    Returns:
        The batch result of the run, or None when no files matched.
    """
    # Load all documents; sort the paths so processing order is deterministic
    documents = [
        Document.from_file(file_path)
        for file_path in sorted(input_dir.glob(pattern))
        if file_path.is_file()
    ]
    if not documents:
        print(f"No files matching {pattern!r} found in {input_dir}")
        return None

    # Process
    processor = BatchProcessor(max_workers=4)
    result = processor.process(documents, strategy="auto", show_progress=True)

    # Save results (save_chunks_to_json is your own serialization helper)
    output_dir.mkdir(parents=True, exist_ok=True)
    for doc_id, chunks in result.chunks_by_document.items():
        save_chunks_to_json(chunks, output_dir / f"{doc_id}.json")

    # Report failed documents instead of dropping them silently
    for doc_id, error in result.errors.items():
        print(f"{doc_id} failed: {error}")

    print(f"Processed {result.documents_processed} documents")
    print(f"Created {result.total_chunks} chunks")
    print(f"Success rate: {result.success_rate:.1%}")
    return result


# Usage
process_directory(Path("input"), Path("output"))