diff --git a/README.md b/README.md
index 64a5dad7..cb169621 100644
--- a/README.md
+++ b/README.md
@@ -179,6 +179,15 @@ responsibility_ to ensure that required inputs are available.
 
 ## Step notes
 
+### Variant
+
+The variant step brings over logic from the former genetics-pipe project. It requires two inputs:
+
+| input | notes |
+| --- | --- |
+| `variant-annotation` | Provided by the data or genetics team. Note that it is also, confusingly, referred to as the variant index in some places. |
+| `target-index` | Produced by the target step of the ETL. |
+
 ### Target Validation
 
 Inputs can be provided here where the only logic is to match an ENSG ID against an input column. Any input rows which
diff --git a/documentation/etl_current.puml b/documentation/etl_current.puml
index 5c87e779..84d1f811 100644
--- a/documentation/etl_current.puml
+++ b/documentation/etl_current.puml
@@ -6,6 +6,7 @@ skinparam interface {
 skinparam artifact {
   backgroundColor<<ETL>> orchid
   backgroundColor<<private>> darkturquoise
+  backgroundColor<<genetics>> green
 }
 
 ' steps artifact associations <<ETL>>
@@ -24,6 +25,8 @@ artifact target <<ETL>>
 artifact openfda <<ETL>>
 artifact ebiSearch <<ETL>>
 
+artifact variant <<genetics>>
+
 reactome --> target
 
 evidence --> associations
@@ -52,6 +55,7 @@ drug --> search
 associations --> search
 
 target --> interactions
+target --> variant
 target --> targetValidation
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 0579685a..ab341167 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -44,11 +44,10 @@ object Dependencies {
     "org.scalactic" %% "scalactic" % testVersion,
     "org.scalatest" %% "scalatest" % testVersion % "test"
   ) :+ scalaCheck
-
-  lazy val typeSafeConfig = "com.typesafe" % "config" % "1.4.1"
-
   lazy val gcp = Seq(
     "com.google.cloud" % "google-cloud-dataproc" % "2.3.2" % "provided",
-    "com.google.cloud" % "google-cloud-storage" % "2.4.2"
+    "com.google.cloud" % "google-cloud-storage" % "2.4.2" % "provided"
   )
+
+  lazy val typeSafeConfig = "com.typesafe" % "config" % "1.4.1"
+
 }
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 55b40c31..207b150b 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -1187,3 +1187,42 @@ otarproject {
     path = ${common.output}"/otar_projects"
   }
 }
+
+genetics {
+  release = "22.02.2"
+  output = "gs://genetics-portal-dev-data/"${genetics.release}"/outputs"
+  input = "gs://genetics-portal-dev-data/"${genetics.release}"/inputs"
+}
+
+variant {
+  # biotypes retained when building the variant index; only genes with one of these biotypes are linked to variants
+  approved-biotypes = [
+    "3prime_overlapping_ncRNA",
+    "antisense",
+    "bidirectional_promoter_lncRNA",
+    "IG_C_gene",
+    "IG_D_gene",
+    "IG_J_gene",
+    "IG_V_gene",
+    "lincRNA",
+    "macro_lncRNA",
+    "non_coding",
+    "protein_coding",
+    "sense_intronic",
+    "sense_overlapping"
+  ]
+  tss-distance = 5000000
+  inputs {
+    variant-annotation {
+      path = ${genetics.input}"/variant-annotation/variant-annotation.parquet"
+      format = "parquet"
+    }
+    target-index = ${target.outputs.target}
+  }
+  outputs {
+    variants {
+      path = ${genetics.output}"/variant-index/"
+      format = ${common.output-format}
+    }
+  }
+}
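> **Editor's note.** The `genetics` block above relies on HOCON substitution: `${genetics.release}` is spliced into the input and output paths when the configuration is resolved, so bumping `release` repoints every genetics path at once. A minimal, self-contained sketch of that behaviour using the Typesafe Config library already listed in `Dependencies.scala` (the object name and values are illustrative):

```scala
import com.typesafe.config.ConfigFactory

object HoconSubstitutionSketch extends App {
  // parseString yields an unresolved Config; resolve() expands ${...} references
  val conf = ConfigFactory
    .parseString("""
      |genetics {
      |  release = "22.02.2"
      |  input = "gs://genetics-portal-dev-data/"${genetics.release}"/inputs"
      |}
      |""".stripMargin)
    .resolve()

  println(conf.getString("genetics.input")) // gs://genetics-portal-dev-data/22.02.2/inputs
}
```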
logger.warn(s"step $step is unknown so nothing to execute") } logger.info(s"finished to run step ($step)") diff --git a/src/main/scala/io/opentargets/etl/backend/Configuration.scala b/src/main/scala/io/opentargets/etl/backend/Configuration.scala index 2e8625ab..a5051029 100644 --- a/src/main/scala/io/opentargets/etl/backend/Configuration.scala +++ b/src/main/scala/io/opentargets/etl/backend/Configuration.scala @@ -259,6 +259,26 @@ object Configuration extends LazyLogging { ) // --- END --- // + // --- Genetics start --- // + case class Genetics(release: String, output: String, input: String) + + case class VariantInputs( + variantAnnotation: IOResourceConfig, + targetIndex: IOResourceConfig + ) + + case class VariantOutputs( + variants: IOResourceConfig + ) + + case class Variants( + excludedBiotypes: List[String], + tssDistance: Long, + inputs: VariantInputs, + outputs: VariantOutputs + ) + // --- Genetics end --- // + case class EtlStep[T](step: T, dependencies: List[T]) case class EtlDagConfig(steps: List[EtlStep[String]], resolve: Boolean) @@ -283,6 +303,7 @@ object Configuration extends LazyLogging { expression: ExpressionSection, openfda: OpenfdaSection, ebisearch: EBISearchSection, - otarproject: OtarProjectSection + otarproject: OtarProjectSection, + variant: Variants ) } diff --git a/src/main/scala/io/opentargets/etl/backend/Variant.scala b/src/main/scala/io/opentargets/etl/backend/Variant.scala new file mode 100644 index 00000000..d6a1915f --- /dev/null +++ b/src/main/scala/io/opentargets/etl/backend/Variant.scala @@ -0,0 +1,130 @@ +package io.opentargets.etl.backend + +import com.typesafe.scalalogging.LazyLogging +import io.opentargets.etl.backend.spark.{IOResource, IoHelpers} +import io.opentargets.etl.backend.spark.IoHelpers.IOResources +import org.apache.spark.sql.functions.{ + abs, + arrays_zip, + col, + collect_list, + map_from_entries, + min, + udaf, + when +} +import org.apache.spark.sql.types.LongType +import org.apache.spark.sql.{DataFrame, SparkSession} + +object Variant extends LazyLogging { + + def apply()(implicit context: ETLSessionContext): IOResources = { + + logger.info("Executing Variant step.") + implicit val ss: SparkSession = context.sparkSession + + val variantConfiguration = context.configuration.variant + + logger.info(s"Configuration for Variant: $variantConfiguration") + + val mappedInputs = Map( + "variants" -> variantConfiguration.inputs.variantAnnotation, + "targets" -> variantConfiguration.inputs.targetIndex + ) + val inputs = IoHelpers.readFrom(mappedInputs) + + val variantRawDf: DataFrame = inputs("variants").data + val targetRawDf: DataFrame = inputs("targets").data + + val approvedBioTypes = variantConfiguration.excludedBiotypes.toSet + val excludedChromosomes: Set[String] = Set("MT") + + // these four components uniquely identify a variant + val variantIdStr = Seq("chr_id", "position", "ref_allele", "alt_allele") + val variantIdCol = variantIdStr.map(col) + + logger.info("Generate target DF for variant index.") + val targetDf = targetRawDf + .select( + col("id") as "gene_id", + col("genomicLocation.*"), + col("biotype"), + when(col("genomicLocation.strand") > 0, col("genomicLocation.start")) + .otherwise(col("genomicLocation.end")) as "tss" + ) + .filter( + (col("biotype") isInCollection approvedBioTypes) && !(col( + "chromosome" + ) isInCollection excludedChromosomes) + ) + + logger.info("Generate protein coding DF for variant index.") + val proteinCodingDf = targetDf.filter(col("biotype") === "protein_coding") + + logger.info("Generate 
diff --git a/src/main/scala/io/opentargets/etl/backend/Variant.scala b/src/main/scala/io/opentargets/etl/backend/Variant.scala
new file mode 100644
index 00000000..d6a1915f
--- /dev/null
+++ b/src/main/scala/io/opentargets/etl/backend/Variant.scala
@@ -0,0 +1,130 @@
+package io.opentargets.etl.backend
+
+import com.typesafe.scalalogging.LazyLogging
+import io.opentargets.etl.backend.spark.{IOResource, IoHelpers}
+import io.opentargets.etl.backend.spark.IoHelpers.IOResources
+import org.apache.spark.sql.functions.{
+  abs,
+  arrays_zip,
+  col,
+  collect_list,
+  map_from_entries,
+  min,
+  when
+}
+import org.apache.spark.sql.types.LongType
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object Variant extends LazyLogging {
+
+  def apply()(implicit context: ETLSessionContext): IOResources = {
+
+    logger.info("Executing Variant step.")
+    implicit val ss: SparkSession = context.sparkSession
+
+    val variantConfiguration = context.configuration.variant
+
+    logger.info(s"Configuration for Variant: $variantConfiguration")
+
+    val mappedInputs = Map(
+      "variants" -> variantConfiguration.inputs.variantAnnotation,
+      "targets" -> variantConfiguration.inputs.targetIndex
+    )
+    val inputs = IoHelpers.readFrom(mappedInputs)
+
+    val variantRawDf: DataFrame = inputs("variants").data
+    val targetRawDf: DataFrame = inputs("targets").data
+
+    val approvedBiotypes = variantConfiguration.approvedBiotypes.toSet
+    val excludedChromosomes: Set[String] = Set("MT")
+
+    // these four components uniquely identify a variant
+    val variantIdStr = Seq("chr_id", "position", "ref_allele", "alt_allele")
+    val variantIdCol = variantIdStr.map(col)
+
+    logger.info("Generate target DF for variant index.")
+    val targetDf = targetRawDf
+      .select(
+        col("id") as "gene_id",
+        col("genomicLocation.*"),
+        col("biotype"),
+        // the TSS is the gene start on the forward strand, the gene end on the reverse strand
+        when(col("genomicLocation.strand") > 0, col("genomicLocation.start"))
+          .otherwise(col("genomicLocation.end")) as "tss"
+      )
+      .filter(
+        (col("biotype") isInCollection approvedBiotypes) && !(col(
+          "chromosome"
+        ) isInCollection excludedChromosomes)
+      )
+
+    logger.info("Generate protein coding DF for variant index.")
+    val proteinCodingDf = targetDf.filter(col("biotype") === "protein_coding")
+
+    logger.info("Generate variant DF for variant index.")
+    val variantDf = variantRawDf
+      .filter(col("chrom_b38").isNotNull && col("pos_b38").isNotNull)
+      .select(
+        col("chrom_b37") as "chr_id_b37",
+        col("pos_b37") as "position_b37",
+        col("chrom_b38") as "chr_id",
+        col("pos_b38") as "position",
+        col("ref") as "ref_allele",
+        col("alt") as "alt_allele",
+        col("rsid") as "rs_id",
+        col("vep.most_severe_consequence") as "most_severe_consequence",
+        col("cadd") as "cadd",
+        col("af") as "af"
+      )
+      .repartition(variantIdCol: _*)
+
+    def variantGeneDistance(target: DataFrame): DataFrame =
+      variantDf
+        .join(
+          target,
+          (col("chr_id") === col("chromosome")) && (abs(
+            col("position") - col("tss")
+          ) <= variantConfiguration.tssDistance)
+        )
+        .withColumn("d", abs(col("position") - col("tss")))
+
+    logger.info("Calculate distance score for variant to gene.")
+    val variantGeneDistanceDf = targetDf.transform(variantGeneDistance)
+    val variantPcDistanceDf = proteinCodingDf.transform(variantGeneDistance)
+
+    logger.info("Rank variant scores by distance")
+
+    def findNearestGene(name: String)(df: DataFrame): DataFrame = {
+      val nameDistance = s"${name}_distance"
+      df.groupBy(variantIdCol: _*)
+        .agg(
+          collect_list(col("gene_id")) as "geneList",
+          collect_list(col("d")) as "dist",
+          min(col("d")) cast LongType as nameDistance
+        )
+        .select(
+          variantIdCol ++ Seq(
+            col(nameDistance),
+            map_from_entries(arrays_zip(col("dist"), col("geneList"))) as "distToGeneMap"
+          ): _*
+        )
+        .withColumn(name, col("distToGeneMap")(col(nameDistance)))
+        .drop("distToGeneMap", "geneList", "dist")
+    }
+
+    val variantGeneScored = variantGeneDistanceDf.transform(findNearestGene("gene_id_any"))
+    val variantPcScored = variantPcDistanceDf.transform(findNearestGene("gene_id_prot_coding"))
+
+    logger.info("Join distance-scored genes with distance-scored protein-coding genes.")
+    val vgDistances = variantGeneScored.join(variantPcScored, variantIdStr, "full_outer")
+
+    logger.info("Join distances to variants.")
+    val variantIndex = variantDf.join(vgDistances, variantIdStr, "left_outer")
+
+    val outputs = Map(
+      "variant" -> IOResource(variantIndex, variantConfiguration.outputs.variants)
+    )
+    logger.info("Write variant index outputs.")
+    IoHelpers.writeTo(outputs)
+  }
+}
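> **Editor's note.** `findNearestGene` above recovers the nearest gene by zipping the collected distances and gene ids into a map, then indexing that map with the minimum distance. If two genes are equidistant the zipped entries share a key, which `map_from_entries` rejects under Spark's default `EXCEPTION` dedup policy; the `LAST_WIN` setting added to `Helpers.scala` below keeps the last tied entry instead. A toy, runnable illustration of the same trick, with invented data and names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{arrays_zip, collect_list, map_from_entries, min}

object NearestGeneSketch extends App {
  val spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("nearest-gene-sketch")
    .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN") // tolerate tied distances
    .getOrCreate()
  import spark.implicits._

  val distances = Seq(
    ("1_100_A_T", "ENSG01", 250L),
    ("1_100_A_T", "ENSG02", 40L),
    ("1_100_A_T", "ENSG03", 40L) // tie: duplicate map key once zipped
  ).toDF("variant_id", "gene_id", "d")

  val nearest = distances
    .groupBy($"variant_id")
    .agg(
      collect_list($"gene_id") as "geneList",
      collect_list($"d") as "dist",
      min($"d") as "gene_distance"
    )
    // build a distance -> gene map, then look it up with the minimum distance
    .select(
      $"variant_id",
      $"gene_distance",
      map_from_entries(arrays_zip($"dist", $"geneList"))($"gene_distance") as "gene_id_any"
    )

  nearest.show(false) // gene_id_any is one of the tied genes (the last collected)
  spark.stop()
}
```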
diff --git a/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala b/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala
index b40f3231..1a6f14f9 100644
--- a/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala
+++ b/src/main/scala/io/opentargets/etl/backend/openfda/stage/LoadData.scala
@@ -1,13 +1,14 @@
 package io.opentargets.etl.backend.openfda.stage
 
-import akka.actor.TypedActor.context
-import io.opentargets.etl.backend.spark.Helpers.IOResourceConfig
 import io.opentargets.etl.backend.spark.IoHelpers
-import io.opentargets.etl.backend.spark.IoHelpers.IOResourceConfigurations
-import io.opentargets.etl.backend.{Blacklisting, DrugData, ETLSessionContext, FdaData, MeddraLowLevelTermsData, MeddraPreferredTermsData}
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.immutable.Stream.Empty
+import io.opentargets.etl.backend.{
+  Blacklisting,
+  DrugData,
+  ETLSessionContext,
+  FdaData,
+  MeddraLowLevelTermsData,
+  MeddraPreferredTermsData
+}
 
 object LoadData {
   def apply()(implicit context: ETLSessionContext) = {
@@ -18,19 +19,21 @@ object LoadData {
     // Prepare the loading Map
     val sourceData = {
       context.configuration.openfda.meddra match {
-          // DISCLAIMER - There's probably a better way to do this
-          case Some(meddraConfig) => Map(
-            DrugData() -> context.configuration.openfda.chemblDrugs,
-            Blacklisting() -> context.configuration.openfda.blacklistedEvents,
-            FdaData() -> context.configuration.openfda.fdaData,
-            MeddraPreferredTermsData() -> meddraConfig.meddraPreferredTerms,
-            MeddraLowLevelTermsData() -> meddraConfig.meddraLowLevelTerms
-          )
-          case _ => Map(
-            DrugData() -> context.configuration.openfda.chemblDrugs,
-            Blacklisting() -> context.configuration.openfda.blacklistedEvents,
-            FdaData() -> context.configuration.openfda.fdaData,
-          )
+        // DISCLAIMER - There's probably a better way to do this
+        case Some(meddraConfig) =>
+          Map(
+            DrugData() -> context.configuration.openfda.chemblDrugs,
+            Blacklisting() -> context.configuration.openfda.blacklistedEvents,
+            FdaData() -> context.configuration.openfda.fdaData,
+            MeddraPreferredTermsData() -> meddraConfig.meddraPreferredTerms,
+            MeddraLowLevelTermsData() -> meddraConfig.meddraLowLevelTerms
+          )
+        case _ =>
+          Map(
+            DrugData() -> context.configuration.openfda.chemblDrugs,
+            Blacklisting() -> context.configuration.openfda.blacklistedEvents,
+            FdaData() -> context.configuration.openfda.fdaData,
+          )
       }
     }
     // Load the data
diff --git a/src/main/scala/io/opentargets/etl/backend/spark/Helpers.scala b/src/main/scala/io/opentargets/etl/backend/spark/Helpers.scala
index ab96f6e4..93c6838e 100644
--- a/src/main/scala/io/opentargets/etl/backend/spark/Helpers.scala
+++ b/src/main/scala/io/opentargets/etl/backend/spark/Helpers.scala
@@ -79,6 +79,9 @@ object Helpers extends LazyLogging {
       .setAppName(appName)
       .set("spark.driver.maxResultSize", "0")
      .set("spark.debug.maxToStringFields", "2000")
+      // duplicate map keys (e.g. two genes at the same distance in the Variant step)
+      // keep the last entry instead of raising an exception
+      .set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
 
     // if some uri then setmaster must be set otherwise
     // it tries to get from env if any yarn running
diff --git a/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala b/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
index e195ef03..33f8e86a 100644
--- a/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
+++ b/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
@@ -172,8 +172,11 @@ object IoHelpers extends LazyLogging {
     val serialisedSchema = ior.data.schema.json
 
     val iores = ior.configuration.copy(
-      path =
-        context.configuration.common.output.split("/").filter(_.nonEmpty).mkString("/", "/", ""))
+      path = ior.configuration.path
+        .stripPrefix(context.configuration.common.output)
+        .split("/")
+        .filter(_.nonEmpty)
+        .mkString("/", "/", ""))
 
     val cols = ior.data.columns.toList
     val id = ior.configuration.path.split("/").filter(_.nonEmpty).last
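> **Editor's note.** The `IoHelpers` change above relativises each resource's metadata path against the configured output root instead of recording the root itself. A pure-function sketch of the new path computation, with illustrative values:

```scala
object MetadataPathSketch extends App {
  def relativise(resourcePath: String, outputRoot: String): String =
    resourcePath
      .stripPrefix(outputRoot) // drop the shared root, if present
      .split("/")
      .filter(_.nonEmpty)      // discard empty segments from "//" or a trailing "/"
      .mkString("/", "/", "")

  println(
    relativise(
      "gs://genetics-portal-dev-data/22.02.2/outputs/variant-index/",
      "gs://genetics-portal-dev-data/22.02.2/outputs"
    )
  ) // prints: /variant-index
}
```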