From c5f1dc3cfe13a6ea2aff5ff1edf065d4a490e1dd Mon Sep 17 00:00:00 2001
From: Jarrod Baker
Date: Thu, 10 Feb 2022 21:31:56 +0000
Subject: [PATCH 1/6] Add variant index configuration

---
 src/main/resources/reference.conf     | 24 +++++++++++++++++++
 .../etl/backend/Configuration.scala   | 23 +++++++++++++++++-
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 7d4c53eb..cbf70420 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -1178,3 +1178,27 @@ otarproject {
     path = ${common.output}"/otar_projects"
   }
 }
+
+genetics {
+  release = "22.02.2"
+  output = "gs://genetics-portal-dev-data/"${genetics.release}"/outputs"
+  input = "gs://genetics-portal-dev-data/"${genetics.release}"/inputs"
+}
+
+variant {
+  excluded-biotypes = []
+  tss-distance = 5000000
+  inputs {
+    variant-annotation {
+      path = ${genetics.input}"/variant-annotation/variant-annotation.parquet"
+      format = "parquet"
+    }
+    target-index = ${target.outputs.target}
+  }
+  outputs {
+    variants {
+      path = ${genetics.output}"/variant-index/"
+      format = ${common.output-format}
+    }
+  }
+}
diff --git a/src/main/scala/io/opentargets/etl/backend/Configuration.scala b/src/main/scala/io/opentargets/etl/backend/Configuration.scala
index 2e8625ab..a5051029 100644
--- a/src/main/scala/io/opentargets/etl/backend/Configuration.scala
+++ b/src/main/scala/io/opentargets/etl/backend/Configuration.scala
@@ -259,6 +259,26 @@ object Configuration extends LazyLogging {
   )
   // --- END --- //
 
+  // --- Genetics start --- //
+  case class Genetics(release: String, output: String, input: String)
+
+  case class VariantInputs(
+      variantAnnotation: IOResourceConfig,
+      targetIndex: IOResourceConfig
+  )
+
+  case class VariantOutputs(
+      variants: IOResourceConfig
+  )
+
+  case class Variants(
+      excludedBiotypes: List[String],
+      tssDistance: Long,
+      inputs: VariantInputs,
+      outputs: VariantOutputs
+  )
+  // --- Genetics end --- //
+
   case class EtlStep[T](step: T, dependencies: List[T])
 
   case class EtlDagConfig(steps: List[EtlStep[String]], resolve: Boolean)
@@ -283,6 +303,7 @@ object Configuration extends LazyLogging {
       expression: ExpressionSection,
      openfda: OpenfdaSection,
       ebisearch: EBISearchSection,
-      otarproject: OtarProjectSection
+      otarproject: OtarProjectSection,
+      variant: Variants
   )
 }
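A note on how this block is picked up: the ETL reads its HOCON configuration with pureconfig (pinned in project/Dependencies.scala, see patch 4 below), whose default naming convention maps kebab-case keys such as excluded-biotypes and tss-distance onto the camelCase fields of the Variants case class, so no hand-written reader is needed. A minimal sketch of that mapping, assuming the standard pureconfig 0.14 auto-derivation API; loadVariantSection is a hypothetical helper for illustration, not code from this patch:

    import io.opentargets.etl.backend.Configuration.Variants
    import pureconfig._
    import pureconfig.generic.auto._

    // Hypothetical helper: reads only the `variant` block of the merged HOCON
    // config into the Variants case class added by this patch. Kebab-case keys
    // (excluded-biotypes, tss-distance) are matched to camelCase fields
    // (excludedBiotypes, tssDistance) by pureconfig's default product hint.
    def loadVariantSection(): ConfigReader.Result[Variants] =
      ConfigSource.default.at("variant").load[Variants]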
From 2c1cb03919b2a0815e01f411098a46be95a2b316 Mon Sep 17 00:00:00 2001
From: Jarrod Baker
Date: Mon, 14 Feb 2022 16:08:29 +0000
Subject: [PATCH 2/6] Adding variant implementation

---
 src/main/resources/reference.conf             |  16 ++-
 src/main/scala/io/opentargets/etl/Main.scala  |   3 +
 .../io/opentargets/etl/backend/Variant.scala  | 129 ++++++++++++++++++
 3 files changed, 147 insertions(+), 1 deletion(-)
 create mode 100644 src/main/scala/io/opentargets/etl/backend/Variant.scala

diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index cbf70420..bea0ef18 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -1186,7 +1186,21 @@ genetics {
 }
 
 variant {
-  excluded-biotypes = []
+  excluded-biotypes = [
+    "3prime_overlapping_ncRNA",
+    "antisense",
+    "bidirectional_promoter_lncRNA",
+    "IG_C_gene",
+    "IG_D_gene",
+    "IG_J_gene",
+    "IG_V_gene",
+    "lincRNA",
+    "macro_lncRNA",
+    "non_coding",
+    "protein_coding",
+    "sense_intronic",
+    "sense_overlapping"
+  ]
   tss-distance = 5000000
   inputs {
     variant-annotation {
diff --git a/src/main/scala/io/opentargets/etl/Main.scala b/src/main/scala/io/opentargets/etl/Main.scala
index 8818b113..b9bc6194 100644
--- a/src/main/scala/io/opentargets/etl/Main.scala
+++ b/src/main/scala/io/opentargets/etl/Main.scala
@@ -68,6 +68,9 @@ object ETL extends LazyLogging {
       case "targetValidation" =>
         logger.info("run step targetValidation")
         TargetValidation()
+      case "variantIndex" =>
+        logger.info("run step variant-index (genetics)")
+        Variant()
       case _ => logger.warn(s"step $step is unknown so nothing to execute")
     }
     logger.info(s"finished to run step ($step)")
diff --git a/src/main/scala/io/opentargets/etl/backend/Variant.scala b/src/main/scala/io/opentargets/etl/backend/Variant.scala
new file mode 100644
index 00000000..5a94bcc0
--- /dev/null
+++ b/src/main/scala/io/opentargets/etl/backend/Variant.scala
@@ -0,0 +1,129 @@
+package io.opentargets.etl.backend
+
+import com.typesafe.scalalogging.LazyLogging
+import io.opentargets.etl.backend.spark.{IOResource, IoHelpers}
+import io.opentargets.etl.backend.spark.IoHelpers.IOResources
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.functions.{abs, col, lit, min, percent_rank, round, udaf, when}
+import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
+
+/** UDAF to find the ENSG ID closest to a variant by distance. First argument should be an ENSG ID, the second is the
+  * distance.
+  * */
+object ClosestGeneId extends Aggregator[(String, Long), (String, Long), String] {
+  type GeneAndDistance = (String, Long)
+
+  def zero: GeneAndDistance = ("", Long.MaxValue)
+
+  def reduce(buffer: GeneAndDistance, nxt: GeneAndDistance): GeneAndDistance = {
+    if (buffer._2 < nxt._2) buffer else nxt
+  }
+
+  def merge(b1: GeneAndDistance, b2: GeneAndDistance): GeneAndDistance = reduce(b1, b2)
+
+  def finish(reduction: GeneAndDistance): String = reduction._1
+
+  def outputEncoder: Encoder[String] = Encoders.STRING
+
+  def bufferEncoder: Encoder[(String, Long)] = Encoders.product
+}
+
+object Variant extends LazyLogging {
+
+  def apply()(implicit context: ETLSessionContext): IOResources = {
+
+    logger.info("Executing Variant step.")
+    implicit val ss: SparkSession = context.sparkSession
+
+    val closestGeneId = udaf(ClosestGeneId)
+    ss.udf.register("closest_gene", closestGeneId)
+
+    val variantConfiguration = context.configuration.variant
+
+    logger.info(s"Configuration for Variant: $variantConfiguration")
+
+    val mappedInputs = Map(
+      "variants" -> variantConfiguration.inputs.variantAnnotation,
+      "targets" -> variantConfiguration.inputs.targetIndex
+    )
+    val inputs = IoHelpers.readFrom(mappedInputs)
+
+    val variantRawDf: DataFrame = inputs("variants").data
+    val targetRawDf: DataFrame = inputs("targets").data
+
+    val approvedBioTypes = variantConfiguration.excludedBiotypes.toSet
+    val excludedChromosomes: Set[String] = Set("MT")
+    logger.info("Generate target DF for variant index.")
+    val targetDf = targetRawDf
+      .select(
+        col("id") as "gene_id",
+        col("genomicLocation.*"),
+        col("biotype"),
+        when(col("genomicLocation.strand") > 0, col("genomicLocation.start"))
+          .otherwise("genomicLocation.end") as "tss"
+      )
+      .filter(
+        (col("biotype") isInCollection approvedBioTypes) && !(col("chromosome") isInCollection excludedChromosomes))
+
+    logger.info("Generate protein coding DF for variant index.")
+    val proteinCodingDf = targetDf.filter(col("biotype") === "protein_coding")
+
+    logger.info("Generate variant DF for variant index.")
+    val variantDf = variantRawDf
+      .filter(col("chrom_b38").isNotNull && col("pos_b38").isNotNull)
+      .select(
+        col("chrom_b37") as "chr_id_b37",
+        col("pos_b37") as "position_b37",
+        col("chrom_b38") as "chr_id",
+        col("pos_b38") as "position",
+        col("ref") as "ref_allele",
+        col("alt") as "alt_allele",
+        col("rsid") as "rs_id",
+        col("vep.most_severe_consequence") as "most_severe_consequence",
+        col("cadd") as "cadd",
+        col("af") as "af",
+      )
+
+    def variantGeneDistance(target: DataFrame) =
+      variantDf
+        .join(
+          target,
+          (col("chr_id") === col("chromosome")) && (abs(col("position") - col("tss")) <= variantConfiguration.tssDistance))
+        .withColumn("d", abs(col("position") - col("tss")))
+
+    logger.info("Calculate distance score for variant to gene.")
+    val variantGeneDistanceDf = variantGeneDistance(targetDf)
+    val variantPcDistanceDf = variantGeneDistance(proteinCodingDf)
+
+    // these four components uniquely identify a variant
+    val variantId = Seq("chr_id", "position", "ref_allele", "alt_allele")
+
+    logger.info("Rank variant scores by distance")
+    val variantGeneScored = variantGeneDistanceDf
+      .groupBy(col("chr_id"), col("position"), col("ref_allele"), col("alt_allele"))
+      .agg(closestGeneId(col("gene_id"), col("d")) as "gene_id_any",
+           min(col("d")) as "gene_id_any_distance")
+
+    val variantPcScored = variantPcDistanceDf
+      .groupBy(
+        col("chr_id"),
+        col("position"),
+        col("ref_allele"),
+        col("alt_allele"),
+      )
+      .agg(closestGeneId(col("gene_id"), col("d")) as "gene_id_prot_coding",
+           min(col("d")) as "gene_id_prot_coding_distance")
+
+    logger.info("Join scored distances variants and scored protein coding.")
+    val vgDistances = variantGeneScored.join(variantPcScored, variantId, "full_outer")
+
+    logger.info("Join distances to variants.")
+    val variantIndex = variantDf.join(vgDistances, variantId, "left_outer")
+
+    val outputs = Map(
+      "variant" -> IOResource(variantIndex, variantConfiguration.outputs.variants)
+    )
+    logger.info("Write variant index outputs.")
+    IoHelpers.writeTo(outputs)
+  }
+}
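For readers new to typed aggregators: udaf(ClosestGeneId) above lifts the Aggregator into an untyped UDAF whose two column arguments are bundled into its (String, Long) input tuple, and the buffer simply keeps whichever (gene, distance) pair has the smaller distance, so the reduction ends with the nearest gene. A small self-contained sketch of those semantics on toy data, assuming ClosestGeneId from this patch is in scope; the variant keys and ENSG IDs are invented for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, udaf}

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    // Toy (variant, gene, distance) rows.
    val toy = Seq(
      ("1_100_A_T", "ENSG0001", 500L),
      ("1_100_A_T", "ENSG0002", 120L), // nearest gene for this variant
      ("2_200_G_C", "ENSG0003", 9000L)
    ).toDF("variant", "gene_id", "d")

    val closest = udaf(ClosestGeneId)
    // One row per variant carrying the gene with the smallest distance:
    // 1_100_A_T -> ENSG0002, 2_200_G_C -> ENSG0003.
    toy
      .groupBy(col("variant"))
      .agg(closest(col("gene_id"), col("d")) as "closest_gene")
      .show()

Note that reduce uses a strict less-than, so on an exact distance tie the pair seen first by the buffer wins.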
"position_b37", + col("chrom_b38") as "chr_id", + col("pos_b38") as "position", + col("ref") as "ref_allele", + col("alt") as "alt_allele", + col("rsid") as "rs_id", + col("vep.most_severe_consequence") as "most_severe_consequence", + col("cadd") as "cadd", + col("af") as "af", + ) + + def variantGeneDistance(target: DataFrame) = + variantDf + .join( + target, + (col("chr_id") === col("chromosome")) && (abs(col("position") - col("tss")) <= variantConfiguration.tssDistance)) + .withColumn("d", abs(col("position") - col("tss"))) + + logger.info("Calculate distance score for variant to gene.") + val variantGeneDistanceDf = variantGeneDistance(targetDf) + val variantPcDistanceDf = variantGeneDistance(proteinCodingDf) + + // these four components uniquely identify a variant + val variantId = Seq("chr_id", "position", "ref_allele", "alt_allele") + + logger.info("Rank variant scores by distance") + val variantGeneScored = variantGeneDistanceDf + .groupBy(col("chr_id"), col("position"), col("ref_allele"), col("alt_allele")) + .agg(closestGeneId(col("gene_id"), col("d")) as "gene_id_any", + min(col("d")) as "gene_id_any_distance") + + val variantPcScored = variantPcDistanceDf + .groupBy( + col("chr_id"), + col("position"), + col("ref_allele"), + col("alt_allele"), + ) + .agg(closestGeneId(col("gene_id"), col("d")) as "gene_id_prot_coding", + min(col("d")) as "gene_id_prot_coding_distance") + + logger.info("Join scored distances variants and scored protein coding.") + val vgDistances = variantGeneScored.join(variantPcScored, variantId, "full_outer") + + logger.info("Join distances to variants.") + val variantIndex = variantDf.join(vgDistances, variantId, "left_outer") + + val outputs = Map( + "variant" -> IOResource(variantIndex, variantConfiguration.outputs.variants) + ) + logger.info("Write variant index outputs.") + IoHelpers.writeTo(outputs) + } +} From e3f43bff1b1a7f8b63235cf63c7040edd8e2a5b4 Mon Sep 17 00:00:00 2001 From: Jarrod Baker Date: Mon, 14 Feb 2022 16:08:29 +0000 Subject: [PATCH 3/6] Adding variant implementation --- src/main/resources/reference.conf | 16 ++- src/main/scala/io/opentargets/etl/Main.scala | 3 + .../io/opentargets/etl/backend/Variant.scala | 130 ++++++++++++++++++ 3 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/io/opentargets/etl/backend/Variant.scala diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index cbf70420..bea0ef18 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -1186,7 +1186,21 @@ genetics { } variant { - excluded-biotypes = [] + excluded-biotypes = [ + "3prime_overlapping_ncRNA", + "antisense", + "bidirectional_promoter_lncRNA", + "IG_C_gene", + "IG_D_gene", + "IG_J_gene", + "IG_V_gene", + "lincRNA", + "macro_lncRNA", + "non_coding", + "protein_coding", + "sense_intronic", + "sense_overlapping" + ] tss-distance = 5000000 inputs { variant-annotation { diff --git a/src/main/scala/io/opentargets/etl/Main.scala b/src/main/scala/io/opentargets/etl/Main.scala index 8818b113..b9bc6194 100644 --- a/src/main/scala/io/opentargets/etl/Main.scala +++ b/src/main/scala/io/opentargets/etl/Main.scala @@ -68,6 +68,9 @@ object ETL extends LazyLogging { case "targetValidation" => logger.info("run step targetValidation") TargetValidation() + case "variantIndex" => + logger.info("run step variant-index (genetics)") + Variant() case _ => logger.warn(s"step $step is unknown so nothing to execute") } logger.info(s"finished to run step ($step)") diff --git 
From 6f1cade4d7e8e74e768013d47fee01b9836871ac Mon Sep 17 00:00:00 2001
From: Jarrod Baker
Date: Fri, 18 Feb 2022 11:21:57 +0000
Subject: [PATCH 4/6] Remove unused dependencies.

---
 project/Dependencies.scala                       | 43 ++-----------------
 .../etl/backend/openfda/stage/LoadData.scala     | 43 ++++++++++---------
 .../etl/backend/spark/IoHelpers.scala            | 18 ++++----
 3 files changed, 34 insertions(+), 70 deletions(-)

diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 1e472a5e..ab341167 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -3,33 +3,18 @@ import sbt._
 
 object Dependencies {
   lazy val dependencies: Seq[ModuleID] = Seq(
-    aoyi,
     Seq(betterFiles),
-    codeDeps,
     configDeps,
     loggingDeps,
     graphDeps,
     sparkDeps,
     testingDeps,
     gcp,
-    Seq(typeSafeConfig),
-    cats,
-    monocle,
-    smile,
-    johnS
+    Seq(typeSafeConfig)
   ).flatten
 
-  lazy val aoyi = Seq(
-    "com.lihaoyi" %% "pprint" % "0.6.0"
-  )
-
   lazy val betterFiles = "com.github.pathikrit" %% "better-files-akka" % "3.9.1"
 
-  lazy val codeDeps = Seq(
-    "com.beachape" %% "enumeratum" % "1.6.1",
-    "com.github.scopt" %% "scopt" % "3.7.1"
-  )
-
   lazy val configDeps = Seq(
     "com.github.pureconfig" %% "pureconfig" % "0.14.1"
   )
@@ -60,31 +45,9 @@ object Dependencies {
     "org.scalatest" %% "scalatest" % testVersion % "test"
   ) :+ scalaCheck
 
   lazy val gcp = Seq(
-    "com.google.cloud" % "google-cloud-storage" % "2.3.0"
+    "com.google.cloud" % "google-cloud-dataproc" % "2.3.2" % "provided",
+    "com.google.cloud" % "google-cloud-storage" % "2.4.2" % "provided"
   )
 
   lazy val typeSafeConfig = "com.typesafe" % "config" % "1.4.1"
-
-  lazy val catsVersion = "2.4.2"
-  lazy val cats = Seq(
-    "org.typelevel" %% "cats-core" % catsVersion,
-    "org.typelevel" %% "cats-laws" % catsVersion,
-    "org.typelevel" %% "cats-kernel" % catsVersion,
-    "org.typelevel" %% "cats-kernel-laws" % catsVersion
-  )
-
-  lazy val monocleVersion = "2.1.0"
-  lazy val monocle = Seq(
-    "com.github.julien-truffaut" %% "monocle-core" % monocleVersion,
-    "com.github.julien-truffaut" %% "monocle-macro" % monocleVersion
-  )
-
-  lazy val smileVersion = "2.6.0"
-  lazy val smile = Seq(
-    "com.github.haifengl" %% "smile-scala" % smileVersion
-  )
-
-  lazy val johnSVersion = "3.0.0"
-  lazy val johnS = Seq(
-    "com.johnsnowlabs.nlp" % "spark-nlp_2.12" % johnSVersion
-  )
 }
diff --git a/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala b/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala
index b40f3231..1a6f14f9 100644
--- a/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala
+++ b/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala
@@ -1,13 +1,14 @@
 package io.opentargets.etl.backend.openfda.stage
 
-import akka.actor.TypedActor.context
-import io.opentargets.etl.backend.spark.Helpers.IOResourceConfig
 import io.opentargets.etl.backend.spark.IoHelpers
-import io.opentargets.etl.backend.spark.IoHelpers.IOResourceConfigurations
-import io.opentargets.etl.backend.{Blacklisting, DrugData, ETLSessionContext, FdaData, MeddraLowLevelTermsData, MeddraPreferredTermsData}
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.immutable.Stream.Empty
+import io.opentargets.etl.backend.{
+  Blacklisting,
+  DrugData,
+  ETLSessionContext,
+  FdaData,
+  MeddraLowLevelTermsData,
+  MeddraPreferredTermsData
+}
 
 object LoadData {
   def apply()(implicit context: ETLSessionContext) = {
@@ -18,19 +19,21 @@ object LoadData {
     // Prepare the loading Map
     val sourceData = {
       context.configuration.openfda.meddra match {
-        // DISCLAIMER - There's probably a better way to do this
-        case Some(meddraConfig) => Map(
-          DrugData() -> context.configuration.openfda.chemblDrugs,
-          Blacklisting() -> context.configuration.openfda.blacklistedEvents,
-          FdaData() -> context.configuration.openfda.fdaData,
-          MeddraPreferredTermsData() -> meddraConfig.meddraPreferredTerms,
-          MeddraLowLevelTermsData() -> meddraConfig.meddraLowLevelTerms
-        )
-        case _ => Map(
-          DrugData() -> context.configuration.openfda.chemblDrugs,
-          Blacklisting() -> context.configuration.openfda.blacklistedEvents,
-          FdaData() -> context.configuration.openfda.fdaData,
-        )
+        // DISCLAIMER - There's probably a better way to do this
+        case Some(meddraConfig) =>
+          Map(
+            DrugData() -> context.configuration.openfda.chemblDrugs,
+            Blacklisting() -> context.configuration.openfda.blacklistedEvents,
+            FdaData() -> context.configuration.openfda.fdaData,
+            MeddraPreferredTermsData() -> meddraConfig.meddraPreferredTerms,
+            MeddraLowLevelTermsData() -> meddraConfig.meddraLowLevelTerms
+          )
+        case _ =>
+          Map(
+            DrugData() -> context.configuration.openfda.chemblDrugs,
+            Blacklisting() -> context.configuration.openfda.blacklistedEvents,
+            FdaData() -> context.configuration.openfda.fdaData,
+          )
       }
     }
     // Load the data
diff --git a/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala b/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
index 292019bb..33f8e86a 100644
--- a/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
+++ b/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
@@ -3,7 +3,6 @@ package io.opentargets.etl.backend.spark
 import com.typesafe.scalalogging.LazyLogging
 import io.opentargets.etl.backend.Configuration.OTConfig
 import io.opentargets.etl.backend.ETLSessionContext
-import monocle.macros.syntax.lens.toGenApplyLensOps
 import org.apache.spark.sql.functions.current_timestamp
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 
@@ -172,18 +171,17 @@ object IoHelpers extends LazyLogging {
     import session.implicits._
 
     val serialisedSchema = ior.data.schema.json
-    val iores =
-      ior.configuration
-        .lens(_.path)
-        .modify(
-          _.stripPrefix(context.configuration.common.output)
-            .split("/")
-            .filter(_.nonEmpty)
-            .mkString("/", "/", ""))
+    val iores = ior.configuration.copy(
+      path = ior.configuration.path
+        .stripPrefix(context.configuration.common.output)
+        .split("/")
+        .filter(_.nonEmpty)
+        .mkString("/", "/", ""))
+
     val cols = ior.data.columns.toList
     val id = ior.configuration.path.split("/").filter(_.nonEmpty).last
     val newPath = withConfig.path + s"/$id"
-    val metadataConfig = withConfig.lens(_.path).set(newPath)
+    val metadataConfig = withConfig.copy(path = newPath)
 
     val metadata = List(Metadata(id, iores, serialisedSchema, cols)).toDF
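The IoHelpers change is the one piece of this commit that is a rewrite rather than a deletion: the Monocle lens update of a single field is replaced by a plain case-class copy, which is what lets the monocle dependency go. Roughly, assuming a simple case class standing in for IOResourceConfig (Conf and rewritePath below are illustrative, not code from the repository):

    final case class Conf(path: String, format: String)

    // Equivalent to Monocle's conf.lens(_.path).modify(f) for a top-level
    // field: read the field, transform it, and copy it back.
    def rewritePath(conf: Conf)(f: String => String): Conf =
      conf.copy(path = f(conf.path))

    val trimmed = rewritePath(Conf("gs://bucket/out/targets", "json"))(
      _.stripPrefix("gs://bucket/out")) // illustrative transformation

Lenses only start paying for themselves on deeply nested updates; for a one-level field like path, copy is simpler and dependency-free.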
From a2cf6f1eb7d59cac657ebe69cee0718330c7580c Mon Sep 17 00:00:00 2001
From: Jarrod Baker
Date: Fri, 4 Mar 2022 15:56:57 +0000
Subject: [PATCH 5/6] Update Variant to remove UDAF

---
 .../io/opentargets/etl/backend/Variant.scala  | 90 +++++++++----------
 .../etl/backend/spark/Helpers.scala           |  1 +
 2 files changed, 41 insertions(+), 50 deletions(-)

diff --git a/src/main/scala/io/opentargets/etl/backend/Variant.scala b/src/main/scala/io/opentargets/etl/backend/Variant.scala
index 40b4d20c..e7d8822a 100644
--- a/src/main/scala/io/opentargets/etl/backend/Variant.scala
+++ b/src/main/scala/io/opentargets/etl/backend/Variant.scala
@@ -3,31 +3,18 @@ package io.opentargets.etl.backend
 import com.typesafe.scalalogging.LazyLogging
 import io.opentargets.etl.backend.spark.{IOResource, IoHelpers}
 import io.opentargets.etl.backend.spark.IoHelpers.IOResources
-import org.apache.spark.sql.expressions.Aggregator
-import org.apache.spark.sql.functions.{abs, col, min, udaf, when}
-import org.apache.spark.sql.types.LongType
-import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
-
-/** UDAF to find the ENSG ID closest to a variant by distance. First argument should be an ENSG ID, the second is the
-  * distance.
-  * */
-object ClosestGeneId extends Aggregator[(String, Long), (String, Long), String] {
-  type GeneAndDistance = (String, Long)
-
-  def zero: GeneAndDistance = ("", Long.MaxValue)
-
-  def reduce(buffer: GeneAndDistance, nxt: GeneAndDistance): GeneAndDistance = {
-    if (buffer._2 < nxt._2) buffer else nxt
-  }
-
-  def merge(b1: GeneAndDistance, b2: GeneAndDistance): GeneAndDistance = reduce(b1, b2)
-
-  def finish(reduction: GeneAndDistance): String = reduction._1
-
-  def outputEncoder: Encoder[String] = Encoders.STRING
-
-  def bufferEncoder: Encoder[(String, Long)] = Encoders.product
+import org.apache.spark.sql.functions.{
+  abs,
+  arrays_zip,
+  col,
+  collect_list,
+  map_from_entries,
+  min,
+  udaf,
+  when
 }
+import org.apache.spark.sql.types.LongType
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 object Variant extends LazyLogging {
@@ -36,9 +23,6 @@ def apply()(implicit context: ETLSessionContext): IOResources = {
     logger.info("Executing Variant step.")
     implicit val ss: SparkSession = context.sparkSession
 
-    val closestGeneId = udaf(ClosestGeneId)
-    ss.udf.register("closest_gene", closestGeneId)
-
     val variantConfiguration = context.configuration.variant
 
     logger.info(s"Configuration for Variant: $variantConfiguration")
@@ -54,6 +38,11 @@ object Variant extends LazyLogging {
 
     val approvedBioTypes = variantConfiguration.excludedBiotypes.toSet
     val excludedChromosomes: Set[String] = Set("MT")
+
+    // these four components uniquely identify a variant
+    val variantIdStr = Seq("chr_id", "position", "ref_allele", "alt_allele")
+    val variantIdCol = variantIdStr.map(col)
+
     logger.info("Generate target DF for variant index.")
     val targetDf = targetRawDf
       .select(
@@ -61,7 +50,7 @@ object Variant extends LazyLogging {
         col("genomicLocation.*"),
         col("biotype"),
         when(col("genomicLocation.strand") > 0, col("genomicLocation.start"))
-          .otherwise("genomicLocation.end") as "tss"
+          .otherwise(col("genomicLocation.end")) as "tss"
       )
       .filter(
         (col("biotype") isInCollection approvedBioTypes) && !(col("chromosome") isInCollection excludedChromosomes))
@@ -84,8 +73,9 @@ object Variant extends LazyLogging {
         col("cadd") as "cadd",
         col("af") as "af",
       )
+      .repartition(variantIdCol: _*)
 
-    def variantGeneDistance(target: DataFrame) =
+    def variantGeneDistance(target: DataFrame): DataFrame =
       variantDf
         .join(
           target,
@@ -93,33 +83,33 @@ object Variant extends LazyLogging {
         .withColumn("d", abs(col("position") - col("tss")))
 
     logger.info("Calculate distance score for variant to gene.")
-    val variantGeneDistanceDf = variantGeneDistance(targetDf)
-    val variantPcDistanceDf = variantGeneDistance(proteinCodingDf)
-
-    // these four components uniquely identify a variant
-    val variantId = Seq("chr_id", "position", "ref_allele", "alt_allele")
+    val variantGeneDistanceDf = targetDf.transform(variantGeneDistance)
+    val variantPcDistanceDf = proteinCodingDf.transform(variantGeneDistance)
 
     logger.info("Rank variant scores by distance")
-    val variantGeneScored = variantGeneDistanceDf
-      .groupBy(col("chr_id"), col("position"), col("ref_allele"), col("alt_allele"))
-      .agg(closestGeneId(col("gene_id"), col("d")) as "gene_id_any",
-           min(col("d")) cast LongType as "gene_id_any_distance")
-
-    val variantPcScored = variantPcDistanceDf
-      .groupBy(
-        col("chr_id"),
-        col("position"),
-        col("ref_allele"),
-        col("alt_allele"),
-      )
-      .agg(closestGeneId(col("gene_id"), col("d")) as "gene_id_prot_coding",
-           min(col("d")) cast LongType as "gene_id_prot_coding_distance")
+
+    def findNearestGene(name: String)(df: DataFrame): DataFrame = {
+      val nameDistance = s"${name}_distance"
+      df.groupBy(variantIdCol: _*)
+        .agg(collect_list(col("gene_id")) as "geneList",
+             collect_list(col("d")) as "dist",
+             min(col("d")) cast LongType as nameDistance)
+        .select(
+          variantIdCol ++ Seq(
+            col(nameDistance),
+            map_from_entries(arrays_zip(col("dist"), col("geneList"))) as "distToGeneMap"): _*)
+        .withColumn(name, col("distToGeneMap")(col(nameDistance)))
+        .drop("distToGeneMap", "geneList", "dist")
+    }
+
+    val variantGeneScored = variantGeneDistanceDf.transform(findNearestGene("gene_id_any"))
+    val variantPcScored = variantPcDistanceDf.transform(findNearestGene("gene_id_prot_coding"))
 
     logger.info("Join scored distances variants and scored protein coding.")
-    val vgDistances = variantGeneScored.join(variantPcScored, variantId, "full_outer")
+    val vgDistances = variantGeneScored.join(variantPcScored, variantIdStr, "full_outer")
 
     logger.info("Join distances to variants.")
-    val variantIndex = variantDf.join(vgDistances, variantId, "left_outer")
+    val variantIndex = variantDf.join(vgDistances, variantIdStr, "left_outer")
 
     val outputs = Map(
       "variant" -> IOResource(variantIndex, variantConfiguration.outputs.variants)
diff --git a/src/main/scala/io/opentargets/etl/backend/spark/Helpers.scala b/src/main/scala/io/opentargets/etl/backend/spark/Helpers.scala
index ab96f6e4..93c6838e 100644
--- a/src/main/scala/io/opentargets/etl/backend/spark/Helpers.scala
+++ b/src/main/scala/io/opentargets/etl/backend/spark/Helpers.scala
@@ -79,6 +79,7 @@ object Helpers extends LazyLogging {
       .setAppName(appName)
       .set("spark.driver.maxResultSize", "0")
       .set("spark.debug.maxToStringFields", "2000")
+      .set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
 
     // if some uri then setmaster must be set otherwise
     // it tries to get from env if any yarn running
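How the UDAF-free version recovers the closest gene: per variant, collect_list gathers the candidate gene IDs and their distances, arrays_zip and map_from_entries turn the two lists into a distance-to-gene map, and indexing that map with min(d) looks up the gene at the minimum distance. This is also why Helpers.scala now sets spark.sql.mapKeyDedupPolicy to LAST_WIN: two genes at exactly the same distance would produce a duplicate map key, which Spark rejects at runtime under the default EXCEPTION policy, whereas LAST_WIN silently lets the later entry win the tie. A condensed sketch of the lookup on toy values:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{arrays_zip, col, collect_list, map_from_entries, min}

    val spark = SparkSession.builder
      .master("local[*]")
      // Without this, tied distances fail with "Duplicate map key" at runtime.
      .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
      .getOrCreate()
    import spark.implicits._

    val toy = Seq(
      ("1_100_A_T", "ENSG0001", 500L),
      ("1_100_A_T", "ENSG0002", 120L)
    ).toDF("variant", "gene_id", "d")

    val nearest = toy
      .groupBy(col("variant"))
      .agg(collect_list(col("gene_id")) as "geneList",
           collect_list(col("d")) as "dist",
           min(col("d")) as "min_d")
      .withColumn("distToGeneMap",
                  map_from_entries(arrays_zip(col("dist"), col("geneList"))))
      .withColumn("closest_gene", col("distToGeneMap")(col("min_d")))
      .drop("distToGeneMap", "geneList", "dist")
    // closest_gene = ENSG0002 (d = 120)

Note the pairing relies on the two collect_list aggregations seeing rows in the same order within each group, which is the same assumption the patch itself makes.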
"2.6.0" - lazy val smile = Seq( - "com.github.haifengl" %% "smile-scala" % smileVersion - ) - - lazy val johnSVersion = "3.0.0" - lazy val johnS = Seq( - "com.johnsnowlabs.nlp" % "spark-nlp_2.12" % johnSVersion - ) } diff --git a/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala b/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala index b40f3231..1a6f14f9 100644 --- a/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala +++ b/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala @@ -1,13 +1,14 @@ package io.opentargets.etl.backend.openfda.stage -import akka.actor.TypedActor.context -import io.opentargets.etl.backend.spark.Helpers.IOResourceConfig import io.opentargets.etl.backend.spark.IoHelpers -import io.opentargets.etl.backend.spark.IoHelpers.IOResourceConfigurations -import io.opentargets.etl.backend.{Blacklisting, DrugData, ETLSessionContext, FdaData, MeddraLowLevelTermsData, MeddraPreferredTermsData} -import org.apache.spark.sql.SparkSession - -import scala.collection.immutable.Stream.Empty +import io.opentargets.etl.backend.{ + Blacklisting, + DrugData, + ETLSessionContext, + FdaData, + MeddraLowLevelTermsData, + MeddraPreferredTermsData +} object LoadData { def apply()(implicit context: ETLSessionContext) = { @@ -18,19 +19,21 @@ object LoadData { // Prepare the loading Map val sourceData = { context.configuration.openfda.meddra match { - // DISCLAIMER - There's probably a better way to do this - case Some(meddraConfig) => Map( - DrugData() -> context.configuration.openfda.chemblDrugs, - Blacklisting() -> context.configuration.openfda.blacklistedEvents, - FdaData() -> context.configuration.openfda.fdaData, - MeddraPreferredTermsData() -> meddraConfig.meddraPreferredTerms, - MeddraLowLevelTermsData() -> meddraConfig.meddraLowLevelTerms - ) - case _ => Map( - DrugData() -> context.configuration.openfda.chemblDrugs, - Blacklisting() -> context.configuration.openfda.blacklistedEvents, - FdaData() -> context.configuration.openfda.fdaData, - ) + // DISCLAIMER - There's probably a better way to do this + case Some(meddraConfig) => + Map( + DrugData() -> context.configuration.openfda.chemblDrugs, + Blacklisting() -> context.configuration.openfda.blacklistedEvents, + FdaData() -> context.configuration.openfda.fdaData, + MeddraPreferredTermsData() -> meddraConfig.meddraPreferredTerms, + MeddraLowLevelTermsData() -> meddraConfig.meddraLowLevelTerms + ) + case _ => + Map( + DrugData() -> context.configuration.openfda.chemblDrugs, + Blacklisting() -> context.configuration.openfda.blacklistedEvents, + FdaData() -> context.configuration.openfda.fdaData, + ) } } // Load the data diff --git a/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala b/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala index 292019bb..33f8e86a 100644 --- a/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala +++ b/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala @@ -3,7 +3,6 @@ package io.opentargets.etl.backend.spark import com.typesafe.scalalogging.LazyLogging import io.opentargets.etl.backend.Configuration.OTConfig import io.opentargets.etl.backend.ETLSessionContext -import monocle.macros.syntax.lens.toGenApplyLensOps import org.apache.spark.sql.functions.current_timestamp import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} @@ -172,18 +171,17 @@ object IoHelpers extends LazyLogging { import session.implicits._ val serialisedSchema = ior.data.schema.json - val iores = - 
diff --git a/documentation/etl_current.puml b/documentation/etl_current.puml
index 5c87e779..84d1f811 100644
--- a/documentation/etl_current.puml
+++ b/documentation/etl_current.puml
@@ -6,6 +6,7 @@ skinparam interface {
 skinparam artifact {
   backgroundColor<> orchid
   backgroundColor<> darkturquoise
+  backgroundColor<> green
 }
 
 ' steps artifact associations <>
@@ -24,6 +25,8 @@ artifact target <>
 artifact openfda <>
 artifact ebiSearch <>
 
+artifact variant <>
+
 reactome --> target
 
 evidence --> associations
@@ -52,6 +55,7 @@ drug --> search
 associations --> search
 
 target --> interactions
+target --> variant
 
 target --> targetValidation
diff --git a/src/main/scala/io/opentargets/etl/backend/Variant.scala b/src/main/scala/io/opentargets/etl/backend/Variant.scala
index e7d8822a..d6a1915f 100644
--- a/src/main/scala/io/opentargets/etl/backend/Variant.scala
+++ b/src/main/scala/io/opentargets/etl/backend/Variant.scala
@@ -53,7 +53,10 @@ object Variant extends LazyLogging {
           .otherwise(col("genomicLocation.end")) as "tss"
       )
       .filter(
-        (col("biotype") isInCollection approvedBioTypes) && !(col("chromosome") isInCollection excludedChromosomes))
+        (col("biotype") isInCollection approvedBioTypes) && !(col(
+          "chromosome"
+        ) isInCollection excludedChromosomes)
+      )
 
     logger.info("Generate protein coding DF for variant index.")
     val proteinCodingDf = targetDf.filter(col("biotype") === "protein_coding")
@@ -71,7 +74,7 @@ object Variant extends LazyLogging {
         col("rsid") as "rs_id",
         col("vep.most_severe_consequence") as "most_severe_consequence",
         col("cadd") as "cadd",
-        col("af") as "af",
+        col("af") as "af"
       )
       .repartition(variantIdCol: _*)
 
@@ -79,7 +82,10 @@ object Variant extends LazyLogging {
       variantDf
         .join(
           target,
-          (col("chr_id") === col("chromosome")) && (abs(col("position") - col("tss")) <= variantConfiguration.tssDistance))
+          (col("chr_id") === col("chromosome")) && (abs(
+            col("position") - col("tss")
+          ) <= variantConfiguration.tssDistance)
+        )
         .withColumn("d", abs(col("position") - col("tss")))
 
     logger.info("Calculate distance score for variant to gene.")
@@ -91,13 +97,17 @@ object Variant extends LazyLogging {
     def findNearestGene(name: String)(df: DataFrame): DataFrame = {
       val nameDistance = s"${name}_distance"
       df.groupBy(variantIdCol: _*)
-        .agg(collect_list(col("gene_id")) as "geneList",
-             collect_list(col("d")) as "dist",
-             min(col("d")) cast LongType as nameDistance)
+        .agg(
+          collect_list(col("gene_id")) as "geneList",
+          collect_list(col("d")) as "dist",
+          min(col("d")) cast LongType as nameDistance
+        )
         .select(
-          variantIdCol ++ Seq(
-            col(nameDistance),
-            map_from_entries(arrays_zip(col("dist"), col("geneList"))) as "distToGeneMap"): _*)
+          variantIdCol ++ Seq(
+            col(nameDistance),
+            map_from_entries(arrays_zip(col("dist"), col("geneList"))) as "distToGeneMap"
+          ): _*
+        )
         .withColumn(name, col("distToGeneMap")(col(nameDistance)))
         .drop("distToGeneMap", "geneList", "dist")
     }