Merge pull request #1890 from dandi/embargo-redesign
Embargo Re-Design
jjnesbitt authored Apr 29, 2024
2 parents 3211cf2 + 478a7b4 commit cef8fab
Showing 45 changed files with 678 additions and 939 deletions.
1 change: 1 addition & 0 deletions .github/workflows/backend-ci.yml
@@ -58,4 +58,5 @@ jobs:
DJANGO_DANDI_WEB_APP_URL: http://localhost:8085
DJANGO_DANDI_API_URL: http://localhost:8000
DJANGO_DANDI_JUPYTERHUB_URL: https://hub.dandiarchive.org/
DJANGO_DANDI_DEV_EMAIL: [email protected]
DANDI_ALLOW_LOCALHOST_URLS: 1
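The new DJANGO_DANDI_DEV_EMAIL variable only matters once it reaches Django settings. A minimal sketch of reading it, assuming a plain os.environ lookup and an assumed setting name DANDI_DEV_EMAIL (the project's real configuration layer may differ):

import os

# Hypothetical settings snippet: pick up the dev email injected by CI.
# Setting name and lookup mechanism are assumptions, not part of this diff.
DANDI_DEV_EMAIL = os.environ['DJANGO_DANDI_DEV_EMAIL']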
4 changes: 2 additions & 2 deletions .github/workflows/frontend-ci.yml
@@ -62,11 +62,11 @@ jobs:
DJANGO_STORAGE_BUCKET_NAME: dandi-bucket
DJANGO_DANDI_DANDISETS_BUCKET_NAME: dandi-bucket
DJANGO_DANDI_DANDISETS_LOG_BUCKET_NAME: dandiapi-dandisets-logs
DJANGO_DANDI_DANDISETS_EMBARGO_BUCKET_NAME: dandi-embargo-dandisets
DJANGO_DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME: dandiapi-embargo-dandisets-logs
DJANGO_DANDI_WEB_APP_URL: http://localhost:8085
DJANGO_DANDI_API_URL: http://localhost:8000
DJANGO_DANDI_JUPYTERHUB_URL: https://hub.dandiarchive.org/
DJANGO_DANDI_DEV_EMAIL: [email protected]
DANDI_ALLOW_LOCALHOST_URLS: 1

# Web client env vars
@@ -174,11 +174,11 @@ jobs:
DJANGO_STORAGE_BUCKET_NAME: dandi-bucket
DJANGO_DANDI_DANDISETS_BUCKET_NAME: dandi-bucket
DJANGO_DANDI_DANDISETS_LOG_BUCKET_NAME: dandiapi-dandisets-logs
DJANGO_DANDI_DANDISETS_EMBARGO_BUCKET_NAME: dandi-embargo-dandisets
DJANGO_DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME: dandiapi-embargo-dandisets-logs
DJANGO_DANDI_WEB_APP_URL: http://localhost:8085
DJANGO_DANDI_API_URL: http://localhost:8000
DJANGO_DANDI_JUPYTERHUB_URL: https://hub.dandiarchive.org/
DJANGO_DANDI_DEV_EMAIL: [email protected]
DANDI_ALLOW_LOCALHOST_URLS: 1

# Web client env vars
@@ -0,0 +1,34 @@
# Generated by Django 4.1.13 on 2024-03-20 15:59
from __future__ import annotations

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
('analytics', '0001_initial_v2'),
]

operations = [
migrations.RemoveConstraint(
model_name='processeds3log',
name='analytics_processeds3log_unique_name_embargoed',
),
migrations.RenameField(
model_name='processeds3log',
old_name='embargoed',
new_name='historically_embargoed',
),
migrations.AlterField(
model_name='processeds3log',
name='historically_embargoed',
field=models.BooleanField(default=False),
),
migrations.AddConstraint(
model_name='processeds3log',
constraint=models.UniqueConstraint(
fields=('name', 'historically_embargoed'),
name='analytics_processeds3log_unique_name_embargoed',
),
),
]
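Applying this schema change is the standard Django workflow; a minimal sketch using management commands from a configured Django context (illustrative only, not part of the diff):

from django.core.management import call_command

# Apply the analytics migration that renames `embargoed` to
# `historically_embargoed` and recreates the unique constraint.
call_command('migrate', 'analytics')

# Confirm it is listed as applied.
call_command('showmigrations', 'analytics')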
9 changes: 6 additions & 3 deletions dandiapi/analytics/models.py
@@ -12,13 +12,16 @@ class ProcessedS3Log(models.Model):
RegexValidator(r'^\d{4}-(\d{2}-){5}[A-F0-9]{16}$')
],
)
# This is necessary to determine which bucket the logfile corresponds to
embargoed = models.BooleanField()

# Represents if this s3 log file was embargoed prior to the embargo re-design.
# If this field is True, the log file lives in the S3 bucket pointed to by the
# DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME setting.
historically_embargoed = models.BooleanField(default=False)

class Meta:
constraints = [
models.UniqueConstraint(
fields=['name', 'embargoed'],
fields=['name', 'historically_embargoed'],
name='%(app_label)s_%(class)s_unique_name_embargoed',
)
]
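To make the renamed field concrete, here is a minimal sketch (not part of the diff; assumes a Django test or shell context) of how the (name, historically_embargoed) uniqueness behaves:

from django.db import IntegrityError, transaction

from dandiapi.analytics.models import ProcessedS3Log

name = '2019-02-06-00-00-38-5C5B0E0CA8F2B1B5'

# historically_embargoed defaults to False for logs from the main log bucket.
ProcessedS3Log.objects.create(name=name)

# The same name is still allowed for a log that predates the redesign,
# because uniqueness covers the (name, historically_embargoed) pair.
ProcessedS3Log.objects.create(name=name, historically_embargoed=True)

# A true duplicate of the (name, False) pair is rejected by the database.
try:
    with transaction.atomic():
        ProcessedS3Log.objects.create(name=name)
except IntegrityError:
    pass  # analytics_processeds3log_unique_name_embargoed violated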
59 changes: 18 additions & 41 deletions dandiapi/analytics/tasks/__init__.py
@@ -13,8 +13,8 @@
from s3logparse import s3logparse

from dandiapi.analytics.models import ProcessedS3Log
from dandiapi.api.models.asset import AssetBlob, EmbargoedAssetBlob
from dandiapi.api.storage import get_boto_client, get_embargo_storage, get_storage
from dandiapi.api.models.asset import AssetBlob
from dandiapi.api.storage import get_boto_client, get_storage

if TYPE_CHECKING:
from collections.abc import Generator
@@ -25,64 +25,41 @@
LogBucket = str


def _bucket_objects_after(bucket: str, after: str | None) -> Generator[dict, None, None]:
if bucket not in {
settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
}:
raise ValueError(f'Non-log bucket: {bucket}')
embargoed = bucket == settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME
s3 = get_boto_client(get_storage() if not embargoed else get_embargo_storage())
def _bucket_objects_after(after: str | None) -> Generator[dict, None, None]:
s3 = get_boto_client(get_storage())
kwargs = {}
if after:
kwargs['StartAfter'] = after

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, **kwargs):
for page in paginator.paginate(Bucket=settings.DANDI_DANDISETS_LOG_BUCKET_NAME, **kwargs):
yield from page.get('Contents', [])


@shared_task(queue='s3-log-processing', soft_time_limit=60, time_limit=80)
def collect_s3_log_records_task(bucket: LogBucket) -> None:
def collect_s3_log_records_task() -> None:
"""Dispatch a task per S3 log file to process for download counts."""
if bucket not in {
settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
}:
raise RuntimeError
embargoed = bucket == settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME
after = ProcessedS3Log.objects.filter(embargoed=embargoed).aggregate(last_log=Max('name'))[
'last_log'
]
after = ProcessedS3Log.objects.aggregate(last_log=Max('name'))['last_log']

for s3_log_object in _bucket_objects_after(bucket, after):
process_s3_log_file_task.delay(bucket, s3_log_object['Key'])
for s3_log_object in _bucket_objects_after(after):
process_s3_log_file_task.delay(s3_log_object['Key'])


@shared_task(queue='s3-log-processing', soft_time_limit=120, time_limit=140)
def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:
def process_s3_log_file_task(s3_log_key: str) -> None:
"""
Process a single S3 log file for download counts.
Creates a ProcessedS3Log entry and updates the download counts for the relevant
asset blobs. Prevents duplicate processing with a unique constraint on the ProcessedS3Log name
and embargoed fields.
asset blobs. Prevents duplicate processing with a unique constraint on the ProcessedS3Log name.
"""
if bucket not in {
settings.DANDI_DANDISETS_LOG_BUCKET_NAME,
settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME,
}:
raise RuntimeError
embargoed = bucket == settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME

# short circuit if the log file has already been processed. note that this doesn't guarantee
# exactly once processing, that's what the unique constraint on ProcessedS3Log is for.
if ProcessedS3Log.objects.filter(name=s3_log_key.split('/')[-1], embargoed=embargoed).exists():
if ProcessedS3Log.objects.filter(name=s3_log_key.split('/')[-1]).exists():
return

s3 = get_boto_client(get_storage() if not embargoed else get_embargo_storage())
BlobModel = AssetBlob if not embargoed else EmbargoedAssetBlob # noqa: N806
data = s3.get_object(Bucket=bucket, Key=s3_log_key)
s3 = get_boto_client(get_storage())
data = s3.get_object(Bucket=settings.DANDI_DANDISETS_LOG_BUCKET_NAME, Key=s3_log_key)
download_counts = Counter()

for log_entry in s3logparse.parse_log_lines(
@@ -93,20 +70,20 @@ def process_s3_log_file_task(bucket: LogBucket, s3_log_key: str) -> None:

with transaction.atomic():
try:
log = ProcessedS3Log(name=s3_log_key.split('/')[-1], embargoed=embargoed)
log = ProcessedS3Log(name=s3_log_key.split('/')[-1])
# disable constraint validation checking so duplicate errors can be detected and
# ignored. the rest of the full_clean errors should still be raised.
log.full_clean(validate_constraints=False)
log.save()
except IntegrityError as e:
if 'unique_name_embargoed' in str(e):
logger.info('Already processed log file %s, embargo: %s', s3_log_key, embargoed)
if '_unique_name' in str(e):
logger.info('Already processed log file %s', s3_log_key)
return

# note this task is run serially per log file. this is to avoid the contention between
# multiple log files trying to update the same blobs. this serialization is enforced through
# the task queue configuration.
for blob, download_count in download_counts.items():
BlobModel.objects.filter(blob=blob).update(
AssetBlob.objects.filter(blob=blob).update(
download_count=F('download_count') + download_count
)
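With the bucket argument gone, dispatching the log-processing pipeline reduces to a single call; a minimal usage sketch (how this call is scheduled periodically is an assumption and not shown in this diff):

from dandiapi.analytics.tasks import collect_s3_log_records_task

# Enumerates log keys in settings.DANDI_DANDISETS_LOG_BUCKET_NAME, starting
# after the last ProcessedS3Log name, and fans out one
# process_s3_log_file_task per unprocessed key.
collect_s3_log_records_task.delay()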
26 changes: 10 additions & 16 deletions dandiapi/analytics/tests/test_download_counts.py
@@ -5,12 +5,7 @@

from dandiapi.analytics.models import ProcessedS3Log
from dandiapi.analytics.tasks import collect_s3_log_records_task, process_s3_log_file_task
from dandiapi.api.storage import (
create_s3_storage,
get_boto_client,
get_embargo_storage,
get_storage,
)
from dandiapi.api.storage import create_s3_storage, get_boto_client


@pytest.fixture()
@@ -20,8 +15,7 @@ def s3_log_bucket():

@pytest.fixture()
def s3_log_file(s3_log_bucket, asset_blob):
embargoed = s3_log_bucket == settings.DANDI_DANDISETS_EMBARGO_LOG_BUCKET_NAME
s3 = get_boto_client(get_storage() if not embargoed else get_embargo_storage())
s3 = get_boto_client()

log_file_name = '2019-02-06-00-00-38-5C5B0E0CA8F2B1B5'
s3.put_object(
@@ -51,34 +45,34 @@ def s3_log_file(s3_log_bucket, asset_blob):


@pytest.mark.django_db()
def test_processing_s3_log_files(s3_log_bucket, s3_log_file, asset_blob):
collect_s3_log_records_task(s3_log_bucket)
def test_processing_s3_log_files(s3_log_file, asset_blob):
collect_s3_log_records_task()
asset_blob.refresh_from_db()

assert ProcessedS3Log.objects.count() == 1
assert asset_blob.download_count == 1


@pytest.mark.django_db()
def test_processing_s3_log_files_idempotent(s3_log_bucket, s3_log_file, asset_blob):
def test_processing_s3_log_files_idempotent(s3_log_file, asset_blob):
# this tests that the outer task which collects the log files to process is
# idempotent, in other words, it uses StartAfter correctly.
collect_s3_log_records_task(s3_log_bucket)
collect_s3_log_records_task()
# run the task again, it should skip the existing log record
collect_s3_log_records_task(s3_log_bucket)
collect_s3_log_records_task()
asset_blob.refresh_from_db()

assert ProcessedS3Log.objects.count() == 1
assert asset_blob.download_count == 1


@pytest.mark.django_db()
def test_processing_s3_log_file_task_idempotent(s3_log_bucket, s3_log_file, asset_blob):
def test_processing_s3_log_file_task_idempotent(s3_log_file, asset_blob):
# this tests that the inner task which processes a single log file is
# idempotent, utilizing the unique constraint on ProcessedS3Log correctly.
process_s3_log_file_task(s3_log_bucket, s3_log_file)
process_s3_log_file_task(s3_log_file)
# run the task again, it should ignore the new log
process_s3_log_file_task(s3_log_bucket, s3_log_file)
process_s3_log_file_task(s3_log_file)
asset_blob.refresh_from_db()

assert ProcessedS3Log.objects.count() == 1
19 changes: 1 addition & 18 deletions dandiapi/api/admin.py
@@ -20,7 +20,6 @@
Asset,
AssetBlob,
Dandiset,
EmbargoedAssetBlob,
Upload,
UserMetadata,
Version,
@@ -188,33 +187,17 @@ def get_queryset(self, request):
return super().get_queryset(request).prefetch_related('assets')


@admin.register(EmbargoedAssetBlob)
class EmbargoedAssetBlobAdmin(AssetBlobAdmin):
list_display = [
'id',
'blob_id',
'dandiset',
'blob',
'references',
'size',
'sha256',
'modified',
'created',
]


class AssetBlobInline(LimitedTabularInline):
model = AssetBlob


@admin.register(Asset)
class AssetAdmin(admin.ModelAdmin):
autocomplete_fields = ['blob', 'embargoed_blob', 'zarr', 'versions']
autocomplete_fields = ['blob', 'zarr', 'versions']
fields = [
'asset_id',
'path',
'blob',
'embargoed_blob',
'zarr',
'metadata',
'versions',
27 changes: 5 additions & 22 deletions dandiapi/api/asset_paths.py
@@ -37,12 +37,7 @@ def get_root_paths_many(versions: QuerySet[Version], *, join_assets=False) -> Qu
# Use prefetch_related here instead of select_related,
# as otherwise the resulting join is very large
if join_assets:
qs = qs.prefetch_related(
'asset',
'asset__blob',
'asset__embargoed_blob',
'asset__zarr',
)
qs = qs.prefetch_related('asset', 'asset__blob', 'asset__zarr')

return qs.filter(version__in=versions).exclude(path__contains='/').order_by('path')

@@ -51,12 +46,7 @@ def get_root_paths(version: Version) -> QuerySet[AssetPath]:
"""Return all root paths for a version."""
# Use prefetch_related here instead of select_related,
# as otherwise the resulting join is very large
qs = AssetPath.objects.prefetch_related(
'asset',
'asset__blob',
'asset__embargoed_blob',
'asset__zarr',
)
qs = AssetPath.objects.prefetch_related('asset', 'asset__blob', 'asset__zarr')
return qs.filter(version=version).exclude(path__contains='/').order_by('path')


@@ -73,12 +63,7 @@ def get_path_children(path: AssetPath, depth: int | None = 1) -> QuerySet[AssetP

path_ids = relation_qs.values_list('child', flat=True).distinct()
return (
AssetPath.objects.select_related(
'asset',
'asset__blob',
'asset__embargoed_blob',
'asset__zarr',
)
AssetPath.objects.select_related('asset', 'asset__blob', 'asset__zarr')
.filter(id__in=path_ids)
.order_by('path')
)
@@ -258,13 +243,11 @@ def add_version_asset_paths(version: Version):

# Get all aggregates
sizes = child_leaves.aggregate(
size=Coalesce(Sum('asset__blob__size'), 0),
esize=Coalesce(Sum('asset__embargoed_blob__size'), 0),
zsize=Coalesce(Sum('asset__zarr__size'), 0),
size=Coalesce(Sum('asset__blob__size'), 0), zsize=Coalesce(Sum('asset__zarr__size'), 0)
)

node.aggregate_files += child_leaves.count()
node.aggregate_size += sizes['size'] + sizes['esize'] + sizes['zsize']
node.aggregate_size += sizes['size'] + sizes['zsize']
node.save()


Expand Down
Binary file modified dandiapi/api/fixtures/playwright.json.xz