Skip to content

database¤

Module Description¤

SQLite database backend, built on SQLAlchemy, that integrates with the Lexos Corpus class.

This is a compatibility layer for SQLModel 0.0.24 that works around primary key issues.

Last Updated: November 20, 2025. Last Tested: November 20, 2025.

SQLiteRecord ¤

Bases: Base

SQLAlchemy table for record storage.

Source code in lexos/corpus/sqlite/database.py
class SQLiteRecord(Base):
    """SQLAlchemy table for record storage.

    Each row of the ``records`` table stores one record: its plain text,
    optional serialized document bytes, status flags, precomputed
    statistics, and JSON-encoded metadata.

    Keep the column declaration order stable: it determines the column
    order of the created table, which is the order in which raw
    ``SELECT *`` rows come back.
    """

    __tablename__ = "records"

    # Primary identification
    id = Column(String, primary_key=True)  # record ID, stored as a string
    name = Column(String)

    # Content storage
    content_text = Column(Text, nullable=False)  # plain text; always required
    content_doc_bytes = Column(LargeBinary)  # serialized document bytes; NULL when absent

    # Status and metadata
    is_active = Column(Boolean, default=True)
    is_parsed = Column(Boolean, default=False)
    model = Column(String)  # presumably the name of the language model used for parsing

    # Content statistics (denormalized for query performance)
    num_tokens = Column(Integer, default=0)
    num_terms = Column(Integer, default=0)
    vocab_density = Column(Float, default=0.0)

    # Serialized metadata as JSON strings (an object and a list respectively,
    # judging by the "{}" and "[]" defaults)
    metadata_json = Column(Text, default="{}")
    extensions_list = Column(Text, default="[]")

    # Data integrity and versioning
    data_source = Column(String)
    content_hash = Column(String, nullable=False)  # hash of content_text; required
    # Timestamps are stored as strings rather than as a DateTime type.
    created_at = Column(String, nullable=False)
    updated_at = Column(String, nullable=False)
rendering:
  show_root_heading: true
  heading_level: 3

SQLiteMetadata ¤

Bases: Base

SQLAlchemy table for corpus metadata.

Source code in lexos/corpus/sqlite/database.py
class SQLiteMetadata(Base):
    """SQLAlchemy table for corpus metadata.

    Each row of the ``corpus_metadata`` table describes one corpus,
    keyed by ``corpus_id``: aggregate counts, the corpus directory, and
    JSON-encoded metadata and analysis results.
    """

    __tablename__ = "corpus_metadata"

    # Corpus identification
    corpus_id = Column(String, primary_key=True)
    name = Column(String)

    # Aggregate statistics (all default to 0)
    num_docs = Column(Integer, default=0)
    num_active_docs = Column(Integer, default=0)
    num_tokens = Column(Integer, default=0)
    num_terms = Column(Integer, default=0)

    # Configuration
    corpus_dir = Column(String, nullable=False)  # required

    # Serialized metadata, stored as JSON text (default is an empty object)
    metadata_json = Column(Text, default="{}")
    analysis_results_json = Column(Text, default="{}")

    # Versioning and integrity
    corpus_fingerprint = Column(String, nullable=False)  # required
    # Timestamps are stored as strings rather than as a DateTime type.
    created_at = Column(String, nullable=False)
    updated_at = Column(String, nullable=False)
rendering:
  show_root_heading: true
  heading_level: 3

SQLiteBackend ¤

Database interface for corpus operations using SQLite with full-text search.

Methods:

Name Description
__del__

Destructor to ensure database connections are closed.

__init__

Initialize the corpus database.

add_record

Add a Record to the database.

close

Close the database connection and clean up resources.

delete_record

Delete a record from the database.

filter_records

Filter records by various criteria.

get_record

Retrieve a Record from the database.

get_stats

Get aggregate corpus statistics from the database.

search_records

Perform full-text search on records.

update_record

Update an existing record in the database.

Source code in lexos/corpus/sqlite/database.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
class SQLiteBackend:
    """Database interface for corpus operations using SQLite with full-text search."""

    def __del__(self):
        """Destructor to ensure database connections are closed."""
        try:
            self.close()
        except:
            pass  # Ignore errors during cleanup

    def __init__(self, database_path: Union[str, Path] = ":memory:", **kwargs: Any):
        """Create the engine and session factory, then set up the schema.

        Args:
            database_path: Location of the SQLite database file, or ":memory:" for an in-memory database
            **kwargs: Extra keyword arguments passed through to SQLAlchemy's ``create_engine``
        """
        db_location = str(database_path)
        self.database_path = db_location

        engine = create_engine(f"sqlite:///{db_location}", **kwargs)
        self.engine = engine
        self.SessionLocal = sessionmaker(bind=engine)

        # Tables, the FTS index and its triggers are created up front.
        self._initialize_database()

    def _db_record_to_record(
        self,
        db_record: SQLiteRecord,
        include_doc: bool = True,
        model_cache: Optional[LexosModelCache] = None,
    ) -> Record:
        """Convert a SQLiteRecord back to a Record object.

        Args:
            db_record: The database row to convert.
            include_doc: If True and the row is parsed with stored document
                bytes, deserialize them; otherwise use the plain text.
            model_cache: Optional model cache forwarded to the deserializer.

        Returns:
            A Record populated from the row.
        """
        # Both JSON columns are nullable, so a row written outside the ORM may
        # hold NULL; fall back to the schema defaults instead of letting
        # json.loads(None) raise TypeError.
        metadata = json.loads(db_record.metadata_json) if db_record.metadata_json else {}
        extensions = (
            json.loads(db_record.extensions_list) if db_record.extensions_list else []
        )

        # Only rebuild the document when asked to and when one was stored.
        if include_doc and db_record.content_doc_bytes and db_record.is_parsed:
            content = self._deserialize_doc_content(
                db_record.content_doc_bytes, db_record.model, model_cache
            )
        else:
            content = db_record.content_text

        return Record(
            id=db_record.id,
            name=db_record.name,
            is_active=db_record.is_active,
            content=content,
            model=db_record.model,
            extensions=extensions,
            data_source=db_record.data_source,
            meta=metadata,
        )

    def _deserialize_doc_content(
        self,
        doc_bytes: bytes,
        model: Optional[str] = None,
        model_cache: Optional[LexosModelCache] = None,
    ) -> Doc:
        """Deserialize spaCy Doc from bytes.

        Args:
            doc_bytes: The serialized document.
            model: Optional model name forwarded to the Record deserializer.
            model_cache: Optional model cache forwarded to the Record deserializer.

        Returns:
            The deserialized Doc.

        Raises:
            LexosException: If deserialization fails for any reason; the
                original error is attached as the cause.
        """
        try:
            # The deserialization logic lives on Record, so a throwaway
            # Record with a random ID is created just to call it.
            temp_record = Record(id=str(uuid4()), content="")
            return temp_record._doc_from_bytes(doc_bytes, model, model_cache)
        except Exception as e:
            # Chain explicitly so the underlying failure stays in the traceback.
            raise LexosException(f"Failed to deserialize spaCy Doc: {e}") from e

    def _initialize_database(self):
        """Create the ORM tables, the FTS5 search table, and its sync triggers.

        Each raw SQL statement uses ``IF NOT EXISTS``. All of them run in a
        single session that is committed once at the end.
        """
        fts_setup_sql = (
            # FTS5 virtual table backing full-text search
            """
                CREATE VIRTUAL TABLE IF NOT EXISTS records_fts USING fts5(
                    record_id,
                    name,
                    content_text,
                    metadata_text
                )
            """,
            # Copy each newly inserted record into the FTS table
            """
                CREATE TRIGGER IF NOT EXISTS records_fts_insert AFTER INSERT ON records
                BEGIN
                    INSERT INTO records_fts(record_id, name, content_text, metadata_text)
                    VALUES (new.id, new.name, new.content_text, new.metadata_json);
                END
            """,
            # Drop the FTS row when its record is deleted
            """
                CREATE TRIGGER IF NOT EXISTS records_fts_delete AFTER DELETE ON records
                BEGIN
                    DELETE FROM records_fts WHERE record_id = old.id;
                END
            """,
            # Refresh the FTS row when its record is updated
            """
                CREATE TRIGGER IF NOT EXISTS records_fts_update AFTER UPDATE ON records
                BEGIN
                    UPDATE records_fts
                    SET name = new.name,
                        content_text = new.content_text,
                        metadata_text = new.metadata_json
                    WHERE record_id = new.id;
                END
            """,
        )

        # ORM-managed tables first, then the FTS objects.
        Base.metadata.create_all(self.engine)

        with self.SessionLocal() as session:
            for statement in fts_setup_sql:
                session.execute(text(statement))
            session.commit()

    def _record_to_db_record(self, record: Record) -> SQLiteRecord:
        """Build the SQLiteRecord row that represents a Record.

        Parsed records store both their text and their serialized Doc and
        carry real statistics; unparsed records store text only, with
        zeroed statistics. Both timestamps are set to the current time.
        """
        parsed = record.is_parsed

        # Text and, for parsed records, the serialized spaCy Doc
        if parsed and isinstance(record.content, Doc):
            plain_text = record.content.text
            doc_blob = record._doc_to_bytes()
        elif record.content:
            plain_text = str(record.content)
            doc_blob = None
        else:
            plain_text = ""
            doc_blob = None

        digest = hashlib.sha256(plain_text.encode()).hexdigest()

        # Statistics are only meaningful for parsed records
        if parsed:
            token_count = record.num_tokens()
            term_count = record.num_terms()
            density = record.vocab_density()
        else:
            token_count, term_count, density = 0, 0, 0.0

        meta_blob = json.dumps(record.meta, default=str)
        extensions_blob = json.dumps(record.extensions)
        now = datetime.now().isoformat()

        field_values = {
            "id": str(record.id),
            "name": record.name,
            "content_text": plain_text,
            "content_doc_bytes": doc_blob,
            "is_active": record.is_active,
            "is_parsed": parsed,
            "model": record.model,
            "num_tokens": token_count,
            "num_terms": term_count,
            "vocab_density": density,
            "metadata_json": meta_blob,
            "extensions_list": extensions_blob,
            "data_source": record.data_source,
            "content_hash": digest,
            "created_at": now,
            "updated_at": now,
        }

        # Attributes are assigned after construction rather than being
        # passed to the constructor, as in the rest of this module.
        row = SQLiteRecord()
        for attr_name, attr_value in field_values.items():
            setattr(row, attr_name, attr_value)
        return row

    def add_record(self, record: Record) -> None:
        """Insert a new Record into the database.

        Raises:
            LexosException: If a row with the same ID is already stored.
        """
        record_key = str(record.id)
        with self.SessionLocal() as session:
            # Refuse to overwrite: look the ID up before inserting.
            lookup = session.query(SQLiteRecord).filter(SQLiteRecord.id == record_key)
            if lookup.first():
                raise LexosException(
                    f"Record with ID {record.id} already exists in database"
                )

            session.add(self._record_to_db_record(record))
            session.commit()

    def close(self):
        """Close the database connection and clean up resources."""
        if hasattr(self, "engine") and self.engine:
            self.engine.dispose()

    def delete_record(self, record_id: str) -> bool:
        """Remove the record with the given ID.

        Returns:
            True if a row was found and deleted, False if no such row exists.
        """
        with self.SessionLocal() as session:
            lookup = session.query(SQLiteRecord).filter(SQLiteRecord.id == record_id)
            target = lookup.first()
            if not target:
                return False

            session.delete(target)
            session.commit()
            return True

    def filter_records(
        self,
        is_active: Optional[bool] = None,
        is_parsed: Optional[bool] = None,
        model: Optional[str] = None,
        min_tokens: Optional[int] = None,
        max_tokens: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> list[Record]:
        """Return the records matching every criterion that is not None.

        Matching rows are converted with ``include_doc=False``, so the
        returned records carry text content only.
        """
        # Collect only the conditions the caller actually supplied.
        criteria = []
        if is_active is not None:
            criteria.append(SQLiteRecord.is_active == is_active)
        if is_parsed is not None:
            criteria.append(SQLiteRecord.is_parsed == is_parsed)
        if model is not None:
            criteria.append(SQLiteRecord.model == model)
        if min_tokens is not None:
            criteria.append(SQLiteRecord.num_tokens >= min_tokens)
        if max_tokens is not None:
            criteria.append(SQLiteRecord.num_tokens <= max_tokens)

        with self.SessionLocal() as session:
            query = session.query(SQLiteRecord)
            for criterion in criteria:
                query = query.filter(criterion)
            if limit is not None:
                query = query.limit(limit)

            converted = []
            for row in query.all():
                converted.append(self._db_record_to_record(row, include_doc=False))
            return converted

    def get_stats(self) -> dict[str, Any]:
        """Compute aggregate corpus statistics with SQL queries.

        Token and term totals and the average vocabulary density cover
        active records only; an empty aggregate is reported as 0.
        """
        with self.SessionLocal() as session:

            def fetch_scalar(sql: str) -> Any:
                # Run a single-value query and return that value.
                return session.execute(text(sql)).scalar()

            return {
                "total_records": fetch_scalar("SELECT COUNT(*) FROM records"),
                "active_records": fetch_scalar(
                    "SELECT COUNT(*) FROM records WHERE is_active = 1"
                ),
                "parsed_records": fetch_scalar(
                    "SELECT COUNT(*) FROM records WHERE is_parsed = 1"
                ),
                # SUM/AVG yield NULL when no rows match, hence the `or 0`.
                "total_tokens": fetch_scalar(
                    "SELECT SUM(num_tokens) FROM records WHERE is_active = 1"
                )
                or 0,
                "total_terms": fetch_scalar(
                    "SELECT SUM(num_terms) FROM records WHERE is_active = 1"
                )
                or 0,
                "average_vocab_density": fetch_scalar(
                    "SELECT AVG(vocab_density) FROM records WHERE is_active = 1 AND num_tokens > 0"
                )
                or 0,
            }

    # Note: `get_stats()` is the canonical method name. Older code that used
    # `get_corpus_stats()` should call `get_stats()` instead. This wrapper was
    # removed to keep the sqlite submodule's API consistent with the
    # `Corpus` public API. If you need backward compatibility across the
    # deprecated database modules, see `src/lexos/database/database_simple.py`.

    def get_record(
        self,
        record_id: str,
        include_doc: bool = True,
        model_cache: Optional[LexosModelCache] = None,
    ) -> Optional[Record]:
        """Look up a single record by ID.

        Returns:
            The matching Record, or None when no row has that ID.
        """
        with self.SessionLocal() as session:
            lookup = session.query(SQLiteRecord).filter(SQLiteRecord.id == record_id)
            row = lookup.first()
            if row:
                return self._db_record_to_record(row, include_doc, model_cache)
            return None

    def search_records(
        self,
        query: str,
        limit: int = 100,
        include_inactive: bool = False,
        model_filter: Optional[str] = None,
    ) -> list[Record]:
        """Perform full-text search on records.

        Args:
            query: FTS5 MATCH expression, passed as a bound parameter.
            limit: Maximum number of records to return.
            include_inactive: If False, only rows with ``is_active = 1`` match.
            model_filter: If given, only rows whose ``model`` equals it match.

        Returns:
            Matching records, newest ``created_at`` first, converted with
            ``include_doc=False`` (text content only).
        """
        with self.SessionLocal() as session:
            # Matching IDs come from the FTS table via a subquery; all user
            # input is bound as parameters, never interpolated.
            fts_query = text("""
                SELECT DISTINCT r.* FROM records r
                WHERE r.id IN (
                    SELECT record_id FROM records_fts
                    WHERE records_fts MATCH :query
                )
                AND (:include_inactive OR r.is_active = 1)
                AND (:model_filter IS NULL OR r.model = :model_filter)
                ORDER BY r.created_at DESC
                LIMIT :limit
            """)

            result = session.execute(
                fts_query,
                {
                    "query": query,
                    "include_inactive": include_inactive,
                    "model_filter": model_filter,
                    "limit": limit,
                },
            )

            column_names = [col.name for col in SQLiteRecord.__table__.columns]

            records = []
            for row in result:
                # Map by column name, not position: ``r.*`` follows the
                # table's physical column order, which need not match the
                # order in which the model declares its columns.
                values = row._mapping
                db_record = SQLiteRecord()
                for column_name in column_names:
                    setattr(db_record, column_name, values[column_name])

                records.append(self._db_record_to_record(db_record, include_doc=False))

            return records

    def update_record(self, record: Record) -> None:
        """Update an existing record in the database.

        Every stored field except the primary key and the creation
        timestamp is replaced with the value derived from ``record``.

        Raises:
            LexosException: If no row with the record's ID exists.
        """
        with self.SessionLocal() as session:
            existing = (
                session.query(SQLiteRecord)
                .filter(SQLiteRecord.id == str(record.id))
                .first()
            )
            if not existing:
                raise LexosException(
                    f"Record with ID {record.id} not found in database"
                )

            updated_record = self._record_to_db_record(record)

            # Skip SQLAlchemy bookkeeping and the primary key. Also skip
            # created_at: the freshly built row carries the current time
            # there, and copying it would erase the original creation time.
            preserved = {"_sa_instance_state", "id", "created_at"}
            for key, value in updated_record.__dict__.items():
                if key not in preserved:
                    setattr(existing, key, value)

            session.commit()

__del__() ¤

Destructor to ensure database connections are closed.

Source code in lexos/corpus/sqlite/database.py
def __del__(self):
    """Destructor to ensure database connections are closed.

    Cleanup is best-effort: ordinary errors raised by ``close()`` are
    suppressed, since a destructor may run on a partially initialized
    instance or while the interpreter is shutting down.
    """
    try:
        self.close()
    except Exception:
        # Deliberately not a bare ``except``: KeyboardInterrupt and
        # SystemExit must not be silently swallowed here.
        pass

__init__(database_path: Union[str, Path] = ':memory:', **kwargs: Any) ¤

Initialize the corpus database.

Parameters:

Name Type Description Default
database_path Union[str, Path]

Path to SQLite database file, or ":memory:" for in-memory database

':memory:'
**kwargs Any

Additional keyword arguments for SQLAlchemy engine creation

{}
Source code in lexos/corpus/sqlite/database.py
def __init__(self, database_path: Union[str, Path] = ":memory:", **kwargs: Any):
    """Create the engine and session factory, then set up the schema.

    Args:
        database_path: Location of the SQLite database file, or ":memory:" for an in-memory database
        **kwargs: Extra keyword arguments passed through to SQLAlchemy's ``create_engine``
    """
    db_location = str(database_path)
    self.database_path = db_location

    engine = create_engine(f"sqlite:///{db_location}", **kwargs)
    self.engine = engine
    self.SessionLocal = sessionmaker(bind=engine)

    # Tables, the FTS index and its triggers are created up front.
    self._initialize_database()

add_record(record: Record) -> None ¤

Add a Record to the database.

Source code in lexos/corpus/sqlite/database.py
def add_record(self, record: Record) -> None:
    """Insert a new Record into the database.

    Raises:
        LexosException: If a row with the same ID is already stored.
    """
    record_key = str(record.id)
    with self.SessionLocal() as session:
        # Refuse to overwrite: look the ID up before inserting.
        lookup = session.query(SQLiteRecord).filter(SQLiteRecord.id == record_key)
        if lookup.first():
            raise LexosException(
                f"Record with ID {record.id} already exists in database"
            )

        session.add(self._record_to_db_record(record))
        session.commit()

close() ¤

Close the database connection and clean up resources.

Source code in lexos/corpus/sqlite/database.py
def close(self):
    """Dispose of the SQLAlchemy engine, if one was ever created."""
    # getattr covers instances whose __init__ failed before setting engine.
    engine = getattr(self, "engine", None)
    if engine:
        engine.dispose()

delete_record(record_id: str) -> bool ¤

Delete a record from the database.

Source code in lexos/corpus/sqlite/database.py
def delete_record(self, record_id: str) -> bool:
    """Remove the record with the given ID.

    Returns:
        True if a row was found and deleted, False if no such row exists.
    """
    with self.SessionLocal() as session:
        lookup = session.query(SQLiteRecord).filter(SQLiteRecord.id == record_id)
        target = lookup.first()
        if not target:
            return False

        session.delete(target)
        session.commit()
        return True

filter_records(is_active: Optional[bool] = None, is_parsed: Optional[bool] = None, model: Optional[str] = None, min_tokens: Optional[int] = None, max_tokens: Optional[int] = None, limit: Optional[int] = None) -> list[Record] ¤

Filter records by various criteria.

Source code in lexos/corpus/sqlite/database.py
def filter_records(
    self,
    is_active: Optional[bool] = None,
    is_parsed: Optional[bool] = None,
    model: Optional[str] = None,
    min_tokens: Optional[int] = None,
    max_tokens: Optional[int] = None,
    limit: Optional[int] = None,
) -> list[Record]:
    """Return the records matching every criterion that is not None.

    Matching rows are converted with ``include_doc=False``, so the
    returned records carry text content only.
    """
    # Collect only the conditions the caller actually supplied.
    criteria = []
    if is_active is not None:
        criteria.append(SQLiteRecord.is_active == is_active)
    if is_parsed is not None:
        criteria.append(SQLiteRecord.is_parsed == is_parsed)
    if model is not None:
        criteria.append(SQLiteRecord.model == model)
    if min_tokens is not None:
        criteria.append(SQLiteRecord.num_tokens >= min_tokens)
    if max_tokens is not None:
        criteria.append(SQLiteRecord.num_tokens <= max_tokens)

    with self.SessionLocal() as session:
        query = session.query(SQLiteRecord)
        for criterion in criteria:
            query = query.filter(criterion)
        if limit is not None:
            query = query.limit(limit)

        converted = []
        for row in query.all():
            converted.append(self._db_record_to_record(row, include_doc=False))
        return converted

get_record(record_id: str, include_doc: bool = True, model_cache: Optional[LexosModelCache] = None) -> Optional[Record] ¤

Retrieve a Record from the database.

Source code in lexos/corpus/sqlite/database.py
def get_record(
    self,
    record_id: str,
    include_doc: bool = True,
    model_cache: Optional[LexosModelCache] = None,
) -> Optional[Record]:
    """Look up a single record by ID.

    Returns:
        The matching Record, or None when no row has that ID.
    """
    with self.SessionLocal() as session:
        lookup = session.query(SQLiteRecord).filter(SQLiteRecord.id == record_id)
        row = lookup.first()
        if row:
            return self._db_record_to_record(row, include_doc, model_cache)
        return None

get_stats() -> dict[str, Any] ¤

Get aggregate corpus statistics from the database.

Source code in lexos/corpus/sqlite/database.py
def get_stats(self) -> dict[str, Any]:
    """Compute aggregate corpus statistics with SQL queries.

    Token and term totals and the average vocabulary density cover
    active records only; an empty aggregate is reported as 0.
    """
    with self.SessionLocal() as session:

        def fetch_scalar(sql: str) -> Any:
            # Run a single-value query and return that value.
            return session.execute(text(sql)).scalar()

        return {
            "total_records": fetch_scalar("SELECT COUNT(*) FROM records"),
            "active_records": fetch_scalar(
                "SELECT COUNT(*) FROM records WHERE is_active = 1"
            ),
            "parsed_records": fetch_scalar(
                "SELECT COUNT(*) FROM records WHERE is_parsed = 1"
            ),
            # SUM/AVG yield NULL when no rows match, hence the `or 0`.
            "total_tokens": fetch_scalar(
                "SELECT SUM(num_tokens) FROM records WHERE is_active = 1"
            )
            or 0,
            "total_terms": fetch_scalar(
                "SELECT SUM(num_terms) FROM records WHERE is_active = 1"
            )
            or 0,
            "average_vocab_density": fetch_scalar(
                "SELECT AVG(vocab_density) FROM records WHERE is_active = 1 AND num_tokens > 0"
            )
            or 0,
        }

search_records(query: str, limit: int = 100, include_inactive: bool = False, model_filter: Optional[str] = None) -> list[Record] ¤

Perform full-text search on records.

Source code in lexos/corpus/sqlite/database.py
def search_records(
    self,
    query: str,
    limit: int = 100,
    include_inactive: bool = False,
    model_filter: Optional[str] = None,
) -> list[Record]:
    """Perform full-text search on records.

    Args:
        query: FTS5 MATCH expression, passed as a bound parameter.
        limit: Maximum number of records to return.
        include_inactive: If False, only rows with ``is_active = 1`` match.
        model_filter: If given, only rows whose ``model`` equals it match.

    Returns:
        Matching records, newest ``created_at`` first, converted with
        ``include_doc=False`` (text content only).
    """
    with self.SessionLocal() as session:
        # Matching IDs come from the FTS table via a subquery; all user
        # input is bound as parameters, never interpolated.
        fts_query = text("""
            SELECT DISTINCT r.* FROM records r
            WHERE r.id IN (
                SELECT record_id FROM records_fts
                WHERE records_fts MATCH :query
            )
            AND (:include_inactive OR r.is_active = 1)
            AND (:model_filter IS NULL OR r.model = :model_filter)
            ORDER BY r.created_at DESC
            LIMIT :limit
        """)

        result = session.execute(
            fts_query,
            {
                "query": query,
                "include_inactive": include_inactive,
                "model_filter": model_filter,
                "limit": limit,
            },
        )

        column_names = [col.name for col in SQLiteRecord.__table__.columns]

        records = []
        for row in result:
            # Map by column name, not position: ``r.*`` follows the
            # table's physical column order, which need not match the
            # order in which the model declares its columns.
            values = row._mapping
            db_record = SQLiteRecord()
            for column_name in column_names:
                setattr(db_record, column_name, values[column_name])

            records.append(self._db_record_to_record(db_record, include_doc=False))

        return records

update_record(record: Record) -> None ¤

Update an existing record in the database.

Source code in lexos/corpus/sqlite/database.py
def update_record(self, record: Record) -> None:
    """Update an existing record in the database.

    Every stored field except the primary key and the creation
    timestamp is replaced with the value derived from ``record``.

    Raises:
        LexosException: If no row with the record's ID exists.
    """
    with self.SessionLocal() as session:
        existing = (
            session.query(SQLiteRecord)
            .filter(SQLiteRecord.id == str(record.id))
            .first()
        )
        if not existing:
            raise LexosException(
                f"Record with ID {record.id} not found in database"
            )

        updated_record = self._record_to_db_record(record)

        # Skip SQLAlchemy bookkeeping and the primary key. Also skip
        # created_at: the freshly built row carries the current time
        # there, and copying it would erase the original creation time.
        preserved = {"_sa_instance_state", "id", "created_at"}
        for key, value in updated_record.__dict__.items():
            if key not in preserved:
                setattr(existing, key, value)

        session.commit()
rendering:
  show_root_heading: true
  heading_level: 3

__del__() ¤

Destructor to ensure database connections are closed.

Source code in lexos/corpus/sqlite/database.py
def __del__(self):
    """Destructor to ensure database connections are closed.

    Cleanup is best-effort: ordinary errors raised by ``close()`` are
    suppressed, since a destructor may run on a partially initialized
    instance or while the interpreter is shutting down.
    """
    try:
        self.close()
    except Exception:
        # Deliberately not a bare ``except``: KeyboardInterrupt and
        # SystemExit must not be silently swallowed here.
        pass
rendering:
  show_root_heading: true
  heading_level: 3

__init__(database_path: Union[str, Path] = ':memory:', **kwargs: Any) ¤

Initialize the corpus database.

Parameters:

Name Type Description Default
database_path Union[str, Path]

Path to SQLite database file, or ":memory:" for in-memory database

':memory:'
**kwargs Any

Additional keyword arguments for SQLAlchemy engine creation

{}
Source code in lexos/corpus/sqlite/database.py
def __init__(self, database_path: Union[str, Path] = ":memory:", **kwargs: Any):
    """Create the engine and session factory, then set up the schema.

    Args:
        database_path: Location of the SQLite database file, or ":memory:" for an in-memory database
        **kwargs: Extra keyword arguments passed through to SQLAlchemy's ``create_engine``
    """
    db_location = str(database_path)
    self.database_path = db_location

    engine = create_engine(f"sqlite:///{db_location}", **kwargs)
    self.engine = engine
    self.SessionLocal = sessionmaker(bind=engine)

    # Tables, the FTS index and its triggers are created up front.
    self._initialize_database()
rendering:
  show_root_heading: true
  heading_level: 3

_db_record_to_record(db_record: SQLiteRecord, include_doc: bool = True, model_cache: Optional[LexosModelCache] = None) -> Record ¤

Convert a SQLiteRecord back to a Record object.

Source code in lexos/corpus/sqlite/database.py
def _db_record_to_record(
    self,
    db_record: SQLiteRecord,
    include_doc: bool = True,
    model_cache: Optional[LexosModelCache] = None,
) -> Record:
    """Convert a SQLiteRecord back to a Record object.

    Args:
        db_record: The database row to convert.
        include_doc: If True and the row is parsed with stored document
            bytes, deserialize them; otherwise use the plain text.
        model_cache: Optional model cache forwarded to the deserializer.

    Returns:
        A Record populated from the row.
    """
    # Both JSON columns are nullable, so a row written outside the ORM may
    # hold NULL; fall back to the schema defaults instead of letting
    # json.loads(None) raise TypeError.
    metadata = json.loads(db_record.metadata_json) if db_record.metadata_json else {}
    extensions = (
        json.loads(db_record.extensions_list) if db_record.extensions_list else []
    )

    # Only rebuild the document when asked to and when one was stored.
    if include_doc and db_record.content_doc_bytes and db_record.is_parsed:
        content = self._deserialize_doc_content(
            db_record.content_doc_bytes, db_record.model, model_cache
        )
    else:
        content = db_record.content_text

    return Record(
        id=db_record.id,
        name=db_record.name,
        is_active=db_record.is_active,
        content=content,
        model=db_record.model,
        extensions=extensions,
        data_source=db_record.data_source,
        meta=metadata,
    )
rendering:
  show_root_heading: true
  heading_level: 3

_deserialize_doc_content(doc_bytes: bytes, model: Optional[str] = None, model_cache: Optional[LexosModelCache] = None) -> Doc ¤

Deserialize spaCy Doc from bytes.

Source code in lexos/corpus/sqlite/database.py
def _deserialize_doc_content(
    self,
    doc_bytes: bytes,
    model: Optional[str] = None,
    model_cache: Optional[LexosModelCache] = None,
) -> Doc:
    """Deserialize spaCy Doc from bytes.

    Args:
        doc_bytes: The serialized document.
        model: Optional model name forwarded to the Record deserializer.
        model_cache: Optional model cache forwarded to the Record deserializer.

    Returns:
        The deserialized Doc.

    Raises:
        LexosException: If deserialization fails for any reason; the
            original error is attached as the cause.
    """
    try:
        # The deserialization logic lives on Record, so a throwaway
        # Record with a random ID is created just to call it.
        temp_record = Record(id=str(uuid4()), content="")
        return temp_record._doc_from_bytes(doc_bytes, model, model_cache)
    except Exception as e:
        # Chain explicitly so the underlying failure stays in the traceback.
        raise LexosException(f"Failed to deserialize spaCy Doc: {e}") from e
rendering:
  show_root_heading: true
  heading_level: 3

_initialize_database() ¤

Initialize database schema and enable full-text search.

Source code in lexos/corpus/sqlite/database.py
def _initialize_database(self):
    """Create the ORM tables, the FTS5 index table and its sync triggers.

    After the declarative tables are created, four statements are executed
    in order inside one session and committed together: the ``records_fts``
    virtual table, then the insert, delete and update triggers that mirror
    changes on ``records`` into ``records_fts``. Each of those statements
    uses ``IF NOT EXISTS``.
    """
    # Ordered DDL for the full-text index; executed exactly in this order.
    fts_ddl = (
        # FTS5 virtual table holding the searchable columns
        """
            CREATE VIRTUAL TABLE IF NOT EXISTS records_fts USING fts5(
                record_id,
                name,
                content_text,
                metadata_text
            )
        """,
        # Mirror newly inserted rows
        """
            CREATE TRIGGER IF NOT EXISTS records_fts_insert AFTER INSERT ON records
            BEGIN
                INSERT INTO records_fts(record_id, name, content_text, metadata_text)
                VALUES (new.id, new.name, new.content_text, new.metadata_json);
            END
        """,
        # Drop index entries for deleted rows
        """
            CREATE TRIGGER IF NOT EXISTS records_fts_delete AFTER DELETE ON records
            BEGIN
                DELETE FROM records_fts WHERE record_id = old.id;
            END
        """,
        # Refresh index entries for updated rows
        """
            CREATE TRIGGER IF NOT EXISTS records_fts_update AFTER UPDATE ON records
            BEGIN
                UPDATE records_fts
                SET name = new.name,
                    content_text = new.content_text,
                    metadata_text = new.metadata_json
                WHERE record_id = new.id;
            END
        """,
    )

    # Regular tables first: the triggers reference ``records``.
    Base.metadata.create_all(self.engine)

    with self.SessionLocal() as db:
        for ddl in fts_ddl:
            db.execute(text(ddl))
        db.commit()
rendering:
  show_root_heading: true
  heading_level: 3

_record_to_db_record(record: Record) -> SQLiteRecord ¤

Convert a Record object to SQLiteRecord for database storage.

Source code in lexos/corpus/sqlite/database.py
def _record_to_db_record(self, record: Record) -> SQLiteRecord:
    """Build the SQLiteRecord row that represents ``record``.

    The row stores the plain text of the content, the serialized Doc (only
    for parsed records whose content is a Doc), a SHA-256 hash of the text,
    token/term/density statistics (zero for unparsed records), the metadata
    and extensions as JSON strings, and a single timestamp written to both
    ``created_at`` and ``updated_at``.

    Args:
        record: The Record to convert.

    Returns:
        A new, unsaved SQLiteRecord.
    """
    # Text and, where available, the binary Doc payload
    if record.is_parsed and isinstance(record.content, Doc):
        plain_text = record.content.text
        doc_payload = record._doc_to_bytes()
    else:
        plain_text = str(record.content) if record.content else ""
        doc_payload = None

    digest = hashlib.sha256(plain_text.encode()).hexdigest()

    # Statistics are only meaningful for parsed records
    if record.is_parsed:
        token_count = record.num_tokens()
        term_count = record.num_terms()
        density = record.vocab_density()
    else:
        token_count, term_count, density = 0, 0, 0.0

    # default=str lets json.dumps fall back to str() for non-JSON values
    meta_blob = json.dumps(record.meta, default=str)
    extensions_blob = json.dumps(record.extensions)

    now = datetime.now().isoformat()

    # Column values in the order they are assigned onto the row
    column_values = {
        "id": str(record.id),
        "name": record.name,
        "content_text": plain_text,
        "content_doc_bytes": doc_payload,
        "is_active": record.is_active,
        "is_parsed": record.is_parsed,
        "model": record.model,
        "num_tokens": token_count,
        "num_terms": term_count,
        "vocab_density": density,
        "metadata_json": meta_blob,
        "extensions_list": extensions_blob,
        "data_source": record.data_source,
        "content_hash": digest,
        "created_at": now,
        "updated_at": now,
    }

    # Attributes are set one by one on an empty instance rather than passed
    # to the constructor, as in the original implementation.
    row = SQLiteRecord()
    for column, value in column_values.items():
        setattr(row, column, value)
    return row
rendering:
  show_root_heading: true
  heading_level: 3

add_record(record: Record) -> None ¤

Add a Record to the database.

Source code in lexos/corpus/sqlite/database.py
def add_record(self, record: Record) -> None:
    """Insert ``record`` into the database as a new row.

    Args:
        record: The Record to store.

    Raises:
        LexosException: If a row with the same ID is already stored.
    """
    with self.SessionLocal() as db:
        # Refuse to overwrite: look the ID up before inserting.
        duplicate = (
            db.query(SQLiteRecord).filter(SQLiteRecord.id == str(record.id)).first()
        )
        if duplicate:
            raise LexosException(
                f"Record with ID {record.id} already exists in database"
            )

        db.add(self._record_to_db_record(record))
        db.commit()
rendering:
  show_root_heading: true
  heading_level: 3

close() ¤

Close the database connection and clean up resources.

Source code in lexos/corpus/sqlite/database.py
def close(self):
    """Dispose of the engine, if one is attached.

    Does nothing when the instance has no ``engine`` attribute or when that
    attribute is falsy.
    """
    engine = getattr(self, "engine", None)
    if not engine:
        return
    engine.dispose()
rendering:
  show_root_heading: true
  heading_level: 3

delete_record(record_id: str) -> bool ¤

Delete a record from the database.

Source code in lexos/corpus/sqlite/database.py
def delete_record(self, record_id: str) -> bool:
    """Remove the row whose ID equals ``record_id``.

    Args:
        record_id: ID of the record to delete.

    Returns:
        True if a matching row was found and deleted, False otherwise.
    """
    with self.SessionLocal() as db:
        target = db.query(SQLiteRecord).filter(SQLiteRecord.id == record_id).first()
        if not target:
            return False

        db.delete(target)
        db.commit()
        return True
rendering:
  show_root_heading: true
  heading_level: 3

filter_records(is_active: Optional[bool] = None, is_parsed: Optional[bool] = None, model: Optional[str] = None, min_tokens: Optional[int] = None, max_tokens: Optional[int] = None, limit: Optional[int] = None) -> list[Record] ¤

Filter records by various criteria.

Source code in lexos/corpus/sqlite/database.py
def filter_records(
    self,
    is_active: Optional[bool] = None,
    is_parsed: Optional[bool] = None,
    model: Optional[str] = None,
    min_tokens: Optional[int] = None,
    max_tokens: Optional[int] = None,
    limit: Optional[int] = None,
) -> list[Record]:
    """Return the records that satisfy every supplied criterion.

    A criterion left as ``None`` is not applied.

    Args:
        is_active: Required value of the ``is_active`` flag.
        is_parsed: Required value of the ``is_parsed`` flag.
        model: Required model name (exact match).
        min_tokens: Inclusive lower bound on ``num_tokens``.
        max_tokens: Inclusive upper bound on ``num_tokens``.
        limit: Maximum number of rows to return.

    Returns:
        Matching records, built with ``include_doc=False`` so their content
        is the stored text rather than a deserialized Doc.
    """
    # Collect the active conditions first, in a fixed order.
    conditions = []
    if is_active is not None:
        conditions.append(SQLiteRecord.is_active == is_active)
    if is_parsed is not None:
        conditions.append(SQLiteRecord.is_parsed == is_parsed)
    if model is not None:
        conditions.append(SQLiteRecord.model == model)
    if min_tokens is not None:
        conditions.append(SQLiteRecord.num_tokens >= min_tokens)
    if max_tokens is not None:
        conditions.append(SQLiteRecord.num_tokens <= max_tokens)

    with self.SessionLocal() as db:
        selection = db.query(SQLiteRecord)
        for condition in conditions:
            selection = selection.filter(condition)
        if limit is not None:
            selection = selection.limit(limit)

        rows = selection.all()
        return [self._db_record_to_record(row, include_doc=False) for row in rows]
rendering:
  show_root_heading: true
  heading_level: 3

get_stats() -> dict[str, Any] ¤

Get aggregate corpus statistics from the database.

Source code in lexos/corpus/sqlite/database.py
def get_stats(self) -> dict[str, Any]:
    """Compute aggregate corpus statistics with SQL queries.

    Returns:
        A dict with these keys:
            ``total_records``: count of all rows.
            ``active_records``: count of rows with ``is_active = 1``.
            ``parsed_records``: count of rows with ``is_parsed = 1``.
            ``total_tokens`` / ``total_terms``: sums over active rows,
                0 when the query yields no value.
            ``average_vocab_density``: mean over active rows that have at
                least one token, 0 when the query yields no value.
    """
    with self.SessionLocal() as db:

        def fetch_scalar(sql: str):
            # Run one aggregate query and return its single value.
            return db.execute(text(sql)).scalar()

        # The dict literal is evaluated top to bottom, so the queries run in
        # the order listed here.
        return {
            "total_records": fetch_scalar("SELECT COUNT(*) FROM records"),
            "active_records": fetch_scalar(
                "SELECT COUNT(*) FROM records WHERE is_active = 1"
            ),
            "parsed_records": fetch_scalar(
                "SELECT COUNT(*) FROM records WHERE is_parsed = 1"
            ),
            "total_tokens": fetch_scalar(
                "SELECT SUM(num_tokens) FROM records WHERE is_active = 1"
            )
            or 0,
            "total_terms": fetch_scalar(
                "SELECT SUM(num_terms) FROM records WHERE is_active = 1"
            )
            or 0,
            "average_vocab_density": fetch_scalar(
                "SELECT AVG(vocab_density) FROM records WHERE is_active = 1 AND num_tokens > 0"
            )
            or 0,
        }
rendering:
  show_root_heading: true
  heading_level: 3

get_record(record_id: str, include_doc: bool = True, model_cache: Optional[LexosModelCache] = None) -> Optional[Record] ¤

Retrieve a Record from the database.

Source code in lexos/corpus/sqlite/database.py
def get_record(
    self,
    record_id: str,
    include_doc: bool = True,
    model_cache: Optional[LexosModelCache] = None,
) -> Optional[Record]:
    """Fetch a single Record by ID.

    Args:
        record_id: ID of the record to load.
        include_doc: Forwarded to ``_db_record_to_record``; controls whether
            a stored Doc is deserialized.
        model_cache: Optional cache forwarded to ``_db_record_to_record``.

    Returns:
        The matching Record, or None when no row has that ID.
    """
    with self.SessionLocal() as db:
        row = db.query(SQLiteRecord).filter(SQLiteRecord.id == record_id).first()
        if row:
            # Converted while the session is still open.
            return self._db_record_to_record(row, include_doc, model_cache)
        return None
rendering:
  show_root_heading: true
  heading_level: 3

search_records(query: str, limit: int = 100, include_inactive: bool = False, model_filter: Optional[str] = None) -> list[Record] ¤

Perform full-text search on records.

Source code in lexos/corpus/sqlite/database.py
def search_records(
    self,
    query: str,
    limit: int = 100,
    include_inactive: bool = False,
    model_filter: Optional[str] = None,
) -> list[Record]:
    """Perform full-text search on records.

    Args:
        query: Expression passed to the FTS5 ``MATCH`` operator on
            ``records_fts``.
        limit: Maximum number of rows to return.
        include_inactive: When False, only rows with ``is_active = 1`` are
            returned.
        model_filter: When given, only rows whose ``model`` equals this
            value are returned.

    Returns:
        Matching records ordered by ``created_at`` descending, built with
        ``include_doc=False`` so their content is the stored text.
    """
    with self.SessionLocal() as session:
        # Match IDs through a subquery on the FTS table instead of a join
        fts_query = text("""
            SELECT DISTINCT r.* FROM records r
            WHERE r.id IN (
                SELECT record_id FROM records_fts
                WHERE records_fts MATCH :query
            )
            AND (:include_inactive OR r.is_active = 1)
            AND (:model_filter IS NULL OR r.model = :model_filter)
            ORDER BY r.created_at DESC
            LIMIT :limit
        """)

        result = session.execute(
            fts_query,
            {
                "query": query,
                "include_inactive": include_inactive,
                "model_filter": model_filter,
                "limit": limit,
            },
        )

        column_names = [col.name for col in SQLiteRecord.__table__.columns]

        records = []
        for row in result:
            # Copy values by column NAME, not position: ``r.*`` follows the
            # physical column order of the table, which need not match the
            # declaration order of the model.
            values = row._mapping
            db_record = SQLiteRecord()
            for column_name in column_names:
                setattr(db_record, column_name, values[column_name])

            records.append(self._db_record_to_record(db_record, include_doc=False))

        return records
rendering:
  show_root_heading: true
  heading_level: 3

update_record(record: Record) -> None ¤

Update an existing record in the database.

Source code in lexos/corpus/sqlite/database.py
def update_record(self, record: Record) -> None:
    """Update an existing record in the database.

    Every column is refreshed from ``record`` except the primary key and
    ``created_at``, which keep their stored values.

    Args:
        record: The Record holding the new state. Its ID selects the row.

    Raises:
        LexosException: If no row with the record's ID exists.
    """
    # Attributes that must not be copied onto the stored row:
    # - _sa_instance_state: SQLAlchemy bookkeeping
    # - id: primary key
    # - created_at: _record_to_db_record stamps it with the current time,
    #   which would overwrite the original creation timestamp
    preserved = {"_sa_instance_state", "id", "created_at"}

    with self.SessionLocal() as session:
        existing = (
            session.query(SQLiteRecord)
            .filter(SQLiteRecord.id == str(record.id))
            .first()
        )
        if not existing:
            raise LexosException(
                f"Record with ID {record.id} not found in database"
            )

        # Build a fresh row from the Record and copy its values across
        updated_record = self._record_to_db_record(record)
        for key, value in updated_record.__dict__.items():
            if key not in preserved:
                setattr(existing, key, value)

        session.commit()
rendering:
  show_root_heading: true
  heading_level: 3