Token_Cutter¤

TokenCutter pydantic-model ¤

Bases: BaseModel

TokenCutter class for chunking spaCy Doc objects into smaller segments based on token count, line breaks, sentences, or custom milestones. Supports overlapping, merging, and export to disk.
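
A minimal usage sketch (the import path is assumed from the source location lexos/cutter/token_cutter.py shown below, and a spaCy model such as en_core_web_sm is assumed to be installed):

import spacy
from lexos.cutter.token_cutter import TokenCutter  # assumed import path

nlp = spacy.load("en_core_web_sm")
doc = nlp("The quick brown fox jumps over the lazy dog. " * 100)

cutter = TokenCutter()
chunks = cutter.split(doc, chunksize=250)  # list[list[Doc]]: one inner list per source doc
for segment in chunks[0]:
    print(len(segment), segment.text[:40])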

Config:

  • default: validation_config

Fields:

Source code in lexos/cutter/token_cutter.py
class TokenCutter(BaseModel, validate_assignment=True):
    """TokenCutter class for chunking spaCy Doc objects into smaller segments.

    based on token count, line breaks, sentences, or custom milestones.
    Supports overlapping, merging, and export to disk.
    """

    chunks: list[list[Doc]] = Field(default=[], description="The list of chunks.")

    docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = Field(
        default=None,
        description="The documents to be split.",
    )
    chunksize: Optional[int] = Field(
        default=1000, gt=0, description="The desired chunk size in tokens."
    )
    n: Optional[int] = Field(
        default=None,
        # gt=0, Removed to allow runtime validation via LexosException instead of Pydantic pre-validation for testing coverage.
        description="The number of chunks or the number of lines or sentences per chunk.",
    )
    names: Optional[list[str]] = Field(
        default=[], description="A list of names for the source docs."
    )
    newline: Optional[bool] = Field(
        default=False, description="Whether to chunk by lines."
    )
    merge_threshold: Optional[float] = Field(
        default=0.5, ge=0, le=1, description="The threshold to merge the last segment."
    )
    overlap: Optional[int] = Field(
        default=None, gt=0, description="The number of tokens to overlap."
    )
    output_dir: Optional[Path | str] = Field(
        default=None, description="The output directory to save the chunks to."
    )
    delimiter: str = Field(
        default="_", description="The delimiter to use for the chunk names."
    )
    pad: int = Field(default=3, gt=0, description="The padding for the chunk names.")
    strip_chunks: bool = Field(
        default=True,
        description="Whether to strip leading and trailing whitespace in the chunks.",
    )

    model_config = validation_config

    def __iter__(self) -> Iterator:
        """Iterate over the object's chunks.

        Returns:
            Iterator: An iterator containing the object's chunks.
        """
        return iter(self.chunks)

    def __len__(self):
        """Return the number of source docs in the instance."""
        if not self.docs:
            return 0
        return len(self.docs)

    @staticmethod
    def list_start_end_indexes(arrays: list[np.ndarray]) -> list[tuple[int, int]]:
        """List start and end indexes for a list of numpy arrays.

        Args:
            arrays (list[np.ndarray]): List of numpy arrays.

        Returns:
            list[tuple[int, int]]: List of tuples with start and end indexes.
        """
        indexes = []
        start = 0

        for array in arrays:
            end = start + len(array)
            indexes.append((start, end))
            start = end

        return indexes

    def _apply_merge_threshold(
        self, chunks: list[Doc], force: bool = False
    ) -> list[Doc]:
        """Apply the merge threshold to the last chunk.

        Args:
            chunks (list[Doc]): The list of chunks.
            force (bool, optional): Whether to force the merge. Defaults to False.

        Returns:
            list[Doc]: The list of chunks with the last chunk merged if necessary.

        Notes:
          - Whitespace is supplied between merged chunks.
          - Length of final chunk is measured in number tokens or number of sentences.
        """
        if len(chunks) == 1:
            return chunks
        merge_threshold = (
            self.merge_threshold if self.merge_threshold is not None else 0.5
        )
        if isinstance(self.n, int):
            threshold = max([len(chunk) for chunk in chunks]) * merge_threshold
        else:
            threshold = (
                self.chunksize if self.chunksize is not None else 1
            ) * merge_threshold
        # If the length of the last chunk < threshold, merge it with the previous chunk
        if force is True or len(chunks[-1]) < threshold:
            # Get rid of the last chunk
            last_chunk = chunks.pop(-1)
            # Combine the last two segments into a single doc
            chunks[-1] = Doc.from_docs([chunks[-1], last_chunk])
        return chunks

    def _apply_overlap(
        self,
        chunks: list[Doc],
    ) -> list[Doc]:
        """Create overlapping chunks.

        Args:
            chunks (list[Doc]): A list of spaCy docs.

        Returns:
            list[Doc]: A list of spaCy docs.
        """
        overlapped_chunks = []
        for i, chunk in enumerate(chunks):
            if i < len(chunks) - 1:
                overlap_doc = chunks[i + 1][: self.overlap].as_doc()
                overlapped_doc = Doc.from_docs([chunk, overlap_doc])
                overlapped_chunks.append(overlapped_doc)
            elif i == len(chunks) - 1:
                overlapped_chunks.append(chunk)
        return overlapped_chunks

    def _chunk_doc(
        self,
        doc: Doc,
        attrs: "Sequence[int | str]" = SPACY_ATTRS,
        header: Sequence[int | str] = ENTITY_HEADER,
    ) -> list[Doc]:
        """Split a Doc into chunks.

        Args:
            doc: The Doc to split.
            attrs: The attributes to include in the chunks.
            header: The NER attributes to include in the chunks.

        Returns:
            list[Doc]: List of Doc chunks.
        """
        # Check that the document is not empty
        if len(doc) == 0:
            raise LexosException("Document is empty.")

        # Return the whole doc if it is less than the chunksize
        if self.n is None and self.chunksize is not None and len(doc) <= self.chunksize:
            return [doc]

        # Get the names of the custom extensions
        extension_names = [name for name in doc[0]._.__dict__["_extensions"].keys()]

        # Split the doc into n chunks
        if isinstance(self.n, int):
            chunks_arr = np.array_split(doc.to_array(list(attrs)), self.n)
            # If there is only one chunk, skip the rest of the function
            if len(chunks_arr) == 1:
                return [doc]
        else:
            chunks_arr = np.array_split(
                doc.to_array(list(attrs)),
                np.arange(self.chunksize, len(attrs), self.chunksize),
            )
            # Remove empty elements
            chunks_arr = [x for x in chunks_arr if x.size > 0]

        # Create a list to hold the chunks and get the chunk indexes
        chunks = []
        chunk_indexes = TokenCutter.list_start_end_indexes(chunks_arr)

        # Iterate over the chunks
        for i, chunk in enumerate(chunks_arr):
            # Get chunk start and end indexes
            start = chunk_indexes[i][0]
            end = chunk_indexes[i][1]
            span = doc[start:end]
            words = [token.text for token in span]

            # Make a new doc for the chunk
            new_doc = Doc(doc.vocab, words=words)

            # Add the attributes to the new chunk doc
            new_doc.from_array(list(attrs), chunk)

            # Add entities to the new chunk doc
            if doc.ents and len(doc.ents) > 0:
                ent_array = np.empty((len(chunk), len(header)), dtype="uint64")
                for i, token in enumerate(span):
                    ent_array[i, 0] = token.ent_iob
                    ent_array[i, 1] = token.ent_type
                new_doc.from_array(list(header), ent_array)

            # Add custom attributes to doc
            if len(extension_names) > 0:
                for i, token in enumerate(span):
                    for ext in extension_names:
                        new_doc[i]._.set(ext, token._.get(ext))

            # Add the chunk to the chunks list
            chunks.append(new_doc)

        # Return the list of chunks
        return chunks

    def _keep_milestones_bool(
        self, doc: Doc, milestones: list[Span], keep_spans: bool = False
    ) -> list[Doc]:
        """Split a spaCy Doc into chunks on milestones, optionally keeping milestones.

        Args:
            doc (Doc): The spaCy Doc to split.
            milestones (list[Span]): The milestones to split on.
            keep_spans (bool): Whether to keep the spans in the split strings.

        Returns:
            list[Doc]: A list of spaCy Docs.
        """
        chunks = []
        start = 0
        for span in milestones:
            if span.start == 0 or span.end == doc[-1].i:
                if keep_spans:
                    chunks.append(span)
            elif start < span.start:
                chunks.append(doc[start : span.start])
                if keep_spans:
                    chunks.append(span)
            start = span.end
        if start < len(doc):
            chunks.append(doc[start:])
        return chunks

    def _keep_milestones_following(self, doc: Doc, milestones: list[Span]) -> list[Doc]:
        """Split a spaCy Doc into chunks on milestones preserving milestones in the following chunk.

        Args:
            doc (Doc): The spaCy Doc to split.
            milestones (list[Span]): The milestones to split on.

        Returns:
            list[Doc]: A list of spaCy Docs.
        """
        chunks = []
        start = 0
        for index, span in enumerate(milestones):
            # Text before milestone
            if start < span.start:
                chunks.append(doc[start : span.start])

            # Find end of chunk (next milestone or doc end)
            end = (
                milestones[index + 1].start if index < len(milestones) - 1 else len(doc)
            )

            # Milestone + following text as one chunk
            chunks.append(doc[span.start : end])
            start = end
        return chunks

    def _keep_milestones_preceding(self, doc: Doc, milestones: list[Span]) -> list[Doc]:
        """Split a spaCy Doc into chunks on milestones preserving milestones in the preceding chunk.

        Args:
            doc (Doc): The spaCy Doc to split.
            milestones (list[Span]): The milestones to split on.

        Returns:
            list[Doc]: A list of spaCy Docs.
        """
        # Check that the document is not empty
        if len(doc) == 0:
            raise LexosException("Document is empty.")
        if len(milestones) == 0:
            return [doc]
        chunks = []
        start = 0
        for span in milestones:
            index = span.start
            if index != -1:
                chunks.append(doc[start : index + len(span)])
                start = index + len(span)
        if start < len(doc):
            chunks.append(doc[start:])
        if milestones[0].start == 0:
            _ = chunks.pop(0)
            chunks[0] = doc[: chunks[0].end]
        return chunks

    def _set_attributes(self, **data) -> None:
        """Set attributes after initialization."""
        for key, value in data.items():
            setattr(self, key, value)

    def _split_doc(
        self,
        doc: Doc,
        attrs: Optional[Sequence[int | str]] = SPACY_ATTRS,
        merge_final: Optional[bool] = False,
    ) -> list[Doc]:
        """Split a spaCy doc into chunks by a fixed number of tokens.

        Args:
            doc (Doc): A spaCy doc.
            attrs (Optional[int | str]): The spaCy attributes to include in the chunks.
            merge_final (Optional[bool]): Whether to merge the final segment.

        Returns:
            list[Doc]: A list of spaCy docs.
        """
        if len(doc) == 0:
            raise LexosException("Document is empty.")

        attrs = attrs if attrs is not None else SPACY_ATTRS
        chunks = self._chunk_doc(doc, attrs)
        chunks = self._apply_merge_threshold(
            chunks, force=merge_final if merge_final is not None else False
        )
        if self.overlap:
            chunks = self._apply_overlap(chunks)
        if self.strip_chunks:
            return [strip_doc(chunk) for chunk in chunks]
        # Ensure that all chunks are spaCy docs
        else:
            return [
                chunk.as_doc() if isinstance(chunk, Span) else chunk for chunk in chunks
            ]

    def _split_doc_by_lines(
        self, doc: Doc, merge_final: Optional[bool] = False
    ) -> list[Doc]:
        """Split a spaCy Doc into chunks of n lines.

        Args:
            doc: spaCy Doc to split.
            merge_final: Whether to merge the final segment.

        Returns:
            list[Doc]: Chunks of the doc split by lines.
        """
        if len(doc) == 0:
            raise LexosException("Document is empty.")

        indices = []  # The indices immediately following the newline tokens
        count = 0
        chunks = []
        for token in doc:
            if "\n" in token.text:
                count += 1
                if (
                    self.n is not None and count % self.n == 0
                ):  # Check if it's the nth occurrence
                    indices.append(token.i + 1)
        if len(indices) == 0:
            chunks.append(doc)
        else:
            prev_index = 0
            for index in indices:
                chunks.append(doc[prev_index:index].as_doc())
                prev_index = index
            chunks.append(doc[prev_index:].as_doc())  # Append the remaining elements

        # Ensure there are no empty docs
        chunks = [chunk for chunk in chunks if len(chunk) > 0]

        # Apply the merge threshold and overlap
        chunks = self._apply_merge_threshold(
            chunks, force=merge_final if merge_final is not None else False
        )
        if self.overlap:
            chunks = self._apply_overlap(chunks)

        if self.strip_chunks:
            return [strip_doc(chunk) for chunk in chunks]

        return chunks

    def _split_doc_by_sentences(
        self, doc: Doc, merge_final: Optional[bool] = False
    ) -> list[Doc]:
        """Split a spaCy Doc into chunks of n sentences.

        Args:
            doc: A spaCy Doc object.
            merge_final: Whether to merge the final segment.

        Returns:
            Doc: Chunks containing n sentences each (last chunk may have fewer).
        """
        if len(doc) == 0:
            raise LexosException("Document is empty.")

        try:
            next(doc.sents)
        except (StopIteration, ValueError):
            raise LexosException("The document has no assigned sentences.")

        # Split the doc into chunks of n sentences
        sents = list(doc.sents)
        chunks = []
        n = self.n if self.n is not None else 1
        for i in range(0, len(sents), n):
            chunk_sents = sents[i : i + n]
            start_idx = chunk_sents[0].start
            end_idx = chunk_sents[-1].end
            chunks.append(doc[start_idx:end_idx].as_doc())
        # No need to append doc[end_idx:] since all sentences are already included in the chunks

        # Ensure there are no empty docs
        chunks = [chunk for chunk in chunks if len(chunk) > 0]

        # Apply the merge threshold and overlap
        chunks = self._apply_merge_threshold(
            chunks, force=merge_final if merge_final is not None else False
        )
        if self.overlap:
            chunks = self._apply_overlap(chunks)

        if self.strip_chunks:
            return [strip_doc(chunk) for chunk in chunks]

        return chunks

    def _split_doc_on_milestones(
        self,
        doc: Doc,
        milestones: Span | list[Span],
        keep_spans: Optional[bool | str] = False,
        merge_final: Optional[bool] = False,
    ) -> list[Doc]:
        """Split document on a milestone.

        Args:
            doc (Doc): The document to be split.
            milestones (Span | list[Span]): A Span or list of Spans to be matched.
            keep_spans (Optional[bool | str]): Whether to keep the spans in the split strings. Defaults to False.
            merge_final (Optional[bool]): Whether to force the merge of the last segment. Defaults to False.

        Returns:
            list[Doc]: A list of chunked spaCy Doc objects.
        """
        if len(doc) == 0:
            raise LexosException("Document is empty.")

        milestones = ensure_list(milestones)
        if keep_spans == "following":
            chunks = self._keep_milestones_following(doc, milestones)
        elif keep_spans == "preceding":
            chunks = self._keep_milestones_preceding(doc, milestones)
        else:
            # Only pass a boolean to keep_spans
            chunks = self._keep_milestones_bool(
                doc, milestones, keep_spans=bool(keep_spans)
            )

        # Ensure that all chunks are spaCy docs
        chunks = [
            chunk.as_doc() if isinstance(chunk, Span) else chunk for chunk in chunks
        ]

        # Apply the merge threshold and overlap
        chunks = self._apply_merge_threshold(
            chunks, force=merge_final if merge_final is not None else False
        )
        if self.overlap:
            chunks = self._apply_overlap(chunks)

        if self.strip_chunks:
            return [strip_doc(chunk) for chunk in chunks]

        return chunks

    def _write_chunk(
        self, path: str, n: int, chunk: Doc, output_dir: Path, as_text: bool = True
    ) -> None:
        """Write chunk text to file with formatted name.

        Args:
            path (str): The path of the original file.
            n (int): The number of the chunk.
            chunk (Doc): The chunk to save.
            output_dir (Path): The output directory for the chunk.
            as_text (bool): Whether to save the chunk as a text file or a spaCy Doc object.
        """
        output_file = f"{path}{self.delimiter}{str(n).zfill(self.pad)}.txt"
        output_path = output_dir / output_file
        if as_text:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(chunk.text)
        else:
            chunk.to_disk(output_path)

    def merge(self, chunks: list[Doc]) -> Doc:
        """Merge a list of chunks into a single Doc.

        Args:
            chunks (list[Doc]): The list of chunks to merge.

        Returns:
            Doc: The merged doc.

        Note:
            - The user_data dict of the docs will be ignored. If they contain information
              that needs to be preserved, it should be stored in the doc extensions.
              See https://github.com/explosion/spaCy/discussions/9106.
        """
        if len(chunks) == 0:
            raise LexosException("No chunks to merge.")
        return Doc.from_docs(chunks)

    @validate_call(config=validation_config)
    def save(
        self,
        output_dir: Path | str,
        names: Optional[str | list[str]] = None,
        delimiter: Optional[str] = "_",
        pad: Optional[int] = 3,
        strip_chunks: Optional[bool] = True,
        as_text: Optional[bool] = True,
    ) -> None:
        """Save the chunks to disk.

        Args:
            output_dir (Path | str): The output directory to save the chunks to.
            names (Optional[str | list[str]]): The doc names.
            delimiter (str): The delimiter to use for the chunk names.
            pad (int): The padding for the chunk names.
            strip_chunks (bool): Whether to strip leading and trailing whitespace in the chunks.
            as_text (Optional[bool]): Whether to save the chunks as text files or spaCy Doc objects (bytes).
        """
        self._set_attributes(
            output_dir=output_dir,
            delimiter=delimiter,
            names=names,
            pad=pad,
            strip_chunks=strip_chunks,
        )
        if not self.chunks or self.chunks == []:
            raise LexosException("No chunks to save.")
        if self.names:
            if len(self.names) != len(self.chunks):
                raise LexosException(
                    f"The number of docs in `names` ({len(self.names)}) must equal the number of docs in `chunks` ({len(self.chunks)})."
                )
        elif self.names == [] or self.names is None:
            self.names = [
                f"doc{str(i + 1).zfill(self.pad)}" for i in range(len(self.chunks))
            ]
        for i, doc in enumerate(self.chunks):
            for num, chunk in enumerate(doc):
                if strip_chunks:
                    chunk = strip_doc(chunk)
                self._write_chunk(
                    self.names[i], num + 1, chunk, Path(output_dir), as_text
                )

    @validate_call(config=validation_config)
    def split(
        self,
        docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None,
        chunksize: Optional[int] = None,
        n: Optional[int] = None,
        merge_threshold: Optional[float] = 0.5,
        overlap: Optional[int] = None,
        names: Optional[str | list[str]] = None,
        newline: Optional[bool] = None,
        strip_chunks: Optional[bool] = True,
        file: Optional[bool] = False,
        model: Optional[str] = None,
        merge_final: Optional[bool] = False,
    ) -> list[list[Doc]]:
        """Split spaCy docs into chunks by a fixed number of tokens.

        Args:
            docs (Optional[Doc | list[Doc] | Path | str | list[Path | str]]): A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk().
            chunksize (Optional[int]): The number of tokens to split on.
            n (Optional[int]): The number of chunks to produce.
            merge_threshold (Optional[float]): The threshold to merge the last segment.
            overlap (Optional[int]): The number of tokens to overlap.
            names (Optional[str | list[str]]): The doc names.
            newline (Optional[bool]): Whether to chunk by lines.
            strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
            file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
            model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.
            merge_final (Optional[bool]): Whether to force the merge of the last segment.

        Returns:
            list[list[Doc]]: A list of spaCy docs (chunks).
        """
        if docs:
            self.docs = ensure_list(docs)
        if not self.docs:
            raise LexosException("No documents provided for splitting.")
        self._set_attributes(
            chunksize=chunksize,
            n=n,
            merge_threshold=merge_threshold,
            overlap=overlap,
            names=names,
            newline=newline,
            strip_chunks=strip_chunks,
        )

        # Load docs from files if file=True
        if file:
            if model is None:
                raise LexosException("model parameter is required when file=True")
            nlp = spacy.load(model)
            loaded_docs = []
            for doc in ensure_list(docs):
                try:
                    doc = Doc(nlp.vocab).from_disk(doc)
                except ValueError:
                    raise LexosException(
                        f"Error loading doc from disk. Doc file must be in a valid spaCy serialization format: see https://spacy.io/api/doc#to_disk"
                    )
                loaded_docs.append(doc)
            docs = loaded_docs

        if self.newline:
            if not self.n:
                self.n = self.chunksize
            if not self.n or self.n < 1:
                raise LexosException("n must be greater than 0.")
            for doc in ensure_list(docs):
                self.chunks.append(
                    self._split_doc_by_lines(doc, merge_final=merge_final)
                )
        else:
            for doc in ensure_list(docs):
                self.chunks.append(self._split_doc(doc, merge_final=merge_final))

        return self.chunks

    @validate_call(config=validation_config)
    def split_on_milestones(
        self,
        milestones: Span | list[Span],
        docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None,
        merge_threshold: Optional[float] = 0.5,
        merge_final: Optional[bool] = False,
        overlap: Optional[int] = None,
        keep_spans: Optional[bool | str] = False,
        strip_chunks: Optional[bool] = True,
        names: Optional[str | list[str]] = None,
        file: Optional[bool] = False,
        model: Optional[str] = None,
    ) -> list[list[Doc]]:
        """Split document on a milestone.

        Args:
            milestones (Span | list[Span]): A milestone span or list of milestone spans to be matched.
            docs (Optional[Doc | list[Doc] | Path | str | list[Path | str]]): The document(s) to be split, or file paths to spaCy docs saved with Doc.to_disk().
            merge_threshold (Optional[float]): The threshold to merge the last segment.
            merge_final (Optional[bool]): Whether to force the merge of the last segment.
            overlap (Optional[int]): The number of tokens to overlap.
            keep_spans (Optional[bool | str]): Whether to keep the spans in the split strings. Defaults to False.
            strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
            names (Optional[str | list[str]]): The doc names.
            file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
            model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.

        Returns:
            list[list[Doc]]: A list of spaCy docs (chunks).
        """
        if docs:
            self.docs = ensure_list(docs)
        if not self.docs:
            raise LexosException("No documents provided for splitting.")
        self._set_attributes(
            merge_threshold=merge_threshold,
            overlap=overlap,
            strip_chunks=strip_chunks,
            names=names,
        )

        # Load docs from files if file=True
        if file:
            if model is None:
                raise LexosException("model parameter is required when file=True")
            nlp = spacy.load(model)
            loaded_docs = []
            for doc in ensure_list(docs):
                doc = Doc(nlp.vocab).from_disk(doc)
                loaded_docs.append(doc)
            docs = loaded_docs

        for doc in ensure_list(docs):
            chunks = self._split_doc_on_milestones(
                doc, milestones, keep_spans=keep_spans, merge_final=merge_final
            )
            self.chunks.append(chunks)
        return self.chunks

    @validate_call(config=validation_config)
    def split_on_sentences(
        self,
        docs: Doc | list[Doc] | Path | str | list[Path | str],
        n: Optional[int] = None,
        merge_final: Optional[bool] = False,
        overlap: Optional[int] = None,
        strip_chunks: Optional[bool] = True,
        names: Optional[str | list[str]] = None,
        file: Optional[bool] = False,
        model: Optional[str] = None,
    ) -> list[list[Doc]]:
        """Split spaCy docs into chunks by a fixed number of sentences.

        Args:
            docs (Doc | list[Doc] | Path | str | list[Path | str]): A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk().
            n (Optional[int]): The number of sentences per chunk.
            merge_final (Optional[bool]): Whether to merge the last segment.
            overlap (Optional[int]): The number of tokens to overlap.
            strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
            names (Optional[str | list[str]]): The doc names.
            file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
            model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.

        Returns:
            list[list[Doc]]: A list of spaCy docs (chunks).

        Raises:
            ValueError: If n is less than or equal to 0.
            ValueError: If the model has no sentences.
        """
        self._set_attributes(
            n=n,
            overlap=overlap,
            strip_chunks=strip_chunks,
            names=names,
        )

        # Load docs from files if file=True
        if file:
            if model is None:
                raise LexosException("model parameter is required when file=True")
            nlp = spacy.load(model)
            loaded_docs = []
            for doc in ensure_list(docs):
                doc = Doc(nlp.vocab).from_disk(doc)
                loaded_docs.append(doc)
            docs = loaded_docs

        if not self.n:
            self.n = self.chunksize
        if not self.n or self.n < 1:
            raise LexosException("n must be greater than 0.")
        for i, doc in enumerate(ensure_list(docs)):
            if not doc.has_annotation("SENT_START"):
                raise LexosException(
                    f"The spaCy model used to create the Doc {i} does not have sentence boundary detection. Please use a model that includes the 'senter' or 'parser' pipeline component."
                )
            else:
                next(doc.sents)
            self.chunks.append(
                self._split_doc_by_sentences(doc, merge_final=merge_final)
            )
        return self.chunks

    @validate_call(config=validation_config)
    def to_dict(self, names: Optional[list[str]] = None) -> dict[str, list[str]]:
        """Return the chunks as a dictionary.

        Args:
            names (Optional[list[str]]): A list of names for the doc Docs.

        Returns:
            dict[str, list[str]]: The chunks as a dictionary.
        """
        if names:
            self.names = names
        if not self.names:
            self.names = [
                f"doc{str(i + 1).zfill(self.pad)}" for i in range(len(self.chunks))
            ]
        return {
            str(name): [chunk.text for chunk in chunks]
            for name, chunks in zip(self.names, self.chunks)
        }

chunks: list[list[Doc]] = [] pydantic-field ¤

The list of chunks.

chunksize: Optional[int] = 1000 pydantic-field ¤

The desired chunk size in tokens.

delimiter: str = '_' pydantic-field ¤

The delimiter to use for the chunk names.

docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None pydantic-field ¤

The documents to be split.

merge_threshold: Optional[float] = 0.5 pydantic-field ¤

The threshold to merge the last segment.

n: Optional[int] = None pydantic-field ¤

The number of chunks or the number of lines or sentences per chunk.

names: Optional[list[str]] = [] pydantic-field ¤

A list of names for the source docs.

newline: Optional[bool] = False pydantic-field ¤

Whether to chunk by lines.

output_dir: Optional[Path | str] = None pydantic-field ¤

The output directory to save the chunks to.

overlap: Optional[int] = None pydantic-field ¤

The number of tokens to overlap.

pad: int = 3 pydantic-field ¤

The padding for the chunk names.

strip_chunks: bool = True pydantic-field ¤

Whether to strip leading and trailing whitespace in the chunks.

__iter__() -> Iterator ¤

Iterate over the object's chunks.

Returns:

  • Iterator: An iterator containing the object's chunks.

Source code in lexos/cutter/token_cutter.py
def __iter__(self) -> Iterator:
    """Iterate over the object's chunks.

    Returns:
        Iterator: An iterator containing the object's chunks.
    """
    return iter(self.chunks)

__len__() ¤

Return the number of source docs in the instance.

Source code in lexos/cutter/token_cutter.py
def __len__(self):
    """Return the number of source docs in the instance."""
    if not self.docs:
        return 0
    return len(self.docs)

list_start_end_indexes(arrays: list[np.ndarray]) -> list[tuple[int, int]] staticmethod ¤

List start and end indexes for a list of numpy arrays.

Parameters:

  • arrays (list[np.ndarray]): List of numpy arrays. Required.

Returns:

  • list[tuple[int, int]]: List of tuples with start and end indexes.

Source code in lexos/cutter/token_cutter.py
@staticmethod
def list_start_end_indexes(arrays: list[np.ndarray]) -> list[tuple[int, int]]:
    """List start and end indexes for a list of numpy arrays.

    Args:
        arrays (list[np.ndarray]): List of numpy arrays.

    Returns:
        list[tuple[int, int]]: List of tuples with start and end indexes.
    """
    indexes = []
    start = 0

    for array in arrays:
        end = start + len(array)
        indexes.append((start, end))
        start = end

    return indexes
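
A small illustration: two arrays of lengths 3 and 2 yield contiguous (start, end) pairs (TokenCutter imported as in the overview sketch above):

import numpy as np

arrays = [np.zeros(3), np.zeros(2)]
print(TokenCutter.list_start_end_indexes(arrays))  # [(0, 3), (3, 5)]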

merge(chunks: list[Doc]) -> Doc ¤

Merge a list of chunks into a single Doc.

Parameters:

  • chunks (list[Doc]): The list of chunks to merge. Required.

Returns:

  • Doc: The merged doc.

Note:

  • The user_data dict of the docs will be ignored. If they contain information that needs to be preserved, it should be stored in the doc extensions. See https://github.com/explosion/spaCy/discussions/9106.

Source code in lexos/cutter/token_cutter.py
def merge(self, chunks: list[Doc]) -> Doc:
    """Merge a list of chunks into a single Doc.

    Args:
        chunks (list[Doc]): The list of chunks to merge.

    Returns:
        Doc: The merged doc.

    Note:
        - The user_data dict of the docs will be ignored. If they contain information
          that needs to be preserved, it should be stored in the doc extensions.
          See https://github.com/explosion/spaCy/discussions/9106.
    """
    if len(chunks) == 0:
        raise LexosException("No chunks to merge.")
    return Doc.from_docs(chunks)
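
A sketch of re-assembling the chunks of the first source doc into a single Doc (nlp, doc, and the TokenCutter import as in the overview sketch; note the user_data caveat above):

cutter = TokenCutter()
chunks = cutter.split(doc, chunksize=500)
merged = cutter.merge(chunks[0])  # a single spaCy Doc built with Doc.from_docs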

save(output_dir: Path | str, names: Optional[str | list[str]] = None, delimiter: Optional[str] = '_', pad: Optional[int] = 3, strip_chunks: Optional[bool] = True, as_text: Optional[bool] = True) -> None ¤

Save the chunks to disk.

Parameters:

  • output_dir (Path | str): The output directory to save the chunks to. Required.
  • names (Optional[str | list[str]]): The doc names. Default: None.
  • delimiter (str): The delimiter to use for the chunk names. Default: '_'.
  • pad (int): The padding for the chunk names. Default: 3.
  • strip_chunks (bool): Whether to strip leading and trailing whitespace in the chunks. Default: True.
  • as_text (Optional[bool]): Whether to save the chunks as text files or spaCy Doc objects (bytes). Default: True.

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def save(
    self,
    output_dir: Path | str,
    names: Optional[str | list[str]] = None,
    delimiter: Optional[str] = "_",
    pad: Optional[int] = 3,
    strip_chunks: Optional[bool] = True,
    as_text: Optional[bool] = True,
) -> None:
    """Save the chunks to disk.

    Args:
        output_dir (Path | str): The output directory to save the chunks to.
        names (Optional[str | list[str]]): The doc names.
        delimiter (str): The delimiter to use for the chunk names.
        pad (int): The padding for the chunk names.
        strip_chunks (bool): Whether to strip leading and trailing whitespace in the chunks.
        as_text (Optional[bool]): Whether to save the chunks as text files or spaCy Doc objects (bytes).
    """
    self._set_attributes(
        output_dir=output_dir,
        delimiter=delimiter,
        names=names,
        pad=pad,
        strip_chunks=strip_chunks,
    )
    if not self.chunks or self.chunks == []:
        raise LexosException("No chunks to save.")
    if self.names:
        if len(self.names) != len(self.chunks):
            raise LexosException(
                f"The number of docs in `names` ({len(self.names)}) must equal the number of docs in `chunks` ({len(self.chunks)})."
            )
    elif self.names == [] or self.names is None:
        self.names = [
            f"doc{str(i + 1).zfill(self.pad)}" for i in range(len(self.chunks))
        ]
    for i, doc in enumerate(self.chunks):
        for num, chunk in enumerate(doc):
            if strip_chunks:
                chunk = strip_doc(chunk)
            self._write_chunk(
                self.names[i], num + 1, chunk, Path(output_dir), as_text
            )
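
A sketch of writing chunks to disk as numbered text files (doc as in the overview sketch; the resulting file names are an assumption based on the default delimiter and pad, and the output directory is assumed to already exist):

cutter = TokenCutter()
cutter.split(doc, chunksize=500)
cutter.save("output_chunks", names=["novel"])
# Expected layout (assumption): output_chunks/novel_001.txt, output_chunks/novel_002.txt, ...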

split(docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None, chunksize: Optional[int] = None, n: Optional[int] = None, merge_threshold: Optional[float] = 0.5, overlap: Optional[int] = None, names: Optional[str | list[str]] = None, newline: Optional[bool] = None, strip_chunks: Optional[bool] = True, file: Optional[bool] = False, model: Optional[str] = None, merge_final: Optional[bool] = False) -> list[list[Doc]] ¤

Split spaCy docs into chunks by a fixed number of tokens.

Parameters:

  • docs (Optional[Doc | list[Doc] | Path | str | list[Path | str]]): A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk(). Default: None.
  • chunksize (Optional[int]): The number of tokens to split on. Default: None.
  • n (Optional[int]): The number of chunks to produce. Default: None.
  • merge_threshold (Optional[float]): The threshold to merge the last segment. Default: 0.5.
  • overlap (Optional[int]): The number of tokens to overlap. Default: None.
  • names (Optional[str | list[str]]): The doc names. Default: None.
  • newline (Optional[bool]): Whether to chunk by lines. Default: None.
  • strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks. Default: True.
  • file (Optional[bool]): Whether to load docs from files using Doc.from_disk(). Default: False.
  • model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True. Default: None.
  • merge_final (Optional[bool]): Whether to force the merge of the last segment. Default: False.

Returns:

  • list[list[Doc]]: A list of spaCy docs (chunks).

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def split(
    self,
    docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None,
    chunksize: Optional[int] = None,
    n: Optional[int] = None,
    merge_threshold: Optional[float] = 0.5,
    overlap: Optional[int] = None,
    names: Optional[str | list[str]] = None,
    newline: Optional[bool] = None,
    strip_chunks: Optional[bool] = True,
    file: Optional[bool] = False,
    model: Optional[str] = None,
    merge_final: Optional[bool] = False,
) -> list[list[Doc]]:
    """Split spaCy docs into chunks by a fixed number of tokens.

    Args:
        docs (Optional[Doc | list[Doc] | Path | str | list[Path | str]]): A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk().
        chunksize (Optional[int]): The number of tokens to split on.
        n (Optional[int]): The number of chunks to produce.
        merge_threshold (Optional[float]): The threshold to merge the last segment.
        overlap (Optional[int]): The number of tokens to overlap.
        names (Optional[str | list[str]]): The doc names.
        newline (Optional[bool]): Whether to chunk by lines.
        strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
        file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
        model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.
        merge_final (Optional[bool]): Whether to force the merge of the last segment.

    Returns:
        list[list[Doc]]: A list of spaCy docs (chunks).
    """
    if docs:
        self.docs = ensure_list(docs)
    if not self.docs:
        raise LexosException("No documents provided for splitting.")
    self._set_attributes(
        chunksize=chunksize,
        n=n,
        merge_threshold=merge_threshold,
        overlap=overlap,
        names=names,
        newline=newline,
        strip_chunks=strip_chunks,
    )

    # Load docs from files if file=True
    if file:
        if model is None:
            raise LexosException("model parameter is required when file=True")
        nlp = spacy.load(model)
        loaded_docs = []
        for doc in ensure_list(docs):
            try:
                doc = Doc(nlp.vocab).from_disk(doc)
            except ValueError:
                raise LexosException(
                    f"Error loading doc from disk. Doc file must be in a valid spaCy serialization format: see https://spacy.io/api/doc#to_disk"
                )
            loaded_docs.append(doc)
        docs = loaded_docs

    if self.newline:
        if not self.n:
            self.n = self.chunksize
        if not self.n or self.n < 1:
            raise LexosException("n must be greater than 0.")
        for doc in ensure_list(docs):
            self.chunks.append(
                self._split_doc_by_lines(doc, merge_final=merge_final)
            )
    else:
        for doc in ensure_list(docs):
            self.chunks.append(self._split_doc(doc, merge_final=merge_final))

    return self.chunks
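
A sketch of the main splitting modes (doc as in the overview sketch; each call uses a fresh instance because results accumulate in self.chunks):

# Fixed chunk size in tokens
TokenCutter().split(doc, chunksize=500)

# A fixed number of chunks
TokenCutter().split(doc, n=4)

# Chunk by lines: n is read as the number of lines per chunk
TokenCutter().split(doc, n=2, newline=True)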

split_on_milestones(milestones: Span | list[Span], docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None, merge_threshold: Optional[float] = 0.5, merge_final: Optional[bool] = False, overlap: Optional[int] = None, keep_spans: Optional[bool | str] = False, strip_chunks: Optional[bool] = True, names: Optional[str | list[str]] = None, file: Optional[bool] = False, model: Optional[str] = None) -> list[list[Doc]] ¤

Split document on a milestone.

Parameters:

  • milestones (Span | list[Span]): A milestone span or list of milestone spans to be matched. Required.
  • docs (Optional[Doc | list[Doc] | Path | str | list[Path | str]]): The document(s) to be split, or file paths to spaCy docs saved with Doc.to_disk(). Default: None.
  • merge_threshold (Optional[float]): The threshold to merge the last segment. Default: 0.5.
  • merge_final (Optional[bool]): Whether to force the merge of the last segment. Default: False.
  • overlap (Optional[int]): The number of tokens to overlap. Default: None.
  • keep_spans (Optional[bool | str]): Whether to keep the spans in the split strings. Default: False.
  • strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks. Default: True.
  • names (Optional[str | list[str]]): The doc names. Default: None.
  • file (Optional[bool]): Whether to load docs from files using Doc.from_disk(). Default: False.
  • model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True. Default: None.

Returns:

  • list[list[Doc]]: A list of spaCy docs (chunks).

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def split_on_milestones(
    self,
    milestones: Span | list[Span],
    docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None,
    merge_threshold: Optional[float] = 0.5,
    merge_final: Optional[bool] = False,
    overlap: Optional[int] = None,
    keep_spans: Optional[bool | str] = False,
    strip_chunks: Optional[bool] = True,
    names: Optional[str | list[str]] = None,
    file: Optional[bool] = False,
    model: Optional[str] = None,
) -> list[list[Doc]]:
    """Split document on a milestone.

    Args:
        milestones (Span | list[Span]): A milestone span or list of milestone spans to be matched.
        docs (Optional[Doc | list[Doc] | Path | str | list[Path | str]]): The document(s) to be split, or file paths to spaCy docs saved with Doc.to_disk().
        merge_threshold (Optional[float]): The threshold to merge the last segment.
        merge_final (Optional[bool]): Whether to force the merge of the last segment.
        overlap (Optional[int]): The number of tokens to overlap.
        keep_spans (Optional[bool | str]): Whether to keep the spans in the split strings. Defaults to False.
        strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
        names (Optional[str | list[str]]): The doc names.
        file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
        model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.

    Returns:
        list[list[Doc]]: A list of spaCy docs (chunks).
    """
    if docs:
        self.docs = ensure_list(docs)
    if not self.docs:
        raise LexosException("No documents provided for splitting.")
    self._set_attributes(
        merge_threshold=merge_threshold,
        overlap=overlap,
        strip_chunks=strip_chunks,
        names=names,
    )

    # Load docs from files if file=True
    if file:
        if model is None:
            raise LexosException("model parameter is required when file=True")
        nlp = spacy.load(model)
        loaded_docs = []
        for doc in ensure_list(docs):
            doc = Doc(nlp.vocab).from_disk(doc)
            loaded_docs.append(doc)
        docs = loaded_docs

    for doc in ensure_list(docs):
        chunks = self._split_doc_on_milestones(
            doc, milestones, keep_spans=keep_spans, merge_final=merge_final
        )
        self.chunks.append(chunks)
    return self.chunks
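
A sketch of splitting on milestone spans (nlp and the TokenCutter import as in the overview sketch; the milestone spans here are built by hand for illustration only):

doc = nlp("CHAPTER I It was dark. CHAPTER II It was still dark.")
milestones = [doc[t.i : t.i + 2] for t in doc if t.text == "CHAPTER"]

cutter = TokenCutter()
chunks = cutter.split_on_milestones(milestones, docs=doc, keep_spans="following")
for segment in chunks[0]:
    print(repr(segment.text))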

split_on_sentences(docs: Doc | list[Doc] | Path | str | list[Path | str], n: Optional[int] = None, merge_final: Optional[bool] = False, overlap: Optional[int] = None, strip_chunks: Optional[bool] = True, names: Optional[str | list[str]] = None, file: Optional[bool] = False, model: Optional[str] = None) -> list[list[Doc]] ¤

Split spaCy docs into chunks by a fixed number of sentences.

Parameters:

  • docs (Doc | list[Doc] | Path | str | list[Path | str]): A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk(). Required.
  • n (Optional[int]): The number of sentences per chunk. Default: None.
  • merge_final (Optional[bool]): Whether to merge the last segment. Default: False.
  • overlap (Optional[int]): The number of tokens to overlap. Default: None.
  • strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks. Default: True.
  • names (Optional[str | list[str]]): The doc names. Default: None.
  • file (Optional[bool]): Whether to load docs from files using Doc.from_disk(). Default: False.
  • model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True. Default: None.

Returns:

  • list[list[Doc]]: A list of spaCy docs (chunks).

Raises:

  • ValueError: If n is less than or equal to 0.
  • ValueError: If the model has no sentences.

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def split_on_sentences(
    self,
    docs: Doc | list[Doc] | Path | str | list[Path | str],
    n: Optional[int] = None,
    merge_final: Optional[bool] = False,
    overlap: Optional[int] = None,
    strip_chunks: Optional[bool] = True,
    names: Optional[str | list[str]] = None,
    file: Optional[bool] = False,
    model: Optional[str] = None,
) -> list[list[Doc]]:
    """Split spaCy docs into chunks by a fixed number of sentences.

    Args:
        docs (Doc | list[Doc] | Path | str | list[Path | str]): A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk().
        n (Optional[int]): The number of sentences per chunk.
        merge_final (Optional[bool]): Whether to merge the last segment.
        overlap (Optional[int]): The number of tokens to overlap.
        strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
        names (Optional[str | list[str]]): The doc names.
        file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
        model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.

    Returns:
        list[list[Doc]]: A list of spaCy docs (chunks).

    Raises:
        ValueError: If n is less than or equal to 0.
        ValueError: If the model has no sentences.
    """
    self._set_attributes(
        n=n,
        overlap=overlap,
        strip_chunks=strip_chunks,
        names=names,
    )

    # Load docs from files if file=True
    if file:
        if model is None:
            raise LexosException("model parameter is required when file=True")
        nlp = spacy.load(model)
        loaded_docs = []
        for doc in ensure_list(docs):
            doc = Doc(nlp.vocab).from_disk(doc)
            loaded_docs.append(doc)
        docs = loaded_docs

    if not self.n:
        self.n = self.chunksize
    if not self.n or self.n < 1:
        raise LexosException("n must be greater than 0.")
    for i, doc in enumerate(ensure_list(docs)):
        if not doc.has_annotation("SENT_START"):
            raise LexosException(
                f"The spaCy model used to create the Doc {i} does not have sentence boundary detection. Please use a model that includes the 'senter' or 'parser' pipeline component."
            )
        else:
            next(doc.sents)
        self.chunks.append(
            self._split_doc_by_sentences(doc, merge_final=merge_final)
        )
    return self.chunks
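
A sketch of sentence-based chunking (the doc must come from a pipeline that sets sentence boundaries, e.g. one with a 'parser' or 'senter' component; TokenCutter imported as in the overview sketch):

nlp = spacy.load("en_core_web_sm")
doc = nlp("One sentence. Two sentences. Three sentences. Four sentences.")

cutter = TokenCutter()
chunks = cutter.split_on_sentences(doc, n=2)  # two sentences per chunk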

to_dict(names: Optional[list[str]] = None) -> dict[str, list[str]] ¤

Return the chunks as a dictionary.

Parameters:

  • names (Optional[list[str]]): A list of names for the doc Docs. Default: None.

Returns:

  • dict[str, list[str]]: The chunks as a dictionary.

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def to_dict(self, names: Optional[list[str]] = None) -> dict[str, list[str]]:
    """Return the chunks as a dictionary.

    Args:
        names (Optional[list[str]]): A list of names for the doc Docs.

    Returns:
        dict[str, list[str]]: The chunks as a dictionary.
    """
    if names:
        self.names = names
    if not self.names:
        self.names = [
            f"doc{str(i + 1).zfill(self.pad)}" for i in range(len(self.chunks))
        ]
    return {
        str(name): [chunk.text for chunk in chunks]
        for name, chunks in zip(self.names, self.chunks)
    }
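
A sketch of exporting chunk texts keyed by doc name (doc as in the overview sketch; the output shown is illustrative):

cutter = TokenCutter()
cutter.split(doc, chunksize=500)
cutter.to_dict(names=["novel"])
# {"novel": ["text of chunk 1 ...", "text of chunk 2 ...", ...]}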

_apply_merge_threshold(chunks: list[Doc], force: bool = False) -> list[Doc] ¤

Apply the merge threshold to the last chunk.

Parameters:

  • chunks (list[Doc]): The list of chunks. Required.
  • force (bool): Whether to force the merge. Default: False.

Returns:

  • list[Doc]: The list of chunks with the last chunk merged if necessary.

Notes:

  • Whitespace is supplied between merged chunks.
  • The length of the final chunk is measured in number of tokens or number of sentences.

Source code in lexos/cutter/token_cutter.py
def _apply_merge_threshold(
    self, chunks: list[Doc], force: bool = False
) -> list[Doc]:
    """Apply the merge threshold to the last chunk.

    Args:
        chunks (list[Doc]): The list of chunks.
        force (bool, optional): Whether to force the merge. Defaults to False.

    Returns:
        list[Doc]: The list of chunks with the last chunk merged if necessary.

    Notes:
      - Whitespace is supplied between merged chunks.
      - Length of final chunk is measured in number tokens or number of sentences.
    """
    if len(chunks) == 1:
        return chunks
    merge_threshold = (
        self.merge_threshold if self.merge_threshold is not None else 0.5
    )
    if isinstance(self.n, int):
        threshold = max([len(chunk) for chunk in chunks]) * merge_threshold
    else:
        threshold = (
            self.chunksize if self.chunksize is not None else 1
        ) * merge_threshold
    # If the length of the last chunk < threshold, merge it with the previous chunk
    if force is True or len(chunks[-1]) < threshold:
        # Get rid of the last chunk
        last_chunk = chunks.pop(-1)
        # Combine the last two segments into a single doc
        chunks[-1] = Doc.from_docs([chunks[-1], last_chunk])
    return chunks
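
An illustrative calculation of the threshold (values are assumptions, mirroring the logic above):

chunksize, merge_threshold = 1000, 0.5
threshold = chunksize * merge_threshold  # 500 tokens when splitting by chunksize
# A final chunk of, say, 350 tokens (< 500) is merged into the previous chunk.
# When n is set, the threshold is merge_threshold times the largest chunk instead.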

_apply_overlap(chunks: list[Doc]) -> list[Doc] ¤

Create overlapping chunks.

Parameters:

Name Type Description Default
chunks list[Doc]

A list of spaCy docs.

required

Returns:

Type Description
list[Doc]

list[Doc]: A list of spaCy docs.

Source code in lexos/cutter/token_cutter.py
def _apply_overlap(
    self,
    chunks: list[Doc],
) -> list[Doc]:
    """Create overlapping chunks.

    Args:
        chunks (list[Doc]): A list of spaCy docs.

    Returns:
        list[Doc]: A list of spaCy docs.
    """
    overlapped_chunks = []
    for i, chunk in enumerate(chunks):
        if i < len(chunks) - 1:
            overlap_doc = chunks[i + 1][: self.overlap].as_doc()
            overlapped_doc = Doc.from_docs([chunk, overlap_doc])
            overlapped_chunks.append(overlapped_doc)
        elif i == len(chunks) - 1:
            overlapped_chunks.append(chunk)
    return overlapped_chunks
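
For example, with overlap=2, every chunk except the last is extended with the first two tokens of the chunk that follows it, so consecutive chunks share a two-token seam.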

_chunk_doc(doc: Doc, attrs: Sequence[int | str] = SPACY_ATTRS, header: Sequence[int | str] = ENTITY_HEADER) -> list[Doc] ¤

Split a Doc into chunks.

Parameters:

Name Type Description Default
doc Doc

The Doc to split.

required
attrs Sequence[int | str]

The attributes to include in the chunks.

SPACY_ATTRS
header Sequence[int | str]

The NER attributes to include in the chunks.

ENTITY_HEADER

Returns:

Type Description
list[Doc]

list[Doc]: List of Doc chunks.

Source code in lexos/cutter/token_cutter.py
def _chunk_doc(
    self,
    doc: Doc,
    attrs: "Sequence[int | str]" = SPACY_ATTRS,
    header: Sequence[int | str] = ENTITY_HEADER,
) -> list[Doc]:
    """Split a Doc into chunks.

    Args:
        doc: The Doc to split.
        attrs: The attributes to include in the chunks.
        header: The NER attributes to include in the chunks.

    Returns:
        list[Doc]: List of Doc chunks.
    """
    # Check that the document is not empty
    if len(doc) == 0:
        raise LexosException("Document is empty.")

    # Return the whole doc if it is less than the chunksize
    if self.n is None and self.chunksize is not None and len(doc) <= self.chunksize:
        return [doc]

    # Get the names of the custom extensions
    extension_names = [name for name in doc[0]._.__dict__["_extensions"].keys()]

    # Split the doc into n chunks
    if isinstance(self.n, int):
        chunks_arr = np.array_split(doc.to_array(list(attrs)), self.n)
        # If there is only one chunk, skip the rest of the function
        if len(chunks_arr) == 1:
            return [doc]
    else:
        chunks_arr = np.array_split(
            doc.to_array(list(attrs)),
            np.arange(self.chunksize, len(doc), self.chunksize),
        )
        # Remove empty elements
        chunks_arr = [x for x in chunks_arr if x.size > 0]

    # Create a list to hold the chunks and get the chunk indexes
    chunks = []
    chunk_indexes = TokenCutter.list_start_end_indexes(chunks_arr)

    # Iterate over the chunks
    for i, chunk in enumerate(chunks_arr):
        # Get chunk start and end indexes
        start = chunk_indexes[i][0]
        end = chunk_indexes[i][1]
        span = doc[start:end]
        words = [token.text for token in span]

        # Make a new doc for the chunk
        new_doc = Doc(doc.vocab, words=words)

        # Add the attributes to the new chunk doc
        new_doc.from_array(list(attrs), chunk)

        # Add entities to the new chunk doc
        if doc.ents and len(doc.ents) > 0:
            ent_array = np.empty((len(chunk), len(header)), dtype="uint64")
            for i, token in enumerate(span):
                ent_array[i, 0] = token.ent_iob
                ent_array[i, 1] = token.ent_type
            new_doc.from_array(list(header), ent_array)

        # Add custom attributes to doc
        if len(extension_names) > 0:
            for i, token in enumerate(span):
                for ext in extension_names:
                    new_doc[i]._.set(ext, token._.get(ext))

        # Add the chunk to the chunks list
        chunks.append(new_doc)

    # Return the list of chunks
    return chunks

_keep_milestones_bool(doc: Doc, milestones: list[Span], keep_spans: bool = False) -> list[Doc] ¤

Split a spaCy Doc into chunks on milestones, optionally keeping milestones.

Parameters:

Name Type Description Default
doc Doc

The spaCy Doc to split.

required
milestones list[Span]

The milestones to split on.

required
keep_spans bool

Whether to keep the spans in the split strings.

False

Returns:

Type Description
list[Doc]

list[Doc]: A list of spaCy Docs.

Source code in lexos/cutter/token_cutter.py
def _keep_milestones_bool(
    self, doc: Doc, milestones: list[Span], keep_spans: bool = False
) -> list[Doc]:
    """Split a spaCy Doc into chunks on milestones, optionally keeping milestones.

    Args:
        doc (Doc): The spaCy Doc to split.
        milestones (list[Span]): The milestones to split on.
        keep_spans (bool): Whether to keep the spans in the split strings.

    Returns:
        list[Doc]: A list of spaCy Docs.
    """
    chunks = []
    start = 0
    for span in milestones:
        if span.start == 0 or span.end == doc[-1].i:
            if keep_spans:
                chunks.append(span)
        elif start < span.start:
            chunks.append(doc[start : span.start])
            if keep_spans:
                chunks.append(span)
        start = span.end
    if start < len(doc):
        chunks.append(doc[start:])
    return chunks
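
Roughly, for a doc tokenized as A M B M C D with milestone spans on each M, keep_spans=True yields the chunks A, M, B, M, and C D, while keep_spans=False yields A, B, and C D.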

_keep_milestones_following(doc: Doc, milestones: list[Span]) -> list[Doc] ¤

Split a spaCy Doc into chunks on milestones preserving milestones in the following chunk.

Parameters:

Name Type Description Default
doc Doc

The spaCy Doc to split.

required
milestones list[Span]

The milestones to split on.

required

Returns:

Type Description
list[Doc]

list[Doc]: A list of spaCy Docs.

Source code in lexos/cutter/token_cutter.py
def _keep_milestones_following(self, doc: Doc, milestones: list[Span]) -> list[Doc]:
    """Split a spaCy Doc into chunks on milestones preserving milestones in the following chunk.

    Args:
        doc (Doc): The spaCy Doc to split.
        milestones (list[Span]): The milestones to split on.

    Returns:
        list[Doc]: A list of spaCy Docs.
    """
    chunks = []
    start = 0
    for index, span in enumerate(milestones):
        # Text before milestone
        if start < span.start:
            chunks.append(doc[start : span.start])

        # Find end of chunk (next milestone or doc end)
        end = (
            milestones[index + 1].start if index < len(milestones) - 1 else len(doc)
        )

        # Milestone + following text as one chunk
        chunks.append(doc[span.start : end])
        start = end
    return chunks
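
Roughly, for a doc tokenized as A M B M C D with milestone spans on each M, this variant yields the chunks A, M B, and M C D: each milestone begins the chunk that follows it.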

_keep_milestones_preceding(doc: Doc, milestones: list[Span]) -> list[Doc] ¤

Split a spaCy Doc into chunks on milestones preserving milestones in the preceding chunk.

Parameters:

Name Type Description Default
doc Doc

The spaCy Doc to split.

required
milestones list[Span]

The milestones to split on.

required

Returns:

Type Description
list[Doc]

list[Doc]: A list of spaCy Docs.

Source code in lexos/cutter/token_cutter.py
def _keep_milestones_preceding(self, doc: Doc, milestones: list[Span]) -> list[Doc]:
    """Split a spaCy Doc into chunks on milestones preserving milestones in the preceding chunk.

    Args:
        doc (Doc): The spaCy Doc to split.
        milestones (list[Span]): The milestones to split on.

    Returns:
        list[Doc]: A list of spaCy Docs.
    """
    # Check that the document is not empty
    if len(doc) == 0:
        raise LexosException("Document is empty.")
    if len(milestones) == 0:
        return [doc]
    chunks = []
    start = 0
    for span in milestones:
        index = span.start
        if index != -1:
            chunks.append(doc[start : index + len(span)])
            start = index + len(span)
    if start < len(doc):
        chunks.append(doc[start:])
    if milestones[0].start == 0:
        _ = chunks.pop(0)
        chunks[0] = doc[: chunks[0].end]
    return chunks
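
Roughly, for a doc tokenized as A M B M C D with milestone spans on each M, this variant yields the chunks A M, B M, and C D: each milestone ends the chunk that precedes it.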

_set_attributes(**data) -> None ¤

Set attributes after initialization.

Source code in lexos/cutter/token_cutter.py
def _set_attributes(self, **data) -> None:
    """Set attributes after initialization."""
    for key, value in data.items():
        setattr(self, key, value)

_split_doc(doc: Doc, attrs: Optional[Sequence[int | str]] = SPACY_ATTRS, merge_final: Optional[bool] = False) -> list[Doc] ¤

Split a spaCy doc into chunks by a fixed number of tokens.

Parameters:

Name Type Description Default
doc Doc

A spaCy doc.

required
attrs Optional[Sequence[int | str]]

The spaCy attributes to include in the chunks.

SPACY_ATTRS
merge_final Optional[bool]

Whether to merge the final segment.

False

Returns:

Type Description
list[Doc]

list[Doc]: A list of spaCy docs.

Source code in lexos/cutter/token_cutter.py
def _split_doc(
    self,
    doc: Doc,
    attrs: Optional[Sequence[int | str]] = SPACY_ATTRS,
    merge_final: Optional[bool] = False,
) -> list[Doc]:
    """Split a spaCy doc into chunks by a fixed number of tokens.

    Args:
        doc (Doc): A spaCy doc.
        attrs (Optional[Sequence[int | str]]): The spaCy attributes to include in the chunks.
        merge_final (Optional[bool]): Whether to merge the final segment.

    Returns:
        list[Doc]: A list of spaCy docs.
    """
    if len(doc) == 0:
        raise LexosException("Document is empty.")

    attrs = attrs if attrs is not None else SPACY_ATTRS
    chunks = self._chunk_doc(doc, attrs)
    chunks = self._apply_merge_threshold(
        chunks, force=merge_final if merge_final is not None else False
    )
    if self.overlap:
        chunks = self._apply_overlap(chunks)
    if self.strip_chunks:
        return [strip_doc(chunk) for chunk in chunks]
    # Ensure that all chunks are spaCy docs
    else:
        return [
            chunk.as_doc() if isinstance(chunk, Span) else chunk for chunk in chunks
        ]

_split_doc_by_lines(doc: Doc, merge_final: Optional[bool] = False) -> list[Doc] ¤

Split a spaCy Doc into chunks of n lines.

Parameters:

Name Type Description Default
doc Doc

spaCy Doc to split.

required
merge_final Optional[bool]

Whether to merge the final segment.

False

Returns:

Type Description
list[Doc]

list[Doc]: Chunks of the doc split by lines.

Source code in lexos/cutter/token_cutter.py
def _split_doc_by_lines(
    self, doc: Doc, merge_final: Optional[bool] = False
) -> list[Doc]:
    """Split a spaCy Doc into chunks of n lines.

    Args:
        doc: spaCy Doc to split.
        merge_final: Whether to merge the final segment.

    Returns:
        list[Doc]: Chunks of the doc split by lines.
    """
    if len(doc) == 0:
        raise LexosException("Document is empty.")

    indices = []  # The indices immediately following the newline tokens
    count = 0
    chunks = []
    for token in doc:
        if "\n" in token.text:
            count += 1
            if (
                self.n is not None and count % self.n == 0
            ):  # Check if it's the nth occurrence
                indices.append(token.i + 1)
    if len(indices) == 0:
        chunks.append(doc)
    else:
        prev_index = 0
        for index in indices:
            chunks.append(doc[prev_index:index].as_doc())
            prev_index = index
        chunks.append(doc[prev_index:].as_doc())  # Append the remaining elements

    # Ensure there are no empty docs
    chunks = [chunk for chunk in chunks if len(chunk) > 0]

    # Apply the merge threshold and overlap
    chunks = self._apply_merge_threshold(
        chunks, force=merge_final if merge_final is not None else False
    )
    if self.overlap:
        chunks = self._apply_overlap(chunks)

    if self.strip_chunks:
        return [strip_doc(chunk) for chunk in chunks]

    return chunks
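
This helper is reached through the public split() method with newline=True. A minimal sketch (assuming a TokenCutter constructed with no arguments; the sample text is arbitrary):

import spacy

from lexos.cutter.token_cutter import TokenCutter

nlp = spacy.blank("en")
doc = nlp("line one\nline two\nline three\nline four")

cutter = TokenCutter()

# Chunks of two lines each
chunks = cutter.split(docs=doc, n=2, newline=True)
print([chunk.text for chunk in chunks[0]])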

_split_doc_by_sentences(doc: Doc, merge_final: Optional[bool] = False) -> list[Doc] ¤

Split a spaCy Doc into chunks of n sentences.

Parameters:

Name Type Description Default
doc Doc

A spaCy Doc object.

required
merge_final Optional[bool]

Whether to merge the final segment.

False

Returns:

Type Description
list[Doc]

list[Doc]: Chunks containing n sentences each (the last chunk may have fewer).

Source code in lexos/cutter/token_cutter.py
def _split_doc_by_sentences(
    self, doc: Doc, merge_final: Optional[bool] = False
) -> list[Doc]:
    """Split a spaCy Doc into chunks of n sentences.

    Args:
        doc: A spaCy Doc object.
        merge_final: Whether to merge the final segment.

    Returns:
        list[Doc]: Chunks containing n sentences each (the last chunk may have fewer).
    """
    if len(doc) == 0:
        raise LexosException("Document is empty.")

    try:
        next(doc.sents)
    except (StopIteration, ValueError):
        raise LexosException("The document has no assigned sentences.")

    # Split the doc into chunks of n sentences
    sents = list(doc.sents)
    chunks = []
    n = self.n if self.n is not None else 1
    for i in range(0, len(sents), n):
        chunk_sents = sents[i : i + n]
        start_idx = chunk_sents[0].start
        end_idx = chunk_sents[-1].end
        chunks.append(doc[start_idx:end_idx].as_doc())
    # No need to append doc[end_idx:] since all sentences are already included in the chunks

    # Ensure there are no empty docs
    chunks = [chunk for chunk in chunks if len(chunk) > 0]

    # Apply the merge threshold and overlap
    chunks = self._apply_merge_threshold(
        chunks, force=merge_final if merge_final is not None else False
    )
    if self.overlap:
        chunks = self._apply_overlap(chunks)

    if self.strip_chunks:
        return [strip_doc(chunk) for chunk in chunks]

    return chunks

_split_doc_on_milestones(doc: Doc, milestones: Span | list[Span], keep_spans: Optional[bool | str] = False, merge_final: Optional[bool] = False) -> list[Doc] ¤

Split document on a milestone.

Parameters:

Name Type Description Default
doc Doc

The document to be split.

required
milestones Span | list[Span]

A Span or list of Spans to be matched.

required
keep_spans Optional[bool | str]

Whether to keep the spans in the split strings. Defaults to False.

False
merge_final Optional[bool]

Whether to force the merge of the last segment. Defaults to False.

False

Returns:

Type Description
list[Doc]

list[Doc]: A list of chunked spaCy Doc objects.

Source code in lexos/cutter/token_cutter.py
def _split_doc_on_milestones(
    self,
    doc: Doc,
    milestones: Span | list[Span],
    keep_spans: Optional[bool | str] = False,
    merge_final: Optional[bool] = False,
) -> list[Doc]:
    """Split document on a milestone.

    Args:
        doc (Doc): The document to be split.
        milestones (Span | list[Span]): A Span or list of Spans to be matched.
        keep_spans (Optional[bool | str]): Whether to keep the spans in the split strings. Defaults to False.
        merge_final (Optional[bool]): Whether to force the merge of the last segment. Defaults to False.

    Returns:
        list[Doc]: A list of chunked spaCy Doc objects.
    """
    if len(doc) == 0:
        raise LexosException("Document is empty.")

    milestones = ensure_list(milestones)
    if keep_spans == "following":
        chunks = self._keep_milestones_following(doc, milestones)
    elif keep_spans == "preceding":
        chunks = self._keep_milestones_preceding(doc, milestones)
    else:
        # Only pass a boolean to keep_spans
        chunks = self._keep_milestones_bool(
            doc, milestones, keep_spans=bool(keep_spans)
        )

    # Ensure that all chunks are spaCy docs
    chunks = [
        chunk.as_doc() if isinstance(chunk, Span) else chunk for chunk in chunks
    ]

    # Apply the merge threshold and overlap
    chunks = self._apply_merge_threshold(
        chunks, force=merge_final if merge_final is not None else False
    )
    if self.overlap:
        chunks = self._apply_overlap(chunks)

    if self.strip_chunks:
        return [strip_doc(chunk) for chunk in chunks]

    return chunks

_write_chunk(path: str, n: int, chunk: Doc, output_dir: Path, as_text: bool = True) -> None ¤

Write chunk text to file with formatted name.

Parameters:

Name Type Description Default
path str

The path of the original file.

required
n int

The number of the chunk.

required
chunk Doc

The chunk to save.

required
output_dir Path

The output directory for the chunk.

required
as_text bool

Whether to save the chunk as a text file or a spaCy Doc object.

True
Source code in lexos/cutter/token_cutter.py
def _write_chunk(
    self, path: str, n: int, chunk: Doc, output_dir: Path, as_text: bool = True
) -> None:
    """Write chunk text to file with formatted name.

    Args:
        path (str): The path of the original file.
        n (int): The number of the chunk.
        chunk (Doc): The chunk to save.
        output_dir (Path): The output directory for the chunk.
        as_text (bool): Whether to save the chunk as a text file or a spaCy Doc object.
    """
    output_file = f"{path}{self.delimiter}{str(n).zfill(self.pad)}.txt"
    output_path = output_dir / output_file
    if as_text:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(chunk.text)
    else:
        chunk.to_disk(output_path)

merge(chunks: list[Doc]) -> Doc ¤

Merge a list of chunks into a single Doc.

Parameters:

Name Type Description Default
chunks list[Doc]

The list of chunks to merge.

required

Returns:

Name Type Description
Doc Doc

The merged doc.

Note
  • The user_data dict of the docs will be ignored. If they contain information that needs to be preserved, it should be stored in the doc extensions. See https://github.com/explosion/spaCy/discussions/9106.
Source code in lexos/cutter/token_cutter.py
def merge(self, chunks: list[Doc]) -> Doc:
    """Merge a list of chunks into a single Doc.

    Args:
        chunks (list[Doc]): The list of chunks to merge.

    Returns:
        Doc: The merged doc.

    Note:
        - The user_data dict of the docs will be ignored. If they contain information
          that needs to be preserved, it should be stored in the doc extensions.
          See https://github.com/explosion/spaCy/discussions/9106.
    """
    if len(chunks) == 0:
        raise LexosException("No chunks to merge.")
    return Doc.from_docs(chunks)
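
A minimal sketch (assuming chunks produced by a prior call to split(); the sample text is arbitrary):

import spacy

from lexos.cutter.token_cutter import TokenCutter

nlp = spacy.blank("en")
doc = nlp("alpha beta gamma delta epsilon zeta")

cutter = TokenCutter()
chunks = cutter.split(docs=doc, chunksize=2)

# Recombine the chunks of the first (and only) source doc into a single Doc
merged = cutter.merge(chunks[0])
print(len(merged))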

save(output_dir: Path | str, names: Optional[str | list[str]] = None, delimiter: Optional[str] = '_', pad: Optional[int] = 3, strip_chunks: Optional[bool] = True, as_text: Optional[bool] = True) -> None ¤

Save the chunks to disk.

Parameters:

Name Type Description Default
output_dir Path | str

The output directory to save the chunks to.

required
names Optional[str | list[str]]

The doc names.

None
delimiter str

The delimiter to use for the chunk names.

'_'
pad int

The padding for the chunk names.

3
strip_chunks bool

Whether to strip leading and trailing whitespace in the chunks.

True
as_text Optional[bool]

Whether to save the chunks as text files or spaCy Doc objects (bytes).

True
Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def save(
    self,
    output_dir: Path | str,
    names: Optional[str | list[str]] = None,
    delimiter: Optional[str] = "_",
    pad: Optional[int] = 3,
    strip_chunks: Optional[bool] = True,
    as_text: Optional[bool] = True,
) -> None:
    """Save the chunks to disk.

    Args:
        output_dir (Path | str): The output directory to save the chunks to.
        names (Optional[str | list[str]]): The doc names.
        delimiter (str): The delimiter to use for the chunk names.
        pad (int): The padding for the chunk names.
        strip_chunks (bool): Whether to strip leading and trailing whitespace in the chunks.
        as_text (Optional[bool]): Whether to save the chunks as text files or spaCy Doc objects (bytes).
    """
    self._set_attributes(
        output_dir=output_dir,
        delimiter=delimiter,
        names=names,
        pad=pad,
        strip_chunks=strip_chunks,
    )
    if not self.chunks or self.chunks == []:
        raise LexosException("No chunks to save.")
    if self.names:
        if len(self.names) != len(self.chunks):
            raise LexosException(
                f"The number of docs in `names` ({len(self.names)}) must equal the number of docs in `chunks` ({len(self.chunks)})."
            )
    elif self.names == [] or self.names is None:
        self.names = [
            f"doc{str(i + 1).zfill(self.pad)}" for i in range(len(self.chunks))
        ]
    for i, doc in enumerate(self.chunks):
        for num, chunk in enumerate(doc):
            if strip_chunks:
                chunk = strip_doc(chunk)
            self._write_chunk(
                self.names[i], num + 1, chunk, Path(output_dir), as_text
            )
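
A minimal sketch (assuming the output directory already exists; the directory and name below are hypothetical):

import spacy

from lexos.cutter.token_cutter import TokenCutter

nlp = spacy.blank("en")
doc = nlp("alpha beta gamma delta epsilon zeta")

cutter = TokenCutter()
cutter.split(docs=doc, chunksize=2)

# Writes files such as segments/novel_001.txt, segments/novel_002.txt, ...
cutter.save("segments", names=["novel"], delimiter="_", pad=3)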

split(docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None, chunksize: Optional[int] = None, n: Optional[int] = None, merge_threshold: Optional[float] = 0.5, overlap: Optional[int] = None, names: Optional[str | list[str]] = None, newline: Optional[bool] = None, strip_chunks: Optional[bool] = True, file: Optional[bool] = False, model: Optional[str] = None, merge_final: Optional[bool] = False) -> list[list[Doc]] ¤

Split spaCy docs into chunks by a fixed number of tokens, into a fixed number of chunks, or line by line.

Parameters:

Name Type Description Default
docs Optional[Doc | list[Doc] | Path | str | list[Path | str]]

A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk().

None
chunksize Optional[int]

The number of tokens to split on.

None
n Optional[int]

The number of chunks to produce.

None
merge_threshold Optional[float]

The threshold to merge the last segment.

0.5
overlap Optional[int]

The number of tokens to overlap.

None
names Optional[str | list[str]]

The doc names.

None
newline Optional[bool]

Whether to chunk by lines.

None
strip_chunks Optional[bool]

Whether to strip leading and trailing whitespace in the chunks.

True
file Optional[bool]

Whether to load docs from files using Doc.from_disk().

False
model Optional[str]

The name of the spaCy model to use when loading docs from files. Required when file=True.

None
merge_final Optional[bool]

Whether to force the merge of the last segment.

False

Returns:

Type Description
list[list[Doc]]

list[list[Doc]]: A list of spaCy docs (chunks).

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def split(
    self,
    docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None,
    chunksize: Optional[int] = None,
    n: Optional[int] = None,
    merge_threshold: Optional[float] = 0.5,
    overlap: Optional[int] = None,
    names: Optional[str | list[str]] = None,
    newline: Optional[bool] = None,
    strip_chunks: Optional[bool] = True,
    file: Optional[bool] = False,
    model: Optional[str] = None,
    merge_final: Optional[bool] = False,
) -> list[list[Doc]]:
    """Split spaCy docs into chunks by a fixed number of tokens.

    Args:
        docs (Optional[Doc | list[Doc] | Path | str | list[Path | str]]): A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk().
        chunksize (Optional[int]): The number of tokens to split on.
        n (Optional[int]): The number of chunks to produce.
        merge_threshold (Optional[float]): The threshold to merge the last segment.
        overlap (Optional[int]): The number of tokens to overlap.
        names (Optional[str | list[str]]): The doc names.
        newline (Optional[bool]): Whether to chunk by lines.
        strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
        file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
        model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.
        merge_final (Optional[bool]): Whether to force the merge of the last segment.

    Returns:
        list[list[Doc]]: A list of spaCy docs (chunks).
    """
    if docs:
        self.docs = ensure_list(docs)
    if not self.docs:
        raise LexosException("No documents provided for splitting.")
    self._set_attributes(
        chunksize=chunksize,
        n=n,
        merge_threshold=merge_threshold,
        overlap=overlap,
        names=names,
        newline=newline,
        strip_chunks=strip_chunks,
    )

    # Load docs from files if file=True
    if file:
        if model is None:
            raise LexosException("model parameter is required when file=True")
        nlp = spacy.load(model)
        loaded_docs = []
        for doc in ensure_list(docs):
            try:
                doc = Doc(nlp.vocab).from_disk(doc)
            except ValueError:
                raise LexosException(
                    f"Error loading doc from disk. Doc file must be in a valid spaCy serialization format: see https://spacy.io/api/doc#to_disk"
                )
            loaded_docs.append(doc)
        docs = loaded_docs

    if self.newline:
        if not self.n:
            self.n = self.chunksize
        if not self.n or self.n < 1:
            raise LexosException("n must be greater than 0.")
        for doc in ensure_list(docs):
            self.chunks.append(
                self._split_doc_by_lines(doc, merge_final=merge_final)
            )
    else:
        for doc in ensure_list(docs):
            self.chunks.append(self._split_doc(doc, merge_final=merge_final))

    return self.chunks
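
A minimal usage sketch (the sample text and parameter values are arbitrary):

import spacy

from lexos.cutter.token_cutter import TokenCutter

nlp = spacy.blank("en")
doc = nlp("The quick brown fox jumps over the lazy dog near the river bank")

# Fixed-size chunks of roughly five tokens each
chunks = TokenCutter().split(docs=doc, chunksize=5)
print([chunk.text for chunk in chunks[0]])

# Alternatively, split the same doc into a fixed number of chunks
chunks = TokenCutter().split(docs=doc, n=3)
print([len(chunk) for chunk in chunks[0]])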

split_on_milestones(milestones: Span | list[Span], docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None, merge_threshold: Optional[float] = 0.5, merge_final: Optional[bool] = False, overlap: Optional[int] = None, keep_spans: Optional[bool | str] = False, strip_chunks: Optional[bool] = True, names: Optional[str | list[str]] = None, file: Optional[bool] = False, model: Optional[str] = None) -> list[list[Doc]] ¤

Split document on a milestone.

Parameters:

Name Type Description Default
milestones Span | list[Span]

A milestone span or list of milestone spans to be matched.

required
docs Optional[Doc | list[Doc] | Path | str | list[Path | str]]

The document(s) to be split, or file paths to spaCy docs saved with Doc.to_disk().

None
merge_threshold Optional[float]

The threshold to merge the last segment.

0.5
merge_final Optional[bool]

Whether to force the merge of the last segment.

False
overlap Optional[int]

The number of tokens to overlap.

None
keep_spans Optional[bool | str]

Whether to keep the spans in the split strings. Defaults to False.

False
strip_chunks Optional[bool]

Whether to strip leading and trailing whitespace in the chunks.

True
names Optional[str | list[str]]

The doc names.

None
file Optional[bool]

Whether to load docs from files using Doc.from_disk().

False
model Optional[str]

The name of the spaCy model to use when loading docs from files. Required when file=True.

None

Returns:

Type Description
list[list[Doc]]

list[list[Doc]]: A list of spaCy docs (chunks).

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def split_on_milestones(
    self,
    milestones: Span | list[Span],
    docs: Optional[Doc | list[Doc] | Path | str | list[Path | str]] = None,
    merge_threshold: Optional[float] = 0.5,
    merge_final: Optional[bool] = False,
    overlap: Optional[int] = None,
    keep_spans: Optional[bool | str] = False,
    strip_chunks: Optional[bool] = True,
    names: Optional[str | list[str]] = None,
    file: Optional[bool] = False,
    model: Optional[str] = None,
) -> list[list[Doc]]:
    """Split document on a milestone.

    Args:
        milestones (Span | list[Span]): A milestone span or list of milestone spans to be matched.
        docs (Optional[Doc | list[Doc] | Path | str | list[Path | str]]): The document(s) to be split, or file paths to spaCy docs saved with Doc.to_disk().
        merge_threshold (Optional[float]): The threshold to merge the last segment.
        merge_final (Optional[bool]): Whether to force the merge of the last segment.
        overlap (Optional[int]): The number of tokens to overlap.
        keep_spans (Optional[bool | str]): Whether to keep the spans in the split strings. Defaults to False.
        strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
        names (Optional[str | list[str]]): The doc names.
        file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
        model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.

    Returns:
        list[list[Doc]]: A list of spaCy docs (chunks).
    """
    if docs:
        self.docs = ensure_list(docs)
    if not self.docs:
        raise LexosException("No documents provided for splitting.")
    self._set_attributes(
        merge_threshold=merge_threshold,
        overlap=overlap,
        strip_chunks=strip_chunks,
        names=names,
    )

    # Load docs from files if file=True
    if file:
        if model is None:
            raise LexosException("model parameter is required when file=True")
        nlp = spacy.load(model)
        loaded_docs = []
        for doc in ensure_list(docs):
            doc = Doc(nlp.vocab).from_disk(doc)
            loaded_docs.append(doc)
        docs = loaded_docs

    for doc in ensure_list(docs):
        chunks = self._split_doc_on_milestones(
            doc, milestones, keep_spans=keep_spans, merge_final=merge_final
        )
        self.chunks.append(chunks)
    return self.chunks
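
A minimal sketch (the milestone token CHAPTER and the sample text are arbitrary):

import spacy

from lexos.cutter.token_cutter import TokenCutter

nlp = spacy.blank("en")
doc = nlp("CHAPTER one two three CHAPTER four five six")

# Build milestone spans, here one for every occurrence of the token CHAPTER
milestones = [doc[i : i + 1] for i, token in enumerate(doc) if token.text == "CHAPTER"]

cutter = TokenCutter()
chunks = cutter.split_on_milestones(milestones, docs=doc, keep_spans="following")
print([chunk.text for chunk in chunks[0]])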

split_on_sentences(docs: Doc | list[Doc] | Path | str | list[Path | str], n: Optional[int] = None, merge_final: Optional[bool] = False, overlap: Optional[int] = None, strip_chunks: Optional[bool] = True, names: Optional[str | list[str]] = None, file: Optional[bool] = False, model: Optional[str] = None) -> list[list[Doc]] ¤

Split spaCy docs into chunks by a fixed number of sentences.

Parameters:

Name Type Description Default
docs Doc | list[Doc] | Path | str | list[Path | str]

A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk().

required
n Optional[int]

The number of sentences per chunk.

None
merge_final Optional[bool]

Whether to merge the last segment.

False
overlap Optional[int]

The number of tokens to overlap.

None
strip_chunks Optional[bool]

Whether to strip leading and trailing whitespace in the chunks.

True
names Optional[str | list[str]]

The doc names.

None
file Optional[bool]

Whether to load docs from files using Doc.from_disk().

False
model Optional[str]

The name of the spaCy model to use when loading docs from files. Required when file=True.

None

Returns:

Type Description
list[list[Doc]]

list[list[Doc]]: A list of spaCy docs (chunks).

Raises:

Type Description
LexosException

If n is less than or equal to 0.

LexosException

If the docs were processed with a model that lacks sentence boundary detection.

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def split_on_sentences(
    self,
    docs: Doc | list[Doc] | Path | str | list[Path | str],
    n: Optional[int] = None,
    merge_final: Optional[bool] = False,
    overlap: Optional[int] = None,
    strip_chunks: Optional[bool] = True,
    names: Optional[str | list[str]] = None,
    file: Optional[bool] = False,
    model: Optional[str] = None,
) -> list[list[Doc]]:
    """Split spaCy docs into chunks by a fixed number of sentences.

    Args:
        docs (Doc | list[Doc] | Path | str | list[Path | str]): A spaCy doc, list of spaCy docs, or file paths to spaCy docs saved with Doc.to_disk().
        n (Optional[int]): The number of sentences per chunk.
        merge_final (Optional[bool]): Whether to merge the last segment.
        overlap (Optional[int]): The number of tokens to overlap.
        strip_chunks (Optional[bool]): Whether to strip leading and trailing whitespace in the chunks.
        names (Optional[str | list[str]]): The doc names.
        file (Optional[bool]): Whether to load docs from files using Doc.from_disk().
        model (Optional[str]): The name of the spaCy model to use when loading docs from files. Required when file=True.

    Returns:
        list[list[Doc]]: A list of spaCy docs (chunks).

    Raises:
        LexosException: If n is less than or equal to 0.
        LexosException: If the docs were processed with a model that lacks sentence boundary detection.
    """
    self._set_attributes(
        n=n,
        overlap=overlap,
        strip_chunks=strip_chunks,
        names=names,
    )

    # Load docs from files if file=True
    if file:
        if model is None:
            raise LexosException("model parameter is required when file=True")
        nlp = spacy.load(model)
        loaded_docs = []
        for doc in ensure_list(docs):
            doc = Doc(nlp.vocab).from_disk(doc)
            loaded_docs.append(doc)
        docs = loaded_docs

    if not self.n:
        self.n = self.chunksize
    if not self.n or self.n < 1:
        raise LexosException("n must be greater than 0.")
    for i, doc in enumerate(ensure_list(docs)):
        if not doc.has_annotation("SENT_START"):
            raise LexosException(
                f"The spaCy model used to create the Doc {i} does not have sentence boundary detection. Please use a model that includes the 'senter' or 'parser' pipeline component."
            )
        else:
            next(doc.sents)
        self.chunks.append(
            self._split_doc_by_sentences(doc, merge_final=merge_final)
        )
    return self.chunks
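
A minimal sketch (requires a pipeline with sentence boundary detection, e.g. en_core_web_sm, which must be installed separately):

import spacy

from lexos.cutter.token_cutter import TokenCutter

nlp = spacy.load("en_core_web_sm")
doc = nlp("First sentence here. Second sentence here. Third sentence here. Fourth sentence here.")

cutter = TokenCutter()

# Each chunk holds two sentences (the last chunk may hold fewer)
chunks = cutter.split_on_sentences(doc, n=2)
print([chunk.text for chunk in chunks[0]])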

to_dict(names: Optional[list[str]] = None) -> dict[str, list[str]] ¤

Return the chunks as a dictionary.

Parameters:

Name Type Description Default
names Optional[list[str]]

A list of names for the source Docs.

None

Returns:

Type Description
dict[str, list[str]]

dict[str, list[str]]: The chunks as a dictionary.

Source code in lexos/cutter/token_cutter.py
@validate_call(config=validation_config)
def to_dict(self, names: Optional[list[str]] = None) -> dict[str, list[str]]:
    """Return the chunks as a dictionary.

    Args:
        names (Optional[list[str]]): A list of names for the source Docs.

    Returns:
        dict[str, list[str]]: The chunks as a dictionary.
    """
    if names:
        self.names = names
    if not self.names:
        self.names = [
            f"doc{str(i + 1).zfill(self.pad)}" for i in range(len(self.chunks))
        ]
    return {
        str(name): [chunk.text for chunk in chunks]
        for name, chunks in zip(self.names, self.chunks)
    }
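
A minimal sketch (assuming chunks produced by a prior call to split(); the name is hypothetical):

import spacy

from lexos.cutter.token_cutter import TokenCutter

nlp = spacy.blank("en")
doc = nlp("alpha beta gamma delta epsilon zeta")

cutter = TokenCutter()
cutter.split(docs=doc, chunksize=3)

# Map each named source doc to the list of its chunk texts
print(cutter.to_dict(names=["sample"]))
# e.g. {'sample': ['alpha beta gamma', 'delta epsilon zeta']}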