DataSink S3 support does not work #3105

Closed
kimsin98 opened this issue Nov 25, 2019 · 5 comments

Comments

@kimsin98
Contributor

Summary

The DataSink interface's S3 support does not work: outputs are never uploaded to the specified S3 bucket.

Actual behavior

Only the local copy is written. Oddly, when used in a workflow, DataSink creates a local directory named s3:/ instead of uploading to the bucket.

Expected behavior

DataSink creates objects in S3 buckets when base_directory starts with s3://.

How to replicate the behavior

import os
from nipype.interfaces.io import DataSink

ds = DataSink()

ds.inputs.base_directory = 's3://kimsin-test'
ds.inputs.creds_path = os.path.abspath('aws.csv')
ds.inputs.encrypt_bucket_keys = True
ds.inputs.local_copy = os.path.abspath('log')
setattr(ds.inputs, 'out_s3', 'DELETE.me')

ds.run()

This basic S3 test code does not create the desired output in the S3 bucket.
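As a sanity check (a sketch only, assuming boto3 is installed and the same AWS credentials are available via the environment), listing the bucket confirms that no object was uploaded:

import boto3

s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket='kimsin-test', Prefix='DELETE.me')
# 'Contents' is absent from the response when no matching keys exist
print(resp.get('Contents', []))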

Platform details:

{'commit_hash': '%h',
 'commit_source': 'archive substitution',
 'networkx_version': '2.4',
 'nibabel_version': '2.5.1',
 'nipype_version': '1.3.0-rc1',
 'numpy_version': '1.17.3',
 'pkg_path': '/opt/miniconda-latest/envs/neuro/lib/python3.6/site-packages/nipype',
 'scipy_version': '1.3.1',
 'sys_executable': '/opt/miniconda-latest/envs/neuro/bin/python',
 'sys_platform': 'linux',
 'sys_version': '3.6.7 | packaged by conda-forge | (default, Jul  2 2019, '
                '02:18:42) \n'
                '[GCC 7.3.0]',
 'traits_version': '5.1.2'}

Execution environment

  • My Python environment inside a container
@effigies
Member

For things like this, I'll often debug as follows:

>>> import os
>>> from nipype.interfaces.io import *
>>> ds = DataSink()
>>> ds.inputs.base_directory = 's3://kimsin-test'
>>> ds.inputs.creds_path = os.path.abspath('aws.csv')
>>> ds.inputs.encrypt_bucket_keys = True
>>> ds.inputs.local_copy = os.path.abspath('log')
>>> ds.inputs.out_s3 = 'DELETE.me'
>>> self = ds

Then just manually walk through the _list_outputs method to see where things break down.

def _list_outputs(self):
    # infields are mandatory, however I could not figure out how to set 'mandatory' flag dynamically
    # hence manual check
    import boto

    if self._infields:
        for key in self._infields:
            value = getattr(self.inputs, key)
            if not isdefined(value):
                msg = (
                    "%s requires a value for input '%s' because it was listed in 'infields'"
                    % (self.__class__.__name__, key)
                )
                raise ValueError(msg)
    outputs = {}
    # get list of all files in s3 bucket
    conn = boto.connect_s3(anon=self.inputs.anon)
    bkt = conn.get_bucket(self.inputs.bucket)
    bkt_files = list(k.key for k in bkt.list(prefix=self.inputs.bucket_path))
    # keys are outfields, args are template args for the outfield
    for key, args in list(self.inputs.template_args.items()):
        outputs[key] = []
        template = self.inputs.template
        if (
            hasattr(self.inputs, "field_template")
            and isdefined(self.inputs.field_template)
            and key in self.inputs.field_template
        ):
            template = self.inputs.field_template[
                key
            ]  # template override for multiple outfields
        if isdefined(self.inputs.bucket_path):
            template = os.path.join(self.inputs.bucket_path, template)
        if not args:
            filelist = []
            for fname in bkt_files:
                if re.match(template, fname):
                    filelist.append(fname)
            if len(filelist) == 0:
                msg = "Output key: %s Template: %s returned no files" % (
                    key,
                    template,
                )
                if self.inputs.raise_on_empty:
                    raise IOError(msg)
                else:
                    warn(msg)
            else:
                if self.inputs.sort_filelist:
                    filelist = human_order_sorted(filelist)
                outputs[key] = simplify_list(filelist)
        for argnum, arglist in enumerate(args):
            maxlen = 1
            for arg in arglist:
                if isinstance(arg, (str, bytes)) and hasattr(self.inputs, arg):
                    arg = getattr(self.inputs, arg)
                if isinstance(arg, list):
                    if (maxlen > 1) and (len(arg) != maxlen):
                        raise ValueError(
                            "incompatible number of arguments for %s" % key
                        )
                    if len(arg) > maxlen:
                        maxlen = len(arg)
            outfiles = []
            for i in range(maxlen):
                argtuple = []
                for arg in arglist:
                    if isinstance(arg, (str, bytes)) and hasattr(self.inputs, arg):
                        arg = getattr(self.inputs, arg)
                    if isinstance(arg, list):
                        argtuple.append(arg[i])
                    else:
                        argtuple.append(arg)
                filledtemplate = template
                if argtuple:
                    try:
                        filledtemplate = template % tuple(argtuple)
                    except TypeError as e:
                        raise TypeError(
                            e.message
                            + ": Template %s failed to convert with args %s"
                            % (template, str(tuple(argtuple)))
                        )
                outfiles = []
                for fname in bkt_files:
                    if re.match(filledtemplate, fname):
                        outfiles.append(fname)
                if len(outfiles) == 0:
                    msg = "Output key: %s Template: %s returned no files" % (
                        key,
                        filledtemplate,
                    )
                    if self.inputs.raise_on_empty:
                        raise IOError(msg)
                    else:
                        warn(msg)
                    outputs[key].append(None)
                else:
                    if self.inputs.sort_filelist:
                        outfiles = human_order_sorted(outfiles)
                    outputs[key].append(simplify_list(outfiles))
        if any([val is None for val in outputs[key]]):
            outputs[key] = []
        if len(outputs[key]) == 0:
            outputs[key] = None
        elif len(outputs[key]) == 1:
            outputs[key] = outputs[key][0]
    # Outputs are currently stored as locations on S3.
    # We must convert to the local location specified
    # and download the files.
    for key, val in outputs.items():
        # This will basically be either list-like or string-like:
        # if it's an instance of a list, we'll iterate through it.
        # If it isn't, it's string-like (string, unicode), we
        # convert that value directly.
        if isinstance(val, (list, tuple, set)):
            for i, path in enumerate(val):
                outputs[key][i] = self.s3tolocal(path, bkt)
        else:
            outputs[key] = self.s3tolocal(val, bkt)
    return outputs

I'm sure there's a way to do this in pdb that will save you typing/pasting, if you prefer...
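For example (a sketch; pdb.runcall stops at the first line of the method, and 'n', 's', and 'p <name>' let you step and inspect locals):

>>> import pdb
>>> pdb.runcall(ds._list_outputs)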

@wtriplett
Contributor

wtriplett commented Dec 31, 2019

Hi,

This may be related to the use of traits.Directory in the input spec, as opposed to Str. Maybe traits.Directory is stripping out the double / in the S3 URL.

Edit: Looks like pathlib.Path is the culprit:

def validate(self, objekt, name, value, return_pathlike=False):
    """Validate a value change."""
    try:
        value = Path(value)  # Use pathlib's validation
    except Exception:
        self.error(objekt, name, str(value))

Python 3.7.4 (default, Aug 13 2019, 15:17:50)

In [3]: from pathlib import Path
   ...: Path('s3://this/is/a/test')
Out[3]: PosixPath('s3:/this/is/a/test')

@kimsin98
Contributor Author

kimsin98 commented Jan 2, 2020

Good find!

I suppose the simplest fix would be changing DataSink._check_s3_base_dir to accommodate this quirk. The other option, editing traits.Directory, has too much potential to affect other interfaces.
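A minimal sketch of the kind of accommodation I mean (a hypothetical helper, not the actual patch): detect the collapsed s3:/ prefix that pathlib.Path leaves behind and restore the double slash before the S3 checks run.

# Hypothetical sketch, not nipype code: undo pathlib's collapsing of 's3://'
def restore_s3_prefix(base_directory):
    bd = str(base_directory)
    if bd.lower().startswith('s3:/') and not bd.lower().startswith('s3://'):
        bd = 's3://' + bd[len('s3:/'):]
    return bd

restore_s3_prefix('s3:/kimsin-test')  # -> 's3://kimsin-test'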

I will submit a quick PR after testing.

@oesteban
Contributor

oesteban commented Jan 2, 2020

I think base_directory should not be a traits.Directory. Why not just change it to traits.Str?
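I.e., something along these lines (schematic diff; the exact declaration in DataSinkInputSpec may differ):

-    base_directory = Directory(desc='Path to the base directory for storing data.')
+    base_directory = Str(desc='Path to the base directory for storing data.')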

@kimsin98
Copy link
Contributor Author

kimsin98 commented Jan 2, 2020

@oesteban I am not quite sure about all the implications, but doing so would skip whatever cleanup pathlib.Path does on DataSink's base_directory, which notably includes various backslash shenanigans.
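For illustration, this is the kind of normalization a plain string would no longer get (PosixPath shown; on Windows, WindowsPath additionally rewrites the separators):

>>> from pathlib import Path
>>> str(Path('derivatives//sub-01/'))
'derivatives/sub-01'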

For now, I will leave the discussion on what format DataSink expects as inputs up to others.
