COSMICChunker

Main orchestrator for the COSMIC chunking pipeline.

Class Definition

class COSMICChunker:
    def __init__(
        self,
        config: Optional[COSMICConfig] = None,
        config_path: Optional[Path] = None,
        taxonomy_path: Optional[Path] = None,
    ) -> None:
        """
        Initialize the COSMIC chunker.

        Args:
            config: Configuration object. If None, uses defaults.
            config_path: Path to YAML configuration file.
            taxonomy_path: Path to domain taxonomy YAML.
        """

Methods

chunk_document

def chunk_document(
    self,
    document: Document,
    strategy: str = "auto",
) -> list[COSMICChunk]:
    """
    Process a document and return chunks.

    Args:
        document: Input document to chunk.
        strategy: Chunking strategy. Options:
            - "auto": Select based on document structure
            - "full": Full 6-stage pipeline
            - "semantic": DCS-based boundaries only
            - "sliding": Sliding window
            - "fixed": Fixed-length splitting

    Returns:
        List of COSMICChunk objects with metadata.

    Raises:
        ConfigurationError: If configuration is invalid.
    """

analyze_structure

def analyze_structure(self, document: Document) -> float:
    """
    Analyze document structure and return structure score.

    Args:
        document: Input document.

    Returns:
        Structure score between 0 and 1.
        Higher scores indicate more structure.
    """

Usage Examples

Basic Usage

from cosmic import COSMICChunker, Document

chunker = COSMICChunker()

doc = Document.from_text("""
Introduction to Machine Learning

Machine learning is a subset of artificial intelligence.
It enables computers to learn from data without explicit programming.

Types of Machine Learning

There are three main types: supervised, unsupervised, and reinforcement.
""")

chunks = chunker.chunk_document(doc, strategy="auto")

for chunk in chunks:
    print(f"Domain: {chunk.domain}")
    print(f"Coherence: {chunk.coherence_score:.2f}")
    print(f"Text: {chunk.text[:100]}...")

With Configuration

from cosmic import COSMICChunker, COSMICConfig
from cosmic.core.config import DCSConfig, ChunkConstraints

config = COSMICConfig(
    dcs=DCSConfig(
        alpha=0.5,
        beta=0.3,
        gamma=0.2,
        threshold=0.5,
    ),
    chunk_constraints=ChunkConstraints(
        min_tokens=50,
        max_tokens=1024,
        target_tokens=512,
    ),
)

chunker = COSMICChunker(config=config)
chunks = chunker.chunk_document(doc)  # doc: a Document, e.g. the one built in Basic Usage

From YAML Configuration

from pathlib import Path
from cosmic import COSMICChunker

chunker = COSMICChunker(config_path=Path("configs/custom.yaml"))
chunks = chunker.chunk_document(doc)  # doc: a Document, e.g. the one built in Basic Usage

Strategy Selection

# Full pipeline (highest quality)
chunks = chunker.chunk_document(doc, strategy="full")

# Semantic only (faster)
chunks = chunker.chunk_document(doc, strategy="semantic")

# Sliding window (fast)
chunks = chunker.chunk_document(doc, strategy="sliding")

# Fixed length (fastest)
chunks = chunker.chunk_document(doc, strategy="fixed")

# Auto-select based on structure
chunks = chunker.chunk_document(doc, strategy="auto")
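
To make the two simplest strategies concrete, here is a minimal, library-independent sketch of fixed-length and sliding-window splitting over a token list. The function names `fixed_chunks` and `sliding_chunks`, and the `size` and `overlap` parameters, are hypothetical and chosen for illustration; this is not COSMIC's implementation, only the general technique those strategy names usually refer to.

```python
def fixed_chunks(tokens: list[str], size: int) -> list[list[str]]:
    """Split tokens into consecutive, non-overlapping chunks of at most `size` tokens."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [tokens[i:i + size] for i in range(0, len(tokens), size)]


def sliding_chunks(tokens: list[str], size: int, overlap: int) -> list[list[str]]:
    """Split tokens into windows of at most `size` tokens; neighbours share `overlap` tokens."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    step = size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # this window already reaches the end of the input
    return chunks


tokens = "machine learning is a subset of artificial intelligence".split()
print(fixed_chunks(tokens, 3))
print(sliding_chunks(tokens, 4, 2))
```

Overlapping windows repeat some text across chunks, which costs storage but keeps a sentence that straddles a boundary visible in at least one chunk. Fixed-length splitting avoids the duplication at the price of harder cuts.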

Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| config | COSMICConfig | Current configuration |
| taxonomy | dict | Domain taxonomy |

Strategy Selection Logic

When strategy="auto", the chunker picks a strategy from the structure score returned by analyze_structure:

score = chunker.analyze_structure(doc)

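if score > 0.7:
    strategy = "full"      # Use full pipeline
elif score > 0.4:
    strategy = "semantic"  # Use semantic-only
else:
    strategy = "sliding"   # Use fallback chain (assumed here to start with the sliding window)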
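
The threshold logic above can be written as a small pure function, which makes the boundary cases easy to check. This is a sketch under stated assumptions: `select_strategy` is a hypothetical helper, not part of the COSMIC API, and mapping the fallback branch to `"sliding"` is an assumption about where the fallback chain starts.

```python
FULL_THRESHOLD = 0.7      # scores above this use the full pipeline
SEMANTIC_THRESHOLD = 0.4  # scores above this (up to 0.7) use semantic-only


def select_strategy(score: float) -> str:
    """Map a structure score in [0, 1] to a strategy name."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"structure score must be in [0, 1], got {score}")
    if score > FULL_THRESHOLD:
        return "full"
    if score > SEMANTIC_THRESHOLD:
        return "semantic"
    # Assumption: the fallback chain begins with the sliding window.
    return "sliding"


for score in (0.9, 0.7, 0.5, 0.4, 0.1):
    print(score, select_strategy(score))
```

Both comparisons are strict, so a score of exactly 0.7 selects semantic-only and a score of exactly 0.4 selects the fallback.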

Error Handling

COSMIC degrades gracefully: when a pipeline stage fails, chunking falls back to a simpler strategy instead of aborting. Conceptually:

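# Conceptual illustration of the fallback behavior.
# StructureAnalysisError and SemanticBoundaryError are COSMIC exceptions
# (their import path is not shown on this page).
try:
    chunks = chunker.chunk_document(doc, strategy="full")
except StructureAnalysisError:
    # Falls back to semantic-only
    chunks = chunker.chunk_document(doc, strategy="semantic")
except SemanticBoundaryError:
    # Falls back to sliding window
    chunks = chunker.chunk_document(doc, strategy="sliding")
except Exception:
    # Falls back to fixed-length
    chunks = chunker.chunk_document(doc, strategy="fixed")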
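
One common way to implement this kind of graceful degradation is an ordered list of strategies that are tried until one succeeds. The sketch below is generic and does not reproduce COSMIC's internals: `chunk_with_fallback` and the three stand-in strategy functions are hypothetical, and the stand-ins raise `ValueError` where the real pipeline would raise its own exception types.

```python
from typing import Callable, Sequence

Strategy = tuple[str, Callable[[str], list[str]]]


def chunk_with_fallback(text: str, strategies: Sequence[Strategy]) -> tuple[str, list[str]]:
    """Try each (name, function) pair in order and return the first success."""
    failures = []
    for name, chunk_fn in strategies:
        try:
            return name, chunk_fn(text)
        except Exception as exc:  # broad on purpose: any failure triggers the next fallback
            failures.append(f"{name}: {exc}")
    raise RuntimeError("all strategies failed: " + "; ".join(failures))


# Hypothetical stand-ins for the real pipeline stages.
def full_pipeline(text: str) -> list[str]:
    raise ValueError("structure analysis failed")


def semantic_only(text: str) -> list[str]:
    raise ValueError("no semantic boundaries found")


def fixed_length(text: str, size: int = 10) -> list[str]:
    return [text[i:i + size] for i in range(0, len(text), size)]


used, chunks = chunk_with_fallback(
    "abcdefghijklmno",
    [("full", full_pipeline), ("semantic", semantic_only), ("fixed", fixed_length)],
)
print(used, chunks)
```

Returning the name of the strategy that succeeded lets callers log or monitor how often the degraded paths are taken.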

Thread Safety

COSMICChunker is not thread-safe. Create separate instances for concurrent processing:

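from concurrent.futures import ThreadPoolExecutor

from cosmic import COSMICChunker, Document

def process_doc(text):
    chunker = COSMICChunker()  # New instance per call, so no chunker is shared across threads
    doc = Document.from_text(text)
    return chunker.chunk_document(doc)

texts = ["First document text.", "Second document text."]  # placeholder inputs

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_doc, texts))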
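
If constructing a chunker is expensive, an alternative to one instance per call is one instance per worker thread, held in `threading.local`. The sketch below uses a hypothetical `StubChunker` in place of `COSMICChunker` so that it runs without the library. It also assumes an instance can safely be reused for successive documents on the same thread, which the documentation above does not confirm.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class StubChunker:
    """Hypothetical stand-in for COSMICChunker so the sketch runs on its own."""

    def chunk(self, text: str) -> list[str]:
        return text.split(". ")


_local = threading.local()  # each thread sees its own attributes on this object


def get_chunker() -> StubChunker:
    """Return the calling thread's chunker, creating it on first use."""
    if not hasattr(_local, "chunker"):
        _local.chunker = StubChunker()
    return _local.chunker


def process_text(text: str) -> list[str]:
    return get_chunker().chunk(text)


texts = ["One. Two", "Three. Four. Five"]
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(process_text, texts))
print(results)
```

With this pattern the number of chunker instances is bounded by `max_workers` rather than by the number of documents.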