"""
Module with the main trajectories class (Tra) and the LagrantoRun helper.
"""
# -*- coding:utf-8 -*-
from datetime import datetime, timedelta
from functools import partial, wraps
from multiprocessing.pool import Pool
from tempfile import mkdtemp
from warnings import warn
import numpy as np
from path import Path
try:
from shutil import SameFileError
except ImportError:
from shutil import Error as SameFileError
from .formats import from_ascii, from_netcdf, hhmm_to_frac_hour, to_ascii, to_netcdf
from .tools import LagrantoException, run_cmd
__all__ = ["Tra", "LagrantoRun"]
class Tra:
    """Class to work with LAGRANTO output.

    Read trajectories from a LAGRANTO file and return a structured numpy array

    Parameters
    ----------
    filename : string
        File containing lagranto trajectories
    usedatetime: bool
        Read times as datetime objects, default True
    array: structured array
        If defined creates a new Tra object filled with the array

    Returns
    -------
    structured array (Tra): trajs(ntra,ntime) with variables as tuple.

    Examples
    --------
    >>> filename = 'mylslfile.nc'
    >>> trajs = Tra()
    >>> trajs.load_netcdf(filename)
    >>> trajs['lon'][0,:]  # return the longitudes for the first trajectory.
    >>> trajs = Tra(filename)
    >>> selected_trajs = Tra(array=trajs[[10, 20, 30], :])

    Author : Nicolas Piaget, ETH Zurich , 2014
             Sebastiaan Crezee, ETH Zurich , 2014
    """

    # Cached starting date of the trajectories; set by the load_* methods.
    _startdate = None

    def __init__(self, filename="", usedatetime=True, array=None, **kwargs):
        """Initialize a Tra object.

        If filename is given, try to load it directly;
        Arguments to the load function can be passed as key=value argument.
        """
        typefile = kwargs.pop("typefile", None)
        if typefile is not None:
            # the file format is now auto-detected; typefile is obsolete
            msg = "typefile is not used anymore; it will be removed in future version"
            raise DeprecationWarning(msg)
        if not filename:
            # empty container, possibly pre-filled with a structured array
            self._array = array
            return
        # try netcdf first and fall back to ascii
        try:
            self.load_netcdf(filename, usedatetime=usedatetime, **kwargs)
        except (OSError, RuntimeError):
            try:
                self.load_ascii(filename, usedatetime=usedatetime, **kwargs)
            except Exception:
                raise OSError("Unknown fileformat. Known formats are ascii or netcdf")

    def __len__(self):
        """Return the length of the first axis (number of trajectories)."""
        return len(self._array)

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails: delegate to the
        # underlying numpy array (shape, ndim, dtype, T, ...).
        if attr in self.__dict__:
            return getattr(self, attr)
        return getattr(self._array, attr)

    def __getitem__(self, key):
        """Index the underlying structured array."""
        return self._array[key]

    def __setitem__(self, key, item):
        """Set a variable, creating it if ``key`` is not yet a field.

        Adding a new variable rebuilds the structured array with an
        extended dtype and copies the existing data over.
        """
        if isinstance(key, slice):
            self._array[key] = item
        elif key in self.dtype.names:
            self._array[key] = item
        else:
            # extend the dtype with the new variable's dtype
            dtypes = self._array.dtype.descr
            dtypes.append((key, item.dtype.descr[0][1]))
            dtypes = [(str(d[0]), d[1]) for d in dtypes]
            newarr = np.zeros(self._array.shape, dtype=dtypes)
            for var in self.variables:
                newarr[var] = self._array[var]
            newarr[key] = item
            self._array = newarr

    def __repr__(self):
        try:
            string = " \
            {} trajectories with {} time steps. \n \
            Available fields: {}\n \
            total duration: {} minutes".format(
                self.ntra, self.ntime, "/".join(self.variables), self.duration
            )
        except AttributeError:
            # Assume it's an empty Tra()
            string = "\
            Empty trajectories container.\n\
            Hint: use load_ascii() or load_netcdf()\n\
            to load data"
        return string

    @property
    def ntra(self):
        """Return the number of trajectories"""
        if self.ndim < 2:
            # a 1-D array is ambiguous: cannot tell trajectories from times
            warn(
                "\nBe careful with the dimensions, "
                "you may want to change the shape:\n"
                "either shape + (1,) or (1,)+shape"
            )
            return None
        return self.shape[0]

    @property
    def ntime(self):
        """Return the number of time steps"""
        if self.ndim < 2:
            warn(
                "\nBe careful with the dimensions, "
                "you may want to change the shape:\n"
                "either shape + (1,) or (1,)+shape"
            )
            return None
        return self.shape[1]

    @property
    def variables(self):
        """Return the names of the variables"""
        return list(self.dtype.names)

    @property
    def duration(self):
        """Time duration in minutes."""
        end = self["time"][0, -1]
        start = self["time"][0, 0]
        delta = end - start
        if isinstance(delta, np.timedelta64):
            return delta.astype(timedelta).total_seconds() / 60.0
        # times stored as fractional hhmm floats instead of datetimes
        delta = hhmm_to_frac_hour(end) - hhmm_to_frac_hour(start)
        return delta * 60.0

    @property
    def initial(self):
        """Give the initial time of the trajectories."""
        starttime = self["time"][0, 0]
        return starttime.astype(datetime)

    @property
    def startdate(self):
        """Return the starting date of the trajectories"""
        if self._startdate is None:
            time0 = self["time"][0, 0]
            if isinstance(time0, np.datetime64):
                self._startdate = time0.astype(datetime)
            else:
                # no absolute time information available: fall back to epoch
                self._startdate = datetime(1900, 1, 1, 0)
        return self._startdate

    def set_array(self, array):
        """To change the trajectories array."""
        self._array = array

    def get_array(self):
        """Return the trajectories array as numpy object"""
        return self._array

    def concatenate(self, trajs, time=False, inplace=False):
        """To concatenate trajectories together.

        Concatenate trajectories together and return a new object.
        The trajectories should contain the same variables.
        if time=False, the number of timestep in each trajs should be the same
        if time=True, the number of trajectories in each Tra should be the same

        Parameters
        ----------
        trajs: Tra or list of Tra
            Trajectories to concatenate with the current one
        time: bool, default False
            if True concatenate along the time dimension
        inplace: bool, default False
            if True append the trajs to current Tra object and return None

        Returns
        -------
        Tra
            Return a new Tra (trajectories) object

        Examples
        --------
        Create a new Tra object which include trajs and all newtrajs

        >>> files = ['lsl_20001010_00.4', 'lsl_2001010_01.4']
        >>> filename = files[0]
        >>> trajs = Tra(filename)
        >>> newtrajs = [Tra(f) for f in files]
        >>> alltrajs = trajs.concatenate(newtrajs)

        Append newtrajs to trajs

        >>> trajs = Tra(filename)
        >>> newtrajs = [Tra(f) for f in files]
        >>> trajs.concatenate(newtrajs, inplace=True)
        """
        if not isinstance(trajs, (tuple, list)):
            trajs = (trajs,)
        if time:
            # concatenate along the time axis by transposing first
            trajstuple = (self.T,)
            trajstuple += tuple(tra.T for tra in trajs)
            test = np.concatenate(trajstuple).T
        else:
            trajstuple = (self.get_array(),)
            trajstuple += tuple(tra.get_array() for tra in trajs)
            test = np.concatenate(trajstuple)
        if inplace:
            self._array = test
        else:
            newtrajs = Tra()
            newtrajs.set_array(test)
            return newtrajs

    def append(self, trajs):
        """append trajectories

        Parameters
        ----------
        trajs: single Tra or list of Tra
            Trajectories to concatenate with the current one

        Examples
        --------
        >>> files = ['lsl_20001010_00.4', 'lsl_2001010_01.4']
        >>> filename = files[0]
        >>> trajs = Tra(filename)
        >>> newtrajs = [Tra(f) for f in files]
        >>> trajs.append(newtrajs)
        """
        self.concatenate(trajs, inplace=True)

    def write(self, filename, fileformat="netcdf"):
        """Method to write the trajectories to a file"""
        # NOTE(review): relies on a module-level _write_<format> helper that
        # is not visible in this file — confirm it exists
        globals()[f"_write_{fileformat}"](self, filename)

    def load_netcdf(self, filename, usedatetime=True, msv=-999, unit="hours", **kwargs):
        """Method to load trajectories from a netcdf file"""
        self._array, self._startdate = from_netcdf(
            filename, usedatetime=usedatetime, msv=msv, unit=unit, **kwargs
        )

    load_netcdf.__doc__ = from_netcdf.__doc__

    def write_netcdf(self, filename, exclude=None, unit="hours"):
        """Write netcdf"""
        to_netcdf(self, filename, exclude=exclude, unit=unit)

    write_netcdf.__doc__ = to_netcdf.__doc__

    def write_ascii(self, filename, gz=False, digit=3, mode="w"):
        """Write ascii"""
        to_ascii(self, filename, gz=gz, digit=digit, mode=mode)

    write_ascii.__doc__ = to_ascii.__doc__

    def load_ascii(self, filename, usedatetime=True, msv=-999.999, gz=False):
        """Load ascii"""
        self._array, self._startdate = from_ascii(
            filename, usedatetime=usedatetime, msv=msv, gz=gz
        )

    load_ascii.__doc__ = from_ascii.__doc__
def intmpdir(afunc):
    """Wrapper to execute a command in a temporary directory

    Require workingdir as first argument of the function wrapped
    """

    @wraps(afunc)
    def wrapper(*args, **kwds):
        """Wrapper itself"""
        # by convention the wrapped lagranto helpers take workingdir first
        workingdir = args[0]
        kwargs = kwds.copy()
        no_tmp_dir = kwargs.pop("no_tmp_dir", False)
        # already inside a temp dir, or explicitly disabled: run in place
        # NOTE(review): only a parent of exactly "/tmp" is recognised;
        # temp dirs under $TMPDIR elsewhere will not match — confirm intended
        if Path(workingdir).parent == "/tmp" or no_tmp_dir:
            return afunc(*args, **kwargs)
        tmpdir = Path(mkdtemp())
        # forward only the keyword arguments that link_files understands
        lwkds = {
            key: value
            for key, value in kwds.items()
            if key in ["outputdir", "version", "sdate"]
        }
        link_files(workingdir, tmpdir, **lwkds)
        # swap workingdir (first positional argument) for the temp dir
        nargs = (tmpdir,) + args[1:]
        out = afunc(*nargs, **kwargs)
        # NOTE(review): rmdir_p removes the directory only if empty; the
        # symlinks created by link_files appear to be left behind — confirm
        tmpdir.rmdir_p()
        return out

    return wrapper
@intmpdir
def create_startf(
    workingdir,
    date,
    filename,
    specifier,
    version="cosmo",
    outputdir="",
    sdate=None,
    tolist=False,
    cmd_header=None,
    options=None,
):
    """Create a startfile for LAGRANTO.

    Parameters
    ----------
    workingdir: string
        directory holding the model data (swapped for a temp dir by intmpdir)
    date: datetime
        date for which the starting positions are created
    filename: string
        name of the startfile to create
    specifier: string
        description of the starting positions (see the LAGRANTO docs)
    version: string
        model version suffix of the startf executable, default cosmo
    outputdir: string
        directory where the resulting file is copied
    sdate: datetime
        starting date of the model run; forwarded to link_files by intmpdir
    tolist: bool
        if True also convert the startfile to a list of points (lsl2list)
        and return ``(output, listfile)``
    cmd_header: string
        header passed to ``run_cmd``
    options: list of string
        extra command-line options appended to the startf call

    Returns
    -------
    str, or (str, str) when ``tolist`` is True
    """
    outputdir = Path(outputdir)
    workingdir = Path(workingdir)
    create = "startf.{version} "
    # NOTE(review): the output-file placeholder was garbled in the original
    # source; {filename} restored from the .format() call below — confirm
    create += "{date:%Y%m%d_%H} {filename} {specifier}"
    if options is not None:
        if isinstance(options, list):
            for option in options:
                create += " " + option
        else:
            raise LagrantoException("options has to be a list of strings")
    create = create.format(
        version=version, date=date, filename=filename, specifier=specifier
    )
    out = run_cmd(create, workingdir, cmd_header=cmd_header)
    if tolist:
        # convert the startfile into a plain list of starting points
        lslname = filename
        filename = Path(lslname).splitext()[0] + ".startf"
        lsl2list = f"lsl2list.{version} {lslname} {filename}"
        out += run_cmd(lsl2list, workingdir, cmd_header=cmd_header)
    try:
        Path.copy(workingdir / filename, outputdir / filename)
    except SameFileError:
        # workingdir and outputdir are the same location: nothing to copy
        pass
    if tolist:
        return out, filename
    return out
@intmpdir
def select(
    workingdir,
    inpfile,
    outfile,
    crit,
    outputdir="",
    sdate=None,
    version="cosmo",
    cmd_header=None,
    **kwargs,
):
    """Select trajectories.

    Run ``select.<version>`` on ``inpfile`` with the criterion ``crit``
    and copy the resulting ``outfile`` into ``outputdir``.
    """
    fmt = kwargs.pop("netcdf_format", "CF")
    command = 'select.{version} {inpfile} {outfile} "{crit}"'.format(
        version=version, inpfile=inpfile, outfile=outfile, crit=crit
    )
    output = run_cmd(command, workingdir, netcdf_format=fmt, cmd_header=cmd_header)
    destination = Path(outputdir)
    try:
        Path.copy(workingdir / outfile, destination / outfile)
    except SameFileError:
        # result is already at its final location
        pass
    return output
@intmpdir
def trace(
    workingdir,
    filename,
    outputdir="",
    outfile="",
    tracevars="",
    tracevars_content="",
    field="",
    sdate=None,
    version="cosmo",
    cmd_header=None,
    **kwargs,
):
    """Trace variables along a trajectory.

    Parameters
    ----------
    workingdir: string
        directory holding the model data (swapped for a temp dir by intmpdir)
    filename: string
        lsl file whose trajectories are traced
    outputdir: string
        directory where the resulting file is copied
    outfile: string
        name of the output file; defaults to ``filename`` (trace in place)
    tracevars: string
        name of the tracevars file passed with ``-v``
    tracevars_content: string
        if given, written to the tracevars file before tracing
    field: string
        single field specification passed with ``-f``
    sdate: datetime
        starting date of the model run; forwarded to link_files by intmpdir
    version: string
        model version suffix of the trace executable, default cosmo
    cmd_header: string
        header passed to ``run_cmd``
    kwargs: dict
        ``netcdf_format`` (default "CF") is forwarded to ``run_cmd``
    """
    netcdf_format = kwargs.pop("netcdf_format", "CF")
    workingdir = Path(workingdir)
    outputdir = Path(outputdir)
    if not outfile:
        # by default trace in place, overwriting the input lsl file
        outfile = filename
    # NOTE(review): the input-file placeholder was garbled in the original
    # source; {filename} restored from the .format() call below — confirm
    trace_cmd = "trace.{version} {filename} {outfile}"
    tracevars_file = "tracevars"
    if tracevars:
        tracevars_file = tracevars
        trace_cmd += " -v " + tracevars
    if tracevars_content:
        # write the tracevars file on the fly
        with (Path(workingdir) / tracevars_file).open("w") as fname:
            fname.write(tracevars_content)
    if field:
        trace_cmd += " -f " + field
    trace_cmd = trace_cmd.format(filename=filename, outfile=outfile, version=version)
    out = run_cmd(
        trace_cmd, workingdir, netcdf_format=netcdf_format, cmd_header=cmd_header
    )
    try:
        Path.copy(workingdir / outfile, outputdir / outfile)
    except SameFileError:
        # result is already at its final location
        pass
    return out
@intmpdir
def caltra(
    workingdir,
    startdate,
    enddate,
    startfile,
    filename,
    jump=True,
    outputdir="",
    sdate=None,
    version="cosmo",
    cmd_header=None,
    withtrace=False,
    tracevars_content="",
    tracevars="",
    **kwargs,
):
    """Calculate trajectories for air parcels.

    starting at positions specified in startfile.

    Parameters
    ----------
    workingdir: string
        directory holding the model data (swapped for a temp dir by intmpdir)
    startdate, enddate: datetime
        first and last date of the trajectory computation
    startfile: string
        file with the starting positions
    filename: string
        name of the output lsl file
    jump: bool
        if True pass ``-j`` (jump parcels that hit the ground)
    outputdir: string
        directory where the resulting file is copied
    sdate: datetime
        starting date of the model run; forwarded to link_files by intmpdir
    version: string
        model version suffix of the caltra executable, default cosmo
    cmd_header: string
        header passed to ``run_cmd``
    withtrace: bool
        if True use the combined ``caltra`` executable that traces
        variables in the same run (tracevars/tracevars_content)
    kwargs: dict
        ``netcdf_format`` (default "CF") and ``prefix`` are forwarded to
        ``run_cmd``; every other key=value becomes a ``-key value`` option
    """
    outputdir = Path(outputdir)
    netcdf_format = kwargs.pop("netcdf_format", "CF")
    prefix = kwargs.pop("prefix", "")
    if not withtrace:
        caltra_cmd = "caltra.{version}"
        caltra_cmd += " {startdate:%Y%m%d_%H} {enddate:%Y%m%d_%H}"
        # NOTE(review): the output-file placeholder was garbled in the
        # original source; {filename} restored from the .format() below
        caltra_cmd += " {startfile} {filename}"
        if jump:
            caltra_cmd += " -j"
        for key, value in kwargs.items():
            caltra_cmd += f" -{key} {value}"
        caltra_cmd = caltra_cmd.format(
            startdate=startdate,
            enddate=enddate,
            startfile=startfile,
            filename=filename,
            version=version,
        )
    else:
        # combined caltra+trace mode: write the tracevars file first
        tracevars_file = tracevars if tracevars else "tracevars"
        if tracevars_content:
            with (Path(workingdir) / tracevars_file).open("w") as fname:
                fname.write(tracevars_content)
        # trajectory duration in hours
        delta = (enddate - startdate).total_seconds() / (60 * 60)
        # NOTE(review): the output-file placeholder was garbled in the
        # original source; {filename} restored from the .format() below
        caltra_cmd = "caltra {startfile} {delta:1.0f} {filename} "
        caltra_cmd += "-ref {startdate:%Y%m%d_%H%M}"
        for key, value in kwargs.items():
            caltra_cmd += f" -{key} {value}"
        caltra_cmd = caltra_cmd.format(
            startdate=startdate, delta=delta, startfile=startfile, filename=filename
        )
    out = run_cmd(
        caltra_cmd,
        workingdir,
        netcdf_format=netcdf_format,
        cmd_header=cmd_header,
        prefix=prefix,
    )
    try:
        Path.copy(workingdir / filename, outputdir / filename)
    except SameFileError:
        # result is already at its final location
        pass
    return out
@intmpdir
def density(
    workingdir,
    inpfile,
    outfile,
    outputdir="",
    sdate=None,
    version="cosmo",
    cmd_header=None,
    **kwargs,
):
    """Calculate trajectories density.

    Run ``density.<version>`` on ``inpfile``; every extra keyword
    argument ``key=value`` is forwarded as a ``-key value`` option.
    """
    destination = Path(outputdir)
    fmt = kwargs.pop("netcdf_format", "CF")
    pieces = ["density.{version} {inpfile} {outfile}"]
    pieces += [f" -{key} {value}" for key, value in kwargs.items()]
    command = "".join(pieces).format(
        inpfile=inpfile, outfile=outfile, version=version
    )
    output = run_cmd(command, workingdir, netcdf_format=fmt, cmd_header=cmd_header)
    try:
        Path.copy(workingdir / outfile, destination / outfile)
    except SameFileError:
        # result is already at its final location
        pass
    return output
def link_files(
    folder,
    tmpdir,
    outputdir="",
    version="cosmo",
    sdate=None,
    sfiles=True,
    startdate=None,
    enddate=None,
):
    """Link data from `folder`, `outputdir`, to `tmpdir`.

    Transform lff[d,f]*.nc files from `folder` in P and S files.

    Parameters
    ----------
    folder: string,
        Path to the origin folder
    tmpdir: string,
        Path to the destination folder
    outputdir: string,
        Path to the folder where results are saved
    version: string,
        Only cosmo is available for now
    sdate: datetime object,
        Starting date of the cosmo run;
        Useful if the cosmo files are named in forecast mode
    sfiles: boolean,
        If True create link with prefix S as well, pointing to the same files
        as the P files.
    startdate: datetime object,
        First date for which to link files. (inclusive)
    enddate: datetime object,
        Last date for which to link files (exclusive)
    """
    # the necessary files
    folder = Path(folder).abspath()
    tmpdir = Path(tmpdir)
    files = set(folder.files())
    if version == "cosmo":
        # link the constants file under the name LAGRANTO expects
        constant_file = folder.files("l*c.nc")[0]
        if constant_file.isfile():
            constant_file.symlink(tmpdir / "LMCONSTANTS")
        cosmofiles = set(folder.files("lff[df]*[0-9].nc"))
        for fname in cosmofiles:
            try:
                # analysis-mode naming: lffdYYYYMMDDHH.nc
                date = datetime.strptime(fname.name, "lffd%Y%m%d%H.nc")
            except ValueError:
                # forecast-mode naming: offsets (ddhhmmss) relative to sdate
                sfname = fname.name[4:-3]
                days, hours, minutes, seconds = (
                    int(sfname[i : i + 2]) for i in range(0, len(sfname), 2)
                )
                date = sdate + timedelta(
                    days=days, hours=hours, minutes=minutes, seconds=seconds
                )
            # restrict linking to the requested [startdate, enddate) window
            if (startdate is not None) and (date < startdate):
                continue
            if (enddate is not None) and (date >= enddate):
                continue
            pfile = f"P{date:%Y%m%d_%H}"
            fname.symlink(tmpdir / pfile)
            if sfiles:
                # S files point to the same data as the P files
                sfile = f"S{date:%Y%m%d_%H}"
                fname.symlink(tmpdir / sfile)
        files = files.difference(cosmofiles)
    # link every remaining file (grids, tables, ...) under its own name
    for fname in files:
        try:
            fname.symlink(tmpdir / fname.name)
        except FileExistsError:
            pass
    if outputdir:
        # also expose previously produced results (e.g. startfiles)
        outputdir = Path(outputdir).abspath()
        for fname in outputdir.files():
            try:
                fname.symlink(tmpdir / fname.name)
            except FileExistsError:
                pass
class LagrantoRun:
    """Perform Lagranto calculation.

    Parameters
    ----------
    dates: list
        list of (startdate, enddate) tuple
    workingdir: string, optional
        path to the model output directory, default to current
    outputdir: string, optional
        path to the trajectory output directory, default to current
    startf: string, optional
        name of the startf to use (or to create), default to startf.4
        The format parameter date can be used, for ex: startf_{date:%Y%m%d%H}.4
    lslname: string, optional
        name of the lsl file, define its type, default to lsl_{:%Y%m%d%H}.4
    tracevars: string, optional
        name of a tracevars file as used by trace, default to none
    field: string, optional
        name of a single field to trace, default to none
    version: string, optional
        name of the model version to use, currently only cosmo (default)
    linkfiles: function, optional
        function used to overwrite link_files in run.
        Should be used if COSMO output is not standard netcdf
    nprocesses: int, optional
        Number of processes used when running in parallel, default to 10
    sdate: datetime object,
        Starting date of the simulation;
        useful if files are named in forecast mode
    fresh: bool, optional
        Fresh start. Remove output directory first.
    cmd_header: string, optional
        Change the header used for running the command;
        see ``run_cmd`` help for more details.
    no_tmp_dir: bool, optional
        If set to True, do not create temporary folder. Useful only for
        special version of lagranto, and if the system has no tmp filesystem.
    """

    def __init__(
        self,
        dates,
        workingdir=".",
        outputdir=".",
        startf="startf.4",
        lslname="lsl_{:%Y%m%d%H}.4",
        tracevars="",
        field="",
        version="cosmo",
        linkfiles=None,
        nprocesses=10,
        sdate=None,
        fresh=False,
        cmd_header=None,
        no_tmp_dir=False,
    ):
        self.dates = dates
        self.workingdir = Path(workingdir).abspath()
        self.outputdir = Path(outputdir).abspath()
        if fresh:
            # remove previous results before (re-)creating the output dir
            self.clear()
        self.outputdir.makedirs_p()
        self.startf = Path(startf)
        self.lslname = lslname
        self.tracevars = tracevars
        self.field = field
        self.version = version
        # allow plugging in a custom link_files implementation
        self.link_files = linkfiles if linkfiles else link_files
        self.nprocesses = nprocesses
        self.sdate = sdate
        self.no_tmp_dir = no_tmp_dir
        self.cmd_header = cmd_header
        # outputs of the most recent caltra/density/select call
        self.caltra_outfile = None
        self.density_outfile = None
        self.select_outfile = None

    def create_startf(self, date, specifier, filename="", tolist=False, **kwargs):
        """Create a file with the starting position of the trajectories.

        Parameters
        ----------
        date: datetime
            date for which the startf date is produced
        specifier: string
            detailed description of starting positions;
            (see LAGRANTO documentations for more details)
        filename: string
            filename where starting positions are saved;
            the default is defined by `LagrantoRun`
            Can make use of the format parameter date, for example:
            startf_{date:%Y%m%d%H}.4
        tolist: bool
            If the starting position should be saved as a list of points;
            Useful if the same file is used for different starting times
        kwargs: dict
            key, value options, possible values are listed below;

            - cmd_header:
                header to add to the shell command;
                (see `run_cmd` for more details)
            - options:
                list of options to pass to the startf function;
                (see the LAGRANTO documentation for more details)
        """
        if filename:
            # remember the formatted name for the following caltra call
            startf = Path(filename.format(date=date))
            self.startf = startf
        else:
            startf = self.startf.format(date=date)
        kwargs["no_tmp_dir"] = self.no_tmp_dir
        out = create_startf(
            self.workingdir,
            date,
            startf,
            specifier,
            outputdir=self.outputdir,
            sdate=self.sdate,
            version=self.version,
            tolist=tolist,
            cmd_header=self.cmd_header,
            **kwargs,
        )
        if tolist:
            # create_startf returned (output, listfile); keep the listfile
            self.startf = out[1]
            return out[0]
        return out

    def caltra(self, startdate, enddate, filename="", outfile="", **kwargs):
        """Compute trajectories for the given startdate"""
        if filename:
            self.lslname = filename
        self.startf = self.startf.format(date=startdate)
        if outfile:
            self.caltra_outfile = outfile.format(date=startdate)
        else:
            # lslname uses a positional format field, e.g. lsl_{:%Y%m%d%H}.4
            self.caltra_outfile = self.lslname.format(startdate)
        kwargs["no_tmp_dir"] = self.no_tmp_dir
        return caltra(
            self.workingdir,
            startdate,
            enddate,
            self.startf,
            self.caltra_outfile,
            outputdir=self.outputdir,
            sdate=self.sdate,
            version=self.version,
            cmd_header=self.cmd_header,
            **kwargs,
        )

    def trace(
        self,
        date,
        filename="",
        outfile="",
        tracevars="",
        tracevars_content="",
        field="",
        **kwargs,
    ):
        r"""Trace variable along a trajectory.

        Trace meteorological fields along trajectories

        Parameters
        ----------
        date: datetime object
            starting date of the trajectories to trace
        filename: string, optional
            If not specified use `LagrantoRun.lslname`
        outfile: string, optional
            If not specified same as `filename`
        tracevars: string, optional
            Name of the file with the field to trace;
            If not specified use `LagrantoRun.tracevars`
        tracevars_content: string, optional
            Content of the tracevars file;

            .. code_block: python

                \"\"\"\n
                QV 1000. P 1\n
                PV 1. T 1\n
                \"\"\"
        field: string,
            Specify a field to trace as follow: QV 1.
        kwargs
        """
        # per-call arguments take precedence over the instance defaults
        tracef = tracevars if tracevars else self.tracevars
        fieldv = field if field else self.field
        filename = filename if filename else self.lslname
        filename = filename.format(date)
        kwargs["no_tmp_dir"] = self.no_tmp_dir
        return trace(
            self.workingdir,
            filename,
            outfile=outfile,
            outputdir=self.outputdir,
            tracevars=tracef,
            tracevars_content=tracevars_content,
            field=fieldv,
            sdate=self.sdate,
            version=self.version,
            cmd_header=self.cmd_header,
            **kwargs,
        )

    def density(self, inpfile=None, outfile=None, **kwargs):
        """Return the density of trajectories"""
        if inpfile is None:
            # caltra_outfile is initialised to None in __init__, so test the
            # value itself (the former AttributeError check could never fire)
            inpfile = self.caltra_outfile
            if inpfile is None:
                raise ValueError("Provide an input file or run caltra")
        if outfile is None:
            self.density_outfile = "density.4"
        else:
            self.density_outfile = outfile
        kwargs["no_tmp_dir"] = self.no_tmp_dir
        return density(
            self.workingdir,
            inpfile=inpfile,
            outfile=self.density_outfile,
            outputdir=self.outputdir,
            sdate=self.sdate,
            version=self.version,
            cmd_header=self.cmd_header,
            **kwargs,
        )

    def select(self, inpfile=None, outfile=None, **kwargs):
        """Perform selection of trajectories"""
        if inpfile is None:
            # same None check as in density: caltra_outfile defaults to None
            inpfile = self.caltra_outfile
            if inpfile is None:
                raise ValueError("Provide an input file or run caltra")
        if outfile is None:
            self.select_outfile = "selected.4"
        else:
            self.select_outfile = outfile
        kwargs["no_tmp_dir"] = self.no_tmp_dir
        return select(
            self.workingdir,
            inpfile=inpfile,
            outfile=self.select_outfile,
            outputdir=self.outputdir,
            sdate=self.sdate,
            version=self.version,
            cmd_header=self.cmd_header,
            **kwargs,
        )

    def run(self, caltra_kw=None, trace_kw=None, startf_kw=None, **kwargs):
        """Run caltra, trace, and/or create_startf.

        Run single_run for each dates tuple
        See single_run description for more details.

        Parameters
        ----------
        caltra_kw: dictionary
            Arguments to pass to the caltra function
        trace_kw: dictionary
            Arguments to pass to the trace function
        startf_kw: dictionary
            Arguments to pass to the create_startf function
        kwargs:
            Arguments to pass to the single_run function

        Returns
        -------
        A list of single_run result: list
        """
        results = []
        if caltra_kw is None:
            caltra_kw = dict()
        if trace_kw is None:
            trace_kw = dict()
        if startf_kw is None:
            startf_kw = dict()
        for startdate, enddate in self.dates:
            res = self.single_run(
                startdate,
                enddate,
                caltra_kw=caltra_kw,
                trace_kw=trace_kw,
                startf_kw=startf_kw,
                **kwargs,
            )
            results.append(res)
        return results

    def run_parallel(
        self, caltra_kw=None, trace_kw=None, startf_kw=None, nprocesses=None, **kwargs
    ):
        """Run caltra, trace, and/or create_startf in parallel.

        Run single_run for each dates tuple using `multiprocessing.Pool`.
        See single_run description for more details.

        Warning: If you use run_parallel with create_startf and
        if the starting file is not a list, be sure that every starting file
        has a different name, for example doing as follow:

        >>> LagrantoRun(startf='startf_{date:%Y%m%d_%H.4}', ...)

        Parameters
        ----------
        caltra_kw: dictionary
            Arguments to pass to the caltra function
        trace_kw: dictionary
            Arguments to pass to the trace function
        startf_kw: dictionary
            Arguments to pass to the create_startf function
        nprocesses: int,
            Number of processors to use
        kwargs:
            Arguments to pass to the single_run function

        Returns
        -------
        A list of single_run results: list
        """
        if caltra_kw is None:
            caltra_kw = dict()
        if trace_kw is None:
            trace_kw = dict()
        if startf_kw is None:
            startf_kw = dict()
        if nprocesses is not None:
            self.nprocesses = nprocesses
        # bind the keyword arguments so the pool only supplies (sd, ed)
        single_run = partial(
            self.single_run,
            caltra_kw=caltra_kw,
            trace_kw=trace_kw,
            startf_kw=startf_kw,
            **kwargs,
        )
        with Pool(processes=self.nprocesses) as pool:
            results = pool.starmap(single_run, self.dates)
        return results

    def single_run(
        self,
        sd,
        ed,
        caltra_kw=None,
        trace_kw=None,
        startf_kw=None,
        type="both",
        debug=False,
        **kwargs,
    ):
        """Run caltra, trace and/or create_startf for a given starting date

        Compute a trajectory set using caltra, trace and/or create_startf in a
        temporary directory. Need a working installation of LAGRANTO.
        If caltra, trace or create_startf return a error, single_run return
        a LagrantoException error with the path of the temporary directory.
        In this case, the temporary directory is not deleted.

        Parameters
        ----------
        sd: datetime
            Starting date of the trajectory
        ed: datetime
            Ending date of the trajectory
        caltra_kw: dictionary
            Arguments to pass to the caltra function
        trace_kw: dictionary
            Arguments to pass to the trace function
        startf_kw: dictionary
            Arguments to pass to the create_startf function
        type: string (default both)
            Define the type a run to perform:
                both: caltra + trace
                caltra: caltra
                trace: trace
                all: create_startf + caltra + trace
                create: create_startf + caltra
        debug: Boolean
            If True raise LagrantoException instead of return it as string
        kwargs: dictionnary
            Arguments to pass to the link_files function

        Returns
        -------
        output of caltra, trace and/or create_startf: str
        """
        if caltra_kw is None:
            caltra_kw = dict()
        if trace_kw is None:
            trace_kw = dict()
        if startf_kw is None:
            startf_kw = dict()
        workingdir = self.workingdir
        no_tmp_dir = caltra_kw.get("no_tmp_dir", self.no_tmp_dir)
        if no_tmp_dir:
            tmpdir = workingdir
        else:
            # run everything in a temporary directory populated with symlinks
            tmpdir = Path(mkdtemp())
        self.workingdir = tmpdir
        nkwargs = {
            "version": self.version,
            "outputdir": self.outputdir,
            "sdate": self.sdate,
        }
        nkwargs.update(kwargs)
        self.link_files(workingdir, tmpdir, **nkwargs)
        out = ""
        try:
            if type == "both":
                out += self.caltra(sd, ed, **caltra_kw)
                out += self.trace(sd, **trace_kw)
            elif type == "caltra":
                out += self.caltra(sd, ed, **caltra_kw)
            elif type == "trace":
                out += self.trace(sd, **trace_kw)
            elif (type == "all") or (type == "create"):
                if "specifier" not in startf_kw.keys():
                    raise LagrantoException(
                        "specifier need to be specified"
                        ' in startf_kw for type:"{}", '
                        "see create_starf function"
                        "".format(type)
                    )
                out += self.create_startf(sd, **startf_kw)
                out += self.caltra(sd, ed, **caltra_kw)
                if type == "all":
                    out += self.trace(sd, **trace_kw)
        except LagrantoException as err:
            # keep the temp dir for inspection and attach its path
            err.args += (tmpdir,)
            if debug:
                raise
            else:
                return err
        self.workingdir = workingdir
        if not no_tmp_dir:
            tmpdir.rmtree_p()
        return out

    def clear(self):
        """Remove the output directory"""
        self.outputdir.rmtree_p()