
Splitter

Introduction

The Splitter component implements the main functionality of this library. It provides a set of classes (all inheriting from BaseSplitter) that split a markdown text or a plain string following many different strategies.

Splitter strategies description

| Splitting Technique | Description | Parameters | Compatible with |
|---|---|---|---|
| **Character Splitter** | Splits text into chunks based on a specified number of characters. Supports overlapping by character count or percentage. | `chunk_size` (max chars per chunk), `chunk_overlap` (overlapping chars: int or %). | Text |
| **Word Splitter** | Splits text into chunks based on a specified number of words. Supports overlapping by word count or percentage. | `chunk_size` (max words per chunk), `chunk_overlap` (overlapping words: int or %). | Text |
| **Sentence Splitter** | Splits text into chunks by a specified number of sentences. Allows overlap defined by a number or percentage of words from the end of the previous chunk. Customizable sentence separators (e.g., `.`, `!`, `?`). | `chunk_size` (max sentences per chunk), `chunk_overlap` (overlapping words: int or %), `sentence_separators` (list of characters). | Text |
| **Paragraph Splitter** | Splits text into chunks based on a specified number of paragraphs. Allows overlapping by word count or percentage, and customizable line breaks. | `chunk_size` (max paragraphs per chunk), `chunk_overlap` (overlapping words: int or %), `line_break` (delimiter(s) for paragraphs). | Text |
| **Recursive Splitter** | Recursively splits text based on a hierarchy of separators (e.g., paragraph, sentence, word, character) until chunks reach a target size. Tries to preserve semantic units as long as possible. | `chunk_size` (max chars per chunk), `chunk_overlap` (overlapping chars), `separators` (list of characters to split on, e.g., `["\n\n", "\n", " ", ""]`). | Text |
| **Keyword Splitter** | Splits text into chunks around matches of specified keywords, using one or more regex patterns. Supports precise boundary control: matched keywords can be included before, after, on both sides, or omitted from the split. Each keyword can have a custom name (via dict) for metadata counting. Secondary soft-wrapping by `chunk_size` is supported. | `patterns` (list of regex patterns, or dict mapping names to patterns), `include_delimiters` (`"before"`, `"after"`, `"both"`, or `"none"`), `flags` (regex flags, e.g. `re.MULTILINE`), `chunk_size` (max chars per chunk, soft-wrapped). | Text |
| **Token Splitter** | Splits text into chunks based on the number of tokens, using various tokenization models (e.g., tiktoken, spaCy, NLTK). Useful for ensuring chunks are compatible with LLM context limits. | `chunk_size` (max tokens per chunk), `model_name` (tokenizer/model, e.g., `"tiktoken/cl100k_base"`, `"spacy/en_core_web_sm"`, `"nltk/punkt"`), `language` (for NLTK). | Text |
| **Paged Splitter** | Splits text by pages for documents that have a page structure. Each chunk contains a specified number of pages, with optional word overlap. | `num_pages` (pages per chunk), `chunk_overlap` (overlapping words). | Word, PDF, Excel, PowerPoint |
| **Row/Column Splitter** | For tabular formats, splits data by a set number of rows or columns per chunk, with possible overlap. Row-based and column-based splitting are mutually exclusive. | `num_rows`, `num_cols` (rows/columns per chunk), `overlap` (overlapping rows or columns). | Tabular formats (CSV, TSV, Parquet, flat JSON) |
| **JSON Splitter** | Recursively splits JSON documents into smaller sub-structures that preserve the original JSON schema. | `max_chunk_size` (max chars per chunk), `min_chunk_size` (min chars per chunk). | JSON |
| **Semantic Splitter** | Splits text into chunks based on semantic similarity, using an embedding model and a maximum token count. Useful for producing meaningful semantic groupings. | `embedding_model` (model for embeddings), `max_tokens` (max tokens per chunk). | Text |
| **HTML Tag Splitter** | Splits HTML content based on a specified tag, or automatically detects the most frequent and shallowest tag if none is specified. Each chunk is a complete HTML fragment for that tag. | `chunk_size` (max chars per chunk), `tag` (HTML tag to split on, optional). | HTML |
| **Header Splitter** | Splits Markdown or HTML documents into chunks using header levels (e.g., `#`, `##`, or `<h1>`, `<h2>`). Uses configurable headers for chunking. | `headers_to_split_on` (list of headers and semantic names), `chunk_size` (unused, kept for compatibility). | Markdown, HTML |
| **Code Splitter** | Splits source code files into programmatically meaningful chunks (functions, classes, methods, etc.), aware of the syntax of the specified programming language (e.g., Python, Java, Kotlin). Uses language-aware logic to avoid splitting inside code blocks. | `chunk_size` (max chars per chunk), `language` (programming language as a string, e.g., `"python"`, `"java"`). | Source code files (Python, Java, Kotlin, C++, JavaScript, Go, etc.) |
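
Regardless of the strategy, every splitter follows the same workflow: build the splitter with its parameters, pass it a ReaderOutput (normally produced by one of the library's Reader classes), and receive a SplitterOutput back. The minimal sketch below illustrates this with the CharacterSplitter documented later on this page; the ReaderOutput import path is an assumption and may differ in your installation.

```python
from splitter_mr.splitter import CharacterSplitter
from splitter_mr.schema import ReaderOutput  # assumed import path

# Minimal ReaderOutput; in practice this comes from a Reader class.
reader_output = ReaderOutput(
    text="abcdefghijklmnopqrstuvwxyz",
    document_name="doc.txt",
    document_path="/path/doc.txt",
)

# 5 characters per chunk, 2 characters of overlap between consecutive chunks.
splitter = CharacterSplitter(chunk_size=5, chunk_overlap=2)
output = splitter.split(reader_output)

print(output.chunks)        # ['abcde', 'defgh', 'ghijk', ...]
print(output.split_method)  # 'character_splitter'
```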

Output format

SplitterOutput

Bases: BaseModel

Pydantic model defining the output structure for all splitters.

Attributes:

| Name | Type | Description |
|---|---|---|
| `chunks` | `List[str]` | List of text chunks produced by splitting. |
| `chunk_id` | `List[str]` | List of unique IDs corresponding to each chunk. |
| `document_name` | `Optional[str]` | The name of the document. |
| `document_path` | `str` | The path to the document. |
| `document_id` | `Optional[str]` | A unique identifier for the document. |
| `conversion_method` | `Optional[str]` | The method used for document conversion. |
| `reader_method` | `Optional[str]` | The method used for reading the document. |
| `ocr_method` | `Optional[str]` | The OCR method used, if any. |
| `split_method` | `str` | The method used to split the document. |
| `split_params` | `Optional[Dict[str, Any]]` | Parameters used during the splitting process. |
| `metadata` | `Optional[Dict[str, Any]]` | Additional metadata associated with the splitting. |

Source code in src/splitter_mr/schema/models.py
class SplitterOutput(BaseModel):
    """Pydantic model defining the output structure for all splitters.

    Attributes:
        chunks: List of text chunks produced by splitting.
        chunk_id: List of unique IDs corresponding to each chunk.
        document_name: The name of the document.
        document_path: The path to the document.
        document_id: A unique identifier for the document.
        conversion_method: The method used for document conversion.
        reader_method: The method used for reading the document.
        ocr_method: The OCR method used, if any.
        split_method: The method used to split the document.
        split_params: Parameters used during the splitting process.
        metadata: Additional metadata associated with the splitting.
    """

    chunks: List[str] = Field(default_factory=list)
    chunk_id: List[str] = Field(default_factory=list)
    document_name: Optional[str] = None
    document_path: str = ""
    document_id: Optional[str] = None
    conversion_method: Optional[str] = None
    reader_method: Optional[str] = None
    ocr_method: Optional[str] = None
    split_method: str = ""
    split_params: Optional[Dict[str, Any]] = Field(default_factory=dict)
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)

    @model_validator(mode="after")
    def validate_and_set_defaults(self):
        """Validates and sets defaults for the SplitterOutput instance.

        Raises:
            ValueError: If `chunks` is empty or if `chunk_id` length does not match `chunks` length.

        Returns:
            self (SplitterOutput): The validated and updated instance.
        """
        if not self.chunks:
            raise ValueError("Chunks list cannot be empty.")

        if self.chunk_id is not None:
            if len(self.chunk_id) != len(self.chunks):
                raise ValueError(
                    f"chunk_id length ({len(self.chunk_id)}) does not match chunks length ({len(self.chunks)})."
                )
        else:
            self.chunk_id = [str(uuid.uuid4()) for _ in self.chunks]

        if not self.document_id:
            self.document_id = str(uuid.uuid4())

        return self

    @classmethod
    def from_chunks(cls, chunks: List[str]) -> "SplitterOutput":
        """Create a SplitterOutput from a list of chunks, with all other fields set to their defaults.

        Args:
            chunks (List[str]): A list of text chunks.

        Returns:
            SplitterOutput: An instance of SplitterOutput with the given chunks.
        """
        return cls(chunks=chunks)

    def append_metadata(self, metadata: Dict[str, Any]) -> None:
        """
        Append (update) the metadata dictionary with new key-value pairs.

        Args:
            metadata (Dict[str, Any]): The metadata to add or update.
        """
        if self.metadata is None:
            self.metadata = {}
        self.metadata.update(metadata)
append_metadata(metadata)

Append (update) the metadata dictionary with new key-value pairs.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metadata` | `Dict[str, Any]` | The metadata to add or update. | required |

Source code in src/splitter_mr/schema/models.py
def append_metadata(self, metadata: Dict[str, Any]) -> None:
    """
    Append (update) the metadata dictionary with new key-value pairs.

    Args:
        metadata (Dict[str, Any]): The metadata to add or update.
    """
    if self.metadata is None:
        self.metadata = {}
    self.metadata.update(metadata)
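
A minimal usage sketch (constructing the model directly; the import path is an assumption):

```python
from splitter_mr.schema import SplitterOutput  # assumed import path

output = SplitterOutput(
    chunks=["first chunk", "second chunk"],
    chunk_id=["id-1", "id-2"],
)

output.append_metadata({"source": "unit-test", "language": "en"})
output.append_metadata({"language": "es"})  # existing keys are overwritten

print(output.metadata)  # {'source': 'unit-test', 'language': 'es'}
```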
from_chunks(chunks) classmethod

Create a SplitterOutput from a list of chunks, with all other fields set to their defaults.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `chunks` | `List[str]` | A list of text chunks. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `SplitterOutput` | `SplitterOutput` | An instance of SplitterOutput with the given chunks. |

Source code in src/splitter_mr/schema/models.py
@classmethod
def from_chunks(cls, chunks: List[str]) -> "SplitterOutput":
    """Create a SplitterOutput from a list of chunks, with all other fields set to their defaults.

    Args:
        chunks (List[str]): A list of text chunks.

    Returns:
        SplitterOutput: An instance of SplitterOutput with the given chunks.
    """
    return cls(chunks=chunks)
validate_and_set_defaults()

Validates and sets defaults for the SplitterOutput instance.

Raises:

| Type | Description |
|---|---|
| `ValueError` | If `chunks` is empty or if `chunk_id` length does not match `chunks` length. |

Returns:

| Name | Type | Description |
|---|---|---|
| `self` | `SplitterOutput` | The validated and updated instance. |

Source code in src/splitter_mr/schema/models.py
@model_validator(mode="after")
def validate_and_set_defaults(self):
    """Validates and sets defaults for the SplitterOutput instance.

    Raises:
        ValueError: If `chunks` is empty or if `chunk_id` length does not match `chunks` length.

    Returns:
        self (SplitterOutput): The validated and updated instance.
    """
    if not self.chunks:
        raise ValueError("Chunks list cannot be empty.")

    if self.chunk_id is not None:
        if len(self.chunk_id) != len(self.chunks):
            raise ValueError(
                f"chunk_id length ({len(self.chunk_id)}) does not match chunks length ({len(self.chunks)})."
            )
    else:
        self.chunk_id = [str(uuid.uuid4()) for _ in self.chunks]

    if not self.document_id:
        self.document_id = str(uuid.uuid4())

    return self
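
A minimal sketch of the behaviour this validator enforces (the import path is an assumption; Pydantic surfaces the raised ValueError as a validation error at construction time):

```python
from splitter_mr.schema import SplitterOutput  # assumed import path

# A missing document_id is filled in with a fresh UUID4.
ok = SplitterOutput(chunks=["a", "b"], chunk_id=["id-1", "id-2"])
print(ok.document_id)  # auto-generated UUID4 string

# An empty chunks list is rejected.
try:
    SplitterOutput(chunks=[], chunk_id=[])
except ValueError as err:  # pydantic.ValidationError is a ValueError subclass
    print(err)  # message includes: "Chunks list cannot be empty."

# When chunk_id is provided, it must have one entry per chunk.
try:
    SplitterOutput(chunks=["a", "b"], chunk_id=["only-one"])
except ValueError as err:
    print(err)  # message includes: "chunk_id length (1) does not match chunks length (2)."
```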

Splitters

BaseSplitter

BaseSplitter

Bases: ABC

Abstract base class for all splitter implementations.

This class defines the common interface and utility methods for splitters that divide text or data into smaller chunks, typically for downstream natural language processing tasks or information retrieval. Subclasses should implement the split method, which takes in a dictionary (typically from a document reader) and returns a structured output with the required chunking.

Attributes:

| Name | Type | Description |
|---|---|---|
| `chunk_size` | `int` | The maximum number of units (e.g., characters, words, etc.) per chunk. |

Methods:

| Name | Description |
|---|---|
| `split` | Abstract method. Should be implemented by all subclasses to perform the actual splitting logic. |
| `_generate_chunk_ids` | Generates a list of unique chunk IDs using UUID4, for use in the output. |
| `_default_metadata` | Returns a default (empty) metadata dictionary, which can be extended by subclasses. |

Source code in src/splitter_mr/splitter/base_splitter.py
class BaseSplitter(ABC):
    """
    Abstract base class for all splitter implementations.

    This class defines the common interface and utility methods for splitters that
    divide text or data into smaller chunks, typically for downstream natural language
    processing tasks or information retrieval. Subclasses should implement the `split`
    method, which takes in a dictionary (typically from a document reader) and returns
    a structured output with the required chunking.

    Attributes:
        chunk_size (int): The maximum number of units (e.g., characters, words, etc.) per chunk.

    Methods:
        split: Abstract method. Should be implemented by all subclasses to perform the actual
            splitting logic.

        _generate_chunk_ids: Generates a list of unique chunk IDs using UUID4, for use in the output.

        _default_metadata: Returns a default (empty) metadata dictionary, which can be extended by subclasses.
    """

    def __init__(self, chunk_size: int = 1000):
        """
        Initializer method for BaseSplitter classes
        """
        self.chunk_size = chunk_size

    @abstractmethod
    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Abstract method to split input data into chunks.

        Args:
            reader_output (ReaderOutput): Input data, typically from a document reader,
                including the text to split and any relevant metadata.

        Returns:
            SplitterOutput: A dictionary containing split chunks and associated metadata.
        """

    def _generate_chunk_ids(self, num_chunks: int) -> List[str]:
        """
        Generate a list of unique chunk identifiers.

        Args:
            num_chunks (int): Number of chunk IDs to generate.

        Returns:
            List[str]: List of unique string IDs (UUID4).
        """
        return [str(uuid.uuid4()) for _ in range(num_chunks)]

    def _default_metadata(self) -> dict:
        """
        Return a default metadata dictionary.

        Returns:
            dict: An empty dictionary; subclasses may override to provide additional metadata.
        """
        return {}
__init__(chunk_size=1000)

Initializer method for BaseSplitter classes

Source code in src/splitter_mr/splitter/base_splitter.py
def __init__(self, chunk_size: int = 1000):
    """
    Initializer method for BaseSplitter classes
    """
    self.chunk_size = chunk_size
split(reader_output) abstractmethod

Abstract method to split input data into chunks.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `reader_output` | `ReaderOutput` | Input data, typically from a document reader, including the text to split and any relevant metadata. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `SplitterOutput` | `SplitterOutput` | A dictionary containing split chunks and associated metadata. |

Source code in src/splitter_mr/splitter/base_splitter.py
@abstractmethod
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Abstract method to split input data into chunks.

    Args:
        reader_output (ReaderOutput): Input data, typically from a document reader,
            including the text to split and any relevant metadata.

    Returns:
        SplitterOutput: A dictionary containing split chunks and associated metadata.
    """

CharacterSplitter

CharacterSplitter

Bases: BaseSplitter

CharacterSplitter splits a given text into overlapping or non-overlapping chunks based on a specified number of characters per chunk.

This splitter is configurable with a maximum chunk size (chunk_size) and an overlap between consecutive chunks (chunk_overlap). The overlap can be specified either as an integer (number of characters) or as a float between 0 and 1 (fraction of chunk size). This is particularly useful for downstream NLP tasks where context preservation between chunks is important.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `chunk_size` | `int` | Maximum number of characters per chunk. | `1000` |
| `chunk_overlap` | `Union[int, float]` | Number or percentage of overlapping characters between chunks. | `0` |

Source code in src/splitter_mr/splitter/splitters/character_splitter.py
class CharacterSplitter(BaseSplitter):
    """
    CharacterSplitter splits a given text into overlapping or non-overlapping chunks
    based on a specified number of characters per chunk.

    This splitter is configurable with a maximum chunk size (`chunk_size`) and an overlap
    between consecutive chunks (`chunk_overlap`). The overlap can be specified either as
    an integer (number of characters) or as a float between 0 and 1 (fraction of chunk size).
    This is particularly useful for downstream NLP tasks where context preservation between
    chunks is important.

    Args:
        chunk_size (int): Maximum number of characters per chunk.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping characters
            between chunks.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 0):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text from the reader_output dictionary into character-based chunks.

        Each chunk contains at most `chunk_size` characters, and adjacent chunks can overlap
        by a specified number or percentage of characters, according to the `chunk_overlap`
        parameter set at initialization. Returns a dictionary with the same document metadata,
        unique chunk identifiers, and the split parameters used.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata
                (e.g., 'document_name', 'document_path', etc.).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If chunk_overlap is greater than or equal to chunk_size.

        Example:
            ```python
            from splitter_mr.splitter import CharacterSplitter

            # This dictionary has been obtained as the output from a Reader object.
            reader_output = ReaderOutput(
                text="abcdefghijklmnopqrstuvwxyz",
                document_name="doc.txt",
                document_path="/path/doc.txt",
            )
            splitter = CharacterSplitter(chunk_size=5, chunk_overlap=2)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['abcde', 'defgh', 'ghijk', ..., 'yz']
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Determine overlap in characters
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            overlap = int(chunk_size * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)
        if overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        # Split into chunks
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            start += chunk_size - overlap if (chunk_size - overlap) > 0 else 1

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="character_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input text from the reader_output dictionary into character-based chunks.

Each chunk contains at most chunk_size characters, and adjacent chunks can overlap by a specified number or percentage of characters, according to the chunk_overlap parameter set at initialization. Returns a dictionary with the same document metadata, unique chunk identifiers, and the split parameters used.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `reader_output` | `Dict[str, Any]` | Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path', etc.). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `SplitterOutput` | `SplitterOutput` | Dataclass defining the output structure for all splitters. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If `chunk_overlap` is greater than or equal to `chunk_size`. |

Example

```python
from splitter_mr.splitter import CharacterSplitter

# This ReaderOutput has been obtained as the output from a Reader object.
reader_output = ReaderOutput(
    text="abcdefghijklmnopqrstuvwxyz",
    document_name="doc.txt",
    document_path="/path/doc.txt",
)
splitter = CharacterSplitter(chunk_size=5, chunk_overlap=2)
output = splitter.split(reader_output)
print(output.chunks)
```
```python
['abcde', 'defgh', 'ghijk', ..., 'yz']
```

Source code in src/splitter_mr/splitter/splitters/character_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text from the reader_output dictionary into character-based chunks.

    Each chunk contains at most `chunk_size` characters, and adjacent chunks can overlap
    by a specified number or percentage of characters, according to the `chunk_overlap`
    parameter set at initialization. Returns a dictionary with the same document metadata,
    unique chunk identifiers, and the split parameters used.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata
            (e.g., 'document_name', 'document_path', etc.).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If chunk_overlap is greater than or equal to chunk_size.

    Example:
        ```python
        from splitter_mr.splitter import CharacterSplitter

        # This dictionary has been obtained as the output from a Reader object.
        reader_output = ReaderOutput(
            text="abcdefghijklmnopqrstuvwxyz",
            document_name="doc.txt",
            document_path="/path/doc.txt",
        )
        splitter = CharacterSplitter(chunk_size=5, chunk_overlap=2)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['abcde', 'defgh', 'ghijk', ..., 'yz']
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Determine overlap in characters
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        overlap = int(chunk_size * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)
    if overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    # Split into chunks
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start += chunk_size - overlap if (chunk_size - overlap) > 0 else 1

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="character_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
        },
        metadata=metadata,
    )
    return output
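
Because `chunk_overlap` also accepts a float in [0, 1), the overlap can be expressed as a fraction of `chunk_size` rather than as an absolute character count. A short sketch:

```python
from splitter_mr.splitter import CharacterSplitter

# 100 characters per chunk; 0.25 * 100 = 25 characters are repeated at the
# start of every following chunk.
splitter = CharacterSplitter(chunk_size=100, chunk_overlap=0.25)
output = splitter.split(reader_output)  # same reader_output as in the example above
```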

WordSplitter

WordSplitter

Bases: BaseSplitter

WordSplitter splits a given text into overlapping or non-overlapping chunks based on a specified number of words per chunk.

This splitter is configurable with a maximum chunk size (chunk_size, in words) and an overlap between consecutive chunks (chunk_overlap). The overlap can be specified either as an integer (number of words) or as a float between 0 and 1 (fraction of chunk size). Useful for NLP tasks where word-based boundaries are important for context preservation.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `chunk_size` | `int` | Maximum number of words per chunk. | `5` |
| `chunk_overlap` | `Union[int, float]` | Number or percentage of overlapping words between chunks. | `0` |

Source code in src/splitter_mr/splitter/splitters/word_splitter.py
class WordSplitter(BaseSplitter):
    """
    WordSplitter splits a given text into overlapping or non-overlapping chunks
    based on a specified number of words per chunk.

    This splitter is configurable with a maximum chunk size (`chunk_size`, in words)
    and an overlap between consecutive chunks (`chunk_overlap`). The overlap can be
    specified either as an integer (number of words) or as a float between 0 and 1
    (fraction of chunk size). Useful for NLP tasks where word-based boundaries are
    important for context preservation.

    Args:
        chunk_size (int): Maximum number of words per chunk.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping words between chunks.
    """

    def __init__(self, chunk_size: int = 5, chunk_overlap: Union[int, float] = 0):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text from the reader_output dictionary into word-based chunks.

        Each chunk contains at most `chunk_size` words, and adjacent chunks can overlap
        by a specified number or percentage of words, according to the `chunk_overlap`
        parameter set at initialization.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata
                (e.g., 'document_name', 'document_path', etc.).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If chunk_overlap is greater than or equal to chunk_size.

        Example:
            ```python
            from splitter_mr.splitter import WordSplitter

            reader_output = ReaderOutput(
                text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs. Sphinx of black quartz, judge my vow.",
                document_name="pangrams.txt",
                document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
            )

            # Split into chunks of 5 words, overlapping by 2 words
            splitter = WordSplitter(chunk_size=5, chunk_overlap=2)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['The quick brown fox jumps',
            'fox jumps over the lazy',
            'over the lazy dog. Pack', ...]
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Split text into words (using simple whitespace tokenization)
        words = text.split()
        total_words = len(words)

        # Determine overlap in words
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            overlap = int(chunk_size * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)
        if overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        # Split into chunks
        chunks = []
        start = 0
        step = chunk_size - overlap if (chunk_size - overlap) > 0 else 1
        while start < total_words:
            end = start + chunk_size
            chunk_words = words[start:end]
            chunks.append(" ".join(chunk_words))
            start += step

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="word_splitter",
            split_params={
                "chunk_size": chunk_size,
                "chunk_overlap": self.chunk_overlap,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input text from the reader_output dictionary into word-based chunks.

Each chunk contains at most chunk_size words, and adjacent chunks can overlap by a specified number or percentage of words, according to the chunk_overlap parameter set at initialization.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `reader_output` | `Dict[str, Any]` | Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path', etc.). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `SplitterOutput` | `SplitterOutput` | Dataclass defining the output structure for all splitters. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If `chunk_overlap` is greater than or equal to `chunk_size`. |

Example

```python
from splitter_mr.splitter import WordSplitter

# This ReaderOutput has been obtained as the output from a Reader object.
reader_output = ReaderOutput(
    text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs. Sphinx of black quartz, judge my vow.",
    document_name="pangrams.txt",
    document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
)

# Split into chunks of 5 words, overlapping by 2 words
splitter = WordSplitter(chunk_size=5, chunk_overlap=2)
output = splitter.split(reader_output)
print(output.chunks)
```
```python
['The quick brown fox jumps',
 'fox jumps over the lazy',
 'over the lazy dog. Pack', ...]
```

Source code in src/splitter_mr/splitter/splitters/word_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text from the reader_output dictionary into word-based chunks.

    Each chunk contains at most `chunk_size` words, and adjacent chunks can overlap
    by a specified number or percentage of words, according to the `chunk_overlap`
    parameter set at initialization.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata
            (e.g., 'document_name', 'document_path', etc.).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If chunk_overlap is greater than or equal to chunk_size.

    Example:
        ```python
        from splitter_mr.splitter import WordSplitter

        reader_output = ReaderOutput(
            text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs. Sphinx of black quartz, judge my vow.",
            document_name="pangrams.txt",
            document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
        )

        # Split into chunks of 5 words, overlapping by 2 words
        splitter = WordSplitter(chunk_size=5, chunk_overlap=2)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['The quick brown fox jumps',
        'fox jumps over the lazy',
        'over the lazy dog. Pack', ...]
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Split text into words (using simple whitespace tokenization)
    words = text.split()
    total_words = len(words)

    # Determine overlap in words
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        overlap = int(chunk_size * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)
    if overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    # Split into chunks
    chunks = []
    start = 0
    step = chunk_size - overlap if (chunk_size - overlap) > 0 else 1
    while start < total_words:
        end = start + chunk_size
        chunk_words = words[start:end]
        chunks.append(" ".join(chunk_words))
        start += step

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="word_splitter",
        split_params={
            "chunk_size": chunk_size,
            "chunk_overlap": self.chunk_overlap,
        },
        metadata=metadata,
    )
    return output

SentenceSplitter

SentenceSplitter

Bases: BaseSplitter

SentenceSplitter splits a given text into overlapping or non-overlapping chunks, where each chunk contains a specified number of sentences, and overlap is defined by a number or percentage of words from the end of the previous chunk.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `chunk_size` | `int` | Maximum number of sentences per chunk. | `5` |
| `chunk_overlap` | `Union[int, float]` | Number or percentage of overlapping words between chunks. | `0` |
| `separators` | `Union[str, List[str]]` | Character(s) to split sentences. | `DEFAULT_SENTENCE_SEPARATORS` |

Source code in src/splitter_mr/splitter/splitters/sentence_splitter.py
class SentenceSplitter(BaseSplitter):
    """
    SentenceSplitter splits a given text into overlapping or non-overlapping chunks,
    where each chunk contains a specified number of sentences, and overlap is defined
    by a number or percentage of words from the end of the previous chunk.

    Args:
        chunk_size (int): Maximum number of sentences per chunk.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping words between chunks.
        separators (Union[str, List[str]]): Character(s) to split sentences.
    """

    def __init__(
        self,
        chunk_size: int = 5,
        chunk_overlap: Union[int, float] = 0,
        separators: Union[str, List[str]] = DEFAULT_SENTENCE_SEPARATORS,
    ):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap

        if isinstance(separators, list):
            # Legacy path (NOT recommended): join list with alternation, ensure "..." before "."
            parts = sorted({*separators}, key=lambda s: (s != "...", s))
            sep_pattern = "|".join(re.escape(s) for s in parts)
            # Attach trailing quotes/brackets if user insisted on a list
            self.separators = rf'(?:{sep_pattern})(?:["”’\'\)\]\}}»]*)\s*'
        else:
            # Recommended path: already a full regex pattern
            self.separators = separators

        self._sep_re = re.compile(f"({self.separators})")

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text from the `reader_output` dictionary into sentence-based chunks,
        allowing for overlap at the word level.

        Each chunk contains at most `chunk_size` sentences, where sentence boundaries are
        detected using the specified `separators` (e.g., '.', '!', '?').
        Overlap between consecutive chunks is specified by `chunk_overlap`, which can be an
        integer (number of words) or a float (fraction of the maximum words in a sentence).
        This is useful for downstream NLP tasks that require context preservation.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata,
                such as 'document_name', 'document_path', 'document_id', etc.

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If `chunk_overlap` is negative.
            ValueError: If 'text' is missing in `reader_output`.

        Example:
            ```python
            from splitter_mr.splitter import SentenceSplitter

            # Example input: 7 sentences with varied punctuation
            # This dictionary has been obtained as an output from a Reader class.
            reader_output = ReaderOutput(
                text="Hello world! How are you? I am fine. Testing sentence splitting. Short. End! And another?",
                document_name="sample.txt",
                document_path="/tmp/sample.txt",
                document_id="123"
            )

            # Split into chunks of 3 sentences each, no overlap
            splitter = SentenceSplitter(chunk_size=3, chunk_overlap=0)
            result = splitter.split(reader_output)
            print(result.chunks)
            ```
            ```python
            ['Hello world! How are you? I am fine.',
             'Testing sentence splitting. Short. End!',
             'And another?', ...]
            ```
        """
        # Initialize variables
        text = reader_output.text or ""
        chunk_size = self.chunk_size

        # Build sentence list
        if not text.strip():
            merged_sentences: List[str] = [""]
        else:
            parts = self._sep_re.split(text)  # [text, sep, text, sep, ...]
            merged_sentences = []
            i = 0
            while i < len(parts):
                segment = (parts[i] or "").strip()
                if i + 1 < len(
                    parts
                ):  # we have a separator that belongs to this sentence
                    sep = parts[i + 1] or ""
                    sentence = (segment + sep).strip()
                    if sentence:
                        merged_sentences.append(sentence)
                    i += 2
                else:
                    # tail without terminator
                    if segment:
                        merged_sentences.append(segment)
                    i += 1

            if not merged_sentences:
                merged_sentences = [""]

        num_sentences = len(merged_sentences)

        # Determine overlap in words
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            max_sent_words = max((len(s.split()) for s in merged_sentences), default=0)
            overlap = int(max_sent_words * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)
        if overlap < 0:
            raise ValueError("chunk_overlap must be >= 0")

        # Build chunks of up to `chunk_size` sentences (single implementation, no duplication)
        chunks: List[str] = []
        start = 0
        while start < num_sentences:
            end = min(start + chunk_size, num_sentences)
            chunk_sents = merged_sentences[start:end]
            chunk_text = " ".join(chunk_sents)

            if overlap > 0 and chunks:
                prev_words = chunks[-1].split()
                overlap_words = (
                    prev_words[-overlap:] if overlap <= len(prev_words) else prev_words
                )
                chunk_text = " ".join([" ".join(overlap_words), chunk_text]).strip()

            chunks.append(chunk_text)
            start += chunk_size

        # Generate chunk_id and append metadata, then return once
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        return SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="sentence_splitter",
            split_params={
                "chunk_size": chunk_size,
                "chunk_overlap": self.chunk_overlap,
                "separators": self.separators,
            },
            metadata=metadata,
        )
split(reader_output)

Splits the input text from the reader_output dictionary into sentence-based chunks, allowing for overlap at the word level.

Each chunk contains at most chunk_size sentences, where sentence boundaries are detected using the specified separators (e.g., '.', '!', '?'). Overlap between consecutive chunks is specified by chunk_overlap, which can be an integer (number of words) or a float (fraction of the maximum words in a sentence). This is useful for downstream NLP tasks that require context preservation.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `reader_output` | `Dict[str, Any]` | Dictionary containing at least a 'text' key (str) and optional document metadata, such as 'document_name', 'document_path', 'document_id', etc. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| `SplitterOutput` | `SplitterOutput` | Dataclass defining the output structure for all splitters. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If `chunk_overlap` is negative. |
| `ValueError` | If 'text' is missing in `reader_output`. |

Example

```python
from splitter_mr.splitter import SentenceSplitter

# Example input: 7 sentences with varied punctuation
# This ReaderOutput has been obtained as an output from a Reader class.
reader_output = ReaderOutput(
    text="Hello world! How are you? I am fine. Testing sentence splitting. Short. End! And another?",
    document_name="sample.txt",
    document_path="/tmp/sample.txt",
    document_id="123"
)

# Split into chunks of 3 sentences each, no overlap
splitter = SentenceSplitter(chunk_size=3, chunk_overlap=0)
result = splitter.split(reader_output)
print(result.chunks)
```
```python
['Hello world! How are you? I am fine.',
 'Testing sentence splitting. Short. End!',
 'And another?', ...]
```

Source code in src/splitter_mr/splitter/splitters/sentence_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text from the `reader_output` dictionary into sentence-based chunks,
    allowing for overlap at the word level.

    Each chunk contains at most `chunk_size` sentences, where sentence boundaries are
    detected using the specified `separators` (e.g., '.', '!', '?').
    Overlap between consecutive chunks is specified by `chunk_overlap`, which can be an
    integer (number of words) or a float (fraction of the maximum words in a sentence).
    This is useful for downstream NLP tasks that require context preservation.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata,
            such as 'document_name', 'document_path', 'document_id', etc.

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If `chunk_overlap` is negative.
        ValueError: If 'text' is missing in `reader_output`.

    Example:
        ```python
        from splitter_mr.splitter import SentenceSplitter

        # Example input: 7 sentences with varied punctuation
        # This dictionary has been obtained as an output from a Reader class.
        reader_output = ReaderOutput(
            text="Hello world! How are you? I am fine. Testing sentence splitting. Short. End! And another?",
            document_name="sample.txt",
            document_path="/tmp/sample.txt",
            document_id="123"
        )

        # Split into chunks of 3 sentences each, no overlap
        splitter = SentenceSplitter(chunk_size=3, chunk_overlap=0)
        result = splitter.split(reader_output)
        print(result.chunks)
        ```
        ```python
        ['Hello world! How are you? I am fine.',
         'Testing sentence splitting. Short. End!',
         'And another?', ...]
        ```
    """
    # Initialize variables
    text = reader_output.text or ""
    chunk_size = self.chunk_size

    # Build sentence list
    if not text.strip():
        merged_sentences: List[str] = [""]
    else:
        parts = self._sep_re.split(text)  # [text, sep, text, sep, ...]
        merged_sentences = []
        i = 0
        while i < len(parts):
            segment = (parts[i] or "").strip()
            if i + 1 < len(
                parts
            ):  # we have a separator that belongs to this sentence
                sep = parts[i + 1] or ""
                sentence = (segment + sep).strip()
                if sentence:
                    merged_sentences.append(sentence)
                i += 2
            else:
                # tail without terminator
                if segment:
                    merged_sentences.append(segment)
                i += 1

        if not merged_sentences:
            merged_sentences = [""]

    num_sentences = len(merged_sentences)

    # Determine overlap in words
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        max_sent_words = max((len(s.split()) for s in merged_sentences), default=0)
        overlap = int(max_sent_words * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)
    if overlap < 0:
        raise ValueError("chunk_overlap must be >= 0")

    # Build chunks of up to `chunk_size` sentences (single implementation, no duplication)
    chunks: List[str] = []
    start = 0
    while start < num_sentences:
        end = min(start + chunk_size, num_sentences)
        chunk_sents = merged_sentences[start:end]
        chunk_text = " ".join(chunk_sents)

        if overlap > 0 and chunks:
            prev_words = chunks[-1].split()
            overlap_words = (
                prev_words[-overlap:] if overlap <= len(prev_words) else prev_words
            )
            chunk_text = " ".join([" ".join(overlap_words), chunk_text]).strip()

        chunks.append(chunk_text)
        start += chunk_size

    # Generate chunk_id and append metadata, then return once
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    return SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="sentence_splitter",
        split_params={
            "chunk_size": chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "separators": self.separators,
        },
        metadata=metadata,
    )
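
The `separators` argument can be either a ready-made regex pattern (the recommended path, used as-is) or a list of literal sentence terminators (the legacy path, which is escaped and joined into an alternation internally). A minimal sketch of both:

```python
from splitter_mr.splitter import SentenceSplitter

# Recommended: a full regex pattern, used as-is.
regex_splitter = SentenceSplitter(chunk_size=2, separators=r"[.!?]+\s*")

# Legacy: a list of literal terminators, escaped and joined with '|' internally.
list_splitter = SentenceSplitter(chunk_size=2, separators=[".", "!", "?", "..."])
```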

ParagraphSplitter

ParagraphSplitter

Bases: BaseSplitter

ParagraphSplitter splits a given text into overlapping or non-overlapping chunks, where each chunk contains a specified number of paragraphs, and overlap is defined by a number or percentage of words from the end of the previous chunk.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `chunk_size` | `int` | Maximum number of paragraphs per chunk. | `3` |
| `chunk_overlap` | `Union[int, float]` | Number or percentage of overlapping words between chunks. | `0` |
| `line_break` | `Union[str, List[str]]` | Character(s) used to split text into paragraphs. | `DEFAULT_PARAGRAPH_SEPARATORS` |

Source code in src/splitter_mr/splitter/splitters/paragraph_splitter.py
class ParagraphSplitter(BaseSplitter):
    """
    ParagraphSplitter splits a given text into overlapping or non-overlapping chunks,
    where each chunk contains a specified number of paragraphs, and overlap is defined
    by a number or percentage of words from the end of the previous chunk.

    Args:
        chunk_size (int): Maximum number of paragraphs per chunk.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping words between chunks.
        line_break (Union[str, List[str]]): Character(s) used to split text into paragraphs.
    """

    def __init__(
        self,
        chunk_size: int = 3,
        chunk_overlap: Union[int, float] = 0,
        line_break: Union[str, List[str]] = DEFAULT_PARAGRAPH_SEPARATORS,
    ):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap
        self.line_break = line_break if isinstance(line_break, list) else [line_break]

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits text in `reader_output['text']` into paragraph-based chunks, with optional word overlap.

        Args:
            reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key (str)
                and optional document metadata (e.g., 'document_name', 'document_path').

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If 'text' is missing from `reader_output` or is not a string.

        Example:
            ```python
            from splitter_mr.splitter import ParagraphSplitter

            # This dictionary has been obtained as the output from a Reader object.
            reader_output = ReaderOutput(
                text="Para 1.\\n\\nPara 2.\\n\\nPara 3.",
                document_name="test.txt",
                document_path="/tmp/test.txt"
            )
            splitter = ParagraphSplitter(chunk_size=2, chunk_overlap=1, line_break="\\n\\n")
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['Para 1.\\n\\nPara 2.', '2. Para 3.']
            ```
        """
        # Initialize variables
        text = reader_output.text
        line_breaks_pattern = "|".join(map(re.escape, self.line_break))
        paragraphs = [p for p in re.split(line_breaks_pattern, text) if p.strip()]
        num_paragraphs = len(paragraphs)

        # Determine overlap in words
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            max_para_words = max((len(p.split()) for p in paragraphs), default=0)
            overlap = int(max_para_words * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)

        # Split into chunks
        chunks = []
        start = 0
        while start < num_paragraphs:
            end = min(start + self.chunk_size, num_paragraphs)
            chunk_paragraphs = paragraphs[start:end]
            chunk_text = self.line_break[0].join(chunk_paragraphs)
            if overlap > 0 and chunks:
                prev_words = chunks[-1].split()
                overlap_words = (
                    prev_words[-overlap:] if overlap <= len(prev_words) else prev_words
                )
                chunk_text = (
                    self.line_break[0]
                    .join([" ".join(overlap_words), chunk_text])
                    .strip()
                )
            chunks.append(chunk_text)
            start += self.chunk_size

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="paragraph_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap,
                "line_break": self.line_break,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits text in reader_output['text'] into paragraph-based chunks, with optional word overlap.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path').

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If 'text' is missing from reader_output or is not a string.

Example

from splitter_mr.splitter import ParagraphSplitter

# This output has been obtained from a Reader object.
reader_output = ReaderOutput(
    text="Para 1.\n\nPara 2.\n\nPara 3.",
    document_name="test.txt",
    document_path="/tmp/test.txt"
)
splitter = ParagraphSplitter(chunk_size=2, chunk_overlap=1, line_break="\n\n")
output = splitter.split(reader_output)
print(output.chunks)
['Para 1.\n\nPara 2.', '2. Para 3.']
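
Since `chunk_overlap` also accepts a float, the overlap can be given as a fraction of the words in the longest paragraph. A minimal sketch of that variant (assuming `ReaderOutput` is importable from `splitter_mr.schema`; exact chunk boundaries depend on the text):

```python
from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import ParagraphSplitter

reader_output = ReaderOutput(
    text="Para 1.\n\nPara 2.\n\nPara 3.",
    document_name="test.txt",
    document_path="/tmp/test.txt",
)

# chunk_overlap=0.5 -> overlap is 50% of the word count of the longest paragraph
splitter = ParagraphSplitter(chunk_size=1, chunk_overlap=0.5, line_break="\n\n")
output = splitter.split(reader_output)
print(output.chunks)
```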

Source code in src/splitter_mr/splitter/splitters/paragraph_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits text in `reader_output['text']` into paragraph-based chunks, with optional word overlap.

    Args:
        reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key (str)
            and optional document metadata (e.g., 'document_name', 'document_path').

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If 'text' is missing from `reader_output` or is not a string.

    Example:
        ```python
        from splitter_mr.splitter import ParagraphSplitter

        # This output has been obtained from a Reader object.
        reader_output = ReaderOutput(
            text="Para 1.\\n\\nPara 2.\\n\\nPara 3.",
            document_name="test.txt",
            document_path="/tmp/test.txt"
        )
        splitter = ParagraphSplitter(chunk_size=2, chunk_overlap=1, line_break="\\n\\n")
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['Para 1.\\n\\nPara 2.', '2. Para 3.']
        ```
    """
    # Initialize variables
    text = reader_output.text
    line_breaks_pattern = "|".join(map(re.escape, self.line_break))
    paragraphs = [p for p in re.split(line_breaks_pattern, text) if p.strip()]
    num_paragraphs = len(paragraphs)

    # Determine overlap in words
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        max_para_words = max((len(p.split()) for p in paragraphs), default=0)
        overlap = int(max_para_words * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)

    # Split into chunks
    chunks = []
    start = 0
    while start < num_paragraphs:
        end = min(start + self.chunk_size, num_paragraphs)
        chunk_paragraphs = paragraphs[start:end]
        chunk_text = self.line_break[0].join(chunk_paragraphs)
        if overlap > 0 and chunks:
            prev_words = chunks[-1].split()
            overlap_words = (
                prev_words[-overlap:] if overlap <= len(prev_words) else prev_words
            )
            chunk_text = (
                self.line_break[0]
                .join([" ".join(overlap_words), chunk_text])
                .strip()
            )
        chunks.append(chunk_text)
        start += self.chunk_size

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="paragraph_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "line_break": self.line_break,
        },
        metadata=metadata,
    )
    return output

RecursiveCharacterSplitter

RecursiveCharacterSplitter

Bases: BaseSplitter

RecursiveCharacterSplitter splits a given text into overlapping or non-overlapping chunks, where each chunk is created repeatedly breaking down the text until it reaches the desired chunk size. This class implements the Langchain RecursiveCharacterTextSplitter.

Parameters:

Name Type Description Default
chunk_size int

Approximate chunk size, in characters.

1000
chunk_overlap Union[int, float]

Number or percentage of overlapping characters between chunks.

0.1
separators Union[str, List[str]]

Character(s) to recursively split sentences.

DEFAULT_RECURSIVE_SEPARATORS
Notes

More info about the RecursiveCharacterTextSplitter: Langchain Docs.
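
To make the separator hierarchy concrete, the sketch below passes an explicit `separators` list so the splitter tries blank lines first, then newlines, spaces, and finally single characters. The `ReaderOutput` import path and the sample text are assumptions for illustration only:

```python
from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import RecursiveCharacterSplitter

reader_output = ReaderOutput(
    text="First paragraph.\n\nSecond paragraph with a few more words in it.",
    document_name="notes.txt",
    document_path="/tmp/notes.txt",
)

# Preserve the largest semantic unit that still fits within chunk_size.
splitter = RecursiveCharacterSplitter(
    chunk_size=30,
    chunk_overlap=5,
    separators=["\n\n", "\n", " ", ""],
)
output = splitter.split(reader_output)
print(output.chunks)
```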

Source code in src/splitter_mr/splitter/splitters/recursive_splitter.py
class RecursiveCharacterSplitter(BaseSplitter):
    """
    RecursiveCharacterSplitter splits a given text into overlapping or non-overlapping chunks,
    where each chunk is created repeatedly breaking down the text until it reaches the
    desired chunk size. This class implements the Langchain RecursiveCharacterTextSplitter.

    Args:
        chunk_size (int): Approximate chunk size, in characters.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping characters between
            chunks.
        separators (Union[str, List[str]]): Character(s) to recursively split sentences.

    Notes:
        More info about the RecursiveCharacterTextSplitter:
        [Langchain Docs](https://python.langchain.com/docs/how_to/recursive_text_splitter/).
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: Union[int, float] = 0.1,
        separators: Union[str, List[str]] = DEFAULT_RECURSIVE_SEPARATORS,
    ):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap
        self.separators = separators if isinstance(separators, list) else [separators]

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text into character-based chunks using a recursive splitting strategy
        (via Langchain's `RecursiveCharacterTextSplitter`), supporting configurable separators,
        chunk size, and overlap.

        Args:
            reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key (str)
                and optional document metadata (e.g., 'document_name', 'document_path', etc.).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If 'text' is missing in `reader_output` or is not a string.

        Example:
            ```python
            from splitter_mr.splitter import RecursiveCharacterSplitter

            # This output has been obtained from a Reader object.
            reader_output = ReaderOutput(
                text=(
                    "This is a long document. "
                    "It will be recursively split into smaller chunks using the specified separators. "
                    "Each chunk will have some overlap with the next."
                ),
                document_name="sample.txt",
                document_path="/tmp/sample.txt"
            )

            splitter = RecursiveCharacterSplitter(chunk_size=40, chunk_overlap=5)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['This is a long document. It will be', 'be recursively split into smaller chunks', ...]
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Determine overlap in characters
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            overlap = int(chunk_size * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)
        if overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        # Split text into chunks using the computed character overlap
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=overlap,
            separators=self.separators,
        )
        texts = splitter.create_documents([text])
        chunks = [doc.page_content for doc in texts]

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="recursive_character_splitter",
            split_params={
                "chunk_size": chunk_size,
                "chunk_overlap": self.chunk_overlap,
                "separators": self.separators,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input text into character-based chunks using a recursive splitting strategy (via Langchain's RecursiveCharacterTextSplitter), supporting configurable separators, chunk size, and overlap.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path', etc.).

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If 'text' is missing in reader_output or is not a string.

Example

from splitter_mr.splitter import RecursiveCharacterSplitter

# This output has been obtained from a Reader object.
reader_output = ReaderOutput(
    text=(
        "This is a long document. "
        "It will be recursively split into smaller chunks using the specified separators. "
        "Each chunk will have some overlap with the next."
    ),
    document_name="sample.txt",
    document_path="/tmp/sample.txt"
)

splitter = RecursiveCharacterSplitter(chunk_size=40, chunk_overlap=5)
output = splitter.split(reader_output)
print(output.chunks)
['This is a long document. It will be', 'be recursively split into smaller chunks', ...]

Source code in src/splitter_mr/splitter/splitters/recursive_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text into character-based chunks using a recursive splitting strategy
    (via Langchain's `RecursiveCharacterTextSplitter`), supporting configurable separators,
    chunk size, and overlap.

    Args:
        reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key (str)
            and optional document metadata (e.g., 'document_name', 'document_path', etc.).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If 'text' is missing in `reader_output` or is not a string.

    Example:
        ```python
        from splitter_mr.splitter import RecursiveCharacterSplitter

        # This output has been obtained from a Reader object.
        reader_output = ReaderOutput(
            text=(
                "This is a long document. "
                "It will be recursively split into smaller chunks using the specified separators. "
                "Each chunk will have some overlap with the next."
            ),
            document_name="sample.txt",
            document_path="/tmp/sample.txt"
        )

        splitter = RecursiveCharacterSplitter(chunk_size=40, chunk_overlap=5)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['This is a long document. It will be', 'be recursively split into smaller chunks', ...]
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Determine overlap in characters
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        overlap = int(chunk_size * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)
    if overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    # Split text into chunks using the computed character overlap
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=self.chunk_size,
        chunk_overlap=overlap,
        separators=self.separators,
    )
    texts = splitter.create_documents([text])
    chunks = [doc.page_content for doc in texts]

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="recursive_character_splitter",
        split_params={
            "chunk_size": chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "separators": self.separators,
        },
        metadata=metadata,
    )
    return output

KeywordSplitter

KeywordSplitter

Bases: BaseSplitter

Splitter that chunks text around keyword boundaries using regular expressions.

This splitter searches the input text for one or more keyword patterns (regex) and creates chunks at each match boundary. You can control how the matched delimiter is attached to the resulting chunks (before/after/both/none) and apply a secondary, size-based re-chunking to respect chunk_size.

The splitter emits a SplitterOutput with metadata including per-keyword match counts and raw match spans.

Parameters:

Name Type Description Default
patterns Union[List[str], Dict[str, str]]

A list of regex pattern strings or a mapping of name -> regex pattern. When a dict is provided, the keys are used in the metadata counts. When a list is provided, synthetic names are generated (k0, k1, ...).

required
flags int

Standard re flags combined with | (e.g., re.IGNORECASE).

0
include_delimiters str

Where to attach the matched keyword delimiter. One of "none", "before", "after", "both". - before (default) appends the match to the preceding chunk. - after prepends the match to the following chunk. - both duplicates the match on both sides. - none omits the delimiter from both sides.

'before'
chunk_size int

Target maximum size (in characters) for each chunk. When a produced chunk exceeds this value, it is soft-wrapped by whitespace using a greedy strategy.

100000
Notes
  • All regexes are compiled into one alternation with named groups when patterns is a dict. This simplifies per-keyword accounting.
  • If the input text is empty or no matches are found, the entire text becomes a single chunk (subject to size-based re-chunking).
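
A minimal usage sketch (the `ReaderOutput` import path and the log-like sample text are assumptions for illustration): dict patterns give named match counts in the output metadata, and `include_delimiters="before"` keeps each matched keyword attached to the preceding chunk.

```python
import re

from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import KeywordSplitter

reader_output = ReaderOutput(
    text="INFO start\nWARNING disk almost full\nERROR disk full\nINFO done",
    document_name="app.log",
    document_path="/tmp/app.log",
)

# Dict patterns: the keys ("warning", "error") appear in metadata match counts.
splitter = KeywordSplitter(
    patterns={"warning": r"WARNING", "error": r"ERROR"},
    include_delimiters="before",
    flags=re.MULTILINE,
    chunk_size=200,
)
output = splitter.split(reader_output)
print(output.chunks)
print(output.metadata["keyword_matches"]["counts"])  # per-keyword match counts
```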
Source code in src/splitter_mr/splitter/splitters/keyword_splitter.py
class KeywordSplitter(BaseSplitter):
    """
    Splitter that chunks text around *keyword* boundaries using regular expressions.

    This splitter searches the input text for one or more *keyword patterns* (regex)
    and creates chunks at each match boundary. You can control how the matched
    delimiter is attached to the resulting chunks (before/after/both/none) and apply a
    secondary, size-based re-chunking to respect ``chunk_size``.

    The splitter emits a :class:`~..schema.SplitterOutput` with metadata including
    per-keyword match counts and raw match spans.

    Args:
        patterns (Union[List[str], Dict[str, str]]): A list of regex pattern strings **or** a mapping of
            ``name -> regex pattern``. When a dict is provided, the keys are used in
            the metadata counts. When a list is provided, synthetic names are
            generated (``k0``, ``k1``, ...).
        flags (int): Standard ``re`` flags combined with ``|`` (e.g., ``re.IGNORECASE``).
        include_delimiters (str): Where to attach the matched keyword delimiter.
            One of ``"none"``, ``"before"``, ``"after"``, ``"both"``.
            - ``before`` (default) appends the match to the *preceding* chunk.
            - ``after`` prepends the match to the *following* chunk.
            - ``both`` duplicates the match on both sides.
            - ``none`` omits the delimiter from both sides.
        chunk_size (int): Target maximum size (in characters) for each chunk. When a
            produced chunk exceeds this value, it is *soft*-wrapped by whitespace
            using a greedy strategy.

    Notes:
        - All regexes are compiled into **one** alternation with *named groups* when
          ``patterns`` is a dict. This simplifies per-keyword accounting.
        - If the input text is empty or no matches are found, the entire text
          becomes a single chunk (subject to size-based re-chunking).
    """

    def __init__(
        self,
        patterns: Union[List[str], Dict[str, str]],
        *,
        flags: int = 0,
        include_delimiters: str = "before",
        chunk_size: int = 100000,
    ) -> None:
        """
        Initialize the KeywordSplitter.

        Args:
            patterns (Union[List[str], Dict[str, str]]): Keyword regex patterns.
            flags (int): Regex flags.
            include_delimiters (str): How to include delimiters (before, after, both, none).
            chunk_size (int): Max chunk size in characters.
        """
        super().__init__(chunk_size=chunk_size)
        self.include_delimiters = self._validate_include_delimiters(include_delimiters)
        self.pattern_names, self.compiled = self._compile_patterns(patterns, flags)
        self.flags = flags

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Split ReaderOutput into keyword-delimited chunks and build structured output.

        Args:
            reader_output (ReaderOutput): Input document and metadata.

        Returns:
            SplitterOutput: Output structure with chunked text and metadata.
        """
        text = reader_output.text or ""

        # Ensure document_id is present so it propagates (fixes metadata test)
        if not reader_output.document_id:
            reader_output.document_id = str(uuid.uuid4())

        # Primary split by keyword matches (names used for counts)
        raw_chunks, match_spans, match_names = self._split_by_keywords(text)

        # Secondary size-based re-chunking to respect chunk_size
        sized_chunks: List[str] = []
        for ch in raw_chunks:
            sized_chunks.extend(self._soft_wrap(ch, self.chunk_size))
        if not sized_chunks:
            sized_chunks = [""]

        # Generate IDs
        chunk_ids = self._generate_chunk_ids(len(sized_chunks))

        # Build metadata (ensure counts/spans are always present)
        matches_meta = {
            "counts": self._count_by_name(match_names),
            "spans": match_spans,
            "include_delimiters": self.include_delimiters,
            "flags": self.flags,
            "pattern_names": self.pattern_names,
            "chunk_size": self.chunk_size,
        }

        return self._build_output(
            reader_output=reader_output,
            chunks=sized_chunks,
            chunk_ids=chunk_ids,
            matches_meta=matches_meta,
        )

    # ---- Internals ------------------------------------------------------ #

    @staticmethod
    def _validate_include_delimiters(value: str) -> str:
        """
        Validate and normalize include_delimiters argument.

        Args:
            value (str): One of {"none", "before", "after", "both"}.

        Returns:
            str: Normalized delimiter mode.

        Raises:
            ValueError: If the mode is invalid.
        """
        allowed = {"none", "before", "after", "both"}
        v = value.lower().strip()
        if v not in allowed:
            raise ValueError(
                f"include_delimiters must be one of {sorted(allowed)}, got {value!r}"
            )
        return v

    @staticmethod
    def _compile_patterns(
        patterns: Union[List[str], Dict[str, str]], flags: int
    ) -> Tuple[List[str], Pattern[str]]:
        """
        Compile patterns into a single alternation regex.

        If a dict is given, build a pattern with **named** groups to preserve the
        provided names. If a list is given, synthesize names (k0, k1, ...).

        Args:
            patterns (Union[List[str], Dict[str, str]]): Patterns or mapping.
            flags (int): Regex flags.

        Returns:
            Tuple[List[str], Pattern[str]]: Names and compiled regex.
        """
        if isinstance(patterns, dict):
            names = list(patterns.keys())
            parts = [f"(?P<{name}>{pat})" for name, pat in patterns.items()]
        else:
            names = [f"k{i}" for i in range(len(patterns))]
            parts = [f"(?P<{n}>{pat})" for n, pat in zip(names, patterns)]

        combined = "|".join(parts) if parts else r"(?!x)x"  # never matches if empty
        compiled = re.compile(combined, flags)
        return names, compiled

    def _split_by_keywords(
        self, text: str
    ) -> Tuple[List[str], List[Tuple[int, int]], List[str]]:
        """
        Split ``text`` around matches of ``self.compiled``.

        Respects include_delimiters in {"before", "after", "both", "none"}.

        Args:
            text (str): The text to split.

        Returns:
            Tuple[List[str], List[Tuple[int, int]], List[str]]:
                (chunks, spans, names) where `chunks` are before size re-wrapping,
                spans are (start, end) tuples, and names are group names for each match.
        """

        def _append_chunk(acc: List[str], chunk: str) -> None:
            # Keep only non-empty (after strip) chunks here; final fallback to [""] is done by caller
            if chunk and chunk.strip():
                acc.append(chunk)

        chunks: List[str] = []
        spans: List[Tuple[int, int]] = []
        names: List[str] = []

        matches = list(self.compiled.finditer(text))
        last_idx = 0
        pending_prefix = ""  # used when include_delimiters is "after" or "both"

        for m in matches:
            start, end = m.span()
            match_txt = text[start:end]
            group_name = m.lastgroup or "unknown"

            spans.append((start, end))
            names.append(group_name)

            # Build the piece between last match end and this match start, prefixing any pending delimiter
            before_piece = pending_prefix + text[last_idx:start]
            pending_prefix = ""

            # Attach delimiter to the left side if requested
            if self.include_delimiters in ("before", "both"):
                before_piece += match_txt

            _append_chunk(chunks, before_piece)

            # If delimiter should be on the right, carry it forward to prefix next chunk
            if self.include_delimiters in ("after", "both"):
                pending_prefix = match_txt

            last_idx = end

        # Remainder after the last match (may contain pending_prefix)
        remainder = pending_prefix + text[last_idx:]
        _append_chunk(chunks, remainder)

        # If no non-empty chunks were appended, return a single empty chunk (tests expect this)
        if not chunks:
            return [""], spans, names

        # normalize whitespace trimming for each chunk
        chunks = [c.strip() for c in chunks if c and c.strip()]

        if not chunks:
            return [""], spans, names

        return chunks, spans, names

    @staticmethod
    def _soft_wrap(text: str, max_size: int) -> List[str]:
        """
        Greedy soft-wrap by whitespace to respect ``max_size``.

        - If ``len(text) <= max_size``: return ``[text]``.
        - Else: split on whitespace and rebuild lines greedily.
        - If a single token is longer than ``max_size``, it is hard-split.

        Args:
            text (str): Text to wrap.
            max_size (int): Maximum chunk size.

        Returns:
            List[str]: List of size-constrained chunks.
        """
        if max_size <= 0 or len(text) <= max_size:
            return [text] if text else []

        tokens = re.findall(r"\S+|\s+", text)
        out: List[str] = []
        buf = ""
        for tok in tokens:
            if len(buf) + len(tok) <= max_size:
                buf += tok
                continue
            if buf:
                out.append(buf)
                buf = ""
            # token alone is too big -> hard split
            while len(tok) > max_size:
                out.append(tok[:max_size])
                tok = tok[max_size:]
            buf = tok
        if buf:
            out.append(buf)
        return [c for c in (s.strip() for s in out) if c]

    @staticmethod
    def _count_by_name(names: Iterable[str]) -> Dict[str, int]:
        """
        Aggregate match counts by group name (k0/k1/... for list patterns, custom names for dict).

        Args:
            names (Iterable[str]): Group names.

        Returns:
            Dict[str, int]: Count of matches per group name.
        """
        counts: Dict[str, int] = {}
        for n in names:
            counts[n] = counts.get(n, 0) + 1
        return counts

    def _build_output(
        self,
        reader_output: ReaderOutput,
        chunks: List[str],
        chunk_ids: List[str],
        matches_meta: Dict[str, object],
    ) -> SplitterOutput:
        """
        Assemble a :class:`SplitterOutput` carrying over reader metadata.

        Args:
            reader_output (ReaderOutput): Input document and metadata.
            chunks (List[str]): Final list of chunks.
            chunk_ids (List[str]): Unique chunk IDs.
            matches_meta (Dict[str, object]): Keyword matches metadata.

        Returns:
            SplitterOutput: Populated output object.
        """
        return SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="keyword",
            split_params={
                "include_delimiters": self.include_delimiters,
                "flags": self.flags,
                "chunk_size": self.chunk_size,
                "pattern_names": self.pattern_names,
            },
            metadata={
                **(reader_output.metadata or {}),
                "keyword_matches": matches_meta,
            },
        )
__init__(patterns, *, flags=0, include_delimiters='before', chunk_size=100000)

Initialize the KeywordSplitter.

Parameters:

Name Type Description Default
patterns Union[List[str], Dict[str, str]]

Keyword regex patterns.

required
flags int

Regex flags.

0
include_delimiters str

How to include delimiters (before, after, both, none).

'before'
chunk_size int

Max chunk size in characters.

100000
Source code in src/splitter_mr/splitter/splitters/keyword_splitter.py
def __init__(
    self,
    patterns: Union[List[str], Dict[str, str]],
    *,
    flags: int = 0,
    include_delimiters: str = "before",
    chunk_size: int = 100000,
) -> None:
    """
    Initialize the KeywordSplitter.

    Args:
        patterns (Union[List[str], Dict[str, str]]): Keyword regex patterns.
        flags (int): Regex flags.
        include_delimiters (str): How to include delimiters (before, after, both, none).
        chunk_size (int): Max chunk size in characters.
    """
    super().__init__(chunk_size=chunk_size)
    self.include_delimiters = self._validate_include_delimiters(include_delimiters)
    self.pattern_names, self.compiled = self._compile_patterns(patterns, flags)
    self.flags = flags
split(reader_output)

Split ReaderOutput into keyword-delimited chunks and build structured output.

Parameters:

Name Type Description Default
reader_output ReaderOutput

Input document and metadata.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Output structure with chunked text and metadata.

Source code in src/splitter_mr/splitter/splitters/keyword_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Split ReaderOutput into keyword-delimited chunks and build structured output.

    Args:
        reader_output (ReaderOutput): Input document and metadata.

    Returns:
        SplitterOutput: Output structure with chunked text and metadata.
    """
    text = reader_output.text or ""

    # Ensure document_id is present so it propagates (fixes metadata test)
    if not reader_output.document_id:
        reader_output.document_id = str(uuid.uuid4())

    # Primary split by keyword matches (names used for counts)
    raw_chunks, match_spans, match_names = self._split_by_keywords(text)

    # Secondary size-based re-chunking to respect chunk_size
    sized_chunks: List[str] = []
    for ch in raw_chunks:
        sized_chunks.extend(self._soft_wrap(ch, self.chunk_size))
    if not sized_chunks:
        sized_chunks = [""]

    # Generate IDs
    chunk_ids = self._generate_chunk_ids(len(sized_chunks))

    # Build metadata (ensure counts/spans are always present)
    matches_meta = {
        "counts": self._count_by_name(match_names),
        "spans": match_spans,
        "include_delimiters": self.include_delimiters,
        "flags": self.flags,
        "pattern_names": self.pattern_names,
        "chunk_size": self.chunk_size,
    }

    return self._build_output(
        reader_output=reader_output,
        chunks=sized_chunks,
        chunk_ids=chunk_ids,
        matches_meta=matches_meta,
    )

HeaderSplitter

HeaderSplitter

Bases: BaseSplitter

Split HTML or Markdown documents into chunks by header levels (H1–H6).

  • If the input looks like HTML, it is first converted to Markdown using the project's HtmlToMarkdown utility, which emits ATX-style headings (#, ##, ...).
  • If the input is Markdown, Setext-style headings (underlines with === / ---) are normalized to ATX so headers are reliably detected.
  • Splitting is performed with LangChain's MarkdownHeaderTextSplitter.
  • If no headers are detected after conversion/normalization, a safe fallback splitter (RecursiveCharacterTextSplitter) is used to avoid returning a single, excessively large chunk.

Parameters:

Name Type Description Default
chunk_size int

Size hint for fallback splitting; not used by header splitting itself. Defaults to 1000.

1000
headers_to_split_on Optional[List[str]]

Semantic header names like ["Header 1", "Header 2"]. If None, all levels 1–6 are enabled.

None
group_header_with_content bool

If True (default), headers are kept with their following content (strip_headers=False). If False, headers are stripped from chunks (strip_headers=True).

True
Example
from splitter_mr.splitter import HeaderSplitter

splitter = HeaderSplitter(headers_to_split_on=["Header 1", "Header 2", "Header 3"])
output = splitter.split(reader_output)  # reader_output.text may be HTML or MD
for idx, chunk in enumerate(output.chunks):
    print(f"--- Chunk {idx+1} ---")
    print(chunk)
Source code in src/splitter_mr/splitter/splitters/header_splitter.py
class HeaderSplitter(BaseSplitter):
    """
    Split HTML or Markdown documents into chunks by header levels (H1–H6).

    - If the input looks like HTML, it is first converted to Markdown using the
      project's HtmlToMarkdown utility, which emits ATX-style headings (`#`, `##`, ...).
    - If the input is Markdown, Setext-style headings (underlines with `===` / `---`)
      are normalized to ATX so headers are reliably detected.
    - Splitting is performed with LangChain's MarkdownHeaderTextSplitter.
    - If no headers are detected after conversion/normalization, a safe fallback
      splitter (RecursiveCharacterTextSplitter) is used to avoid returning a single,
      excessively large chunk.

    Args:
        chunk_size (int, optional): Size hint for fallback splitting; not used by
            header splitting itself. Defaults to 1000.
        headers_to_split_on (Optional[List[str]]): Semantic header names like
            ["Header 1", "Header 2"]. If None, all levels 1–6 are enabled.
        group_header_with_content (bool, optional): If True (default), headers are
            kept with their following content (strip_headers=False). If False,
            headers are stripped from chunks (strip_headers=True).

    Example:
        ```python
        from splitter_mr.splitter import HeaderSplitter

        splitter = HeaderSplitter(headers_to_split_on=["Header 1", "Header 2", "Header 3"])
        output = splitter.split(reader_output)  # reader_output.text may be HTML or MD
        for idx, chunk in enumerate(output.chunks):
            print(f"--- Chunk {idx+1} ---")
            print(chunk)
        ```
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        headers_to_split_on: Optional[List[str]] = None,
        *,
        group_header_with_content: bool = True,
    ):
        """
        Initialize the HeaderSplitter.

        Args:
            chunk_size (int): Used by fallback character splitter if no headers are found.
            headers_to_split_on (Optional[List[str]]): Semantic headers, e.g. ["Header 1", "Header 2"].
                Defaults to all levels 1–6.
            group_header_with_content (bool): Keep headers attached to following content if True.
        """
        super().__init__(chunk_size)
        # Default to all 6 levels for robust splitting unless caller narrows it.
        self.headers_to_split_on = headers_to_split_on or [
            f"Header {i}" for i in range(1, 7)
        ]
        self.group_header_with_content = bool(group_header_with_content)

    def _make_tuples(self, filetype: str) -> List[Tuple[str, str]]:
        """
        Convert semantic header names (e.g., "Header 2") into Markdown tokens.

        Args:
            filetype (str): Only "md" is supported (HTML is converted to MD first).

        Returns:
            List[Tuple[str, str]]: Tuples of (header_token, semantic_name), e.g. ("##", "Header 2").
        """
        tuples: List[Tuple[str, str]] = []
        for header in self.headers_to_split_on:
            lvl = self._header_level(header)
            if filetype == "md":
                tuples.append(("#" * lvl, header))
            else:
                raise ValueError(f"Unsupported filetype: {filetype!r}")
        return tuples

    @staticmethod
    def _header_level(header: str) -> int:
        """
        Extract numeric level from a header name like "Header 2".

        Raises:
            ValueError: If the header string is not of the expected form.
        """
        m = re.match(r"header\s*(\d+)", header.lower())
        if not m:
            raise ValueError(f"Invalid header: {header}")
        return int(m.group(1))

    @staticmethod
    def _guess_filetype(reader_output: ReaderOutput) -> str:
        """
        Heuristically determine whether the input is HTML or Markdown.

        Checks filename extensions first, then looks for HTML elements as a hint.
        """
        name = (reader_output.document_name or "").lower()
        if name.endswith((".html", ".htm")):
            return "html"
        if name.endswith((".md", ".markdown")):
            return "md"

        soup = BeautifulSoup(reader_output.text or "", "html.parser")
        if soup.find("html") or soup.find(re.compile(r"^h[1-6]$")) or soup.find("div"):
            return "html"
        return "md"

    @staticmethod
    def _normalize_setext(md_text: str) -> str:
        """
        Normalize Setext-style headings to ATX so MarkdownHeaderTextSplitter can detect them.

        H1:  Title\\n====  →  # Title
        H2:  Title\\n----  →  ## Title
        """
        # H1 underlines
        md_text = re.sub(r"^(?P<t>[^\n]+)\n=+\s*$", r"# \g<t>", md_text, flags=re.M)
        # H2 underlines
        md_text = re.sub(r"^(?P<t>[^\n]+)\n-+\s*$", r"## \g<t>", md_text, flags=re.M)
        return md_text

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Perform header-based splitting with HTML→Markdown conversion and safe fallback.

        Steps:
          1) Detect filetype (HTML/MD).
          2) If HTML, convert to Markdown with HtmlToMarkdown (emits ATX headings).
          3) If Markdown, normalize Setext headings to ATX.
          4) Split by headers via MarkdownHeaderTextSplitter.
          5) If no headers found, fallback to RecursiveCharacterTextSplitter.
        """
        if not reader_output.text:
            raise ValueError("reader_output.text is empty or None")

        filetype = self._guess_filetype(reader_output)
        tuples = self._make_tuples("md")  # Always work in Markdown space.

        text = reader_output.text

        # HTML → Markdown using the project's converter
        if filetype == "html":
            text = HtmlToMarkdown().convert(text)
        else:
            # Normalize Setext headings if already Markdown
            text = self._normalize_setext(text)

        # Detect presence of ATX headers (after conversion/normalization)
        has_headers = bool(re.search(r"(?m)^\s*#{1,6}\s+\S", text))

        # Configure header splitter. group_header_with_content -> strip_headers False
        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=tuples,
            return_each_line=False,
            strip_headers=not self.group_header_with_content,
        )

        docs = splitter.split_text(text) if has_headers else []
        # Fallback if no headers were found
        if not docs:
            rc = RecursiveCharacterTextSplitter(
                chunk_size=max(1, int(self.chunk_size) or 1000),
                chunk_overlap=min(200, max(0, int(self.chunk_size) // 10)),
            )
            docs = rc.create_documents([text])

        chunks = [doc.page_content for doc in docs]

        return SplitterOutput(
            chunks=chunks,
            chunk_id=self._generate_chunk_ids(len(chunks)),
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="header_splitter",
            split_params={
                "headers_to_split_on": self.headers_to_split_on,
                "group_header_with_content": self.group_header_with_content,
            },
            metadata=self._default_metadata(),
        )
__init__(chunk_size=1000, headers_to_split_on=None, *, group_header_with_content=True)

Initialize the HeaderSplitter.

Parameters:

Name Type Description Default
chunk_size int

Used by fallback character splitter if no headers are found.

1000
headers_to_split_on Optional[List[str]]

Semantic headers, e.g. ["Header 1", "Header 2"]. Defaults to all levels 1–6.

None
group_header_with_content bool

Keep headers attached to following content if True.

True
Source code in src/splitter_mr/splitter/splitters/header_splitter.py
def __init__(
    self,
    chunk_size: int = 1000,
    headers_to_split_on: Optional[List[str]] = None,
    *,
    group_header_with_content: bool = True,
):
    """
    Initialize the HeaderSplitter.

    Args:
        chunk_size (int): Used by fallback character splitter if no headers are found.
        headers_to_split_on (Optional[List[str]]): Semantic headers, e.g. ["Header 1", "Header 2"].
            Defaults to all levels 1–6.
        group_header_with_content (bool): Keep headers attached to following content if True.
    """
    super().__init__(chunk_size)
    # Default to all 6 levels for robust splitting unless caller narrows it.
    self.headers_to_split_on = headers_to_split_on or [
        f"Header {i}" for i in range(1, 7)
    ]
    self.group_header_with_content = bool(group_header_with_content)
split(reader_output)

Perform header-based splitting with HTML→Markdown conversion and safe fallback.

Steps

1) Detect filetype (HTML/MD).
2) If HTML, convert to Markdown with HtmlToMarkdown (emits ATX headings).
3) If Markdown, normalize Setext headings to ATX.
4) Split by headers via MarkdownHeaderTextSplitter.
5) If no headers found, fall back to RecursiveCharacterTextSplitter.
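
A short sketch of the HTML path (the input document and the `ReaderOutput` import path are assumptions; the HTML→Markdown conversion happens internally):

```python
from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import HeaderSplitter

html = """
<html><body>
  <h1>Guide</h1><p>Intro text.</p>
  <h2>Install</h2><p>Installation steps.</p>
  <h2>Usage</h2><p>Run the splitter.</p>
</body></html>
"""

reader_output = ReaderOutput(text=html, document_name="guide.html", document_path="/tmp/guide.html")

# Split on H1/H2 only; headers stay attached to their content by default.
splitter = HeaderSplitter(headers_to_split_on=["Header 1", "Header 2"])
output = splitter.split(reader_output)
for chunk in output.chunks:
    print(chunk, "\n---")
```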

Source code in src/splitter_mr/splitter/splitters/header_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Perform header-based splitting with HTML→Markdown conversion and safe fallback.

    Steps:
      1) Detect filetype (HTML/MD).
      2) If HTML, convert to Markdown with HtmlToMarkdown (emits ATX headings).
      3) If Markdown, normalize Setext headings to ATX.
      4) Split by headers via MarkdownHeaderTextSplitter.
      5) If no headers found, fallback to RecursiveCharacterTextSplitter.
    """
    if not reader_output.text:
        raise ValueError("reader_output.text is empty or None")

    filetype = self._guess_filetype(reader_output)
    tuples = self._make_tuples("md")  # Always work in Markdown space.

    text = reader_output.text

    # HTML → Markdown using the project's converter
    if filetype == "html":
        text = HtmlToMarkdown().convert(text)
    else:
        # Normalize Setext headings if already Markdown
        text = self._normalize_setext(text)

    # Detect presence of ATX headers (after conversion/normalization)
    has_headers = bool(re.search(r"(?m)^\s*#{1,6}\s+\S", text))

    # Configure header splitter. group_header_with_content -> strip_headers False
    splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=tuples,
        return_each_line=False,
        strip_headers=not self.group_header_with_content,
    )

    docs = splitter.split_text(text) if has_headers else []
    # Fallback if no headers were found
    if not docs:
        rc = RecursiveCharacterTextSplitter(
            chunk_size=max(1, int(self.chunk_size) or 1000),
            chunk_overlap=min(200, max(0, int(self.chunk_size) // 10)),
        )
        docs = rc.create_documents([text])

    chunks = [doc.page_content for doc in docs]

    return SplitterOutput(
        chunks=chunks,
        chunk_id=self._generate_chunk_ids(len(chunks)),
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="header_splitter",
        split_params={
            "headers_to_split_on": self.headers_to_split_on,
            "group_header_with_content": self.group_header_with_content,
        },
        metadata=self._default_metadata(),
    )

RecursiveJSONSplitter

RecursiveJSONSplitter

Bases: BaseSplitter

RecursiveJSONSplitter splits a JSON string or structure into overlapping or non-overlapping chunks, using the Langchain RecursiveJsonSplitter. This splitter is designed to recursively break down JSON data (including nested objects and arrays) into manageable pieces based on keys, arrays, or other separators, until the desired chunk size is reached.

Parameters:

Name Type Description Default
chunk_size int

Maximum chunk size, measured in the number of characters per chunk.

1000
min_chunk_size int

Minimum chunk size, in characters.

200
Notes

See Langchain Docs on RecursiveJsonSplitter.

Source code in src/splitter_mr/splitter/splitters/json_splitter.py
class RecursiveJSONSplitter(BaseSplitter):
    """
    RecursiveJSONSplitter splits a JSON string or structure into overlapping or non-overlapping
    chunks, using the Langchain RecursiveJsonSplitter. This splitter is designed to recursively
    break down JSON data (including nested objects and arrays) into manageable pieces based on keys,
    arrays, or other separators, until the desired chunk size is reached.

    Args:
        chunk_size (int): Maximum chunk size, measured in the number of characters per chunk.
        min_chunk_size (int): Minimum chunk size, in characters.

    Notes:
        See [Langchain Docs on RecursiveJsonSplitter](https://python.langchain.com/api_reference/text_splitters/json/langchain_text_splitters.json.RecursiveJsonSplitter.html#langchain_text_splitters.json.RecursiveJsonSplitter).
    """

    def __init__(self, chunk_size: int = 1000, min_chunk_size: int = 200):
        super().__init__(chunk_size)
        self.min_chunk_size = min_chunk_size

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input JSON text from the reader_output dictionary into recursively chunked pieces,
        allowing for overlap by number or percentage of characters.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata
                (e.g., 'document_name', 'document_path', etc.).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If the 'text' field is missing from reader_output.
            json.JSONDecodeError: If the 'text' field contains invalid JSON.

        Example:
            ```python
            from splitter_mr.splitter import RecursiveJSONSplitter

            # This output has been obtained from `VanillaReader`
            reader_output = ReaderOutput(
                text='{"company": {"name": "TechCorp", "employees": [{"name": "Alice"}, {"name": "Bob"}]}}',
                document_name="company_data.json",
                document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/company_data.json",
                document_id="doc123",
                conversion_method="vanilla",
                ocr_method=None
            )
            splitter = RecursiveJSONSplitter(chunk_size=100, min_chunk_size=20)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['{"company": {"name": "TechCorp"}}',
            '{"employees": [{"name": "Alice"},
            {"name": "Bob"}]}']
            ```
        """
        # Initialize variables
        text = json.loads(reader_output.text)

        # Split text into smaller JSON chunks
        splitter = RecursiveJsonSplitter(
            max_chunk_size=self.chunk_size,
            min_chunk_size=int(self.chunk_size - self.min_chunk_size),
        )
        chunks = splitter.split_text(json_data=text, convert_lists=True)

        # Generate chunk_ids and metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="recursive_json_splitter",
            split_params={
                "max_chunk_size": self.chunk_size,
                "min_chunk_size": self.min_chunk_size,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input JSON text from the reader_output dictionary into recursively chunked pieces, allowing for overlap by number or percentage of characters.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path', etc.).

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If the 'text' field is missing from reader_output.

JSONDecodeError

If the 'text' field contains invalid JSON.

Example

from splitter_mr.splitter import RecursiveJSONSplitter

# This output has been obtained from `VanillaReader`
reader_output = ReaderOutput(
    text='{"company": {"name": "TechCorp", "employees": [{"name": "Alice"}, {"name": "Bob"}]}}',
    document_name="company_data.json",
    document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/company_data.json",
    document_id="doc123",
    conversion_method="vanilla",
    ocr_method=None
)
splitter = RecursiveJSONSplitter(chunk_size=100, min_chunk_size=20)
output = splitter.split(reader_output)
print(output.chunks)
['{"company": {"name": "TechCorp"}}',
'{"employees": [{"name": "Alice"},
{"name": "Bob"}]}']

Source code in src/splitter_mr/splitter/splitters/json_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input JSON text from the reader_output dictionary into recursively chunked pieces,
    allowing for overlap by number or percentage of characters.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata
            (e.g., 'document_name', 'document_path', etc.).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If the 'text' field is missing from reader_output.
        json.JSONDecodeError: If the 'text' field contains invalid JSON.

    Example:
        ```python
        from splitter_mr.splitter import RecursiveJSONSplitter

        # This output has been obtained from `VanillaReader`
        reader_output = ReaderOutput(
            text='{"company": {"name": "TechCorp", "employees": [{"name": "Alice"}, {"name": "Bob"}]}}',
            document_name="company_data.json",
            document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/company_data.json",
            document_id="doc123",
            conversion_method="vanilla",
            ocr_method=None
        )
        splitter = RecursiveJSONSplitter(chunk_size=100, min_chunk_size=20)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['{"company": {"name": "TechCorp"}}',
        '{"employees": [{"name": "Alice"},
        {"name": "Bob"}]}']
        ```
    """
    # Initialize variables
    text = json.loads(reader_output.text)

    # Split text into smaller JSON chunks
    splitter = RecursiveJsonSplitter(
        max_chunk_size=self.chunk_size,
        min_chunk_size=int(self.chunk_size - self.min_chunk_size),
    )
    chunks = splitter.split_text(json_data=text, convert_lists=True)

    # Generate chunk_ids and metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="recursive_json_splitter",
        split_params={
            "max_chunk_size": self.chunk_size,
            "min_chunk_size": self.min_chunk_size,
        },
        metadata=metadata,
    )
    return output

HTMLTagSplitter

HTMLTagSplitter

Bases: BaseSplitter

HTMLTagSplitter splits HTML content into chunks based on a specified tag. Supports batching and optional Markdown conversion.

Behavior
  • When tag is specified (e.g., tag="div"), finds all matching elements.
  • When tag is None, splits by the most frequent and shallowest tag.

Parameters:

  • chunk_size (int, default 1): Maximum chunk size in characters (only used when batch=True).
  • tag (str | None, default None): HTML tag to split on. If None, auto-detects the best tag.
  • batch (bool, default True): If True (default), groups multiple tags into a chunk, not exceeding chunk_size. If False, returns one chunk per tag, ignoring chunk_size.
  • to_markdown (bool, default True): If True, converts each chunk to Markdown using HtmlToMarkdown.
Example

reader_output = ReaderOutput(text="<div>A</div><div>B</div>")
splitter = HTMLTagSplitter(tag="div", batch=False)
splitter.split(reader_output).chunks
['<html><body><div>A</div></body></html>', '<html><body><div>B</div></body></html>']
splitter = HTMLTagSplitter(tag="div", batch=True, chunk_size=100)
splitter.split(reader_output).chunks
['<html><body><div>A</div><div>B</div></body></html>']
splitter = HTMLTagSplitter(tag="div", batch=False, to_markdown=True)
splitter.split(reader_output).chunks
['A', 'B']
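
The tag=None case is not shown above. A minimal sketch of auto-detection (hedged; the exact serialized HTML may differ slightly): with batch=True and the default chunk_size=1, all auto-detected elements land in a single chunk.

reader_output = ReaderOutput(text="<html><body><p>A</p><p>B</p><p>C</p></body></html>")
splitter = HTMLTagSplitter(tag=None, batch=True, to_markdown=False)  # auto-detects <p>
print(splitter.split(reader_output).chunks)
# Roughly: ['<html><body><p>A</p><p>B</p><p>C</p></body></html>']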

Attributes:

  • chunk_size (int): Maximum chunk size.
  • tag (Optional[str]): Tag to split on.
  • batch (bool): Whether to group elements into chunks.
  • to_markdown (bool): Whether to convert each chunk to Markdown.

Source code in src/splitter_mr/splitter/splitters/html_tag_splitter.py
class HTMLTagSplitter(BaseSplitter):
    """
    HTMLTagSplitter splits HTML content into chunks based on a specified tag.
    Supports batching and optional Markdown conversion.

    Behavior:
      - When `tag` is specified (e.g., tag="div"), finds all matching elements.
      - When `tag` is None, splits by the most frequent and shallowest tag.

    Args:
        chunk_size (int): Maximum chunk size in characters (only used when `batch=True`).
        tag (str | None): HTML tag to split on. If None, auto-detects the best tag.
        batch (bool): If True (default), groups multiple tags into a chunk, not exceeding `chunk_size`.
            If False, returns one chunk per tag, ignoring chunk_size.
        to_markdown (bool): If True, converts each chunk to Markdown using HtmlToMarkdown.

    Example:
        >>> reader_output = ReaderOutput(text="<div>A</div><div>B</div>")
        >>> splitter = HTMLTagSplitter(tag="div", batch=False)
        >>> splitter.split(reader_output).chunks
        ['<html><body><div>A</div></body></html>', '<html><body><div>B</div></body></html>']
        >>> splitter = HTMLTagSplitter(tag="div", batch=True, chunk_size=100)
        >>> splitter.split(reader_output).chunks
        ['<html><body><div>A</div><div>B</div></body></html>']
        >>> splitter = HTMLTagSplitter(tag="div", batch=False, to_markdown=True)
        >>> splitter.split(reader_output).chunks
        ['A', 'B']

    Attributes:
        chunk_size (int): Maximum chunk size.
        tag (Optional[str]): Tag to split on.
        batch (bool): Whether to group elements into chunks.
        to_markdown (bool): Whether to convert each chunk to Markdown.
    """

    def __init__(
        self,
        chunk_size: int = 1,
        tag: Optional[str] = None,
        *,
        batch: bool = True,
        to_markdown: bool = True,
    ):
        """
        Initialize HTMLTagSplitter.

        Args:
            chunk_size (int): Maximum chunk size, in characters (only for batching).
            tag (str | None): Tag to split on. If None, auto-detects.
            batch (bool): If True (default), groups tags up to `chunk_size`.
            to_markdown (bool): If True (default), convert each chunk to Markdown.
        """
        super().__init__(chunk_size)
        self.tag = tag
        self.batch = batch
        self.to_markdown = to_markdown

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits HTML using the specified tag and batching, with optional Markdown conversion.

        Semantics:
        - Tables:
            * batch=False -> one chunk per requested element. If splitting by a row-level tag
                (e.g. 'tr'), emit a mini-table per row: <thead> once + that row in <tbody>.
            * batch=True and chunk_size in (0, 1, None) -> all tables in one chunk.
            * batch=True and chunk_size > 1 -> split each table into multiple chunks
                by batching <tr> rows (copying a <thead> into every chunk and
                skipping the header row from the body).
        - Non-table tags:
            * batch=False -> one chunk per element.
            * batch=True and chunk_size in (0, 1, None) -> all elements in one chunk.
            * batch=True and chunk_size > 1 -> batch by total HTML length.

        Args:
            reader_output: ReaderOutput containing at least `text`.

        Returns:
            SplitterOutput
        """
        html = getattr(reader_output, "text", "") or ""
        soup = BeautifulSoup(html, "html.parser")
        tag = self.tag or self._auto_tag(soup)

        # Locate elements for the chosen tag.
        try:
            elements = soup.find_all(tag)
            table_children = {"tr", "thead", "tbody", "th", "td"}
            # Only escalate to table when batching is enabled. For non-batch,
            # keep the exact tag so we can emit one chunk per element.
            if self.batch and tag in table_children:
                seen = set()
                parent_tables = []
                for el in elements:
                    table = el.find_parent("table")
                    if table and id(table) not in seen:
                        seen.add(id(table))
                        parent_tables.append(table)
                if parent_tables:
                    elements = parent_tables
                    tag = "table"
        except Exception:
            elements = []

        # -------- helpers -------- #

        def build_doc_with_children(children: List) -> str:
            """Wrap a list of top-level nodes into <html><body>…</body></html>."""
            doc = BeautifulSoup("", "html.parser")
            html_tag = doc.new_tag("html")
            body_tag = doc.new_tag("body")
            html_tag.append(body_tag)
            doc.append(html_tag)
            for c in children:
                body_tag.append(copy.deepcopy(c))
            return str(doc)

        def extract_table_header_and_rows(table_tag):
            """
            Return (header_thead, data_rows, header_row_src) where:
            - header_thead is a <thead> (deep-copied) or None
            - data_rows is a list of original <tr> nodes that are NOT header rows
            - header_row_src is the original <tr> used to synthesize <thead> (if any)
            """
            header = table_tag.find("thead")
            header_row_src = None

            if header is not None:
                data_rows = []
                for tr in table_tag.find_all("tr"):
                    if tr.find_parent("thead") is not None:
                        continue
                    data_rows.append(tr)
                return copy.deepcopy(header), data_rows, None

            first_tr = table_tag.find("tr")
            header_thead = None
            if first_tr is not None:
                tmp = BeautifulSoup("", "html.parser")
                thead = tmp.new_tag("thead")
                thead.append(copy.deepcopy(first_tr))
                header_thead = thead
                header_row_src = first_tr

            data_rows = []
            for tr in table_tag.find_all("tr"):
                if header_row_src is not None and tr is header_row_src:
                    continue
                if tr.find_parent("thead") is not None:
                    continue
                data_rows.append(tr)

            return header_thead, data_rows, header_row_src

        def build_table_chunk(table_tag, rows_subset: List) -> str:
            """
            Build a <html><body><table>… chunk with:
            - original table attributes
            - a <thead> (original or synthesized)
            - a <tbody> containing rows_subset
            """
            header_thead, _, _ = extract_table_header_and_rows(table_tag)
            doc = BeautifulSoup("", "html.parser")
            html_tag = doc.new_tag("html")
            body_tag = doc.new_tag("body")
            html_tag.append(body_tag)
            doc.append(html_tag)

            new_table = doc.new_tag("table", **table_tag.attrs)
            if header_thead is not None:
                new_table.append(copy.deepcopy(header_thead))

            tbody = doc.new_tag("tbody")
            for r in rows_subset:
                tbody.append(copy.deepcopy(r))
            new_table.append(tbody)

            body_tag.append(new_table)
            return str(doc)

        # -------- main chunking -------- #

        chunks: List[str] = []

        if tag == "table":
            # TABLES: custom batching
            if not self.batch:
                # one chunk per table (full)
                chunks = [build_doc_with_children([el]) for el in elements]

            elif self.chunk_size in (0, 1, None):
                # all tables together
                chunks = [build_doc_with_children(elements)] if elements else [""]

            else:
                # batch rows within each table
                for table_el in elements:
                    header_thead, rows, _ = extract_table_header_and_rows(table_el)
                    if not rows:
                        chunks.append(build_doc_with_children([table_el]))
                        continue

                    buf: List = []
                    for row in rows:
                        test_buf = buf + [row]
                        test_html = build_table_chunk(table_el, test_buf)
                        if len(test_html) > self.chunk_size and buf:
                            chunks.append(build_table_chunk(table_el, buf))
                            buf = [row]
                        else:
                            buf = test_buf
                    if buf:
                        chunks.append(build_table_chunk(table_el, buf))

        else:
            # NON-TABLE (including table children when batch=False)
            table_children = {"tr", "thead", "tbody", "th", "td"}

            if not self.batch:
                if tag in table_children:
                    # one chunk per row-like element, but keep header context
                    for el in elements:
                        table_el = el.find_parent("table")
                        if not table_el:
                            # Fallback: wrap the element as-is
                            chunks.append(build_doc_with_children([el]))
                            continue
                        # skip header-only rows
                        if el.name == "tr" and el.find_parent("thead") is not None:
                            continue
                        if el.name in {"thead", "th"}:
                            continue
                        chunks.append(build_table_chunk(table_el, [el]))
                else:
                    for el in elements:
                        chunks.append(build_doc_with_children([el]))

            elif self.chunk_size in (0, 1, None):
                chunks = [build_doc_with_children(elements)] if elements else [""]

            else:
                buffer = []
                for el in elements:
                    test_buffer = buffer + [el]
                    test_chunk_str = build_doc_with_children(test_buffer)
                    if len(test_chunk_str) > self.chunk_size and buffer:
                        chunks.append(build_doc_with_children(buffer))
                        buffer = [el]
                    else:
                        buffer = test_buffer
                if buffer:
                    chunks.append(build_doc_with_children(buffer))

        if not chunks:
            chunks = [""]

        if self.to_markdown:
            md = HtmlToMarkdown()
            chunks = [md.convert(chunk) for chunk in chunks]

        chunk_ids = self._generate_chunk_ids(len(chunks))
        return SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="html_tag_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "tag": tag,
                "batch": self.batch,
                "to_markdown": self.to_markdown,
            },
            metadata=self._default_metadata(),
        )

    def _auto_tag(self, soup: BeautifulSoup) -> str:
        """
        Auto-detect the most repeated tag with the highest (shallowest) level of hierarchy.
        If no repeated tags are found, return the first tag found in <body> or fallback to 'div'.
        """
        from collections import Counter, defaultdict

        body = soup.find("body")
        if not body:
            return "div"

        # Traverse all tags in body, tracking tag: (count, min_depth)
        tag_counter = Counter()
        tag_min_depth = defaultdict(lambda: float("inf"))

        def traverse(el, depth=0):
            for child in el.children:
                if getattr(child, "name", None):
                    tag_counter[child.name] += 1
                    tag_min_depth[child.name] = min(tag_min_depth[child.name], depth)
                    traverse(child, depth + 1)

        traverse(body)

        if not tag_counter:
            # fallback to first tag
            for t in body.find_all(True, recursive=True):
                return t.name
            return "div"

        # Find tags with the maximum count
        max_count = max(tag_counter.values())
        candidates = [t for t, cnt in tag_counter.items() if cnt == max_count]
        # Of the most frequent, pick the one with the minimum depth (shallowest)
        chosen = min(candidates, key=lambda t: tag_min_depth[t])
        return chosen
__init__(chunk_size=1, tag=None, *, batch=True, to_markdown=True)

Initialize HTMLTagSplitter.

Parameters:

  • chunk_size (int, default 1): Maximum chunk size, in characters (only for batching).
  • tag (str | None, default None): Tag to split on. If None, auto-detects.
  • batch (bool, default True): If True (default), groups tags up to chunk_size.
  • to_markdown (bool, default True): If True (default), convert each chunk to Markdown.
Source code in src/splitter_mr/splitter/splitters/html_tag_splitter.py
def __init__(
    self,
    chunk_size: int = 1,
    tag: Optional[str] = None,
    *,
    batch: bool = True,
    to_markdown: bool = True,
):
    """
    Initialize HTMLTagSplitter.

    Args:
        chunk_size (int): Maximum chunk size, in characters (only for batching).
        tag (str | None): Tag to split on. If None, auto-detects.
        batch (bool): If True (default), groups tags up to `chunk_size`.
        to_markdown (bool): If True (default), convert each chunk to Markdown.
    """
    super().__init__(chunk_size)
    self.tag = tag
    self.batch = batch
    self.to_markdown = to_markdown
split(reader_output)

Splits HTML using the specified tag and batching, with optional Markdown conversion.

Semantics:

  • Tables:
    • batch=False -> one chunk per requested element. If splitting by a row-level tag (e.g. 'tr'), emit a mini-table per row: <thead> once + that row in <tbody>.
    • batch=True and chunk_size in (0, 1, None) -> all tables in one chunk.
    • batch=True and chunk_size > 1 -> split each table into multiple chunks by batching <tr> rows (copying a <thead> into every chunk and skipping the header row from the body).
  • Non-table tags:
    • batch=False -> one chunk per element.
    • batch=True and chunk_size in (0, 1, None) -> all elements in one chunk.
    • batch=True and chunk_size > 1 -> batch by total HTML length.

Parameters:

  • reader_output (ReaderOutput): ReaderOutput containing at least text. Required.

Returns:

  • SplitterOutput
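
To illustrate the table semantics above (batch=True with chunk_size > 1), here is a minimal sketch. The chunk boundary depends on the length of the serialized HTML, so treat the expected split as approximate:

html = (
    "<table><thead><tr><th>id</th><th>name</th></tr></thead>"
    "<tr><td>1</td><td>A</td></tr>"
    "<tr><td>2</td><td>B</td></tr>"
    "<tr><td>3</td><td>C</td></tr></table>"
)
splitter = HTMLTagSplitter(tag="tr", batch=True, chunk_size=170)
chunks = splitter.split(ReaderOutput(text=html)).chunks
# Rows are re-batched under the parent <table>; every chunk repeats the <thead>,
# and with to_markdown=True (the default) each chunk comes back as a Markdown table.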

Source code in src/splitter_mr/splitter/splitters/html_tag_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits HTML using the specified tag and batching, with optional Markdown conversion.

    Semantics:
    - Tables:
        * batch=False -> one chunk per requested element. If splitting by a row-level tag
            (e.g. 'tr'), emit a mini-table per row: <thead> once + that row in <tbody>.
        * batch=True and chunk_size in (0, 1, None) -> all tables in one chunk.
        * batch=True and chunk_size > 1 -> split each table into multiple chunks
            by batching <tr> rows (copying a <thead> into every chunk and
            skipping the header row from the body).
    - Non-table tags:
        * batch=False -> one chunk per element.
        * batch=True and chunk_size in (0, 1, None) -> all elements in one chunk.
        * batch=True and chunk_size > 1 -> batch by total HTML length.

    Args:
        reader_output: ReaderOutput containing at least `text`.

    Returns:
        SplitterOutput
    """
    html = getattr(reader_output, "text", "") or ""
    soup = BeautifulSoup(html, "html.parser")
    tag = self.tag or self._auto_tag(soup)

    # Locate elements for the chosen tag.
    try:
        elements = soup.find_all(tag)
        table_children = {"tr", "thead", "tbody", "th", "td"}
        # Only escalate to table when batching is enabled. For non-batch,
        # keep the exact tag so we can emit one chunk per element.
        if self.batch and tag in table_children:
            seen = set()
            parent_tables = []
            for el in elements:
                table = el.find_parent("table")
                if table and id(table) not in seen:
                    seen.add(id(table))
                    parent_tables.append(table)
            if parent_tables:
                elements = parent_tables
                tag = "table"
    except Exception:
        elements = []

    # -------- helpers -------- #

    def build_doc_with_children(children: List) -> str:
        """Wrap a list of top-level nodes into <html><body>…</body></html>."""
        doc = BeautifulSoup("", "html.parser")
        html_tag = doc.new_tag("html")
        body_tag = doc.new_tag("body")
        html_tag.append(body_tag)
        doc.append(html_tag)
        for c in children:
            body_tag.append(copy.deepcopy(c))
        return str(doc)

    def extract_table_header_and_rows(table_tag):
        """
        Return (header_thead, data_rows, header_row_src) where:
        - header_thead is a <thead> (deep-copied) or None
        - data_rows is a list of original <tr> nodes that are NOT header rows
        - header_row_src is the original <tr> used to synthesize <thead> (if any)
        """
        header = table_tag.find("thead")
        header_row_src = None

        if header is not None:
            data_rows = []
            for tr in table_tag.find_all("tr"):
                if tr.find_parent("thead") is not None:
                    continue
                data_rows.append(tr)
            return copy.deepcopy(header), data_rows, None

        first_tr = table_tag.find("tr")
        header_thead = None
        if first_tr is not None:
            tmp = BeautifulSoup("", "html.parser")
            thead = tmp.new_tag("thead")
            thead.append(copy.deepcopy(first_tr))
            header_thead = thead
            header_row_src = first_tr

        data_rows = []
        for tr in table_tag.find_all("tr"):
            if header_row_src is not None and tr is header_row_src:
                continue
            if tr.find_parent("thead") is not None:
                continue
            data_rows.append(tr)

        return header_thead, data_rows, header_row_src

    def build_table_chunk(table_tag, rows_subset: List) -> str:
        """
        Build a <html><body><table>… chunk with:
        - original table attributes
        - a <thead> (original or synthesized)
        - a <tbody> containing rows_subset
        """
        header_thead, _, _ = extract_table_header_and_rows(table_tag)
        doc = BeautifulSoup("", "html.parser")
        html_tag = doc.new_tag("html")
        body_tag = doc.new_tag("body")
        html_tag.append(body_tag)
        doc.append(html_tag)

        new_table = doc.new_tag("table", **table_tag.attrs)
        if header_thead is not None:
            new_table.append(copy.deepcopy(header_thead))

        tbody = doc.new_tag("tbody")
        for r in rows_subset:
            tbody.append(copy.deepcopy(r))
        new_table.append(tbody)

        body_tag.append(new_table)
        return str(doc)

    # -------- main chunking -------- #

    chunks: List[str] = []

    if tag == "table":
        # TABLES: custom batching
        if not self.batch:
            # one chunk per table (full)
            chunks = [build_doc_with_children([el]) for el in elements]

        elif self.chunk_size in (0, 1, None):
            # all tables together
            chunks = [build_doc_with_children(elements)] if elements else [""]

        else:
            # batch rows within each table
            for table_el in elements:
                header_thead, rows, _ = extract_table_header_and_rows(table_el)
                if not rows:
                    chunks.append(build_doc_with_children([table_el]))
                    continue

                buf: List = []
                for row in rows:
                    test_buf = buf + [row]
                    test_html = build_table_chunk(table_el, test_buf)
                    if len(test_html) > self.chunk_size and buf:
                        chunks.append(build_table_chunk(table_el, buf))
                        buf = [row]
                    else:
                        buf = test_buf
                if buf:
                    chunks.append(build_table_chunk(table_el, buf))

    else:
        # NON-TABLE (including table children when batch=False)
        table_children = {"tr", "thead", "tbody", "th", "td"}

        if not self.batch:
            if tag in table_children:
                # one chunk per row-like element, but keep header context
                for el in elements:
                    table_el = el.find_parent("table")
                    if not table_el:
                        # Fallback: wrap the element as-is
                        chunks.append(build_doc_with_children([el]))
                        continue
                    # skip header-only rows
                    if el.name == "tr" and el.find_parent("thead") is not None:
                        continue
                    if el.name in {"thead", "th"}:
                        continue
                    chunks.append(build_table_chunk(table_el, [el]))
            else:
                for el in elements:
                    chunks.append(build_doc_with_children([el]))

        elif self.chunk_size in (0, 1, None):
            chunks = [build_doc_with_children(elements)] if elements else [""]

        else:
            buffer = []
            for el in elements:
                test_buffer = buffer + [el]
                test_chunk_str = build_doc_with_children(test_buffer)
                if len(test_chunk_str) > self.chunk_size and buffer:
                    chunks.append(build_doc_with_children(buffer))
                    buffer = [el]
                else:
                    buffer = test_buffer
            if buffer:
                chunks.append(build_doc_with_children(buffer))

    if not chunks:
        chunks = [""]

    if self.to_markdown:
        md = HtmlToMarkdown()
        chunks = [md.convert(chunk) for chunk in chunks]

    chunk_ids = self._generate_chunk_ids(len(chunks))
    return SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="html_tag_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "tag": tag,
            "batch": self.batch,
            "to_markdown": self.to_markdown,
        },
        metadata=self._default_metadata(),
    )

RowColumnSplitter

RowColumnSplitter

Bases: BaseSplitter

RowColumnSplitter splits tabular data (such as CSV, TSV, Markdown tables, or JSON tables) into smaller tables based on rows, columns, or by total character size while preserving row integrity.

This splitter supports several modes:

  • By rows: Split the table into chunks with a fixed number of rows, with optional overlapping rows between chunks.
  • By columns: Split the table into chunks by columns, with optional overlapping columns between chunks.
  • By chunk size: Split the table into markdown-formatted table chunks, where each chunk contains as many complete rows as fit under the specified character limit, optionally overlapping a fixed number of rows between chunks.

This is useful for splitting large tabular files for downstream processing, LLM ingestion, or display, while preserving semantic and structural integrity of the data.

Parameters:

  • chunk_size (int, default 1000): Maximum number of characters per chunk (when using character-based splitting).
  • num_rows (int, default 0): Number of rows per chunk. Mutually exclusive with num_cols.
  • num_cols (int, default 0): Number of columns per chunk. Mutually exclusive with num_rows.
  • chunk_overlap (Union[int, float], default 0): Number of overlapping rows or columns between chunks. If a float in (0, 1), interpreted as a percentage of rows or columns; if an integer, the number of overlapping rows/columns. When chunking by character size, this refers to the number of overlapping rows (not characters).

Supported formats: CSV, TSV, TXT, Markdown table, JSON (tabular: list of dicts or dict of lists).
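
A minimal sketch of the row-based mode, using hypothetical CSV contents (the exact rendering of each chunk follows the splitter's table serialization):

from splitter_mr.splitter import RowColumnSplitter

reader_output = ReaderOutput(
    text="id,name\n1,A\n2,B\n3,C\n4,D",
    conversion_method="csv",
    document_name="table.csv",
)
splitter = RowColumnSplitter(num_rows=2, chunk_overlap=0)
output = splitter.split(reader_output)
# Expect two chunks: a table with rows 1-2 and a table with rows 3-4, each with its header;
# the row indices covered by each chunk are recorded in output.metadata.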

Source code in src/splitter_mr/splitter/splitters/row_column_splitter.py
class RowColumnSplitter(BaseSplitter):
    """
    RowColumnSplitter splits tabular data (such as CSV, TSV, Markdown tables, or JSON tables)
    into smaller tables based on rows, columns, or by total character size while preserving row integrity.

    This splitter supports several modes:

    - **By rows**: Split the table into chunks with a fixed number of rows, with optional overlapping
        rows between chunks.
    - **By columns**: Split the table into chunks by columns, with optional overlapping columns between chunks.
    - **By chunk size**: Split the table into markdown-formatted table chunks, where each chunk contains
        as many complete rows as fit under the specified character limit, optionally overlapping a fixed
        number of rows between chunks.

    This is useful for splitting large tabular files for downstream processing, LLM ingestion,
    or display, while preserving semantic and structural integrity of the data.

    Args:
        chunk_size (int): Maximum number of characters per chunk (when using character-based splitting).
        num_rows (int): Number of rows per chunk. Mutually exclusive with num_cols.
        num_cols (int): Number of columns per chunk. Mutually exclusive with num_rows.
        chunk_overlap (Union[int, float]): Number of overlapping rows or columns between chunks.
            If a float in (0,1), interpreted as a percentage of rows or columns. If integer, the number of
            overlapping rows/columns. When chunking by character size, this refers to the number of overlapping
            rows (not characters).

    Supported formats: CSV, TSV, TXT, Markdown table, JSON (tabular: list of dicts or dict of lists).
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        num_rows: int = 0,
        num_cols: int = 0,
        chunk_overlap: Union[int, float] = 0,
    ):
        super().__init__(chunk_size)
        self.num_rows = num_rows
        self.num_cols = num_cols
        self.chunk_overlap = chunk_overlap

        if num_rows and num_cols:
            raise ValueError("num_rows and num_cols are mutually exclusive")
        if isinstance(chunk_overlap, float) and chunk_overlap >= 1:
            raise ValueError("chunk_overlap as float must be < 1")

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input tabular data into multiple markdown table chunks according to the specified
        chunking strategy. Each output chunk is a complete markdown table with header, and will never
        cut a row in half. The overlap is always applied in terms of full rows or columns.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary output from a Reader, containing at least:
                    - 'text': The tabular data as string.
                    - 'conversion_method': Format of the input ('csv', 'tsv', 'markdown', 'json', etc.).
                    - Additional document metadata fields (optional).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If both num_rows and num_cols are set.
            ValueError: If chunk_overlap as float is not in [0,1).
            ValueError: If chunk_size is too small to fit the header and at least one data row.

        Example:
            ```python
            reader_output = ReaderOutput(
                text: '| id | name |\\n|----|------|\\n| 1  | A    |\\n| 2  | B    |\\n| 3  | C    |',
                conversion_method: "markdown",
                document_name: "table.md",
                document_path: "/path/table.md",
            )
            splitter = RowColumnSplitter(chunk_size=80, chunk_overlap=20)
            output = splitter.split(reader_output)
            for chunk in output["chunks"]:
                print("\\n" + str(chunk) + "\\n")
            ```
            ```python
            | id   | name   |
            |------|--------|
            |  1   | A      |
            |  2   | B      |

            | id   | name   |
            |------|--------|
            |  2   | B      |
            |  3   | C      |
            ```
        """
        # Step 1. Parse the table depending on conversion_method
        df = self._load_tabular(reader_output)
        orig_method = reader_output.conversion_method
        col_names = df.columns.tolist()

        # Step 2. Split logic
        chunks = []
        meta_per_chunk = []

        # If splitting strategy is by rows
        if self.num_rows > 0:
            overlap = self._get_overlap(self.num_rows)
            for i in range(
                0,
                len(df),
                self.num_rows - overlap if (self.num_rows - overlap) > 0 else 1,
            ):
                chunk_df = df.iloc[i : i + self.num_rows]
                if not chunk_df.empty:
                    chunk_str = self._to_str(chunk_df, orig_method)
                    chunks.append(chunk_str)
                    meta_per_chunk.append(
                        {"rows": chunk_df.index.tolist(), "type": "row"}
                    )
        # If splitting strategy is by columns
        elif self.num_cols > 0:
            overlap = self._get_overlap(self.num_cols)
            total_cols = len(col_names)
            for i in range(
                0,
                total_cols,
                self.num_cols - overlap if (self.num_cols - overlap) > 0 else 1,
            ):
                sel_cols = col_names[i : i + self.num_cols]
                if sel_cols:
                    chunk_df = df[sel_cols]
                    chunk_str = self._to_str(chunk_df, orig_method, colwise=True)
                    chunks.append(chunk_str)
                    meta_per_chunk.append({"cols": sel_cols, "type": "column"})
        # If splitting strategy is given by the chunk_size
        else:
            header_lines = self._get_markdown_header(df)
            header_length = len(header_lines)

            row_md_list = [self._get_markdown_row(df, i) for i in range(len(df))]
            row_len_list = [len(r) + 1 for r in row_md_list]  # +1 for newline

            if self.chunk_size < header_length + row_len_list[0]:
                raise ValueError(
                    "chunk_size is too small to fit header and at least one row."
                )

            # Compute overlapping and headers in markdown tables
            chunks = []
            meta_per_chunk = []
            i = 0
            n = len(row_md_list)
            overlap = self._get_overlap(1)
            while i < n:
                curr_chunk = []
                curr_len = header_length
                j = i
                while j < n and curr_len + row_len_list[j] <= self.chunk_size:
                    curr_chunk.append(row_md_list[j])
                    curr_len += row_len_list[j]
                    j += 1

                rows_in_chunk = j - i
                chunk_str = header_lines + "\n".join(curr_chunk)
                chunks.append(chunk_str)
                meta_per_chunk.append({"rows": list(range(i, j)), "type": "char_row"})

                # --- compute overlap AFTER we know rows_in_chunk ---
                if isinstance(self.chunk_overlap, float):
                    overlap_rows = int(rows_in_chunk * self.chunk_overlap)
                else:
                    overlap_rows = int(self.chunk_overlap)

                # make sure we don’t loop forever
                overlap_rows = min(overlap_rows, rows_in_chunk - 1)
                i = j - overlap_rows

        # Generate chunk_id
        chunk_ids = self._generate_chunk_ids(len(chunks))

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="row_column_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "num_rows": self.num_rows,
                "num_cols": self.num_cols,
                "chunk_overlap": self.chunk_overlap,
            },
            metadata={"chunks": meta_per_chunk},
        )
        return output

    # Helper functions

    def _get_overlap(self, base: int):
        """
        Returns the overlap value as an integer, based on the configured chunk_overlap.

        If chunk_overlap is a float in (0,1), computes the overlap as a percentage of `base`.
        If chunk_overlap is an integer, returns it directly.

        Args:
            base (int): The base number (rows or columns) to compute the overlap from.
        Returns:
            int: The overlap as an integer.
        """
        if isinstance(self.chunk_overlap, float):
            return int(base * self.chunk_overlap)
        return int(self.chunk_overlap)

    def _load_tabular(self, reader_output: Dict[str, Any]) -> pd.DataFrame:
        """
        Loads and parses the input tabular data from a Reader output dictionary
        into a pandas DataFrame, based on its format.

        If the input is empty, returns an empty DataFrame.
        If the input is malformed (e.g., badly formatted markdown/CSV/TSV), a
        pandas.errors.ParserError is raised.

        Supports Markdown, CSV, TSV, TXT, and tabular JSON.

        Args:
            reader_output (Dict[str, Any]): Dictionary containing the text and conversion_method.

        Returns:
            pd.DataFrame: The loaded table as a DataFrame.

        Raises:
            pandas.errors.ParserError: If the input table is malformed and cannot be parsed.
        """
        text = reader_output.text
        # Return an empty DataFrame if an empty file is provided
        if not text or not text.strip():
            return pd.DataFrame()
        method = reader_output.conversion_method
        if method == "markdown":
            return self._parse_markdown_table(text)
        elif method == "csv" or method == "txt":
            return pd.read_csv(io.StringIO(text))
        elif method == "tsv":
            return pd.read_csv(io.StringIO(text), sep="\t")
        else:
            # Try JSON
            try:
                js = json.loads(text)
                if isinstance(js, list) and all(isinstance(row, dict) for row in js):
                    return pd.DataFrame(js)
                elif isinstance(js, dict):  # e.g., {col: [vals]}
                    return pd.DataFrame(js)
            except Exception:
                pass
            # Fallback: try CSV
            return pd.read_csv(io.StringIO(text))

    def _parse_markdown_table(self, md: str) -> pd.DataFrame:
        """
        Parses a markdown table string into a pandas DataFrame.

        Ignores non-table lines and trims markdown-specific formatting.
        Also handles the separator line (---) in the header.

        Args:
            md (str): The markdown table as a string.

        Returns:
            pd.DataFrame: Parsed table as a DataFrame.

        Raises:
            pandas.errors.ParserError: If the markdown table is malformed and cannot be parsed.
        """
        # Remove any lines not part of the table (e.g., text before/after)
        table_lines = []
        started = False
        for line in md.splitlines():
            if re.match(r"^\s*\|.*\|\s*$", line):
                started = True
                table_lines.append(line.strip())
            elif started and not line.strip():
                break  # stop at first blank line after table
        table_md = "\n".join(table_lines)
        table_io = io.StringIO(
            re.sub(
                r"^\s*\|",
                "",
                re.sub(r"\|\s*$", "", table_md, flags=re.MULTILINE),
                flags=re.MULTILINE,
            )
        )
        try:
            df = pd.read_csv(table_io, sep="|").rename(
                lambda x: x.strip(), axis="columns"
            )
        except pd.errors.ParserError as e:
            # Propagate the ParserError for your test to catch
            raise pd.errors.ParserError(f"Malformed markdown table: {e}") from e
        if not df.empty and all(re.match(r"^-+$", str(x).strip()) for x in df.iloc[0]):
            df = df.drop(df.index[0]).reset_index(drop=True)
        return df

    def _to_str(self, df: pd.DataFrame, method: str, colwise: bool = False) -> str:
        """
        Converts a DataFrame chunk to a string for output,
        either as a markdown table, CSV, or a list of columns.

        Args:
            df (pd.DataFrame): DataFrame chunk to convert.
            method (str): Input file format (for output style).
            colwise (bool): If True, output as a list of columns (used in column chunking).

        Returns:
            str: The chunk as a formatted string.
        """
        if colwise:
            # List of columns: output as a list of lists
            return (
                "["
                + ", ".join(  # noqa: W503
                    [str([col] + df[col].tolist()) for col in df.columns]  # noqa: W503
                )
                + "]"  # noqa: W503
            )
        if method == "markdown" or "md":
            # Use markdown table format
            return df.to_markdown(index=False)
        else:
            # Default to CSV format
            output = io.StringIO()
            df.to_csv(output, index=False)
            return output.getvalue().strip("\n")

    @staticmethod
    def _get_markdown_header(df):
        """
        Returns the header and separator lines for a markdown table as a string.

        Args:
            df (pd.DataFrame): DataFrame representing the table.

        Returns:
            str: Markdown table header and separator (with trailing newline).
        """

        lines = df.head(0).to_markdown(index=False).splitlines()
        return "\n".join(lines[:2]) + "\n"

    @staticmethod
    def _get_markdown_row(df, row_idx):
        """
        Returns a single row from the DataFrame formatted as a markdown table row.

        Args:
            df (pd.DataFrame): DataFrame containing the table.
            row_idx (int): Index of the row to extract.

        Returns:
            str: The markdown-formatted row string.
        """
        row = df.iloc[[row_idx]]
        # Get the full markdown output (with header),
        # extract only the last line (the data row)
        md = row.to_markdown(index=False).splitlines()
        return md[-1]
split(reader_output)

Splits the input tabular data into multiple markdown table chunks according to the specified chunking strategy. Each output chunk is a complete markdown table with header, and will never cut a row in half. The overlap is always applied in terms of full rows or columns.

Parameters:

  • reader_output (ReaderOutput): Output from a Reader, containing at least 'text' (the tabular data as a string) and 'conversion_method' (format of the input: 'csv', 'tsv', 'markdown', 'json', etc.), plus optional document metadata fields. Required.

Returns:

  • SplitterOutput: Dataclass defining the output structure for all splitters.

Raises:

  • ValueError: If both num_rows and num_cols are set.
  • ValueError: If chunk_overlap as float is not in [0, 1).
  • ValueError: If chunk_size is too small to fit the header and at least one data row.

Example

reader_output = ReaderOutput(
    text='| id | name |\n|----|------|\n| 1  | A    |\n| 2  | B    |\n| 3  | C    |',
    conversion_method="markdown",
    document_name="table.md",
    document_path="/path/table.md",
)
splitter = RowColumnSplitter(chunk_size=80, chunk_overlap=20)
output = splitter.split(reader_output)
for chunk in output.chunks:
    print("\n" + str(chunk) + "\n")
| id   | name   |
|------|--------|
|  1   | A      |
|  2   | B      |

| id   | name   |
|------|--------|
|  2   | B      |
|  3   | C      |
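
Column-wise splitting goes through the same entry point. A minimal sketch (hedged; the exact string form of each chunk follows the column serialization in _to_str):

reader_output = ReaderOutput(text="id,name\n1,A\n2,B\n3,C", conversion_method="csv")
splitter = RowColumnSplitter(num_cols=1)
print(splitter.split(reader_output).chunks)
# Roughly: ["[['id', 1, 2, 3]]", "[['name', 'A', 'B', 'C']]"]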

Source code in src/splitter_mr/splitter/splitters/row_column_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input tabular data into multiple markdown table chunks according to the specified
    chunking strategy. Each output chunk is a complete markdown table with header, and will never
    cut a row in half. The overlap is always applied in terms of full rows or columns.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary output from a Reader, containing at least:
                - 'text': The tabular data as string.
                - 'conversion_method': Format of the input ('csv', 'tsv', 'markdown', 'json', etc.).
                - Additional document metadata fields (optional).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If both num_rows and num_cols are set.
        ValueError: If chunk_overlap as float is not in [0,1).
        ValueError: If chunk_size is too small to fit the header and at least one data row.

    Example:
        ```python
        reader_output = ReaderOutput(
            text: '| id | name |\\n|----|------|\\n| 1  | A    |\\n| 2  | B    |\\n| 3  | C    |',
            conversion_method: "markdown",
            document_name: "table.md",
            document_path: "/path/table.md",
        )
        splitter = RowColumnSplitter(chunk_size=80, chunk_overlap=20)
        output = splitter.split(reader_output)
        for chunk in output["chunks"]:
            print("\\n" + str(chunk) + "\\n")
        ```
        ```python
        | id   | name   |
        |------|--------|
        |  1   | A      |
        |  2   | B      |

        | id   | name   |
        |------|--------|
        |  2   | B      |
        |  3   | C      |
        ```
    """
    # Step 1. Parse the table depending on conversion_method
    df = self._load_tabular(reader_output)
    orig_method = reader_output.conversion_method
    col_names = df.columns.tolist()

    # Step 2. Split logic
    chunks = []
    meta_per_chunk = []

    # If splitting strategy is by rows
    if self.num_rows > 0:
        overlap = self._get_overlap(self.num_rows)
        for i in range(
            0,
            len(df),
            self.num_rows - overlap if (self.num_rows - overlap) > 0 else 1,
        ):
            chunk_df = df.iloc[i : i + self.num_rows]
            if not chunk_df.empty:
                chunk_str = self._to_str(chunk_df, orig_method)
                chunks.append(chunk_str)
                meta_per_chunk.append(
                    {"rows": chunk_df.index.tolist(), "type": "row"}
                )
    # If splitting strategy is by columns
    elif self.num_cols > 0:
        overlap = self._get_overlap(self.num_cols)
        total_cols = len(col_names)
        for i in range(
            0,
            total_cols,
            self.num_cols - overlap if (self.num_cols - overlap) > 0 else 1,
        ):
            sel_cols = col_names[i : i + self.num_cols]
            if sel_cols:
                chunk_df = df[sel_cols]
                chunk_str = self._to_str(chunk_df, orig_method, colwise=True)
                chunks.append(chunk_str)
                meta_per_chunk.append({"cols": sel_cols, "type": "column"})
    # If splitting strategy is given by the chunk_size
    else:
        header_lines = self._get_markdown_header(df)
        header_length = len(header_lines)

        row_md_list = [self._get_markdown_row(df, i) for i in range(len(df))]
        row_len_list = [len(r) + 1 for r in row_md_list]  # +1 for newline

        if self.chunk_size < header_length + row_len_list[0]:
            raise ValueError(
                "chunk_size is too small to fit header and at least one row."
            )

        # Compute overlapping and headers in markdown tables
        chunks = []
        meta_per_chunk = []
        i = 0
        n = len(row_md_list)
        overlap = self._get_overlap(1)
        while i < n:
            curr_chunk = []
            curr_len = header_length
            j = i
            while j < n and curr_len + row_len_list[j] <= self.chunk_size:
                curr_chunk.append(row_md_list[j])
                curr_len += row_len_list[j]
                j += 1

            rows_in_chunk = j - i
            chunk_str = header_lines + "\n".join(curr_chunk)
            chunks.append(chunk_str)
            meta_per_chunk.append({"rows": list(range(i, j)), "type": "char_row"})

            # --- compute overlap AFTER we know rows_in_chunk ---
            if isinstance(self.chunk_overlap, float):
                overlap_rows = int(rows_in_chunk * self.chunk_overlap)
            else:
                overlap_rows = int(self.chunk_overlap)

            # make sure we don’t loop forever
            overlap_rows = min(overlap_rows, rows_in_chunk - 1)
            i = j - overlap_rows

    # Generate chunk_id
    chunk_ids = self._generate_chunk_ids(len(chunks))

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="row_column_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "num_rows": self.num_rows,
            "num_cols": self.num_cols,
            "chunk_overlap": self.chunk_overlap,
        },
        metadata={"chunks": meta_per_chunk},
    )
    return output

CodeSplitter

CodeSplitter

Bases: BaseSplitter

CodeSplitter recursively splits source code into programmatically meaningful chunks (functions, classes, methods, etc.) for the given programming language.

Parameters:

  • chunk_size (int, default 1000): Maximum chunk size, in characters.
  • language (str, default 'python'): Programming language (e.g., "python", "java", "kotlin", etc.).
Notes
  • Uses Langchain's RecursiveCharacterTextSplitter and its language-aware from_language method.
  • See Langchain docs: https://python.langchain.com/docs/how_to/code_splitter/
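
Because the language-aware separators come from Langchain, other supported languages work the same way. A minimal sketch for Java (hedged; the exact chunk boundaries depend on Langchain's separators for the chosen language):

from splitter_mr.splitter import CodeSplitter

java_code = "class Foo {\n    void bar() {}\n}\n\nclass Baz {\n    void qux() {}\n}"
reader_output = ReaderOutput(text=java_code, document_name="Example.java")
splitter = CodeSplitter(chunk_size=60, language="java")
print(splitter.split(reader_output).chunks)
# Expect roughly one chunk per class declaration for this input.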
Source code in src/splitter_mr/splitter/splitters/code_splitter.py
class CodeSplitter(BaseSplitter):
    """
    CodeSplitter recursively splits source code into programmatically meaningful chunks
    (functions, classes, methods, etc.) for the given programming language.

    Args:
        chunk_size (int): Maximum chunk size, in characters.
        language (str): Programming language (e.g., "python", "java", "kotlin", etc.)

    Notes:
        - Uses Langchain's RecursiveCharacterTextSplitter and its language-aware `from_language` method.
        - See Langchain docs: https://python.langchain.com/docs/how_to/code_splitter/
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        language: str = "python",
    ):
        super().__init__(chunk_size)
        self.language = language

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits code in `reader_output['text']` according to the syntax of the specified
        programming language, using function/class boundaries where possible.

        Args:
            reader_output (ReaderOutput): Object containing at least a 'text' field,
                plus optional document metadata.

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If language is not supported.

        Example:
            ```python
            from splitter_mr.splitter import CodeSplitter

            reader_output = ReaderOutput(
                text: "def foo():\\n    pass\\n\\nclass Bar:\\n    def baz(self):\\n        pass",
                document_name: "example.py",
                document_path: "/tmp/example.py"
            )
            splitter = CodeSplitter(chunk_size=50, language="python")
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['def foo():\\n    pass\\n', 'class Bar:\\n    def baz(self):\\n        pass']
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Get Langchain language enum
        lang_enum = get_langchain_language(self.language)

        splitter = RecursiveCharacterTextSplitter.from_language(
            language=lang_enum, chunk_size=chunk_size, chunk_overlap=0
        )
        texts = splitter.create_documents([text])
        chunks = [doc.page_content for doc in texts]

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="code_splitter",
            split_params={"chunk_size": chunk_size, "language": self.language},
            metadata=metadata,
        )
        return output
split(reader_output)

Splits code in reader_output['text'] according to the syntax of the specified programming language, using function/class boundaries where possible.

Parameters:

Name Type Description Default
reader_output ReaderOutput

Object containing at least a 'text' field, plus optional document metadata.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If language is not supported.

Example

from splitter_mr.splitter import CodeSplitter

reader_output = ReaderOutput(
    text: "def foo():\n    pass\n\nclass Bar:\n    def baz(self):\n        pass",
    document_name: "example.py",
    document_path: "/tmp/example.py"
)
splitter = CodeSplitter(chunk_size=50, language="python")
output = splitter.split(reader_output)
print(output.chunks)
['def foo():\n    pass\n', 'class Bar:\n    def baz(self):\n        pass']

Source code in src/splitter_mr/splitter/splitters/code_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits code in `reader_output['text']` according to the syntax of the specified
    programming language, using function/class boundaries where possible.

    Args:
        reader_output (ReaderOutput): Object containing at least a 'text' field,
            plus optional document metadata.

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If language is not supported.

    Example:
        ```python
        from splitter_mr.splitter import CodeSplitter

        reader_output = ReaderOutput(
            text: "def foo():\\n    pass\\n\\nclass Bar:\\n    def baz(self):\\n        pass",
            document_name: "example.py",
            document_path: "/tmp/example.py"
        )
        splitter = CodeSplitter(chunk_size=50, language="python")
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['def foo():\\n    pass\\n', 'class Bar:\\n    def baz(self):\\n        pass']
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Get Langchain language enum
    lang_enum = get_langchain_language(self.language)

    splitter = RecursiveCharacterTextSplitter.from_language(
        language=lang_enum, chunk_size=chunk_size, chunk_overlap=0
    )
    texts = splitter.create_documents([text])
    chunks = [doc.page_content for doc in texts]

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="code_splitter",
        split_params={"chunk_size": chunk_size, "language": self.language},
        metadata=metadata,
    )
    return output
get_langchain_language(lang_str)

Map a string language name to Langchain Language enum. Raises ValueError if not found.

Source code in src/splitter_mr/splitter/splitters/code_splitter.py
def get_langchain_language(lang_str: str) -> Language:
    """
    Map a string language name to Langchain Language enum.
    Raises ValueError if not found.
    """
    lookup = {lang.name.lower(): lang for lang in Language}
    key = lang_str.lower()
    if key not in lookup:
        raise ValueError(
            f"Unsupported language '{lang_str}'. Supported: {list(lookup.keys())}"
        )
    return lookup[key]
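
A minimal sketch of how this helper behaves (the module path in the import is assumed from the source path above; `Language` is the enum from `langchain_text_splitters`):

```python
from langchain_text_splitters import Language

# Module path assumed from "Source code in src/splitter_mr/splitter/splitters/code_splitter.py".
from splitter_mr.splitter.splitters.code_splitter import get_langchain_language

# Known names map (case-insensitively) to the corresponding Langchain enum member.
assert get_langchain_language("python") is Language.PYTHON
assert get_langchain_language("KOTLIN") is Language.KOTLIN

# Unknown names raise a ValueError listing the supported keys.
try:
    get_langchain_language("cobol-85")
except ValueError as err:
    print(err)
```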

TokenSplitter

TokenSplitter

Bases: BaseSplitter

TokenSplitter splits a given text into chunks based on token counts derived from different tokenization models or libraries.

This splitter supports tokenization via tiktoken (OpenAI tokenizer), spacy (spaCy tokenizer), and nltk (NLTK tokenizer). It allows splitting text into chunks of a maximum number of tokens (chunk_size), using the specified tokenizer model.

Parameters:

Name Type Description Default
chunk_size int

Maximum number of tokens per chunk.

1000
model_name str

Specifies the tokenizer and model in the format tokenizer/model. Supported tokenizers are:

  • tiktoken/cl100k_base (OpenAI tokenizer via tiktoken)
  • spacy/en_core_web_sm (spaCy English model)
  • nltk/punkt_tab (NLTK Punkt tokenizer variant)
DEFAULT_TOKENIZER
language str

Language code for NLTK tokenizer (default "english").

DEFAULT_TOKEN_LANGUAGE
Notes

More information about token-based splitting with Langchain is available in the Langchain docs: https://python.langchain.com/docs/how_to/split_by_token/
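
A minimal usage sketch with the tiktoken backend (the `ReaderOutput` import path below is an assumption; adjust it to wherever your installation exposes that dataclass):

```python
from splitter_mr.splitter import TokenSplitter

# Assumed location of the ReaderOutput dataclass (hypothetical path).
from splitter_mr.schema import ReaderOutput

reader_output = ReaderOutput(
    text="A long document whose chunks must respect an LLM context limit ...",
    document_name="report.txt",
    document_path="/tmp/report.txt",
)

# "tokenizer/model" selects the backend: tiktoken, spacy, or nltk.
splitter = TokenSplitter(chunk_size=128, model_name="tiktoken/cl100k_base")
output = splitter.split(reader_output)
print(output.chunks)
```

Swapping `model_name` to `spacy/en_core_web_sm` or `nltk/punkt_tab` switches tokenizers; missing spaCy and NLTK models are downloaded on first use.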

Source code in src/splitter_mr/splitter/splitters/token_splitter.py
class TokenSplitter(BaseSplitter):
    """
    TokenSplitter splits a given text into chunks based on token counts
    derived from different tokenization models or libraries.

    This splitter supports tokenization via `tiktoken` (OpenAI tokenizer),
    `spacy` (spaCy tokenizer), and `nltk` (NLTK tokenizer). It allows splitting
    text into chunks of a maximum number of tokens (`chunk_size`), using the
    specified tokenizer model.

    Args:
        chunk_size (int): Maximum number of tokens per chunk.
        model_name (str): Specifies the tokenizer and model in the format `tokenizer/model`. Supported tokenizers are:

            - `tiktoken/cl100k_base` (OpenAI tokenizer via tiktoken)
            - `spacy/en_core_web_sm` (spaCy English model)
            - `nltk/punkt_tab` (NLTK Punkt tokenizer variant)

        language (str): Language code for NLTK tokenizer (default `"english"`).

    Notes:
        More info about the splitting methods by Tokens for Langchain:
        [Langchain Docs](https://python.langchain.com/docs/how_to/split_by_token/).
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        model_name: str = DEFAULT_TOKENIZER,
        language: str = DEFAULT_TOKEN_LANGUAGE,
    ):
        super().__init__(chunk_size)
        # Use centralized defaults (already applied via signature) and keep on instance
        self.model_name = model_name or DEFAULT_TOKENIZER
        self.language = language or DEFAULT_TOKEN_LANGUAGE

    @staticmethod
    def list_nltk_punkt_languages():
        """Return a sorted list of available punkt models (languages) for NLTK."""
        models = set()
        for base in map(Path, nltk.data.path):
            punkt_dir = base / "tokenizers" / "punkt"
            if punkt_dir.exists():
                models.update(f.stem for f in punkt_dir.glob("*.pickle"))
        return sorted(models)

    def _parse_model(self) -> tuple[str, str]:
        """Parse `tokenizer/model` and validate the format."""
        if "/" not in self.model_name:
            raise ValueError(
                "model_name must be in the format 'tokenizer/model', "
                f"e.g. '{DEFAULT_TOKENIZER}'."
            )
        tokenizer, model = self.model_name.split("/", 1)
        return tokenizer, model

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text from `reader_output` into token-based chunks using
        the specified tokenizer.

        Depending on `model_name`, the splitter chooses the appropriate tokenizer:

        - For `tiktoken`, uses `RecursiveCharacterTextSplitter` with tiktoken encoding.
            e.g.: `tiktoken/cl100k_base`.
        - For `spacy`, uses `SpacyTextSplitter` with the specified spaCy pipeline.
            e.g., `spacy/en_core_web_sm`.
        - For `nltk`, uses `NLTKTextSplitter` with the specified language tokenizer.
            e.g., `nltk/punkt_tab`.

        Automatically downloads spaCy and NLTK models if missing.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata,
                such as 'document_name', 'document_path', 'document_id', etc.

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            RuntimeError: If a spaCy model specified in `model_name` is not available.
            ValueError: If an unsupported tokenizer is specified in `model_name`.
        """
        text = reader_output.text
        tokenizer, model = self._parse_model()

        if tokenizer == "tiktoken":
            # Validate against installed tiktoken encodings; hint with our common defaults
            available_models = tiktoken.list_encoding_names()
            if model not in available_models:
                raise ValueError(
                    f"tiktoken encoding '{model}' is not available. "
                    f"Available encodings include (subset): {TIKTOKEN_DEFAULTS}. "
                    f"Full list from tiktoken: {available_models}"
                )
            splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                encoding_name=model,
                chunk_size=self.chunk_size,
                chunk_overlap=0,
            )

        elif tokenizer == "spacy":
            if not spacy.util.is_package(model):
                # Try to download; we surface our recommended list in the error if it fails
                try:
                    spacy.cli.download(model)
                except Exception as e:
                    raise RuntimeError(
                        f"spaCy model '{model}' is not available for download. "
                        f"Common models include: {SPACY_DEFAULTS}"
                    ) from e
            spacy.load(model)
            MAX_SAFE_LENGTH = 1_000_000
            if self.chunk_size > MAX_SAFE_LENGTH:
                warnings.warn(
                    "Too many characters: the v2.x parser and NER models require roughly "
                    "1GB of temporary memory per 100,000 characters in the input",
                    UserWarning,
                )
            splitter = SpacyTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=0,
                max_length=MAX_SAFE_LENGTH,
                pipeline=model,
            )

        elif tokenizer == "nltk":
            # Ensure punkt language is present; download our specified default model if missing
            try:
                nltk.data.find(f"tokenizers/punkt/{self.language}.pickle")
            except LookupError:
                # Use constants instead of hard-coded 'punkt_tab'
                nltk.download(NLTK_DEFAULTS[0])
            splitter = NLTKTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=0,
                language=self.language,
            )

        else:
            raise ValueError(
                f"Unsupported tokenizer '{tokenizer}'. Supported tokenizers: {SUPPORTED_TOKENIZERS}"
            )

        chunks = splitter.split_text(text)
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        return SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="token_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "model_name": self.model_name,  # keeps centralized default visible
                "language": self.language,  # keeps centralized default visible
            },
            metadata=metadata,
        )
list_nltk_punkt_languages() staticmethod

Return a sorted list of available punkt models (languages) for NLTK.

Source code in src/splitter_mr/splitter/splitters/token_splitter.py
@staticmethod
def list_nltk_punkt_languages():
    """Return a sorted list of available punkt models (languages) for NLTK."""
    models = set()
    for base in map(Path, nltk.data.path):
        punkt_dir = base / "tokenizers" / "punkt"
        if punkt_dir.exists():
            models.update(f.stem for f in punkt_dir.glob("*.pickle"))
    return sorted(models)
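
For instance, assuming the same import path as the other splitters in this package:

```python
from splitter_mr.splitter import TokenSplitter

# Lists only the punkt models already downloaded locally, e.g. ["english", "german", ...]
print(TokenSplitter.list_nltk_punkt_languages())
```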
split(reader_output)

Splits the input text from reader_output into token-based chunks using the specified tokenizer.

Depending on model_name, the splitter chooses the appropriate tokenizer:

  • For tiktoken, uses RecursiveCharacterTextSplitter with tiktoken encoding. e.g.: tiktoken/cl100k_base.
  • For spacy, uses SpacyTextSplitter with the specified spaCy pipeline. e.g., spacy/en_core_web_sm.
  • For nltk, uses NLTKTextSplitter with the specified language tokenizer. e.g., nltk/punkt_tab.

Automatically downloads spaCy and NLTK models if missing.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata, such as 'document_name', 'document_path', 'document_id', etc.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
RuntimeError

If a spaCy model specified in model_name is not available.

ValueError

If an unsupported tokenizer is specified in model_name.

Source code in src/splitter_mr/splitter/splitters/token_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text from `reader_output` into token-based chunks using
    the specified tokenizer.

    Depending on `model_name`, the splitter chooses the appropriate tokenizer:

    - For `tiktoken`, uses `RecursiveCharacterTextSplitter` with tiktoken encoding.
        e.g.: `tiktoken/cl100k_base`.
    - For `spacy`, uses `SpacyTextSplitter` with the specified spaCy pipeline.
        e.g., `spacy/en_core_web_sm`.
    - For `nltk`, uses `NLTKTextSplitter` with the specified language tokenizer.
        e.g., `nltk/punkt_tab`.

    Automatically downloads spaCy and NLTK models if missing.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata,
            such as 'document_name', 'document_path', 'document_id', etc.

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        RuntimeError: If a spaCy model specified in `model_name` is not available.
        ValueError: If an unsupported tokenizer is specified in `model_name`.
    """
    text = reader_output.text
    tokenizer, model = self._parse_model()

    if tokenizer == "tiktoken":
        # Validate against installed tiktoken encodings; hint with our common defaults
        available_models = tiktoken.list_encoding_names()
        if model not in available_models:
            raise ValueError(
                f"tiktoken encoding '{model}' is not available. "
                f"Available encodings include (subset): {TIKTOKEN_DEFAULTS}. "
                f"Full list from tiktoken: {available_models}"
            )
        splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            encoding_name=model,
            chunk_size=self.chunk_size,
            chunk_overlap=0,
        )

    elif tokenizer == "spacy":
        if not spacy.util.is_package(model):
            # Try to download; we surface our recommended list in the error if it fails
            try:
                spacy.cli.download(model)
            except Exception as e:
                raise RuntimeError(
                    f"spaCy model '{model}' is not available for download. "
                    f"Common models include: {SPACY_DEFAULTS}"
                ) from e
        spacy.load(model)
        MAX_SAFE_LENGTH = 1_000_000
        if self.chunk_size > MAX_SAFE_LENGTH:
            warnings.warn(
                "Too many characters: the v2.x parser and NER models require roughly "
                "1GB of temporary memory per 100,000 characters in the input",
                UserWarning,
            )
        splitter = SpacyTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=0,
            max_length=MAX_SAFE_LENGTH,
            pipeline=model,
        )

    elif tokenizer == "nltk":
        # Ensure punkt language is present; download our specified default model if missing
        try:
            nltk.data.find(f"tokenizers/punkt/{self.language}.pickle")
        except LookupError:
            # Use constants instead of hard-coded 'punkt_tab'
            nltk.download(NLTK_DEFAULTS[0])
        splitter = NLTKTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=0,
            language=self.language,
        )

    else:
        raise ValueError(
            f"Unsupported tokenizer '{tokenizer}'. Supported tokenizers: {SUPPORTED_TOKENIZERS}"
        )

    chunks = splitter.split_text(text)
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    return SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="token_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "model_name": self.model_name,  # keeps centralized default visible
            "language": self.language,  # keeps centralized default visible
        },
        metadata=metadata,
    )

PagedSplitter

Splits text by pages for documents that have page structure. Each chunk contains a specified number of pages, with optional character overlap between consecutive chunks.

PagedSplitter

Bases: BaseSplitter

Splits a multi-page document into page-based or multi-page chunks using a placeholder marker.

Supports overlap in characters between consecutive chunks.

Parameters:

Name Type Description Default
chunk_size int

Number of pages per chunk.

1
chunk_overlap int

Number of overlapping characters to include from the end of the previous chunk.

0

Raises:

Type Description
ValueError

If chunk_size is less than 1.
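
Besides the docstring example in the source below, the following sketch shows the character overlap in action (the `ReaderOutput` import path is an assumption):

```python
from splitter_mr.splitter import PagedSplitter

# Assumed location of the ReaderOutput dataclass (hypothetical path).
from splitter_mr.schema import ReaderOutput

reader_output = ReaderOutput(
    text=(
        "<!-- page -->First page text."
        "<!-- page -->Second page text."
        "<!-- page -->Third page text."
    ),
    document_name="report.pdf",
    document_path="/tmp/report.pdf",
    page_placeholder="<!-- page -->",
)

# Two pages per chunk; the last 10 characters of the previous chunk are
# prepended to the next one.
splitter = PagedSplitter(chunk_size=2, chunk_overlap=10)
output = splitter.split(reader_output)
print(output.chunks)
```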

Source code in src/splitter_mr/splitter/splitters/paged_splitter.py
class PagedSplitter(BaseSplitter):
    """
    Splits a multi-page document into page-based or multi-page chunks using a placeholder marker.

    Supports overlap in characters between consecutive chunks.

    Args:
        chunk_size (int): Number of pages per chunk.
        chunk_overlap (int): Number of overlapping characters to include from the end of the previous chunk.

    Raises:
        ValueError: If chunk_size is less than 1.
    """

    def __init__(self, chunk_size: int = 1, chunk_overlap: int = 0):
        """
        Args:
            chunk_size (int): Number of pages per chunk.
            chunk_overlap (int): Number of overlapping characters to include from the end of the previous chunk.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be ≥ 1")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must be ≥ 0")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text into chunks using the page_placeholder in the ReaderOutput.
        Optionally adds character overlap between chunks.

        Args:
            reader_output (ReaderOutput): The output from a reader containing text and metadata.

        Returns:
            SplitterOutput: The result with chunks and related metadata.

        Raises:
            ValueError: If the reader_output does not contain a valid page_placeholder.

        Example:
            ```python
            from splitter_mr.splitter import PagedSplitter

            reader_output = ReaderOutput(
                text: "<!-- page --> Page 1 <!-- page --> This is the page 2.",
                document_name: "test.md",
                document_path: "tmp/test.md",
                page_placeholder: "<!-- page -->",
                ...
            )
            splitter = PagedSplitter(chunk_size = 1)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            [" Page 1 ", " This is the page 2."]
            ```
        """
        page_placeholder: str = reader_output.page_placeholder

        if not bool(page_placeholder):
            raise ValueError(
                "The specified file does not contain page placeholders. "
                "Please, use a compatible file extension (pdf, docx, xlsx, pptx) "
                "or read the file using any BaseReader by pages and try again"
            )

        # Split the document into pages using the placeholder.
        pages: List[str] = [
            page.strip()  # Normalize spacing
            for page in reader_output.text.split(page_placeholder)
            if page.strip()
        ]

        chunks: List[str] = []
        for i in range(0, len(pages), self.chunk_size):
            chunk = "\n".join(pages[i : i + self.chunk_size])
            if self.chunk_overlap > 0 and i > 0 and chunks:
                # Add character overlap from previous chunk
                overlap_text = chunks[-1][-self.chunk_overlap :]
                chunk = overlap_text + chunk
            chunks.append(chunk)

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="paged_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap,
            },
            metadata=metadata,
        )
        return output
__init__(chunk_size=1, chunk_overlap=0)

Parameters:

Name Type Description Default
chunk_size int

Number of pages per chunk.

1
chunk_overlap int

Number of overlapping characters to include from the end of the previous chunk.

0
Source code in src/splitter_mr/splitter/splitters/paged_splitter.py
def __init__(self, chunk_size: int = 1, chunk_overlap: int = 0):
    """
    Args:
        chunk_size (int): Number of pages per chunk.
        chunk_overlap (int): Number of overlapping characters to include from the end of the previous chunk.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be ≥ 1")
    if chunk_overlap < 0:
        raise ValueError("chunk_overlap must be ≥ 0")
    self.chunk_size = chunk_size
    self.chunk_overlap = chunk_overlap
split(reader_output)

Splits the input text into chunks using the page_placeholder in the ReaderOutput. Optionally adds character overlap between chunks.

Parameters:

Name Type Description Default
reader_output ReaderOutput

The output from a reader containing text and metadata.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

The result with chunks and related metadata.

Raises:

Type Description
ValueError

If the reader_output does not contain a valid page_placeholder.

Example

from splitter_mr.splitter import PagedSplitter

reader_output = ReaderOutput(
    text: "<!-- page --> Page 1 <!-- page --> This is the page 2.",
    document_name: "test.md",
    document_path: "tmp/test.md",
    page_placeholder: "<!-- page -->",
    ...
)
splitter = PagedSplitter(chunk_size = 1)
output = splitter.split(reader_output)
print(output.chunks)
["Page 1", "This is the page 2."]

Source code in src/splitter_mr/splitter/splitters/paged_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text into chunks using the page_placeholder in the ReaderOutput.
    Optionally adds character overlap between chunks.

    Args:
        reader_output (ReaderOutput): The output from a reader containing text and metadata.

    Returns:
        SplitterOutput: The result with chunks and related metadata.

    Raises:
        ValueError: If the reader_output does not contain a valid page_placeholder.

    Example:
        ```python
        from splitter_mr.splitter import PagedSplitter

        reader_output = ReaderOutput(
            text: "<!-- page --> Page 1 <!-- page --> This is the page 2.",
            document_name: "test.md",
            document_path: "tmp/test.md",
            page_placeholder: "<!-- page -->",
            ...
        )
        splitter = PagedSplitter(chunk_size = 1)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        [" Page 1 ", " This is the page 2."]
        ```
    """
    page_placeholder: str = reader_output.page_placeholder

    if not bool(page_placeholder):
        raise ValueError(
            "The specified file does not contain page placeholders. "
            "Please, use a compatible file extension (pdf, docx, xlsx, pptx) "
            "or read the file using any BaseReader by pages and try again"
        )

    # Split the document into pages using the placeholder.
    pages: List[str] = [
        page.strip()  # Normalize spacing
        for page in reader_output.text.split(page_placeholder)
        if page.strip()
    ]

    chunks: List[str] = []
    for i in range(0, len(pages), self.chunk_size):
        chunk = "\n".join(pages[i : i + self.chunk_size])
        if self.chunk_overlap > 0 and i > 0 and chunks:
            # Add character overlap from previous chunk
            overlap_text = chunks[-1][-self.chunk_overlap :]
            chunk = overlap_text + chunk
        chunks.append(chunk)

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="paged_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
        },
        metadata=metadata,
    )
    return output

SemanticSplitter

Splits text into chunks based on semantic similarity, using an embedding model; here chunk_size acts as a minimum chunk length in characters rather than a maximum. Useful for producing semantically coherent groupings.

SemanticSplitter

Bases: BaseSplitter

Split text into semantically coherent chunks using embedding similarity.

Pipeline:

  • Split text into sentences via SentenceSplitter (one sentence chunks).
  • Build a sliding window around each sentence (buffer_size).
  • Embed each window with BaseEmbedding (batched).
  • Compute cosine distances between consecutive windows (1 - cosine_sim).
  • Pick breakpoints using a thresholding strategy, or aim for number_of_chunks.
  • Join sentences between breakpoints; enforce minimum size via chunk_size.
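
Building on the pipeline above, here is a minimal end-to-end sketch. The embedding below is a toy stand-in that only satisfies the interface the splitter actually calls (`embed_documents`, plus an optional `model_name` attribute); in practice you would pass a real `BaseEmbedding` backend. The `ReaderOutput` import path is likewise an assumption:

```python
import numpy as np

from splitter_mr.splitter import SemanticSplitter

# Assumed location of the ReaderOutput dataclass (hypothetical path).
from splitter_mr.schema import ReaderOutput


class HashingEmbedding:
    """Toy stand-in: deterministic bag-of-characters vectors, for illustration only."""

    model_name = "toy-hashing-embedding"

    def embed_documents(self, texts):
        vectors = []
        for text in texts:
            vec = np.zeros(64)
            for ch in text.lower():
                vec[ord(ch) % 64] += 1.0
            vectors.append((vec / (np.linalg.norm(vec) or 1.0)).tolist())
        return vectors


reader_output = ReaderOutput(
    text="Cats purr. Cats nap. Stocks rose today. Markets closed higher.",
    document_name="mixed.txt",
    document_path="/tmp/mixed.txt",
)

splitter = SemanticSplitter(
    HashingEmbedding(),          # any object exposing embed_documents() works here
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=95,
    chunk_size=10,               # minimum characters per emitted chunk
)
print(splitter.split(reader_output).chunks)
```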
Source code in src/splitter_mr/splitter/splitters/semantic_splitter.py
class SemanticSplitter(BaseSplitter):
    """
    Split text into semantically coherent chunks using embedding similarity.

    **Pipeline:**

    - Split text into sentences via `SentenceSplitter` (one sentence chunks).
    - Build a sliding window around each sentence (`buffer_size`).
    - Embed each window with `BaseEmbedding` (batched).
    - Compute cosine *distances* between consecutive windows (1 - cosine_sim).
    - Pick breakpoints using a thresholding strategy, or aim for `number_of_chunks`.
    - Join sentences between breakpoints; enforce minimum size via `chunk_size`.
    """

    def __init__(
        self,
        embedding: BaseEmbedding,
        *,
        buffer_size: int = 1,
        breakpoint_threshold_type: BreakpointThresholdType = "percentile",
        breakpoint_threshold_amount: Optional[float] = None,
        number_of_chunks: Optional[int] = None,
        chunk_size: int = 1000,
    ) -> None:
        """Initialize the semantic splitter.

        Args:
            embedding (BaseEmbedding): Embedding backend.
            buffer_size (int): Neighbor window size around each sentence.
            breakpoint_threshold_type (BreakpointThresholdType): Threshold strategy:
                "percentile" | "standard_deviation" | "interquartile" | "gradient".
            breakpoint_threshold_amount (Optional[float]): Threshold parameter. If None,
                uses sensible defaults per strategy (e.g., 95th percentile).
            number_of_chunks (Optional[int]): If set, pick a threshold that
                approximately yields this number of chunks (inverse percentile).
            chunk_size (int): **Minimum** characters required to emit a chunk.
        """
        super().__init__(chunk_size=chunk_size)
        self.embedding = embedding
        self.buffer_size = int(buffer_size)
        self.breakpoint_threshold_type = cast(
            BreakpointThresholdType, breakpoint_threshold_type
        )
        self.breakpoint_threshold_amount = (
            DEFAULT_BREAKPOINTS[self.breakpoint_threshold_type]
            if breakpoint_threshold_amount is None
            else float(breakpoint_threshold_amount)
        )
        self.number_of_chunks = number_of_chunks
        self._sentence_splitter = SentenceSplitter(
            chunk_size=1, chunk_overlap=0, separators=[".", "!", "?"]
        )

    # ---------- Helpers ----------

    def _split_into_sentences(self, reader_output: ReaderOutput) -> List[str]:
        """Split the input text into sentences using `SentenceSplitter` (no overlap).

        Args:
            reader_output (ReaderOutput): The document to split.

        Returns:
            List[str]: List of sentences preserving punctuation.
        """
        sent_out = self._sentence_splitter.split(reader_output)
        return sent_out.chunks

    def _calculate_sentence_distances(
        self, single_sentences: List[str]
    ) -> Tuple[List[float], List[Dict[str, Any]]]:
        """Embed sentence windows (batch) and compute consecutive cosine distances.

        Args:
            single_sentences (List[str]): Sentences in order.

        Returns:
            Tuple[List[float], List[Dict[str, Any]]]:
                - distances between consecutive windows (len = n-1)
                - sentence dicts enriched with combined text and embeddings
        """
        # Prepare sentence dicts and combine with buffer
        sentences = [
            {"sentence": s, "index": i} for i, s in enumerate(single_sentences)
        ]
        sentences = _combine_sentences(sentences, self.buffer_size)

        # Batch embed all combined sentences
        windows = [item["combined_sentence"] for item in sentences]
        embeddings = self.embedding.embed_documents(windows)

        for item, emb in zip(sentences, embeddings):
            item["combined_sentence_embedding"] = emb

        # Distances (1 - cosine similarity) between consecutive windows
        n = len(sentences)
        if n <= 1:
            return [], sentences

        distances: List[float] = []
        for i in range(n - 1):
            sim = _cosine_similaritynp(
                sentences[i]["combined_sentence_embedding"],
                sentences[i + 1]["combined_sentence_embedding"],
            )
            dist = 1.0 - sim
            distances.append(dist)
            sentences[i]["distance_to_next"] = dist

        return distances, sentences

    def _threshold_from_clusters(self, distances: List[float]) -> float:
        """Estimate a percentile threshold to reach `number_of_chunks`.

        Maps desired chunks x∈[1, len(distances)] to percentile y∈[100, 0].

        Args:
            distances (List[float]): Consecutive distances.

        Returns:
            float: Threshold value as a percentile over `distances`.
        """
        assert self.number_of_chunks is not None
        x1, y1 = float(len(distances)), 0.0
        x2, y2 = 1.0, 100.0
        x = max(min(float(self.number_of_chunks), x1), x2)
        y = y1 + ((y2 - y1) / (x2 - x1)) * (x - x1) if x2 != x1 else y2
        y = float(np.clip(y, 0.0, 100.0))
        return float(np.percentile(distances, y)) if distances else 0.0

    def _calculate_breakpoint_threshold(
        self, distances: List[float]
    ) -> Tuple[float, List[float]]:
        """Compute the breakpoint threshold and reference array per selected strategy.

        Args:
            distances (List[float]): Consecutive distances between windows.

        Returns:
            Tuple[float, List[float]]: (threshold, reference_array)
                If strategy == "gradient", reference_array is the gradient;
                otherwise it's `distances`.
        """
        if not distances:
            return 0.0, distances

        if self.breakpoint_threshold_type == "percentile":
            return (
                float(np.percentile(distances, self.breakpoint_threshold_amount)),
                distances,
            )

        if self.breakpoint_threshold_type == "standard_deviation":
            mu = float(np.mean(distances))
            sd = float(np.std(distances))
            return mu + self.breakpoint_threshold_amount * sd, distances

        if self.breakpoint_threshold_type == "interquartile":
            q1, q3 = np.percentile(distances, [25.0, 75.0])
            iqr = float(q3 - q1)
            mu = float(np.mean(distances))
            return mu + self.breakpoint_threshold_amount * iqr, distances

        if self.breakpoint_threshold_type == "gradient":
            grads = np.gradient(np.asarray(distances, dtype=np.float64)).tolist()
            thr = float(np.percentile(grads, self.breakpoint_threshold_amount))
            return thr, grads  # use gradient array as the reference

        raise ValueError(
            f"Unexpected breakpoint_threshold_type: {self.breakpoint_threshold_type}"
        )

    # ---------- Public API ----------

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """Split the document text into semantically coherent chunks.

        Args:
            reader_output (ReaderOutput): The document text & metadata.

        Returns:
            SplitterOutput: Chunks, IDs, metadata, and splitter configuration.

        Notes:
            - With 1 sentence (or 2 in gradient mode), returns the text/sentences as-is.
            - Chunks shorter than `chunk_size` (minimum) are skipped and merged forward.
            - `chunk_size` behaves as the *minimum* chunk size in this splitter.
        """
        text = reader_output.text
        if text == "" or text is None:
            raise ValueError("No text has been provided")

        amt = self.breakpoint_threshold_amount
        if (
            self.breakpoint_threshold_type in ("percentile", "gradient")
            and 0.0 < amt <= 1.0  # noqa: W503
        ):
            self.breakpoint_threshold_amount = amt * 100.0

        sentences = self._split_into_sentences(reader_output)

        # Edge cases where thresholds aren't meaningful
        if len(sentences) <= 1:
            chunks = sentences if sentences else [text]
        elif self.breakpoint_threshold_type == "gradient" and len(sentences) == 2:
            chunks = sentences
        else:
            distances, sentence_dicts = self._calculate_sentence_distances(sentences)

            if self.number_of_chunks is not None and distances:
                # Pick top (k-1) distances as breakpoints
                k = int(self.number_of_chunks)
                m = max(0, min(k - 1, len(distances)))  # number of cuts to make
                if m == 0:
                    indices_above = []  # single chunk
                else:
                    # indices of the m largest distances (breaks), sorted in ascending order
                    idxs = np.argsort(np.asarray(distances))[-m:]
                    indices_above = sorted(int(i) for i in idxs.tolist())
            else:
                threshold, ref_array = self._calculate_breakpoint_threshold(distances)
                indices_above = [
                    i for i, val in enumerate(ref_array) if val > threshold
                ]

            chunks: List[str] = []
            start_idx = 0

            for idx in indices_above:
                end = idx + 1  # inclusive slice end
                candidate = " ".join(
                    d["sentence"] for d in sentence_dicts[start_idx:end]
                ).strip()
                if len(candidate) < self.chunk_size:
                    # too small: keep accumulating (do NOT move start_idx)
                    continue
                chunks.append(candidate)
                start_idx = end

            # Tail (always emit whatever remains)
            if start_idx < len(sentence_dicts):
                tail = " ".join(
                    d["sentence"] for d in sentence_dicts[start_idx:]
                ).strip()
                if tail:
                    chunks.append(tail)

            if not chunks:
                chunks = [" ".join(sentences).strip() or (reader_output.text or "")]

        # IDs & metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()
        model_name = getattr(self.embedding, "model_name", None)

        return SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="semantic_splitter",
            split_params={
                "buffer_size": self.buffer_size,
                "breakpoint_threshold_type": self.breakpoint_threshold_type,
                "breakpoint_threshold_amount": self.breakpoint_threshold_amount,
                "number_of_chunks": self.number_of_chunks,
                "chunk_size": self.chunk_size,
                "model_name": model_name,
            },
            metadata=metadata,
        )
__init__(embedding, *, buffer_size=1, breakpoint_threshold_type='percentile', breakpoint_threshold_amount=None, number_of_chunks=None, chunk_size=1000)

Initialize the semantic splitter.

Parameters:

Name Type Description Default
embedding BaseEmbedding

Embedding backend.

required
buffer_size int

Neighbor window size around each sentence.

1
breakpoint_threshold_type BreakpointThresholdType

Threshold strategy: "percentile" | "standard_deviation" | "interquartile" | "gradient".

'percentile'
breakpoint_threshold_amount Optional[float]

Threshold parameter. If None, uses sensible defaults per strategy (e.g., 95th percentile).

None
number_of_chunks Optional[int]

If set, pick a threshold that approximately yields this number of chunks (inverse percentile).

None
chunk_size int

Minimum characters required to emit a chunk.

1000
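
To make the threshold strategies concrete, here is a small numeric sketch of the default `percentile` strategy, mirroring `_calculate_breakpoint_threshold` in the source above: distances above the chosen percentile become breakpoints, and sentences between consecutive breakpoints are joined into a chunk.

```python
import numpy as np

# Illustrative cosine distances between consecutive sentence windows.
distances = [0.05, 0.07, 0.62, 0.04, 0.06, 0.55, 0.08]

# "percentile" with amount=95: values above the 95th percentile are breakpoints.
threshold = float(np.percentile(distances, 95))
breakpoints = [i for i, d in enumerate(distances) if d > threshold]
print(threshold, breakpoints)  # only the largest jump(s) exceed the threshold
```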
Source code in src/splitter_mr/splitter/splitters/semantic_splitter.py
def __init__(
    self,
    embedding: BaseEmbedding,
    *,
    buffer_size: int = 1,
    breakpoint_threshold_type: BreakpointThresholdType = "percentile",
    breakpoint_threshold_amount: Optional[float] = None,
    number_of_chunks: Optional[int] = None,
    chunk_size: int = 1000,
) -> None:
    """Initialize the semantic splitter.

    Args:
        embedding (BaseEmbedding): Embedding backend.
        buffer_size (int): Neighbor window size around each sentence.
        breakpoint_threshold_type (BreakpointThresholdType): Threshold strategy:
            "percentile" | "standard_deviation" | "interquartile" | "gradient".
        breakpoint_threshold_amount (Optional[float]): Threshold parameter. If None,
            uses sensible defaults per strategy (e.g., 95th percentile).
        number_of_chunks (Optional[int]): If set, pick a threshold that
            approximately yields this number of chunks (inverse percentile).
        chunk_size (int): **Minimum** characters required to emit a chunk.
    """
    super().__init__(chunk_size=chunk_size)
    self.embedding = embedding
    self.buffer_size = int(buffer_size)
    self.breakpoint_threshold_type = cast(
        BreakpointThresholdType, breakpoint_threshold_type
    )
    self.breakpoint_threshold_amount = (
        DEFAULT_BREAKPOINTS[self.breakpoint_threshold_type]
        if breakpoint_threshold_amount is None
        else float(breakpoint_threshold_amount)
    )
    self.number_of_chunks = number_of_chunks
    self._sentence_splitter = SentenceSplitter(
        chunk_size=1, chunk_overlap=0, separators=[".", "!", "?"]
    )
split(reader_output)

Split the document text into semantically coherent chunks.

Parameters:

Name Type Description Default
reader_output ReaderOutput

The document text & metadata.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Chunks, IDs, metadata, and splitter configuration.

Notes
  • With 1 sentence (or 2 in gradient mode), returns the text/sentences as-is.
  • Chunks shorter than chunk_size (minimum) are skipped and merged forward.
  • chunk_size behaves as the minimum chunk size in this splitter.
Source code in src/splitter_mr/splitter/splitters/semantic_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """Split the document text into semantically coherent chunks.

    Args:
        reader_output (ReaderOutput): The document text & metadata.

    Returns:
        SplitterOutput: Chunks, IDs, metadata, and splitter configuration.

    Notes:
        - With 1 sentence (or 2 in gradient mode), returns the text/sentences as-is.
        - Chunks shorter than `chunk_size` (minimum) are skipped and merged forward.
        - `chunk_size` behaves as the *minimum* chunk size in this splitter.
    """
    text = reader_output.text
    if text == "" or text is None:
        raise ValueError("No text has been provided")

    amt = self.breakpoint_threshold_amount
    if (
        self.breakpoint_threshold_type in ("percentile", "gradient")
        and 0.0 < amt <= 1.0  # noqa: W503
    ):
        self.breakpoint_threshold_amount = amt * 100.0

    sentences = self._split_into_sentences(reader_output)

    # Edge cases where thresholds aren't meaningful
    if len(sentences) <= 1:
        chunks = sentences if sentences else [text]
    elif self.breakpoint_threshold_type == "gradient" and len(sentences) == 2:
        chunks = sentences
    else:
        distances, sentence_dicts = self._calculate_sentence_distances(sentences)

        if self.number_of_chunks is not None and distances:
            # Pick top (k-1) distances as breakpoints
            k = int(self.number_of_chunks)
            m = max(0, min(k - 1, len(distances)))  # number of cuts to make
            if m == 0:
                indices_above = []  # single chunk
            else:
                # indices of the m largest distances (breaks), sorted in ascending order
                idxs = np.argsort(np.asarray(distances))[-m:]
                indices_above = sorted(int(i) for i in idxs.tolist())
        else:
            threshold, ref_array = self._calculate_breakpoint_threshold(distances)
            indices_above = [
                i for i, val in enumerate(ref_array) if val > threshold
            ]

        chunks: List[str] = []
        start_idx = 0

        for idx in indices_above:
            end = idx + 1  # inclusive slice end
            candidate = " ".join(
                d["sentence"] for d in sentence_dicts[start_idx:end]
            ).strip()
            if len(candidate) < self.chunk_size:
                # too small: keep accumulating (do NOT move start_idx)
                continue
            chunks.append(candidate)
            start_idx = end

        # Tail (always emit whatever remains)
        if start_idx < len(sentence_dicts):
            tail = " ".join(
                d["sentence"] for d in sentence_dicts[start_idx:]
            ).strip()
            if tail:
                chunks.append(tail)

        if not chunks:
            chunks = [" ".join(sentences).strip() or (reader_output.text or "")]

    # IDs & metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()
    model_name = getattr(self.embedding, "model_name", None)

    return SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="semantic_splitter",
        split_params={
            "buffer_size": self.buffer_size,
            "breakpoint_threshold_type": self.breakpoint_threshold_type,
            "breakpoint_threshold_amount": self.breakpoint_threshold_amount,
            "number_of_chunks": self.number_of_chunks,
            "chunk_size": self.chunk_size,
            "model_name": model_name,
        },
        metadata=metadata,
    )