From 748684bfa003e1221a3715f9b8bd4cc268312739 Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Mon, 10 Jan 2022 18:20:01 +0000 Subject: [PATCH 01/10] add ability to modify diamond command --- .../idseq_utils/idseq_utils/diamond_scatter.py | 11 ++++++++--- .../idseq_utils/idseq_utils/run_diamond.py | 10 +++++++--- short-read-mngs/non_host_alignment.wdl | 3 ++- short-read-mngs/test/local_test_viral_minimap2.yml | 1 + 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py index 55677869..9c773720 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py @@ -50,6 +50,7 @@ def diamond_blastx( out: str, queries: Iterable[str], chunk=False, + diamond_args="", join_chunks=0, ): cmd = [ @@ -64,6 +65,7 @@ def diamond_blastx( database, "--out", out, + diamond_args, ] for query in queries: cmd += ["--query", query] @@ -132,7 +134,7 @@ def make_db(reference_fasta: str, output_dir: str, chunks: int): print(f"COMPLETED CHUNK {i}") -def blastx_chunk(db_chunk: str, output_dir: str, *query: str): +def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str): try: os.mkdir(output_dir) except OSError as e: @@ -150,6 +152,7 @@ def blastx_chunk(db_chunk: str, output_dir: str, *query: str): database=abspath(db_chunk), out="out.tsv", chunk=True, + diamond_args = diamond_args, queries=(abspath(q) for q in query), ) ref_block_name = f"ref_block_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}" @@ -216,11 +219,13 @@ def blastx_join(chunk_dir: str, out: str, *query: str): blastx_chunk_parser = subparsers.add_parser("blastx-chunk") blastx_chunk_parser.add_argument("--db", required=True) blastx_chunk_parser.add_argument("--out-dir", required=True) + blastx_chunk_parser.add_argument("--diamond-args", required=False) blastx_chunk_parser.add_argument("--query", required=True, action="append") blastx_chunks_parser = subparsers.add_parser("blastx-chunks") blastx_chunks_parser.add_argument("--db-dir", required=True) blastx_chunks_parser.add_argument("--out-dir", required=True) + blastx_chunks_parser.add_argument("--diamond-args", required=False) blastx_chunks_parser.add_argument("--query", required=True, action="append") blastx_join_parser = subparsers.add_parser("blastx-join") @@ -232,12 +237,12 @@ def blastx_join(chunk_dir: str, out: str, *query: str): if args.command == "make-db": make_db(args.__getattribute__("in"), args.db, args.chunks) elif args.command == "blastx-chunk": - blastx_chunk(args.db, args.out_dir, *args.query) + blastx_chunk(args.db, args.out_dir, args.diamond_args, *args.query) elif args.command == "blastx-chunks": def _blastx_chunk(db): print(f"STARTING: {db}") - res = blastx_chunk(join(args.db_dir, db), args.out_dir, *args.query) + res = blastx_chunk(join(args.db_dir, db), args.out_dir, args.diamond_args, *args.query) print(f"FINISHING: {db}") return res diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py index 0d844a6c..b6a6461b 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py +++ b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py @@ -15,7 +15,7 @@ def _run_chunk( - chunk_id: int, input_dir: str, chunk_dir: str, db_chunk: str, *queries: str + chunk_id: int, input_dir: str, chunk_dir: str, db_chunk: str, diamond_args: str, *queries: str ): deployment_environment = os.environ["DEPLOYMENT_ENVIRONMENT"] priority_name = os.environ.get("PRIORITY_NAME", "normal") @@ -42,6 +42,10 @@ def _run_chunk( "name": "OUTPUT_DIR", "value": chunk_dir, }, + { + "name": "EXTRA_ARGS", + "value": diamond_args, + } ] for i, query in enumerate(queries): @@ -64,13 +68,13 @@ def _run_chunk( def run_diamond( - input_dir: str, chunk_dir: str, db_path: str, result_path: str, *queries: str + input_dir: str, chunk_dir: str, db_path: str, result_path: str, diamond_args, *queries: str ): parsed_url = urlparse(db_path, allow_fragments=False) bucket = parsed_url.netloc prefix = parsed_url.path.lstrip("/") chunks = ( - [chunk_id, input_dir, chunk_dir, f"s3://{bucket}/{db_chunk}", *queries] + [chunk_id, input_dir, chunk_dir, diamond_args, f"s3://{bucket}/{db_chunk}", *queries] for chunk_id, db_chunk in enumerate(_db_chunks(bucket, prefix)) ) with Pool(MAX_CHUNKS_IN_FLIGHT) as p: diff --git a/short-read-mngs/non_host_alignment.wdl b/short-read-mngs/non_host_alignment.wdl index be20daad..01782c02 100644 --- a/short-read-mngs/non_host_alignment.wdl +++ b/short-read-mngs/non_host_alignment.wdl @@ -211,7 +211,7 @@ task RunAlignment_diamond_out { set -euxo pipefail if [[ "~{run_locally}" == true ]]; then diamond makedb --in "~{local_diamond_index}" -d reference - diamond blastx -d reference -q "~{sep=' ' fastas}" -o "~{prefix}.m8" + diamond blastx -d reference -q "~{sep=' ' fastas}" -o "~{prefix}.m8" "~{diamond_args}" else export DEPLOYMENT_ENVIRONMENT=dev python3 < Date: Wed, 12 Jan 2022 22:19:33 +0000 Subject: [PATCH 02/10] add option for minimap2 in merge step --- .../idseq_utils/idseq_utils/minimap2_scatter.py | 8 ++++---- short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py index 46ec694f..e684083d 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py @@ -40,8 +40,8 @@ def minimap2_alignment(cwd, par_tmpdir, cpus, database, out, queries): raise Exception(f"Command failed: {' '.join(cmd)}") -def minimap2_merge_cmd(cwd, par_tmpdir, chunks, queries): - cmd = ["minimap2", "-cx", "sr", "--split-merge", "-o", f"{par_tmpdir}/out.paf"] +def minimap2_merge_cmd(cwd, par_tmpdir, chunks, minimap2_args, queries): + cmd = ["minimap2", minimap2_args, "--split-merge", "-o", f"{par_tmpdir}/out.paf"] for query in queries: cmd += [query] @@ -120,7 +120,7 @@ def minimap2_chunk(db_chunk: str, output_dir: str, *query: str): ) -def minimap2_merge(chunk_dir, out, *query): +def minimap2_merge(chunk_dir, out, minimap2_args, *query): chunk_dir = abspath(chunk_dir) with TemporaryDirectory() as tmp_dir: make_par_dir(tmp_dir, "par-tmp") @@ -134,7 +134,7 @@ def minimap2_merge(chunk_dir, out, *query): for f in os.listdir(chunk_dir): shutil.copy(join(chunk_dir, f), join(tmp_dir, "par-tmp", f)) chunks.append(join(tmp_dir, "par-tmp", f)) - minimap2_merge_cmd(tmp_dir, "par-tmp", chunks, query_tmp) + minimap2_merge_cmd(tmp_dir, "par-tmp", chunks, minimap2_args, query_tmp) shutil.copy(join(tmp_dir, "par-tmp", "out.paf"), out) diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py b/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py index 8a5d4c6c..9a13912e 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py +++ b/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py @@ -82,4 +82,4 @@ def run_minimap2( with Pool(MAX_CHUNKS_IN_FLIGHT) as p: p.starmap(_run_chunk, chunks) run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True) - minimap2_merge("chunks", result_path, *queries) + minimap2_merge("chunks", result_path, minimap2_args, *queries) From 66f2114849995b7bd396c4f9e029aa56b496ad8d Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Wed, 12 Jan 2022 23:41:58 +0000 Subject: [PATCH 03/10] fix order of variables --- short-read-mngs/idseq_utils/idseq_utils/run_diamond.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py index b6a6461b..73cb8142 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py +++ b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py @@ -74,7 +74,7 @@ def run_diamond( bucket = parsed_url.netloc prefix = parsed_url.path.lstrip("/") chunks = ( - [chunk_id, input_dir, chunk_dir, diamond_args, f"s3://{bucket}/{db_chunk}", *queries] + [chunk_id, input_dir, chunk_dir, f"s3://{bucket}/{db_chunk}", diamond_args, *queries] for chunk_id, db_chunk in enumerate(_db_chunks(bucket, prefix)) ) with Pool(MAX_CHUNKS_IN_FLIGHT) as p: From 771ed9a7641ff28e5e05f079c0069a32fc7f85a4 Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Thu, 13 Jan 2022 01:42:38 +0000 Subject: [PATCH 04/10] fix minimap2 args --- short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py index e684083d..e67b0fdb 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py @@ -1,5 +1,6 @@ import os import shutil +import shlex import sys import errno from argparse import ArgumentParser @@ -41,7 +42,9 @@ def minimap2_alignment(cwd, par_tmpdir, cpus, database, out, queries): def minimap2_merge_cmd(cwd, par_tmpdir, chunks, minimap2_args, queries): - cmd = ["minimap2", minimap2_args, "--split-merge", "-o", f"{par_tmpdir}/out.paf"] + + cmd = ["minimap2", "--split-merge", "-o", f"{par_tmpdir}/out.paf"] + cmd.extend(shlex.split(minimap2_args)) for query in queries: cmd += [query] From db914958d4551f9e0927a58a501195dc6ec13ead Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Thu, 13 Jan 2022 02:19:09 +0000 Subject: [PATCH 05/10] lint --- short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py | 2 +- short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py index 9c773720..96dbe5b6 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py @@ -152,7 +152,7 @@ def blastx_chunk(db_chunk: str, output_dir: str, diamond_args: str, *query: str) database=abspath(db_chunk), out="out.tsv", chunk=True, - diamond_args = diamond_args, + diamond_args=diamond_args, queries=(abspath(q) for q in query), ) ref_block_name = f"ref_block_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}" diff --git a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py index e67b0fdb..eaac2180 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py @@ -42,7 +42,6 @@ def minimap2_alignment(cwd, par_tmpdir, cpus, database, out, queries): def minimap2_merge_cmd(cwd, par_tmpdir, chunks, minimap2_args, queries): - cmd = ["minimap2", "--split-merge", "-o", f"{par_tmpdir}/out.paf"] cmd.extend(shlex.split(minimap2_args)) for query in queries: From c207934f5661c22a73c31426aa9738c59e53a6b6 Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Thu, 13 Jan 2022 03:38:03 +0000 Subject: [PATCH 06/10] add inputs --- short-read-mngs/idseq_utils/idseq_utils/run_diamond.py | 4 +++- short-read-mngs/non_host_alignment.wdl | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py index 73cb8142..0990eebf 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py +++ b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py @@ -28,6 +28,7 @@ def _run_chunk( else: project_id, sample_id = "0", "0" + print(db_chunk, diamond_args) job_name = (f"idseq-{deployment_environment}-{alignment_algorithm}-" f"project-{project_id}-sample-{sample_id}-part-{chunk_id}") job_queue = f"idseq-{deployment_environment}-{alignment_algorithm}-{provisioning_model}-{priority_name}" @@ -48,6 +49,7 @@ def _run_chunk( } ] + print(environment) for i, query in enumerate(queries): environment.append( { @@ -68,7 +70,7 @@ def _run_chunk( def run_diamond( - input_dir: str, chunk_dir: str, db_path: str, result_path: str, diamond_args, *queries: str + input_dir: str, chunk_dir: str, db_path: str, result_path: str, diamond_args: str, *queries: str ): parsed_url = urlparse(db_path, allow_fragments=False) bucket = parsed_url.netloc diff --git a/short-read-mngs/non_host_alignment.wdl b/short-read-mngs/non_host_alignment.wdl index 01782c02..2ff987c7 100644 --- a/short-read-mngs/non_host_alignment.wdl +++ b/short-read-mngs/non_host_alignment.wdl @@ -414,7 +414,7 @@ workflow idseq_non_host_alignment { String minimap2_db = "s3://idseq-public-references/minimap2-test/2021-01-22/nt_k12_w8_20/" String diamond_db = "s3://idseq-public-references/diamond-test/2021-01-22/" String minimap2_args = "-cx sr --secondary=yes" - String diamond_args = "" + String diamond_args = "--sensitive" String minimap2_prefix = "gsnap" String diamond_prefix = "rapsearch2" From ee9f9d4439b2db80e00b59c240fed87e86176256 Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Thu, 13 Jan 2022 04:40:02 +0000 Subject: [PATCH 07/10] add logging --- short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py | 4 +++- short-read-mngs/idseq_utils/idseq_utils/run_diamond.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py index eaac2180..47060191 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py @@ -42,8 +42,8 @@ def minimap2_alignment(cwd, par_tmpdir, cpus, database, out, queries): def minimap2_merge_cmd(cwd, par_tmpdir, chunks, minimap2_args, queries): + print("Starting minimap2 merge", file=sys.stderr) cmd = ["minimap2", "--split-merge", "-o", f"{par_tmpdir}/out.paf"] - cmd.extend(shlex.split(minimap2_args)) for query in queries: cmd += [query] @@ -53,6 +53,8 @@ def minimap2_merge_cmd(cwd, par_tmpdir, chunks, minimap2_args, queries): for chunk in chunks: cmd += [f"{chunk}"] + cmd.extend(shlex.split(minimap2_args)) + res = run(cmd, cwd=cwd, stdout=PIPE, stderr=PIPE) if res.returncode != 0: for line in res.stderr.decode().split("\n"): diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py index 0990eebf..f7416c55 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py +++ b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py @@ -5,7 +5,7 @@ import logging from urllib.parse import urlparse from multiprocessing import Pool - +import sys from idseq_utils.diamond_scatter import blastx_join from idseq_utils.batch_run_helpers import _run_batch_job, _db_chunks @@ -28,7 +28,7 @@ def _run_chunk( else: project_id, sample_id = "0", "0" - print(db_chunk, diamond_args) + print(db_chunk, diamond_args, file=sys.stderr) job_name = (f"idseq-{deployment_environment}-{alignment_algorithm}-" f"project-{project_id}-sample-{sample_id}-part-{chunk_id}") job_queue = f"idseq-{deployment_environment}-{alignment_algorithm}-{provisioning_model}-{priority_name}" @@ -49,7 +49,7 @@ def _run_chunk( } ] - print(environment) + print(environment, file=sys.stderr) for i, query in enumerate(queries): environment.append( { From f7300f1fa932a2bab73b28cee3455e32da27fd9a Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Thu, 13 Jan 2022 07:33:00 +0000 Subject: [PATCH 08/10] fix diamond args --- short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py | 2 +- short-read-mngs/non_host_alignment.wdl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py index 96dbe5b6..fa243b64 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py @@ -65,7 +65,7 @@ def diamond_blastx( database, "--out", out, - diamond_args, + f"--{diamond_args}", ] for query in queries: cmd += ["--query", query] diff --git a/short-read-mngs/non_host_alignment.wdl b/short-read-mngs/non_host_alignment.wdl index 2ff987c7..dfef18dd 100644 --- a/short-read-mngs/non_host_alignment.wdl +++ b/short-read-mngs/non_host_alignment.wdl @@ -414,7 +414,7 @@ workflow idseq_non_host_alignment { String minimap2_db = "s3://idseq-public-references/minimap2-test/2021-01-22/nt_k12_w8_20/" String diamond_db = "s3://idseq-public-references/diamond-test/2021-01-22/" String minimap2_args = "-cx sr --secondary=yes" - String diamond_args = "--sensitive" + String diamond_args = "sensitive" String minimap2_prefix = "gsnap" String diamond_prefix = "rapsearch2" From 5bad9b123eaf9b6215937a5af2d6cdd7bec7c0e4 Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Thu, 13 Jan 2022 17:38:27 +0000 Subject: [PATCH 09/10] fix merge args --- short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py index fa243b64..f37bfc16 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py +++ b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py @@ -176,7 +176,7 @@ def mock_reference_fasta(chunks: int, chunk_size: int): i += 1 -def blastx_join(chunk_dir: str, out: str, *query: str): +def blastx_join(chunk_dir: str, out: str, diamond_args: str, *query: str): with TemporaryDirectory() as tmp_dir: make_par_dir(tmp_dir, "par-tmp") with open(join(tmp_dir, "par-tmp", f"join_todo_{zero_pad(0, 6)}"), "w") as f: @@ -197,6 +197,7 @@ def blastx_join(chunk_dir: str, out: str, *query: str): database=db.name, out=out, join_chunks=chunks, + diamond_args=diamond_args, queries=(abspath(q) for q in query), ) @@ -249,4 +250,4 @@ def _blastx_chunk(db): with Pool(48) as p: p.map(_blastx_chunk, os.listdir(args.db_dir)) elif args.command == "blastx-join": - blastx_join(args.chunk_dir, args.out, *args.query) + blastx_join(args.chunk_dir, args.out, args.diamond_args, *args.query) From 0cef894ba73e0262127a84365b73ca9db02ca3fc Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Thu, 13 Jan 2022 20:04:12 +0000 Subject: [PATCH 10/10] fix input --- short-read-mngs/idseq_utils/idseq_utils/run_diamond.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py index f7416c55..ee911f16 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py +++ b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py @@ -82,4 +82,4 @@ def run_diamond( with Pool(MAX_CHUNKS_IN_FLIGHT) as p: p.starmap(_run_chunk, chunks) run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True) - blastx_join("chunks", result_path, *queries) + blastx_join("chunks", result_path, diamond_args, *queries)