import os as _os
import uuid as _uuid
from Acquire.Access import Request as _Request
__all__ = ["RunRequest"]
def _get_abspath_size_md5(basedir, key, filename, max_size=None):
    """Assert that the specified filename associated with key 'key' exists
       and is readable by this user. Assert also that the filesize is
       not above 'max_size' bytes, if 'max_size' has been specified.
       This returns the absolute filename path for the file, the size of
       the file in bytes and the md5 checksum of the file, as a tuple

       Args:
            basedir (str): directory in which to find file (only used
            when 'filename' is a relative path)
            key (str): key for file (used in error messages)
            filename (str): filename
            max_size (int, optional, default=None): maximum size
            of file to process. No size limit is applied if this is None
       Returns:
            tuple (str, int, str): filename, filesize in bytes, MD5
            checksum of file
       Raises:
            RunRequestError: if the file cannot be opened for reading,
            or if it is larger than 'max_size'
    """
    # Imported up-front (but still lazily, at call time) so that the name
    # is bound on every path that raises, not only in the 'except' branch
    from Acquire.Access import RunRequestError

    if _os.path.isabs(filename):
        filename = _os.path.realpath(filename)
    else:
        filename = _os.path.realpath(_os.path.join(basedir, filename))

    # Probe readability by opening the file; nothing is read from it
    try:
        with open(filename, "r"):
            pass
    except Exception as e:
        from Acquire.Service import exception_to_string
        raise RunRequestError(
            "Cannot complete the run request because the file '%s' is not "
            "readable: filename=%s.\n\nCAUSE: %s" %
            (key, filename, exception_to_string(e)))

    from Acquire.Access import get_filesize_and_checksum \
        as _get_filesize_and_checksum

    (filesize, md5) = _get_filesize_and_checksum(filename)

    # Only enforce the limit when one was supplied - comparing an int
    # against None is a TypeError
    if max_size is not None and filesize > max_size:
        raise RunRequestError(
            "Cannot complete the run request because the file '%s' is "
            "too large: filename=%s, filesize=%f MB, max_size=%f MB" %
            (key, filename, filesize/(1024.0*1024.0),
             max_size/(1024.0*1024.0)))

    return (filename, filesize, md5)
class RunRequest(_Request):
    """This class holds a request to run a particular calculation
       on a RunService. The result of this request will be a
       PAR to which the input should be loaded, and a Bucket
       from which the output can be read. The calculation will
       start once the input has been signalled as loaded.
    """

    # Largest individual input file accepted by _validate_input (100 MB)
    _MAX_INPUT_FILESIZE = 100 * 1024 * 1024

    def __init__(self, runfile=None):
        """Construct the request

           Args:
                runfile (str, optional, default=None): YAML or JSON
                file describing the calculation. If supplied, the
                input files it names are validated and packaged
                immediately. If None, a null request is created
        """
        super().__init__()

        self._uid = None
        self._runinfo = None
        self._tarfile = None
        self._tarfilename = None
        self._tarsize = None
        self._tarmd5 = None

        if runfile is not None:
            # Package up the simulation described in runfile
            self._set_runfile(runfile)

    def is_null(self):
        """Return whether or not this is a null request

           Returns:
                bool: True if no UID has been set (the request is
                null), else False
        """
        return self._uid is None

    def __str__(self):
        if self.is_null():
            return "RunRequest::null"
        else:
            return "RunRequest(uid=%s)" % self._uid

    def __eq__(self, other):
        # Two requests are equal if they are the same type and share a UID
        if isinstance(other, self.__class__):
            return self._uid == other._uid
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def uid(self):
        """Return the UID of this request"""
        return self._uid

    def fingerprint(self):
        """Return a unique fingerprint for this request that can be
           used for signing and verifying authorisations

           Returns:
                None or str: If null returns None, else returns a string
                combining the UID, size of tarfile and tarfile MD5 checksum
                as a fingerprint
        """
        if self.is_null():
            return None

        return "%s%s%s" % (self.uid(), self.tarfile_size(),
                           self.tarfile_md5sum())

    def tarfile(self):
        """Return the name of the tarfile containing all of the
           input files

           Returns:
                str: Name of tarfile (None if no tarfile was created)
        """
        return self._tarfilename

    def tarfile_size(self):
        """Return the size of the tarfile in bytes

           Returns:
                int: Size of tarfile in bytes (None if no tarfile
                was created)
        """
        return self._tarsize

    def tarfile_md5sum(self):
        """Return the MD5 checksum of the tarfile containing
           the input files

           Returns:
                str: MD5 checksum of tarfile (None if no tarfile
                was created)
        """
        return self._tarmd5

    def runinfo(self):
        """Return the processed run information used to describe the
           calculation to be run. This includes information about all
           of the input files, such as their names, filesizes and
           MD5 checksums

           Returns:
                dict: Dictionary containing information about
                input files, names, filesizes, MD5 checksums. This is
                a deep copy, so changing it does not change the request
        """
        import copy as _copy
        return _copy.deepcopy(self._runinfo)

    def _validate_input(self, basedir, runinfo):
        """Validate that the passed input 'runinfo' is correct, given
           it was loaded from the directory 'basedir'. This
           makes sure that all of the input files exist and are readable
           relative to 'basedir'. These MUST be declared in the 'input'
           section of the dictionary. This returns an updated 'runinfo'
           which has all relative paths converted to absolute file paths

           Args:
                basedir (str): directory from which to load data
                runinfo (dict): information regarding files to be
                used
           Returns:
                dict: Dictionary of validated file information
                including their sizes and MD5 checksums. This is the
                passed 'runinfo', updated in place
           Raises:
                RunRequestError: if the 'input' section has the wrong
                format, or if any input file is unreadable or too large
        """
        if "input" not in runinfo:
            return runinfo

        # The 'input' section is accepted either as a list whose first
        # element is the dictionary of files, or as that dictionary itself
        try:
            items = runinfo["input"][0].items()
        except Exception:
            try:
                items = runinfo["input"].items()
            except Exception:
                from Acquire.Access import RunRequestError
                raise RunRequestError(
                    "Cannot execute the request because the input files "
                    "are specified with the wrong format. They should be "
                    "a single dictionary of key-value pairs. "
                    "Instead it is '%s'" % str(runinfo["input"]))

        validated = {}

        for (key, filename) in items:
            # check the file exists and is not more than 100 MB in size
            validated[key] = _get_abspath_size_md5(
                                    basedir, key, filename,
                                    self._MAX_INPUT_FILESIZE)

        runinfo["input"] = validated

        return runinfo

    def _create_tarfile(self):
        """This function creates the new tarfile, records its
           size and MD5 checksum and updates the runinfo with
           the paths for the input files in the tarfile

           Returns:
                None
           Raises:
                RunRequestError: if the tarfile has already been created
        """
        if self._tarfile is not None:
            from Acquire.Access import RunRequestError
            raise RunRequestError("You cannot create the tarfile twice...")

        if "input" not in self._runinfo:
            return

        inputs = self._runinfo["input"]

        import tarfile as _tarfile
        import tempfile as _tempfile

        # Loop through each file - add it to the tarfile. The files are
        # added flat into the tarfile, i.e. with no subdirectory. This is to
        # prevent strange complications or clashes with other files that
        # the user may create during output (on the server the files will
        # be unpacked into a uniquely-named directory)
        used_names = set()
        packed = {}

        # NOTE(review): despite the 'tar.bz2' suffix the archive is written
        # uncompressed (mode="w"). This is left unchanged because the code
        # that unpacks the archive is not visible here - confirm before
        # switching to real bz2 compression
        tmp = _tempfile.NamedTemporaryFile(suffix="tar.bz2")

        # the 'with' guarantees the TarFile is closed even if add() fails
        with _tarfile.TarFile(fileobj=tmp, mode="w") as archive:
            for (key, fileinfo) in inputs.items():
                (filename, filesize, md5) = fileinfo

                basename = _os.path.basename(filename)
                name = basename

                # make sure that there isn't a matching file in the
                # tarfile - prefix clashing names with a counter
                i = 0
                while name in used_names:
                    i += 1
                    name = "%d_%s" % (i, basename)

                used_names.add(name)

                archive.add(name=filename, arcname=name, recursive=False)
                packed[key] = (name, filesize, md5)

        # close the file so that it is written to the disk - if we close
        # the tempfile then the file is deleted... (which shouldn't happen
        # until the object is deleted)
        tmp.file.close()

        from Acquire.Access import get_filesize_and_checksum \
            as _get_filesize_and_checksum

        (tarsize, tarmd5) = _get_filesize_and_checksum(tmp.name)

        # only update the request once everything has succeeded
        self._runinfo["input"] = packed
        self._tarfile = tmp
        self._tarfilename = tmp.name
        self._tarsize = tarsize
        self._tarmd5 = tarmd5

    def _set_runfile(self, runfile):
        """Run the simulation described in the passed runfile (should
           be in yaml or json format). This gives the type of simulation, the
           location of the input files and how the output should be
           named

           Args:
                runfile (str): YAML or JSON format file to be used
                to run simulation
           Returns:
                None
           Raises:
                RunRequestError: if a runfile has already been set, if
                'runfile' cannot be read, or if it cannot be parsed
                into a dictionary
        """
        from Acquire.Access import RunRequestError

        if self._runinfo:
            raise RunRequestError(
                "You cannot change runfile of this RunRequest")

        if runfile is None:
            return

        runlines = None

        try:
            with open(runfile, "r") as FILE:
                runlines = FILE.read()
        except Exception as e:
            from Acquire.Service import exception_to_string
            raise RunRequestError(
                "Cannot read '%s'. You must supply a readable input file "
                "that describes the calculation to be performed and supplies "
                "the names of all of the input files.\n\nCAUSE: %s" %
                (runfile, exception_to_string(e)))

        # get the directory that contains this file
        basedir = _os.path.dirname(_os.path.abspath(runfile))

        # try to parse this input as yaml. This is best-effort: a missing
        # yaml module or a parse error just means we fall back to json
        runinfo = None

        try:
            import yaml as _yaml
            runinfo = _yaml.safe_load(runlines)
        except Exception:
            pass

        if runinfo is None:
            try:
                import json as _json
                runinfo = _json.loads(runlines)
            except Exception:
                pass

        # A parse that does not give a dictionary (e.g. yaml reading the
        # text as one plain string) is as unusable as no parse at all
        if not isinstance(runinfo, dict):
            raise RunRequestError(
                "Cannot interpret valid input read from the file '%s'. "
                "This should be in json or yaml format, and this parser "
                "be built with that support." % runfile)

        runinfo = self._validate_input(basedir, runinfo)
        self._runinfo = runinfo

        self._create_tarfile()

        # everything is ok - set the UID of this request
        self._uid = str(_uuid.uuid4())

    def to_data(self):
        """Return this request as a json-serialisable dictionary

           Returns:
                dict: JSON serialisable dictionary created from object
                (an empty dictionary if this request is null)
        """
        if self.is_null():
            return {}

        data = super().to_data()
        data["uid"] = self._uid
        data["runinfo"] = self._runinfo
        data["tarsize"] = self._tarsize
        data["tarmd5"] = self._tarmd5

        return data

    @staticmethod
    def from_data(data):
        """Creates a RunRequest object from the JSON data in data

           Args:
                data (dict): JSON-deserialised dictionary, as produced
                by to_data, used to create object
           Returns:
                RunRequest or None: If data contains JSON data create
                RunRequest object, else return None
        """
        if not data:
            return None

        r = RunRequest()

        r._uid = data["uid"]
        r._runinfo = data["runinfo"]

        # tarsize is None for a request that had no input files to package
        tarsize = data["tarsize"]
        r._tarsize = None if tarsize is None else int(tarsize)

        r._tarmd5 = data["tarmd5"]

        r._from_data(data)

        return r