Skip to content

integration¤

Module Description¤

Database integration layer for the Lexos Corpus class.

This module extends the existing Corpus class with optional SQLite database capabilities while maintaining full compatibility with the file-based system.

SQLiteCorpus pydantic-model ¤

Bases: Corpus

Corpus with SQLite database backend support.

Extends the standard Corpus with optional database storage: - Dual storage: files + database - Full-text search across records - Efficient metadata queries - Optional database-only mode

The database integration is completely optional and does not break existing file-based workflows.

Fields:

Source code in lexos/corpus/sqlite/integration.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
class SQLiteCorpus(Corpus):
    """Corpus with SQLite database backend support.

    Extends the standard Corpus with optional database storage:
    - Dual storage: files + database
    - Full-text search across records
    - Efficient metadata queries
    - Optional database-only mode

    The database integration is completely optional and does not break
    existing file-based workflows.
    """

    # Database-related fields added to the Pydantic model.
    use_sqlite: bool = Field(
        default=False, description="Whether to enable database storage"
    )
    sqlite_only: bool = Field(
        default=False, description="Whether to use database-only mode"
    )
    sqlite_path: Optional[str] = Field(
        default=None, description="Path to SQLite database file"
    )
    # Excluded from serialization: a live backend handle is not JSON-serializable.
    db: Optional[SQLiteBackend] = Field(
        default=None, description="Database connection object", exclude=True
    )

    def __init__(self, **data: Any):
        """Initialize corpus with optional database integration.

        Args:
            **data (Any): Standard Corpus initialization parameters, plus the
                optional ``use_sqlite``, ``sqlite_only``, and ``sqlite_path``
                keys recognized by this subclass.
        """
        # Extract database-specific parameters, then re-insert them so the
        # Pydantic fields always receive explicit values (with defaults).
        sqlite_path = data.pop("sqlite_path", None)
        use_sqlite = data.pop("use_sqlite", False)
        sqlite_only = data.pop("sqlite_only", False)

        data["use_sqlite"] = use_sqlite
        data["sqlite_only"] = sqlite_only
        data["sqlite_path"] = sqlite_path

        # Initialize parent class
        super().__init__(**data)

        # Open the backend only when database support is requested.
        if self.use_sqlite or self.sqlite_only:
            db_path = sqlite_path or f"{self.corpus_dir}/corpus.db"
            self.db = SQLiteBackend(database_path=db_path)
            self._initialize_metadata()
        else:
            self.db = None

    def _build_meta_entry(self, record: Record) -> dict[str, Any]:
        """Build a JSON-safe ``meta`` entry for a record.

        Shared by ``_add_to_backend`` and ``load`` so both code paths produce
        consistent corpus metadata. Falls back to a minimal entry if the
        record's ``model_dump`` fails.

        Args:
            record (Record): The record to summarize.

        Returns:
            dict[str, Any]: Metadata entry with a string ``id`` and token/term
                counts annotated (0 when the record is unparsed).
        """
        record_id_str = str(record.id)
        try:
            meta_entry = record.model_dump(
                exclude=["content", "terms", "text", "tokens"], mode="json"
            )
            # Ensure id is a string and annotate token/term counts.
            meta_entry["id"] = str(meta_entry.get("id", record_id_str))
            meta_entry["num_tokens"] = (
                record.num_tokens() if record.is_parsed else 0
            )
            meta_entry["num_terms"] = record.num_terms() if record.is_parsed else 0
            return meta_entry
        except Exception:
            # Fallback minimal meta if model_dump fails.
            return {
                "id": record_id_str,
                "name": record.name,
                "is_active": record.is_active,
                "num_tokens": record.num_tokens() if record.is_parsed else 0,
                "num_terms": record.num_terms() if record.is_parsed else 0,
            }

    def _add_to_backend(
        self,
        content,
        name: Optional[str] = None,
        is_active: Optional[bool] = True,
        model: Optional[str] = None,
        extensions: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        id_type: Optional[str] = "uuid4",
    ):
        """Add records in database-only mode without file storage.

        Args:
            content: A single ``str``/``Doc``/``Record`` or an iterable of them.
            name (Optional[str]): Optional name for new records.
            is_active (Optional[bool]): Whether new records are active.
            model (Optional[str]): spaCy model name for parsing.
            extensions (Optional[list[str]]): spaCy extensions to add.
            metadata (Optional[dict[str, Any]]): Optional metadata dictionary.
            id_type (Optional[str]): Type of ID to generate ('uuid4' or 'int').
        """
        from spacy.tokens import Doc

        # Sanitize metadata to ensure JSON-serializable types (defensive).
        if metadata is not None:
            metadata = self._sanitize_metadata(metadata)

        # Normalize to a list: a lone Doc/Record/str is a single item.
        if isinstance(content, (Doc, Record, str)):
            items = [content]
        else:
            items = list(content)

        for item in items:
            if isinstance(item, Record):
                record = item
            else:
                # Generate an ID only when a new Record is actually created,
                # so sequential id types are not consumed for Record inputs.
                record_kwargs = dict(
                    id=self._generate_unique_id(type=id_type),
                    name=self._ensure_unique_name(name),
                    is_active=is_active,
                    content=item,
                    model=model,
                    data_source=None,
                )
                if extensions is not None:
                    record_kwargs["extensions"] = extensions
                if metadata is not None:
                    record_kwargs["meta"] = metadata
                record = Record(**record_kwargs)

                # Note: Records are created with string content and can be
                # parsed later if needed. The database stores both parsed and
                # unparsed content efficiently.

            # Add to in-memory records and the name index.
            record_id_str = str(record.id)
            self.records[record_id_str] = record
            if record.name not in self.names:
                self.names[record.name] = []
            self.names[record.name].append(record_id_str)
            # Keep Corpus metadata consistent with the file-based add path.
            self.meta[record_id_str] = self._build_meta_entry(record)

            # Store in database.
            if self.db:
                self.db.add_record(record)

        # Update corpus state (counts, fingerprint, database metadata).
        self._update_corpus_state()

    def __del__(self):
        """Destructor to ensure database connections are closed."""
        try:
            self.close()
        except Exception:
            pass  # Ignore errors during interpreter-shutdown cleanup.

    def _get_timestamp(self) -> str:
        """Get current timestamp as ISO string."""
        from datetime import datetime

        return datetime.now().isoformat()

    def _load_records_from_disk(self):
        """Load records from the corpus directory into memory.

        This is a helper method for sync() to load file-based records
        from disk before syncing them to the database.
        """
        corpus_dir = Path(self.corpus_dir)
        metadata_path = corpus_dir / self.corpus_metadata_file

        # Nothing to do if the corpus directory or metadata file is missing.
        if not corpus_dir.exists():
            return

        if not metadata_path.exists():
            return

        try:
            import srsly

            metadata = srsly.read_json(metadata_path)

            # Rehydrate each record listed in the metadata from its .bin file.
            if "meta" in metadata and metadata["meta"]:
                for record_id, record_meta in metadata["meta"].items():
                    data_dir = corpus_dir / "data"
                    record_file = data_dir / f"{record_id}.bin"

                    if record_file.exists():
                        record = Record(id=record_id, name=record_meta.get("name", ""))
                        record.from_disk(
                            str(record_file),
                            model=record_meta.get("model"),
                            model_cache=self.model_cache,
                        )

                        # Add to in-memory structures.
                        self.records[record_id] = record
                        if record.name not in self.names:
                            self.names[record.name] = []
                        self.names[record.name].append(record_id)

        except Exception as e:
            # Best-effort load: continue with whatever records were read.
            print(f"Warning: Failed to load records from disk: {str(e)}")

    def _initialize_metadata(self):
        """Initialize corpus metadata in the database.

        Creates a SQLiteMetadata row for this corpus if one does not already
        exist; existing rows are left untouched.
        """
        if not self.db:
            return

        with self.db.SessionLocal() as session:
            # The corpus name doubles as the database corpus identifier.
            corpus_id = self.name or "default"
            existing = (
                session.query(SQLiteMetadata)
                .filter(SQLiteMetadata.corpus_id == corpus_id)
                .first()
            )

            if not existing:
                # Create new corpus metadata snapshot.
                corpus_metadata = SQLiteMetadata()
                corpus_metadata.corpus_id = corpus_id
                corpus_metadata.name = self.name
                corpus_metadata.num_docs = self.num_docs
                corpus_metadata.num_active_docs = self.num_active_docs
                corpus_metadata.num_tokens = self.num_tokens
                corpus_metadata.num_terms = self.num_terms
                corpus_metadata.corpus_dir = self.corpus_dir
                corpus_metadata.metadata_json = json.dumps(self.meta, default=str)
                corpus_metadata.analysis_results_json = json.dumps(
                    self.analysis_results, default=str
                )
                corpus_metadata.corpus_fingerprint = self._generate_corpus_fingerprint()
                corpus_metadata.created_at = self._get_timestamp()
                corpus_metadata.updated_at = self._get_timestamp()
                session.add(corpus_metadata)
                session.commit()

    def _sanitize_metadata(self, metadata: dict[str, Any]) -> dict[str, Any]:
        """Convert non-JSON-serializable types to strings.

        Args:
            metadata: Original metadata dictionary

        Returns:
            Sanitized metadata dictionary with JSON-serializable values
        """
        from datetime import date, datetime
        from pathlib import Path
        from uuid import UUID

        sanitized = {}
        for key, value in metadata.items():
            if isinstance(value, UUID):
                sanitized[key] = str(value)
            elif isinstance(value, (datetime, date)):
                sanitized[key] = value.isoformat()
            elif isinstance(value, Path):
                sanitized[key] = str(value)
            elif isinstance(value, dict):
                sanitized[key] = self._sanitize_metadata(value)  # Recursive
            elif isinstance(value, list):
                # Dicts in lists are sanitized recursively; known scalar types
                # are stringified; everything else passes through unchanged.
                sanitized[key] = [
                    self._sanitize_metadata({"item": item})["item"]
                    if isinstance(item, dict)
                    else str(item)
                    if isinstance(item, (UUID, datetime, date, Path))
                    else item
                    for item in value
                ]
            else:
                sanitized[key] = value

        return sanitized

    def _update_corpus_state(self):
        """Update corpus state in both memory and database."""
        # Update in-memory state via the parent implementation.
        super()._update_corpus_state()

        # Mirror the refreshed state into the database metadata row, if any.
        if self.db:
            with self.db.SessionLocal() as session:
                corpus_id = self.name or "default"
                corpus_metadata = (
                    session.query(SQLiteMetadata)
                    .filter(SQLiteMetadata.corpus_id == corpus_id)
                    .first()
                )

                if corpus_metadata:
                    corpus_metadata.num_docs = self.num_docs
                    corpus_metadata.num_active_docs = self.num_active_docs
                    corpus_metadata.num_tokens = self.num_tokens
                    corpus_metadata.num_terms = self.num_terms
                    corpus_metadata.metadata_json = json.dumps(self.meta, default=str)
                    corpus_metadata.analysis_results_json = json.dumps(
                        self.analysis_results, default=str
                    )
                    corpus_metadata.corpus_fingerprint = (
                        self._generate_corpus_fingerprint()
                    )
                    corpus_metadata.updated_at = self._get_timestamp()

                    session.commit()

    @validate_call
    def add(
        self,
        content,
        name: Optional[str] = None,
        is_active: Optional[bool] = True,
        model: Optional[str] = None,
        extensions: Optional[list[str]] = None,
        metadata: Optional[dict[str, Any]] = None,
        id_type: Optional[str] = "uuid4",
        cache: Optional[bool] = False,
        store_in_db: Optional[bool] = None,
    ):
        """Add a record to the corpus with optional database storage.

        Args:
            content (str | Doc | Record): The content of the record
            name (Optional[str]): Optional name for the record
            is_active (Optional[bool]): Whether the record is active
            model (Optional[str]): spaCy model name for parsing
            extensions (Optional[list[str]]): List of spaCy extensions to add
            metadata (Optional[dict[str, Any]]): Optional metadata dictionary
            id_type (Optional[str]): Type of ID to generate ('uuid4' or 'int')
            cache (Optional[bool]): Whether to cache the record in memory
            store_in_db (Optional[bool]): Whether to store the record in the
                database; ``None`` defers to the corpus-level settings
        """
        # Sanitize metadata to ensure JSON-serializable types.
        if metadata is not None:
            metadata = self._sanitize_metadata(metadata)

        # Determine storage strategy: an explicit store_in_db wins, otherwise
        # use the corpus-level configuration.
        use_db = (
            store_in_db
            if store_in_db is not None
            else self.use_sqlite or self.sqlite_only
        )
        use_files = not self.sqlite_only

        # Record count before the add, to identify the new records afterwards.
        initial_record_count = len(self.records)

        if use_files:
            # File-based (and in-memory) add via the parent implementation.
            super().add(
                content=content,
                name=name,
                is_active=is_active,
                model=model,
                extensions=extensions,
                metadata=metadata,
                id_type=id_type,
                cache=cache,
            )
        else:
            # Database-only mode - implement add logic without file storage.
            self._add_to_backend(
                content=content,
                name=name,
                is_active=is_active,
                model=model,
                extensions=extensions,
                metadata=metadata,
                id_type=id_type,
            )

        # Also store in database if enabled and we're using file storage
        # (_add_to_backend already persists records in database-only mode).
        if use_db and self.db and use_files:
            # The newly added records are those appended past the old count.
            current_records = list(self.records.values())
            new_records = current_records[initial_record_count:]

            for record in new_records:
                try:
                    # Note: Records can be parsed later if needed.
                    # The database efficiently stores both parsed and
                    # unparsed content.
                    self.db.add_record(record)
                except Exception as e:
                    # Log error but don't fail the entire operation.
                    print(f"Warning: Failed to add record {record.id} to database: {e}")

    @validate_call
    def filter_records(
        self,
        is_active: Optional[bool] = None,
        is_parsed: Optional[bool] = None,
        model: Optional[str] = None,
        min_tokens: Optional[int] = None,
        max_tokens: Optional[int] = None,
        limit: Optional[int] = None,
        use_database: bool = True,
    ) -> list[Record]:
        """Filter records by various criteria.

        Args:
            is_active: Filter by active status
            is_parsed: Filter by parsed status
            model: Filter by spaCy model name
            min_tokens: Minimum number of tokens
            max_tokens: Maximum number of tokens
            limit: Maximum number of results
            use_database: Whether to use database filtering (vs in-memory)

        Returns:
            List of matching Record objects
        """
        if use_database and self.db:
            return self.db.filter_records(
                is_active=is_active,
                is_parsed=is_parsed,
                model=model,
                min_tokens=min_tokens,
                max_tokens=max_tokens,
                limit=limit,
            )
        else:
            # Fallback to in-memory filtering.
            filtered_records = []
            for record in self.records.values():
                if is_active is not None and record.is_active != is_active:
                    continue
                if is_parsed is not None and record.is_parsed != is_parsed:
                    continue
                if model is not None and record.model != model:
                    continue
                if min_tokens is not None:
                    try:
                        if record.num_tokens() < min_tokens:
                            continue
                    except Exception:
                        # Token count unavailable (e.g. unparsed): exclude.
                        continue
                if max_tokens is not None:
                    try:
                        if record.num_tokens() > max_tokens:
                            continue
                    except Exception:
                        # Token count unavailable (e.g. unparsed): exclude.
                        continue

                filtered_records.append(record)

                if limit and len(filtered_records) >= limit:
                    break

            return filtered_records

    @validate_call
    def get_stats(self) -> dict[str, Any]:
        """Get corpus statistics from the database.

        Returns:
            Dictionary containing database-derived statistics

        Raises:
            LexosException: If database is not enabled
        """
        if not self.db:
            raise LexosException(
                "Database is not enabled. Initialize corpus with use_sqlite=True."
            )

        return self.db.get_stats()

    @validate_call
    def search(
        self,
        query: str,
        limit: int = 100,
        include_inactive: bool = False,
        model_filter: Optional[str] = None,
        load_from_db: bool = True,
    ) -> list[Record]:
        """Perform full-text search on corpus records.

        Args:
            query: FTS5 search query string
            limit: Maximum number of results to return
            include_inactive: Whether to include inactive records
            model_filter: Optional filter by spaCy model name
            load_from_db: Whether to load results from database (vs memory)

        Returns:
            List of matching Record objects

        Raises:
            LexosException: If database is not enabled
        """
        if not self.db:
            raise LexosException(
                "Database is not enabled. Initialize corpus with use_sqlite=True to use search."
            )

        return self.db.search_records(
            query=query,
            limit=limit,
            include_inactive=include_inactive,
            model_filter=model_filter,
        )

    @validate_call
    def sync(self, overwrite: bool = False) -> int:
        """Synchronize existing file-based records to the database.

        This method loads records from the corpus directory on disk and adds them
        to the database. If records are already in memory, they will be used instead.

        Args:
            overwrite: Whether to overwrite existing database records

        Returns:
            Number of records synchronized

        Raises:
            LexosException: If database is not enabled
        """
        if not self.db:
            raise LexosException(
                "Database is not enabled. Initialize corpus with use_sqlite=True."
            )

        # Load records from disk if not already in memory.
        if not self.records:
            self._load_records_from_disk()

        synced_count = 0

        for record in self.records.values():
            try:
                # include_doc=False: existence check only, skip Doc payload.
                existing = self.db.get_record(str(record.id), include_doc=False)
                if overwrite:
                    # Update in place when present, otherwise insert.
                    if existing:
                        self.db.update_record(record)
                    else:
                        self.db.add_record(record)
                else:
                    # Only add if doesn't exist.
                    if not existing:
                        self.db.add_record(record)

                synced_count += 1

            except Exception as e:
                # Log error but continue with other records.
                print(f"Warning: Failed to sync record {record.id}: {str(e)}")

        return synced_count

    @validate_call
    def load(self, include_docs: bool = False, active_only: bool = True) -> int:
        """Load records from database into memory.

        Args:
            include_docs: Whether to deserialize spaCy Doc content
            active_only: Whether to load only active records

        Returns:
            Number of records loaded

        Raises:
            LexosException: If database is not enabled
        """
        if not self.db:
            raise LexosException(
                "Database is not enabled. Initialize corpus with use_sqlite=True."
            )

        # Replace any existing in-memory records with the database contents.
        self.records.clear()
        self.names.clear()

        # Load records from database.
        filters = {"is_active": True} if active_only else {}
        db_records = self.db.filter_records(**filters)

        loaded_count = 0
        for record in db_records:
            # Add to in-memory structures.
            record_id_str = str(record.id)
            self.records[record_id_str] = record
            if record.name not in self.names:
                self.names[record.name] = []
            self.names[record.name].append(record_id_str)
            # Populate meta for loaded record so Corpus metadata is consistent.
            self.meta[record_id_str] = self._build_meta_entry(record)
            loaded_count += 1

        # Update corpus state.
        self._update_corpus_state()

        return loaded_count

    def close(self):
        """Close database connections and clean up resources."""
        if self.db:
            self.db.close()

active_terms: set property ¤

Return the set of active terms in the Corpus.

Returns:

Name Type Description
set set

A set of active term strings found in active parsed records.

analysis_results: dict[str, dict[str, Any]] pydantic-field ¤

Storage for results from external analysis modules (kmeans, topwords, kwic, etc.)

corpus_dir: str = 'corpus' pydantic-field ¤

The path to the directory where the corpus is stored.

corpus_metadata_file: str = 'corpus_metadata.json' pydantic-field ¤

The name of the corpus metadata file.

db: Optional[SQLiteBackend] = None pydantic-field ¤

Database connection object

meta: dict[str, Any] = {} pydantic-field ¤

Metadata dictionary for arbitrary metadata relating to the corpus.

meta_df: pd.DataFrame property ¤

Return a DataFrame of the Corpus metadata.

model_cache: LexosModelCache = LexosModelCache() pydantic-field ¤

A cache for spaCy models used in the Corpus.

name: str = None pydantic-field ¤

The name of the corpus.

num_active_docs: int = 0 pydantic-field ¤

Number of active records in the corpus.

num_active_terms: int property ¤

Return the number of active terms in the Corpus.

num_active_tokens: int property ¤

Return the number of active tokens in the Corpus.

Returns:

Name Type Description
int int

The total number of tokens in active parsed records.

num_docs: int = 0 pydantic-field ¤

Total number of records in the corpus.

num_terms: int = 0 pydantic-field ¤

Total number of unique terms in the corpus.

num_tokens: int = 0 pydantic-field ¤

Total number of tokens in the corpus.

records: RecordsDict = {} pydantic-field ¤

Dictionary of records in the corpus.

sqlite_only: bool = False pydantic-field ¤

Whether to use database-only mode

sqlite_path: Optional[str] = None pydantic-field ¤

Path to SQLite database file

terms: set = set() pydantic-field ¤

Set of unique terms in the corpus.

use_sqlite: bool = False pydantic-field ¤

Whether to enable database storage

__del__() ¤

Destructor to ensure database connections are closed.

Source code in lexos/corpus/sqlite/integration.py
def __del__(self):
    """Destructor to ensure database connections are closed."""
    try:
        self.close()
    except:
        pass  # Ignore errors during cleanup

__init__(**data: Any) ¤

Initialize corpus with optional database integration.

Parameters:

Name Type Description Default
**data Any

Standard Corpus initialization parameters

{}
Source code in lexos/corpus/sqlite/integration.py
def __init__(self, **data: Any):
    """Initialize corpus with optional database integration.

    Args:
        **data (Any): Standard Corpus initialization parameters
    """
    # Extract database-specific parameters
    sqlite_path = data.pop("sqlite_path", None)
    use_sqlite = data.pop("use_sqlite", False)
    sqlite_only = data.pop("sqlite_only", False)

    # Set the database fields
    data["use_sqlite"] = use_sqlite
    data["sqlite_only"] = sqlite_only
    data["sqlite_path"] = sqlite_path

    # Initialize parent class
    super().__init__(**data)

    # Initialize database if enabled
    if self.use_sqlite or self.sqlite_only:
        db_path = sqlite_path or f"{self.corpus_dir}/corpus.db"
        self.db = SQLiteBackend(database_path=db_path)
        self._initialize_metadata()
    else:
        self.db = None

__iter__() -> Iterable[Record] ¤

Make the corpus iterable.

Returns:

Type Description
Iterable[Record]

Iterator[Record]: An iterator over the Record objects in the corpus.

Source code in lexos/corpus/corpus.py
def __iter__(self) -> Iterable[Record]:
    """Make the corpus iterable.

    Returns:
        Iterator[Record]: An iterator over the Record objects in the corpus.
    """
    return iter(self.records.values())

__repr__() ¤

Return a string representation of the Corpus.

Source code in lexos/corpus/corpus.py
def __repr__(self):
    """Return a string representation of the Corpus."""
    fields = {field: getattr(self, field) for field in self.model_fields_set}
    field_list = [f"{k}={v}" for k, v in fields.items()]
    rep = f"Corpus({', '.join(sorted(field_list))})"
    return rep

add(content, name: Optional[str] = None, is_active: Optional[bool] = True, model: Optional[str] = None, extensions: Optional[list[str]] = None, metadata: Optional[dict[str, Any]] = None, id_type: Optional[str] = 'uuid4', cache: Optional[bool] = False, store_in_db: Optional[bool] = None) ¤

Add a record to the corpus with optional database storage.

Parameters:

Name Type Description Default
content str | Doc | Record

The content of the record

required
name Optional[str]

Optional name for the record

None
is_active Optional[bool]

Whether the record is active

True
model Optional[str]

spaCy model name for parsing

None
extensions Optional[list[str]]

List of spaCy extensions to add

None
metadata Optional[dict[str, Any]]

Optional metadata dictionary

None
id_type Optional[str]

Type of ID to generate ('uuid4' or 'int')

'uuid4'
cache Optional[bool]

Whether to cache the record in memory

False
store_in_db Optional[bool]

Whether to store the record in the database

None
Source code in lexos/corpus/sqlite/integration.py
@validate_call
def add(
    self,
    content,
    name: Optional[str] = None,
    is_active: Optional[bool] = True,
    model: Optional[str] = None,
    extensions: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    id_type: Optional[str] = "uuid4",
    cache: Optional[bool] = False,
    store_in_db: Optional[bool] = None,
):
    """Add a record to the corpus with optional database storage.

    Args:
        content (str | Doc | Record): The content of the record
        name (Optional[str]): Optional name for the record
        is_active (Optional[bool]): Whether the record is active
        model (Optional[str]): spaCy model name for parsing
        extensions (Optional[list[str]]): List of spaCy extensions to add
        metadata (Optional[dict[str, Any]]): Optional metadata dictionary
        id_type (Optional[str]): Type of ID to generate ('uuid4' or 'int')
        cache (Optional[bool]): Whether to cache the record in memory
        store_in_db (Optional[bool]): Whether to store the record in the
            database. When None, falls back to the corpus-level
            use_sqlite/sqlite_only configuration.
    """
    from wasabi import msg

    # Sanitize metadata to ensure JSON-serializable types
    if metadata is not None:
        metadata = self._sanitize_metadata(metadata)

    # Determine storage strategy: an explicit store_in_db wins,
    # otherwise defer to the corpus-level configuration
    use_db = (
        store_in_db
        if store_in_db is not None
        else self.use_sqlite or self.sqlite_only
    )
    use_files = not self.sqlite_only

    # Get current record count so the newly added records can be
    # identified after the parent call
    initial_record_count = len(self.records)

    # Add using parent implementation if using files
    if use_files:
        super().add(
            content=content,
            name=name,
            is_active=is_active,
            model=model,
            extensions=extensions,
            metadata=metadata,
            id_type=id_type,
            cache=cache,
        )
    else:
        # Database-only mode - implement add logic without file storage
        self._add_to_backend(
            content=content,
            name=name,
            is_active=is_active,
            model=model,
            extensions=extensions,
            metadata=metadata,
            id_type=id_type,
        )

    # Also store in database if enabled and we're using file storage
    if use_db and self.db and use_files:
        # Get the newly added records
        current_records = list(self.records.values())
        new_records = current_records[initial_record_count:]

        for record in new_records:
            try:
                # Note: Records can be parsed later if needed
                # The database efficiently stores both parsed and unparsed content

                self.db.add_record(record)
            except Exception as e:
                # Best-effort: report via wasabi msg (consistent with
                # add_from_files/import_analysis_results) rather than
                # print, and keep going so one bad record does not fail
                # the entire operation
                msg.warn(f"Failed to add record {record.id} to database: {e}")

add_from_files(paths: Path | str | list[Path | str], max_workers: Optional[int] = None, worker_strategy: str = 'auto', batch_size: int = 100, show_progress: bool = True, name_template: Optional[str] = None, is_active: bool = True, model: Optional[str] = None, extensions: Optional[list[str]] = None, metadata: Optional[dict[str, Any]] = None, id_type: str = 'uuid4') -> None ¤

Load files directly into corpus using parallel I/O.

This method streams files into the corpus without holding all content in memory, making it suitable for very large datasets. Files are loaded in parallel using the ParallelLoader with all its optimization features (smart file ordering, auto-tuning, etc.).

State updates are deferred until all files are loaded for optimal performance.

Parameters:

Name Type Description Default
paths Path | str | list[Path | str]

File paths or directories to load.

required
max_workers Optional[int]

Maximum number of worker threads. If None, auto-calculated based on worker_strategy.

None
worker_strategy str

Worker allocation strategy. Options: - "auto": Analyzes file types and chooses optimal strategy (default) - "io_bound": More workers for I/O-intensive operations - "cpu_bound": Fewer workers for CPU-intensive operations - "balanced": Middle ground between I/O and CPU

'auto'
batch_size int

Number of files to process in each batch. Default 100.

100
show_progress bool

Whether to show progress bar. Default True.

True
name_template Optional[str]

Template for generating record names. Can include {filename}, {stem}, {index}. If None, uses filename stem.

None
is_active bool

Whether records should be marked as active. Default True.

True
model Optional[str]

Name of language model used to parse records.

None
extensions Optional[list[str]]

List of extension names to add to records.

None
metadata Optional[dict[str, Any]]

Metadata to add to all records.

None
id_type str

Type of ID to generate ("integer" or "uuid4"). Default "uuid4".

'uuid4'
Example
corpus = Corpus("my_corpus")
# Load all text files from a directory
corpus.add_from_files("path/to/texts/")
# With custom naming
corpus.add_from_files(
    ["file1.txt", "file2.txt"],
    name_template="{stem}_{index}",
    metadata={"source": "collection_a"}
)
Source code in lexos/corpus/corpus.py
def add_from_files(
    self,
    paths: Path | str | list[Path | str],
    max_workers: Optional[int] = None,
    worker_strategy: str = "auto",
    batch_size: int = 100,
    show_progress: bool = True,
    name_template: Optional[str] = None,
    is_active: bool = True,
    model: Optional[str] = None,
    extensions: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    id_type: str = "uuid4",
) -> None:
    """Load files directly into corpus using parallel I/O.

    This method streams files into the corpus without holding all
    content in memory, making it suitable for very large datasets.
    Files are loaded in parallel using the ParallelLoader with all
    its optimization features (smart file ordering, auto-tuning, etc.).

    State updates are deferred until all files are loaded for optimal
    performance.

    Args:
        paths (Path | str | list[Path | str]): File paths or directories to load.
        max_workers (Optional[int]): Maximum number of worker threads.
            If None, auto-calculated based on worker_strategy.
        worker_strategy (str): Worker allocation strategy. Options:
            - "auto": Analyzes file types and chooses optimal strategy (default)
            - "io_bound": More workers for I/O-intensive operations
            - "cpu_bound": Fewer workers for CPU-intensive operations
            - "balanced": Middle ground between I/O and CPU
        batch_size (int): Number of files to process in each batch. Default 100.
        show_progress (bool): Whether to show progress bar. Default True.
        name_template (Optional[str]): Template for generating record names.
            Can include {filename}, {stem}, {index}. If None, uses filename stem.
        is_active (bool): Whether records should be marked as active. Default True.
        model (Optional[str]): Name of language model used to parse records.
        extensions (Optional[list[str]]): List of extension names to add to records.
        metadata (Optional[dict[str, Any]]): Metadata to add to all records.
        id_type (str): Type of ID to generate ("integer" or "uuid4"). Default "uuid4".

    Example:
        ```python
        corpus = Corpus("my_corpus")
        # Load all text files from a directory
        corpus.add_from_files("path/to/texts/")
        # With custom naming
        corpus.add_from_files(
            ["file1.txt", "file2.txt"],
            name_template="{stem}_{index}",
            metadata={"source": "collection_a"}
        )
        ```
    """
    from lexos.io.parallel_loader import ParallelLoader

    # Sanitize metadata if provided
    if metadata is not None:
        metadata = self._sanitize_metadata(metadata)

    # Create ParallelLoader with specified settings
    loader = ParallelLoader(
        max_workers=max_workers,
        worker_strategy=worker_strategy,
        batch_size=batch_size,
        show_progress=show_progress,
    )

    # Track for error reporting
    loaded_count = 0
    error_count = 0
    errors = []

    # Stream files and add to corpus.
    # NOTE(review): `name` yielded by the loader is used as the {stem}
    # template value below — presumably the filename stem; confirm
    # against ParallelLoader.load_streaming.
    for index, (path, name, mime_type, text, error) in enumerate(
        loader.load_streaming(paths), start=1
    ):
        if error:
            error_count += 1
            errors.append((path, error))
            continue

        # Generate record name from template or use default
        if name_template:
            record_name = name_template.format(
                filename=Path(path).name, stem=name, index=index
            )
        else:
            record_name = name

        # Generate unique ID
        record_id = self._generate_unique_id(type=id_type)

        # Create record kwargs
        record_kwargs = dict(
            id=record_id,
            name=record_name,
            is_active=is_active,
            content=text,
            model=model,
            data_source=str(path),
        )

        if extensions is not None:
            record_kwargs["extensions"] = extensions

        if metadata is not None:
            # Shallow copy so records do not share one metadata dict;
            # nested values are still shared between records
            record_kwargs["meta"] = metadata.copy()

        # Create and add record without updating state
        record = Record(**record_kwargs)
        self._add_to_corpus_without_state_update(record)
        loaded_count += 1

    # Update corpus state once at the end
    self._update_corpus_state()

    # Report results
    from wasabi import msg

    msg.good(f"Loaded {loaded_count} files into corpus. Errors: {error_count}")

    # Per-file errors are detailed only when there are 10 or fewer;
    # larger failure counts are summarized by the line above
    if errors and error_count <= 10:  # Show first 10 errors
        msg.warn("Errors encountered:")
        for path, error in errors[:10]:
            msg.fail(f"  {path}: {error}")

close() ¤

Close database connections and clean up resources.

Source code in lexos/corpus/sqlite/integration.py
def close(self):
    """Release database resources held by this corpus, if any."""
    # Nothing to do when no database backend was initialized
    if not self.db:
        return
    self.db.close()

export_statistical_fingerprint() -> dict[str, Any] ¤

Export standardized statistical summary for external modules.

Returns:

Type Description
dict[str, Any]

Dictionary containing corpus statistical fingerprint for external module consumption

Note

This provides the standardized API for external modules to consume corpus statistics.

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def export_statistical_fingerprint(self) -> dict[str, Any]:
    """Export standardized statistical summary for external modules.

    Returns:
        Dictionary containing corpus statistical fingerprint for external module consumption.
        On any failure during statistical analysis, a reduced fallback
        fingerprint (corpus metadata, error text, and basic document
        id/name listings) is returned instead of raising.

    Note:
        This provides the standardized API for external modules to consume corpus statistics.
    """
    # TODO: Expand fingerprint based on external module requirements
    # TODO: Add feature extraction optimized for different analysis types

    try:
        # NOTE(review): SQLiteCorpus overrides get_stats() with a
        # zero-argument signature; when that override is active this
        # call raises TypeError and execution falls into the fallback
        # branch below — confirm the intended interaction.
        stats = self.get_stats(active_only=True)

        # Core statistical fingerprint
        fingerprint = {
            "corpus_metadata": {
                "name": self.name,
                "num_docs": self.num_docs,
                "num_active_docs": self.num_active_docs,
                "num_tokens": self.num_tokens,
                "num_terms": self.num_terms,
                "corpus_fingerprint": self._generate_corpus_fingerprint(),
            },
            "distribution_stats": stats.distribution_stats,
            "percentiles": stats.percentiles,
            "text_diversity": stats.text_diversity_stats,
            "basic_stats": {
                "mean": stats.mean,
                "std": stats.standard_deviation,
                "iqr_values": stats.iqr_values,
                "iqr_bounds": stats.iqr_bounds,
            },
            "document_features": stats.doc_stats_df.to_dict("records"),
            "term_frequencies": self.term_counts(
                n=100, most_common=True
            ),  # Top 100 terms
        }

        return fingerprint

    except Exception as e:
        # Fallback fingerprint if CorpusStats fails: keep the metadata
        # section shape identical so consumers can rely on it either way
        return {
            "corpus_metadata": {
                "name": self.name,
                "num_docs": self.num_docs,
                "num_active_docs": self.num_active_docs,
                "num_tokens": self.num_tokens,
                "num_terms": self.num_terms,
                "corpus_fingerprint": self._generate_corpus_fingerprint(),
            },
            "error": f"Statistical analysis failed: {str(e)}",
            "basic_features": {
                "document_ids": list(self.records.keys()),
                "document_names": list(self.names.keys()),
            },
        }

filter_records(is_active: Optional[bool] = None, is_parsed: Optional[bool] = None, model: Optional[str] = None, min_tokens: Optional[int] = None, max_tokens: Optional[int] = None, limit: Optional[int] = None, use_database: bool = True) -> list[Record] ¤

Filter records by various criteria.

Parameters:

Name Type Description Default
is_active Optional[bool]

Filter by active status

None
is_parsed Optional[bool]

Filter by parsed status

None
model Optional[str]

Filter by spaCy model name

None
min_tokens Optional[int]

Minimum number of tokens

None
max_tokens Optional[int]

Maximum number of tokens

None
limit Optional[int]

Maximum number of results

None
use_database bool

Whether to use database filtering (vs in-memory)

True

Returns:

Type Description
list[Record]

List of matching Record objects

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def filter_records(
    self,
    is_active: Optional[bool] = None,
    is_parsed: Optional[bool] = None,
    model: Optional[str] = None,
    min_tokens: Optional[int] = None,
    max_tokens: Optional[int] = None,
    limit: Optional[int] = None,
    use_database: bool = True,
) -> list[Record]:
    """Filter records by various criteria.

    Args:
        is_active: Filter by active status
        is_parsed: Filter by parsed status
        model: Filter by spaCy model name
        min_tokens: Minimum number of tokens
        max_tokens: Maximum number of tokens
        limit: Maximum number of results
        use_database: Whether to use database filtering (vs in-memory)

    Returns:
        List of matching Record objects
    """
    if use_database and self.db:
        # Delegate filtering to the backend when a database is available
        return self.db.filter_records(
            is_active=is_active,
            is_parsed=is_parsed,
            model=model,
            min_tokens=min_tokens,
            max_tokens=max_tokens,
            limit=limit,
        )
    else:
        # Fallback to in-memory filtering
        filtered_records = []
        for record in self.records.values():
            if is_active is not None and record.is_active != is_active:
                continue
            if is_parsed is not None and record.is_parsed != is_parsed:
                continue
            if model is not None and record.model != model:
                continue
            if min_tokens is not None:
                try:
                    if record.num_tokens() < min_tokens:
                        continue
                except Exception:
                    # num_tokens() may fail (e.g. unparsed record); treat a
                    # failure as "does not match". Narrowed from a bare
                    # `except:`, which also swallowed KeyboardInterrupt and
                    # SystemExit.
                    continue
            if max_tokens is not None:
                try:
                    if record.num_tokens() > max_tokens:
                        continue
                except Exception:
                    # Same best-effort skip as the min_tokens check above
                    continue

            filtered_records.append(record)

            if limit and len(filtered_records) >= limit:
                break

        return filtered_records

get(id: Optional[str | list[str]] = None, name: Optional[str | list[str]] = None) -> Record | list[Record] ¤

Get a record from the Corpus by ID.

Tries to get the record from memory; otherwise loads it from file.

Parameters:

Name Type Description Default
id str | list[str]

A record id or list of ids from the Corpus records.

None
name str | list[str]

A record name or list of names from the Corpus records.

None

Returns:

Type Description
Record | list[Record]

Record | list[Record]: The record(s) with the given ID(s) or name(s).

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def get(
    self,
    id: Optional[str | list[str]] = None,
    name: Optional[str | list[str]] = None,
) -> Record | list[Record]:
    """Get a record from the Corpus by ID or name.

    Tries to get the record from memory; otherwise loads it from file.

    Args:
        id (str | list[str]): A record id or list of ids from the Corpus records.
        name (str | list[str]): A record name or list of names from the Corpus records.

    Returns:
        Record | list[Record]: The record(s) with the given ID(s) or name(s).

    Raises:
        LexosException: If neither id nor name is provided, or if an id
            is not present in the Corpus.
    """
    # Ensure either id or name is provided
    # (error message previously said "remove" — copy-paste bug fixed)
    if not id and not name:
        raise LexosException(
            "Must provide either an ID or a name to get a record."
        )

    # Ensure id is a list
    if isinstance(id, str):
        ids = [id]
    elif isinstance(id, list):
        ids = id
    else:
        ids = []

    # If name is provided, get the IDs from the name(s)
    if name and not id:
        if isinstance(name, str):
            name = [name]
        ids = []
        for n in name:
            ids.extend(self._get_by_name(n))

    result = []
    for record_id in ids:
        # If the id is in the Corpus records, return the record.
        # NOTE(review): the previous else-branch read
        # `self.records[record_id]` after the membership check had
        # already failed, so it always raised a bare KeyError and its
        # `_from_disk` load path was unreachable. An explicit
        # LexosException is raised instead; restore the disk-load path
        # once the cache/stub distinction it relied on is defined.
        if record_id in self.records:
            result.append(self.records[record_id])
        else:
            raise LexosException(
                f"Record with ID {record_id} does not exist in the Corpus."
            )
    if len(result) == 1:
        return result[0]
    return result

get_analysis_results(module_name: str = None) -> dict[str, Any] ¤

Retrieve analysis results from external modules.

Parameters:

Name Type Description Default
module_name str

Specific module name to retrieve, or None for all results

None

Returns:

Type Description
dict[str, Any]

Dictionary containing analysis results

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def get_analysis_results(self, module_name: str = None) -> dict[str, Any]:
    """Retrieve analysis results from external modules.

    Args:
        module_name: Specific module name to retrieve, or None for all results

    Returns:
        Dictionary containing analysis results
    """
    # No module requested: hand back the full results mapping
    if not module_name:
        return self.analysis_results

    if module_name not in self.analysis_results:
        raise ValueError(f"No results found for module '{module_name}'")
    return self.analysis_results[module_name]

get_stats() -> dict[str, Any] ¤

Get corpus statistics from the database.

Returns:

Type Description
dict[str, Any]

Dictionary containing database-derived statistics

Raises:

Type Description
LexosException

If database is not enabled

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def get_stats(self) -> dict[str, Any]:
    """Get corpus statistics from the database.

    Returns:
        Dictionary containing database-derived statistics

    Raises:
        LexosException: If database is not enabled

    Note:
        NOTE(review): this override takes no arguments, but the inherited
        ``export_statistical_fingerprint`` calls
        ``self.get_stats(active_only=True)``; with the database enabled that
        call raises TypeError and silently triggers the fingerprint's
        fallback branch. Confirm whether this override should accept (and
        forward or ignore) ``active_only``.
    """
    if not self.db:
        raise LexosException(
            "Database is not enabled. Initialize corpus with use_sqlite=True."
        )

    return self.db.get_stats()

import_analysis_results(module_name: str, results_data: dict[str, Any], version: str = '1.0.0', overwrite: bool = False) -> None ¤

Import analysis results from external modules into corpus metadata.

Parameters:

Name Type Description Default
module_name str

Name of the external module (e.g., 'kmeans', 'topwords', 'kwic', 'text_classification')

required
results_data dict[str, Any]

Dictionary containing the analysis results

required
version str

Version string for result versioning and compatibility

'1.0.0'
overwrite bool

Whether to overwrite existing results for this module

False
Note

This is a framework implementation. Full functionality requires peer modules to be implemented and their result schemas defined.

Returns:

Type Description
None

None

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def import_analysis_results(
    self,
    module_name: str,
    results_data: dict[str, Any],
    version: str = "1.0.0",
    overwrite: bool = False,
) -> None:
    """Store analysis results from an external module in corpus metadata.

    Args:
        module_name: Name of the external module (e.g., 'kmeans', 'topwords', 'kwic', 'text_classification')
        results_data: Dictionary containing the analysis results
        version: Version string for result versioning and compatibility
        overwrite: Whether to overwrite existing results for this module

    Note:
        This is a framework implementation. Full functionality requires
        peer modules to be implemented and their result schemas defined.

    Returns:
        None
    """
    # TODO: Add result schema validation once peer modules are available
    # TODO: Add proper versioning system for backward compatibility
    # TODO: Implement result correlation capabilities across modules

    already_present = module_name in self.analysis_results
    if already_present and not overwrite:
        raise ValueError(
            f"Results for module '{module_name}' already exist. "
            f"Use overwrite=True to replace them."
        )

    # Snapshot of the corpus at import time, stored alongside the results
    corpus_state = {
        "num_docs": self.num_docs,
        "num_active_docs": self.num_active_docs,
        "corpus_fingerprint": self._generate_corpus_fingerprint(),
    }

    self.analysis_results[module_name] = {
        "version": version,
        "timestamp": pd.Timestamp.now().isoformat(),
        "corpus_state": corpus_state,
        "results": results_data,
    }

    msg.good(f"Imported {module_name} analysis results (version {version})")

load(include_docs: bool = False, active_only: bool = True) -> int ¤

Load records from database into memory.

Parameters:

Name Type Description Default
include_docs bool

Whether to deserialize spaCy Doc content

False
active_only bool

Whether to load only active records

True

Returns:

Type Description
int

Number of records loaded

Raises:

Type Description
LexosException

If database is not enabled

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def load(self, include_docs: bool = False, active_only: bool = True) -> int:
    """Load records from database into memory.

    Replaces any records currently in memory with the database contents.

    Args:
        include_docs: Whether to deserialize spaCy Doc content.
            NOTE(review): this flag is not used anywhere in the body —
            confirm whether it should be forwarded to the backend.
        active_only: Whether to load only active records

    Returns:
        Number of records loaded

    Raises:
        LexosException: If database is not enabled
    """
    if not self.db:
        raise LexosException(
            "Database is not enabled. Initialize corpus with use_sqlite=True."
        )

    # Clear existing records if loading from database
    self.records.clear()
    self.names.clear()

    # Load records from database
    filters = {"is_active": True} if active_only else {}
    db_records = self.db.filter_records(**filters)

    loaded_count = 0
    for record in db_records:
        # Add to in-memory structures (names maps a name to a list of ids,
        # since names are not required to be unique)
        record_id_str = str(record.id)
        self.records[record_id_str] = record
        if record.name not in self.names:
            self.names[record.name] = []
        self.names[record.name].append(record_id_str)
        # Populate meta for loaded record so Corpus metadata is consistent
        try:
            meta_entry = record.model_dump(
                exclude=["content", "terms", "text", "tokens"], mode="json"
            )
            if "id" in meta_entry:
                meta_entry["id"] = str(meta_entry["id"])
            # Token/term counts are only meaningful for parsed records
            meta_entry["num_tokens"] = (
                record.num_tokens() if record.is_parsed else 0
            )
            meta_entry["num_terms"] = record.num_terms() if record.is_parsed else 0
            self.meta[record_id_str] = meta_entry
        except Exception:
            # Best-effort fallback: keep a minimal meta entry rather than
            # failing the whole load because one record can't be dumped
            self.meta[record_id_str] = {
                "id": record_id_str,
                "name": record.name,
                "is_active": record.is_active,
                "num_tokens": record.num_tokens() if record.is_parsed else 0,
                "num_terms": record.num_terms() if record.is_parsed else 0,
            }
        loaded_count += 1

    # Update corpus state
    self._update_corpus_state()

    return loaded_count

remove(id: Optional[str | list[str]] = None, name: Optional[str | list[str]] = None) -> None ¤

Remove a record from the corpus by ID.

Parameters:

Name Type Description Default
id str | list[str]

The ID of the record to remove.

None
name str | list[str]

The name of the record to remove.

None

Returns:

Type Description
None

None

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def remove(
    self,
    id: Optional[str | list[str]] = None,
    name: Optional[str | list[str]] = None,
) -> None:
    """Remove one or more records from the corpus by ID or name.

    Args:
        id (str | list[str]): The ID of the record to remove.
        name (str | list[str]): The name of the record to remove.

    Returns:
        None
    """
    # At least one selector is required
    if not id and not name:
        raise LexosException(
            "Must provide either an ID or a name to remove a record."
        )

    # Normalise the id argument into a list of ids
    if isinstance(id, list):
        ids = id
    elif isinstance(id, str):
        ids = [id]
    else:
        ids = []

    # When only name(s) are given, translate them into ids
    if name and not id:
        names = [name] if isinstance(name, str) else name
        ids = []
        for record_name in names:
            ids.extend(self._get_by_name(record_name))

    for record_id in ids:
        # Drop the entry from the records dictionary
        try:
            entry = self.records.pop(record_id)
        except KeyError:
            raise LexosException(
                f"Record with ID {record_id} does not exist in the Corpus."
            )
        # Drop the id from the name index, pruning empty name entries
        try:
            if entry.name in self.names:
                self.names[entry.name].remove(str(entry.id))
                if not self.names[entry.name]:  # Remove empty lists
                    self.names.pop(entry.name)
        except KeyError:
            raise LexosException(
                f"Record with name {entry.name} does not exist in the Corpus."
            )

    # Refresh derived corpus state now that the records are gone
    self._update_corpus_state()

save(path: Path | str = None) -> None ¤

Save the Corpus as a zip archive.

Parameters:

Name Type Description Default
path Path | str

The path to save the Corpus to.

None

Returns:

Type Description
None

None

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def save(self, path: Path | str = None) -> None:
    """Save the Corpus as a zip archive.

    Args:
        path (Path | str): The directory in which to save the archive.
            Defaults to the current working directory when omitted.

    Returns:
        None
    """
    # Coerce to Path so str arguments work with the `/` operator; the
    # previous implementation raised TypeError for str input and for the
    # default `None` path.
    target_dir = Path(path) if path is not None else Path.cwd()
    shutil.make_archive(str(target_dir / self.name), "zip", self.corpus_dir)

search(query: str, limit: int = 100, include_inactive: bool = False, model_filter: Optional[str] = None, load_from_db: bool = True) -> list[Record] ¤

Perform full-text search on corpus records.

Parameters:

Name Type Description Default
query str

FTS5 search query string

required
limit int

Maximum number of results to return

100
include_inactive bool

Whether to include inactive records

False
model_filter Optional[str]

Optional filter by spaCy model name

None
load_from_db bool

Whether to load results from database (vs memory)

True

Returns:

Type Description
list[Record]

List of matching Record objects

Raises:

Type Description
LexosException

If database is not enabled

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def search(
    self,
    query: str,
    limit: int = 100,
    include_inactive: bool = False,
    model_filter: Optional[str] = None,
    load_from_db: bool = True,
) -> list[Record]:
    """Run a full-text (FTS5) search over the corpus records.

    Args:
        query: FTS5 search query string
        limit: Maximum number of results to return
        include_inactive: Whether to include inactive records
        model_filter: Optional filter by spaCy model name
        load_from_db: Whether to load results from database (vs memory).
            Accepted for API compatibility; not consulted by this body.

    Returns:
        List of matching Record objects

    Raises:
        LexosException: If database is not enabled
    """
    backend = self.db
    if not backend:
        raise LexosException(
            "Database is not enabled. Initialize corpus with use_sqlite=True to use search."
        )

    return backend.search_records(
        query=query,
        limit=limit,
        include_inactive=include_inactive,
        model_filter=model_filter,
    )

set(id: str, **props) -> None ¤

Set a property or properties of a record in the Corpus.

Parameters:

Name Type Description Default
id str

A record id.

required
**props dict

The dict containing any other properties to set.

{}

Returns:

Type Description
None

None

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def set(self, id: str, **props) -> None:
    """Set a property or properties of a record in the Corpus.

    Args:
        id (str): A record id.
        **props (dict): The dict containing any other properties to set.

    Returns:
        None
    """
    # Get the record by ID
    record = self.records[id]

    # Save the record's filepath, then update the specified properties
    old_filepath = record.meta.get("filepath", None)
    record.set(**props)

    # If the filepath has changed, delete the old file. Guard against a
    # record that never had a filepath: Path(None) raised TypeError before.
    if old_filepath and record.meta.get("filepath", None) != old_filepath:
        Path(old_filepath).unlink(missing_ok=True)

    # If the record's filepath lies outside the corpus data directory,
    # write the record to disk. The previous containment test was
    # reversed (`filepath not in str(data_dir)`), which was True for
    # virtually any real path and re-saved the record on every call.
    # NOTE(review): confirm the intended policy — as written, records
    # already under <corpus_dir>/data are not re-written.
    filepath = record.meta.get("filepath")
    data_dir = str(Path(self.corpus_dir) / "data")
    if filepath and data_dir not in str(filepath):
        record.to_disk(filepath, extensions=record.extensions)

    # Update the record in the Corpus and update the corpus state
    self.records[id] = record
    self._update_corpus_state()

sync(overwrite: bool = False) -> int ¤

Synchronize existing file-based records to the database.

This method loads records from the corpus directory on disk and adds them to the database. If records are already in memory, they will be used instead.

Parameters:

Name Type Description Default
overwrite bool

Whether to overwrite existing database records

False

Returns:

Type Description
int

Number of records synchronized

Raises:

Type Description
LexosException

If database is not enabled

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def sync(self, overwrite: bool = False) -> int:
    """Synchronize existing file-based records to the database.

    This method loads records from the corpus directory on disk and adds them
    to the database. If records are already in memory, they will be used instead.

    Args:
        overwrite: Whether to overwrite existing database records

    Returns:
        Number of records synchronized. Records already present in the
        database and left untouched (overwrite=False) still count.

    Raises:
        LexosException: If database is not enabled
    """
    from wasabi import msg

    if not self.db:
        raise LexosException(
            "Database is not enabled. Initialize corpus with use_sqlite=True."
        )

    # Load records from disk if not already in memory
    if not self.records:
        self._load_records_from_disk()

    synced_count = 0

    for record in self.records.values():
        try:
            # Single existence check shared by both branches (the
            # overwrite and non-overwrite paths previously duplicated it)
            existing = self.db.get_record(str(record.id), include_doc=False)
            if existing:
                # Only touch an existing row when overwriting
                if overwrite:
                    self.db.update_record(record)
            else:
                self.db.add_record(record)

            synced_count += 1

        except Exception as e:
            # Best-effort: report via wasabi msg (consistent with the rest
            # of the module) and continue with the remaining records
            msg.warn(f"Failed to sync record {record.id}: {str(e)}")

    return synced_count

term_counts(n: Optional[int] = 10, most_common: Optional[bool] = True) -> Counter ¤

Get a Counter with the most common Corpus term counts.

Parameters:

Name Type Description Default
n Optional[int]

The number of most common terms to return. Defaults to 10.

10
most_common Optional[bool]

If True, return the n most common terms; otherwise, return the n least common terms.

True

Returns:

Type Description
Counter

A collections.Counter object containing the n most common term counts for all records in the Corpus.

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def term_counts(
    self, n: Optional[int] = 10, most_common: Optional[bool] = True
) -> Counter:
    """Get a Counter with the most common Corpus term counts.

    Args:
        n (Optional[int]): The number of most common terms to return. Defaults to 10.
        most_common (Optional[bool]): If True, return the n most common terms; otherwise, return the n least common terms.

    Returns:
        A collections.Counter object with term counts for all parsed records
        when no filtering applies; otherwise a list of (term, count) tuples
        as produced by ``Counter.most_common``.

    NOTE: The declared return type is ``Counter`` for interface
    compatibility, but every filtered branch actually returns a list of
    tuples; callers should expect a list whenever ``n`` is truthy.
    """
    # Aggregate term counts across all parsed records. Unparsed records
    # are skipped because their `terms` property raises.
    counter = Counter()
    for record in self.records.values():
        if record.is_parsed:
            counter.update(record.terms)

    # Optionally filter the results. Note that n=0 is falsy and falls
    # through to the final branch, returning the full Counter unfiltered.
    if most_common and n:
        return counter.most_common(n)
    elif not most_common and n:
        # The n least common terms, least common first.
        return counter.most_common()[: -n - 1 : -1]
    elif most_common is False and n is None:
        # All terms, most common first. `most_common()` already returns a
        # fresh list, so no defensive copy is needed.
        return counter.most_common()
    else:
        return counter

to_df(exclude: list[str] = ['content', 'terms', 'tokens']) -> pd.DataFrame ¤

Return a table of the Corpus records.

Parameters:

Name Type Description Default
exclude list[str]

A list of fields to exclude from the dataframe. If you wish to exclude metadata fields with the same name as model fields, you can use the prefix "metadata_" to avoid conflicts.

['content', 'terms', 'tokens']

Returns:

Type Description
DataFrame

pd.DataFrame: A dataframe representing the records in the Corpus.

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def to_df(
    self, exclude: list[str] = ["content", "terms", "tokens"]
) -> pd.DataFrame:
    """Return a table of the Corpus records.

    Args:
        exclude (list[str]): A list of fields to exclude from the dataframe. If you wish to exclude metadata fields with the same name as model fields, you can use the prefix "metadata_" to avoid conflicts.

    Returns:
        pd.DataFrame: A dataframe representing the records in the Corpus.
    """
    # The mutable default for `exclude` is safe here: it is only read
    # (membership tests and a `set()` copy), never mutated.
    rows = []
    for record in self.records.values():
        if record is None:  # Skip None records
            continue

        # NOTE: We avoid calling `model_dump()` on unparsed `Record`
        # objects because Pydantic may evaluate computed fields while
        # serializing, and several computed properties on `Record`
        # (`terms`, `tokens`, `num_terms`, `num_tokens`) raise
        # `LexosException("Record is not parsed.")` when the record is
        # not parsed. Therefore:
        #  - Parsed records: call `record.model_dump()`; computed fields
        #    are safe to evaluate.
        #  - Unparsed records: build a minimal, safe row from stored
        #    fields and fill computed-like values with safe defaults
        #    (empty list, 0, or empty string) further below. This keeps
        #    `to_df()` robust for corpora mixing parsed and unparsed
        #    records.
        # Build a dump_exclude set to prevent model_dump from computing
        # user-excluded fields
        dump_exclude = set(exclude)
        if hasattr(record, "is_parsed") and record.is_parsed:
            # Parsed records: safely model_dump, excluding any user-requested fields
            row = record.model_dump(exclude=list(dump_exclude))
        else:
            # Unparsed records: avoid model_dump to prevent computed property evaluation
            base_fields = [
                "id",
                "name",
                "is_active",
                "content",
                "model",
                "extensions",
                "data_source",
                "meta",
            ]
            row = {}
            for f in base_fields:
                if f in exclude:
                    continue
                try:
                    value = getattr(record, f, None)
                except Exception:
                    # Defensive: if getattr triggers an error, skip and set None
                    value = None
                # Serialize Doc-like content into text rather than bytes to keep DataFrame friendly
                if f == "content" and value is not None:
                    try:
                        from spacy.tokens import Doc

                        if isinstance(value, Doc):
                            value = value.text
                    except Exception:
                        pass
                # Ensure id is serialized to string to match model_dump output for parsed records
                if f == "id" and value is not None:
                    try:
                        value = str(value)
                    except Exception:
                        pass
                # Sanitize meta similar to model_dump
                if f == "meta" and value is not None:
                    try:
                        value = record._sanitize_metadata(value)
                    except Exception:
                        pass
                row[f] = value

        # Patch for unparsed records: fill terms/tokens/num_terms/num_tokens/text
        # Only if those fields are not excluded
        if "terms" not in exclude:
            if hasattr(record, "is_parsed") and record.is_parsed:
                row["terms"] = list(record.terms)
            else:
                row["terms"] = []
        if "tokens" not in exclude:
            if hasattr(record, "is_parsed") and record.is_parsed:
                row["tokens"] = record.tokens
            else:
                row["tokens"] = []
        if "num_terms" not in exclude:
            if hasattr(record, "is_parsed") and record.is_parsed:
                row["num_terms"] = record.num_terms()
            else:
                row["num_terms"] = 0
        if "num_tokens" not in exclude:
            if hasattr(record, "is_parsed") and record.is_parsed:
                row["num_tokens"] = record.num_tokens()
            else:
                row["num_tokens"] = 0
        if "text" not in exclude:
            if hasattr(record, "is_parsed") and record.is_parsed:
                row["text"] = record.text
            else:
                row["text"] = ""

        # Add metadata categories, respecting exclude list
        metadata = row.pop("meta", {})
        for key, value in metadata.items():
            # Exclude metadata fields if requested
            if key in exclude or f"metadata_{key}" in exclude:
                continue
            # Prefix metadata keys that collide with model field names.
            if key in row:
                key = f"metadata_{key}"
            row[key] = value

        # Append the row to the rows list
        rows.append(row)

    # Create a DataFrame from the rows
    if rows:  # Only create DataFrame if we have data
        df = pd.DataFrame(rows)
        # Fill NaN with appropriate values based on column dtype.
        # Check bool BEFORE numeric: pandas reports bool dtype as numeric
        # (`is_numeric_dtype` is True for bool), so the numeric branch
        # would otherwise shadow the bool branch and fill boolean columns
        # with 0 instead of False.
        fill_values = {}
        for col in df.columns:
            if pd.api.types.is_bool_dtype(df[col]):
                fill_values[col] = False
            elif pd.api.types.is_numeric_dtype(df[col]):
                fill_values[col] = 0
            else:
                fill_values[col] = ""

        df = df.fillna(fill_values)  # Use assignment instead of inplace
        return df
    else:
        # Return empty DataFrame with basic columns if no records
        return pd.DataFrame(columns=["id", "name", "is_active"])

validate_analysis_compatibility(module_name: str) -> dict[str, Any] ¤

Validate if stored analysis results are compatible with current corpus state.

Parameters:

Name Type Description Default
module_name str

Name of the module to validate

required

Returns:

Type Description
dict[str, Any]

Dictionary containing validation results and recommendations

Source code in lexos/corpus/corpus.py
@validate_call(config=model_config)
def validate_analysis_compatibility(self, module_name: str) -> dict[str, Any]:
    """Validate if stored analysis results are compatible with current corpus state.

    Args:
        module_name: Name of the module to validate

    Returns:
        Dictionary containing validation results and recommendations
    """
    if module_name not in self.analysis_results:
        return {
            "compatible": False,
            "reason": f"No analysis results found for module '{module_name}'",
        }

    results = self.analysis_results[module_name]
    saved_state = results.get("corpus_state", {})
    fingerprint_now = self._generate_corpus_fingerprint()
    fingerprint_then = saved_state.get("corpus_fingerprint", "")

    report = {
        "compatible": fingerprint_then == fingerprint_now,
        "current_fingerprint": fingerprint_now,
        "stored_fingerprint": fingerprint_then,
        "stored_timestamp": results.get("timestamp", "unknown"),
        "stored_version": results.get("version", "unknown"),
    }

    if report["compatible"]:
        return report

    report["reason"] = (
        "Corpus state has changed since analysis was performed"
    )
    report["recommendation"] = (
        f"Re-run {module_name} analysis with current corpus state"
    )

    # Per-field diff so callers can see exactly what moved.
    changes = {}
    for field, current in (
        ("num_docs", self.num_docs),
        ("num_active_docs", self.num_active_docs),
    ):
        stored = saved_state.get(field, 0)
        changes[field] = {
            "stored": stored,
            "current": current,
            "changed": stored != current,
        }
    report["state_changes"] = changes

    return report
rendering:
  show_root_heading: true
  heading_level: 3

__init__(**data: Any) ¤

Initialize corpus with optional database integration.

Parameters:

Name Type Description Default
**data Any

Standard Corpus initialization parameters

{}
Source code in lexos/corpus/sqlite/integration.py
def __init__(self, **data: Any):
    """Initialize corpus with optional database integration.

    Args:
        **data (Any): Standard Corpus initialization parameters
    """
    # Pull the database-specific options out of the raw kwargs, then put
    # them back under their canonical field names so the parent model
    # validates them like any other field.
    db_file = data.pop("sqlite_path", None)
    db_enabled = data.pop("use_sqlite", False)
    db_exclusive = data.pop("sqlite_only", False)

    data["use_sqlite"] = db_enabled
    data["sqlite_only"] = db_exclusive
    data["sqlite_path"] = db_file

    # Initialize parent class first so corpus_dir and the flags are set.
    super().__init__(**data)

    # A backend is created when either flag requests database storage;
    # otherwise `db` stays None and the corpus remains purely file-based.
    if not (self.use_sqlite or self.sqlite_only):
        self.db = None
        return

    self.db = SQLiteBackend(
        database_path=db_file or f"{self.corpus_dir}/corpus.db"
    )
    self._initialize_metadata()
rendering:
  show_root_heading: true
  heading_level: 3

_add_to_backend(content, name: Optional[str] = None, is_active: Optional[bool] = True, model: Optional[str] = None, extensions: Optional[list[str]] = None, metadata: Optional[dict[str, Any]] = None, id_type: Optional[str] = 'uuid4') ¤

Add records in database-only mode without file storage.

Source code in lexos/corpus/sqlite/integration.py
def _add_to_backend(
    self,
    content,
    name: Optional[str] = None,
    is_active: Optional[bool] = True,
    model: Optional[str] = None,
    extensions: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    id_type: Optional[str] = "uuid4",
):
    """Add records in database-only mode without file storage.

    Args:
        content: A string, spaCy Doc, Record, or an iterable of these.
        name (Optional[str]): Optional base name for the new record(s).
        is_active (Optional[bool]): Whether the record(s) are active.
        model (Optional[str]): spaCy model name associated with the record(s).
        extensions (Optional[list[str]]): spaCy extensions to attach.
        metadata (Optional[dict[str, Any]]): Metadata for the record(s).
        id_type (Optional[str]): ID generation scheme (e.g. "uuid4").
    """
    from spacy.tokens import Doc

    # Sanitize metadata to ensure JSON-serializable types (defensive)
    if metadata is not None:
        metadata = self._sanitize_metadata(metadata)

    # Handle single or multiple content items: a lone str/Doc/Record is
    # wrapped in a list so the loop below treats both cases uniformly.
    if isinstance(content, (Doc, Record, str)):
        items = [content]
    else:
        items = list(content)

    for item in items:
        # Generate unique ID.
        # NOTE(review): this id is only consumed when a new Record is
        # built below; for pre-built Record items it is generated and
        # then discarded.
        new_id = self._generate_unique_id(type=id_type)

        if isinstance(item, Record):
            record = item
        else:
            record_kwargs = dict(
                id=new_id,
                name=self._ensure_unique_name(name),
                is_active=is_active,
                content=item,
                model=model,
                data_source=None,
            )
            # Only pass optional fields when supplied so the Record
            # model's own defaults apply otherwise.
            if extensions is not None:
                record_kwargs["extensions"] = extensions
            if metadata is not None:
                record_kwargs["meta"] = metadata
            record = Record(**record_kwargs)

            # Note: Records are created with string content and can be parsed later if needed
            # The database stores both parsed and unparsed content efficiently

        # Add to in-memory records and the name -> ids index.
        record_id_str = str(record.id)
        self.records[record_id_str] = record
        if record.name not in self.names:
            self.names[record.name] = []
        self.names[record.name].append(record_id_str)
        # Add a meta entry similar to file-based add to keep Corpus metadata consistent
        try:
            meta_entry = record.model_dump(
                exclude=["content", "terms", "text", "tokens"], mode="json"
            )
            # Ensure id is a string and annotate token/term counts.
            # Counts default to 0 for unparsed records, whose count
            # accessors raise.
            meta_entry["id"] = str(meta_entry.get("id", record_id_str))
            meta_entry["num_tokens"] = (
                record.num_tokens() if record.is_parsed else 0
            )
            meta_entry["num_terms"] = record.num_terms() if record.is_parsed else 0
            self.meta[record_id_str] = meta_entry
        except Exception:
            # Fallback minimal meta if model_dump fails
            self.meta[record_id_str] = {
                "id": record_id_str,
                "name": record.name,
                "is_active": record.is_active,
                "num_tokens": record.num_tokens() if record.is_parsed else 0,
                "num_terms": record.num_terms() if record.is_parsed else 0,
            }

        # Store in database
        if self.db:
            self.db.add_record(record)

    # Update corpus state (counts, fingerprint, DB metadata row)
    self._update_corpus_state()
rendering:
  show_root_heading: true
  heading_level: 3

__del__() ¤

Destructor to ensure database connections are closed.

Source code in lexos/corpus/sqlite/integration.py
def __del__(self):
    """Destructor to ensure database connections are closed.

    Cleanup failures are deliberately swallowed: raising from ``__del__``
    during interpreter shutdown would only produce noise.
    """
    try:
        self.close()
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are not silently swallowed during cleanup.
        pass
rendering:
  show_root_heading: true
  heading_level: 3

_get_timestamp() -> str ¤

Get current timestamp as ISO string.

Source code in lexos/corpus/sqlite/integration.py
def _get_timestamp(self) -> str:
    """Return the current local time as an ISO-8601 formatted string."""
    import datetime

    now = datetime.datetime.now()
    return now.isoformat()
rendering:
  show_root_heading: true
  heading_level: 3

_load_records_from_disk() ¤

Load records from the corpus directory into memory.

This is a helper method for sync() to load file-based records from disk before syncing them to the database.

Source code in lexos/corpus/sqlite/integration.py
def _load_records_from_disk(self):
    """Load records from the corpus directory into memory.

    This is a helper method for sync() to load file-based records
    from disk before syncing them to the database. Missing directories,
    missing metadata, or unreadable records are tolerated: the corpus
    simply ends up with whatever could be loaded.
    """
    corpus_dir = Path(self.corpus_dir)
    metadata_path = corpus_dir / self.corpus_metadata_file

    # Check if corpus directory and metadata exist; nothing to load otherwise.
    if not corpus_dir.exists():
        return

    if not metadata_path.exists():
        return

    # Load metadata
    try:
        import srsly

        metadata = srsly.read_json(metadata_path)

        # Load record metadata
        if "meta" in metadata and metadata["meta"]:
            for record_id, record_meta in metadata["meta"].items():
                # Records are stored as binary files named <record_id>.bin
                # under the corpus's "data" subdirectory.
                data_dir = corpus_dir / "data"
                record_file = data_dir / f"{record_id}.bin"

                if record_file.exists():
                    # Create a Record object and load from disk
                    record = Record(id=record_id, name=record_meta.get("name", ""))
                    record.from_disk(
                        str(record_file),
                        model=record_meta.get("model"),
                        model_cache=self.model_cache,
                    )

                    # Add to in-memory structures (records map and the
                    # name -> ids index).
                    self.records[record_id] = record
                    if record.name not in self.names:
                        self.names[record.name] = []
                    self.names[record.name].append(record_id)

    except Exception as e:
        # If loading fails, just continue with empty records
        print(f"Warning: Failed to load records from disk: {str(e)}")
rendering:
  show_root_heading: true
  heading_level: 3

_initialize_metadata() ¤

Initialize corpus metadata in the database.

Source code in lexos/corpus/sqlite/integration.py
def _initialize_metadata(self):
    """Initialize corpus metadata in the database.

    Creates a `SQLiteMetadata` row for this corpus if one does not already
    exist. Safe to call repeatedly: an existing row is left untouched.
    """
    if not self.db:
        return

    with self.db.SessionLocal() as session:
        # Corpus rows are keyed by name; unnamed corpora use "default".
        corpus_id = self.name or "default"
        existing = (
            session.query(SQLiteMetadata)
            .filter(SQLiteMetadata.corpus_id == corpus_id)
            .first()
        )
        if existing:
            return

        # Use a single timestamp so created_at == updated_at on creation
        # (previously _get_timestamp() was called twice and could yield
        # slightly different values).
        now = self._get_timestamp()

        # Create new corpus metadata
        corpus_metadata = SQLiteMetadata()
        corpus_metadata.corpus_id = corpus_id
        corpus_metadata.name = self.name
        corpus_metadata.num_docs = self.num_docs
        corpus_metadata.num_active_docs = self.num_active_docs
        corpus_metadata.num_tokens = self.num_tokens
        corpus_metadata.num_terms = self.num_terms
        corpus_metadata.corpus_dir = self.corpus_dir
        # default=str keeps non-JSON-native values (UUIDs, datetimes)
        # serializable rather than raising.
        corpus_metadata.metadata_json = json.dumps(self.meta, default=str)
        corpus_metadata.analysis_results_json = json.dumps(
            self.analysis_results, default=str
        )
        corpus_metadata.corpus_fingerprint = self._generate_corpus_fingerprint()
        corpus_metadata.created_at = now
        corpus_metadata.updated_at = now
        session.add(corpus_metadata)
        session.commit()
rendering:
  show_root_heading: true
  heading_level: 3

_sanitize_metadata(metadata: dict[str, Any]) -> dict[str, Any] ¤

Convert non-JSON-serializable types to strings.

Parameters:

Name Type Description Default
metadata dict[str, Any]

Original metadata dictionary

required

Returns:

Type Description
dict[str, Any]

Sanitized metadata dictionary with JSON-serializable values

Source code in lexos/corpus/sqlite/integration.py
def _sanitize_metadata(self, metadata: dict[str, Any]) -> dict[str, Any]:
    """Convert non-JSON-serializable types to strings.

    UUIDs and Paths become strings, datetimes/dates become ISO strings,
    and nested dicts/lists are sanitized recursively.

    Args:
        metadata: Original metadata dictionary

    Returns:
        Sanitized metadata dictionary with JSON-serializable values
    """
    from datetime import date, datetime
    from pathlib import Path
    from uuid import UUID

    def _sanitize_value(value: Any) -> Any:
        # Single dispatch point used for both dict values and list items,
        # so nested lists (e.g. a list of lists of UUIDs) are sanitized
        # too — the previous implementation passed nested lists through
        # unmodified.
        if isinstance(value, (UUID, Path)):
            return str(value)
        if isinstance(value, (datetime, date)):
            return value.isoformat()
        if isinstance(value, dict):
            return self._sanitize_metadata(value)
        if isinstance(value, list):
            return [_sanitize_value(item) for item in value]
        return value

    return {key: _sanitize_value(value) for key, value in metadata.items()}
rendering:
  show_root_heading: true
  heading_level: 3

_update_corpus_state() ¤

Update corpus state in both memory and database.

Source code in lexos/corpus/sqlite/integration.py
def _update_corpus_state(self):
    """Update corpus state in both memory and database.

    Delegates the in-memory bookkeeping to the parent class, then mirrors
    the refreshed counts, metadata, and fingerprint into this corpus's
    `SQLiteMetadata` row — but only if database support is enabled and a
    row already exists (creation is `_initialize_metadata`'s job).
    """
    # Update in-memory state
    super()._update_corpus_state()

    # Update database metadata if enabled
    if self.db:
        with self.db.SessionLocal() as session:
            # Corpus rows are keyed by name; unnamed corpora use "default".
            corpus_id = self.name or "default"
            corpus_metadata = (
                session.query(SQLiteMetadata)
                .filter(SQLiteMetadata.corpus_id == corpus_id)
                .first()
            )

            if corpus_metadata:
                corpus_metadata.num_docs = self.num_docs
                corpus_metadata.num_active_docs = self.num_active_docs
                corpus_metadata.num_tokens = self.num_tokens
                corpus_metadata.num_terms = self.num_terms
                # default=str keeps non-JSON-native values serializable.
                corpus_metadata.metadata_json = json.dumps(self.meta, default=str)
                corpus_metadata.analysis_results_json = json.dumps(
                    self.analysis_results, default=str
                )
                corpus_metadata.corpus_fingerprint = (
                    self._generate_corpus_fingerprint()
                )
                corpus_metadata.updated_at = self._get_timestamp()

                session.commit()
rendering:
  show_root_heading: true
  heading_level: 3

add(content, name: Optional[str] = None, is_active: Optional[bool] = True, model: Optional[str] = None, extensions: Optional[list[str]] = None, metadata: Optional[dict[str, Any]] = None, id_type: Optional[str] = 'uuid4', cache: Optional[bool] = False, store_in_db: Optional[bool] = None) ¤

Add a record to the corpus with optional database storage.

Parameters:

Name Type Description Default
content str | Doc | Record

The content of the record

required
name Optional[str]

Optional name for the record

None
is_active Optional[bool]

Whether the record is active

True
model Optional[str]

spaCy model name for parsing

None
extensions Optional[list[str]]

List of spaCy extensions to add

None
metadata Optional[dict[str, Any]]

Optional metadata dictionary

None
id_type Optional[str]

Type of ID to generate ('uuid4' or 'int')

'uuid4'
cache Optional[bool]

Whether to cache the record in memory

False
store_in_db Optional[bool]

Whether to store the record in the database

None
Source code in lexos/corpus/sqlite/integration.py
@validate_call
def add(
    self,
    content,
    name: Optional[str] = None,
    is_active: Optional[bool] = True,
    model: Optional[str] = None,
    extensions: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    id_type: Optional[str] = "uuid4",
    cache: Optional[bool] = False,
    store_in_db: Optional[bool] = None,
):
    """Add a record to the corpus with optional database storage.

    Args:
        content (str | Doc | Record): The content of the record
        name (Optional[str]): Optional name for the record
        is_active (Optional[bool]): Whether the record is active
        model (Optional[str]): spaCy model name for parsing
        extensions (Optional[list[str]]): List of spaCy extensions to add
        metadata (Optional[dict[str, Any]]): Optional metadata dictionary
        id_type (Optional[str]): Type of ID to generate ('uuid4' or 'int')
        cache (Optional[bool]): Whether to cache the record in memory
        store_in_db (Optional[bool]): Whether to store the record in the database
    """
    # Sanitize metadata to ensure JSON-serializable types
    if metadata is not None:
        metadata = self._sanitize_metadata(metadata)

    # Determine storage strategy: an explicit store_in_db wins, otherwise
    # fall back to the corpus-level configuration.
    use_db = (
        store_in_db
        if store_in_db is not None
        else self.use_sqlite or self.sqlite_only
    )
    use_files = not self.sqlite_only

    # Snapshot the existing record ids so new additions can be identified
    # by key rather than by position (robust even if the parent add
    # replaces an existing entry instead of appending).
    existing_ids = set(self.records)

    # Add using parent implementation if using files
    if use_files:
        super().add(
            content=content,
            name=name,
            is_active=is_active,
            model=model,
            extensions=extensions,
            metadata=metadata,
            id_type=id_type,
            cache=cache,
        )
    else:
        # Database-only mode - implement add logic without file storage
        self._add_to_backend(
            content=content,
            name=name,
            is_active=is_active,
            model=model,
            extensions=extensions,
            metadata=metadata,
            id_type=id_type,
        )

    # Also store in database if enabled and we're using file storage
    # (database-only mode already wrote through _add_to_backend).
    if use_db and self.db and use_files:
        new_records = [
            record
            for record_id, record in self.records.items()
            if record_id not in existing_ids
        ]
        for record in new_records:
            try:
                # Note: Records can be parsed later if needed; the database
                # stores both parsed and unparsed content.
                self.db.add_record(record)
            except Exception as e:
                # Log error but don't fail the entire operation
                print(f"Warning: Failed to add record {record.id} to database: {e}")
rendering:
  show_root_heading: true
  heading_level: 3

filter_records(is_active: Optional[bool] = None, is_parsed: Optional[bool] = None, model: Optional[str] = None, min_tokens: Optional[int] = None, max_tokens: Optional[int] = None, limit: Optional[int] = None, use_database: bool = True) -> list[Record] ¤

Filter records by various criteria.

Parameters:

Name Type Description Default
is_active Optional[bool]

Filter by active status

None
is_parsed Optional[bool]

Filter by parsed status

None
model Optional[str]

Filter by spaCy model name

None
min_tokens Optional[int]

Minimum number of tokens

None
max_tokens Optional[int]

Maximum number of tokens

None
limit Optional[int]

Maximum number of results

None
use_database bool

Whether to use database filtering (vs in-memory)

True

Returns:

Type Description
list[Record]

List of matching Record objects

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def filter_records(
    self,
    is_active: Optional[bool] = None,
    is_parsed: Optional[bool] = None,
    model: Optional[str] = None,
    min_tokens: Optional[int] = None,
    max_tokens: Optional[int] = None,
    limit: Optional[int] = None,
    use_database: bool = True,
) -> list[Record]:
    """Filter records by various criteria.

    Args:
        is_active: Filter by active status
        is_parsed: Filter by parsed status
        model: Filter by spaCy model name
        min_tokens: Minimum number of tokens
        max_tokens: Maximum number of tokens
        limit: Maximum number of results
        use_database: Whether to use database filtering (vs in-memory)

    Returns:
        List of matching Record objects
    """
    if use_database and self.db:
        # Push the filtering into SQL where a backend is available.
        return self.db.filter_records(
            is_active=is_active,
            is_parsed=is_parsed,
            model=model,
            min_tokens=min_tokens,
            max_tokens=max_tokens,
            limit=limit,
        )

    # Fallback to in-memory filtering
    filtered_records = []
    for record in self.records.values():
        if is_active is not None and record.is_active != is_active:
            continue
        if is_parsed is not None and record.is_parsed != is_parsed:
            continue
        if model is not None and record.model != model:
            continue
        if min_tokens is not None or max_tokens is not None:
            # Hoisted: num_tokens() is computed once for both bounds
            # (the original called it separately per bound, each inside
            # a bare `except:`).
            try:
                token_count = record.num_tokens()
            except Exception:
                # Records that cannot report a token count (e.g. unparsed)
                # fail any token-based constraint.
                continue
            if min_tokens is not None and token_count < min_tokens:
                continue
            if max_tokens is not None and token_count > max_tokens:
                continue

        filtered_records.append(record)

        if limit and len(filtered_records) >= limit:
            break

    return filtered_records
rendering:
  show_root_heading: true
  heading_level: 3

get_stats() -> dict[str, Any] ¤

Get corpus statistics from the database.

Returns:

Type Description
dict[str, Any]

Dictionary containing database-derived statistics

Raises:

Type Description
LexosException

If database is not enabled

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def get_stats(self) -> dict[str, Any]:
    """Get corpus statistics from the database.

    Returns:
        Dictionary containing database-derived statistics

    Raises:
        LexosException: If database is not enabled
    """
    # Statistics come exclusively from the SQLite backend, so a corpus
    # without one cannot answer this query.
    if not self.db:
        raise LexosException(
            "Database is not enabled. Initialize corpus with use_sqlite=True."
        )

    stats = self.db.get_stats()
    return stats
rendering:
  show_root_heading: true
  heading_level: 3

search(query: str, limit: int = 100, include_inactive: bool = False, model_filter: Optional[str] = None, load_from_db: bool = True) -> list[Record] ¤

Perform full-text search on corpus records.

Parameters:

Name Type Description Default
query str

FTS5 search query string

required
limit int

Maximum number of results to return

100
include_inactive bool

Whether to include inactive records

False
model_filter Optional[str]

Optional filter by spaCy model name

None
load_from_db bool

Whether to load results from database (vs memory)

True

Returns:

Type Description
list[Record]

List of matching Record objects

Raises:

Type Description
LexosException

If database is not enabled

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def search(
    self,
    query: str,
    limit: int = 100,
    include_inactive: bool = False,
    model_filter: Optional[str] = None,
    load_from_db: bool = True,
) -> list[Record]:
    """Perform full-text search on corpus records.

    Args:
        query: FTS5 search query string
        limit: Maximum number of results to return
        include_inactive: Whether to include inactive records
        model_filter: Optional filter by spaCy model name
        load_from_db: Whether to load results from database (vs memory)

    Returns:
        List of matching Record objects

    Raises:
        LexosException: If database is not enabled
    """
    # Full-text search requires the FTS5 index maintained by the backend.
    if not self.db:
        raise LexosException(
            "Database is not enabled. Initialize corpus with use_sqlite=True to use search."
        )

    # NOTE(review): `load_from_db` is currently unused — the query always
    # delegates to the backend; kept for interface compatibility.
    matches = self.db.search_records(
        query=query,
        limit=limit,
        include_inactive=include_inactive,
        model_filter=model_filter,
    )
    return matches
rendering:
  show_root_heading: true
  heading_level: 3

sync(overwrite: bool = False) -> int ¤

Synchronize existing file-based records to the database.

This method loads records from the corpus directory on disk and adds them to the database. If records are already in memory, they will be used instead.

Parameters:

Name Type Description Default
overwrite bool

Whether to overwrite existing database records

False

Returns:

Type Description
int

Number of records synchronized

Raises:

Type Description
LexosException

If database is not enabled

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def sync(self, overwrite: bool = False) -> int:
    """Synchronize existing file-based records to the database.

    This method loads records from the corpus directory on disk and adds them
    to the database. If records are already in memory, they will be used instead.

    Args:
        overwrite: Whether to overwrite existing database records

    Returns:
        Number of records actually written (added or updated). Records that
        already exist in the database and are skipped (``overwrite=False``)
        are not counted.

    Raises:
        LexosException: If database is not enabled
    """
    if not self.db:
        raise LexosException(
            "Database is not enabled. Initialize corpus with use_sqlite=True."
        )

    # Load records from disk if not already in memory
    if not self.records:
        self._load_records_from_disk()

    synced_count = 0

    for record in self.records.values():
        try:
            # Single existence check (Doc payload skipped for speed);
            # the original code repeated this lookup in both branches.
            existing = self.db.get_record(str(record.id), include_doc=False)
            if existing:
                if overwrite:
                    self.db.update_record(record)
                    synced_count += 1
                # else: record already present and overwrite disabled —
                # nothing written, so it is NOT counted (bug fix: the
                # previous version counted skipped records too).
            else:
                self.db.add_record(record)
                synced_count += 1

        except Exception as e:
            # Best-effort sync: report the failure but continue with the
            # remaining records.
            print(f"Warning: Failed to sync record {record.id}: {str(e)}")

    return synced_count
rendering:
  show_root_heading: true
  heading_level: 3

load(include_docs: bool = False, active_only: bool = True) -> int ¤

Load records from database into memory.

Parameters:

Name Type Description Default
include_docs bool

Whether to deserialize spaCy Doc content

False
active_only bool

Whether to load only active records

True

Returns:

Type Description
int

Number of records loaded

Raises:

Type Description
LexosException

If database is not enabled

Source code in lexos/corpus/sqlite/integration.py
@validate_call
def load(self, include_docs: bool = False, active_only: bool = True) -> int:
    """Load records from database into memory.

    Replaces any records currently held in memory with the records stored
    in the database, rebuilding the ``records``, ``names``, and ``meta``
    in-memory structures.

    Args:
        include_docs: Whether to deserialize spaCy Doc content.
            NOTE(review): this flag is not forwarded to ``filter_records``
            below — confirm whether the backend query should receive it.
        active_only: Whether to load only active records

    Returns:
        Number of records loaded

    Raises:
        LexosException: If database is not enabled
    """
    if not self.db:
        raise LexosException(
            "Database is not enabled. Initialize corpus with use_sqlite=True."
        )

    # Clear existing records if loading from database
    self.records.clear()
    self.names.clear()

    # Load records from database
    filters = {"is_active": True} if active_only else {}
    db_records = self.db.filter_records(**filters)

    loaded_count = 0
    for record in db_records:
        # Add to in-memory structures
        record_id_str = str(record.id)
        self.records[record_id_str] = record
        # names maps a (possibly duplicated) record name to the list of ids
        # that carry it.
        if record.name not in self.names:
            self.names[record.name] = []
        self.names[record.name].append(record_id_str)
        # Populate meta for loaded record so Corpus metadata is consistent
        try:
            # Bulky fields are excluded so meta stays lightweight.
            meta_entry = record.model_dump(
                exclude=["content", "terms", "text", "tokens"], mode="json"
            )
            if "id" in meta_entry:
                meta_entry["id"] = str(meta_entry["id"])
            # Token/term counts are only meaningful once the record has
            # been parsed; report 0 otherwise.
            meta_entry["num_tokens"] = (
                record.num_tokens() if record.is_parsed else 0
            )
            meta_entry["num_terms"] = record.num_terms() if record.is_parsed else 0
            self.meta[record_id_str] = meta_entry
        except Exception:
            # Fallback: if serialization fails for any reason, store a
            # minimal metadata entry so the record is still tracked.
            self.meta[record_id_str] = {
                "id": record_id_str,
                "name": record.name,
                "is_active": record.is_active,
                "num_tokens": record.num_tokens() if record.is_parsed else 0,
                "num_terms": record.num_terms() if record.is_parsed else 0,
            }
        loaded_count += 1

    # Update corpus state
    self._update_corpus_state()

    return loaded_count
rendering:
  show_root_heading: true
  heading_level: 3

close() ¤

Close database connections and clean up resources.

Source code in lexos/corpus/sqlite/integration.py
def close(self):
    """Release database resources, if a database backend is attached.

    Safe to call when no database is configured; in that case this is
    a no-op.
    """
    backend = self.db
    if backend:
        backend.close()
rendering:
  show_root_heading: true
  heading_level: 3

create_corpus(corpus_dir: str = 'corpus', sqlite_path: Optional[Union[str, Path]] = None, name: Optional[str] = None, sqlite_only: bool = False, **kwargs: Any) -> SQLiteCorpus ¤

Convenience function to create a SQLite-enabled corpus with sensible defaults.

Parameters:

Name Type Description Default
corpus_dir str

Directory for file-based storage

'corpus'
sqlite_path Optional[Union[str, Path]]

Path to SQLite database (None for auto-generated)

None
name Optional[str]

Corpus name

None
sqlite_only bool

Whether to use database-only mode

False
**kwargs Any

Additional Corpus initialization parameters

{}

Returns:

Type Description
SQLiteCorpus

SQLiteCorpus instance

Source code in lexos/corpus/sqlite/integration.py
def create_corpus(
    corpus_dir: str = "corpus",
    sqlite_path: Optional[Union[str, Path]] = None,
    name: Optional[str] = None,
    sqlite_only: bool = False,
    **kwargs: Any,
) -> SQLiteCorpus:
    """Build a SQLite-enabled corpus with sensible defaults.

    Args:
        corpus_dir (str): Directory for file-based storage.
        sqlite_path (Optional[Union[str, Path]]): Path to the SQLite
            database; defaults to ``<corpus_dir>/corpus.db`` when omitted.
        name (Optional[str]): Corpus name.
        sqlite_only (bool): Whether to use database-only mode.
        **kwargs (Any): Additional Corpus initialization parameters.

    Returns:
        SQLiteCorpus instance.
    """
    # Default the database file into the corpus directory when no explicit
    # path is supplied.
    db_path = (
        f"{corpus_dir}/corpus.db" if sqlite_path is None else sqlite_path
    )

    return SQLiteCorpus(
        corpus_dir=corpus_dir,
        name=name,
        sqlite_path=db_path,
        use_sqlite=True,
        sqlite_only=sqlite_only,
        **kwargs,
    )
rendering:
  show_root_heading: true
  heading_level: 3