Parallel Loader
The ParallelLoader class is a Loader implementation optimized for large data sets. It uses Python's ThreadPoolExecutor to perform file I/O in parallel.
VALID_FILE_TYPES = {*TEXT_TYPES, *PDF_TYPES, *DOCX_TYPES, *ZIP_TYPES}
module-attribute
class ParallelLoader
Note
Mkdocstrings does not properly render the class docstring because griffe_pydantic gets confused when rendering fields inherited from BaseLoader together with the new fields in ParallelLoader. Attributes are still documented correctly, but the overall class docstring is missing.
max_workers: Optional[int] = None
pydantic-field
Maximum number of worker threads. If None, the count is calculated automatically based on worker_strategy.
worker_strategy: str = 'auto'
pydantic-field
Worker allocation strategy: 'auto' (analyze files), 'io_bound' (more workers), 'cpu_bound' (fewer workers), 'balanced' (middle ground).
batch_size: int = 100
pydantic-field
Number of files to process in each batch for memory management.
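As an illustration of what batching by `batch_size` means, here is a minimal sketch that splits a file list into consecutive batches. The helper name `iter_batches` is hypothetical and not part of ParallelLoader; only the default of 100 comes from the field above.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def iter_batches(items: Sequence[T], batch_size: int = 100) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `batch_size` items (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


files = [f"doc_{i}.txt" for i in range(250)]
batch_lengths = [len(batch) for batch in iter_batches(files, batch_size=100)]
print(batch_lengths)  # [100, 100, 50]
```

Only one batch of results needs to be held in flight at a time, which is presumably why the field is described in terms of memory management.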
show_progress: bool = True
pydantic-field
Whether to show a progress bar during loading.
callback: Optional[Callable[..., None]] = None
pydantic-field
Optional callback function for custom progress handling.
__init__(**data)
_calculate_optimal_workers(file_list: list[tuple[Path | str, str]]) -> int
Calculate optimal worker count based on file types and strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `file_list` | `list[tuple]` | List of (path, mime_type) tuples. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| `int` | `int` | Optimal number of workers. |
Source code in lexos/io/parallel_loader.py
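The exact heuristic is not shown on this page, so the following is only a sketch of how a strategy name and the mix of file types could be turned into a worker count. The multipliers, the cap of 32, the `CPU_HEAVY_TYPES` set, and the function name `pick_worker_count` are all assumptions made for illustration.

```python
import os

# Hypothetical multipliers: the values ParallelLoader really uses are not documented here.
STRATEGY_FACTORS = {"io_bound": 4.0, "balanced": 2.0, "cpu_bound": 1.0}
# Assumption: parsing PDFs is CPU-heavy compared with reading plain text.
CPU_HEAVY_TYPES = {"application/pdf"}


def pick_worker_count(file_list, strategy="auto", cpu_count=None, cap=32):
    """Return a worker count for a list of (path, mime_type) tuples."""
    cpus = cpu_count or os.cpu_count() or 1
    if not file_list:
        return 1
    if strategy == "auto":
        # Analyze the files: the share of CPU-heavy types selects a concrete strategy.
        heavy = sum(1 for _, mime in file_list if mime in CPU_HEAVY_TYPES)
        ratio = heavy / len(file_list)
        if ratio > 0.5:
            strategy = "cpu_bound"
        elif ratio == 0:
            strategy = "io_bound"
        else:
            strategy = "balanced"
    workers = int(cpus * STRATEGY_FACTORS[strategy])
    # Never use more workers than files, and always use at least one.
    return max(1, min(workers, len(file_list), cap))
```

With `cpu_count=4` and 100 plain-text files, this sketch returns 16 for `"io_bound"` and 4 for `"cpu_bound"`, matching the "more workers" and "fewer workers" descriptions of `worker_strategy`.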
_detect_mime_types_parallel(file_list: list[tuple[Path | str, Optional[str]]], progress: Optional[Progress] = None, task_id: Optional[int] = None) -> list[tuple[Path | str, str]]
Detect MIME types for all files in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `file_list` | `list[tuple]` | List of (path, mime_type) tuples; mime_type may be None. | required |
| `progress` | `Optional[Progress]` | Rich progress bar instance. | `None` |
| `task_id` | `Optional[int]` | Task ID for progress tracking. | `None` |
Returns:
| Type | Description |
|---|---|
| `list[tuple[Path \| str, str]]` | List of (path, mime_type) tuples with detected types. |
Source code in lexos/io/parallel_loader.py
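The general pattern of resolving missing MIME types in a thread pool can be sketched as follows. This is not the library's code: it substitutes the standard library's name-based `mimetypes.guess_type` for whatever detection ParallelLoader performs, omits progress reporting, and the function names are hypothetical.

```python
import mimetypes
from concurrent.futures import ThreadPoolExecutor


def guess_type(path) -> str:
    """Guess a MIME type from the file name only (stand-in for real detection)."""
    mime, _ = mimetypes.guess_type(str(path))
    return mime or "application/octet-stream"


def detect_types_parallel(file_list, max_workers=4):
    """Fill in missing MIME types for (path, mime_type) tuples, keeping input order."""

    def resolve(item):
        path, mime = item
        # Keep a type that is already known; detect only when it is None.
        return (path, mime if mime is not None else guess_type(path))

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as the inputs.
        return list(pool.map(resolve, file_list))


detected = detect_types_parallel(
    [("notes.txt", None), ("paper.pdf", None), ("data.bin", "text/plain")]
)
```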
_get_mime_type(path: Path | str, file_start: bytes) -> str
Get the mime type of a file with caching.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path \| str` | The path to the file. | required |
| `file_start` | `bytes` | The first bytes of the file. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| `str` | `str` | The mime type of the file. |
Source code in lexos/io/parallel_loader.py
_group_by_type(file_list: list[tuple[Path | str, str]]) -> dict[str, list[Path | str]]
Group files by MIME type for optimized processing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `file_list` | `list[tuple]` | List of (path, mime_type) tuples. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| `dict` | `dict[str, list[Path \| str]]` | Dictionary mapping mime_type to list of paths. |
Source code in lexos/io/parallel_loader.py
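Grouping (path, mime_type) tuples into the documented return shape can be done in a few lines. The following is a minimal sketch written as a standalone function, not the method's actual source.

```python
from collections import defaultdict


def group_by_type(file_list):
    """Map each MIME type to the list of paths that have it."""
    groups = defaultdict(list)
    for path, mime_type in file_list:
        groups[mime_type].append(path)
    return dict(groups)


grouped = group_by_type(
    [("a.txt", "text/plain"), ("b.pdf", "application/pdf"), ("c.txt", "text/plain")]
)
print(grouped)
# {'text/plain': ['a.txt', 'c.txt'], 'application/pdf': ['b.pdf']}
```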
_load_docx_file(path: Path | str) -> tuple[str, str, str, str, Optional[Exception]]
Load a docx file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path \| str` | The path to the file. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| `tuple` | `tuple[str, str, str, str, Optional[Exception]]` | (path_name, name, mime_type, text, error) |
Source code in lexos/io/parallel_loader.py
_load_pdf_file(path: Path | str) -> list[tuple[str, str, str, str, Optional[Exception]]]
Load a pdf file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path \| str` | The path to the file. | required |
Returns:
| Type | Description |
|---|---|
| `list[tuple[str, str, str, str, Optional[Exception]]]` | List of (path_name, name, mime_type, text, error) for each page. |
Source code in lexos/io/parallel_loader.py
_load_text_file(path: Path | str, mime_type: str) -> tuple[str, str, str, str, Optional[Exception]]
Load a text file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path \| str` | The path to the file. | required |
| `mime_type` | `str` | The mime type of the file. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| `tuple` | `tuple[str, str, str, str, Optional[Exception]]` | (path_name, name, mime_type, text, error) |
Source code in lexos/io/parallel_loader.py
_load_zip_file(path: Path | str) -> list[tuple[str, str, str, str, Optional[Exception]]]
Handle a zip file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path \| str` | The path to the file. | required |
Returns:
| Type | Description |
|---|---|
| `list[tuple[str, str, str, str, Optional[Exception]]]` | List of (path_name, name, mime_type, text, error) for each file in zip. |
Source code in lexos/io/parallel_loader.py
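To show what "one tuple per file in the zip" can look like, here is a sketch built on the standard library's `zipfile`. It treats every member as UTF-8 text and labels it `text/plain`, which is a simplification; the real method presumably detects each member's type. The function name and the use of the file stem as `name` are assumptions.

```python
import tempfile
import zipfile
from pathlib import Path


def load_zip_members(path):
    """Return one (path_name, name, mime_type, text, error) tuple per archive member."""
    records = []
    with zipfile.ZipFile(path) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue
            name = Path(info.filename).stem  # assumption: name is the file stem
            try:
                text = archive.read(info).decode("utf-8")
                records.append((str(path), name, "text/plain", text, None))
            except Exception as exc:
                # Record the failure instead of aborting the whole archive.
                records.append((str(path), name, "text/plain", "", exc))
    return records


with tempfile.TemporaryDirectory() as tmp:
    archive_path = Path(tmp) / "corpus.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("a.txt", "alpha")
        archive.writestr("b.bin", b"\xff\xfe")  # not valid UTF-8
    zip_records = load_zip_members(archive_path)
```

Here the first record carries the text `"alpha"` with no error, and the second carries an empty text plus the decoding exception.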
_load_file_concurrent(path: Path | str, mime_type: str) -> list[tuple[str, str, str, str, Optional[Exception]]]
Load a single file (wrapper for concurrent execution).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path \| str` | The path to the file. | required |
| `mime_type` | `str` | The mime type of the file. | required |
Returns:
| Type | Description |
|---|---|
| `list[tuple[str, str, str, str, Optional[Exception]]]` | List of (path_name, name, mime_type, text, error) tuples. |
Source code in lexos/io/parallel_loader.py
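The return types above suggest that errors are carried inside the result tuple rather than raised from worker threads. A minimal sketch of that pattern follows, assuming only text files are supported and that `name` is the file stem; both function names are hypothetical.

```python
import tempfile
from pathlib import Path


def load_text_record(path, mime_type="text/plain"):
    """Read one text file and capture any exception in the last tuple slot."""
    p = Path(path)
    try:
        return (str(p), p.stem, mime_type, p.read_text(encoding="utf-8"), None)
    except Exception as exc:
        return (str(p), p.stem, mime_type, "", exc)


def load_file_concurrent(path, mime_type):
    """Always return a list, so single- and multi-document loaders look alike."""
    if mime_type.startswith("text/"):
        return [load_text_record(path, mime_type)]
    # Other types (PDF, DOCX, ZIP) are outside the scope of this sketch.
    err = ValueError(f"unsupported type: {mime_type}")
    return [(str(path), Path(path).stem, mime_type, "", err)]


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.txt"
    good.write_text("hello", encoding="utf-8")
    ok = load_file_concurrent(good, "text/plain")
    missing = load_file_concurrent(Path(tmp) / "missing.txt", "text/plain")
```

Returning the exception as data lets one unreadable file be reported without stopping the other files in the batch.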
_prepare_file_list(paths: list[Path | str]) -> list[tuple[Path | str, str]]
Prepare list of files with MIME types, expanding directories.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `paths` | `list[Path \| str]` | List of file or directory paths. | required |
Returns:
| Type | Description |
|---|---|
| `list[tuple[Path \| str, str]]` | List of (path, mime_type) tuples. |
Source code in lexos/io/parallel_loader.py
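Expanding directories into individual files might look like the sketch below. Whether ParallelLoader recurses into subdirectories is not stated here, so the recursive `rglob` call is an assumption, as is the helper name `expand_paths`. MIME type detection is left out.

```python
import tempfile
from pathlib import Path


def expand_paths(paths):
    """Replace each directory in `paths` with the files found beneath it."""
    files = []
    for raw in paths:
        p = Path(raw)
        if p.is_dir():
            # Assumption: directories are walked recursively, in sorted order.
            files.extend(sorted(child for child in p.rglob("*") if child.is_file()))
        else:
            files.append(p)
    return files


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "corpus" / "sub").mkdir(parents=True)
    (root / "corpus" / "a.txt").write_text("a", encoding="utf-8")
    (root / "corpus" / "sub" / "b.txt").write_text("b", encoding="utf-8")
    (root / "c.txt").write_text("c", encoding="utf-8")
    expanded_names = [p.name for p in expand_paths([root / "corpus", root / "c.txt"])]
```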
_process_results(results: list[tuple]) -> None
Process and store results in a thread-safe manner.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `results` | `list[tuple]` | List of (path_name, name, mime_type, text, error) tuples. | required |
Source code in lexos/io/parallel_loader.py
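A common way to store results safely from several threads is to guard the shared lists with a lock. The sketch below illustrates that idea with a hypothetical `ResultStore` class; the attributes ParallelLoader actually writes to are not shown on this page.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ResultStore:
    """Hypothetical container that collects texts and errors under a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.texts = []
        self.errors = []

    def process_results(self, results):
        # Hold the lock for the whole batch so each batch is stored as a unit.
        with self._lock:
            for path_name, name, mime_type, text, error in results:
                if error is None:
                    self.texts.append((name, text))
                else:
                    self.errors.append((path_name, error))


store = ResultStore()
batches = [[(f"/tmp/f{i}.txt", f"f{i}", "text/plain", "x", None)] for i in range(50)]
batches.append([("/tmp/bad.txt", "bad", "text/plain", "", ValueError("boom"))])
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(store.process_results, batches))
```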
_sort_files_by_type(file_list: list[tuple[Path | str, str]]) -> list[tuple[Path | str, str]]
Sort files by MIME type for better cache locality.
Groups similar file types together to improve processing efficiency through better cache utilization and reduced context switching.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `file_list` | `list[tuple]` | List of (path, mime_type) tuples. | required |
Returns:
| Type | Description |
|---|---|
| `list[tuple[Path \| str, str]]` | Sorted list of (path, mime_type) tuples. |
Source code in lexos/io/parallel_loader.py
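Sorting by MIME type is a one-liner with `sorted`. This sketch assumes the sort key is the MIME type string alone; because Python's sort is stable, files of the same type keep their original relative order.

```python
def sort_files_by_type(file_list):
    """Return (path, mime_type) tuples ordered by MIME type."""
    return sorted(file_list, key=lambda item: item[1])


ordered = sort_files_by_type(
    [("a.txt", "text/plain"), ("b.pdf", "application/pdf"), ("c.txt", "text/plain")]
)
print(ordered)
# [('b.pdf', 'application/pdf'), ('a.txt', 'text/plain'), ('c.txt', 'text/plain')]
```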
load_dataset(dataset: Self) -> None
Load a dataset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `dataset` | `DataLoader` | The dataset to load. | required |
As of v2.10.5, Pydantic does not support recursive types (Self). As a result, this method performs its own check to see if the value of dataset is of type DataLoader.
Source code in lexos/io/parallel_loader.py
load(paths: Path | str | list[Path | str]) -> None
Load files in parallel with batching and progress tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `paths` | `Path \| str \| list[Path \| str]` | A path or a list of paths to load. | required |
Source code in lexos/io/parallel_loader.py
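Putting batching, a thread pool, and progress reporting together, the overall flow might resemble the sketch below. It is not the library's implementation: `load_in_batches`, the `load_one` parameter, and the `(done, total)` callback signature are all hypothetical, and a plain callback stands in for the progress bar.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def load_in_batches(file_list, load_one, batch_size=100, max_workers=4, callback=None):
    """Load (path, mime_type) tuples batch by batch using a shared thread pool."""
    results = []
    total = len(file_list)
    done = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, total, batch_size):
            batch = file_list[start:start + batch_size]
            futures = [pool.submit(load_one, path, mime) for path, mime in batch]
            # Collect each file's records as soon as its worker finishes.
            for future in as_completed(futures):
                results.extend(future.result())
                done += 1
                if callback is not None:
                    callback(done, total)  # hypothetical progress signature
    return results


def fake_load(path, mime_type):
    """Stand-in loader that returns one record without touching the disk."""
    return [(path, path, mime_type, path.upper(), None)]


progress_log = []
loaded = load_in_batches(
    [(f"f{i}.txt", "text/plain") for i in range(5)],
    fake_load,
    batch_size=2,
    callback=lambda done, total: progress_log.append((done, total)),
)
```

Because each batch is drained before the next one is submitted, at most `batch_size` files are pending at any moment; records within a batch arrive in completion order rather than input order.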