Skip to content

Commit

Permalink
Python API: retry fetch when there are clues of a dirty cache
Browse files Browse the repository at this point in the history
  • Loading branch information
fvennetier committed May 19, 2020
1 parent d0e18f5 commit ec0637f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 30 deletions.
96 changes: 69 additions & 27 deletions oio/api/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1120,34 +1120,11 @@ def _ttfb_wrapper(stream, req_start, download_start, perfdata):
+ req_end - download_start
perfdata['data_size'] = size

@handle_object_not_found
@patch_kwargs
@ensure_headers
@ensure_request_id
def object_fetch(self, account, container, obj, version=None, ranges=None,
key_file=None, **kwargs):
def _object_fetch_impl(self, account, container, obj,
version=None, ranges=None, key_file=None,
**kwargs):
"""
Download an object.
:param account: name of the account in which the object is stored
:param container: name of the container in which the object is stored
:param obj: name of the object to fetch
:param version: version of the object to fetch
:type version: `str`
:param ranges: a list of object ranges to download
:type ranges: `list` of `tuple`
:param key_file: path to the file containing credentials
:keyword properties: should the request return object properties
along with content description (True by default)
:type properties: `bool`
:keyword perfdata: optional `dict` that will be filled with metrics
of time spent to resolve the meta2 address, to do the meta2
request, and the time-to-first-byte, as seen by this API.
:returns: a dictionary of object metadata and
a stream of object data
:rtype: tuple
Actual implementation of object fetch logic.
"""
perfdata = kwargs.get('perfdata', None)
if perfdata is not None:
Expand Down Expand Up @@ -1195,6 +1172,71 @@ def object_fetch(self, account, container, obj, version=None, ranges=None,

return meta, stream

@handle_object_not_found
@patch_kwargs
@ensure_headers
@ensure_request_id
def object_fetch(self, account, container, obj, version=None, ranges=None,
                 key_file=None, **kwargs):
    """
    Download an object.

    :param account: name of the account in which the object is stored
    :param container: name of the container in which the object is stored
    :param obj: name of the object to fetch
    :param version: version of the object to fetch
    :type version: `str`
    :param ranges: a list of object ranges to download
    :type ranges: `list` of `tuple`
    :param key_file: path to the file containing credentials
    :keyword properties: should the request return object properties
        along with content description (True by default)
    :type properties: `bool`
    :keyword perfdata: optional `dict` that will be filled with metrics
        of time spent to resolve the meta2 address, to do the meta2
        request, and the time-to-first-byte, as seen by this API.
    :returns: a dictionary of object metadata and
        a stream of object data
    :rtype: tuple
    """
    # First attempt: object metadata may come from the (possibly stale)
    # cache.
    meta, stream = self._object_fetch_impl(
        account, container, obj,
        version=version, ranges=ranges, key_file=key_file,
        **kwargs)

    def _retry_on_unrecoverable(inner_stream):
        """
        Relay data blocks; upon UnrecoverableContent, retry the whole
        fetch once with the cache disabled (the error may be caused by
        stale cached metadata).
        """
        yielded = 0
        try:
            for block in inner_stream:
                yield block
                yielded += 1
        except exc.UnrecoverableContent:
            if kwargs.pop('cache', None) is None:
                # No cache configured: the failure cannot come from
                # stale cached metadata, nothing more to do.
                raise
            if yielded:
                # Some data blocks already reached the caller, a clean
                # restart of the download is impossible.
                raise
            # Retry the request, explicitly bypassing the cache.
            fresh_meta, fresh_stream = self._object_fetch_impl(
                account, container, obj,
                version=version, ranges=ranges,
                key_file=key_file, cache=None, **kwargs)
            # The caller already holds a reference to `meta`:
            # refresh that dictionary in place with the new values.
            meta.update(fresh_meta)
            # Relay data from the fresh stream.
            for block in fresh_stream:
                yield block

    return meta, _retry_on_unrecoverable(stream)

@handle_object_not_found
@patch_kwargs
@ensure_headers
Expand Down
30 changes: 27 additions & 3 deletions tests/functional/api/test_objectstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2092,13 +2092,37 @@ def test_object_fetch(self):
for chunk in stream:
data += chunk
self.fail('This should not happen with the deleted chunks')
except exc.UnrecoverableContent:
except (exc.UnrecoverableContent, exc.NoSuchObject):
# A former version raised UnrecoverableContent, but newer versions
# do one retry, and thus see that the object does not exist.
pass
self.assertEqual(2, self.api.container._direct_request.call_count)
self.assertEqual(3, self.api.container._direct_request.call_count)
self.assertEqual(0, len(self.cache))

self.assertRaises(
exc.NoSuchObject, self.api.object_fetch,
self.account, self.container, self.path)
self.assertEqual(3, self.api.container._direct_request.call_count)
self.assertEqual(4, self.api.container._direct_request.call_count)
self.assertEqual(0, len(self.cache))

def test_object_fetch_dirty_cache(self):
    """
    Check that object_fetch transparently retries (bypassing the cache)
    when the cached object metadata points to chunks that no longer
    exist, and ends up serving the latest version of the object.
    """
    # Fetch the original object to make sure the cache is filled.
    self.api.object_fetch(
        self.account, self.container, self.path)
    # Make the cache invalid by overwriting the object without
    # clearing the cache.
    self.api.object_create(self.account, self.container,
                           obj_name=self.path, data='overwritten',
                           cache=None)
    # Wait for the original chunks to be deleted.
    self.wait_for_event('oio-preserved',
                        types=(EventTypes.CHUNK_DELETED, ), timeout=5.0)
    # Read the object. An error will be raised internally, but the latest
    # object should be fetched.
    meta, stream = self.api.object_fetch(
        self.account, self.container, self.path)
    data = b''
    for chunk in stream:
        data += chunk
    # The stream yields bytes: compare against a bytes literal
    # (a str literal would never equal a bytes value under Python 3).
    self.assertEqual(b'overwritten', data)
    self.assertEqual(len('overwritten'), int(meta['size']))

0 comments on commit ec0637f

Please sign in to comment.