# Source code for Acquire.Client._filehandle


__all__ = ["FileHandle"]


# Leading "magic" bytes found at the start of files that are already
# compressed, mapped to a short name for the matching format. Only the
# keys are consulted by _should_compress below.
_magic_dict = {
    b"\x1f\x8b\x08": "gz",
    b"\x42\x5a\x68": "bz2",
    b"\x50\x4b\x03\x04": "zip",
}


# Number of leading bytes that must be read from a file so that it can
# be compared against the longest signature in _magic_dict
_max_magic_len = max(map(len, _magic_dict))


def _should_compress(filename, filesize):
    """Decide whether it is worth compressing the file 'filename',
       whose size in bytes is 'filesize'.

       Returns False for very small files (<128 bytes) and for files
       whose first bytes match one of the known compressed-file
       signatures in '_magic_dict'. Returns True otherwise.
    """
    if filesize < 128:
        return False

    # only the first few bytes are needed to recognise a signature
    with open(filename, "rb") as handle:
        header = handle.read(_max_magic_len)

    # bytes.startswith accepts a tuple of candidate prefixes
    return not header.startswith(tuple(_magic_dict))


def _bz2compress(inputfile, outputfile=None):
    """Compress the file 'inputfile' using bz2, writing the result
       to 'outputfile'. If 'outputfile' is None, then a new filename
       will be created in the current directory for the compressed
       file.

       This returns the filename of the compressed file
    """
    from Acquire.Client import compress as _compress

    compressed_filename = _compress(inputfile=inputfile,
                                    outputfile=outputfile,
                                    compression_type="bz2")

    return compressed_filename


class FileHandle:
    """This class holds all of the information about a file that is
       held in a Drive, including its size and checksum, and information
       about previous versions. It provides a handle that you can use
       to download or delete the file, to upload new versions, or
       to move the data between hot and cold storage or pay for
       extended storage
    """
    def __init__(self, filename=None, remote_filename=None,
                 aclrules=None, drive_uid=None, compress=True,
                 local_cutoff=None):
        """Construct a handle for the local file 'filename'. This will
           create the initial version of the file that can be uploaded
           to the storage service. If the file is less than
           'local_cutoff' bytes then the file will be held directly
           in this handle. By default local_cutoff is 1 MB

           Args:
               filename: path to the local file. If None then a null
                   handle is constructed
               remote_filename: name to use for the file on the object
                   store. Only the final path component is kept. If
                   None then the final component of 'filename' is used
               aclrules: passed to ACLRules.create(rule=...). If None
                   then no rules are stored in this handle
               drive_uid: UID of the drive that will hold the file
               compress: whether to bz2 compress the file for transport
                   (files for which _should_compress returns False are
                   never compressed)
               local_cutoff: files smaller than this number of bytes
                   are held in memory in this handle
        """
        self._local_filename = None
        self._local_filedata = None
        self._compression = None
        self._compressed_filename = None
        self._drive_uid = drive_uid
        self._aclrules = None

        if filename is None:
            self._filename = None
            return

        if local_cutoff is None:
            local_cutoff = 1048576
        else:
            local_cutoff = int(local_cutoff)

        # if aclrules is None then self._aclrules stays as None, and
        # will be automatically set to 'inherit' on the service
        if aclrules is not None:
            from Acquire.Identity import ACLRules as _ACLRules
            self._aclrules = _ACLRules.create(rule=aclrules)

        from Acquire.Access import get_filesize_and_checksum \
            as _get_filesize_and_checksum
        import os as _os

        (filesize, cksum) = _get_filesize_and_checksum(filename=filename)

        if compress and _should_compress(filename=filename,
                                         filesize=filesize):
            import bz2 as _bz2
            if filesize < local_cutoff:
                # this is not big, so better to compress in memory
                from Acquire.Access import get_size_and_checksum \
                    as _get_size_and_checksum

                # use a context manager so the file is always closed
                with open(filename, "rb") as FILE:
                    data = FILE.read()

                data = _bz2.compress(data)

                # the size and checksum must describe what is sent,
                # i.e. the compressed data
                (filesize, cksum) = _get_size_and_checksum(data=data)
                self._local_filedata = data
                self._compression = "bz2"
            else:
                # this is a bigger file, so compress on disk
                try:
                    self._compressed_filename = _bz2compress(
                                                    inputfile=filename)
                except Exception:
                    # compression is best-effort - if it fails then
                    # fall back to using the uncompressed file.
                    # (Exception rather than a bare except, so that
                    #  KeyboardInterrupt / SystemExit still propagate)
                    self._compressed_filename = None

                if self._compressed_filename is not None:
                    self._compression = "bz2"
                    (filesize, cksum) = _get_filesize_and_checksum(
                                    filename=self._compressed_filename)
        elif filesize < local_cutoff:
            # this is small enough to hold in memory
            with open(filename, "rb") as FILE:
                self._local_filedata = FILE.read()

        if self._compressed_filename is None:
            self._local_filename = filename

        self._filesize = filesize
        self._checksum = cksum

        if remote_filename is None:
            self._filename = _os.path.split(filename)[1]
        else:
            self._filename = _os.path.split(remote_filename)[1]

    def __del__(self):
        """Ensure we delete the temporary file before being destroyed"""
        # getattr, as __del__ is also called on objects that were never
        # fully constructed (e.g. created via __new__)
        compressed_filename = getattr(self, "_compressed_filename", None)

        if compressed_filename is None:
            return

        self._compressed_filename = None

        try:
            import os as _os
            _os.unlink(compressed_filename)
        except (OSError, ImportError):
            # the file may already have been removed, or the interpreter
            # may be shutting down. There is nothing useful that can
            # be done about either from within a destructor
            pass

    def __str__(self):
        """Return a string representation of the file"""
        if self.is_null():
            return "FileHandle::null"

        return "FileHandle(filename='%s')" % self.filename()

    def is_null(self):
        """Return whether or not this is null"""
        return self._filename is None

    def is_compressed(self):
        """Return whether or not the file is compressed on transport"""
        return self._compression is not None

    def compression_type(self):
        """Return a string describing the compression scheme used
           by the filehandle when transporting the file, or None
           if the data is not compressed
        """
        return self._compression

    def is_localdata(self):
        """Return whether or not this file is so small that the data
           is held in memory
        """
        return self._local_filedata is not None

    def local_filedata(self, uncompress=False):
        """Return the filedata for this file, assuming it is
           sufficiently small to be read in this way. Returns
           'None' if not...

           If 'uncompress' is true, then uncompress the data
           (if it is compressed) before returning
        """
        if uncompress and self.is_compressed():
            if self._local_filedata is None:
                return None

            # bz2 is the only compression scheme this class applies
            import bz2 as _bz2
            return _bz2.decompress(self._local_filedata)

        return self._local_filedata

    def local_filename(self):
        """Return the local filename for this file. This is None if
           the data is held in memory, the name of the compressed
           temporary file if the file was compressed on disk, or
           else the name of the original local file
        """
        if self.is_localdata():
            return None
        elif self.is_compressed():
            return self._compressed_filename
        else:
            return self._local_filename

    def drive_uid(self):
        """Return the UID of the drive on which this file is located"""
        return self._drive_uid

    def aclrules(self):
        """Return the ACL rules for this file"""
        return self._aclrules

    def filename(self):
        """Return the remote (object store) filename for this file"""
        return self._filename

    def filesize(self):
        """Return the size (in bytes) of this file. This is 0 for a
           null handle
        """
        if self.is_null():
            return 0
        else:
            return self._filesize

    def checksum(self):
        """Return the checksum of the contents of this file. This is
           None for a null handle
        """
        if self.is_null():
            return None
        else:
            return self._checksum

    def fingerprint(self):
        """Return a fingerprint for this file, built from its
           filename, filesize and checksum
        """
        return "%s:%s:%s" % (self.filename(),
                             self.filesize(), self.checksum())

    def to_data(self):
        """Return a json-serialisable dictionary for this object. Note
           that this does not contain any information about the local
           file itself - just the name it should be called on the
           object store and the size, checksum and acl. If the file
           (or compressed file) is sufficiently small then this
           will also contain the packed version of that file data

           A null handle is serialised as an empty dictionary
        """
        data = {}

        if self.is_null():
            return data

        data["filename"] = self.filename()
        data["filesize"] = self.filesize()
        data["checksum"] = self.checksum()

        if self._aclrules is not None:
            data["aclrules"] = self._aclrules.to_data()

        data["drive_uid"] = self.drive_uid()

        if self._local_filedata is not None:
            from Acquire.ObjectStore import bytes_to_string \
                as _bytes_to_string
            data["filedata"] = _bytes_to_string(self._local_filedata)

        if self._compression is not None:
            data["compression"] = self._compression

        return data

    @staticmethod
    def from_data(data):
        """Return an object created from the passed json-deserialised
           dictionary. Note that this does not contain any information
           about the local file itself - just the name it should be
           called on the object store and the size, checksum and acl.
           If the file (or compressed file) is sufficiently small then
           this will also contain the packed version of that file data

           A null handle is returned if 'data' is None or empty
        """
        f = FileHandle()

        if not data:
            return f

        f._filename = data["filename"]
        f._filesize = int(data["filesize"])
        f._checksum = data["checksum"]
        f._drive_uid = data["drive_uid"]

        if "compression" in data:
            f._compression = data["compression"]

        if "aclrules" in data:
            # NOTE(review): __init__ imports ACLRules from
            # Acquire.Identity, but this imports it from
            # Acquire.Storage - confirm which module is correct
            from Acquire.Storage import ACLRules as _ACLRules
            f._aclrules = _ACLRules.from_data(data["aclrules"])

        if "filedata" in data:
            from Acquire.ObjectStore import string_to_bytes \
                as _string_to_bytes
            f._local_filedata = _string_to_bytes(data["filedata"])

        return f