Skip to content

Commit

Permalink
cherry pick support write back to original tag for jar (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 authored Jan 6, 2022
1 parent 0dc354c commit ac445cc
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 5 deletions.
2 changes: 2 additions & 0 deletions nebula-algorithm/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
space:nb
# Nebula tag name, the algorithm result will be write into this tag
tag:pagerank
# algorithm result is insert into new tag or update to original tag. type: insert/update
type:insert
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,15 @@ object NebulaConfigEntry {
val pswd = nebulaConfig.getString("write.pswd")
val writeSpace = nebulaConfig.getString("write.space")
val writeTag = nebulaConfig.getString("write.tag")
val writeType = nebulaConfig.getString("write.type")
val writeConfigEntry =
NebulaWriteConfigEntry(graphAddress, writeMetaAddress, user, pswd, writeSpace, writeTag)
NebulaWriteConfigEntry(graphAddress,
writeMetaAddress,
user,
pswd,
writeSpace,
writeTag,
writeType)
NebulaConfigEntry(readConfigEntry, writeConfigEntry)
}
}
Expand Down Expand Up @@ -211,10 +218,11 @@ case class NebulaWriteConfigEntry(graphAddress: String = "",
user: String = "",
pswd: String = "",
space: String = "",
tag: String = "") {
tag: String = "",
writeType: String = "insert") {
override def toString: String = {
s"NebulaWriteConfigEntry: " +
s"{graphAddress: $graphAddress, user: $user, password: $pswd, space: $space, tag: $tag}"
s"{graphAddress: $graphAddress, user: $user, password: $pswd, space: $space, tag: $tag, type: $writeType}"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package com.vesoft.nebula.algorithm.writer

import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaVertexConfig}
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteMode, WriteNebulaVertexConfig}
import com.vesoft.nebula.algorithm.config.{AlgoConstants, Configs}
import org.apache.spark.sql.DataFrame

Expand All @@ -22,6 +22,8 @@ class NebulaWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, c
val tag = configs.nebulaConfig.writeConfigEntry.tag
val user = configs.nebulaConfig.writeConfigEntry.user
val passwd = configs.nebulaConfig.writeConfigEntry.pswd
val writeType = configs.nebulaConfig.writeConfigEntry.writeType
val writeMode = if (writeType.equals("insert")) WriteMode.INSERT else WriteMode.UPDATE

val config =
NebulaConnectionConfig
Expand All @@ -30,13 +32,16 @@ class NebulaWriter(data: DataFrame, configs: Configs) extends AlgoWriter(data, c
.withGraphAddress(graphAddress)
.withConenctionRetry(2)
.build()
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
val nebulaWriteVertexConfig = WriteNebulaVertexConfig
.builder()
.withUser(user)
.withPasswd(passwd)
.withSpace(space)
.withTag(tag)
.withVidField(AlgoConstants.ALGO_ID_COL)
.withVidAsProp(false)
.withBatch(1000)
.withWriteMode(writeMode)
.build()
data.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
}
Expand Down

0 comments on commit ac445cc

Please sign in to comment.