Skip to content

Utils¤

This module contains helper functions used by multiple modules.

lexos.utils._decode_bytes(raw_bytes) ¤

Decode raw bytes from a user's file into a string.

Args: raw_bytes (bytes, str): The bytes to be decoded to a Python string.

Returns:

Type Description
str

The decoded string.

Source code in lexos\utils.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def _decode_bytes(raw_bytes: Union[bytes, str]) -> str:
    """Decode raw bytes from a user's file into a string.

    ``bytes`` input is decoded with ``_try_decode_bytes_``; ``str`` input is
    used as-is. In both cases, if a carriage return occurs within the first
    ``constants.MIN_NEWLINE_DETECT`` characters, Windows (CR LF) and old Mac
    (CR) line breaks are converted to a single line feed throughout the text.

    Args:
        raw_bytes (bytes, str): The bytes to be decoded to a python string.

    Returns:
        The decoded string.

    Raises:
        LexosException: If decoding raises UnicodeDecodeError or TypeError.
    """
    if isinstance(raw_bytes, bytes):
        try:
            decoded_str = _try_decode_bytes_(raw_bytes)
        except (UnicodeDecodeError, TypeError) as err:
            # Chain the original error so the root cause stays in the traceback.
            raise LexosException(
                "Chardet failed to detect encoding of your "
                "file. Please make sure your file is in "
                "utf-8 encoding."
            ) from err
    else:
        decoded_str = raw_bytes

    # Normalize line breaks. Only the head of the text is inspected to decide
    # whether any work is needed.
    if "\r" in decoded_str[: constants.MIN_NEWLINE_DETECT]:
        # Replace the "\r\n" pairs first, then any remaining lone "\r".
        # Deleting every "\r" would merge lines separated by a lone "\r", and
        # replacing every "\r" with "\n" would double each "\r\n" line break.
        decoded_str = decoded_str.replace("\r\n", "\n").replace("\r", "\n")

    return decoded_str

lexos.utils._try_decode_bytes_(raw_bytes) ¤

Try to decode raw bytes (helper function for _decode_bytes()).

Parameters:

Name Type Description Default
raw_bytes bytes

The bytes you want to decode to string.

required

Returns:

Type Description
str

A decoded string.

Source code in lexos\utils.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def _try_decode_bytes_(raw_bytes: bytes) -> str:
    """Try to decode raw bytes (helper function for ``_decode_bytes()``).

    The encoding is guessed with chardet from the first
    ``constants.MIN_ENCODING_DETECT`` bytes, falling back to the whole input
    when that sample yields no guess. If decoding with the guessed encoding
    fails, UnicodeDammit is used instead.

    Args:
        raw_bytes (bytes): The bytes you want to decode to string.

    Returns:
        A decoded string.
    """
    # Detect the encoding with only the first couple of bytes
    encoding_detect = chardet.detect(raw_bytes[: constants.MIN_ENCODING_DETECT])
    # Get the encoding
    encoding_type = encoding_detect["encoding"]
    if encoding_type is None:
        # The sample was inconclusive, so let chardet inspect everything.
        encoding_detect = chardet.detect(raw_bytes)
        encoding_type = encoding_detect["encoding"]

    try:
        # Try to decode the string using the original encoding
        decoded_string = raw_bytes.decode(encoding_type)

    except (UnicodeDecodeError, LookupError, TypeError):
        # UnicodeDecodeError: the guess was wrong for part of the data.
        # LookupError: chardet named an encoding Python has no codec for.
        # TypeError: chardet still returned None.
        # Try UnicodeDammit if chardet didn't work
        if encoding_type == "ascii":
            dammit = UnicodeDammit(
                raw_bytes, ["iso-8859-1", "iso-8859-15", "windows-1252"]
            )
        else:
            dammit = UnicodeDammit(raw_bytes)
        # NOTE(review): unicode_markup may be None if UnicodeDammit cannot
        # decode the input either -- confirm whether callers need an error here.
        decoded_string = dammit.unicode_markup

    return decoded_string

lexos.utils.ensure_list(item) ¤

Ensure an item is wrapped in a list.

Parameters:

Name Type Description Default
item Any

Anything.

required

Returns:

Type Description
List

The item inside a list if it is not already a list.

Source code in lexos\utils.py
22
23
24
25
26
27
28
29
30
31
32
33
def ensure_list(item: Any) -> List:
    """Wrap an item in a list unless it already is a list.

    Args:
        item (Any): Anything.

    Returns:
        The same object if it is a list, otherwise a new one-element list
        containing it.
    """
    return item if isinstance(item, list) else [item]

lexos.utils.ensure_path(path) ¤

Ensure string is converted to a Path.

Parameters:

Name Type Description Default
path Any

Anything. If string, it's converted to Path.

required

Returns:

Type Description
Any

Path or original argument.

Source code in lexos\utils.py
36
37
38
39
40
41
42
43
44
45
46
47
48
def ensure_path(path: Any) -> Any:
    """Convert a string to a Path, leaving any other value untouched.

    Backslashes in the string are replaced with forward slashes before the
    Path is built.

    Args:
        path (Any): Anything. If string, it's converted to Path.

    Returns:
        Path or original argument.
    """
    if not isinstance(path, str):
        return path
    forward_slashed = path.replace("\\", "/")
    return Path(forward_slashed)

lexos.utils.get_encoding(input_string) ¤

Use chardet to return the encoding type of a string.

Parameters:

Name Type Description Default
input_string bytes

A bytestring.

required

Returns:

Type Description
str

The string's encoding type.

Source code in lexos\utils.py
217
218
219
220
221
222
223
224
225
226
227
228
def get_encoding(input_string: bytes) -> str:
    """Report the encoding chardet detects for a bytestring.

    Only the first ``constants.MIN_ENCODING_DETECT`` bytes are examined.

    Args:
        input_string (bytes): A bytestring.

    Returns:
        The value of the "encoding" entry in chardet's detection result.
    """
    sample = input_string[: constants.MIN_ENCODING_DETECT]
    return chardet.detect(sample)["encoding"]

lexos.utils.get_github_raw_paths(path, user=None, repo=None, branch=None) ¤

Get raw paths to files in a GitHub directory.

Parameters:

Name Type Description Default
path Union[Path, str]

The path to the directory.

required
user Optional[str]

The user name of the GitHub repository.

None
repo Optional[str]

The repository name of the GitHub repository.

None
branch Optional[str]

The branch of the GitHub repository.

None

Returns:

Name Type Description
list list

A list of raw download paths.

Source code in lexos\utils.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def get_github_raw_paths(
    path: Union[Path, str],
    user: Optional[str] = None,
    repo: Optional[str] = None,
    branch: Optional[str] = None,
) -> list:
    """Get raw paths to files in a GitHub directory.

    If any of ``user``, ``repo`` or ``branch`` is missing, all three are parsed
    from ``path``, which must then look like
    ``https://github.com/{user}/{repository}/tree/{branch}/{path_from_root}``.
    The directory is listed through the GitHub contents API for that branch.

    Args:
        path (Union[Path, str]): The path to the directory.
        user (Optional[str]): The user name of the GitHub repository.
        repo (Optional[str]): The repository name of the GitHub repository.
        branch (Optional[str]): The branch of the GitHub repository.

    Returns:
        list: A list of raw download paths.

    Raises:
        ValueError: If ``path`` does not follow the expected format.
        requests.HTTPError: If the GitHub API responds with an error status.
    """
    sample = "https://github.com/{user}/{repository}/tree/{branch}/{path_from_root}"
    invalid_path = f"Invalid GitHub path. Use the format {sample}."
    path = str(path)
    if not user or not repo or not branch:
        try:
            # Split on the "/tree/" segment only, and only once, so a "tree"
            # elsewhere in the URL (e.g. in a repository name) is harmless.
            prefix, suffix = path.split("/tree/", 1)
            user, repo = [part for part in prefix.split("/") if part][-2:]
            branch = [part for part in suffix.split("/") if part][0]
        except (ValueError, IndexError):
            raise ValueError(invalid_path) from None

    marker = f"tree/{branch}"
    _, found, relpath = path.partition(f"{marker}/")
    if not found:
        # A URL that ends at the branch itself addresses the repository root.
        if not path.rstrip("/").endswith(marker):
            raise ValueError(invalid_path)
        relpath = ""

    api_path = f"https://api.github.com/repos/{user}/{repo}/contents/{relpath}"
    # Without "ref" the API lists the default branch, whatever the URL says.
    r = requests.get(api_path, params={"ref": branch})
    r.raise_for_status()
    entries = r.json()
    if isinstance(entries, dict):
        # A single object rather than a list is returned when the path
        # addresses one file.
        entries = [entries]
    return [entry["download_url"] for entry in entries]

lexos.utils.get_paths(path) ¤

Get a list of paths in a directory.

Parameters:

Name Type Description Default
path Union[Path, str]

The path to the directory.

required

Returns:

Name Type Description
list list

A list of file paths.

Source code in lexos\utils.py
51
52
53
54
55
56
57
58
59
60
def get_paths(path: Union[Path, str]) -> list:
    """Collect every path found beneath a directory, recursively.

    Args:
        path (Union[Path, str]): The path to the directory.

    Returns:
        list: The paths matched by the ``**/*`` glob pattern.
    """
    root = Path(path)
    matches = root.glob("**/*")
    return [*matches]

lexos.utils.is_dir(filepath) ¤

Check if a path corresponds to a directory.

Source code in lexos\utils.py
102
103
104
def is_dir(filepath: Union[Path, str]) -> bool:
    """Tell whether a path points at an existing directory.

    The argument goes through ``ensure_path`` before the check.
    """
    candidate = ensure_path(filepath)
    return candidate.is_dir()

lexos.utils.is_github_dir(filepath) ¤

Check if a path corresponds to a directory on GitHub.

Source code in lexos\utils.py
107
108
109
110
111
112
def is_github_dir(filepath: Union[Path, str]) -> bool:
    """Tell whether a path looks like a directory on GitHub.

    True when the path text contains "github.com" and the path has no file
    suffix.
    """
    return "github.com" in str(filepath) and ensure_path(filepath).suffix == ""

lexos.utils.is_docx(filepath) ¤

Check if a file is a docx.

Source code in lexos\utils.py
115
116
117
def is_docx(filepath: Union[Path, str]) -> bool:
    """Tell whether a path's text ends with ".docx" (case-sensitive)."""
    as_text = str(filepath)
    return as_text.endswith(".docx")

lexos.utils.is_file(filepath) ¤

Check if a path corresponds to a file.

Source code in lexos\utils.py
120
121
122
def is_file(filepath: Union[Path, str]) -> bool:
    """Tell whether a path points at an existing file.

    The argument goes through ``ensure_path`` before the check.
    """
    candidate = ensure_path(filepath)
    return candidate.is_file()

lexos.utils.is_pdf(filepath) ¤

Check if a file is a pdf.

Source code in lexos\utils.py
125
126
127
def is_pdf(filepath: Union[Path, str]) -> bool:
    """Tell whether a path's text ends with ".pdf" (case-sensitive)."""
    as_text = str(filepath)
    return as_text.endswith(".pdf")

lexos.utils.is_url(s) ¤

Check if string is a URL.

Source code in lexos\utils.py
130
131
132
133
134
135
136
137
138
139
140
141
142
def is_url(s: Union[Path, str]) -> bool:
    """Check if string is a URL.

    The value is converted with ``str`` and matched from its start against an
    http(s)/ftp URL pattern. The match is not anchored at the end, so trailing
    text after a valid URL prefix does not cause a rejection.

    Args:
        s (Union[Path, str]): The value to test.

    Returns:
        True if the text starts with something that looks like a URL.
    """
    s = str(s)
    return bool(
        re.match(
            r"(https?|ftp)://"  # protocol
            r"(\w+(\-\w+)*\.)?"  # host (optional)
            r"((\w+(\-\w+)*)\.(\w+))"  # domain
            r"(\.\w+)*"  # top-level domain (optional, can have > 1)
            # The path is a single character-class repeat. Wrapping it in a
            # second "*" accepts the same text but backtracks exponentially
            # when the path ends in a run of dots.
            r"[\w\-\._\~/]*(?<!\.)",  # path, params, anchors, etc. (optional)
            s,
        )
    )

lexos.utils.normalize(raw_bytes) ¤

Normalise a string to LexosFile format.

Parameters:

Name Type Description Default
raw_bytes bytes

The input bytestring.

required

Returns:

Type Description
str

Normalised version of the input string.

Source code in lexos\utils.py
231
232
233
234
235
236
237
238
239
240
241
def normalize(raw_bytes: Union[bytes, str]) -> str:
    """Normalise bytes or a string to LexosFile format.

    All of the work is delegated to ``_decode_bytes``.

    Args:
        raw_bytes (bytes, str): The input bytestring or string.

    Returns:
        Normalised version of the input string.
    """
    return _decode_bytes(raw_bytes)

lexos.utils.normalize_file(filepath, destination_dir='.') ¤

Normalise a file to LexosFile format and save the file.

Parameters:

Name Type Description Default
filepath Union[Path, str]

The path to the input file.

required
destination_dir Union[Path, str]

The path to the directory where the file will be saved.

'.'
Source code in lexos\utils.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def normalize_file(
    filepath: Union[Path, str], destination_dir: Union[Path, str] = "."
) -> None:
    """Normalise a file to LexosFile format and save the file.

    The file is read as bytes, passed through ``normalize`` and written under
    the same file name inside ``destination_dir``.

    Args:
        filepath (Union[Path, str]): The path to the input file.
        destination_dir (Union[Path, str]): The path to the directory where the
            file will be saved.
    """
    filepath = Path(filepath)
    destination_dir = ensure_path(destination_dir)
    with open(filepath, "rb") as f:
        doc = f.read()
    # Write UTF-8 with untranslated "\n". The platform defaults could fail on
    # characters outside the locale encoding and, on Windows, turn the
    # normalised line breaks back into "\r\n".
    with open(
        destination_dir / filepath.name, "w", encoding="utf-8", newline="\n"
    ) as f:
        f.write(normalize(doc))

lexos.utils.normalize_files(filepaths, destination_dir='.') ¤

Normalise a list of files to LexosFile format and save the files.

Parameters:

Name Type Description Default
filepaths List[Union[Path, str]]

The list of paths to input files.

required
destination_dir Union[Path, str]

The path to the directory where the files will be saved.

'.'
Source code in lexos\utils.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def normalize_files(
    filepaths: List[Union[Path, str]], destination_dir: Union[Path, str] = "."
) -> None:
    """Normalise a list of files to LexosFile format and save the files.

    Each file is read as bytes, passed through ``normalize`` and written under
    the same file name inside ``destination_dir``.

    Args:
        filepaths (List[Union[Path, str]]): The list of paths to input files.
        destination_dir (Union[Path, str]): The path to the directory where the
            files will be saved.
    """
    # Convert once up front: joining a plain string (including the default
    # ".") to a file name with "/" raises TypeError.
    destination_dir = ensure_path(destination_dir)
    for filepath in filepaths:
        filepath = ensure_path(filepath)
        with open(filepath, "rb") as f:
            doc = f.read()
        # Write UTF-8 with untranslated "\n" so the result does not depend on
        # the platform's default encoding or newline translation.
        with open(
            destination_dir / filepath.name, "w", encoding="utf-8", newline="\n"
        ) as f:
            f.write(normalize(doc))

lexos.utils.normalize_strings(strings) ¤

Normalise a list of strings to LexosFile format.

Parameters:

Name Type Description Default
strings List[Union[bytes, str]]

The list of input strings.

required

Returns:

Type Description
List[str]

A list of normalised versions of the input strings.

Source code in lexos\utils.py
244
245
246
247
248
249
250
251
252
253
254
255
256
def normalize_strings(strings: List[str]) -> List[str]:
    """Normalise each string in a list to LexosFile format.

    Args:
        strings (List[Union[bytes, str]]): The list of input strings.

    Returns:
        A new list with the result of ``normalize`` for each input, in order.
    """
    return [normalize(text) for text in strings]

lexos.utils.to_collection(val, val_type, col_type) ¤

Validate and cast a value or values to a collection.

Parameters:

Name Type Description Default
val object

Value or values to validate and cast.

required
val_type type

Type of each value in collection, e.g. int or (str, bytes).

required
col_type type

Type of collection to return, e.g. tuple or set.

required

Returns:

Type Description
Collection[AnyVal]

Collection of type col_type with values all of type val_type.

Raises:

Type Description
TypeError

An invalid value was passed.

Source code in lexos\utils.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def to_collection(
    val: Union[AnyVal, Collection[AnyVal]],
    val_type: Union[Type[Any], Tuple[Type[Any], ...]],
    col_type: Type[Any],
) -> Collection[AnyVal]:
    """Validate a value or values and cast them to a collection.

    A single value of type ``val_type`` is wrapped as a one-element
    ``col_type``. A tuple, list, set or frozenset is converted to ``col_type``
    after every member has been checked. ``None`` yields an empty list,
    whatever ``col_type`` is.

    Args:
        val (object): Value or values to validate and cast.
        val_type (type): Type of each value in collection, e.g. ``int`` or ``(str, bytes)``.
        col_type (type): Type of collection to return, e.g. ``tuple`` or ``set``.

    Returns:
        Collection of type ``col_type`` with values all of type ``val_type``.

    Raises:
        TypeError: An invalid value was passed.
    """
    if val is None:
        return []
    if isinstance(val, val_type):
        return col_type([val])
    if not isinstance(val, (tuple, list, set, frozenset)):
        # TODO: use standard error message, maybe?
        raise TypeError(
            f"values must be {val_type} or a collection thereof, not {type(val)}"
        )
    for member in val:
        if not isinstance(member, val_type):
            raise TypeError(f"not all values are of type {val_type}")
    return col_type(val)

lexos.utils.unzip_archive(archive_path, extract_dir) ¤

Extract a zip archive.

For adding a progress indicator, see https://stackoverflow.com/questions/4006970/monitor-zip-file-extraction-python.

Parameters:

Name Type Description Default
archive_path str

The path to the archive file to be unzipped.

required
extract_dir str

The path to folder where the archive will be extracted.

required
Source code in lexos\utils.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def unzip_archive(archive_path: str, extract_dir: str):
    """Extract a zip archive.

    Every member is extracted into ``extract_dir`` while a progress bar is
    shown. For background on the progress indicator, see
    https://stackoverflow.com/questions/4006970/monitor-zip-file-extraction-python.

    Args:
        archive_path (str): The path to the archive file to be unzipped.
        extract_dir (str): The path to folder where the archive will be extracted.
    """
    # Open the archive in a "with" block so its file handle is closed even if
    # extraction fails part-way through.
    with zipfile.ZipFile(archive_path, "r") as zf, Progress() as progress:
        for member in progress.track(zf.infolist(), description="Processing..."):
            # No per-member sleep: it only slowed down large archives.
            zf.extract(member, path=extract_dir)

lexos.utils.zip_folder(source_dir, archive_file) ¤

Zip a folder recursively with no extra root folder in the archive.

Works with a progress indicator.

Parameters:

Name Type Description Default
source_dir Path

The path to the source directory.

required
archive_file Path

The path to the archive file to be created (including file extension).

required
Source code in lexos\utils.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def zip_folder(source_dir: Path, archive_file: Path):
    """Zip a folder recursively with no extra root folder in the archive.

    Entries are stored under their path relative to ``source_dir``. Works with
    a progress indicator.

    Args:
        source_dir (Path): The path to the source directory.
        archive_file (Path): The path to the archive file to be created (including file extension).
    """
    source_dir = Path(source_dir)  # also accept a plain string
    archive_target = Path(archive_file).resolve()
    with zipfile.ZipFile(
        archive_file, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=7
    ) as archive:
        # The archive already exists at this point. If it lives inside
        # source_dir it must be skipped, or it would be written into itself.
        files = [
            entry
            for entry in source_dir.rglob("*")
            if entry.resolve() != archive_target
        ]
        with Progress() as progress:
            for file in progress.track(files, description="Processing..."):
                # No per-entry sleep: it only slowed down large folders.
                archive.write(file, arcname=file.relative_to(source_dir))