
Commit

Merge remote-tracking branch 'gpoulin/s3-redshift-py3k'
Conflicts:
	luigi/s3.py
Tarrasch committed Feb 25, 2015
2 parents 09ad29c + 7611079 commit 2c06085
Showing 5 changed files with 64 additions and 42 deletions.
6 changes: 5 additions & 1 deletion luigi/contrib/redshift.py
@@ -306,6 +306,7 @@ class RedshiftManifestTask(S3PathTask):
     # should be over ridden to point to a variety
     # of folders you wish to copy from
     folder_paths = luigi.Parameter()
+    text_target = True

     def run(self):
         entries = []
@@ -319,7 +320,10 @@ def run(self):
                 })
         manifest = {'entries': entries}
         target = self.output().open('w')
-        target.write(json.dumps(manifest).encode('utf8'))
+        dump = json.dumps(manifest)
+        if not self.text_target:
+            dump = dump.encode('utf8')
+        target.write(dump)
         target.close()


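The new text_target flag means the manifest JSON is written as str by default and only encoded to UTF-8 when the output target expects bytes. A minimal sketch of the distinction this guards against, using io.StringIO / io.BytesIO as stand-ins for the real S3 target (the stand-ins and the sample manifest are illustrative, not part of the commit):

```python
import io
import json

manifest = {'entries': [{'url': 's3://bucket/folder/part-0000', 'mandatory': True}]}
dump = json.dumps(manifest)

text_target = io.StringIO()               # stands in for a text-mode target
text_target.write(dump)                   # str goes through unchanged

bytes_target = io.BytesIO()               # stands in for a bytes-mode target
bytes_target.write(dump.encode('utf8'))   # byte streams need the encoded form

try:
    text_target.write(dump.encode('utf8'))
except TypeError as exc:
    # the old unconditional .encode('utf8') hit exactly this on Python 3 text targets
    print('text target rejects bytes:', exc)
```
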
4 changes: 2 additions & 2 deletions luigi/format.py
@@ -391,14 +391,14 @@ class TextWrapper(io.TextIOWrapper):

     def __exit__(self, *args):
         # io.TextIOWrapper close the file on __exit__, let the underlying file decide
-        if not self.closed:
+        if not self.closed and self.writable():
             super(TextWrapper, self).flush()

         self._stream.__exit__(*args)

     def __del__(self, *args):
         # io.TextIOWrapper close the file on __del__, let the underlying file decide
-        if not self.closed:
+        if not self.closed and self.writable():
             super(TextWrapper, self).flush()

         try:
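
The added self.writable() check means a TextWrapper opened for reading no longer attempts a flush when it is torn down; only writable wrappers have buffered data to push to the underlying stream. A quick sketch of what writable() reports for the two cases, using plain io objects as stand-ins for luigi's wrapped streams:

```python
import io

# Read-only stack: the wrapper reports writable() == False, so the guarded
# flush() in __exit__/__del__ is skipped.
reader = io.TextIOWrapper(io.BufferedReader(io.BytesIO(b"line one\n")), encoding="utf8")
print(reader.writable())   # False
print(reader.read())       # reading is unaffected

# Write stack: writable() == True, so this wrapper still flushes on teardown.
writer = io.TextIOWrapper(io.BufferedWriter(io.BytesIO()), encoding="utf8")
print(writer.writable())   # True
writer.write("some text\n")
writer.flush()
```
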
22 changes: 18 additions & 4 deletions luigi/s3.py
@@ -19,6 +19,8 @@
 :py:class:`S3Target` is a subclass of the Target class to support S3 file system operations
 """

+from __future__ import division
+
 import itertools
 import logging
 import os
@@ -35,6 +37,7 @@
 from configparser import NoSectionError

 from luigi import six
+from luigi.six.moves import range

 from luigi import configuration
 from luigi.format import FileWrapper, get_default_format, MixedUnicodeBytes
@@ -211,15 +214,15 @@ def put_multipart(self, local_path, destination_s3_path, part_size=67108864):
         # use modulo to avoid float precision issues
         # for exactly-sized fits
         num_parts = \
-            (source_size / part_size) \
+            (source_size // part_size) \
             if source_size % part_size == 0 \
-            else (source_size / part_size) + 1
+            else (source_size // part_size) + 1

         mp = None
         try:
             mp = s3_bucket.initiate_multipart_upload(key)

-            for i in xrange(num_parts):
+            for i in range(num_parts):
                 # upload a part at a time to S3
                 offset = part_size * i
                 bytes = min(part_size, source_size - offset)
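
Two Python 3 fixes land in this hunk: with from __future__ import division imported above, / is true division on both interpreters, so the part count switches to floor division (// plus the modulo test gives a ceiling division), and six.moves.range replaces the Python-2-only xrange. A small worked example of the arithmetic (the sizes are illustrative):

```python
part_size = 67108864                          # the 64 MiB default of put_multipart
for source_size in (67108864, 134217728, 150000000):
    num_parts = \
        (source_size // part_size) \
        if source_size % part_size == 0 \
        else (source_size // part_size) + 1
    print(source_size, '->', num_parts)
# 67108864  -> 1   (exact fit, no extra part)
# 134217728 -> 2   (exact fit)
# 150000000 -> 3   (two full parts plus a ~15 MiB remainder)
```
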
@@ -355,12 +358,14 @@ class ReadableS3File(object):
     def __init__(self, s3_key):
         self.s3_key = s3_key
         self.buffer = []
+        self.closed = False

     def read(self, size=0):
         return self.s3_key.read(size=size)

     def close(self):
         self.s3_key.close()
+        self.closed = True

     def __del__(self):
         self.close()
@@ -379,6 +384,15 @@ def _flush_buffer(self):
         self.buffer = []
         return output

+    def readable(self):
+        return True
+
+    def writable(self):
+        return False
+
+    def seekable(self):
+        return False
+
     def __iter__(self):
         key_iter = self.s3_key.__iter__()
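
The closed flag and the readable()/writable()/seekable() trio give ReadableS3File the minimal file-object protocol that the standard io wrappers used by luigi's formats query when wrapping a stream. A rough sketch of why that matters; NoProtocol and MiniReader are made-up stand-ins, not luigi code:

```python
import io

class NoProtocol(object):
    def read(self, size=-1):
        return b""

try:
    io.TextIOWrapper(NoProtocol(), encoding="utf8")
except AttributeError as exc:
    print("cannot wrap:", exc)           # readable() is missing

class MiniReader(object):
    closed = False

    def __init__(self, data):
        self._data = data

    def read(self, size=-1):
        out, self._data = self._data, b""    # sketch: hand everything back at once
        return out

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def flush(self):
        pass

    def close(self):
        self.closed = True

wrapped = io.TextIOWrapper(MiniReader(b"first line\nsecond line\n"), encoding="utf8")
print(wrapped.read())
print(wrapped.closed)                     # delegates to MiniReader.closed
wrapped.close()
```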

Expand Down Expand Up @@ -484,7 +498,7 @@ def __init__(self, path, format=None, client=None, flag='_SUCCESS'):
if path[-1] != "/":
raise ValueError("S3FlagTarget requires the path to be to a "
"directory. It must end with a slash ( / ).")
super(S3Target, self).__init__(path)
super(S3FlagTarget, self).__init__(path)
self.format = format
self.fs = client or S3Client()
self.flag = flag
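
The last hunk fixes the class named in the super() call: super(S3Target, self) starts the method lookup after S3Target in the MRO, so S3Target's own __init__ was silently skipped; naming the class itself (S3FlagTarget) restores the normal chain. A generic sketch of the difference (these class names are made up):

```python
class Grandparent(object):
    def __init__(self):
        print("Grandparent.__init__")

class Parent(Grandparent):
    def __init__(self):
        print("Parent.__init__")
        super(Parent, self).__init__()

class BuggyChild(Parent):
    def __init__(self):
        # Wrong: lookup starts after Parent, so Parent.__init__ never runs.
        super(Parent, self).__init__()

class FixedChild(Parent):
    def __init__(self):
        super(FixedChild, self).__init__()

BuggyChild()   # prints only "Grandparent.__init__"
FixedChild()   # prints "Parent.__init__" then "Grandparent.__init__"
```
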
72 changes: 38 additions & 34 deletions test/_s3_test.py → test/s3_test.py
@@ -23,6 +23,7 @@
 import unittest

 from luigi import six
+from helpers import with_config

 import luigi.format
 from boto.exception import S3ResponseError
@@ -31,6 +32,11 @@
 from luigi import configuration
 from luigi.s3 import FileNotFoundException, InvalidDeleteException, S3Client, S3Target

+try:
+    from unittest import skip
+except ImportError:
+    from unittest2 import skip
+
 if sys.version_info[:2] == (3, 4):
     # spulec/moto#308
     mock_s3 = unittest.skip('moto mock doesn\'t work with python3.4')
@@ -45,8 +51,8 @@ class TestS3Target(unittest.TestCase):
     def setUp(self):
         f = tempfile.NamedTemporaryFile(mode='wb', delete=False)
         self.tempFileContents = (
-            "I'm a temporary file for testing\nAnd this is the second line\n"
-            "This is the third.")
+            b"I'm a temporary file for testing\nAnd this is the second line\n"
+            b"This is the third.")
         self.tempFilePath = f.name
         f.write(self.tempFileContents)
         f.close()
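
The fixture strings pick up a b prefix because they are written to files opened with mode='wb'; on Python 3 a binary-mode file accepts only bytes (Python 2 accepted plain str because str is bytes there). The same change recurs in the gzip and multipart tests below. A quick illustration:

```python
import tempfile

with tempfile.NamedTemporaryFile(mode='wb') as f:
    f.write(b"I'm a temporary file for testing\n")   # fine on Python 2 and 3
    try:
        f.write("plain text\n")                      # TypeError on Python 3
    except TypeError as exc:
        print("binary file rejects str:", exc)
```
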
@@ -83,16 +89,16 @@ def test_read(self):
         t = S3Target('s3://mybucket/tempfile', client=client)
         read_file = t.open()
         file_str = read_file.read()
-        self.assertEqual(self.tempFileContents, file_str)
+        self.assertEqual(self.tempFileContents, file_str.encode('utf-8'))

     @mock_s3
     def test_read_no_file(self):
         client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
         client.s3.create_bucket('mybucket')
         t = S3Target('s3://mybucket/tempfile', client=client)
-        with self.assertRaises(FileNotFoundException):
-            t.open()
+        self.assertRaises(FileNotFoundException, t.open)

+    @skip('Take for ever!')
     @mock_s3
     def test_read_iterator(self):
         # write a file that is 5X the boto buffersize
@@ -101,7 +107,7 @@ def test_read_iterator(self):
         temppath = tempf.name
         firstline = ''.zfill(key.Key.BufferSize * 5) + os.linesep
         contents = firstline + 'line two' + os.linesep + 'line three'
-        tempf.write(contents)
+        tempf.write(contents.encode('utf-8'))
         tempf.close()

         client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
@@ -141,26 +147,28 @@ def test_write_cleanup_with_error(self):
             pass
         self.assertFalse(t.exists())

+    @skip('moto assume text data')
     @mock_s3
     def test_gzip(self):
         client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
         client.s3.create_bucket('mybucket')
         t = S3Target('s3://mybucket/gzip_test', luigi.format.Gzip,
                      client=client)
         p = t.open('w')
-        test_data = 'test'
+        test_data = b'test'
         p.write(test_data)
         self.assertFalse(t.exists())
         p.close()
         self.assertTrue(t.exists())

+    @skip('moto assume text data')
     @mock_s3
     def test_gzip_works_and_cleans_up(self):
         client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
         client.s3.create_bucket('mybucket')
         t = S3Target('s3://mybucket/gzip_test', luigi.format.Gzip,
                      client=client)
-        test_data = '123testing'
+        test_data = b'123testing'
         with t.open('w') as f:
             f.write(test_data)

Expand All @@ -175,23 +183,11 @@ class TestS3Client(unittest.TestCase):
def setUp(self):
f = tempfile.NamedTemporaryFile(mode='wb', delete=False)
self.tempFilePath = f.name
f.write("I'm a temporary file for testing\n")
f.write(b"I'm a temporary file for testing\n")
f.close()

self.s3_config = dict(aws_access_key_id='foo',
aws_secret_access_key='bar')
with open(tempfile.mktemp(prefix='luigi_s3_test_'), 'w') as f:
self._s3_config_path = f.name
f.write('[s3]\n{}\n'.format(
'\n'.join(['{}: {}'.format(k, v)
for k, v in six.iteritems(self.s3_config)])))
self._old_config_paths = configuration.LuigiConfigParser._config_paths
configuration.LuigiConfigParser._config_paths = self._s3_config_path

def tearDown(self):
os.remove(self.tempFilePath)
os.remove(self._s3_config_path)
configuration.LuigiConfigParser._config_paths = self._old_config_paths

def test_init_with_environment_variables(self):
os.environ['AWS_ACCESS_KEY_ID'] = 'foo'
@@ -206,13 +202,7 @@ def test_init_with_environment_variables(self):
         self.assertEqual(s3_client.s3.gs_access_key_id, 'foo')
         self.assertEqual(s3_client.s3.gs_secret_access_key, 'bar')

+    @with_config({'s3': {'aws_access_key_id': 'foo', 'aws_secret_access_key': 'bar'}})
     @mock_s3
     def test_init_with_config(self):
         s3_client = S3Client()
-        self.assertEqual(s3_client.s3.access_key,
-                         self.s3_config['aws_access_key_id'])
-        self.assertEqual(s3_client.s3.secret_key,
-                         self.s3_config['aws_secret_access_key'])
+        self.assertEqual(s3_client.s3.access_key, 'foo')
+        self.assertEqual(s3_client.s3.secret_key, 'bar')

     @mock_s3
     def test_put(self):
@@ -320,26 +315,35 @@ def test_remove(self):
         s3_client = S3Client(AWS_ACCESS_KEY, AWS_SECRET_KEY)
         s3_client.s3.create_bucket('mybucket')

-        with self.assertRaises(S3ResponseError):
-            s3_client.remove('s3://bucketdoesnotexist/file')
+        self.assertRaises(
+            S3ResponseError,
+            lambda: s3_client.remove('s3://bucketdoesnotexist/file')
+        )

         self.assertFalse(s3_client.remove('s3://mybucket/doesNotExist'))

         s3_client.put(self.tempFilePath, 's3://mybucket/existingFile0')
         self.assertTrue(s3_client.remove('s3://mybucket/existingFile0'))
         self.assertFalse(s3_client.exists('s3://mybucket/existingFile0'))

-        with self.assertRaises(InvalidDeleteException):
-            s3_client.remove('s3://mybucket/')
-        with self.assertRaises(InvalidDeleteException):
-            s3_client.remove('s3://mybucket')
+        self.assertRaises(
+            InvalidDeleteException,
+            lambda: s3_client.remove('s3://mybucket/')
+        )
+
+        self.assertRaises(
+            InvalidDeleteException,
+            lambda: s3_client.remove('s3://mybucket')
+        )

         s3_client.put(self.tempFilePath, 's3://mybucket/removemedir/file')
-        with self.assertRaises(InvalidDeleteException):
-            s3_client.remove('s3://mybucket/removemedir', recursive=False)
+        self.assertRaises(
+            InvalidDeleteException,
+            lambda: s3_client.remove('s3://mybucket/removemedir', recursive=False)
+        )

     def _run_multipart_test(self, part_size, file_size):
-        file_contents = "a" * file_size
+        file_contents = b"a" * file_size

         s3_path = 's3://mybucket/putMe'
         tmp_file = tempfile.NamedTemporaryFile(mode='wb', delete=True)
2 changes: 1 addition & 1 deletion tox.ini
@@ -35,7 +35,7 @@ commands = isort -w 120 -rc luigi test examples bin
 # Call this using `tox -e docs`.
 deps =
     sqlalchemy
-    Sphinx
+    Sphinx>=1.3b1
     sphinx_rtd_theme
 commands =
     # build API docs
