Skip to content

Commit

Permalink
athenad: fix memory leak in _do_upload() (commaai#30237)
Browse files Browse the repository at this point in the history
* fix memory leak

* test: stash

* clean up

* clean up

* ruff

* rm

* add py memory profiler

* test compress and no compress

* proper test

* comment

---------

Co-authored-by: Shane Smiskol <[email protected]>
  • Loading branch information
deanlee and sshane authored Oct 29, 2023
1 parent 0eea00e commit 61288df
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 21 deletions.
15 changes: 12 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ parameterized = "^0.8"
pprofile = "*"
pre-commit = "*"
pygame = "*"
pympler = "*"
pyprof2calltree = "*"
pytest = "*"
pytest-cov = "*"
Expand Down
16 changes: 6 additions & 10 deletions selfdrive/athena/athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from datetime import datetime
from functools import partial
from queue import Queue
from typing import BinaryIO, Callable, Dict, List, Optional, Set, Union, cast
from typing import Callable, Dict, List, Optional, Set, Union, cast

import requests
from jsonrpc import JSONRPCResponseManager, dispatcher
Expand Down Expand Up @@ -290,19 +290,15 @@ def _do_upload(upload_item: UploadItem, callback: Optional[Callable] = None) ->
compress = True

with open(path, "rb") as f:
data: BinaryIO
content = f.read()
if compress:
cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path)
compressed = bz2.compress(f.read())
size = len(compressed)
data = io.BytesIO(compressed)
else:
size = os.fstat(f.fileno()).st_size
data = f
content = bz2.compress(content)

with io.BytesIO(content) as data:
return requests.put(upload_item.url,
data=CallbackReader(data, callback, size) if callback else data,
headers={**upload_item.headers, 'Content-Length': str(size)},
data=CallbackReader(data, callback, len(content)) if callback else data,
headers={**upload_item.headers, 'Content-Length': str(len(content))},
timeout=30)


Expand Down
30 changes: 22 additions & 8 deletions selfdrive/athena/tests/test_athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import unittest
from dataclasses import asdict, replace
from datetime import datetime, timedelta
from parameterized import parameterized
from typing import Optional

from multiprocessing import Process
from pathlib import Path
from pympler.tracker import SummaryTracker
from unittest import mock
from websocket import ABNF
from websocket._exceptions import WebSocketConnectionClosedException
Expand Down Expand Up @@ -57,10 +58,11 @@ def _wait_for_upload():
break

@staticmethod
def _create_file(file: str, parent: Optional[str] = None) -> str:
def _create_file(file: str, parent: Optional[str] = None, data: bytes = b'') -> str:
fn = os.path.join(Paths.log_root() if parent is None else parent, file)
os.makedirs(os.path.dirname(fn), exist_ok=True)
Path(fn).touch()
with open(fn, 'wb') as f:
f.write(data)
return fn


Expand Down Expand Up @@ -137,19 +139,31 @@ def test_strip_bz2_extension(self):
if fn.endswith('.bz2'):
self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4])


@parameterized.expand([(True,), (False,)])
@with_http_server
def test_do_upload(self, host):
fn = self._create_file('qlog.bz2')
def test_do_upload(self, compress, host):
# random bytes to ensure rather large object post-compression
fn = self._create_file('qlog', data=os.urandom(10000 * 1024))

item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
# warm up object tracker
tracker = SummaryTracker()
for _ in range(5):
tracker.diff()

upload_fn = fn + ('.bz2' if compress else '')
item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
with self.assertRaises(requests.exceptions.ConnectionError):
athenad._do_upload(item)

item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
resp = athenad._do_upload(item)
self.assertEqual(resp.status_code, 201)

# assert memory cleaned up
for _type, num_objects, total_size in tracker.diff():
with self.subTest(_type=_type):
self.assertLess(total_size / 1024, 10, f'Object {_type} ({num_objects=}) grew larger than 10 kB while uploading file')

@with_http_server
def test_uploadFileToUrl(self, host):
fn = self._create_file('qlog.bz2')
Expand Down

0 comments on commit 61288df

Please sign in to comment.