Skip to content

Filesplit¤

The Filesplit class allows the user to cut binary files into numbered file segments with the format filename_1.txt, filename_2.txt, etc. It would typically be used as a first step in a workflow if a large file needs to be divided into many smaller files.

The class is a fork of Ram Prakash Jayapalan's filesplit module with a few minor tweaks. The most important is that the split function takes a sep argument to allow the user to specify the separator between the filename and number in each generated file.

lexos.cutter.filesplit.Filesplit ¤

Filesplit class.

Source code in lexos\cutter\filesplit.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
class Filesplit:
    """Split a file into numbered segments and merge them back together.

    ``split`` writes the segments plus a CSV manifest describing them, and
    ``merge`` reads that manifest to rebuild a single file from the segments.
    """

    def __init__(self) -> None:
        """Set up the logger, the manifest file name and the read buffer size."""
        self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
        self.man_filename = "fs_manifest.csv"
        self._buffer_size = 1000000  # 1 MB

    def __process_split(
        self,
        fi: IO,
        fo: IO,
        split_size: int,
        carry_over: Optional[str],
        newline: bool = False,
        output_encoding: str = None,
        include_header: bool = False,
        header: str = None,
    ) -> Tuple:
        """Fill one split file from the incoming stream.

        Args:
            fi (IO): Input file-like object; it is read with ``read()`` and
                iterated line by line.
            fo (IO): Output file-like object that implements ``write()``.
            split_size (int): File split size in bytes.
            carry_over (str): Data left over from the previous split. It is
                always written first, regardless of ``split_size``.
            newline (bool): When True, only whole lines are written to a split.
            output_encoding (str): Split file encoding. When given, sizes are
                measured on the data encoded with it.
            include_header (bool): When True, the first line read is kept as
                the header. Only consulted when ``newline`` is True.
            header (str): Header captured by an earlier call, if any. It is
                written ahead of ``carry_over`` in newline mode.

        Returns:
            tuple: ``(carry_over, output_size, header)``. ``carry_over`` is
            None once the input is exhausted; ``header`` is always None when
            ``newline`` is False.
        """

        def size_of(data) -> int:
            # Measure in the output encoding when one is given, else as-is.
            if output_encoding:
                return len(data.encode(output_encoding))
            return len(data)

        written = 0
        leftover = None

        if not newline:
            # Never read more than one split's worth in a single call.
            buffer_size = min(split_size, self._buffer_size)
            if carry_over:
                fo.write(carry_over)
                written += size_of(carry_over)
            while True:
                chunk = fi.read(buffer_size)
                if not chunk:
                    break
                chunk_size = size_of(chunk)
                if written + chunk_size > split_size:
                    # Does not fit any more: hand it over to the next split.
                    leftover = chunk
                    break
                fo.write(chunk)
                written += chunk_size
            return leftover, written, None

        if carry_over:
            if header:
                fo.write(header)
            fo.write(carry_over)
            written += size_of(carry_over)
            if header:
                written += size_of(header)
        for line in fi:
            if include_header and not header:
                header = line
            line_size = size_of(line)
            if written + line_size > split_size:
                leftover = line
                break
            fo.write(line)
            written += line_size
        return leftover, written, header

    def split(
        self,
        file: str,
        split_size: int,
        sep: str = "_",
        output_dir: str = ".",
        callback: Callable = None,
        **kwargs,
    ) -> None:
        """Split the file into numbered chunks and write a manifest for them.

        By default the file is read in binary mode and split by size only.

        Args:
            file (str): Path to the source file.
            split_size (int): File split size in bytes.
            sep (str): Separator placed between the file name and the split
                number.
            output_dir (str): Output dir to write the split files and the
                manifest.
            callback (Callable): Callback function [func (str, int)] that is
                called once per split with the full path to the split file
                and the size reported for it.
            **kwargs: Optional settings:
                newline (bool): When True, split only at line boundaries.
                include_header (bool): When True, the first line is treated
                    as a header and written at the top of every split;
                    this forces ``newline`` to True.
                encoding (str): Read the source in text mode with this
                    encoding; split files are written in text mode too.
                split_file_encoding (str): Encoding for the split files;
                    requires ``encoding``.

        Raises:
            ValueError: If ``split_file_encoding`` is given without
                ``encoding``.
        """
        start_time = time.time()
        self.log.info("Starting file split process...")

        newline = kwargs.get("newline", False)
        include_header = kwargs.get("include_header", False)
        # If include_header is provided, default newline flag to True
        # as this should apply only to structured file.
        if include_header:
            newline = True
        encoding = kwargs.get("encoding", None)
        split_file_encoding = kwargs.get("split_file_encoding", None)

        filename, ext = os.path.splitext(ntpath.split(file)[1])

        # Split file encoding cannot be specified without specifying
        # encoding which is required to read the file in text mode.
        if split_file_encoding and not encoding:
            raise ValueError(
                "`encoding` needs to be specified "
                "when providing `split_file_encoding`."
            )

        # Text mode when an encoding is given, binary mode otherwise.
        if encoding:
            read_kwargs = {"mode": "r", "encoding": encoding}
            write_kwargs = {
                "mode": "w+",
                "encoding": split_file_encoding or encoding,
            }
        else:
            read_kwargs = {"mode": "rb"}
            write_kwargs = {"mode": "wb+"}

        man_file = os.path.join(output_dir, self.man_filename)
        with open(file, **read_kwargs) as fi:
            # newline="" lets the csv module control line endings itself.
            with open(man_file, mode="w+", encoding="utf-8", newline="") as man:
                man_writer = csv.DictWriter(
                    f=man,
                    fieldnames=["filename", "filesize", "encoding", "header"],
                )
                man_writer.writeheader()

                split_counter, carry_over, header = 1, "", None
                # carry_over becomes None once the input is exhausted.
                while carry_over is not None:
                    split_file = os.path.join(
                        output_dir, f"{filename}{sep}{split_counter}{ext}"
                    )
                    with open(split_file, **write_kwargs) as fo:
                        carry_over, output_size, header = self.__process_split(
                            fi=fi,
                            fo=fo,
                            split_size=split_size,
                            newline=newline,
                            output_encoding=split_file_encoding,
                            carry_over=carry_over,
                            include_header=include_header,
                            header=header,
                        )
                    # The split file is closed (and flushed) at this point.
                    if callback:
                        callback(split_file, output_size)
                    man_writer.writerow(
                        {
                            "filename": ntpath.split(split_file)[1],
                            "filesize": output_size,
                            "encoding": encoding,
                            "header": True if header else None,
                        }
                    )
                    split_counter += 1

        run_time = round((time.time() - start_time) / 60)

        self.log.info("Process complete.")
        self.log.info(f"Run time(m): {run_time}")

    def merge(
        self,
        input_dir: str,
        sep: str = "_",
        output_file: str = None,
        manifest_file: str = None,
        callback: Callable = None,
        cleanup: bool = False,
    ) -> None:
        """Merge the split files based off manifest file.

        Args:
            input_dir (str): Directory containing the split files and
                manifest file
            sep (str): Separator used in the file names.
            output_file (str): Final merged output file path. If not
                provided, the final merged filename is derived from
                the first split filename in the manifest and placed in
                the input dir.
            manifest_file (str): Path to the manifest file. If not provided,
                the process will look for the file within the input_dir.
            callback (Callable): Callback function [func (str, int)]
                that accepts 2 arguments - path to destination,
                size of the file in bytes.
            cleanup (bool): If True, all the split files and the manifest file
                will be deleted after the merge, leaving behind the merged file.
        Raises:
            FileNotFoundError: If the manifest or a split file is missing.
            NotADirectoryError: If input path is not a directory.
        """
        start_time = time.time()
        self.log.info("Starting file merge process...")

        if not os.path.isdir(input_dir):
            raise NotADirectoryError("Input directory is not a valid directory.")

        manifest_file = manifest_file or os.path.join(input_dir, self.man_filename)
        if not os.path.exists(manifest_file):
            raise FileNotFoundError("Unable to locate manifest file.")

        fo = None
        header_set = False

        try:
            # Read from manifest every split and merge to single file
            with open(
                file=manifest_file, mode="r", encoding="utf-8", newline=""
            ) as man_fh:
                for row in csv.DictReader(f=man_fh):
                    encoding = row.get("encoding", None)
                    header_avail = row.get("header", None)
                    split_name = row.get("filename")
                    # Derive output filename from split file if output file
                    # not provided
                    if not output_file:
                        stem, ext = ntpath.splitext(split_name)
                        # Drop the trailing "<sep><counter>"; with an empty
                        # separator only the counter digits can be removed.
                        if sep:
                            stem = stem.rsplit(sep, 1)[0]
                        else:
                            stem = stem.rstrip("0123456789")
                        output_file = os.path.join(input_dir, stem + ext)
                    # First row only: clear any stale output file, then open
                    # the output in the mode implied by the manifest encoding.
                    if fo is None:
                        if os.path.exists(output_file):
                            os.remove(output_file)
                        if encoding:
                            fo = open(file=output_file, mode="a", encoding=encoding)
                        else:
                            fo = open(file=output_file, mode="ab")
                    # Open the split file in read mode and write its contents
                    # to the output file
                    input_file = os.path.join(input_dir, split_name)
                    if encoding:
                        read_kwargs = {"mode": "r", "encoding": encoding}
                    else:
                        read_kwargs = {"mode": "rb"}
                    with open(input_file, **read_kwargs) as fi:
                        # Once a header has been written, skip the copy that
                        # starts every later split file.
                        if header_set:
                            next(fi, None)
                        for chunk in fi:
                            if header_avail and not header_set:
                                header_set = True
                            fo.write(chunk)
        finally:
            if fo:
                fo.close()

        # Clean up files if required
        if cleanup:
            # Clean up split files
            with open(
                file=manifest_file, mode="r", encoding="utf-8", newline=""
            ) as man_fh:
                for row in csv.DictReader(f=man_fh):
                    split_path = os.path.join(input_dir, row.get("filename"))
                    if os.path.exists(split_path):
                        os.remove(split_path)
            # Clean up man file
            if os.path.exists(manifest_file):
                os.remove(manifest_file)

        # Call the callback function with path and file size
        if callback:
            callback(output_file, os.stat(output_file).st_size)

        run_time = round((time.time() - start_time) / 60)

        self.log.info("Process complete.")
        self.log.info(f"Run time(m): {run_time}")

__init__() ¤

Constructor.

Source code in lexos\cutter\filesplit.py
22
23
24
25
26
def __init__(self) -> None:
    """Initialise the manifest name, the read buffer size and the logger."""
    # File name used for the CSV manifest.
    self.man_filename = "fs_manifest.csv"
    # Largest single read, in bytes.
    self._buffer_size = 1_000_000  # 1 MB
    # Child logger of the module logger, named after the concrete class.
    module_logger = logging.getLogger(__name__)
    self.log = module_logger.getChild(self.__class__.__name__)

__process_split(fi, fo, split_size, carry_over, newline=False, output_encoding=None, include_header=False, header=None) ¤

Split the incoming stream.

Parameters:

Name Type Description Default
fi IO

Input-file like object that implements read() and readline()

required

method.

fo (IO): File-like object that implements write() method.
split_size (int): File split size in bytes.
newline (bool): When True, splits at newline on top of bytes.
output_encoding (str): Split file encoding.
include_header (bool): When True, first line is treated as header and each split receives the header. This flag is dependent on the newline flag being set to True as well.
carry_over (str): Any carry over bytes to the next file.
header (str): Header from the file, if any.

Returns:

Name Type Description
tuple Tuple

carry_over, output_size, header

Source code in lexos\cutter\filesplit.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def __process_split(
    self,
    fi: IO,
    fo: IO,
    split_size: int,
    carry_over: Optional[str],
    newline: bool = False,
    output_encoding: str = None,
    include_header: bool = False,
    header: str = None,
) -> Tuple:
    """Fill one split file from the incoming stream.

    Args:
        fi (IO): Input file-like object; it is read with ``read()`` and
            iterated line by line.
        fo (IO): Output file-like object that implements ``write()``.
        split_size (int): File split size in bytes.
        carry_over (str): Data left over from the previous split. It is
            always written first, regardless of ``split_size``.
        newline (bool): When True, only whole lines are written to a split.
        output_encoding (str): Split file encoding. When given, sizes are
            measured on the data encoded with it.
        include_header (bool): When True, the first line read is kept as
            the header. Only consulted when ``newline`` is True.
        header (str): Header captured by an earlier call, if any. It is
            written ahead of ``carry_over`` in newline mode.

    Returns:
        tuple: ``(carry_over, output_size, header)``. ``carry_over`` is None
        once the input is exhausted; ``header`` is always None when
        ``newline`` is False.
    """

    def size_of(data) -> int:
        # Measure in the output encoding when one is given, else as-is.
        if output_encoding:
            return len(data.encode(output_encoding))
        return len(data)

    # Never read more than one split's worth in a single call.
    if split_size < self._buffer_size:
        read_size = split_size
    else:
        read_size = self._buffer_size

    written = 0
    leftover = None

    if newline:
        if carry_over:
            if header:
                fo.write(header)
            fo.write(carry_over)
            written += size_of(carry_over)
            if header:
                written += size_of(header)
        for row in fi:
            if include_header and not header:
                header = row
            row_size = size_of(row)
            if written + row_size > split_size:
                # Does not fit any more: hand it over to the next split.
                leftover = row
                break
            fo.write(row)
            written += row_size
        return leftover, written, header

    # Size-only mode: flush the carried-over data once, then copy chunks.
    if carry_over:
        fo.write(carry_over)
        written += size_of(carry_over)
    while True:
        piece = fi.read(read_size)
        if not piece:
            break
        piece_size = size_of(piece)
        if written + piece_size > split_size:
            leftover = piece
            break
        fo.write(piece)
        written += piece_size
    return leftover, written, None

merge(input_dir, sep='_', output_file=None, manifest_file=None, callback=None, cleanup=False) ¤

Merge the split files based off manifest file.

Parameters:

Name Type Description Default
input_dir str

Directory containing the split files and manifest file

required
sep str

Separator used in the file names.

'_'
output_file str

Final merged output file path. If not provided, the final merged filename is derived from the split filename and placed in the same input dir.

None
manifest_file str

Path to the manifest file. If not provided, the process will look for the file within the input_dir.

None
callback Callable

Callback function [func (str, long)] that accepts 2 arguments - path to destination, size of the file in bytes.

None
cleanup bool

If True, all the split files and the manifest file will be deleted after the merge, leaving behind the merged file.

False

Raises:

Type Description
FileNotFoundError

If missing manifest and split files.

NotADirectoryError

If input path is not a directory.

Source code in lexos\cutter\filesplit.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
def merge(
    self,
    input_dir: str,
    sep: str = "_",
    output_file: str = None,
    manifest_file: str = None,
    callback: Callable = None,
    cleanup: bool = False,
) -> None:
    """Merge the split files based off manifest file.

    Args:
        input_dir (str): Directory containing the split files and
            manifest file
        sep (str): Separator used in the file names.
        output_file (str): Final merged output file path. If not
            provided, the final merged filename is derived from
            the first split filename in the manifest and placed in
            the input dir.
        manifest_file (str): Path to the manifest file. If not provided,
            the process will look for the file within the input_dir.
        callback (Callable): Callback function [func (str, int)]
            that accepts 2 arguments - path to destination,
            size of the file in bytes.
        cleanup (bool): If True, all the split files and the manifest file
            will be deleted after the merge, leaving behind the merged file.
    Raises:
        FileNotFoundError: If the manifest or a split file is missing.
        NotADirectoryError: If input path is not a directory.
    """
    start_time = time.time()
    self.log.info("Starting file merge process...")

    if not os.path.isdir(input_dir):
        raise NotADirectoryError("Input directory is not a valid directory.")

    manifest_file = manifest_file or os.path.join(input_dir, self.man_filename)
    if not os.path.exists(manifest_file):
        raise FileNotFoundError("Unable to locate manifest file.")

    fo = None
    header_set = False

    try:
        # Read from manifest every split and merge to single file
        with open(
            file=manifest_file, mode="r", encoding="utf-8", newline=""
        ) as man_fh:
            for row in csv.DictReader(f=man_fh):
                encoding = row.get("encoding", None)
                header_avail = row.get("header", None)
                split_name = row.get("filename")
                # Derive output filename from split file if output file
                # not provided
                if not output_file:
                    stem, ext = ntpath.splitext(split_name)
                    # Drop the trailing "<sep><counter>"; with an empty
                    # separator only the counter digits can be removed.
                    if sep:
                        stem = stem.rsplit(sep, 1)[0]
                    else:
                        stem = stem.rstrip("0123456789")
                    output_file = os.path.join(input_dir, stem + ext)
                # First row only: clear any stale output file, then open
                # the output in the mode implied by the manifest encoding.
                if fo is None:
                    if os.path.exists(output_file):
                        os.remove(output_file)
                    if encoding:
                        fo = open(file=output_file, mode="a", encoding=encoding)
                    else:
                        fo = open(file=output_file, mode="ab")
                # Open the split file in read mode and write its contents
                # to the output file
                input_file = os.path.join(input_dir, split_name)
                if encoding:
                    read_kwargs = {"mode": "r", "encoding": encoding}
                else:
                    read_kwargs = {"mode": "rb"}
                with open(input_file, **read_kwargs) as fi:
                    # Once a header has been written, skip the copy that
                    # starts every later split file.
                    if header_set:
                        next(fi, None)
                    for chunk in fi:
                        if header_avail and not header_set:
                            header_set = True
                        fo.write(chunk)
    finally:
        if fo:
            fo.close()

    # Clean up files if required
    if cleanup:
        # Clean up split files
        with open(
            file=manifest_file, mode="r", encoding="utf-8", newline=""
        ) as man_fh:
            for row in csv.DictReader(f=man_fh):
                split_path = os.path.join(input_dir, row.get("filename"))
                if os.path.exists(split_path):
                    os.remove(split_path)
        # Clean up man file
        if os.path.exists(manifest_file):
            os.remove(manifest_file)

    # Call the callback function with path and file size
    if callback:
        callback(output_file, os.stat(output_file).st_size)

    run_time = round((time.time() - start_time) / 60)

    self.log.info("Process complete.")
    self.log.info(f"Run time(m): {run_time}")

split(file, split_size, sep='_', output_dir='.', callback=None, **kwargs) ¤

Splits the file into size-based chunks, optionally breaking only at newline characters in the file.

By default uses binary mode.

Parameters:

Name Type Description Default
file str

Path to the source file.

required
split_size int

File split size in bytes.

required
sep str

Separator to be used in the file name.

'_'
output_dir str

Output dir to write the split files.

'.'
callback Callable

Callback function [func (str, long)] that accepts two arguments - full file path to the destination and size of the file in bytes.

None
Source code in lexos\cutter\filesplit.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def split(
    self,
    file: str,
    split_size: int,
    sep: str = "_",
    output_dir: str = ".",
    callback: Callable = None,
    **kwargs,
) -> None:
    """Split the file into numbered chunks and write a manifest for them.

    By default the file is read in binary mode and split by size only.

    Args:
        file (str): Path to the source file.
        split_size (int): File split size in bytes.
        sep (str): Separator placed between the file name and the split
            number.
        output_dir (str): Output dir to write the split files and the
            manifest.
        callback (Callable): Callback function [func (str, int)] that is
            called once per split with the full path to the split file
            and the size reported for it.
        **kwargs: Optional settings:
            newline (bool): When True, split only at line boundaries.
            include_header (bool): When True, the first line is treated
                as a header and written at the top of every split;
                this forces ``newline`` to True.
            encoding (str): Read the source in text mode with this
                encoding; split files are written in text mode too.
            split_file_encoding (str): Encoding for the split files;
                requires ``encoding``.

    Raises:
        ValueError: If ``split_file_encoding`` is given without ``encoding``.
    """
    start_time = time.time()
    self.log.info("Starting file split process...")

    newline = kwargs.get("newline", False)
    include_header = kwargs.get("include_header", False)
    # If include_header is provided, default newline flag to True
    # as this should apply only to structured file.
    if include_header:
        newline = True
    encoding = kwargs.get("encoding", None)
    split_file_encoding = kwargs.get("split_file_encoding", None)

    filename, ext = os.path.splitext(ntpath.split(file)[1])

    # Split file encoding cannot be specified without specifying
    # encoding which is required to read the file in text mode.
    if split_file_encoding and not encoding:
        raise ValueError(
            "`encoding` needs to be specified "
            "when providing `split_file_encoding`."
        )

    # Text mode when an encoding is given, binary mode otherwise.
    if encoding:
        read_kwargs = {"mode": "r", "encoding": encoding}
        write_kwargs = {
            "mode": "w+",
            "encoding": split_file_encoding or encoding,
        }
    else:
        read_kwargs = {"mode": "rb"}
        write_kwargs = {"mode": "wb+"}

    man_file = os.path.join(output_dir, self.man_filename)
    with open(file, **read_kwargs) as fi:
        # newline="" lets the csv module control line endings itself.
        with open(man_file, mode="w+", encoding="utf-8", newline="") as man:
            man_writer = csv.DictWriter(
                f=man, fieldnames=["filename", "filesize", "encoding", "header"]
            )
            man_writer.writeheader()

            split_counter, carry_over, header = 1, "", None
            # carry_over becomes None once the input is exhausted.
            while carry_over is not None:
                split_file = os.path.join(
                    output_dir, f"{filename}{sep}{split_counter}{ext}"
                )
                with open(split_file, **write_kwargs) as fo:
                    carry_over, output_size, header = self.__process_split(
                        fi=fi,
                        fo=fo,
                        split_size=split_size,
                        newline=newline,
                        output_encoding=split_file_encoding,
                        carry_over=carry_over,
                        include_header=include_header,
                        header=header,
                    )
                # The split file is closed (and flushed) at this point.
                if callback:
                    callback(split_file, output_size)
                man_writer.writerow(
                    {
                        "filename": ntpath.split(split_file)[1],
                        "filesize": output_size,
                        "encoding": encoding,
                        "header": True if header else None,
                    }
                )
                split_counter += 1

    run_time = round((time.time() - start_time) / 60)

    self.log.info("Process complete.")
    self.log.info(f"Run time(m): {run_time}")