|
9 | 9 | import contextlib
|
10 | 10 | import os
|
11 | 11 | import gzip
|
| 12 | +import bz2 |
| 13 | +import zstd |
12 | 14 | import io
|
13 | 15 | import tempfile
|
14 | 16 | import subprocess
|
@@ -328,40 +330,50 @@ def touch_p(path, times=None):
|
328 | 330 | touch(path, times=times)
|
329 | 331 |
|
330 | 332 |
|
331 |
| -def open_or_gzopen(fname, *opts, **kwargs): |
332 |
| - mode = 'r' |
333 |
| - open_opts = list(opts) |
@contextlib.contextmanager
def zstd_open(fname, mode='r', **kwargs):
    '''Context manager yielding a zstd-compressed file as a text or byte stream.

    fname  -- path of the .zst file to read or create.
    mode   -- open()-style mode string. A mode containing 'r' reads, a mode
              containing 'w' writes (creating or truncating the file). The
              stream is binary when 'b' is present and text otherwise.
    kwargs -- optional keywords:
                level    -- compression level for writing (default 10)
                threads  -- compression threads for writing (default 1)
                encoding -- text encoding (default 'utf-8')
                errors   -- text error handling, passed to io.TextIOWrapper
                newline  -- newline handling, passed to io.TextIOWrapper

    Raises TypeError for unrecognized keywords and ValueError for a mode
    that is neither read nor write.
    '''
    level = kwargs.pop('level', 10)
    threads = kwargs.pop('threads', 1)
    text_opts = {
        'encoding': kwargs.pop('encoding', None) or 'utf-8',
        'errors': kwargs.pop('errors', None),
        'newline': kwargs.pop('newline', None),
    }
    if kwargs:
        raise TypeError(
            "zstd_open() got unexpected keyword argument(s): %s"
            % ", ".join(sorted(kwargs)))

    want_bytes = 'b' in mode

    if 'r' in mode:
        with open(fname, 'rb') as fh:
            dctx = zstd.ZstdDecompressor()
            stream_reader = dctx.stream_reader(fh)
            if want_bytes:
                yield stream_reader
            else:
                yield io.TextIOWrapper(stream_reader, **text_opts)
    elif 'w' in mode:
        with open(fname, 'wb') as fh:
            cctx = zstd.ZstdCompressor(level=level, threads=threads)
            # Leaving the stream_writer context finishes the zstd frame;
            # without it the output file is left incomplete.
            with cctx.stream_writer(fh) as stream_writer:
                if want_bytes:
                    yield stream_writer
                else:
                    text_stream = io.TextIOWrapper(stream_writer, **text_opts)
                    try:
                        yield text_stream
                    finally:
                        # Push buffered text into the compressor before the
                        # frame is finished, then detach so the wrapper does
                        # not touch the writer again after it is closed.
                        text_stream.flush()
                        text_stream.detach()
    else:
        # Only read and write are implemented; refuse other modes (such as
        # append) rather than silently truncating an existing file.
        raise ValueError("unsupported mode for zstd_open: %r" % (mode,))
| 355 | + |
def open_or_gzopen(fname, mode='r', **kwargs):
    '''Open fname, choosing the opener from the file extension.

    '.gz' files are opened with gzip.open, '.bz2' files with bz2.open,
    '.zst' files with zstd_open, and everything else with the builtin open.

    fname  -- path of the file to open.
    mode   -- open()-style mode string. The legacy 'U' (universal newlines)
              flag is accepted: it is stripped, newline=None is set unless
              the caller passed a newline, and text mode is requested
              unless 'b' is present.
    kwargs -- passed through to the selected opener. For '.gz' and '.bz2'
              files 'level' is accepted as an alias for 'compresslevel'.

    Note that without 'U', 't' or 'b' in the mode, gzip.open and bz2.open
    treat a bare 'r'/'w' as binary, unlike the builtin open.
    '''
    assert isinstance(mode, str), "open mode must be of type str"

    # 'U' mode is deprecated in py3 and may be unsupported in future versions,
    # so use newline=None when 'U' is specified
    if 'U' in mode:
        kwargs.setdefault('newline', None)
        mode = mode.replace("U", "")
        # 'U' historically implied reading
        if not any(flag in mode for flag in 'rwax'):
            mode = 'r' + mode
        # Universal newlines only apply to text. gzip.open and bz2.open
        # read a bare 'r' as binary, so ask for text mode explicitly.
        if 'b' not in mode and 't' not in mode:
            mode += 't'

    if fname.endswith(('.gz', '.bz2')):
        # Allow using 'level' kwarg as an alias; gzip.open and bz2.open
        # both name this option 'compresslevel'.
        if 'level' in kwargs:
            kwargs['compresslevel'] = kwargs.pop('level')
        opener = gzip.open if fname.endswith('.gz') else bz2.open
        return opener(fname, mode=mode, **kwargs)

    if fname.endswith('.zst'):
        return zstd_open(fname, mode=mode, **kwargs)

    return open(fname, mode=mode, **kwargs)
365 | 377 |
|
366 | 378 |
|
367 | 379 | def read_tabfile_dict(inFile, header_prefix="#", skip_prefix=None, rowcount_limit=None):
|
@@ -986,8 +998,8 @@ def choose_compressor(filepath, threads=8):
|
986 | 998 | return_obj["compress_cmd"] = compressor + ["-c"]
|
987 | 999 | elif re.search(r'\.?zst$', filepath):
|
988 | 1000 | compressor = ['zstd']
|
989 |
| - return_obj["decompress_cmd"] = compressor + ["-d"] |
990 |
| - return_obj["compress_cmd"] = compressor + ["-19"] |
| 1001 | + return_obj["decompress_cmd"] = compressor + ["-dc"] |
| 1002 | + return_obj["compress_cmd"] = compressor + ["-c19"] |
991 | 1003 | elif re.search(r'\.?tar$', filepath):
|
992 | 1004 | compressor = ['cat']
|
993 | 1005 | return_obj["decompress_cmd"] = compressor
|
@@ -1031,7 +1043,7 @@ def read(self, size):
|
1031 | 1043 | compressor = choose_compressor(pipe_hint_out)["compress_cmd"]
|
1032 | 1044 | outfile = None
|
1033 | 1045 | else:
|
1034 |
| - compressor =choose_compressor(out_compressed_tarball)["compress_cmd"] |
| 1046 | + compressor = choose_compressor(out_compressed_tarball)["compress_cmd"] |
1035 | 1047 | outfile = open(out_compressed_tarball, "w")
|
1036 | 1048 |
|
1037 | 1049 | out_compress_ps = subprocess.Popen(compressor, stdout=sys.stdout if out_compressed_tarball == "-" else outfile, stdin=subprocess.PIPE)
|
|
0 commit comments