
Splitter

Introduction

The Splitter component implements the core functionality of this library. It provides a set of classes (all inheriting from BaseSplitter) that split Markdown text or plain strings according to many different strategies.

Splitter strategies description

Character Splitter: Splits text into chunks based on a specified number of characters. Supports overlapping by character count or percentage.
Main Parameters: chunk_size (max chars per chunk), chunk_overlap (overlapping chars: int or %).
Compatible with: Text.

Word Splitter: Splits text into chunks based on a specified number of words. Supports overlapping by word count or percentage.
Main Parameters: chunk_size (max words per chunk), chunk_overlap (overlapping words: int or %).
Compatible with: Text.

Sentence Splitter: Splits text into chunks by a specified number of sentences. Allows overlap defined by a number or percentage of words from the end of the previous chunk. Customizable sentence separators (e.g., ., !, ?).
Main Parameters: chunk_size (max sentences per chunk), chunk_overlap (overlapping words: int or %), sentence_separators (list of characters).
Compatible with: Text.

Paragraph Splitter: Splits text into chunks based on a specified number of paragraphs. Allows overlapping by word count or percentage, and customizable line breaks.
Main Parameters: chunk_size (max paragraphs per chunk), chunk_overlap (overlapping words: int or %), line_break (delimiter(s) for paragraphs).
Compatible with: Text.

Recursive Character Splitter: Recursively splits text based on a hierarchy of separators (e.g., paragraph, sentence, word, character) until chunks reach a target size. Tries to preserve semantic units as long as possible.
Main Parameters: chunk_size (max chars per chunk), chunk_overlap (overlapping chars), separators (list of characters to split on, e.g., ["\n\n", "\n", " ", ""]).
Compatible with: Text.

Token Splitter: Splits text into chunks based on the number of tokens, using various tokenization models (e.g., tiktoken, spaCy, NLTK). Useful for ensuring chunks are compatible with LLM context limits.
Main Parameters: chunk_size (max tokens per chunk), model_name (tokenizer/model, e.g., "tiktoken/cl100k_base", "spacy/en_core_web_sm", "nltk/punkt"), language (for NLTK).
Compatible with: Text.

Paged Splitter (WORK IN PROGRESS): Splits text by pages for documents that have page structure. Each chunk contains a specified number of pages, with optional word overlap.
Main Parameters: num_pages (pages per chunk), chunk_overlap (overlapping words).
Compatible with: Word, PDF, Excel, PowerPoint.

Row/Column Splitter: For tabular formats, splits data by a set number of rows or columns per chunk, with possible overlap. Row-based and column-based splitting are mutually exclusive.
Main Parameters: num_rows, num_cols (rows/columns per chunk), overlap (overlapping rows or columns).
Compatible with: Tabular formats (csv, tsv, parquet, flat json).

JSON Recursive Splitter: Recursively splits JSON documents into smaller sub-structures that preserve the original JSON schema.
Main Parameters: max_chunk_size (max chars per chunk), min_chunk_size (min chars per chunk).
Compatible with: JSON.

Semantic Splitter (WORK IN PROGRESS): Splits text into chunks based on semantic similarity, using an embedding model and a max tokens parameter. Useful for meaningful semantic groupings.
Main Parameters: embedding_model (model for embeddings), max_tokens (max tokens per chunk).
Compatible with: Text.

HTMLTagSplitter: Splits HTML content based on a specified tag, or automatically detects the most frequent and shallowest tag if not specified. Each chunk is a complete HTML fragment for that tag.
Main Parameters: chunk_size (max chars per chunk), tag (HTML tag to split on, optional).
Compatible with: HTML.

HeaderSplitter: Splits Markdown or HTML documents into chunks using header levels (e.g., #, ##, or <h1>, <h2>). Uses configurable headers for chunking.
Main Parameters: headers_to_split_on (list of headers and semantic names), chunk_size (unused, for compatibility).
Compatible with: Markdown, HTML.

Code Splitter: Splits source code files into programmatically meaningful chunks (functions, classes, methods, etc.), aware of the syntax of the specified programming language (e.g., Python, Java, Kotlin). Uses language-aware logic to avoid splitting inside code blocks.
Main Parameters: chunk_size (max chars per chunk), language (programming language as string, e.g., "python", "java").
Compatible with: Source code files (Python, Java, Kotlin, C++, JavaScript, Go, etc.).

Token Splitter: Splits text files into LLM-aware minimal semantic units (tokens). You can use nltk, spaCy, and tiktoken as tokenizers.
Main Parameters: chunk_size (max chars per chunk), model (e.g., nltk/punkt, tiktoken/cl100k, etc.), language (e.g., "spanish", "english").
Compatible with: Text.

Warning

Paged Splitter and Semantic Splitter are not fully implemented yet. Stay tuned for updates!
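All strategies share the same call pattern: build or receive a ReaderOutput, pass it to a splitter's split method, and read the chunks from the returned SplitterOutput. The snippet below is a minimal sketch of that workflow using the CharacterSplitter; the splitter import path is taken from the examples on this page, while the ReaderOutput import path (splitter_mr.schema) is an assumption based on the source layout shown further down.

from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import CharacterSplitter

# In practice this object usually comes from a Reader component.
reader_output = ReaderOutput(
    text="abcdefghijklmnopqrstuvwxyz",
    document_name="doc.txt",
    document_path="/path/doc.txt",
)

splitter = CharacterSplitter(chunk_size=5, chunk_overlap=2)
output = splitter.split(reader_output)
print(output.chunks)        # ['abcde', 'defgh', 'ghijk', ..., 'yz']
print(output.split_method)  # 'character_splitter'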

Output format

Dataclass defining the output structure for all splitters.

Source code in src/splitter_mr/schema/schemas.py
@dataclass
class SplitterOutput:
    """
    Dataclass defining the output structure for all splitters.
    """

    chunks: List[str]
    chunk_id: List[str]
    document_name: Optional[str] = None
    document_path: str = ""
    document_id: Optional[str] = None
    conversion_method: Optional[str] = None
    reader_method: Optional[str] = None
    ocr_method: Optional[str] = None
    split_method: str = ""
    split_params: Optional[Dict[str, Any]] = field(default_factory=dict)
    metadata: Optional[Dict[str, Any]] = field(default_factory=dict)
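As a quick illustration, the sketch below builds a SplitterOutput by hand and iterates over its fields; in normal use this object is returned by a splitter's split method rather than constructed directly. The import path is an assumption based on the schema module shown above.

from splitter_mr.schema import SplitterOutput  # assumed import path

output = SplitterOutput(
    chunks=["first chunk", "second chunk"],
    chunk_id=["id-1", "id-2"],
    document_name="doc.txt",
    split_method="character_splitter",
    split_params={"chunk_size": 1000, "chunk_overlap": 0},
)

for cid, chunk in zip(output.chunk_id, output.chunks):
    print(cid, chunk)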

Splitters

BaseSplitter

BaseSplitter

Bases: ABC

Abstract base class for all splitter implementations.

This class defines the common interface and utility methods for splitters that divide text or data into smaller chunks, typically for downstream natural language processing tasks or information retrieval. Subclasses should implement the split method, which takes in a dictionary (typically from a document reader) and returns a structured output with the required chunking.

Attributes:

Name Type Description
chunk_size int

The maximum number of units (e.g., characters, words, etc.) per chunk.

Methods:

Name Description
split

Abstract method. Should be implemented by all subclasses to perform the actual splitting logic.

_generate_chunk_ids

Generates a list of unique chunk IDs using UUID4, for use in the output.

_default_metadata

Returns a default (empty) metadata dictionary, which can be extended by subclasses.

Source code in src/splitter_mr/splitter/base_splitter.py
class BaseSplitter(ABC):
    """
    Abstract base class for all splitter implementations.

    This class defines the common interface and utility methods for splitters that
    divide text or data into smaller chunks, typically for downstream natural language
    processing tasks or information retrieval. Subclasses should implement the `split`
    method, which takes in a dictionary (typically from a document reader) and returns
    a structured output with the required chunking.

    Attributes:
        chunk_size (int): The maximum number of units (e.g., characters, words, etc.) per chunk.

    Methods:
        split: Abstract method. Should be implemented by all subclasses to perform the actual
            splitting logic.

        _generate_chunk_ids: Generates a list of unique chunk IDs using UUID4, for use in the output.

        _default_metadata: Returns a default (empty) metadata dictionary, which can be extended by subclasses.
    """

    def __init__(self, chunk_size: int = 1000):
        self.chunk_size = chunk_size

    @abstractmethod
    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Abstract method to split input data into chunks.

        Args:
            reader_output (ReaderOutput): Input data, typically from a document reader,
                including the text to split and any relevant metadata.

        Returns:
            SplitterOutput: A dictionary containing split chunks and associated metadata.
        """
        pass

    def _generate_chunk_ids(self, num_chunks: int) -> List[str]:
        """
        Generate a list of unique chunk identifiers.

        Args:
            num_chunks (int): Number of chunk IDs to generate.

        Returns:
            List[str]: List of unique string IDs (UUID4).
        """
        return [str(uuid.uuid4()) for _ in range(num_chunks)]

    def _default_metadata(self) -> dict:
        """
        Return a default metadata dictionary.

        Returns:
            dict: An empty dictionary; subclasses may override to provide additional metadata.
        """
        return {}
split(reader_output) abstractmethod

Abstract method to split input data into chunks.

Parameters:

Name Type Description Default
reader_output ReaderOutput

Input data, typically from a document reader, including the text to split and any relevant metadata.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

A dictionary containing split chunks and associated metadata.

Source code in src/splitter_mr/splitter/base_splitter.py
@abstractmethod
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Abstract method to split input data into chunks.

    Args:
        reader_output (ReaderOutput): Input data, typically from a document reader,
            including the text to split and any relevant metadata.

    Returns:
        SplitterOutput: A dictionary containing split chunks and associated metadata.
    """
    pass
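To add a new strategy, subclass BaseSplitter and implement split. The following is a minimal sketch and is not part of the library: the LineSplitter name is hypothetical, and the import paths are assumptions based on the source layout shown above.

from splitter_mr.schema import ReaderOutput, SplitterOutput  # assumed import paths
from splitter_mr.splitter import BaseSplitter                # assumed import path


class LineSplitter(BaseSplitter):
    """Hypothetical splitter: emits one chunk per non-empty line."""

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        chunks = [line for line in reader_output.text.splitlines() if line.strip()]
        return SplitterOutput(
            chunks=chunks,
            chunk_id=self._generate_chunk_ids(len(chunks)),  # helper from BaseSplitter
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            split_method="line_splitter",
            split_params={"chunk_size": self.chunk_size},
            metadata=self._default_metadata(),  # helper from BaseSplitter
        )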

CharacterSplitter

CharacterSplitter

Bases: BaseSplitter

CharacterSplitter splits a given text into overlapping or non-overlapping chunks based on a specified number of characters per chunk.

This splitter is configurable with a maximum chunk size (chunk_size) and an overlap between consecutive chunks (chunk_overlap). The overlap can be specified either as an integer (number of characters) or as a float between 0 and 1 (fraction of chunk size). This is particularly useful for downstream NLP tasks where context preservation between chunks is important.

Parameters:

Name Type Description Default
chunk_size int

Maximum number of characters per chunk.

1000
chunk_overlap Union[int, float]

Number or percentage of overlapping characters between chunks.

0
Source code in src/splitter_mr/splitter/splitters/character_splitter.py
class CharacterSplitter(BaseSplitter):
    """
    CharacterSplitter splits a given text into overlapping or non-overlapping chunks
    based on a specified number of characters per chunk.

    This splitter is configurable with a maximum chunk size (`chunk_size`) and an overlap
    between consecutive chunks (`chunk_overlap`). The overlap can be specified either as
    an integer (number of characters) or as a float between 0 and 1 (fraction of chunk size).
    This is particularly useful for downstream NLP tasks where context preservation between
    chunks is important.

    Args:
        chunk_size (int): Maximum number of characters per chunk.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping characters
            between chunks.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 0):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text from the reader_output dictionary into character-based chunks.

        Each chunk contains at most `chunk_size` characters, and adjacent chunks can overlap
        by a specified number or percentage of characters, according to the `chunk_overlap`
        parameter set at initialization. Returns a dictionary with the same document metadata,
        unique chunk identifiers, and the split parameters used.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata
                (e.g., 'document_name', 'document_path', etc.).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If chunk_overlap is greater than or equal to chunk_size.

        Example:
            ```python
            from splitter_mr.splitter import CharacterSplitter

            # This object is normally obtained as the output from a Reader class.
            reader_output = ReaderOutput(
                text="abcdefghijklmnopqrstuvwxyz",
                document_name="doc.txt",
                document_path="/path/doc.txt",
            )
            splitter = CharacterSplitter(chunk_size=5, chunk_overlap=2)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['abcde', 'defgh', 'ghijk', ..., 'yz']
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Determine overlap in characters
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            overlap = int(chunk_size * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)
        if overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        # Split into chunks
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            start += chunk_size - overlap if (chunk_size - overlap) > 0 else 1

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="character_splitter",
            split_params={
                "chunk_size": chunk_size,
                "chunk_overlap": self.chunk_overlap,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input text from the reader_output dictionary into character-based chunks.

Each chunk contains at most chunk_size characters, and adjacent chunks can overlap by a specified number or percentage of characters, according to the chunk_overlap parameter set at initialization. Returns a dictionary with the same document metadata, unique chunk identifiers, and the split parameters used.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path', etc.).

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If chunk_overlap is greater than or equal to chunk_size.

Example

from splitter_mr.splitter import CharacterSplitter

# This object is normally obtained as the output from a Reader class.
reader_output = ReaderOutput(
    text="abcdefghijklmnopqrstuvwxyz",
    document_name="doc.txt",
    document_path="/path/doc.txt",
)
splitter = CharacterSplitter(chunk_size=5, chunk_overlap=2)
output = splitter.split(reader_output)
print(output.chunks)
['abcde', 'defgh', 'ghijk', ..., 'yz']

Source code in src/splitter_mr/splitter/splitters/character_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text from the reader_output dictionary into character-based chunks.

    Each chunk contains at most `chunk_size` characters, and adjacent chunks can overlap
    by a specified number or percentage of characters, according to the `chunk_overlap`
    parameter set at initialization. Returns a dictionary with the same document metadata,
    unique chunk identifiers, and the split parameters used.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata
            (e.g., 'document_name', 'document_path', etc.).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If chunk_overlap is greater than or equal to chunk_size.

    Example:
        ```python
        from splitter_mr.splitter import CharacterSplitter

        # This object is normally obtained as the output from a Reader class.
        reader_output = ReaderOutput(
            text="abcdefghijklmnopqrstuvwxyz",
            document_name="doc.txt",
            document_path="/path/doc.txt",
        )
        splitter = CharacterSplitter(chunk_size=5, chunk_overlap=2)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['abcde', 'defgh', 'ghijk', ..., 'yz']
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Determine overlap in characters
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        overlap = int(chunk_size * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)
    if overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    # Split into chunks
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start += chunk_size - overlap if (chunk_size - overlap) > 0 else 1

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="character_splitter",
        split_params={
            "chunk_size": chunk_size,
            "chunk_overlap": self.chunk_overlap,
        },
        metadata=metadata,
    )
    return output
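The overlap can also be given as a fraction of chunk_size. A brief sketch, reusing the reader_output from the example above: with chunk_size=10 and chunk_overlap=0.2, the effective overlap is int(10 * 0.2) = 2 characters.

splitter = CharacterSplitter(chunk_size=10, chunk_overlap=0.2)
output = splitter.split(reader_output)
print(output.chunks[:2])    # ['abcdefghij', 'ijklmnopqr']
print(output.split_params)  # {'chunk_size': 10, 'chunk_overlap': 0.2}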

WordSplitter

WordSplitter

Bases: BaseSplitter

WordSplitter splits a given text into overlapping or non-overlapping chunks based on a specified number of words per chunk.

This splitter is configurable with a maximum chunk size (chunk_size, in words) and an overlap between consecutive chunks (chunk_overlap). The overlap can be specified either as an integer (number of words) or as a float between 0 and 1 (fraction of chunk size). Useful for NLP tasks where word-based boundaries are important for context preservation.

Parameters:

Name Type Description Default
chunk_size int

Maximum number of words per chunk.

5
chunk_overlap Union[int, float]

Number or percentage of overlapping words between chunks.

0
Source code in src/splitter_mr/splitter/splitters/word_splitter.py
class WordSplitter(BaseSplitter):
    """
    WordSplitter splits a given text into overlapping or non-overlapping chunks
    based on a specified number of words per chunk.

    This splitter is configurable with a maximum chunk size (`chunk_size`, in words)
    and an overlap between consecutive chunks (`chunk_overlap`). The overlap can be
    specified either as an integer (number of words) or as a float between 0 and 1
    (fraction of chunk size). Useful for NLP tasks where word-based boundaries are
    important for context preservation.

    Args:
        chunk_size (int): Maximum number of words per chunk.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping words between chunks.
    """

    def __init__(self, chunk_size: int = 5, chunk_overlap: Union[int, float] = 0):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text from the reader_output dictionary into word-based chunks.

        Each chunk contains at most `chunk_size` words, and adjacent chunks can overlap
        by a specified number or percentage of words, according to the `chunk_overlap`
        parameter set at initialization.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata
                (e.g., 'document_name', 'document_path', etc.).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If chunk_overlap is greater than or equal to chunk_size.

        Example:
            ```python
            from splitter_mr.splitter import WordSplitter

            reader_output = ReaderOutput(
                text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs. Sphinx of black quartz, judge my vow.",
                document_name="pangrams.txt",
                document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
            )

            # Split into chunks of 5 words, overlapping by 2 words
            splitter = WordSplitter(chunk_size=5, chunk_overlap=2)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['The quick brown fox jumps',
            'fox jumps over the lazy',
            'over the lazy dog. Pack', ...]
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Split text into words (using simple whitespace tokenization)
        words = text.split()
        total_words = len(words)

        # Determine overlap in characters
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            overlap = int(chunk_size * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)
        if overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        # Split into chunks
        chunks = []
        start = 0
        step = chunk_size - overlap if (chunk_size - overlap) > 0 else 1
        while start < total_words:
            end = start + chunk_size
            chunk_words = words[start:end]
            chunks.append(" ".join(chunk_words))
            start += step

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="word_splitter",
            split_params={
                "chunk_size": chunk_size,
                "chunk_overlap": self.chunk_overlap,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input text from the reader_output dictionary into word-based chunks.

Each chunk contains at most chunk_size words, and adjacent chunks can overlap by a specified number or percentage of words, according to the chunk_overlap parameter set at initialization.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path', etc.).

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If chunk_overlap is greater than or equal to chunk_size.

Example

from splitter_mr.splitter import WordSplitter

reader_output = ReaderOutput(
    text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs. Sphinx of black quartz, judge my vow.",
    document_name="pangrams.txt",
    document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
)

# Split into chunks of 5 words, overlapping by 2 words
splitter = WordSplitter(chunk_size=5, chunk_overlap=2)
output = splitter.split(reader_output)
print(output.chunks)
['The quick brown fox jumps',
'fox jumps over the lazy',
'over the lazy dog. Pack', ...]

Source code in src/splitter_mr/splitter/splitters/word_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text from the reader_output dictionary into word-based chunks.

    Each chunk contains at most `chunk_size` words, and adjacent chunks can overlap
    by a specified number or percentage of words, according to the `chunk_overlap`
    parameter set at initialization.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata
            (e.g., 'document_name', 'document_path', etc.).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If chunk_overlap is greater than or equal to chunk_size.

    Example:
        ```python
        from splitter_mr.splitter import WordSplitter

        reader_output = ReaderOutput(
            text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs. Sphinx of black quartz, judge my vow.",
            document_name="pangrams.txt",
            document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
        )

        # Split into chunks of 5 words, overlapping by 2 words
        splitter = WordSplitter(chunk_size=5, chunk_overlap=2)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['The quick brown fox jumps',
        'fox jumps over the lazy',
        'over the lazy dog. Pack', ...]
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Split text into words (using simple whitespace tokenization)
    words = text.split()
    total_words = len(words)

    # Determine overlap in characters
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        overlap = int(chunk_size * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)
    if overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    # Split into chunks
    chunks = []
    start = 0
    step = chunk_size - overlap if (chunk_size - overlap) > 0 else 1
    while start < total_words:
        end = start + chunk_size
        chunk_words = words[start:end]
        chunks.append(" ".join(chunk_words))
        start += step

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="word_splitter",
        split_params={
            "chunk_size": chunk_size,
            "chunk_overlap": self.chunk_overlap,
        },
        metadata=metadata,
    )
    return output
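A fractional chunk_overlap works the same way for words. A short sketch, reusing the pangrams reader_output above: 0.4 of a 5-word chunk gives int(5 * 0.4) = 2 overlapping words, so the result matches the integer-overlap example.

splitter = WordSplitter(chunk_size=5, chunk_overlap=0.4)
output = splitter.split(reader_output)
print(output.chunks[:2])
# ['The quick brown fox jumps', 'fox jumps over the lazy']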

SentenceSplitter

SentenceSplitter

Bases: BaseSplitter

SentenceSplitter splits a given text into overlapping or non-overlapping chunks, where each chunk contains a specified number of sentences, and overlap is defined by a number or percentage of words from the end of the previous chunk.

Parameters:

Name Type Description Default
chunk_size int

Maximum number of sentences per chunk.

5
chunk_overlap Union[int, float]

Number or percentage of overlapping words between chunks.

0
separators Union[str, List[str]]

Character(s) to split sentences.

['.', '!', '?']
Source code in src/splitter_mr/splitter/splitters/sentence_splitter.py
class SentenceSplitter(BaseSplitter):
    """
    SentenceSplitter splits a given text into overlapping or non-overlapping chunks,
    where each chunk contains a specified number of sentences, and overlap is defined
    by a number or percentage of words from the end of the previous chunk.

    Args:
        chunk_size (int): Maximum number of sentences per chunk.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping words between chunks.
        separators (Union[str, List[str]]): Character(s) to split sentences.
    """

    def __init__(
        self,
        chunk_size: int = 5,
        chunk_overlap: Union[int, float] = 0,
        separators: Union[str, List[str]] = [".", "!", "?"],
    ):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap
        self.sentence_separators = (
            separators if isinstance(separators, list) else [separators]
        )

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text from the `reader_output` dictionary into sentence-based chunks,
        allowing for overlap at the word level.

        Each chunk contains at most `chunk_size` sentences, where sentence boundaries are
        detected using the specified `sentence_separators` (e.g., '.', '!', '?').
        Overlap between consecutive chunks is specified by `chunk_overlap`, which can be an
        integer (number of words) or a float (fraction of the maximum words in a sentence).
        This is useful for downstream NLP tasks that require context preservation.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata,
                such as 'document_name', 'document_path', 'document_id', etc.

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If `chunk_overlap` is negative or greater than or equal to `chunk_size`.
            ValueError: If 'text' is missing in `reader_output`.

        Example:
            ```python
            from splitter_mr.splitter import SentenceSplitter

            # Example input: 7 sentences with varied punctuation
            # This object has been obtained as an output from a Reader class.
            reader_output = ReaderOutput(
                text="Hello world! How are you? I am fine. Testing sentence splitting. Short. End! And another?",
                document_name="sample.txt",
                document_path="/tmp/sample.txt",
                document_id="123",
            )

            # Split into chunks of 3 sentences each, no overlap
            splitter = SentenceSplitter(chunk_size=3, chunk_overlap=0)
            result = splitter.split(reader_output)
            print(result.chunks)
            ```
            ```python
            ['Hello world! How are you? I am fine.',
            'Testing sentence splitting. Short. End!',
            'And another?', ...]
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Split text into sentences
        separators_pattern = "|".join([re.escape(d) for d in self.sentence_separators])
        sentences = re.split(f"({separators_pattern})", text)
        merged_sentences = []
        for i in range(0, len(sentences) - 1, 2):
            sent = sentences[i].strip()
            punct = sentences[i + 1].strip() if i + 1 < len(sentences) else ""
            merged = (sent + punct).strip()
            if merged:
                merged_sentences.append(merged)
        if len(sentences) % 2 == 1 and sentences[-1].strip():
            merged_sentences.append(sentences[-1].strip())
        num_sentences = len(merged_sentences)

        # Determine overlap in words
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            max_sent_words = max((len(s.split()) for s in merged_sentences), default=0)
            overlap = int(max_sent_words * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)

        # Split into sentences
        chunks = []
        start = 0
        while start < num_sentences:
            end = min(start + chunk_size, num_sentences)
            chunk_sents = merged_sentences[start:end]
            chunk_text = " ".join(chunk_sents)
            if overlap > 0 and chunks:
                prev_words = chunks[-1].split()
                overlap_words = (
                    prev_words[-overlap:] if overlap <= len(prev_words) else prev_words
                )
                chunk_text = " ".join([" ".join(overlap_words), chunk_text]).strip()
            chunks.append(chunk_text)
            start += chunk_size

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="sentence_splitter",
            split_params={
                "chunk_size": chunk_size,
                "chunk_overlap": self.chunk_overlap,
                "sentence_separators": self.sentence_separators,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input text from the reader_output dictionary into sentence-based chunks, allowing for overlap at the word level.

Each chunk contains at most chunk_size sentences, where sentence boundaries are detected using the specified sentence_separators (e.g., '.', '!', '?'). Overlap between consecutive chunks is specified by chunk_overlap, which can be an integer (number of words) or a float (fraction of the maximum words in a sentence). This is useful for downstream NLP tasks that require context preservation.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata, such as 'document_name', 'document_path', 'document_id', etc.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If chunk_overlap is negative or greater than or equal to chunk_size.

ValueError

If 'text' is missing in reader_output.

Example

from splitter_mr.splitter import SentenceSplitter

# Example input: 7 sentences with varied punctuation
# This object has been obtained as an output from a Reader class.
reader_output = ReaderOutput(
    text="Hello world! How are you? I am fine. Testing sentence splitting. Short. End! And another?",
    document_name="sample.txt",
    document_path="/tmp/sample.txt",
    document_id="123",
)

# Split into chunks of 3 sentences each, no overlap
splitter = SentenceSplitter(chunk_size=3, chunk_overlap=0)
result = splitter.split(reader_output)
print(result.chunks)
['Hello world! How are you? I am fine.',
'Testing sentence splitting. Short. End!',
'And another?', ...]

Source code in src/splitter_mr/splitter/splitters/sentence_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text from the `reader_output` dictionary into sentence-based chunks,
    allowing for overlap at the word level.

    Each chunk contains at most `chunk_size` sentences, where sentence boundaries are
    detected using the specified `sentence_separators` (e.g., '.', '!', '?').
    Overlap between consecutive chunks is specified by `chunk_overlap`, which can be an
    integer (number of words) or a float (fraction of the maximum words in a sentence).
    This is useful for downstream NLP tasks that require context preservation.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata,
            such as 'document_name', 'document_path', 'document_id', etc.

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If `chunk_overlap` is negative or greater than or equal to `chunk_size`.
        ValueError: If 'text' is missing in `reader_output`.

    Example:
        ```python
        from splitter_mr.splitter import SentenceSplitter

        # Example input: 7 sentences with varied punctuation
        # This object has been obtained as an output from a Reader class.
        reader_output = ReaderOutput(
            text="Hello world! How are you? I am fine. Testing sentence splitting. Short. End! And another?",
            document_name="sample.txt",
            document_path="/tmp/sample.txt",
            document_id="123",
        )

        # Split into chunks of 3 sentences each, no overlap
        splitter = SentenceSplitter(chunk_size=3, chunk_overlap=0)
        result = splitter.split(reader_output)
        print(result.chunks)
        ```
        ```python
        ['Hello world! How are you? I am fine.',
        'Testing sentence splitting. Short. End!',
        'And another?', ...]
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Split text into sentences
    separators_pattern = "|".join([re.escape(d) for d in self.sentence_separators])
    sentences = re.split(f"({separators_pattern})", text)
    merged_sentences = []
    for i in range(0, len(sentences) - 1, 2):
        sent = sentences[i].strip()
        punct = sentences[i + 1].strip() if i + 1 < len(sentences) else ""
        merged = (sent + punct).strip()
        if merged:
            merged_sentences.append(merged)
    if len(sentences) % 2 == 1 and sentences[-1].strip():
        merged_sentences.append(sentences[-1].strip())
    num_sentences = len(merged_sentences)

    # Determine overlap in words
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        max_sent_words = max((len(s.split()) for s in merged_sentences), default=0)
        overlap = int(max_sent_words * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)

    # Split into sentences
    chunks = []
    start = 0
    while start < num_sentences:
        end = min(start + chunk_size, num_sentences)
        chunk_sents = merged_sentences[start:end]
        chunk_text = " ".join(chunk_sents)
        if overlap > 0 and chunks:
            prev_words = chunks[-1].split()
            overlap_words = (
                prev_words[-overlap:] if overlap <= len(prev_words) else prev_words
            )
            chunk_text = " ".join([" ".join(overlap_words), chunk_text]).strip()
        chunks.append(chunk_text)
        start += chunk_size

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="sentence_splitter",
        split_params={
            "chunk_size": chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "sentence_separators": self.sentence_separators,
        },
        metadata=metadata,
    )
    return output
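Custom separators simply extend the sentence-boundary detection. A minimal sketch, reusing the reader_output from the example above, that also treats semicolons as sentence boundaries:

splitter = SentenceSplitter(chunk_size=2, chunk_overlap=0, separators=[".", "!", "?", ";"])
output = splitter.split(reader_output)
print(output.split_params["sentence_separators"])  # ['.', '!', '?', ';']
print(output.chunks)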

ParagraphSplitter

ParagraphSplitter

Bases: BaseSplitter

ParagraphSplitter splits a given text into overlapping or non-overlapping chunks, where each chunk contains a specified number of paragraphs, and overlap is defined by a number or percentage of words from the end of the previous chunk.

Parameters:

Name Type Description Default
chunk_size int

Maximum number of paragraphs per chunk.

3
chunk_overlap Union[int, float]

Number or percentage of overlapping words between chunks.

0
line_break Union[str, List[str]]

Character(s) used to split text into paragraphs.

'\n'
Source code in src/splitter_mr/splitter/splitters/paragraph_splitter.py
class ParagraphSplitter(BaseSplitter):
    """
    ParagraphSplitter splits a given text into overlapping or non-overlapping chunks,
    where each chunk contains a specified number of paragraphs, and overlap is defined
    by a number or percentage of words from the end of the previous chunk.

    Args:
        chunk_size (int): Maximum number of paragraphs per chunk.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping words between chunks.
        line_break (Union[str, List[str]]): Character(s) used to split text into paragraphs.
    """

    def __init__(
        self,
        chunk_size: int = 3,
        chunk_overlap: Union[int, float] = 0,
        line_break: Union[str, List[str]] = "\n",
    ):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap
        self.line_break = line_break if isinstance(line_break, list) else [line_break]

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits text in `reader_output['text']` into paragraph-based chunks, with optional word overlap.

        Args:
            reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key (str)
                and optional document metadata (e.g., 'document_name', 'document_path').

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If 'text' is missing from `reader_output` or is not a string.

        Example:
            ```python
            from splitter_mr.splitter import ParagraphSplitter

            # This object is normally obtained as the output from a Reader class.
            reader_output = ReaderOutput(
                text="Para 1.\\n\\nPara 2.\\n\\nPara 3.",
                document_name="test.txt",
                document_path="/tmp/test.txt",
            )
            splitter = ParagraphSplitter(chunk_size=2, chunk_overlap=1, line_break="\\n\\n")
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['Para 1.\\n\\nPara 2.', '2.\\n\\nPara 3.']
            ```
        """
        # Intialize variables
        text = reader_output.text
        line_breaks_pattern = "|".join(map(re.escape, self.line_break))
        paragraphs = [p for p in re.split(line_breaks_pattern, text) if p.strip()]
        num_paragraphs = len(paragraphs)

        # Determine overlap in words
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            max_para_words = max((len(p.split()) for p in paragraphs), default=0)
            overlap = int(max_para_words * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)

        # Split into chunks
        chunks = []
        start = 0
        while start < num_paragraphs:
            end = min(start + self.chunk_size, num_paragraphs)
            chunk_paragraphs = paragraphs[start:end]
            chunk_text = self.line_break[0].join(chunk_paragraphs)
            if overlap > 0 and chunks:
                prev_words = chunks[-1].split()
                overlap_words = (
                    prev_words[-overlap:] if overlap <= len(prev_words) else prev_words
                )
                chunk_text = (
                    self.line_break[0]
                    .join([" ".join(overlap_words), chunk_text])
                    .strip()
                )
            chunks.append(chunk_text)
            start += self.chunk_size

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="paragraph_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap,
                "line_break": self.line_break,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits text in reader_output['text'] into paragraph-based chunks, with optional word overlap.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path').

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If 'text' is missing from reader_output or is not a string.

Example

from splitter_mr.splitter import ParagraphSplitter

# This object is normally obtained as the output from a Reader class.
reader_output = ReaderOutput(
    text="Para 1.\n\nPara 2.\n\nPara 3.",
    document_name="test.txt",
    document_path="/tmp/test.txt",
)
splitter = ParagraphSplitter(chunk_size=2, chunk_overlap=1, line_break="\n\n")
output = splitter.split(reader_output)
print(output.chunks)
['Para 1.\n\nPara 2.', '2.\n\nPara 3.']

Source code in src/splitter_mr/splitter/splitters/paragraph_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits text in `reader_output['text']` into paragraph-based chunks, with optional word overlap.

    Args:
        reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key (str)
            and optional document metadata (e.g., 'document_name', 'document_path').

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If 'text' is missing from `reader_output` or is not a string.

    Example:
        ```python
        from splitter_mr.splitter import ParagraphSplitter

        # This object is normally obtained as the output from a Reader class.
        reader_output = ReaderOutput(
            text="Para 1.\\n\\nPara 2.\\n\\nPara 3.",
            document_name="test.txt",
            document_path="/tmp/test.txt",
        )
        splitter = ParagraphSplitter(chunk_size=2, chunk_overlap=1, line_break="\\n\\n")
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['Para 1.\\n\\nPara 2.', '2.\\n\\nPara 3.']
        ```
    """
    # Intialize variables
    text = reader_output.text
    line_breaks_pattern = "|".join(map(re.escape, self.line_break))
    paragraphs = [p for p in re.split(line_breaks_pattern, text) if p.strip()]
    num_paragraphs = len(paragraphs)

    # Determine overlap in words
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        max_para_words = max((len(p.split()) for p in paragraphs), default=0)
        overlap = int(max_para_words * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)

    # Split into chunks
    chunks = []
    start = 0
    while start < num_paragraphs:
        end = min(start + self.chunk_size, num_paragraphs)
        chunk_paragraphs = paragraphs[start:end]
        chunk_text = self.line_break[0].join(chunk_paragraphs)
        if overlap > 0 and chunks:
            prev_words = chunks[-1].split()
            overlap_words = (
                prev_words[-overlap:] if overlap <= len(prev_words) else prev_words
            )
            chunk_text = (
                self.line_break[0]
                .join([" ".join(overlap_words), chunk_text])
                .strip()
            )
        chunks.append(chunk_text)
        start += self.chunk_size

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="paragraph_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "line_break": self.line_break,
        },
        metadata=metadata,
    )
    return output
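Several paragraph delimiters can be supplied at once; the first entry in line_break is also used to re-join the paragraphs inside each chunk. A minimal sketch, reusing the reader_output from the example above:

splitter = ParagraphSplitter(chunk_size=2, chunk_overlap=0, line_break=["\n\n", "\r\n\r\n"])
output = splitter.split(reader_output)
print(output.split_params["line_break"])  # ['\n\n', '\r\n\r\n']
print(output.chunks)                      # ['Para 1.\n\nPara 2.', 'Para 3.']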

RecursiveCharacterSplitter

RecursiveCharacterSplitter

Bases: BaseSplitter

RecursiveCharacterSplitter splits a given text into overlapping or non-overlapping chunks, where each chunk is created repeatedly breaking down the text until it reaches the desired chunk size. This class implements the Langchain RecursiveCharacterTextSplitter.

Parameters:

Name Type Description Default
chunk_size int

Approximate chunk size, in characters.

1000
chunk_overlap Union[int, float]

Number or percentage of overlapping characters between chunks.

0.1
separators Union[str, List[str]]

Character(s) to recursively split sentences.

['\n\n', '\n', ' ', '.', ',', '\u200b', ',', '、', '.', '。', '']
Notes

More info about the RecursiveCharacterTextSplitter: Langchain Docs.

Source code in src/splitter_mr/splitter/splitters/recursive_splitter.py
class RecursiveCharacterSplitter(BaseSplitter):
    """
    RecursiveCharacterSplitter splits a given text into overlapping or non-overlapping chunks,
    where each chunk is created repeatedly breaking down the text until it reaches the
    desired chunk size. This class implements the Langchain RecursiveCharacterTextSplitter.

    Args:
        chunk_size (int): Approximate chunk size, in characters.
        chunk_overlap (Union[int, float]): Number or percentage of overlapping characters between
            chunks.
        separators (Union[str, List[str]]): Character(s) to recursively split sentences.

    Notes:
        More info about the RecursiveCharacterTextSplitter:
        [Langchain Docs](https://python.langchain.com/docs/how_to/recursive_text_splitter/).
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: Union[int, float] = 0.1,
        separators: Union[str, List[str]] = [
            "\n\n",
            "\n",
            " ",
            ".",
            ",",
            "\u200b",  # Zero-width space
            "\uff0c",  # Fullwidth comma
            "\u3001",  # Ideographic comma
            "\uff0e",  # Fullwidth full stop
            "\u3002",  # Ideographic full stop
            "",
        ],
    ):
        super().__init__(chunk_size)
        self.chunk_overlap = chunk_overlap
        self.separators = separators if isinstance(separators, list) else [separators]

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text into character-based chunks using a recursive splitting strategy
        (via Langchain's `RecursiveCharacterTextSplitter`), supporting configurable separators,
        chunk size, and overlap.

        Args:
            reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key (str)
                and optional document metadata (e.g., 'document_name', 'document_path', etc.).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If 'text' is missing in `reader_output` or is not a string.

        Example:
            ```python
            from splitter_mr.splitter import RecursiveCharacterSplitter

            # This object is typically obtained as the output from a Reader.
            reader_output = ReaderOutput(
                text=(
                    "This is a long document. "
                    "It will be recursively split into smaller chunks using the specified separators. "
                    "Each chunk will have some overlap with the next."
                ),
                document_name="sample.txt",
                document_path="/tmp/sample.txt",
            )

            splitter = RecursiveCharacterSplitter(chunk_size=40, chunk_overlap=5)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['This is a long document. It will be', 'be recursively split into smaller chunks', ...]
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Determine overlap in characters
        if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
            overlap = int(chunk_size * self.chunk_overlap)
        else:
            overlap = int(self.chunk_overlap)
        if overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        # Split text into chunks
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separators=self.separators,
        )
        texts = splitter.create_documents([text])
        chunks = [doc.page_content for doc in texts]

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="recursive_character_splitter",
            split_params={
                "chunk_size": chunk_size,
                "chunk_overlap": self.chunk_overlap,
                "separators": self.separators,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input text into character-based chunks using a recursive splitting strategy (via Langchain's RecursiveCharacterTextSplitter), supporting configurable separators, chunk size, and overlap.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path', etc.).

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If 'text' is missing in reader_output or is not a string.

Example

from splitter_mr.splitter import RecursiveCharacterSplitter

# This object is typically obtained as the output from a Reader.
reader_output = ReaderOutput(
    text=(
        "This is a long document. "
        "It will be recursively split into smaller chunks using the specified separators. "
        "Each chunk will have some overlap with the next."
    ),
    document_name="sample.txt",
    document_path="/tmp/sample.txt",
)

splitter = RecursiveCharacterSplitter(chunk_size=40, chunk_overlap=5)
output = splitter.split(reader_output)
print(output.chunks)
['This is a long document. It will be', 'be recursively split into smaller chunks', ...]

Source code in src/splitter_mr/splitter/splitters/recursive_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text into character-based chunks using a recursive splitting strategy
    (via Langchain's `RecursiveCharacterTextSplitter`), supporting configurable separators,
    chunk size, and overlap.

    Args:
        reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key (str)
            and optional document metadata (e.g., 'document_name', 'document_path', etc.).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If 'text' is missing in `reader_output` or is not a string.

    Example:
        ```python
        from splitter_mr.splitter import RecursiveCharacterSplitter

        # This object is typically obtained as the output from a Reader.
        reader_output = ReaderOutput(
            text=(
                "This is a long document. "
                "It will be recursively split into smaller chunks using the specified separators. "
                "Each chunk will have some overlap with the next."
            ),
            document_name="sample.txt",
            document_path="/tmp/sample.txt",
        )

        splitter = RecursiveCharacterSplitter(chunk_size=40, chunk_overlap=5)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['This is a long document. It will be', 'be recursively split into smaller chunks', ...]
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Determine overlap in characters
    if isinstance(self.chunk_overlap, float) and 0 <= self.chunk_overlap < 1:
        overlap = int(chunk_size * self.chunk_overlap)
    else:
        overlap = int(self.chunk_overlap)
    if overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    # Split text into chunks
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=self.chunk_size,
        chunk_overlap=self.chunk_overlap,
        separators=self.separators,
    )
    texts = splitter.create_documents([text])
    chunks = [doc.page_content for doc in texts]

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="recursive_character_splitter",
        split_params={
            "chunk_size": chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "separators": self.separators,
        },
        metadata=metadata,
    )
    return output

HeaderSplitter

HeaderSplitter

Bases: BaseSplitter

Splits an HTML or Markdown document into chunks based on header levels.

This splitter converts a list of semantic header names (e.g., ["Header 1", "Header 2"]) into the correct header tokens for Markdown ("#", "##", ...) or HTML ("h1", "h2", ...), and uses Langchain's splitters under the hood. You can choose whether to group headers with their following content or split on each leaf element.

Parameters:

Name Type Description Default
chunk_size int

Kept for compatibility. Defaults to 1000.

1000
headers_to_split_on Optional[List[str]]

List of semantic header names such as ["Header 1", "Header 2"]. If None, all levels 1–6 are enabled.

None
group_header_with_content bool

If True (default), keeps each header with its following block(s). If False, falls back to line/element splitting.

True
Notes
  • Only actual Markdown (#) or HTML (<h1>–<h6>) headings are supported.
  • Output is a SplitterOutput dataclass compatible with splitter_mr.
Example

from splitter_mr.splitter import HeaderSplitter

reader_output = ReaderOutput(
    text = '<!DOCTYPE html><html><body><h1>Main Title</h1><h2>Section 1</h2><h2>Section 2</h2></body></html>',
    ...
)
splitter = HeaderSplitter(headers_to_split_on=["Header 1", "Header 2"])
output = splitter.split(reader_output)
print(output.chunks)
['<!DOCTYPE html><html><body><h1>Main Title</h1>', '<h2>Section 1</h2>', '<h2>Section 2</h2></body></html>']
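The same class handles Markdown input as well; a short sketch follows (the `ReaderOutput` import path is an assumption). The semantic names "Header 1" and "Header 2" are translated to `#` and `##` for Markdown documents, and headers stay attached to their content because the underlying `MarkdownHeaderTextSplitter` is created with `strip_headers=False`.

```python
from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import HeaderSplitter

markdown = (
    "# Title\n\n"
    "Intro paragraph.\n\n"
    "## Section 1\n\n"
    "Body of section 1.\n\n"
    "## Section 2\n\n"
    "Body of section 2.\n"
)
reader_output = ReaderOutput(
    text=markdown,
    document_name="sample.md",   # the .md extension drives filetype detection
    document_path="/tmp/sample.md",
)

splitter = HeaderSplitter(headers_to_split_on=["Header 1", "Header 2"])
output = splitter.split(reader_output)
for chunk in output.chunks:
    print(chunk)
    print("---")
```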

Source code in src/splitter_mr/splitter/splitters/header_splitter.py
class HeaderSplitter(BaseSplitter):
    """
    Splits an HTML or Markdown document into chunks based on header levels.

    This splitter converts a list of semantic header names (e.g., ["Header 1", "Header 2"])
    into the correct header tokens for Markdown ("#", "##", ...) or HTML ("h1", "h2", ...),
    and uses Langchain's splitters under the hood. You can choose whether to group headers
    with their following content or split on each leaf element.

    Args:
        chunk_size (int, optional): Kept for compatibility. Defaults to 1000.
        headers_to_split_on (Optional[List[str]]): List of semantic header names such as
            ["Header 1", "Header 2"]. If None, all levels 1–6 are enabled.
        group_header_with_content (bool, optional): If True (default), keeps each header with
            its following block(s). If False, falls back to line/element splitting.

    Notes:
        - Only actual Markdown (#) or HTML (<h1>–<h6>) headings are supported.
        - Output is a SplitterOutput dataclass compatible with splitter_mr.

    Example:
        ```python
        from splitter_mr.splitter import HeaderSplitter

        reader_output = ReaderOutput(
            text = '<!DOCTYPE html><html><body><h1>Main Title</h1><h2>Section 1</h2><h2>Section 2</h2></body></html>',
            ...
        )
        splitter = HeaderSplitter(headers_to_split_on=["Header 1", "Header 2"])
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['<!DOCTYPE html><html><body><h1>Main Title</h1>', '<h2>Section 1</h2>', '<h2>Section 2</h2></body></html>']
        ```
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        headers_to_split_on: Optional[List[str]] = None,
        *,
        group_header_with_content: bool = True,
    ):
        """
        Initializes the TagSplitter with header configuration.

        Args:
            chunk_size (int): Unused, for API compatibility.
            headers_to_split_on (Optional[List[str]]): List of header names (e.g., ["Header 2"]).
            group_header_with_content (bool): If True, group header with body. Default True.
        """
        super().__init__(chunk_size)
        self.headers_to_split_on = headers_to_split_on or [
            f"Header {i}" for i in range(1, 7)
        ]
        self.group_header_with_content = bool(group_header_with_content)

    def _make_tuples(self, filetype: str) -> List[Tuple[str, str]]:
        """
        Converts semantic header names into tuples for Langchain splitters.

        Args:
            filetype (str): "md" for Markdown, "html" for HTML.

        Returns:
            List[Tuple[str, str]]: Tuples with (header_token, semantic_name).

        Raises:
            ValueError: If filetype is unknown.
        """
        tuples: List[Tuple[str, str]] = []
        for header in self.headers_to_split_on:
            lvl = self._header_level(header)
            if filetype == "md":
                tuples.append(("#" * lvl, header))
            elif filetype == "html":
                tuples.append((f"h{lvl}", header))
            else:
                raise ValueError(f"Unsupported filetype: {filetype!r}")
        return tuples

    @staticmethod
    def _header_level(header: str) -> int:
        """
        Extracts the numeric level from a header name like "Header 2".

        Args:
            header (str): Header string, e.g. "Header 2".

        Returns:
            int: Level of the header (e.g., 2 for "Header 2").

        Raises:
            ValueError: If header string is not of expected format.
        """
        m = re.match(r"header\s*(\d+)", header.lower())
        if not m:
            raise ValueError(f"Invalid header: {header}")  # Fix error message
        return int(m.group(1))

    @staticmethod
    def _guess_filetype(reader_output: ReaderOutput) -> str:
        """
        Guesses if the document is HTML or Markdown based on filename or content.

        Args:
            reader_output (ReaderOutput): Reader output with text and metadata.

        Returns:
            str: "html" or "md".
        """
        name = (reader_output.document_name or "").lower()
        if name.endswith((".html", ".htm")):
            return "html"
        if name.endswith((".md", ".markdown")):
            return "md"

        soup = BeautifulSoup(reader_output.text, "html.parser")
        if soup.find("html") or soup.find(re.compile(r"^h[1-6]$")) or soup.find("div"):
            return "html"
        return "md"

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the document into chunks using the configured header levels.

        Args:
            reader_output (ReaderOutput): Input object with document text and metadata.

        Returns:
            SplitterOutput: Output dataclass with chunked text and metadata.

        Raises:
            ValueError: If reader_output.text is empty.
        """
        if not reader_output.text:
            raise ValueError("reader_output.text is empty or None")

        filetype = self._guess_filetype(reader_output)
        tuples = self._make_tuples(filetype)

        if filetype == "html":
            splitter = HTMLHeaderTextSplitter(
                headers_to_split_on=tuples,
                return_each_element=False,
            )
        else:
            splitter = MarkdownHeaderTextSplitter(
                headers_to_split_on=tuples, return_each_line=False, strip_headers=False
            )

        docs = splitter.split_text(reader_output.text)
        chunks = [doc.page_content for doc in docs]

        return SplitterOutput(
            chunks=chunks,
            chunk_id=self._generate_chunk_ids(len(chunks)),
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="header_splitter",
            split_params={
                "headers_to_split_on": self.headers_to_split_on,
                "group_header_with_content": self.group_header_with_content,
            },
            metadata=self._default_metadata(),
        )
__init__(chunk_size=1000, headers_to_split_on=None, *, group_header_with_content=True)

Initializes the TagSplitter with header configuration.

Parameters:

Name Type Description Default
chunk_size int

Unused, for API compatibility.

1000
headers_to_split_on Optional[List[str]]

List of header names (e.g., ["Header 2"]).

None
group_header_with_content bool

If True, group header with body. Default True.

True
Source code in src/splitter_mr/splitter/splitters/header_splitter.py
def __init__(
    self,
    chunk_size: int = 1000,
    headers_to_split_on: Optional[List[str]] = None,
    *,
    group_header_with_content: bool = True,
):
    """
    Initializes the TagSplitter with header configuration.

    Args:
        chunk_size (int): Unused, for API compatibility.
        headers_to_split_on (Optional[List[str]]): List of header names (e.g., ["Header 2"]).
        group_header_with_content (bool): If True, group header with body. Default True.
    """
    super().__init__(chunk_size)
    self.headers_to_split_on = headers_to_split_on or [
        f"Header {i}" for i in range(1, 7)
    ]
    self.group_header_with_content = bool(group_header_with_content)
split(reader_output)

Splits the document into chunks using the configured header levels.

Parameters:

Name Type Description Default
reader_output ReaderOutput

Input object with document text and metadata.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Output dataclass with chunked text and metadata.

Raises:

Type Description
ValueError

If reader_output.text is empty.

Source code in src/splitter_mr/splitter/splitters/header_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the document into chunks using the configured header levels.

    Args:
        reader_output (ReaderOutput): Input object with document text and metadata.

    Returns:
        SplitterOutput: Output dataclass with chunked text and metadata.

    Raises:
        ValueError: If reader_output.text is empty.
    """
    if not reader_output.text:
        raise ValueError("reader_output.text is empty or None")

    filetype = self._guess_filetype(reader_output)
    tuples = self._make_tuples(filetype)

    if filetype == "html":
        splitter = HTMLHeaderTextSplitter(
            headers_to_split_on=tuples,
            return_each_element=False,
        )
    else:
        splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=tuples, return_each_line=False, strip_headers=False
        )

    docs = splitter.split_text(reader_output.text)
    chunks = [doc.page_content for doc in docs]

    return SplitterOutput(
        chunks=chunks,
        chunk_id=self._generate_chunk_ids(len(chunks)),
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="header_splitter",
        split_params={
            "headers_to_split_on": self.headers_to_split_on,
            "group_header_with_content": self.group_header_with_content,
        },
        metadata=self._default_metadata(),
    )

JSONRecursiveSplitter

RecursiveJSONSplitter

Bases: BaseSplitter

RecursiveJSONSplitter splits a JSON string or structure into overlapping or non-overlapping chunks, using the Langchain RecursiveJsonSplitter. This splitter is designed to recursively break down JSON data (including nested objects and arrays) into manageable pieces based on keys, arrays, or other separators, until the desired chunk size is reached.

Parameters:

Name Type Description Default
chunk_size int

Maximum chunk size, measured in the number of characters per chunk.

1000
min_chunk_size int

Minimum chunk size, in characters.

200
Notes

See Langchain Docs on RecursiveJsonSplitter.
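As the source below shows, the two constructor arguments are forwarded to Langchain's `RecursiveJsonSplitter` with a small translation: `chunk_size` becomes `max_chunk_size`, while the Langchain `min_chunk_size` is derived as `chunk_size - min_chunk_size`. A rough sketch of what `split()` builds internally (for illustration only):

```python
from langchain_text_splitters import RecursiveJsonSplitter

# RecursiveJSONSplitter(chunk_size=100, min_chunk_size=20) configures Langchain's
# splitter roughly like this:
lc_splitter = RecursiveJsonSplitter(
    max_chunk_size=100,       # self.chunk_size
    min_chunk_size=100 - 20,  # int(self.chunk_size - self.min_chunk_size)
)
json_data = {"company": {"name": "TechCorp", "employees": [{"name": "Alice"}, {"name": "Bob"}]}}
chunks = lc_splitter.split_json(json_data=json_data, convert_lists=True)
print(chunks)
```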

Source code in src/splitter_mr/splitter/splitters/json_splitter.py
class RecursiveJSONSplitter(BaseSplitter):
    """
    RecursiveJSONSplitter splits a JSON string or structure into overlapping or non-overlapping
    chunks, using the Langchain RecursiveJsonSplitter. This splitter is designed to recursively
    break down JSON data (including nested objects and arrays) into manageable pieces based on keys,
    arrays, or other separators, until the desired chunk size is reached.

    Args:
        chunk_size (int): Maximum chunk size, measured in the number of characters per chunk.
        min_chunk_size (int): Minimum chunk size, in characters.

    Notes:
        See [Langchain Docs on RecursiveJsonSplitter](https://python.langchain.com/api_reference/text_splitters/json/langchain_text_splitters.json.RecursiveJsonSplitter.html#langchain_text_splitters.json.RecursiveJsonSplitter).
    """

    def __init__(self, chunk_size: int = 1000, min_chunk_size: int = 200):
        super().__init__(chunk_size)
        self.min_chunk_size = min_chunk_size

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input JSON text from the reader_output object into recursively chunked pieces
        bounded by the configured maximum and minimum chunk sizes.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary containing at least a 'text' key (str) and optional document metadata
                (e.g., 'document_name', 'document_path', etc.).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If the 'text' field is missing from reader_output.
            json.JSONDecodeError: If the 'text' field contains invalid JSON.

        Example:
            ```python
            from splitter_mr.splitter import RecursiveJSONSplitter

            # This object has been obtained from `VanillaReader`
            reader_output = ReaderOutput(
                text='{"company": {"name": "TechCorp", "employees": [{"name": "Alice"}, {"name": "Bob"}]}}',
                document_name="company_data.json",
                document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/company_data.json",
                document_id="doc123",
                conversion_method="vanilla",
                ocr_method=None,
            )
            splitter = RecursiveJSONSplitter(chunk_size=100, min_chunk_size=20)
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['{"company": {"name": "TechCorp"}}',
            '{"employees": [{"name": "Alice"},
            {"name": "Bob"}]}']
            ```
        """
        # Initialize variables
        text = json.loads(reader_output.text)

        # Split text into smaller JSON chunks
        splitter = RecursiveJsonSplitter(
            max_chunk_size=self.chunk_size,
            min_chunk_size=int(self.chunk_size - self.min_chunk_size),
        )
        chunks = splitter.split_json(json_data=text, convert_lists=True)

        # Generate chunk_ids and metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="recursive_json_splitter",
            split_params={
                "max_chunk_size": self.chunk_size,
                "min_chunk_size": self.min_chunk_size,
            },
            metadata=metadata,
        )
        return output
split(reader_output)

Splits the input JSON text from the reader_output object into recursively chunked pieces bounded by the configured maximum and minimum chunk sizes.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path', etc.).

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If the 'text' field is missing from reader_output.

JSONDecodeError

If the 'text' field contains invalid JSON.

Example

from splitter_mr.splitter import RecursiveJSONSplitter

# This object has been obtained from `VanillaReader`
reader_output = ReaderOutput(
    text='{"company": {"name": "TechCorp", "employees": [{"name": "Alice"}, {"name": "Bob"}]}}',
    document_name="company_data.json",
    document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/company_data.json",
    document_id="doc123",
    conversion_method="vanilla",
    ocr_method=None,
)
splitter = RecursiveJSONSplitter(chunk_size=100, min_chunk_size=20)
output = splitter.split(reader_output)
print(output.chunks)
['{"company": {"name": "TechCorp"}}',
'{"employees": [{"name": "Alice"},
{"name": "Bob"}]}']

Source code in src/splitter_mr/splitter/splitters/json_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input JSON text from the reader_output object into recursively chunked pieces
    bounded by the configured maximum and minimum chunk sizes.

    Args:
        reader_output (Dict[str, Any]):
            Dictionary containing at least a 'text' key (str) and optional document metadata
            (e.g., 'document_name', 'document_path', etc.).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If the 'text' field is missing from reader_output.
        json.JSONDecodeError: If the 'text' field contains invalid JSON.

    Example:
        ```python
        from splitter_mr.splitter import RecursiveJSONSplitter

        # This object has been obtained from `VanillaReader`
        reader_output = ReaderOutput(
            text='{"company": {"name": "TechCorp", "employees": [{"name": "Alice"}, {"name": "Bob"}]}}',
            document_name="company_data.json",
            document_path="https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/company_data.json",
            document_id="doc123",
            conversion_method="vanilla",
            ocr_method=None,
        )
        splitter = RecursiveJSONSplitter(chunk_size=100, min_chunk_size=20)
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['{"company": {"name": "TechCorp"}}',
        '{"employees": [{"name": "Alice"},
        {"name": "Bob"}]}']
        ```
    """
    # Initialize variables
    text = json.loads(reader_output.text)

    # Split text into smaller JSON chunks
    splitter = RecursiveJsonSplitter(
        max_chunk_size=self.chunk_size,
        min_chunk_size=int(self.chunk_size - self.min_chunk_size),
    )
    chunks = splitter.split_json(json_data=text, convert_lists=True)

    # Generate chunk_ids and metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="recursive_json_splitter",
        split_params={
            "max_chunk_size": self.chunk_size,
            "min_chunk_size": self.min_chunk_size,
        },
        metadata=metadata,
    )
    return output

HTMLTagSplitter

HTMLTagSplitter

Bases: BaseSplitter

HTMLTagSplitter splits HTML content based on a specified tag. If no tag is specified, it is automatically detected as the most frequent and shallowest tag.

Parameters:

Name Type Description Default
chunk_size int

maximum chunk size, in characters

10000
tag str

lowest level of the hierarchy at which to split the text.

None
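A sketch of the auto-detection path follows (the `ReaderOutput` import path is an assumption). When `tag` is omitted, the splitter inspects the tags under `<body>` and picks the most frequent one, breaking ties in favour of the shallowest; the chosen tag is echoed back in `split_params`.

```python
from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import HTMLTagSplitter

reader_output = ReaderOutput(
    text="<html><body><div>A</div><div>B</div><div>C</div></body></html>",
    document_name="auto.html",
    document_path="/tmp/auto.html",
)

# No tag given: <div> is the most frequent (and shallowest) tag here, so it is
# auto-selected. A small chunk_size keeps roughly one <div> per chunk.
splitter = HTMLTagSplitter(chunk_size=40)
output = splitter.split(reader_output)
print(output.split_params["tag"])  # "div"
print(output.chunks)
```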
Source code in src/splitter_mr/splitter/splitters/html_tag_splitter.py
class HTMLTagSplitter(BaseSplitter):
    """
    HTMLTagSplitter splits HTML content based on a specified tag.
    If this tag is not specified, it is automatically detected as
    the most frequent and shallowest tag.

    Args:
        chunk_size (int): maximum chunk size, in characters
        tag (str): lowest level of the hierarchy at which to split the text.
    """

    def __init__(self, chunk_size: int = 10000, tag: Optional[str] = None):
        # TODO: chunk_size it is not necessary for this Splitter. Remove from BaseSplitter class.
        super().__init__(chunk_size)
        self.tag = tag

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits HTML in `reader_output['text']` using the specified tag or, if not specified,
        automatically selects the most frequent and shallowest tag.

        Args:
            reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key
                (str) and optional document metadata (e.g., 'document_name', 'document_path').

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If `reader_output` does not contain a 'text' key or if the HTML cannot be parsed.

        Example:
            ```python
            from splitter_mr.splitter import HTMLTagSplitter

            # This object is typically obtained as the output from a Reader.
            reader_output = ReaderOutput(
                text="<html><body><div>Chunk 1</div><div>Chunk 2</div></body></html>",
                document_name="example.html",
                document_path="/path/to/example.html",
            )
            splitter = HTMLTagSplitter(tag="div")
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            [
            '<html><body><div>Chunk 1</div></body></html>',
            '<html><body><div>Chunk 2</div></body></html>'
            ]
            ```
        """
        html = reader_output.text
        soup = BeautifulSoup(html, "html.parser")
        tag = self.tag or self._auto_tag(soup)

        elements = soup.find_all(tag)

        chunks: List[str] = []
        buffer = []

        def get_table_header(el):
            """
            If the element or any of its parents is a <table>,
            extract the <thead> or first <tr> inside <table> as the header.
            Return a list of header elements (can be empty).
            """
            # Find the closest table ancestor (or self if it's a table)
            table = el.find_parent("table")
            if el.name == "table":
                table = el

            if not table:
                return []

            # Try to get <thead>
            thead = table.find("thead")
            if thead:
                return [copy.deepcopy(thead)]

            # Else fallback: first <tr> in table (usually header row)
            first_tr = table.find("tr")
            if first_tr:
                # Wrap it in a <thead> tag for consistency
                soup_tmp = BeautifulSoup("", "html.parser")
                thead_tag = soup_tmp.new_tag("thead")
                thead_tag.append(copy.deepcopy(first_tr))
                return [thead_tag]

            return []

        def build_chunk_html(elements):
            chunk_soup = BeautifulSoup("", "html.parser")
            html_tag = chunk_soup.new_tag("html")
            body_tag = chunk_soup.new_tag("body")
            html_tag.append(body_tag)
            chunk_soup.append(html_tag)

            # If the first element is inside a table, prepend the header rows once
            header_elems = []
            if elements:
                header_elems = get_table_header(elements[0])
                for he in header_elems:
                    body_tag.append(he)

            for el in elements:
                # Only append <tr> if it's NOT a duplicate of the header row
                if el.name == "tr":
                    # Check if all children are <th>
                    if all(
                        child.name == "th"
                        for child in el.children
                        if getattr(child, "name", None)
                    ):
                        # This <tr> is a header row (skip it if header already added)
                        if header_elems:
                            continue  # skip, already included in <thead>
                body_tag.append(copy.deepcopy(el))
            return str(chunk_soup)

        for el in elements:
            test_buffer = buffer + [el]
            test_chunk_str = build_chunk_html(test_buffer)
            if len(test_chunk_str) > self.chunk_size and buffer:
                chunk_str = build_chunk_html(buffer)
                chunks.append(chunk_str)
                buffer = [el]
            else:
                buffer.append(el)

        if buffer:
            chunk_str = build_chunk_html(buffer)
            chunks.append(chunk_str)

        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="html_tag_splitter",
            split_params={"chunk_size": self.chunk_size, "tag": tag},
            metadata=metadata,
        )
        return output

    def _auto_tag(self, soup: BeautifulSoup) -> str:
        """
        Auto-detect the most repeated tag with the highest (shallowest) level of hierarchy.
        If no repeated tags are found, return the first tag found in <body> or fallback to 'div'.
        """
        from collections import Counter, defaultdict

        body = soup.find("body")
        if not body:
            return "div"

        # Traverse all tags in body, tracking tag: (count, min_depth)
        tag_counter = Counter()
        tag_min_depth = defaultdict(lambda: float("inf"))

        def traverse(el, depth=0):
            for child in el.children:
                if getattr(child, "name", None):
                    tag_counter[child.name] += 1
                    tag_min_depth[child.name] = min(tag_min_depth[child.name], depth)
                    traverse(child, depth + 1)

        traverse(body)

        if not tag_counter:
            # fallback to first tag
            for tag in body.find_all(True, recursive=True):
                return tag.name
            return "div"

        # Find tags with the maximum count
        max_count = max(tag_counter.values())
        candidates = [tag for tag, cnt in tag_counter.items() if cnt == max_count]
        # Of the most frequent, pick the one with the minimum depth (shallowest)
        tag = min(candidates, key=lambda tag: tag_min_depth[tag])
        return tag
split(reader_output)

Splits HTML in reader_output['text'] using the specified tag or, if not specified, automatically selects the most frequent and shallowest tag.

Parameters:

Name Type Description Default
reader_output Dict[str, Any]

Dictionary containing at least a 'text' key (str) and optional document metadata (e.g., 'document_name', 'document_path').

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If reader_output does not contain a 'text' key or if the HTML cannot be parsed.

Example

from splitter_mr.splitter import HTMLTagSplitter

# This object is typically obtained as the output from a Reader.
reader_output = ReaderOutput(
    text="<html><body><div>Chunk 1</div><div>Chunk 2</div></body></html>",
    document_name="example.html",
    document_path="/path/to/example.html",
)
splitter = HTMLTagSplitter(tag="div")
output = splitter.split(reader_output)
print(output.chunks)
[
'<html><body><div>Chunk 1</div></body></html>',
'<html><body><div>Chunk 2</div></body></html>'
]
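Tables get special handling: when the chosen tag sits inside a `<table>` (for example `tag="tr"`), each chunk is prefixed once with the table's `<thead>` (or its first row wrapped in a `<thead>`), so the column headers travel with every chunk. A sketch under the same assumptions as above (`ReaderOutput` import path assumed):

```python
from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import HTMLTagSplitter

table_html = (
    "<html><body><table>"
    "<tr><th>id</th><th>name</th></tr>"
    "<tr><td>1</td><td>A</td></tr>"
    "<tr><td>2</td><td>B</td></tr>"
    "</table></body></html>"
)
reader_output = ReaderOutput(
    text=table_html,
    document_name="table.html",
    document_path="/tmp/table.html",
)

# Split on rows; a small chunk_size keeps one data row per chunk, and the header
# row is repeated in each chunk as a <thead>.
splitter = HTMLTagSplitter(tag="tr", chunk_size=120)
output = splitter.split(reader_output)
for chunk in output.chunks:
    print(chunk)
```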

Source code in src/splitter_mr/splitter/splitters/html_tag_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits HTML in `reader_output['text']` using the specified tag or, if not specified,
    automatically selects the most frequent and shallowest tag.

    Args:
        reader_output (Dict[str, Any]): Dictionary containing at least a 'text' key
            (str) and optional document metadata (e.g., 'document_name', 'document_path').

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If `reader_output` does not contain a 'text' key or if the HTML cannot be parsed.

    Example:
        ```python
        from splitter_mr.splitter import HTMLTagSplitter

        # This object is typically obtained as the output from a Reader.
        reader_output = ReaderOutput(
            text="<html><body><div>Chunk 1</div><div>Chunk 2</div></body></html>",
            document_name="example.html",
            document_path="/path/to/example.html",
        )
        splitter = HTMLTagSplitter(tag="div")
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        [
        '<html><body><div>Chunk 1</div></body></html>',
        '<html><body><div>Chunk 2</div></body></html>'
        ]
        ```
    """
    html = reader_output.text
    soup = BeautifulSoup(html, "html.parser")
    tag = self.tag or self._auto_tag(soup)

    elements = soup.find_all(tag)

    chunks: List[str] = []
    buffer = []

    def get_table_header(el):
        """
        If the element or any of its parents is a <table>,
        extract the <thead> or first <tr> inside <table> as the header.
        Return a list of header elements (can be empty).
        """
        # Find the closest table ancestor (or self if it's a table)
        table = el.find_parent("table")
        if el.name == "table":
            table = el

        if not table:
            return []

        # Try to get <thead>
        thead = table.find("thead")
        if thead:
            return [copy.deepcopy(thead)]

        # Else fallback: first <tr> in table (usually header row)
        first_tr = table.find("tr")
        if first_tr:
            # Wrap it in a <thead> tag for consistency
            soup_tmp = BeautifulSoup("", "html.parser")
            thead_tag = soup_tmp.new_tag("thead")
            thead_tag.append(copy.deepcopy(first_tr))
            return [thead_tag]

        return []

    def build_chunk_html(elements):
        chunk_soup = BeautifulSoup("", "html.parser")
        html_tag = chunk_soup.new_tag("html")
        body_tag = chunk_soup.new_tag("body")
        html_tag.append(body_tag)
        chunk_soup.append(html_tag)

        # If the first element is inside a table, prepend the header rows once
        header_elems = []
        if elements:
            header_elems = get_table_header(elements[0])
            for he in header_elems:
                body_tag.append(he)

        for el in elements:
            # Only append <tr> if it's NOT a duplicate of the header row
            if el.name == "tr":
                # Check if all children are <th>
                if all(
                    child.name == "th"
                    for child in el.children
                    if getattr(child, "name", None)
                ):
                    # This <tr> is a header row (skip it if header already added)
                    if header_elems:
                        continue  # skip, already included in <thead>
            body_tag.append(copy.deepcopy(el))
        return str(chunk_soup)

    for el in elements:
        test_buffer = buffer + [el]
        test_chunk_str = build_chunk_html(test_buffer)
        if len(test_chunk_str) > self.chunk_size and buffer:
            chunk_str = build_chunk_html(buffer)
            chunks.append(chunk_str)
            buffer = [el]
        else:
            buffer.append(el)

    if buffer:
        chunk_str = build_chunk_html(buffer)
        chunks.append(chunk_str)

    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="html_tag_splitter",
        split_params={"chunk_size": self.chunk_size, "tag": tag},
        metadata=metadata,
    )
    return output

RowColumnSplitter

RowColumnSplitter

Bases: BaseSplitter

RowColumnSplitter splits tabular data (such as CSV, TSV, Markdown tables, or JSON tables) into smaller tables based on rows, columns, or by total character size while preserving row integrity.

This splitter supports several modes:

  • By rows: Split the table into chunks with a fixed number of rows, with optional overlapping rows between chunks.
  • By columns: Split the table into chunks by columns, with optional overlapping columns between chunks.
  • By chunk size: Split the table into markdown-formatted table chunks, where each chunk contains as many complete rows as fit under the specified character limit, optionally overlapping a fixed number of rows between chunks.

This is useful for splitting large tabular files for downstream processing, LLM ingestion, or display, while preserving semantic and structural integrity of the data.

Parameters:

Name Type Description Default
chunk_size int

Maximum number of characters per chunk (when using character-based splitting).

1000
num_rows int

Number of rows per chunk. Mutually exclusive with num_cols.

0
num_cols int

Number of columns per chunk. Mutually exclusive with num_rows.

0
chunk_overlap Union[int, float]

Number of overlapping rows or columns between chunks. If a float in (0,1), interpreted as a percentage of rows or columns. If integer, the number of overlapping rows/columns. When chunking by character size, this refers to the number of overlapping rows (not characters).

0

Supported formats: CSV, TSV, TXT, Markdown table, JSON (tabular: list of dicts or dict of lists).
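A row-based sketch follows (the `ReaderOutput` import path is an assumption): with `num_rows=2` and `chunk_overlap=1`, consecutive chunks share one full row, and each chunk is rendered back as a complete table with its header.

```python
from splitter_mr.schema import ReaderOutput  # assumed import path
from splitter_mr.splitter import RowColumnSplitter

reader_output = ReaderOutput(
    text="| id | name |\n|----|------|\n| 1  | A    |\n| 2  | B    |\n| 3  | C    |",
    conversion_method="markdown",
    document_name="table.md",
    document_path="/tmp/table.md",
)

# Two rows per chunk, one overlapping row between consecutive chunks.
splitter = RowColumnSplitter(num_rows=2, chunk_overlap=1)
output = splitter.split(reader_output)
for chunk in output.chunks:
    print(chunk, end="\n\n")
```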

Source code in src/splitter_mr/splitter/splitters/row_column_splitter.py
class RowColumnSplitter(BaseSplitter):
    """
    RowColumnSplitter splits tabular data (such as CSV, TSV, Markdown tables, or JSON tables)
    into smaller tables based on rows, columns, or by total character size while preserving row integrity.

    This splitter supports several modes:

    - **By rows**: Split the table into chunks with a fixed number of rows, with optional overlapping
        rows between chunks.
    - **By columns**: Split the table into chunks by columns, with optional overlapping columns between chunks.
    - **By chunk size**: Split the table into markdown-formatted table chunks, where each chunk contains
        as many complete rows as fit under the specified character limit, optionally overlapping a fixed
        number of rows between chunks.

    This is useful for splitting large tabular files for downstream processing, LLM ingestion,
    or display, while preserving semantic and structural integrity of the data.

    Args:
        chunk_size (int): Maximum number of characters per chunk (when using character-based splitting).
        num_rows (int): Number of rows per chunk. Mutually exclusive with num_cols.
        num_cols (int): Number of columns per chunk. Mutually exclusive with num_rows.
        chunk_overlap (Union[int, float]): Number of overlapping rows or columns between chunks.
            If a float in (0,1), interpreted as a percentage of rows or columns. If integer, the number of
            overlapping rows/columns. When chunking by character size, this refers to the number of overlapping
            rows (not characters).

    Supported formats: CSV, TSV, TXT, Markdown table, JSON (tabular: list of dicts or dict of lists).
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        num_rows: int = 0,
        num_cols: int = 0,
        chunk_overlap: Union[int, float] = 0,
    ):
        super().__init__(chunk_size)
        self.num_rows = num_rows
        self.num_cols = num_cols
        self.chunk_overlap = chunk_overlap

        if num_rows and num_cols:
            raise ValueError("num_rows and num_cols are mutually exclusive")
        if isinstance(chunk_overlap, float) and chunk_overlap >= 1:
            raise ValueError("chunk_overlap as float must be < 1")

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input tabular data into multiple markdown table chunks according to the specified
        chunking strategy. Each output chunk is a complete markdown table with header, and will never
        cut a row in half. The overlap is always applied in terms of full rows or columns.

        Args:
            reader_output (Dict[str, Any]):
                Dictionary output from a Reader, containing at least:
                    - 'text': The tabular data as string.
                    - 'conversion_method': Format of the input ('csv', 'tsv', 'markdown', 'json', etc.).
                    - Additional document metadata fields (optional).

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If both num_rows and num_cols are set.
            ValueError: If chunk_overlap as float is not in [0,1).
            ValueError: If chunk_size is too small to fit the header and at least one data row.

        Example:
            ```python
            reader_output = ReaderOutput(
                text='| id | name |\\n|----|------|\\n| 1  | A    |\\n| 2  | B    |\\n| 3  | C    |',
                conversion_method="markdown",
                document_name="table.md",
                document_path="/path/table.md",
            )
            splitter = RowColumnSplitter(chunk_size=80, chunk_overlap=20)
            output = splitter.split(reader_output)
            for chunk in output.chunks:
                print("\\n" + str(chunk) + "\\n")
            ```
            ```python
            | id   | name   |
            |------|--------|
            |  1   | A      |
            |  2   | B      |

            | id   | name   |
            |------|--------|
            |  2   | B      |
            |  3   | C      |
            ```
        """
        # Step 1. Parse the table depending on conversion_method
        df = self._load_tabular(reader_output)
        orig_method = reader_output.conversion_method
        col_names = df.columns.tolist()

        # Step 2. Split logic
        chunks = []
        meta_per_chunk = []

        # If splitting strategy is by rows
        if self.num_rows > 0:
            overlap = self._get_overlap(self.num_rows)
            for i in range(
                0,
                len(df),
                self.num_rows - overlap if (self.num_rows - overlap) > 0 else 1,
            ):
                chunk_df = df.iloc[i : i + self.num_rows]
                if not chunk_df.empty:
                    chunk_str = self._to_str(chunk_df, orig_method)
                    chunks.append(chunk_str)
                    meta_per_chunk.append(
                        {"rows": chunk_df.index.tolist(), "type": "row"}
                    )
        # If splitting strategy is by columns
        elif self.num_cols > 0:
            overlap = self._get_overlap(self.num_cols)
            total_cols = len(col_names)
            for i in range(
                0,
                total_cols,
                self.num_cols - overlap if (self.num_cols - overlap) > 0 else 1,
            ):
                sel_cols = col_names[i : i + self.num_cols]
                if sel_cols:
                    chunk_df = df[sel_cols]
                    chunk_str = self._to_str(chunk_df, orig_method, colwise=True)
                    chunks.append(chunk_str)
                    meta_per_chunk.append({"cols": sel_cols, "type": "column"})
        # If splitting strategy is given by the chunk_size
        else:
            header_lines = self._get_markdown_header(df)
            header_length = len(header_lines)

            row_md_list = [self._get_markdown_row(df, i) for i in range(len(df))]
            row_len_list = [len(r) + 1 for r in row_md_list]  # +1 for newline

            if self.chunk_size < header_length + row_len_list[0]:
                raise ValueError(
                    "chunk_size is too small to fit header and at least one row."
                )

            # Compute overlapping and headers in markdown tables
            chunks = []
            meta_per_chunk = []
            i = 0
            n = len(row_md_list)
            overlap = self._get_overlap(1)
            while i < n:
                curr_chunk = []
                curr_len = header_length
                j = i
                while j < n and curr_len + row_len_list[j] <= self.chunk_size:
                    curr_chunk.append(row_md_list[j])
                    curr_len += row_len_list[j]
                    j += 1

                rows_in_chunk = j - i
                chunk_str = header_lines + "\n".join(curr_chunk)
                chunks.append(chunk_str)
                meta_per_chunk.append({"rows": list(range(i, j)), "type": "char_row"})

                # --- compute overlap AFTER we know rows_in_chunk ---
                if isinstance(self.chunk_overlap, float):
                    overlap_rows = int(rows_in_chunk * self.chunk_overlap)
                else:
                    overlap_rows = int(self.chunk_overlap)

                # make sure we don’t loop forever
                overlap_rows = min(overlap_rows, rows_in_chunk - 1)
                i = j - overlap_rows

        # Generate chunk_id
        chunk_ids = self._generate_chunk_ids(len(chunks))

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="row_column_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "num_rows": self.num_rows,
                "num_cols": self.num_cols,
                "chunk_overlap": self.chunk_overlap,
            },
            metadata=meta_per_chunk,
        )
        return output

    # Helper functions

    def _get_overlap(self, base: int):
        """
        Returns the overlap value as an integer, based on the configured chunk_overlap.

        If chunk_overlap is a float in (0,1), computes the overlap as a percentage of `base`.
        If chunk_overlap is an integer, returns it directly.

        Args:
            base (int): The base number (rows or columns) to compute the overlap from.
        Returns:
            int: The overlap as an integer.
        """
        if isinstance(self.chunk_overlap, float):
            return int(base * self.chunk_overlap)
        return int(self.chunk_overlap)

    def _load_tabular(self, reader_output: Dict[str, Any]) -> pd.DataFrame:
        """
        Loads and parses the input tabular data from a Reader output dictionary
        into a pandas DataFrame, based on its format.

        If the input is empty, returns an empty DataFrame.
        If the input is malformed (e.g., badly formatted markdown/CSV/TSV), a
        pandas.errors.ParserError is raised.

        Supports Markdown, CSV, TSV, TXT, and tabular JSON.

        Args:
            reader_output (ReaderOutput): Reader output containing the text and conversion_method.

        Returns:
            pd.DataFrame: The loaded table as a DataFrame.

        Raises:
            pandas.errors.ParserError: If the input table is malformed and cannot be parsed.
        """
        text = reader_output.text
        # Return an empty DataFrame if an empty file is provided
        if not text or not text.strip():
            return pd.DataFrame()
        method = reader_output.conversion_method
        if method == "markdown":
            return self._parse_markdown_table(text)
        elif method == "csv" or method == "txt":
            return pd.read_csv(io.StringIO(text))
        elif method == "tsv":
            return pd.read_csv(io.StringIO(text), sep="\t")
        else:
            # Try JSON
            try:
                js = json.loads(text)
                if isinstance(js, list) and all(isinstance(row, dict) for row in js):
                    return pd.DataFrame(js)
                elif isinstance(js, dict):  # e.g., {col: [vals]}
                    return pd.DataFrame(js)
            except Exception:
                pass
            # Fallback: try CSV
            return pd.read_csv(io.StringIO(text))

    def _parse_markdown_table(self, md: str) -> pd.DataFrame:
        """
        Parses a markdown table string into a pandas DataFrame.

        Ignores non-table lines and trims markdown-specific formatting.
        Also handles the separator line (---) in the header.

        Args:
            md (str): The markdown table as a string.

        Returns:
            pd.DataFrame: Parsed table as a DataFrame.

        Raises:
            pandas.errors.ParserError: If the markdown table is malformed and cannot be parsed.
        """
        # Remove any lines not part of the table (e.g., text before/after)
        table_lines = []
        started = False
        for line in md.splitlines():
            if re.match(r"^\s*\|.*\|\s*$", line):
                started = True
                table_lines.append(line.strip())
            elif started and not line.strip():
                break  # stop at first blank line after table
        table_md = "\n".join(table_lines)
        table_io = io.StringIO(
            re.sub(
                r"^\s*\|",
                "",
                re.sub(r"\|\s*$", "", table_md, flags=re.MULTILINE),
                flags=re.MULTILINE,
            )
        )
        try:
            df = pd.read_csv(table_io, sep="|").rename(
                lambda x: x.strip(), axis="columns"
            )
        except pd.errors.ParserError as e:
            # Propagate the ParserError so callers can handle malformed tables
            raise pd.errors.ParserError(f"Malformed markdown table: {e}") from e
        if not df.empty and all(re.match(r"^-+$", str(x).strip()) for x in df.iloc[0]):
            df = df.drop(df.index[0]).reset_index(drop=True)
        return df

    def _to_str(self, df: pd.DataFrame, method: str, colwise: bool = False) -> str:
        """
        Converts a DataFrame chunk to a string for output,
        either as a markdown table, CSV, or a list of columns.

        Args:
            df (pd.DataFrame): DataFrame chunk to convert.
            method (str): Input file format (for output style).
            colwise (bool): If True, output as a list of columns (used in column chunking).

        Returns:
            str: The chunk as a formatted string.
        """
        if colwise:
            # List of columns: output as a list of lists
            return (
                "["
                + ", ".join(  # noqa: W503
                    [str([col] + df[col].tolist()) for col in df.columns]  # noqa: W503
                )
                + "]"  # noqa: W503
            )
        if method in ("markdown", "md"):
            # Use markdown table format
            return df.to_markdown(index=False)
        else:
            # Default to CSV format
            output = io.StringIO()
            df.to_csv(output, index=False)
            return output.getvalue().strip("\n")

    @staticmethod
    def _get_markdown_header(df):
        """
        Returns the header and separator lines for a markdown table as a string.

        Args:
            df (pd.DataFrame): DataFrame representing the table.

        Returns:
            str: Markdown table header and separator (with trailing newline).
        """

        lines = df.head(0).to_markdown(index=False).splitlines()
        return "\n".join(lines[:2]) + "\n"

    @staticmethod
    def _get_markdown_row(df, row_idx):
        """
        Returns a single row from the DataFrame formatted as a markdown table row.

        Args:
            df (pd.DataFrame): DataFrame containing the table.
            row_idx (int): Index of the row to extract.

        Returns:
            str: The markdown-formatted row string.
        """
        row = df.iloc[[row_idx]]
        # Get the full markdown output (with header),
        # extract only the last line (the data row)
        md = row.to_markdown(index=False).splitlines()
        return md[-1]
split(reader_output)

Splits the input tabular data into multiple markdown table chunks according to the specified chunking strategy. Each output chunk is a complete markdown table with header, and will never cut a row in half. The overlap is always applied in terms of full rows or columns.

Parameters:

Name Type Description Default
reader_output ReaderOutput

Output object from a Reader, containing at least:

  • 'text': The tabular data as string.
  • 'conversion_method': Format of the input ('csv', 'tsv', 'markdown', 'json', etc.).
  • Additional document metadata fields (optional).

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If both num_rows and num_cols are set.

ValueError

If chunk_overlap as float is not in [0,1).

ValueError

If chunk_size is too small to fit the header and at least one data row.

Example

reader_output = ReaderOutput(
    text='| id | name |\n|----|------|\n| 1  | A    |\n| 2  | B    |\n| 3  | C    |',
    conversion_method="markdown",
    document_name="table.md",
    document_path="/path/table.md",
)
splitter = RowColumnSplitter(chunk_size=80, chunk_overlap=20)
output = splitter.split(reader_output)
for chunk in output.chunks:
    print("\n" + str(chunk) + "\n")
| id   | name   |
|------|--------|
|  1   | A      |
|  2   | B      |

| id   | name   |
|------|--------|
|  2   | B      |
|  3   | C      |
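
The same table can also be chunked column-wise. A minimal sketch, assuming the constructor accepts the num_cols parameter listed above (the exact string representation of each column chunk depends on how the table values are parsed):

splitter = RowColumnSplitter(num_cols=1, chunk_overlap=0)
output = splitter.split(reader_output)
for chunk in output.chunks:
    print(chunk)

Each chunk is a string listing the selected column header followed by its values, e.g. one chunk for 'id' and one for 'name'.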

Source code in src/splitter_mr/splitter/splitters/row_column_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input tabular data into multiple markdown table chunks according to the specified
    chunking strategy. Each output chunk is a complete markdown table with header, and will never
    cut a row in half. The overlap is always applied in terms of full rows or columns.

    Args:
        reader_output (ReaderOutput):
            Output object from a Reader, containing at least:
                - 'text': The tabular data as string.
                - 'conversion_method': Format of the input ('csv', 'tsv', 'markdown', 'json', etc.).
                - Additional document metadata fields (optional).

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If both num_rows and num_cols are set.
        ValueError: If chunk_overlap as float is not in [0,1).
        ValueError: If chunk_size is too small to fit the header and at least one data row.

    Example:
        ```python
        reader_output = ReaderOutput(
            text='| id | name |\\n|----|------|\\n| 1  | A    |\\n| 2  | B    |\\n| 3  | C    |',
            conversion_method="markdown",
            document_name="table.md",
            document_path="/path/table.md",
        )
        splitter = RowColumnSplitter(chunk_size=80, chunk_overlap=20)
        output = splitter.split(reader_output)
        for chunk in output.chunks:
            print("\\n" + str(chunk) + "\\n")
        ```
        ```python
        | id   | name   |
        |------|--------|
        |  1   | A      |
        |  2   | B      |

        | id   | name   |
        |------|--------|
        |  2   | B      |
        |  3   | C      |
        ```
    """
    # Step 1. Parse the table depending on conversion_method
    df = self._load_tabular(reader_output)
    orig_method = reader_output.conversion_method
    col_names = df.columns.tolist()

    # Step 2. Split logic
    chunks = []
    meta_per_chunk = []

    # If splitting strategy is by rows
    if self.num_rows > 0:
        overlap = self._get_overlap(self.num_rows)
        for i in range(
            0,
            len(df),
            self.num_rows - overlap if (self.num_rows - overlap) > 0 else 1,
        ):
            chunk_df = df.iloc[i : i + self.num_rows]
            if not chunk_df.empty:
                chunk_str = self._to_str(chunk_df, orig_method)
                chunks.append(chunk_str)
                meta_per_chunk.append(
                    {"rows": chunk_df.index.tolist(), "type": "row"}
                )
    # If splitting strategy is by columns
    elif self.num_cols > 0:
        overlap = self._get_overlap(self.num_cols)
        total_cols = len(col_names)
        for i in range(
            0,
            total_cols,
            self.num_cols - overlap if (self.num_cols - overlap) > 0 else 1,
        ):
            sel_cols = col_names[i : i + self.num_cols]
            if sel_cols:
                chunk_df = df[sel_cols]
                chunk_str = self._to_str(chunk_df, orig_method, colwise=True)
                chunks.append(chunk_str)
                meta_per_chunk.append({"cols": sel_cols, "type": "column"})
    # If splitting strategy is given by the chunk_size
    else:
        header_lines = self._get_markdown_header(df)
        header_length = len(header_lines)

        row_md_list = [self._get_markdown_row(df, i) for i in range(len(df))]
        row_len_list = [len(r) + 1 for r in row_md_list]  # +1 for newline

        if self.chunk_size < header_length + row_len_list[0]:
            raise ValueError(
                "chunk_size is too small to fit header and at least one row."
            )

        # Compute overlapping and headers in markdown tables
        chunks = []
        meta_per_chunk = []
        i = 0
        n = len(row_md_list)
        overlap = self._get_overlap(1)
        while i < n:
            curr_chunk = []
            curr_len = header_length
            j = i
            while j < n and curr_len + row_len_list[j] <= self.chunk_size:
                curr_chunk.append(row_md_list[j])
                curr_len += row_len_list[j]
                j += 1

            rows_in_chunk = j - i
            chunk_str = header_lines + "\n".join(curr_chunk)
            chunks.append(chunk_str)
            meta_per_chunk.append({"rows": list(range(i, j)), "type": "char_row"})

            # --- compute overlap AFTER we know rows_in_chunk ---
            if isinstance(self.chunk_overlap, float):
                overlap_rows = int(rows_in_chunk * self.chunk_overlap)
            else:
                overlap_rows = int(self.chunk_overlap)

            # make sure we don’t loop forever
            overlap_rows = min(overlap_rows, rows_in_chunk - 1)
            i = j - overlap_rows

    # Generate chunk_id
    chunk_ids = self._generate_chunk_ids(len(chunks))

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="row_column_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "num_rows": self.num_rows,
            "num_cols": self.num_cols,
            "chunk_overlap": self.chunk_overlap,
        },
        metadata=meta_per_chunk,
    )
    return output

CodeSplitter

CodeSplitter

Bases: BaseSplitter

CodeSplitter recursively splits source code into programmatically meaningful chunks (functions, classes, methods, etc.) for the given programming language.

Parameters:

Name Type Description Default
chunk_size int

Maximum chunk size, in characters.

1000
language str

Programming language (e.g., "python", "java", "kotlin", etc.)

'python'
Notes
  • Uses Langchain's RecursiveCharacterTextSplitter and its language-aware from_language method.
  • See Langchain docs: https://python.langchain.com/docs/how_to/code_splitter/
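
As an illustrative sketch (the Java snippet and file names below are hypothetical), the same splitter can target any language supported by Langchain's Language enum:

from splitter_mr.splitter import CodeSplitter

reader_output = ReaderOutput(
    text='class Greeter {\n    void greet() {\n        System.out.println("hi");\n    }\n}',
    document_name="Greeter.java",
    document_path="/tmp/Greeter.java",
)
splitter = CodeSplitter(chunk_size=60, language="java")
output = splitter.split(reader_output)
print(output.chunks)  # chunks split on class/method boundaries where possible
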
Source code in src/splitter_mr/splitter/splitters/code_splitter.py
class CodeSplitter(BaseSplitter):
    """
    CodeSplitter recursively splits source code into programmatically meaningful chunks
    (functions, classes, methods, etc.) for the given programming language.

    Args:
        chunk_size (int): Maximum chunk size, in characters.
        language (str): Programming language (e.g., "python", "java", "kotlin", etc.)

    Notes:
        - Uses Langchain's RecursiveCharacterTextSplitter and its language-aware `from_language` method.
        - See Langchain docs: https://python.langchain.com/docs/how_to/code_splitter/
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        language: str = "python",
    ):
        super().__init__(chunk_size)
        self.language = language

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits code in `reader_output['text']` according to the syntax of the specified
        programming language, using function/class boundaries where possible.

        Args:
            reader_output (ReaderOutput): Object containing at least a 'text' field,
                plus optional document metadata.

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            ValueError: If language is not supported.

        Example:
            ```python
            from splitter_mr.splitter import CodeSplitter

            reader_output = ReaderOutput(
                text="def foo():\\n    pass\\n\\nclass Bar:\\n    def baz(self):\\n        pass",
                document_name="example.py",
                document_path="/tmp/example.py"
            )
            splitter = CodeSplitter(chunk_size=50, language="python")
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['def foo():\\n    pass\\n', 'class Bar:\\n    def baz(self):\\n        pass']
            ```
        """
        # Initialize variables
        text = reader_output.text
        chunk_size = self.chunk_size

        # Get Langchain language enum
        lang_enum = get_langchain_language(self.language)

        splitter = RecursiveCharacterTextSplitter.from_language(
            language=lang_enum, chunk_size=chunk_size, chunk_overlap=0
        )
        texts = splitter.create_documents([text])
        chunks = [doc.page_content for doc in texts]

        # Generate chunk_id and append metadata
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="code_splitter",
            split_params={"chunk_size": chunk_size, "language": self.language},
            metadata=metadata,
        )
        return output
split(reader_output)

Splits code in reader_output['text'] according to the syntax of the specified programming language, using function/class boundaries where possible.

Parameters:

Name Type Description Default
reader_output ReaderOutput

Object containing at least a 'text' field, plus optional document metadata.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
ValueError

If language is not supported.

Example

from splitter_mr.splitter import CodeSplitter

reader_output = ReaderOutput(
    text="def foo():\n    pass\n\nclass Bar:\n    def baz(self):\n        pass",
    document_name="example.py",
    document_path="/tmp/example.py"
)
splitter = CodeSplitter(chunk_size=50, language="python")
output = splitter.split(reader_output)
print(output.chunks)
['def foo():\n    pass\n', 'class Bar:\n    def baz(self):\n        pass']

Source code in src/splitter_mr/splitter/splitters/code_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits code in `reader_output['text']` according to the syntax of the specified
    programming language, using function/class boundaries where possible.

    Args:
        reader_output (ReaderOutput): Object containing at least a 'text' field,
            plus optional document metadata.

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        ValueError: If language is not supported.

    Example:
        ```python
        from splitter_mr.splitter import CodeSplitter

        reader_output = ReaderOutput(
            text="def foo():\\n    pass\\n\\nclass Bar:\\n    def baz(self):\\n        pass",
            document_name="example.py",
            document_path="/tmp/example.py"
        )
        splitter = CodeSplitter(chunk_size=50, language="python")
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['def foo():\\n    pass\\n', 'class Bar:\\n    def baz(self):\\n        pass']
        ```
    """
    # Initialize variables
    text = reader_output.text
    chunk_size = self.chunk_size

    # Get Langchain language enum
    lang_enum = get_langchain_language(self.language)

    splitter = RecursiveCharacterTextSplitter.from_language(
        language=lang_enum, chunk_size=chunk_size, chunk_overlap=0
    )
    texts = splitter.create_documents([text])
    chunks = [doc.page_content for doc in texts]

    # Generate chunk_id and append metadata
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="code_splitter",
        split_params={"chunk_size": chunk_size, "language": self.language},
        metadata=metadata,
    )
    return output
get_langchain_language(lang_str)

Map a string language name to Langchain Language enum. Raises ValueError if not found.

Source code in src/splitter_mr/splitter/splitters/code_splitter.py
def get_langchain_language(lang_str: str) -> Language:
    """
    Map a string language name to Langchain Language enum.
    Raises ValueError if not found.
    """
    lookup = {lang.name.lower(): lang for lang in Language}
    key = lang_str.lower()
    if key not in lookup:
        raise ValueError(
            f"Unsupported language '{lang_str}'. Supported: {list(lookup.keys())}"
        )
    return lookup[key]
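
A minimal usage sketch, assuming the module path shown above (the printed value follows Langchain's Language enum naming):

from splitter_mr.splitter.splitters.code_splitter import get_langchain_language

lang = get_langchain_language("python")
print(lang)  # Language.PYTHON
# An unrecognized name raises ValueError listing the supported languages.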

TokenSplitter

TokenSplitter

Bases: BaseSplitter

TokenSplitter splits a given text into chunks based on token counts derived from different tokenization models or libraries.

This splitter supports tokenization via tiktoken (OpenAI tokenizer), spacy (spaCy tokenizer), and nltk (NLTK tokenizer). It allows splitting text into chunks of a maximum number of tokens (chunk_size), using the specified tokenizer model.

Parameters:

Name Type Description Default
chunk_size int

Maximum number of tokens per chunk.

1000
model_name str

Specifies the tokenizer and model in the format tokenizer/model. Supported tokenizers are:

  • tiktoken/gpt-4o (OpenAI GPT-4o tokenizer via tiktoken)
  • spacy/en_core_web_sm (spaCy English model)
  • nltk/punkt (NLTK tokenizer models like punkt)
'tiktoken/cl100k_base'
language str

Language code for NLTK tokenizer (default "english").

'english'
Notes

More information about token-based splitting with Langchain is available in the Langchain Docs.
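
A brief sketch of selecting different tokenizer backends (model names below are only examples; spaCy and NLTK models are downloaded on first use if missing):

from splitter_mr.splitter import TokenSplitter

splitter = TokenSplitter(chunk_size=200, model_name="tiktoken/cl100k_base")
splitter = TokenSplitter(chunk_size=200, model_name="spacy/en_core_web_sm")
splitter = TokenSplitter(chunk_size=200, model_name="nltk/punkt", language="english")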

Source code in src/splitter_mr/splitter/splitters/token_splitter.py
class TokenSplitter(BaseSplitter):
    """
    TokenSplitter splits a given text into chunks based on token counts
    derived from different tokenization models or libraries.

    This splitter supports tokenization via `tiktoken` (OpenAI tokenizer),
    `spacy` (spaCy tokenizer), and `nltk` (NLTK tokenizer). It allows splitting
    text into chunks of a maximum number of tokens (`chunk_size`), using the
    specified tokenizer model.

    Args:
        chunk_size (int): Maximum number of tokens per chunk.
        model_name (str): Specifies the tokenizer and model in the format `tokenizer/model`. Supported tokenizers are:

            - `tiktoken/gpt-4o` (OpenAI GPT-4o tokenizer via tiktoken)
            - `spacy/en_core_web_sm` (spaCy English model)
            - `nltk/punkt` (NLTK tokenizer models like punkt)

        language (str): Language code for NLTK tokenizer (default `"english"`).

    Notes:
        More information about token-based splitting with Langchain:
        [Langchain Docs](https://python.langchain.com/docs/how_to/split_by_token/).
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        model_name: str = "tiktoken/cl100k_base",
        language: str = "english",
    ):
        super().__init__(chunk_size)
        self.model_name = model_name
        self.language = language

    @staticmethod
    def list_nltk_punkt_languages():
        """Return a sorted list of available punkt models (languages) for NLTK."""
        models = set()
        for base in map(Path, nltk.data.path):
            punkt_dir = base / "tokenizers" / "punkt"
            if punkt_dir.exists():
                models.update(f.stem for f in punkt_dir.glob("*.pickle"))
        return sorted(models)

    def split(self, reader_output: ReaderOutput) -> SplitterOutput:
        """
        Splits the input text from `reader_output` into token-based chunks using
        the specified tokenizer.

        Depending on `model_name`, the splitter chooses the appropriate tokenizer:

        - For `tiktoken`, uses `RecursiveCharacterTextSplitter` with tiktoken encoding.
            e.g.: `tiktoken/cl100k_base`.
        - For `spacy`, uses `SpacyTextSplitter` with the specified spaCy pipeline.
            e.g., `spacy/en_core_web_sm`.
        - For `nltk`, uses `NLTKTextSplitter` with the specified language tokenizer.
            e.g., `nltk/punkt_tab`.

        Automatically downloads spaCy and NLTK models if missing.

        Args:
            reader_output (ReaderOutput):
                Object containing at least a 'text' field (str) and optional document metadata,
                such as 'document_name', 'document_path', 'document_id', etc.

        Returns:
            SplitterOutput: Dataclass defining the output structure for all splitters.

        Raises:
            RuntimeError: If a spaCy model specified in `model_name` is not available.
            ValueError: If an unsupported tokenizer is specified in `model_name`.

        Example:
            ```python
            from splitter_mr.splitter import TokenSplitter

            reader_output = ReaderOutput(
                text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs.",
                document_name="pangrams.txt",
                document_path="/https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
            )

            splitter = TokenSplitter(chunk_size=10, model_name="tiktoken/gpt-4o")
            output = splitter.split(reader_output)
            print(output.chunks)
            ```
            ```python
            ['The quick brown fox jumps over the lazy dog.',
            'Pack my box with five dozen liquor jugs.']
            ```
        """
        # Initialize variables
        text = reader_output.text
        model_name = self.model_name
        TOKENIZERS = ("tiktoken", "spacy", "nltk")
        tokenizer, model = model_name.split("/")

        if tokenizer == "tiktoken":
            # Check if the model is available in tiktoken
            available_models = tiktoken.list_encoding_names()
            if model not in available_models:
                raise ValueError(
                    f"tiktoken encoding '{model}' is not available. "
                    f"Available encodings are: {available_models}"
                )
            splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                encoding_name=model,
                chunk_size=self.chunk_size,
                chunk_overlap=0,
            )
        elif tokenizer == "spacy":
            if not spacy.util.is_package(model):
                try:
                    spacy.cli.download(model)
                except Exception as e:
                    print(
                        f"spaCy model '{model}' is not available for download. Error: {e}"
                    )
                    raise RuntimeError(
                        f"spaCy model '{model}' is not available for download."
                    ) from e
            spacy.load(model)
            MAX_SAFE_LENGTH = 1_000_000
            # Warn if the requested chunk size exceeds spaCy's safe processing length
            if self.chunk_size > MAX_SAFE_LENGTH:
                warnings.warn(
                    "Too many characters: the v2.x parser and NER models require roughly 1GB of temporary memory per 100,000 characters in the input",
                    UserWarning,
                )
            # Cap spaCy's max_length at MAX_SAFE_LENGTH
            splitter = SpacyTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=0,
                max_length=MAX_SAFE_LENGTH,
                pipeline=model,
            )
        elif tokenizer == "nltk":
            try:
                nltk.data.find(f"tokenizers/punkt/{self.language}.pickle")
            except LookupError:
                nltk.download("punkt")
            splitter = NLTKTextSplitter(
                chunk_size=self.chunk_size, chunk_overlap=0, language=self.language
            )
        else:
            raise ValueError(
                f"Unsupported tokenizer '{tokenizer}'. Supported tokenizers: {TOKENIZERS}"
            )

        chunks = splitter.split_text(text)

        # Generate chunks_id
        chunk_ids = self._generate_chunk_ids(len(chunks))
        metadata = self._default_metadata()

        # Return output
        output = SplitterOutput(
            chunks=chunks,
            chunk_id=chunk_ids,
            document_name=reader_output.document_name,
            document_path=reader_output.document_path,
            document_id=reader_output.document_id,
            conversion_method=reader_output.conversion_method,
            reader_method=reader_output.reader_method,
            ocr_method=reader_output.ocr_method,
            split_method="token_splitter",
            split_params={
                "chunk_size": self.chunk_size,
                "model_name": self.model_name,
                "language": self.language,
            },
            metadata=metadata,
        )
        return output
list_nltk_punkt_languages() staticmethod

Return a sorted list of available punkt models (languages) for NLTK.

Source code in src/splitter_mr/splitter/splitters/token_splitter.py
@staticmethod
def list_nltk_punkt_languages():
    """Return a sorted list of available punkt models (languages) for NLTK."""
    models = set()
    for base in map(Path, nltk.data.path):
        punkt_dir = base / "tokenizers" / "punkt"
        if punkt_dir.exists():
            models.update(f.stem for f in punkt_dir.glob("*.pickle"))
    return sorted(models)
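
For example (the exact list depends on which punkt models are installed in the local NLTK data path):

print(TokenSplitter.list_nltk_punkt_languages())
# e.g. ['czech', 'english', 'german', ...]
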
split(reader_output)

Splits the input text from reader_output into token-based chunks using the specified tokenizer.

Depending on model_name, the splitter chooses the appropriate tokenizer:

  • For tiktoken, uses RecursiveCharacterTextSplitter with tiktoken encoding. e.g.: tiktoken/cl100k_base.
  • For spacy, uses SpacyTextSplitter with the specified spaCy pipeline. e.g., spacy/en_core_web_sm.
  • For nltk, uses NLTKTextSplitter with the specified language tokenizer. e.g., nltk/punkt_tab.

Automatically downloads spaCy and NLTK models if missing.

Parameters:

Name Type Description Default
reader_output ReaderOutput

Object containing at least a 'text' field (str) and optional document metadata, such as 'document_name', 'document_path', 'document_id', etc.

required

Returns:

Name Type Description
SplitterOutput SplitterOutput

Dataclass defining the output structure for all splitters.

Raises:

Type Description
RuntimeError

If a spaCy model specified in model_name is not available.

ValueError

If an unsupported tokenizer is specified in model_name.

Example

from splitter_mr.splitter import TokenSplitter

reader_output = ReaderOutput(
    text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs.",
    document_name="pangrams.txt",
    document_path="/https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
)

splitter = TokenSplitter(chunk_size=10, model_name="tiktoken/gpt-4o")
output = splitter.split(reader_output)
print(output.chunks)
['The quick brown fox jumps over the lazy dog.',
'Pack my box with five dozen liquor jugs.']

Source code in src/splitter_mr/splitter/splitters/token_splitter.py
def split(self, reader_output: ReaderOutput) -> SplitterOutput:
    """
    Splits the input text from `reader_output` into token-based chunks using
    the specified tokenizer.

    Depending on `model_name`, the splitter chooses the appropriate tokenizer:

    - For `tiktoken`, uses `RecursiveCharacterTextSplitter` with tiktoken encoding.
        e.g.: `tiktoken/cl100k_base`.
    - For `spacy`, uses `SpacyTextSplitter` with the specified spaCy pipeline.
        e.g., `spacy/en_core_web_sm`.
    - For `nltk`, uses `NLTKTextSplitter` with the specified language tokenizer.
        e.g., `nltk/punkt_tab`.

    Automatically downloads spaCy and NLTK models if missing.

    Args:
        reader_output (ReaderOutput):
            Object containing at least a 'text' field (str) and optional document metadata,
            such as 'document_name', 'document_path', 'document_id', etc.

    Returns:
        SplitterOutput: Dataclass defining the output structure for all splitters.

    Raises:
        RuntimeError: If a spaCy model specified in `model_name` is not available.
        ValueError: If an unsupported tokenizer is specified in `model_name`.

    Example:
        ```python
        from splitter_mr.splitter import TokenSplitter

        reader_output = ReaderOutput(
            text="The quick brown fox jumps over the lazy dog. Pack my box with five dozen liquor jugs.",
            document_name="pangrams.txt",
            document_path="/https://raw.githubusercontent.com/andreshere00/Splitter_MR/refs/heads/main/data/pangrams.txt",
        )

        splitter = TokenSplitter(chunk_size=10, model_name="tiktoken/gpt-4o")
        output = splitter.split(reader_output)
        print(output.chunks)
        ```
        ```python
        ['The quick brown fox jumps over the lazy dog.',
        'Pack my box with five dozen liquor jugs.']
        ```
    """
    # Initialize variables
    text = reader_output.text
    model_name = self.model_name
    TOKENIZERS = ("tiktoken", "spacy", "nltk")
    tokenizer, model = model_name.split("/")

    if tokenizer == "tiktoken":
        # Check if the model is available in tiktoken
        available_models = tiktoken.list_encoding_names()
        if model not in available_models:
            raise ValueError(
                f"tiktoken encoding '{model}' is not available. "
                f"Available encodings are: {available_models}"
            )
        splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            encoding_name=model,
            chunk_size=self.chunk_size,
            chunk_overlap=0,
        )
    elif tokenizer == "spacy":
        if not spacy.util.is_package(model):
            try:
                spacy.cli.download(model)
            except Exception as e:
                print(
                    f"spaCy model '{model}' is not available for download. Error: {e}"
                )
                raise RuntimeError(
                    f"spaCy model '{model}' is not available for download."
                ) from e
        spacy.load(model)
        MAX_SAFE_LENGTH = 1_000_000
        # Warn if the requested chunk size exceeds spaCy's safe processing length
        if self.chunk_size > MAX_SAFE_LENGTH:
            warnings.warn(
                "Too many characters: the v2.x parser and NER models require roughly 1GB of temporary memory per 100,000 characters in the input",
                UserWarning,
            )
        # Cap spaCy's max_length at MAX_SAFE_LENGTH
        splitter = SpacyTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=0,
            max_length=MAX_SAFE_LENGTH,
            pipeline=model,
        )
    elif tokenizer == "nltk":
        try:
            nltk.data.find(f"tokenizers/punkt/{self.language}.pickle")
        except LookupError:
            nltk.download("punkt")
        splitter = NLTKTextSplitter(
            chunk_size=self.chunk_size, chunk_overlap=0, language=self.language
        )
    else:
        raise ValueError(
            f"Unsupported tokenizer '{tokenizer}'. Supported tokenizers: {TOKENIZERS}"
        )

    chunks = splitter.split_text(text)

    # Generate chunks_id
    chunk_ids = self._generate_chunk_ids(len(chunks))
    metadata = self._default_metadata()

    # Return output
    output = SplitterOutput(
        chunks=chunks,
        chunk_id=chunk_ids,
        document_name=reader_output.document_name,
        document_path=reader_output.document_path,
        document_id=reader_output.document_id,
        conversion_method=reader_output.conversion_method,
        reader_method=reader_output.reader_method,
        ocr_method=reader_output.ocr_method,
        split_method="token_splitter",
        split_params={
            "chunk_size": self.chunk_size,
            "model_name": self.model_name,
            "language": self.language,
        },
        metadata=metadata,
    )
    return output

PagedSplitter

Splits text by pages for documents that have page structure. Each chunk contains a specified number of pages, with optional word overlap.

Coming soon!

SemanticSplitter

Splits text into chunks based on semantic similarity, using an embedding model and a max tokens parameter. Useful for meaningful semantic groupings.

Coming soon!