Source code for Acquire.Client._fileops


__all__ = ["create_new_file", "compress", "uncompress"]


[docs]def compress(inputfile=None, outputfile=None, inputdata=None, compression_type="bz2"): """Compress either the passed filename or filedata using the specified compression type. This will compress either to the file called 'outputfile', or to a tmpfile. The name of the file will be returned. If this is compressing data, then it will return the compressed data """ import os as _os if compression_type == "bz2": import bz2 as _bz2 if inputfile is not None: IFILE = open(inputfile, "rb") # compress to a tmpfile and then move to outputfile later... import tempfile as _tempfile (fd, bz2file) = _tempfile.mkstemp(dir=".") _os.close(fd) try: OFILE = _bz2.BZ2File(bz2file, "wb", compresslevel=9) # compress data in MB blocks data = IFILE.read(1048576) while data: OFILE.write(data) data = IFILE.read(1048576) IFILE.close() OFILE.close() except Exception as e: print(e) # make sure we delete the temporary bz2file _os.unlink(bz2file) raise if outputfile is None: return bz2file try: # move the bz2file to the correct output name _os.rename(bz2file, outputfile) return outputfile except Exception as e: print(e) # we can't rename the file - just return the bz2 name return bz2file elif inputdata is not None: # compress the passed data and return return _bz2.compress(inputdata) else: raise ValueError("Unrecognised compression type '%s'" % compression_type)
[docs]def uncompress(inputfile=None, outputfile=None, inputdata=None, compression_type="bz2"): """Uncompress either the passed filename or filedata using the specified compression type. This will uncompress either to the file called 'outputfile', or to a tmpfile. The name of the file will be returned. If this is uncompressing data, then it will return the uncompressed data """ import os as _os if compression_type == "bz2": import bz2 as _bz2 if inputfile is not None: IFILE = _bz2.BZ2File(inputfile, "rb") # compress to a tmpfile and then move to outputfile later... import tempfile as _tempfile (fd, bz2file) = _tempfile.mkstemp(dir=".") _os.close(fd) try: OFILE = open(bz2file, "wb") # compress data in MB blocks data = IFILE.read(1048576) while data: OFILE.write(data) data = IFILE.read(1048576) IFILE.close() OFILE.close() except Exception as e: print(e) # make sure we delete the temporary bz2file _os.unlink(bz2file) raise if outputfile is None: return bz2file try: # move the bz2file to the correct output name _os.rename(bz2file, outputfile) return outputfile except Exception as e: print(e) # we can't rename the file - just return the bz2 name return bz2file elif inputdata is not None: # compress the passed data and return return _bz2.decompress(inputdata) else: raise ValueError("Unrecognised compression type '%s'" % compression_type)
[docs]def create_new_file(filename, dir=None): """Create a new file in directory 'dir' (default current directory) called 'filename'. If the file already exists, then create a new file with name derived from 'filename' """ import os as _os filename = _os.path.split(filename)[1] if filename.find(".") != -1: parts = filename.split(".") base = ".".join(parts[0:-1]) ext = parts[-1] if len(ext) == 0: ext = None else: base = filename ext = None if dir is not None: filebase = _os.path.join(dir, base) else: filebase = base if ext is None: filename = filebase else: filename = "%s.%s" % (filebase, ext) while True: idx = 1 while _os.path.exists(filename): filename = "%s_%s.%s" % (filebase, idx, ext) idx += 1 try: # open in "x" move should prevent race condition FILE = open(filename, "x") FILE.close() return _os.path.realpath(filename) except: pass