Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter: Improve speed of --output-strains and --output-metadata #1469

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions augur/filter/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from augur.io.vcf import is_vcf as filename_is_vcf, write_vcf
from augur.types import EmptyOutputReportingMethod
from . import include_exclude_rules
from .io import cleanup_outputs, get_useful_metadata_columns, read_priority_scores, write_metadata_based_outputs
from .io import cleanup_outputs, get_useful_metadata_columns, read_priority_scores, write_metadata, write_strains
from .include_exclude_rules import apply_filters, construct_filters
from .subsample import PriorityQueue, TooManyGroupsError, calculate_sequences_per_group, create_queues_by_group, get_groups_for_subsampling

Expand Down Expand Up @@ -398,10 +398,13 @@ def run(args):
# Update the set of available sequence strains.
sequence_strains = observed_sequence_strains

if args.output_metadata or args.output_strains:
write_metadata_based_outputs(args.metadata, args.metadata_delimiters,
args.metadata_id_columns, args.output_metadata,
args.output_strains, valid_strains)
if args.output_metadata:
write_metadata(args.metadata, args.metadata_delimiters,
args.metadata_id_columns, args.output_metadata,
valid_strains)

if args.output_strains:
write_strains(args.output_strains, valid_strains)

# Calculate the number of strains that don't exist in either metadata or
# sequences.
Expand Down
111 changes: 75 additions & 36 deletions augur/filter/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
from argparse import Namespace
import os
import re
from shutil import which
from subprocess import Popen, PIPE
from tempfile import mkstemp
from textwrap import dedent
from typing import Sequence, Set
from typing import Iterable, Sequence, Set
import numpy as np
from collections import defaultdict
from xopen import xopen
from shutil import copyfileobj

from augur.errors import AugurError
from augur.io.file import open_file
from augur.io.metadata import Metadata, METADATA_DATE_COLUMN
from augur.io.print import print_err
from augur.write_file import BUFFER_SIZE
from .constants import GROUP_BY_GENERATED_COLUMNS
from .include_exclude_rules import extract_variables, parse_filter_query

Expand Down Expand Up @@ -96,46 +101,80 @@ def constant_factory(value):
raise AugurError(f"missing or malformed priority scores file {fname}")


def write_metadata_based_outputs(input_metadata_path: str, delimiters: Sequence[str],
id_columns: Sequence[str], output_metadata_path: str,
output_strains_path: str, ids_to_write: Set[str]):
def get_cat(file):
    """
    Return the path to a ``cat``-like program that can stream *file* to
    stdout as uncompressed text, chosen by the file extension.

    Returns ``None`` if no suitable program is found on ``PATH``.
    """
    if file.endswith(".gz"):
        # "gzcat" is the BSD/macOS name; GNU gzip installs it as "zcat".
        # Try both so the fast path works on Linux too.
        return which("gzcat") or which("zcat")
    if file.endswith(".xz"):
        return which("xzcat")
    if file.endswith(".zst"):
        return which("zstdcat")
    return which("cat")


def write_metadata(input_metadata_path: str, delimiters: Sequence[str],
                   id_columns: Sequence[str], output_metadata_path: str,
                   ids_to_write: Set[str]):
    """
    Write an output metadata file containing only the rows of the input
    metadata whose ID is in *ids_to_write*.

    Parameters
    ----------
    input_metadata_path
        Path to the input metadata file (possibly compressed).
    delimiters
        Candidate field delimiters for the input metadata.
    id_columns
        Candidate ID column names for the input metadata.
    output_metadata_path
        Path to the output metadata file (possibly compressed; handled by xopen).
    ids_to_write
        IDs of the rows to keep.

    Raises
    ------
    AugurError
        If the external ``tsv-join`` process exits with a non-zero status.
    """
    input_metadata = Metadata(input_metadata_path, delimiters, id_columns)

    tsv_join = which("tsv-join")
    cat = get_cat(input_metadata_path)

    if tsv_join and cat:
        # Fast path: stream the metadata through tsv-utils' tsv-join, which
        # is much faster than filtering row-by-row in Python.
        # NOTE(review): tsv-join assumes tab-delimited input; behavior for
        # other delimiters/quoting may differ from the Python fallback —
        # confirm before making this the default across workflows.

        # Write the IDs to a temporary single-column TSV file for tsv-join.
        strains_fd, strains_path = mkstemp(prefix="augur-filter-", suffix=".tsv")
        os.close(strains_fd)
        try:
            with open(strains_path, "w") as f:
                f.write(input_metadata.id_column + '\n')
                for strain in ids_to_write:
                    f.write(strain + '\n')

            # Open a process to stream the input metadata as text (handling compression).
            cat_process = Popen([cat, input_metadata_path], stdout=PIPE)

            # Use tsv-join to subset the input metadata by the IDs in the temporary file.
            tsv_join_args = [
                tsv_join,
                '-H',
                '--filter-file', strains_path,
                '--key-fields', input_metadata.id_column,
            ]

            with xopen(output_metadata_path, "wb") as output_metadata_handle:
                # Capture stderr so the error path below can report it;
                # without stderr=PIPE, .stderr is None and reading it crashes.
                tsv_join_process = Popen(tsv_join_args, stdin=cat_process.stdout,
                                         stdout=PIPE, stderr=PIPE)
                # Close our copy of the pipe so tsv-join sees EOF/SIGPIPE
                # correctly if either end exits early.
                cat_process.stdout.close()
                # Ignore mypy error: https://github.com/python/mypy/issues/14943
                copyfileobj(tsv_join_process.stdout, output_metadata_handle, BUFFER_SIZE)  # type: ignore
                tsv_join_process.wait()
                cat_process.wait()
                if tsv_join_process.returncode != 0:
                    raise AugurError(f"Failed to output metadata: {tsv_join_process.stderr.read().decode().strip()}")
        finally:
            # Remove the temporary file even when an error occurred above.
            os.unlink(strains_path)
    else:
        # Fallback: filter rows in Python. Slower, but needs no external tools.
        with xopen(output_metadata_path, "w") as output_metadata_handle:
            output_metadata = csv.DictWriter(output_metadata_handle, fieldnames=input_metadata.columns,
                                             delimiter="\t", lineterminator=os.linesep)
            output_metadata.writeheader()

            # Write outputs based on rows in the original metadata.
            for row in input_metadata.rows():
                row_id = row[input_metadata.id_column]
                if row_id in ids_to_write:
                    output_metadata.writerow(row)


def write_strains(output_strains_path: str, ids_to_write: Iterable[str]):
    """
    Write the given strain IDs to a plain text file, one ID per line,
    in sorted order.
    """
    lines = (f"{strain_id}\n" for strain_id in sorted(ids_to_write))
    with open(output_strains_path, "w") as handle:
        handle.writelines(lines)


# These are the types accepted in the following function.
Expand Down
12 changes: 6 additions & 6 deletions tests/functional/filter/cram/filter-output-contents.t
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,20 @@ Check that the row for a strain is identical between input and output metadata.
> <(grep -F "$strain" "$TESTDIR/../data/metadata.tsv") \
> <(grep -F "$strain" filtered_metadata.tsv)

Check the order of strains in the filtered strains file.
The strains in the filtered strains file should be sorted alphabetically.

$ cat filtered_strains.txt
BRA/2016/FC_6706
Colombia/2016/ZC204Se
ZKC2/2016
DOM/2016/BB_0059
BRA/2016/FC_6706
EcEs062_16
ZKC2/2016

Check that the order of strains in the metadata is the same as above.
Check that the same strains are on both outputs.

$ diff \
> <(cat filtered_strains.txt) \
> <(tail -n+2 filtered_metadata.tsv | cut -f 1)
> <(sort filtered_strains.txt) \
> <(tail -n+2 filtered_metadata.tsv | cut -f 1 | sort)

Check the order of strains in the FASTA sequence output.

Expand Down
29 changes: 29 additions & 0 deletions tests/functional/filter/cram/filter-output-metadata-compressed.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
Setup

$ source "$TESTDIR"/_setup.sh

Use the same options with 3 different compression methods.
The compression format is inferred from the output filename extension.

$ ${AUGUR} filter \
> --metadata "$TESTDIR/../data/metadata.tsv" \
> --subsample-max-sequences 5 \
> --subsample-seed 0 \
> --output-metadata filtered_metadata.tsv.gz 2>/dev/null

$ ${AUGUR} filter \
> --metadata "$TESTDIR/../data/metadata.tsv" \
> --subsample-max-sequences 5 \
> --subsample-seed 0 \
> --output-metadata filtered_metadata.tsv.xz 2>/dev/null

$ ${AUGUR} filter \
> --metadata "$TESTDIR/../data/metadata.tsv" \
> --subsample-max-sequences 5 \
> --subsample-seed 0 \
> --output-metadata filtered_metadata.tsv.zst 2>/dev/null

With the same subsampling seed, decompressing each output should yield
identical contents regardless of compression method.

# The uncompressed outputs are identical.

$ diff <(gzcat filtered_metadata.tsv.gz) <(xzcat filtered_metadata.tsv.xz)

$ diff <(gzcat filtered_metadata.tsv.gz) <(zstdcat filtered_metadata.tsv.zst)
2 changes: 2 additions & 0 deletions tests/functional/filter/cram/filter-output-metadata-header.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ the default quotechar, any column names with that character may be altered.

Quoted columns containing the tab delimiter are left unchanged.

# FIXME: tsv-join has different behavior here. Test both?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These differences should be tested more before we use this as default behavior across pathogen workflows (and others start using it too). Maybe we can start by releasing this as an opt-in "beta" e.g. --output-metadata-attempt-tsv-utils.


$ cat >metadata.tsv <<~~
> strain "col 1"
> SEQ_1 a
Expand Down
Loading