
Can Nebula2Nebula support multi-thread sync ? #136

Open
awang12345 opened this issue Dec 28, 2023 · 13 comments
Labels
type/question Type: question about the product

Comments

@awang12345

awang12345 commented Dec 28, 2023

reason
The current logic reads all the partition data first and then composes INSERT statements to write it out to the new Nebula.
With a large amount of data this can run out of memory — Spark does spill to disk, but I still ran into OOM job interruptions. It also fails to exploit multitasking: the partitions could be split, one task per partition, so each partition is written as soon as it has been read, which would be much more efficient.

// Multi-thread sync: one task per partition of a tag or edge
for (partitionId <- 1 to partitions) {
  val task = new Runnable {
    def run(): Unit = {
      syncTagPartitionData(spark,
        ........
        partitionId
      )
    }
  }
  threadPool.execute(task)
}

// Set the specific partitionId to scan
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .with.....
  ......
  .withPartitionId(partitionId)
  .build()
val vertex = spark.read.nebula(sourceConfig, nebulaReadVertexConfig).loadVerticesToDF()
 
// Create a read task for the specified partition id
class SimpleScan(nebulaOptions: NebulaOptions, nebulaTotalPart: Int, schema: StructType)
  extends Scan
  with Batch {

  override def planInputPartitions(): Array[InputPartition] = {
    // Return only the specified partition id for this task
    if (nebulaOptions.readPartitionId != null && nebulaOptions.readPartitionId > 0) {
      LOG.info(s"planInputPartitions partitions: ${nebulaOptions.readPartitionId}")
      return Array(NebulaPartitionBatch(Array(nebulaOptions.readPartitionId)))
    }
    .....
  }
}
  
@QingZ11 QingZ11 added the type/question Type: question about the product label Jan 2, 2024
@Nicole00
Copy link
Contributor
Contributor

Nicole00 commented Jan 2, 2024

Thanks for proposing this — it's a great idea. Is your goal to increase concurrency and avoid the OOM problem?
I wonder why you use multiple threads rather than Spark's own partition parallelism. If we want high concurrency, we can assign more total executor cores so that all partitions of all tags and edges run together.
And if we want to avoid OOM, we should on the contrary decrease concurrency, to reduce the amount of data held in memory.
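For illustration, that kind of tuning happens at submit time; a minimal sketch, assuming a YARN deployment (the class name, jar name and numbers below are placeholders, not from this thread):

```shell
# Illustrative sketch only: class and jar names are placeholders.
# 8 executors * 4 cores = up to 32 Nebula partitions scanned in parallel.
spark-submit \
  --master yarn \
  --num-executors 8 \
  --executor-cores 4 \
  --class com.example.Nebula2NebulaApp \
  nebula2nebula-example.jar
```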

@awang12345
Copy link
Author

OOM is only one of the reasons; the main one is that the existing Nebula2Nebula example has to read all the data before writing, which is too slow. Some partitions can be read first, and the data that has already been read can be written while the rest is still being read. The goal is to overlap reading and writing as much as possible, instead of only reading or only writing at any given time, for faster synchronization.

@awang12345
Copy link
Author

If we simply increase the read concurrency we can improve efficiency, but it puts too much pressure on the source Nebula server.

@Nicole00
Copy link
Contributor

Nicole00 commented Jan 2, 2024

No matter which method is used, as long as the concurrency level at the upper layer is the same, the pressure on the server is the same.
Besides, the reader uses the client's ScanIterator, which cannot be split into sub-tasks.

@awang12345
Copy link
Author

But if I want to read a partition and then write it straight away, can I do that? I don't know much about Spark, and in my practice writing currently only starts once everything has been read. For example, with 256 partitions, I could allow at most 16 to be read concurrently and synchronize each partition to the target Nebula server as soon as it has been read.
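As a minimal JVM-level sketch of that bounding, not the connector's actual API: `syncPartition` below is a hypothetical stand-in for "read one partition from the source, then write it to the target", and the 256/16 numbers are the illustrative values from this comment.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PartitionSyncSketch {
  // Illustrative values: 256 source partitions, at most 16 in flight at once.
  val totalPartitions    = 256
  val maxConcurrentReads = 16
  val completed          = new AtomicInteger(0)

  // Hypothetical placeholder for: read partition `partitionId` from the
  // source Nebula, then write it to the target as soon as the read finishes.
  def syncPartition(partitionId: Int): Unit = {
    completed.incrementAndGet()
  }

  def runAll(): Unit = {
    val permits = new Semaphore(maxConcurrentReads)
    val pool    = Executors.newFixedThreadPool(maxConcurrentReads)
    for (partitionId <- 1 to totalPartitions) {
      permits.acquire() // blocks submission while 16 partitions are in flight
      pool.execute(new Runnable {
        def run(): Unit =
          try syncPartition(partitionId)
          finally permits.release()
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.MINUTES)
  }
}
```

The semaphore gives back-pressure on the submitting loop, so no more than 16 partitions are ever being synced at the same time, and each finished partition frees a slot for the next one.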

@Nicole00
Copy link
Contributor

Nicole00 commented Jan 3, 2024

> But if I want to read a partition and then write it straight away, can I do that? I don't know much about Spark, and in my practice writing currently only starts once everything has been read. For example, with 256 partitions, I could allow at most 16 to be read concurrently and synchronize each partition to the target Nebula server as soon as it has been read.

First of all, for the Spark connector, reading means reading the data of a specified tag/edge. This does not involve specifying a part, so the partId is not exposed to the outside world.
Secondly, from the connector's perspective there is no business requirement to read only a certain part's data, because such data is incomplete and has no business meaning on its own.

If we want to scan a certain part, we should use the StorageClient in the Java client to scan the data of the specified part.

@awang12345
Copy link
Author

Sorry, maybe I didn't make that clear. What I meant is that I could start writing as soon as any one partition has been read, instead of waiting for all the partitions to be read.

@Nicole00
Copy link
Contributor

Nicole00 commented Jan 3, 2024

Yeah, I got your point.
You want to read and write at the same time, which requires finer read/write granularity. Currently the granularity is at the part level: once a part's data has been read, it is written out.

For now, the part is the smallest granularity. It is impossible to read concurrently within a single part, because you do not know where the scan cursor sits in each iteration.

@awang12345
Copy link
Author

Thank you very much for your response. My idea is to split a tag's or edge's partition list so that each partition becomes its own DataFrame to sync. As in my example above, I can read a partition and then write it, and this is already running in production.

@Nicole00
Copy link
Contributor

Nicole00 commented Jan 3, 2024

Wow, that's great. Can it still use the multiple machines of a Spark cluster?

@awang12345
Copy link
Author

awang12345 commented Jan 3, 2024

Yes, the online Spark environment is a YARN-based cluster. Synchronization sped up by 50%.

@Nicole00
Copy link
Contributor

Nicole00 commented Jan 8, 2024

> Yes, the online Spark environment is a YARN-based cluster. Synchronization sped up by 50%.

This is an amazing improvement. How fast can data be migrated between Nebula clusters? I'll test it and see whether we can use it in our environment.

@awang12345
Copy link
Author

awang12345 commented Jan 8, 2024

Migrating 3 billion records completed in 6 hours, whereas the original program took 14 hours. Of course, this has something to do with the read rate limit. But under the same rate limit, multi-threaded per-partition synchronization is still 50% better, because reads and writes run in parallel.
