Skip to content

Commit

Permalink
support to filter the datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 committed Jan 24, 2024
1 parent e57651a commit c121554
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ case class UdfConfigEntry(sep: String, oldColNames: List[String], newColName: St
}
}

case class FilterConfigEntry(filter: String){
override def toString(): String = s"filter:$filter"
}

/**
*
*/
Expand Down Expand Up @@ -467,6 +471,10 @@ object Configs {
Some(UdfConfigEntry(sep, cols, newCol))
} else None

val filterConfig = if(tagConfig.hasPath("filter")) {
Some(FilterConfigEntry(tagConfig.getString("filter")))
} else None

LOG.info(s"name ${tagName} batch ${batch}")
val entry = TagConfigEntry(
tagName,
Expand All @@ -485,7 +493,8 @@ object Configs {
enableTagless,
ignoreIndex,
deleteEdge,
vertexUdf
vertexUdf,
filterConfig
)
LOG.info(s"Tag Config: ${entry}")
tags += entry
Expand Down Expand Up @@ -608,6 +617,11 @@ object Configs {
Some(UdfConfigEntry(sep, cols, newCol))
} else None


val filterConfig = if (edgeConfig.hasPath("filter")) {
Some(FilterConfigEntry(edgeConfig.getString("filter")))
} else None

val entry = EdgeConfigEntry(
edgeName,
sourceConfig,
Expand All @@ -631,7 +645,8 @@ object Configs {
repartitionWithNebula,
ignoreIndex,
srcUdf,
dstUdf
dstUdf,
filterConfig
)
LOG.info(s"Edge Config: ${entry}")
edges += entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ case class TagConfigEntry(override val name: String,
enableTagless: Boolean = false,
ignoreIndex: Boolean = false,
deleteEdge: Boolean = false,
vertexUdf: Option[UdfConfigEntry] = None)
vertexUdf: Option[UdfConfigEntry] = None,
filterConfig: Option[FilterConfigEntry] = None)
extends SchemaConfigEntry {
require(name.trim.nonEmpty, "tag name cannot be empty")
require(vertexField.trim.nonEmpty, "tag vertex id cannot be empty")
Expand All @@ -89,7 +90,8 @@ case class TagConfigEntry(override val name: String,
s"repartitionWithNebula: $repartitionWithNebula, " +
s"enableTagless: $enableTagless, " +
s"ignoreIndex: $ignoreIndex, " +
s"vertexUdf: $vertexUdf."
s"vertexUdf: $vertexUdf, " +
s"filter: $filterConfig."
}
}

Expand Down Expand Up @@ -134,7 +136,8 @@ case class EdgeConfigEntry(override val name: String,
repartitionWithNebula: Boolean = false,
ignoreIndex: Boolean = false,
srcVertexUdf: Option[UdfConfigEntry] = None,
dstVertexUdf: Option[UdfConfigEntry] = None)
dstVertexUdf: Option[UdfConfigEntry] = None,
filterConfig: Option[FilterConfigEntry] = None)
extends SchemaConfigEntry {
require(name.trim.nonEmpty, "edge name cannot be empty")
require(sourceField.trim.nonEmpty, "edge source id cannot be empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
EdgeConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
JdbcConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SchemaConfigEntry,
SinkCategory,
SourceCategory,
TagConfigEntry,
UdfConfigEntry
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
JdbcReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
ORCReader,
OracleReader,
ParquetReader,
PostgreSQLReader,
PulsarReader
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.exchange.common.utils.SparkValidate
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
Expand Down Expand Up @@ -124,7 +85,7 @@ object Exchange {
var totalClientBatchFailure: Long = 0L
var totalClientRecordSuccess: Long = 0L
var totalClientRecordFailure: Long = 0L
var totalSstRecordSuccess: Long = 0l
var totalSstRecordSuccess: Long = 0L
var totalSstRecordFailure: Long = 0L

// reload for failed import tasks
Expand Down Expand Up @@ -171,10 +132,11 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
val df = if (tagConfig.vertexUdf.isDefined) {
dataUdf(data.get, tagConfig.vertexUdf.get)
var df = filterDf(data.get, tagConfig.filterConfig)
df = if (tagConfig.vertexUdf.isDefined) {
dataUdf(df, tagConfig.vertexUdf.get)
} else {
data.get
df
}
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
Expand Down Expand Up @@ -238,7 +200,7 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
var df = data.get
var df = filterDf(data.get, edgeConfig.filterConfig)
if (edgeConfig.srcVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.srcVertexUdf.get)
}
Expand Down Expand Up @@ -437,4 +399,13 @@ object Exchange {
finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
data.select(finalColNames: _*)
}


private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
if (filter.isDefined && filter.get != null && filter.get.filter != null) {
data.filter(filter.get.filter)
} else {
data
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
EdgeConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
JdbcConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SchemaConfigEntry,
SinkCategory,
SourceCategory,
TagConfigEntry,
UdfConfigEntry
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
JdbcReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
ORCReader,
OracleReader,
ParquetReader,
PostgreSQLReader,
PulsarReader
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.exchange.common.utils.SparkValidate
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
Expand Down Expand Up @@ -124,7 +85,7 @@ object Exchange {
var totalClientBatchFailure: Long = 0L
var totalClientRecordSuccess: Long = 0L
var totalClientRecordFailure: Long = 0L
var totalSstRecordSuccess: Long = 0l
var totalSstRecordSuccess: Long = 0L
var totalSstRecordFailure: Long = 0L

// reload for failed import tasks
Expand Down Expand Up @@ -170,10 +131,11 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
val df = if (tagConfig.vertexUdf.isDefined) {
dataUdf(data.get, tagConfig.vertexUdf.get)
var df = filterDf(data.get, tagConfig.filterConfig)
df = if (tagConfig.vertexUdf.isDefined) {
dataUdf(df, tagConfig.vertexUdf.get)
} else {
data.get
df
}

val batchSuccess =
Expand Down Expand Up @@ -237,7 +199,7 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
var df = data.get
var df = filterDf(data.get, edgeConfig.filterConfig)
if (edgeConfig.srcVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.srcVertexUdf.get)
}
Expand Down Expand Up @@ -436,4 +398,13 @@ object Exchange {
finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
data.select(finalColNames: _*)
}

private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
data.show()
if (filter.isDefined && filter.get != null && filter.get.filter != null) {
data.filter(filter.get.filter)
} else {
data
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
EdgeConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
JdbcConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SchemaConfigEntry,
SinkCategory,
SourceCategory,
TagConfigEntry,
UdfConfigEntry
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
JdbcReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
ORCReader,
OracleReader,
ParquetReader,
PostgreSQLReader,
PulsarReader
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.exchange.common.utils.SparkValidate
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
Expand Down Expand Up @@ -124,7 +85,7 @@ object Exchange {
var totalClientBatchFailure: Long = 0L
var totalClientRecordSuccess: Long = 0L
var totalClientRecordFailure: Long = 0L
var totalSstRecordSuccess: Long = 0l
var totalSstRecordSuccess: Long = 0L
var totalSstRecordFailure: Long = 0L

// reload for failed import tasks
Expand Down Expand Up @@ -170,10 +131,11 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
val df = if (tagConfig.vertexUdf.isDefined) {
dataUdf(data.get, tagConfig.vertexUdf.get)
var df = filterDf(data.get, tagConfig.filterConfig)
df = if (tagConfig.vertexUdf.isDefined) {
dataUdf(df, tagConfig.vertexUdf.get)
} else {
data.get
df
}
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
Expand Down Expand Up @@ -236,7 +198,7 @@ object Exchange {
data.get.show(truncate = false)
}
if (data.isDefined && !c.dry) {
var df = data.get
var df = filterDf(data.get, edgeConfig.filterConfig)
if (edgeConfig.srcVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.srcVertexUdf.get)
}
Expand Down Expand Up @@ -434,4 +396,13 @@ object Exchange {
finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
data.select(finalColNames: _*)
}


private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
if (filter.isDefined && filter.get != null && filter.get.filter != null) {
data.filter(filter.get.filter)
} else {
data
}
}
}

0 comments on commit c121554

Please sign in to comment.