From 200ce65b44b33cfe16bbc54693f53d33aca1c9a5 Mon Sep 17 00:00:00 2001
From: Chendi Xue
Date: Wed, 3 Apr 2019 11:00:43 +0800
Subject: [PATCH] C++ PMDK wrapper refine and refactor

1. Add a partition-remove feature: spilled data that has already been
   consumed can now be deleted from pmem to release memory.
2. Add an append feature to stageArray and partitionArray. Previously we
   had to tell the pool the maximum stage and partition counts up front,
   but these sizes cannot be predicted. Now, when an index exceeds the
   preallocated capacity, a new array segment is allocated and chained to
   the existing one.

Signed-off-by: Chendi Xue
---
 spark.conf.example                                 |   2 +-
 src/main/cpp/PersistentMemoryPool.cpp              | 143 +++++
 src/main/cpp/PersistentMemoryPool.h                | 530 +-----------------
 src/main/cpp/Request.cpp                           | 340 +++++++++++
 src/main/cpp/Request.h                             | 248 ++++++++
 src/main/cpp/WorkQueue.h                           |  16 +-
 src/main/cpp/lib_jni_pmdk.cpp                      |  18 +-
 src/main/cpp/lib_jni_pmdk.h                        |  26 +-
 src/main/cpp/make                                  |   2 +-
 .../spark/storage/pmof/PersistentMemoryPool.java   |  22 +-
 .../spark/shuffle/pmof/PmemShuffleReader.scala     |   2 +-
 .../spark/shuffle/pmof/PmemShuffleWriter.scala     |  16 +-
 .../storage/pmof/PersistentMemoryHandler.scala     |  24 +-
 .../spark/storage/pmof/PmemBlockObjectStream.scala |   2 +-
 .../spark/storage/pmof/PmemInputStream.scala       |  11 +-
 .../spark/storage/pmof/PmemOutputStream.scala      |   5 +-
 .../util/collection/pmof/PmemExternalSorter.scala  |   1 +
 17 files changed, 852 insertions(+), 556 deletions(-)
 create mode 100644 src/main/cpp/PersistentMemoryPool.cpp
 create mode 100644 src/main/cpp/Request.cpp
 create mode 100644 src/main/cpp/Request.h

diff --git a/spark.conf.example b/spark.conf.example
index dccfbf3e..fedce213 100644
--- a/spark.conf.example
+++ b/spark.conf.example
@@ -9,8 +9,8 @@ spark.shuffle.pmof.enable_rdma true
 spark.shuffle.pmof.enable_pmem true
 
 # for persistent memory
-spark.shuffle.pmof.max_stage_num 100
 spark.shuffle.pmof.pmem_list /dev/dax0.0,/dev/dax1.0
+spark.shuffle.spill.pmof.MemoryThreshold 16777216
 
 # for rdma
 spark.shuffle.pmof.server_buffer_nums 32
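[Editor's note] A minimal standalone sketch (not part of the patch) of the
index arithmetic the append feature relies on: a logical stageId resolves to
a hop count along the chained StageArray segments plus a slot inside the
segment it lands in, and partitionId is resolved the same way against chained
PartitionArrays. All names below are illustrative:

    #include <cassert>

    struct Slot { int hops; int index; };

    // maxStage is the fixed per-segment capacity chosen at pool creation time
    static Slot locate(int stageId, int maxStage) {
      Slot s;
      s.hops  = stageId / maxStage;  // how many nextStageArray links to follow
      s.index = stageId % maxStage;  // slot inside the segment reached
      return s;
    }

    int main() {
      // with maxStage == 100, stageId 305 lives in the 4th segment, slot 5
      Slot s = locate(305, 100);
      assert(s.hops == 3 && s.index == 5);
      return 0;
    }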
diff --git a/src/main/cpp/PersistentMemoryPool.cpp b/src/main/cpp/PersistentMemoryPool.cpp
new file mode 100644
index 00000000..93353aaf
--- /dev/null
+++ b/src/main/cpp/PersistentMemoryPool.cpp
@@ -0,0 +1,143 @@
+#include "PersistentMemoryPool.h"
+#include <libpmemobj.h>
+#include <iostream>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <unistd.h>
+#include <sys/stat.h>
+
+PMPool::PMPool(const char* dev, int maxStage, int maxMap, long size):
+    stop(false),
+    maxStage(maxStage),
+    maxMap(maxMap),
+    dev(dev),
+    worker(&PMPool::process, this) {
+
+  const char *pool_layout_name = "pmem_spark_shuffle";
+  cout << "PMPOOL is " << dev << endl;
+  // For a fsdax path the pool file may not exist yet, so pmemobj_open can
+  // fail and we create the pool instead; a devdax device opens directly.
+
+  pmpool = pmemobj_open(dev, pool_layout_name);
+  if (pmpool == NULL) {
+    pmpool = pmemobj_create(dev, pool_layout_name, size, S_IRUSR | S_IWUSR);
+  }
+  if (pmpool == NULL) {
+    cerr << "Failed to create pool, kill process, errmsg: " << pmemobj_errormsg() << endl;
+    exit(-1);
+  }
+
+  stageArrayRoot = POBJ_ROOT(pmpool, struct StageArrayRoot);
+}
+
+PMPool::~PMPool() {
+  while(request_queue.size() > 0) {
+    fprintf(stderr, "%s request queue size is %d\n", dev, (int)request_queue.size());
+    sleep(1);
+  }
+  fprintf(stderr, "%s request queue size is %d\n", dev, (int)request_queue.size());
+  stop = true;
+  worker.join();
+  pmemobj_close(pmpool);
+}
+
+long PMPool::getRootAddr() {
+  return (long)pmpool;
+}
+
+void PMPool::process() {
+  Request *cur_req;
+  while(!stop) {
+    cur_req = (Request*)request_queue.dequeue();
+    if (cur_req != nullptr) {
+      cur_req->exec();
+    }
+  }
+}
+
+long PMPool::setMapPartition(
+    int partitionNum,
+    int stageId,
+    int mapId,
+    int partitionId,
+    long size,
+    char* data,
+    bool clean,
+    int numMaps) {
+  WriteRequest write_request(this, maxStage, numMaps, partitionNum, stageId, 0, mapId, partitionId, size, data, clean);
+  request_queue.enqueue((void*)&write_request);
+  return write_request.getResult();
+}
+
+long PMPool::setReducePartition(
+    int partitionNum,
+    int stageId,
+    int partitionId,
+    long size,
+    char* data,
+    bool clean,
+    int numMaps) {
+  WriteRequest write_request(this, maxStage, 1, partitionNum, stageId, 1, 0, partitionId, size, data, clean);
+  request_queue.enqueue((void*)&write_request);
+
+  return write_request.getResult();
+}
+
+long PMPool::getMapPartition(
+    MemoryBlock* mb,
+    int stageId,
+    int mapId,
+    int partitionId ) {
+  ReadRequest read_request(this, mb, stageId, 0, mapId, partitionId);
+  read_request.exec();
+  return read_request.getResult();
+}
+
+long PMPool::getReducePartition(
+    MemoryBlock* mb,
+    int stageId,
+    int mapId,
+    int partitionId ) {
+  ReadRequest read_request(this, mb, stageId, 1, mapId, partitionId);
+  read_request.exec();
+  return read_request.getResult();
+}
+
+long PMPool::getMapPartitionBlockInfo(BlockInfo *blockInfo, int stageId, int mapId, int partitionId) {
+  MetaRequest meta_request(this, blockInfo, stageId, 0, mapId, partitionId);
+  meta_request.exec();
+  return meta_request.getResult();
+}
+
+long PMPool::getReducePartitionBlockInfo(BlockInfo *blockInfo, int stageId, int mapId, int partitionId) {
+  MetaRequest meta_request(this, blockInfo, stageId, 1, mapId, partitionId);
+  meta_request.exec();
+  return meta_request.getResult();
+}
+
+long PMPool::getMapPartitionSize(int stageId, int mapId, int partitionId) {
+  SizeRequest size_request(this, stageId, 0, mapId, partitionId);
+  size_request.exec();
+  return size_request.getResult();
+}
+
+long PMPool::getReducePartitionSize(int stageId, int mapId, int partitionId) {
+  SizeRequest size_request(this, stageId, 1, mapId, partitionId);
+  size_request.exec();
+  return size_request.getResult();
+}
+
+long PMPool::deleteMapPartition(int stageId, int mapId, int partitionId) {
+  DeleteRequest delete_request(this, stageId, 0, mapId, partitionId);
+  request_queue.enqueue((void*)&delete_request);
+  return delete_request.getResult();
+}
+
+long PMPool::deleteReducePartition(int stageId, int mapId, int partitionId) {
+  DeleteRequest delete_request(this, stageId, 1, mapId, partitionId);
+  request_queue.enqueue((void*)&delete_request);
+  return delete_request.getResult();
+}
+
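[Editor's note] PMPool serializes every pmem write through the single worker
thread started in its constructor: the caller builds a request object on its
own stack, enqueues a pointer to it, and blocks in getResult() until the
worker has executed it. A self-contained sketch of that handshake, using
plain standard-library stand-ins for Request and WorkQueue (all names below
are illustrative; the real code polls with usleep(5) just like this):

    #include <atomic>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <unistd.h>

    // stand-in for WriteRequest: exec() runs on the worker, getResult() blocks the caller
    struct FakeRequest {
      std::atomic<bool> processed{false};
      long result = -1;
      void exec() { result = 42; processed = true; }  // stands in for the pmem transaction
      long getResult() {
        while (!processed) usleep(5);                 // same polling loop as WriteRequest::getResult
        return result;
      }
    };

    int main() {
      std::queue<FakeRequest*> q;
      std::mutex qlock;
      std::atomic<bool> stop{false};

      std::thread worker([&] {                        // mirrors PMPool::process()
        while (!stop) {
          FakeRequest* r = nullptr;
          { std::lock_guard<std::mutex> g(qlock);
            if (!q.empty()) { r = q.front(); q.pop(); } }
          if (r) r->exec();
        }
      });

      FakeRequest req;
      { std::lock_guard<std::mutex> g(qlock); q.push(&req); }  // like setMapPartition enqueueing
      long v = req.getResult();                                // caller blocks here
      stop = true;
      worker.join();
      return v == 42 ? 0 : 1;
    }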
diff --git a/src/main/cpp/PersistentMemoryPool.h b/src/main/cpp/PersistentMemoryPool.h
index 504be055..a48e3d75 100644
--- a/src/main/cpp/PersistentMemoryPool.h
+++ b/src/main/cpp/PersistentMemoryPool.h
@@ -1,18 +1,9 @@
-#include <libpmemobj.h>
-#include <iostream>
-#include <cstdio>
-#include <cstdlib>
+#ifndef PMPOOL_H
+#define PMPOOL_H
-#include <cstring>
-#include <unistd.h>
-#include <sys/stat.h>
-#include <vector>
 #include <thread>
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <string>
 #include "WorkQueue.h"
+#include "Request.h"
 
 /* This class is used to make PM as a huge pool
  * All shuffle files will be stored in the same pool
@@ -42,518 +33,39 @@ using namespace std;
 
-#define TOID_ARRAY_TYPE(x) TOID(x)
-#define TOID_ARRAY(x) TOID_ARRAY_TYPE(TOID(x))
-#define TYPENUM 2
-
-POBJ_LAYOUT_BEGIN(PersistentMemoryStruct);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArrayRoot);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArray);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArrayItem);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct TypeArray);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct TypeArrayItem);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct MapArray);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct MapArrayItem);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionArray);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionArrayItem);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionBlock);
-POBJ_LAYOUT_END(PersistentMemoryStruct);
-
-struct StageArrayRoot {
-  TOID(struct StageArray) stageArray;
-};
-
-struct StageArray {
-  int size;
-  TOID(struct StageArrayItem) items[];
-};
-
-struct StageArrayItem {
-  TOID(struct TypeArray) typeArray;
-};
-
-struct TypeArray {
-  int size;
-  TOID(struct TypeArrayItem) items[];
-};
-
-struct TypeArrayItem {
-  TOID(struct MapArray) mapArray;
-};
-
-struct MapArray {
-  int size;
-  TOID(struct MapArrayItem) items[];
-};
-
-struct MapArrayItem {
-  TOID(struct PartitionArray) partitionArray;
-};
-
-struct PartitionArray {
-  int size;
-  TOID(struct PartitionArrayItem) items[];
-};
-
-struct PartitionArrayItem {
-  TOID(struct PartitionBlock) first_block;
-  TOID(struct PartitionBlock) last_block;
-  long partition_size;
-  int numBlocks;
-};
-
-struct PartitionBlock {
-  TOID(struct PartitionBlock) next_block;
-  long data_size;
-  PMEMoid data;
-};
-
-struct MemoryBlock {
-  char* buf;
-  int len;
-  MemoryBlock() {
-  }
-
-  ~MemoryBlock() {
-    delete[] buf;
-  }
-};
-
-struct BlockInfo {
-  long* data;
-  BlockInfo() {
-  }
-
-  ~BlockInfo() {
-    delete data;
-  }
-};
-
 class PMPool {
 public:
   PMEMobjpool *pmpool;
+
+  WorkQueue<void*> request_queue;
+  bool stop;
+
+  TOID(struct StageArrayRoot) stageArrayRoot;
   int maxStage;
   int maxMap;
-  std::mutex pmem_index_lock;
-  int core_s;
-  int core_e;
-  std::thread worker;
-  bool stop;
   const char* dev;
-  WorkQueue<void*> request_queue;
+  // declared last so the queue and stop flag are initialized before the
+  // worker thread starts running process()
+  std::thread worker;
+
   PMPool(const char* dev, int maxStage, int maxMap, long size);
   ~PMPool();
   long getRootAddr();
-  long setMapPartition(int partitionNum, int stageId, int mapId, int partitionId, long size, char* data, bool clean);
-  long setReducePartition(int partitionNum, int stageId, int partitionId, long size, char* data, bool clean);
+
+  long setMapPartition(int partitionNum, int stageId, int mapId, int partitionId, long size, char* data, bool clean, int numMaps);
+  long setReducePartition(int partitionNum, int stageId, int partitionId, long size, char* data, bool clean, int numMaps);
+
   long getMapPartition(MemoryBlock* mb, int stageId, int mapId, int partitionId);
   long getReducePartition(MemoryBlock* mb, int stageId, int mapId, int partitionId);
-  long getPartition(MemoryBlock* mb, int stageId, int typeId, int mapId, int partitionId);
-  int getMapPartitionBlockInfo(BlockInfo *block_info, int stageId, int mapId, int partitionId);
-  int getReducePartitionBlockInfo(BlockInfo *block_info, int stageId, int mapId, int partitionId);
+
+  long getMapPartitionBlockInfo(BlockInfo *block_info, int stageId, int mapId, int partitionId);
+  long getReducePartitionBlockInfo(BlockInfo *block_info, int stageId, int mapId, int partitionId);
+
   long getMapPartitionSize(int stageId, int mapId, int partitionId);
   long getReducePartitionSize(int stageId, int mapId, int partitionId);
+
+  long deleteMapPartition(int stageId, int mapId, int partitionId);
+  long deleteReducePartition(int stageId, int mapId, int partitionId);
+private:
   void process();
-  TOID(struct PartitionArrayItem) getPartitionBlock(int stageId, int typeId, int mapId, int partitionId);
 };
 
-class Request {
-public:
-  // add lock to make this request blocked
-  std::mutex mtx;
-  std::condition_variable cv;
-  bool processed;
-  std::unique_lock
lck; - - // add lock to make func blocked - std::mutex block_mtx; - std::condition_variable block_cv; - bool committed; - std::unique_lock block_lck; - - int maxStage; - int maxMap; - int partitionNum; - int stageId; - int typeId; - int mapId; - int partitionId; - long size; - char *data; - bool set_clean; - char* data_addr; - PMPool* pmpool_ptr; - - Request(PMPool* pmpool_ptr, - int maxStage, - int maxMap, - int partitionNum, - int stageId, - int typeId, - int mapId, - int partitionId, - long size, - char* data, - bool set_clean); - ~Request(); - void setPartition(); - long getResult(); }; -PMPool::PMPool(const char* dev, int maxStage, int maxMap, long size): - maxStage(maxStage), - maxMap(maxMap), - stop(false), - dev(dev), - worker(&PMPool::process, this) { - - const char *pool_layout_name = "pmem_spark_shuffle"; - cout << "PMPOOL is " << dev << endl; - // if this is a fsdax device - // we need to create - // if this is a devdax device - - pmpool = pmemobj_open(dev, pool_layout_name); - if (pmpool == NULL) { - cout << "Failed to open dev, try to create, errmsg: " << pmemobj_errormsg() << endl; - pmpool = pmemobj_create(dev, pool_layout_name, size, S_IRUSR | S_IWUSR); - } - if (pmpool == NULL) { - cerr << "Failed to create pool, kill process, errmsg: " << pmemobj_errormsg() << endl; - exit(-1); - } - - stageArrayRoot = POBJ_ROOT(pmpool, struct StageArrayRoot); -} - -PMPool::~PMPool() { - while(request_queue.size() > 0) { - fprintf(stderr, "%s request queue size is %d\n", dev, request_queue.size()); - sleep(1); - } - fprintf(stderr, "%s request queue size is %d\n", dev, request_queue.size()); - stop = true; - worker.join(); - pmemobj_close(pmpool); -} - -long PMPool::getRootAddr() { - return (long)pmpool; -} - -void PMPool::process() { - Request *cur_req; - while(!stop) { - cur_req = (Request*)request_queue.dequeue(); - if (cur_req != nullptr) { - cur_req->setPartition(); - } - } -} - -long PMPool::setMapPartition( - int partitionNum, - int stageId, - int mapId, - int partitionId, - long size, - char* data, - bool clean) { - Request write_request(this, maxStage, maxMap, partitionNum, stageId, 0, mapId, partitionId, size, data, clean); - request_queue.enqueue((void*)&write_request); - return write_request.getResult(); -} - -long PMPool::setReducePartition( - int partitionNum, - int stageId, - int partitionId, - long size, - char* data, - bool clean) { - Request write_request(this, maxStage, 1, partitionNum, stageId, 1, 0, partitionId, size, data, clean); - request_queue.enqueue((void*)&write_request); - return write_request.getResult(); -} - -long PMPool::getMapPartition( - MemoryBlock* mb, - int stageId, - int mapId, - int partitionId ) { - return getPartition(mb, stageId, 0, mapId, partitionId); -} - -long PMPool::getReducePartition( - MemoryBlock* mb, - int stageId, - int mapId, - int partitionId ) { - return getPartition(mb, stageId, 1, mapId, partitionId); -} - -long PMPool::getPartition( - MemoryBlock* mb, - int stageId, - int typeId, - int mapId, - int partitionId ) { - //taskset(core_s, core_e); - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, typeId, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - long data_length = D_RO(partitionArrayItem)->partition_size; - mb->buf = new char[data_length](); - long off = 0; - TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block; - - char* data_addr; - while(!TOID_IS_NULL(partitionBlock)) { - data_addr = (char*)pmemobj_direct(D_RO(partitionBlock)->data); - 
//printf("getPartition data_addr: %p\n", data_addr); - - memcpy(mb->buf + off, data_addr, D_RO(partitionBlock)->data_size); - off += D_RO(partitionBlock)->data_size; - partitionBlock = D_RO(partitionBlock)->next_block; - } - - //printf("getPartition length is %d\n", data_length); - return data_length; -} - -int PMPool::getMapPartitionBlockInfo( - BlockInfo* block_info, - int stageId, - int mapId, - int partitionId) { - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, 0, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block; - - int numBlocks = D_RO(partitionArrayItem)->numBlocks; - block_info->data = new long[numBlocks * 2](); - int i = 0; - - while(!TOID_IS_NULL(partitionBlock)) { - block_info->data[i++] = (long)pmemobj_direct(D_RO(partitionBlock)->data); - block_info->data[i++] = D_RO(partitionBlock)->data_size; - partitionBlock = D_RO(partitionBlock)->next_block; - } - - return numBlocks * 2; -} - -int PMPool::getReducePartitionBlockInfo( - BlockInfo* block_info, - int stageId, - int mapId, - int partitionId) { - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, 1, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block; - - int numBlocks = D_RO(partitionArrayItem)->numBlocks; - block_info->data = new long[numBlocks * 2](); - int i = 0; - - while(!TOID_IS_NULL(partitionBlock)) { - block_info->data[i++] = (long)pmemobj_direct(D_RO(partitionBlock)->data); - block_info->data[i++] = D_RO(partitionBlock)->data_size; - partitionBlock = D_RO(partitionBlock)->next_block; - } - - return numBlocks * 2; -} - -long PMPool::getMapPartitionSize( - int stageId, - int mapId, - int partitionId ) { - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, 0, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - long data_length = D_RO(partitionArrayItem)->partition_size; - return data_length; -} - -long PMPool::getReducePartitionSize( - int stageId, - int mapId, - int partitionId ) { - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, 1, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - long data_length = D_RO(partitionArrayItem)->partition_size; - return data_length; -} - -TOID(struct PartitionArrayItem) PMPool::getPartitionBlock(int stageId, int typeId, int mapId, int partitionId) { - if(TOID_IS_NULL(D_RO(stageArrayRoot)->stageArray)){ - fprintf(stderr, "get Partition of %d_%d_%d failed: stageArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - TOID(struct StageArrayItem) stageArrayItem = D_RO(D_RO(stageArrayRoot)->stageArray)->items[stageId]; - if(TOID_IS_NULL(stageArrayItem) || TOID_IS_NULL(D_RO(stageArrayItem)->typeArray)) { - fprintf(stderr, "get Partition of %d_%d_%d failed: stageArrayItem OR typeArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - TOID(struct TypeArray) typeArray = D_RO(stageArrayItem)->typeArray; - TOID(struct TypeArrayItem) typeArrayItem = D_RO(D_RO(stageArrayItem)->typeArray)->items[typeId]; - if(TOID_IS_NULL(typeArrayItem) || TOID_IS_NULL(D_RO(typeArrayItem)->mapArray)) { - fprintf(stderr, "get Partition of %d_%d_%d failed: typeArrayItem OR mapArray none Exists, type id 
%d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - TOID(struct MapArray) mapArray = D_RO(typeArrayItem)->mapArray; - TOID(struct MapArrayItem) mapArrayItem = D_RO(D_RO(typeArrayItem)->mapArray)->items[mapId]; - if(TOID_IS_NULL(mapArrayItem) || TOID_IS_NULL(D_RO(mapArrayItem)->partitionArray)){ - fprintf(stderr, "get Partition of %d_%d_%d failed: mapArrayItem OR partitionArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - TOID(struct PartitionArrayItem) partitionArrayItem = D_RO(D_RO(mapArrayItem)->partitionArray)->items[partitionId]; - if(TOID_IS_NULL(partitionArrayItem) || TOID_IS_NULL(D_RO(partitionArrayItem)->first_block)) { - fprintf(stderr, "get Partition of %d_%d_%d failed: partitionArrayItem OR partitionBlock none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - return partitionArrayItem; -} - -Request::Request(PMPool* pmpool_ptr, - int maxStage, - int maxMap, - int partitionNum, - int stageId, - int typeId, - int mapId, - int partitionId, - long size, - char* data, - bool set_clean): - pmpool_ptr(pmpool_ptr), - maxStage(maxStage), - maxMap(maxMap), - partitionNum(partitionNum), - stageId(stageId), - typeId(typeId), - mapId(mapId), - partitionId(partitionId), - size(size), - data(data), - data_addr(nullptr), - set_clean(set_clean), - processed(false), - committed(false), - lck(mtx), block_lck(block_mtx) { -} - -Request::~Request() { -} - -long Request::getResult() { - { - while (!processed) { - usleep(5); - } - //cv.wait(lck, [&]{return processed;}); - } - //fprintf(stderr, "get Result for %d_%d_%d\n", stageId, mapId, partitionId); - if (data_addr == nullptr) - return -1; - return (long)data_addr; -} - -void Request::setPartition() { - TX_BEGIN(pmpool_ptr->pmpool) { - //taskset(core_s, core_e); - //cout << this << " enter setPartition tx" << endl; - //fprintf(stderr, "request for %d_%d_%d\n", stageId, mapId, partitionId); - TX_ADD(pmpool_ptr->stageArrayRoot); - // add a lock flag - - if (TOID_IS_NULL(D_RO(pmpool_ptr->stageArrayRoot)->stageArray)) { - D_RW(pmpool_ptr->stageArrayRoot)->stageArray = TX_ZALLOC(struct StageArray, sizeof(struct StageArray) + maxStage * sizeof(struct StageArrayItem)); - } - - TX_ADD_FIELD(pmpool_ptr->stageArrayRoot, stageArray); - TOID(struct StageArrayItem) *stageArrayItem = &(D_RW(D_RW(pmpool_ptr->stageArrayRoot)->stageArray)->items[stageId]); - if (TOID_IS_NULL(*stageArrayItem)) { - *stageArrayItem = TX_ZNEW(struct StageArrayItem); - } - TX_ADD(*stageArrayItem); - if (TOID_IS_NULL(D_RO(*stageArrayItem)->typeArray)) { - D_RW(*stageArrayItem)->typeArray = TX_ZALLOC(struct TypeArray, sizeof(struct TypeArray) + TYPENUM * sizeof(struct TypeArrayItem)); - } - - TX_ADD_FIELD(*stageArrayItem, typeArray); - TOID(struct TypeArrayItem) *typeArrayItem = &(D_RW(D_RW(*stageArrayItem)->typeArray)->items[typeId]); - if (TOID_IS_NULL(*typeArrayItem)) { - *typeArrayItem = TX_ZNEW(struct TypeArrayItem); - } - TX_ADD(*typeArrayItem); - if (TOID_IS_NULL(D_RO(*typeArrayItem)->mapArray)) { - D_RW(*typeArrayItem)->mapArray = TX_ZALLOC(struct MapArray, sizeof(struct MapArray) + maxMap * sizeof(struct MapArrayItem)); - } - - TX_ADD_FIELD(*typeArrayItem, mapArray); - TOID(struct MapArrayItem) *mapArrayItem = &(D_RW(D_RW(*typeArrayItem)->mapArray)->items[mapId]); - if (TOID_IS_NULL(*mapArrayItem)) { - *mapArrayItem = TX_ZNEW(struct MapArrayItem); - } - TX_ADD(*mapArrayItem); - if 
(TOID_IS_NULL(D_RO(*mapArrayItem)->partitionArray)) { - D_RW(*mapArrayItem)->partitionArray = TX_ZALLOC(struct PartitionArray, sizeof(struct PartitionArray) + partitionNum * sizeof(struct PartitionArrayItem)); - } - - TX_ADD_FIELD(*mapArrayItem, partitionArray); - TOID(struct PartitionArrayItem) *partitionArrayItem = &(D_RW(D_RW(*mapArrayItem)->partitionArray)->items[partitionId]); - if (TOID_IS_NULL(*partitionArrayItem)) { - *partitionArrayItem = TX_ZNEW(struct PartitionArrayItem); - } - TX_ADD(*partitionArrayItem); - TX_ADD_FIELD(*partitionArrayItem, partition_size); - - TOID(struct PartitionBlock) *partitionBlock = &(D_RW(*partitionArrayItem)->first_block); - TOID(struct PartitionBlock) *last_partitionBlock = &(D_RW(*partitionArrayItem)->last_block); - if (set_clean == false) { - // jump to last block - D_RW(*partitionArrayItem)->partition_size += size; - D_RW(*partitionArrayItem)->numBlocks += 1; - if (!TOID_IS_NULL(*last_partitionBlock)) { - partitionBlock = &(D_RW(*last_partitionBlock)->next_block); - } - } else { - //TODO: we should remove unused blocks - D_RW(*partitionArrayItem)->partition_size = size; - D_RW(*partitionArrayItem)->numBlocks = 1; - } - - TX_ADD_DIRECT(partitionBlock); - *partitionBlock = TX_ZALLOC(struct PartitionBlock, 1); - D_RW(*partitionBlock)->data = pmemobj_tx_zalloc(size, 0); - - D_RW(*partitionBlock)->data_size = size; - D_RW(*partitionBlock)->next_block = TOID_NULL(struct PartitionBlock); - D_RW(*partitionArrayItem)->last_block = *partitionBlock; - - data_addr = (char*)pmemobj_direct(D_RW(*partitionBlock)->data); - //printf("setPartition data_addr: %p\n", data_addr); - pmemobj_tx_add_range_direct((const void *)data_addr, size); - - memcpy(data_addr, data, size); - } TX_ONCOMMIT { - committed = true; - block_cv.notify_all(); - } TX_ONABORT { - fprintf(stderr, "set Partition of %d_%d_%d failed, type is %d, partitionNum is %d. 
Error: %s\n", stageId, mapId, partitionId, typeId, partitionNum, pmemobj_errormsg());
-    exit(-1);
-  } TX_END
-
-  block_cv.wait(block_lck, [&]{return committed;});
-  //fprintf(stderr, "request committed %d_%d_%d\n", stageId, mapId, partitionId);
-
-  processed = true;
-  //cv.notify_all();
-}
+#endif

diff --git a/src/main/cpp/Request.cpp b/src/main/cpp/Request.cpp
new file mode 100644
index 00000000..fbb120f6
--- /dev/null
+++ b/src/main/cpp/Request.cpp
@@ -0,0 +1,340 @@
+#include "Request.h"
+#include "PersistentMemoryPool.h"
+#include <libpmemobj.h>
+#include <cstdio>
+#include <cstring>
+#include <cstdlib>
+#include <unistd.h>
+
+/****** Request ******/
+TOID(struct PartitionArrayItem) Request::getPartitionBlock() {
+  if(TOID_IS_NULL(D_RO(pmpool_ptr->stageArrayRoot)->stageArray)){
+    fprintf(stderr, "get Partition of %d_%d_%d failed: stageArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+    return TOID_NULL(struct PartitionArrayItem);
+  }
+
+  /** walk the chained segments in case stageId exceeds the default maxStage **/
+  TOID(struct StageArray) stageArray = D_RO(pmpool_ptr->stageArrayRoot)->stageArray;
+  int stageArrayIndex = stageId / pmpool_ptr->maxStage;
+  int stageItemIndex = stageId % pmpool_ptr->maxStage;
+  for (int i = 0; i < stageArrayIndex; i++) {
+    if (TOID_IS_NULL(D_RO(stageArray)->nextStageArray)) {
+      fprintf(stderr, "get Partition of %d_%d_%d failed: stageArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+      return TOID_NULL(struct PartitionArrayItem);
+    }
+    stageArray = D_RO(stageArray)->nextStageArray;
+  }
+  /** get done **/
+
+  TOID(struct StageArrayItem) stageArrayItem = D_RO(stageArray)->items[stageItemIndex];
+  if(TOID_IS_NULL(stageArrayItem) || TOID_IS_NULL(D_RO(stageArrayItem)->typeArray)) {
+    fprintf(stderr, "get Partition of %d_%d_%d failed: stageArrayItem OR typeArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+    return TOID_NULL(struct PartitionArrayItem);
+  }
+
+  TOID(struct TypeArray) typeArray = D_RO(stageArrayItem)->typeArray;
+  TOID(struct TypeArrayItem) typeArrayItem = D_RO(D_RO(stageArrayItem)->typeArray)->items[typeId];
+  if(TOID_IS_NULL(typeArrayItem) || TOID_IS_NULL(D_RO(typeArrayItem)->mapArray)) {
+    fprintf(stderr, "get Partition of %d_%d_%d failed: typeArrayItem OR mapArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+    return TOID_NULL(struct PartitionArrayItem);
+  }
+
+  TOID(struct MapArray) mapArray = D_RO(typeArrayItem)->mapArray;
+  TOID(struct MapArrayItem) mapArrayItem = D_RO(D_RO(typeArrayItem)->mapArray)->items[mapId];
+  if(TOID_IS_NULL(mapArrayItem) || TOID_IS_NULL(D_RO(mapArrayItem)->partitionArray)){
+    fprintf(stderr, "get Partition of %d_%d_%d failed: mapArrayItem OR partitionArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+    return TOID_NULL(struct PartitionArrayItem);
+  }
+
+  /** walk the chained partition arrays in case partitionId exceeds the segment capacity **/
+  TOID(struct PartitionArray) partitionArray = D_RO(mapArrayItem)->partitionArray;
+  int partitionNum = D_RO(partitionArray)->numPartitions;
+  int partitionArrayIndex = partitionId / partitionNum;
+  int partitionItemIndex = partitionId % partitionNum;
+  for (int i = 0; i < partitionArrayIndex; i++) {
+    if (TOID_IS_NULL(D_RO(partitionArray)->nextPartitionArray)) {
+      fprintf(stderr, "get Partition of %d_%d_%d failed: partitionArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+      return TOID_NULL(struct PartitionArrayItem);
+    }
+    partitionArray = D_RO(partitionArray)->nextPartitionArray;
+  }
+  /** append get done **/
+
+  TOID(struct
PartitionArrayItem) partitionArrayItem = D_RO(partitionArray)->items[partitionItemIndex]; + if(TOID_IS_NULL(partitionArrayItem) || TOID_IS_NULL(D_RO(partitionArrayItem)->first_block)) { + fprintf(stderr, "get Partition of %d_%d_%d failed: partitionArrayItem OR partitionBlock none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); + return TOID_NULL(struct PartitionArrayItem); + } + + return partitionArrayItem; +} + +TOID(struct PartitionArrayItem) Request::getOrCreatePartitionBlock(int maxStage, int maxMap, int partitionNum) { + if (TOID_IS_NULL(D_RO(pmpool_ptr->stageArrayRoot)->stageArray)) { + D_RW(pmpool_ptr->stageArrayRoot)->stageArray = TX_ZALLOC(struct StageArray, sizeof(struct StageArray) + maxStage * sizeof(struct StageArrayItem)); + } + + /** Do append and get if stageId is greater than default maxStages.**/ + TX_ADD_FIELD(pmpool_ptr->stageArrayRoot, stageArray); + TOID(struct StageArray) *stageArray = &(D_RW(pmpool_ptr->stageArrayRoot)->stageArray); + int stageArrayIndex = stageId / maxStage; + int stageItemIndex = stageId % maxStage; + for (int i = 0; i < stageArrayIndex; i++) { + if (TOID_IS_NULL(D_RO(*stageArray)->nextStageArray)) { + D_RW(*stageArray)->nextStageArray = TX_ZALLOC(struct StageArray, sizeof(struct StageArray) + maxStage * sizeof(struct StageArrayItem)); + } + stageArray = &(D_RW(*stageArray)->nextStageArray); + TX_ADD(*stageArray); + } + /** append get done **/ + + TOID(struct StageArrayItem) *stageArrayItem = &(D_RW(*stageArray)->items[stageItemIndex]); + if (TOID_IS_NULL(*stageArrayItem)) { + *stageArrayItem = TX_ZNEW(struct StageArrayItem); + } + TX_ADD(*stageArrayItem); + if (TOID_IS_NULL(D_RO(*stageArrayItem)->typeArray)) { + D_RW(*stageArrayItem)->typeArray = TX_ZALLOC(struct TypeArray, sizeof(struct TypeArray) + TYPENUM * sizeof(struct TypeArrayItem)); + } + + TX_ADD_FIELD(*stageArrayItem, typeArray); + TOID(struct TypeArrayItem) *typeArrayItem = &(D_RW(D_RW(*stageArrayItem)->typeArray)->items[typeId]); + if (TOID_IS_NULL(*typeArrayItem)) { + *typeArrayItem = TX_ZNEW(struct TypeArrayItem); + } + TX_ADD(*typeArrayItem); + if (TOID_IS_NULL(D_RO(*typeArrayItem)->mapArray)) { + D_RW(*typeArrayItem)->mapArray = TX_ZALLOC(struct MapArray, sizeof(struct MapArray) + maxMap * sizeof(struct MapArrayItem)); + } + + TX_ADD_FIELD(*typeArrayItem, mapArray); + TOID(struct MapArrayItem) *mapArrayItem = &(D_RW(D_RW(*typeArrayItem)->mapArray)->items[mapId]); + if (TOID_IS_NULL(*mapArrayItem)) { + *mapArrayItem = TX_ZNEW(struct MapArrayItem); + } + TX_ADD(*mapArrayItem); + if (TOID_IS_NULL(D_RO(*mapArrayItem)->partitionArray)) { + D_RW(*mapArrayItem)->partitionArray = TX_ZALLOC(struct PartitionArray, sizeof(struct PartitionArray) + partitionNum * sizeof(struct PartitionArrayItem)); + TX_ADD_FIELD(*mapArrayItem, partitionArray); + D_RW(D_RW(*mapArrayItem)->partitionArray)->numPartitions = partitionNum; + } else { + TX_ADD_FIELD(*mapArrayItem, partitionArray); + } + TOID(struct PartitionArray) *partitionArray = &(D_RW(*mapArrayItem)->partitionArray); + + /** Do append and get if stageId is greater than default maxStages.**/ + int partitionArrayIndex = partitionId / partitionNum; + int partitionItemIndex = partitionId % partitionNum; + + for (int i = 0; i < partitionArrayIndex; i++) { + if (TOID_IS_NULL(D_RO(*partitionArray)->nextPartitionArray)) { + D_RW(*partitionArray)->nextPartitionArray = TX_ZALLOC(struct PartitionArray, sizeof(struct PartitionArray) + partitionNum * sizeof(struct PartitionArrayItem)); + TX_ADD_FIELD(*partitionArray, nextPartitionArray); + 
D_RW(D_RW(*partitionArray)->nextPartitionArray)->numPartitions = partitionNum; + } else { + TX_ADD_FIELD(*partitionArray, nextPartitionArray); + } + partitionArray = &(D_RW(*partitionArray)->nextPartitionArray); + } + /** append get done **/ + + TOID(struct PartitionArrayItem) *partitionArrayItem = &(D_RW(*partitionArray)->items[partitionItemIndex]); + if (TOID_IS_NULL(*partitionArrayItem)) { + *partitionArrayItem = TX_ZNEW(struct PartitionArrayItem); + } + return *partitionArrayItem; +} + +/****** WriteRequest ******/ +void WriteRequest::exec() { + setPartition(); +} + +long WriteRequest::getResult() { + while (!processed) { + usleep(5); + } + //cv.wait(lck, [&]{return processed;}); + //fprintf(stderr, "get Result for %d_%d_%d\n", stageId, mapId, partitionId); + return (long)data_addr; +} + +void WriteRequest::setPartition() { + TX_BEGIN(pmpool_ptr->pmpool) { + TX_ADD(pmpool_ptr->stageArrayRoot); + + TOID(struct PartitionArrayItem) partitionArrayItem = getOrCreatePartitionBlock(maxStage, maxMap, partitionNum); + TX_ADD(partitionArrayItem); + TX_ADD_FIELD(partitionArrayItem, partition_size); + + TOID(struct PartitionBlock) *partitionBlock = &(D_RW(partitionArrayItem)->first_block); + TOID(struct PartitionBlock) *last_partitionBlock = &(D_RW(partitionArrayItem)->last_block); + if (set_clean == false) { + // jump to last block + D_RW(partitionArrayItem)->partition_size += size; + D_RW(partitionArrayItem)->numBlocks += 1; + if (!TOID_IS_NULL(*last_partitionBlock)) { + partitionBlock = &(D_RW(*last_partitionBlock)->next_block); + } + } else { + TOID(struct PartitionBlock) curPartitionBlock = *partitionBlock; + TOID(struct PartitionBlock) nextPartitionBlock = curPartitionBlock; + + //remove original blocks per set_clean is true + while(!TOID_IS_NULL(curPartitionBlock)) { + nextPartitionBlock = D_RO(curPartitionBlock)->next_block; + pmemobj_tx_free(D_RO(curPartitionBlock)->data); + TX_FREE(curPartitionBlock); + curPartitionBlock = nextPartitionBlock; + } + + D_RW(partitionArrayItem)->partition_size = size; + D_RW(partitionArrayItem)->numBlocks = 1; + } + + TX_ADD_DIRECT(partitionBlock); + *partitionBlock = TX_ZALLOC(struct PartitionBlock, 1); + D_RW(*partitionBlock)->data = pmemobj_tx_zalloc(size, 0); + + D_RW(*partitionBlock)->data_size = size; + D_RW(*partitionBlock)->next_block = TOID_NULL(struct PartitionBlock); + D_RW(partitionArrayItem)->last_block = *partitionBlock; + + data_addr = (char*)pmemobj_direct(D_RW(*partitionBlock)->data); + //printf("setPartition data_addr: %p\n", data_addr); + pmemobj_tx_add_range_direct((const void *)data_addr, size); + + memcpy(data_addr, data, size); + } TX_ONCOMMIT { + committed = true; + block_cv.notify_all(); + } TX_ONABORT { + fprintf(stderr, "set Partition of %d_%d_%d failed, type is %d, partitionNum is %d, maxStage is %d, maxMap is %d. 
Error: %s\n", stageId, mapId, partitionId, typeId, partitionNum, maxStage, maxMap, pmemobj_errormsg());
+    exit(-1);
+  } TX_END
+
+  block_cv.wait(block_lck, [&]{return committed;});
+  //fprintf(stderr, "request committed %d_%d_%d\n", stageId, mapId, partitionId);
+
+  processed = true;
+  //cv.notify_all();
+}
+
+/****** ReadRequest ******/
+void ReadRequest::exec() {
+  getPartition();
+}
+
+long ReadRequest::getResult() {
+  return data_length;
+}
+
+void ReadRequest::getPartition() {
+  //taskset(core_s, core_e);
+  TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock();
+  if (TOID_IS_NULL(partitionArrayItem)) return;
+  data_length = D_RO(partitionArrayItem)->partition_size;
+  mb->buf = new char[data_length]();
+  long off = 0;
+  TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block;
+
+  char* data_addr;
+  while(!TOID_IS_NULL(partitionBlock)) {
+    data_addr = (char*)pmemobj_direct(D_RO(partitionBlock)->data);
+    //printf("getPartition data_addr: %p\n", data_addr);
+
+    memcpy(mb->buf + off, data_addr, D_RO(partitionBlock)->data_size);
+    off += D_RO(partitionBlock)->data_size;
+    partitionBlock = D_RO(partitionBlock)->next_block;
+  }
+
+  //printf("getPartition length is %d\n", data_length);
+}
+
+/****** MetaRequest ******/
+void MetaRequest::exec() {
+  getPartitionBlockInfo();
+}
+
+long MetaRequest::getResult() {
+  return array_length;
+}
+
+void MetaRequest::getPartitionBlockInfo() {
+  TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock();
+  if (TOID_IS_NULL(partitionArrayItem)) return;
+  TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block;
+  if (TOID_IS_NULL(partitionBlock)) return;
+
+  int numBlocks = D_RO(partitionArrayItem)->numBlocks;
+  array_length = numBlocks * 2;
+  block_info->data = new long[array_length]();
+  int i = 0;
+
+  while(!TOID_IS_NULL(partitionBlock)) {
+    block_info->data[i++] = (long)pmemobj_direct(D_RO(partitionBlock)->data);
+    block_info->data[i++] = D_RO(partitionBlock)->data_size;
+    partitionBlock = D_RO(partitionBlock)->next_block;
+  }
+}
+
+/****** SizeRequest ******/
+void SizeRequest::exec() {
+  getPartitionSize();
+}
+
+long SizeRequest::getResult() {
+  return data_length;
+}
+
+void SizeRequest::getPartitionSize() {
+  TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock();
+  if (TOID_IS_NULL(partitionArrayItem)) return;
+  data_length = D_RO(partitionArrayItem)->partition_size;
+}
+
+/****** DeleteRequest ******/
+void DeleteRequest::exec() {
+  deletePartition();
+}
+
+long DeleteRequest::getResult() {
+  while (!processed) {
+    usleep(5);
+  }
+  return ret;
+}
+
+void DeleteRequest::deletePartition() {
+  TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock();
+  if (TOID_IS_NULL(partitionArrayItem)) {
+    // nothing to delete; mark the request processed so getResult() does not spin forever
+    processed = true;
+    return;
+  }
+  TX_BEGIN(pmpool_ptr->pmpool) {
+    TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block;
+    TOID(struct PartitionBlock) nextPartitionBlock = partitionBlock;
+
+    while(!TOID_IS_NULL(partitionBlock)) {
+      nextPartitionBlock = D_RO(partitionBlock)->next_block;
+      pmemobj_tx_free(D_RO(partitionBlock)->data);
+      TX_FREE(partitionBlock);
+      partitionBlock = nextPartitionBlock;
+    }
+    //TX_FREE(partitionArrayItem);
+    //*(&partitionArrayItem) = TOID_NULL(struct PartitionArrayItem);
+    TX_ADD(partitionArrayItem);
+    D_RW(partitionArrayItem)->first_block = TOID_NULL(struct PartitionBlock);
+    D_RW(partitionArrayItem)->last_block = TOID_NULL(struct PartitionBlock);
+    D_RW(partitionArrayItem)->partition_size = 0;
+    D_RW(partitionArrayItem)->numBlocks = 0;
+  } TX_ONCOMMIT {
+    ret = 0;
+    committed = true;
+    block_cv.notify_all();
+  } TX_ONABORT {
+    fprintf(stderr, "delete Partition of %d_%d_%d failed, type is %d. Error: %s\n", stageId, mapId, partitionId, typeId, pmemobj_errormsg());
+    exit(-1);
+  } TX_END
+
+  block_cv.wait(block_lck, [&]{return committed;});
+  //fprintf(stderr, "request committed %d_%d_%d\n", stageId, mapId, partitionId);
+
+  processed = true;
+}
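[Editor's note] A partition is persisted as a linked chain of PartitionBlock
nodes (first_block/last_block), so appending a spill never moves existing
data; reads walk the chain and memcpy each block into one contiguous buffer,
as ReadRequest::getPartition above does. A volatile-memory sketch of that
reassembly (illustrative names, no PMDK):

    #include <cstring>

    // volatile-memory stand-in for the persistent PartitionBlock chain
    struct Block { Block* next; long size; const char* data; };

    // mirrors ReadRequest::getPartition: copy each block into one contiguous buffer
    static char* assemble(const Block* first, long total) {
      char* out = new char[total]();
      long off = 0;
      for (const Block* b = first; b != nullptr; b = b->next) {
        memcpy(out + off, b->data, b->size);
        off += b->size;
      }
      return out;
    }

    int main() {
      Block b2 = { nullptr, 3, "xyz" };
      Block b1 = { &b2,     3, "abc" };
      char* all = assemble(&b1, 6);            // reassembles "abcxyz"
      bool ok = memcmp(all, "abcxyz", 6) == 0;
      delete[] all;
      return ok ? 0 : 1;
    }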
diff --git a/src/main/cpp/Request.h b/src/main/cpp/Request.h
new file mode 100644
index 00000000..9c8235e8
--- /dev/null
+++ b/src/main/cpp/Request.h
@@ -0,0 +1,248 @@
+#ifndef PMPOOL_REQUEST_H
+#define PMPOOL_REQUEST_H
+
+#include <libpmemobj.h>
+#include <mutex>
+#include <condition_variable>
+
+#define TOID_ARRAY_TYPE(x) TOID(x)
+#define TOID_ARRAY(x) TOID_ARRAY_TYPE(TOID(x))
+#define TYPENUM 2
+
+POBJ_LAYOUT_BEGIN(PersistentMemoryStruct);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArrayRoot);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArray);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArrayItem);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct TypeArray);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct TypeArrayItem);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct MapArray);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct MapArrayItem);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionArray);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionArrayItem);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionBlock);
+POBJ_LAYOUT_END(PersistentMemoryStruct);
+
+struct StageArrayRoot {
+  TOID(struct StageArray) stageArray;
+};
+
+struct StageArray {
+  int size;
+  TOID(struct StageArray) nextStageArray;
+  TOID(struct StageArrayItem) items[];
+};
+
+struct StageArrayItem {
+  TOID(struct TypeArray) typeArray;
+};
+
+struct TypeArray {
+  int size;
+  TOID(struct TypeArrayItem) items[];
+};
+
+struct TypeArrayItem {
+  TOID(struct MapArray) mapArray;
+};
+
+struct MapArray {
+  int size;
+  TOID(struct MapArrayItem) items[];
+};
+
+struct MapArrayItem {
+  TOID(struct PartitionArray) partitionArray;
+};
+
+struct PartitionArray {
+  int size;
+  int numPartitions;
+  TOID(struct PartitionArray) nextPartitionArray;
+  TOID(struct PartitionArrayItem) items[];
+};
+
+struct PartitionArrayItem {
+  TOID(struct PartitionBlock) first_block;
+  TOID(struct PartitionBlock) last_block;
+  long partition_size;
+  int numBlocks;
+};
+
+struct PartitionBlock {
+  TOID(struct PartitionBlock) next_block;
+  long data_size;
+  PMEMoid data;
+};
+
+struct MemoryBlock {
+  char* buf;
+  int len;
+  MemoryBlock() {
+  }
+
+  ~MemoryBlock() {
+    delete[] buf;
+  }
+};
+
+struct BlockInfo {
+  long* data;
+  BlockInfo() {
+  }
+
+  ~BlockInfo() {
+    delete[] data;  // allocated with new long[], so array delete is required
+  }
+};
+
+using namespace std;
+class PMPool;
+class Request {
+public:
+  Request(PMPool* pmpool_ptr, int stageId, int typeId, int mapId, int partitionId):
+    pmpool_ptr(pmpool_ptr),
+    stageId(stageId),
+    typeId(typeId),
+    mapId(mapId),
+    partitionId(partitionId),
+    processed(false),
+    committed(false),
+    lck(mtx), block_lck(block_mtx) {
+  }
+  virtual ~Request(){}
+  virtual void exec() = 0;
+  virtual long getResult() = 0;
+protected:
+  // this lock blocks the caller until the request has been processed
+  std::mutex mtx;
+  std::condition_variable cv;
+  bool processed;
+  std::unique_lock<std::mutex> lck;
+
+  // this lock blocks the worker until the transaction has committed
+  std::mutex block_mtx;
+  std::condition_variable block_cv;
+  bool committed;
+  std::unique_lock<std::mutex> block_lck;
+
+  int stageId;
+  int typeId;
+  int mapId;
+  int partitionId;
+  PMPool* pmpool_ptr;
+
+  TOID(struct PartitionArrayItem) getOrCreatePartitionBlock(int maxStage, int maxMap, int partitionNum);
+  TOID(struct PartitionArrayItem) getPartitionBlock();
+};
+
+class WriteRequest : public Request {
+public:
+  char* data_addr; //Pmem Block Addr
+  WriteRequest(PMPool* pmpool_ptr,
+      int maxStage,
+      int maxMap,
+      int partitionNum,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId,
+      long size,
+      char* data,
+      bool set_clean):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId),
+    maxStage(maxStage),
+    maxMap(maxMap),
+    partitionNum(partitionNum),
+    size(size),
+    data(data),
+    data_addr(nullptr),
+    set_clean(set_clean){
+  }
+  ~WriteRequest(){}
+  void exec();
+  long getResult();
+private:
+  int maxStage;
+  int maxMap;
+  int partitionNum;
+
+  long size;
+  char *data;
+  bool set_clean;
+
+  void setPartition();
+};
+
+class ReadRequest : public Request {
+public:
+  ReadRequest(PMPool* pmpool_ptr,
+      MemoryBlock *mb,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId), mb(mb){
+  }
+  ~ReadRequest(){}
+  void exec();
+  long getResult();
+private:
+  MemoryBlock *mb;
+  long data_length = -1;
+  void getPartition();
+};
+
+class MetaRequest : public Request {
+public:
+  MetaRequest(PMPool* pmpool_ptr,
+      BlockInfo* block_info,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId), block_info(block_info){
+  }
+  ~MetaRequest(){}
+  void exec();
+  long getResult();
+private:
+  BlockInfo* block_info;
+  long array_length = -1;
+  void getPartitionBlockInfo();
+};
+
+class SizeRequest: public Request {
+public:
+  SizeRequest(PMPool* pmpool_ptr,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId){
+  }
+  ~SizeRequest(){}
+  void exec();
+  long getResult();
+private:
+  long data_length = -1;
+  void getPartitionSize();
+};
+
+class DeleteRequest: public Request {
+public:
+  DeleteRequest(PMPool* pmpool_ptr,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId){
+  }
+  ~DeleteRequest(){}
+  void exec();
+  long getResult();
+private:
+  long ret = -1;
+  void deletePartition();
+};
+
+#endif

diff --git a/src/main/cpp/WorkQueue.h b/src/main/cpp/WorkQueue.h
index 625dcccb..99a017df 100644
--- a/src/main/cpp/WorkQueue.h
+++ b/src/main/cpp/WorkQueue.h
@@ -1,5 +1,4 @@
 #ifndef WORKQUEUE_H
-    std::unique_lock<std::mutex> lck;
 #define WORKQUEUE_H
 
 #include <iostream>
@@ -7,19 +6,18 @@
 #include <queue>
 #include <mutex>
 #include <condition_variable>
+#include <unistd.h>
 
 template <class T> class WorkQueue{
 public:
   typedef T queue_type;
   std::queue<queue_type> _queue;
   std::mutex _queue_lock;
-  std::mutex cond_lock;
-  //std::mutex::scoped_lock scope_cond_lock;
-  std::condition_variable m_cond;
-  std::unique_lock<std::mutex> unique_lock;
+  //std::mutex cond_lock;
+  //std::condition_variable m_cond;
+  //std::unique_lock<std::mutex> unique_lock;
 
-  //WorkQueue():scope_cond_lock(cond_lock){}
-  WorkQueue():unique_lock(cond_lock){}
+  WorkQueue(){}
 
   void enqueue( queue_type _work ){
     std::lock_guard<std::mutex> guard(_queue_lock);
@@ -52,8 +50,8 @@ template <class T> class WorkQueue{
     return this->_queue.size();
   }
 
-  void wake_all(){
+  /*void wake_all(){
     m_cond.notify_all();
-  }
+  }*/
 };
 #endif
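[Editor's note] The JNI layer below passes the PMPool instance to Java as an
opaque jlong handle and casts it back on every call. A minimal sketch of that
handle pattern, assuming nothing beyond jni.h (Example and Engine are
hypothetical names, not part of the patch):

    #include <jni.h>

    struct Engine { long root; };  // illustrative stand-in for PMPool

    extern "C" JNIEXPORT jlong JNICALL Java_Example_nativeOpen(JNIEnv*, jclass) {
      return (jlong)new Engine{0};   // the C++ pointer travels to Java as an opaque long
    }

    extern "C" JNIEXPORT jlong JNICALL Java_Example_nativeGetRoot(JNIEnv*, jclass, jlong h) {
      return ((Engine*)h)->root;     // cast back on every call, like (PMPool*)pmpool below
    }

    extern "C" JNIEXPORT void JNICALL Java_Example_nativeClose(JNIEnv*, jclass, jlong h) {
      delete (Engine*)h;             // nativeCloseDevice deletes the PMPool the same way
    }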
diff --git a/src/main/cpp/lib_jni_pmdk.cpp b/src/main/cpp/lib_jni_pmdk.cpp
index 46767f0b..2a8095e5 100644
--- a/src/main/cpp/lib_jni_pmdk.cpp
+++ b/src/main/cpp/lib_jni_pmdk.cpp
@@ -11,26 +11,26 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_
 }
 
 JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeSetMapPartition
-  (JNIEnv *env, jclass obj, jlong pmpool, jint partitionNum, jint stageId, jint mapId, jint partitionId, long pmBuffer, jboolean set_clean) {
+  (JNIEnv *env, jclass obj, jlong pmpool, jint partitionNum, jint stageId, jint mapId, jint partitionId, jlong pmBuffer, jboolean set_clean, jint numMaps) {
   int size = ((PmemBuffer*)pmBuffer)->getRemaining();
   char* buf = ((PmemBuffer*)pmBuffer)->getDataForFlush(size);
   if (buf == nullptr) {
     return -1;
   }
   //printf("nativeSetMapPartition for shuffle_%d_%d_%d\n", stageId, mapId, partitionId);
-  long addr = ((PMPool*)pmpool)->setMapPartition(partitionNum, stageId, mapId, partitionId, size, buf, set_clean);
+  long addr = ((PMPool*)pmpool)->setMapPartition(partitionNum, stageId, mapId, partitionId, size, buf, set_clean, numMaps);
   return addr;
 }
 
 JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeSetReducePartition
-  (JNIEnv *env, jclass obj, jlong pmpool, jint partitionNum, jint stageId, jint partitionId, long pmBuffer, jboolean clean) {
+  (JNIEnv *env, jclass obj, jlong pmpool, jint partitionNum, jint stageId, jint partitionId, jlong pmBuffer, jboolean clean, jint numMaps) {
   int size = ((PmemBuffer*)pmBuffer)->getRemaining();
   char* buf = ((PmemBuffer*)pmBuffer)->getDataForFlush(size);
   if (buf == nullptr) {
     return -1;
   }
   //printf("nativeSetReducePartition for spill_%d_%d\n", stageId, partitionId);
-  long addr = ((PMPool*)pmpool)->setReducePartition(partitionNum, stageId, partitionId, size, buf, clean);
+  long addr = ((PMPool*)pmpool)->setReducePartition(partitionNum, stageId, partitionId, size, buf, clean, numMaps);
   return addr;
 }
 
@@ -86,6 +86,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_
   return ((PMPool*)pmpool)->getReducePartitionSize(stageId, mapId, partitionId);
 }
 
+JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeDeleteMapPartition
+  (JNIEnv *env, jclass obj, jlong pmpool, jint stageId, jint mapId, jint partitionId) {
+  return ((PMPool*)pmpool)->deleteMapPartition(stageId, mapId, partitionId);
+  }
+
+JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeDeleteReducePartition
+  (JNIEnv *env, jclass obj, jlong pmpool, jint stageId, jint mapId, jint partitionId) {
+  return ((PMPool*)pmpool)->deleteReducePartition(stageId, mapId, partitionId);
+  }
+
 JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeCloseDevice
   (JNIEnv *env, jclass obj, jlong pmpool) {
   delete (PMPool*)pmpool;

diff --git a/src/main/cpp/lib_jni_pmdk.h b/src/main/cpp/lib_jni_pmdk.h
index e258e8f3..1b9bf0c6 100644
--- a/src/main/cpp/lib_jni_pmdk.h
+++ b/src/main/cpp/lib_jni_pmdk.h
@@ -18,18 +18,18 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_
 /*
  * Class:     lib_jni_pmdk
  * Method:    nativeSetMapPartition
- * Signature: (JIIIIJZ)J
+ * Signature: (JIIIIJZI)J
  */
 JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeSetMapPartition
-  (JNIEnv *, jclass, jlong, jint, jint, jint, jint, jlong, jboolean);
+  (JNIEnv *, jclass, jlong, jint, jint, jint, jint, jlong, jboolean, jint);
 
 /*
  * Class:     lib_jni_pmdk
 * Method:    nativeSetReducePartition
- * Signature: (JIIIIJZ)J
+ * Signature: (JIIIJZI)J
  */
 JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeSetReducePartition
-  (JNIEnv *, jclass, jlong, jint, jint, jint, jlong, jboolean);
+  (JNIEnv *, jclass, jlong, jint, jint, jint, jlong, jboolean, jint);
 
 /*
  * Class:     lib_jni_pmdk
@@ -65,7 +65,7 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_spark_storage_pmof_PersistentMemory
 /*
  * Class:     lib_jni_pmdk
- * Method:    nativeReduceMapPartitionBlockInfo
+ * Method:    nativeGetReducePartitionBlockInfo
  * Signature: (JIII)[J
  */
 JNIEXPORT jlongArray JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetReducePartitionBlockInfo
@@ -87,6 +87,22 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_
 JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetReducePartitionSize
   (JNIEnv *, jclass, jlong, jint, jint, jint);
 
+/*
+ * Class:     lib_jni_pmdk
+ * Method:    nativeDeleteMapPartition
+ * Signature: (JIII)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeDeleteMapPartition
+  (JNIEnv *, jclass, jlong, jint, jint, jint);
+
+/*
+ * Class:     lib_jni_pmdk
+ * Method:    nativeDeleteReducePartition
+ * Signature: (JIII)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeDeleteReducePartition
+  (JNIEnv *, jclass, jlong, jint, jint, jint);
+
 /*
  * Class:     lib_jni_pmdk
  * Method:    nativeGetRoot

diff --git a/src/main/cpp/make b/src/main/cpp/make
index 6971fe72..925e5088 100755
--- a/src/main/cpp/make
+++ b/src/main/cpp/make
@@ -1,2 +1,2 @@
 #!/usr/bin/sh
-g++ lib_jni_pmdk.cpp -o libjnipmdk.so -I$JAVA_HOME -I$JAVA_HOME/include/linux -I$JAVA_HOME/include -std=c++11 -lpmemobj -lpthread -fPIC -shared -g
+g++ lib_jni_pmdk.cpp PersistentMemoryPool.cpp Request.cpp -o libjnipmdk.so -I$JAVA_HOME -I$JAVA_HOME/include/linux -I$JAVA_HOME/include -std=c++11 -lpmemobj -lpthread -fPIC -shared -g

diff --git a/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java b/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java
index 42cc46d2..a96c3fad 100644
--- a/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java
+++ b/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java
@@ -6,14 +6,16 @@ public class PersistentMemoryPool {
     System.load("/usr/local/lib/libjnipmdk.so");
   }
   private static native long nativeOpenDevice(String path, int maxStage, int maxMap, long size);
-  private static native long nativeSetMapPartition(long deviceHandler, int numPartitions, int stageId, int mapId, int partutionId, long pmemBufferHandler, boolean clean);
-  private static native long nativeSetReducePartition(long deviceHandler, int numPartitions, int stageId, int partutionId, long pmemBufferHandler, boolean clean);
+  private static native long nativeSetMapPartition(long deviceHandler, int numPartitions, int stageId, int mapId, int partitionId, long pmemBufferHandler, boolean clean, int numMaps);
+  private static native long nativeSetReducePartition(long deviceHandler, int numPartitions, int stageId, int partitionId, long pmemBufferHandler, boolean clean, int numMaps);
   private static native byte[] nativeGetMapPartition(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native byte[] nativeGetReducePartition(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native long[] nativeGetMapPartitionBlockInfo(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native long[] nativeGetReducePartitionBlockInfo(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native long nativeGetMapPartitionSize(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native long nativeGetReducePartitionSize(long deviceHandler, int stageId, int mapId, int partitionId);
+
private static native long nativeDeleteMapPartition(long deviceHandler, int stageId, int mapId, int partitionId); + private static native long nativeDeleteReducePartition(long deviceHandler, int stageId, int mapId, int partitionId); private static native long nativeGetRoot(long deviceHandler); private static native int nativeCloseDevice(long deviceHandler); @@ -32,12 +34,12 @@ public class PersistentMemoryPool { this.deviceHandler = nativeOpenDevice(path, max_stages_num, max_shuffles_num, pool_size); } - public long setMapPartition(int partitionNum, int stageId, int shuffleId, int partitionId, PmemBuffer buf, boolean set_clean) { - return nativeSetMapPartition(this.deviceHandler, partitionNum, stageId, shuffleId, partitionId, buf.getNativeObject(), set_clean); + public long setMapPartition(int partitionNum, int stageId, int shuffleId, int partitionId, PmemBuffer buf, boolean set_clean, int numMaps) { + return nativeSetMapPartition(this.deviceHandler, partitionNum, stageId, shuffleId, partitionId, buf.getNativeObject(), set_clean, numMaps); } - public long setReducePartition(int partitionNum, int stageId, int partitionId, PmemBuffer buf, boolean set_clean) { - return nativeSetReducePartition(this.deviceHandler, partitionNum, stageId, partitionId, buf.getNativeObject(), set_clean); + public long setReducePartition(int partitionNum, int stageId, int partitionId, PmemBuffer buf, boolean set_clean, int numMaps) { + return nativeSetReducePartition(this.deviceHandler, partitionNum, stageId, partitionId, buf.getNativeObject(), set_clean, numMaps); } public byte[] getMapPartition(int stageId, int shuffleId, int partitionId) { @@ -64,6 +66,14 @@ public long getReducePartitionSize(int stageId, int shuffleId, int partitionId) return nativeGetReducePartitionSize(this.deviceHandler, stageId, shuffleId, partitionId); } + public long deleteMapPartition(int stageId, int shuffleId, int partitionId) { + return nativeDeleteMapPartition(this.deviceHandler, stageId, shuffleId, partitionId); + } + + public long deleteReducePartition(int stageId, int shuffleId, int partitionId) { + return nativeDeleteReducePartition(this.deviceHandler, stageId, shuffleId, partitionId); + } + public long getRootAddr() { return nativeGetRoot(this.deviceHandler); } diff --git a/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleReader.scala b/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleReader.scala index 1ce41994..f9291a49 100644 --- a/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleReader.scala +++ b/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleReader.scala @@ -100,7 +100,7 @@ private[spark] class PmemShuffleReader[K, C]( // Create an ExternalSorter to sort the data. val sorter = new PmemExternalSorter[K, C, C](context, handle, ordering = Some(keyOrd), serializer = dep.serializer) - logInfo("call PmemExternalSorter.insertAll for shuffle_0_" + handle.shuffleId + "_[" + startPartition + "," + endPartition + "]" ) + logDebug("call PmemExternalSorter.insertAll for shuffle_0_" + handle.shuffleId + "_[" + startPartition + "," + endPartition + "]" ) sorter.insertAll(aggregatedIter) // Use completion callback to stop sorter if task was finished/cancelled. 
context.addTaskCompletionListener(_ => { diff --git a/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala b/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala index 6ae4d47f..6d30b2b3 100644 --- a/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala +++ b/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala @@ -52,10 +52,6 @@ private[spark] class PmemShuffleWriter[K, V, C]( val enable_rdma: Boolean = conf.getBoolean("spark.shuffle.pmof.enable_rdma", defaultValue = true) val enable_pmem: Boolean = conf.getBoolean("spark.shuffle.pmof.enable_pmem", defaultValue = true) - val maxPoolSize: Long = conf.getLong("spark.shuffle.pmof.pmpool_size", defaultValue = 1073741824) - val maxStages: Int = conf.getInt("spark.shuffle.pmof.max_stage_num", defaultValue = 1000) - val maxMaps: Int = numMaps - val partitionLengths: Array[Long] = Array.fill[Long](numPartitions)(0) var set_clean: Boolean = true private var sorter: PmemExternalSorter[K, V, _] = null @@ -107,16 +103,24 @@ private[spark] class PmemShuffleWriter[K, V, C]( partitionBufferArray(partitionId).maybeSpill(force = true) } } - val data_addr_map = Array.ofDim[(Long, Int)](numPartitions, 1) + + var numSpilledPartitions = 0 + while( numSpilledPartitions < numPartitions && partitionBufferArray(numSpilledPartitions).ifSpilled ) { + numSpilledPartitions += 1 + } + val data_addr_map = Array.ofDim[(Long, Int)](numSpilledPartitions, 1) var output_str : String = "" - for (i <- 0 until numPartitions) { + for (i <- 0 until numSpilledPartitions) { data_addr_map(i) = partitionBufferArray(i).getPartitionMeta.map{info => (info._1, info._2)} writeMetrics.incRecordsWritten(partitionBufferArray(i).records) partitionLengths(i) = partitionBufferArray(i).size output_str += "\tPartition " + i + ": " + partitionLengths(i) + ", records: " + partitionBufferArray(i).records + "\n" partitionBufferArray(i).close() } + for (i <- numSpilledPartitions until numPartitions) { + partitionBufferArray(i).close() + } logDebug("shuffle_" + dep.shuffleId + "_" + mapId + ": \n" + output_str) diff --git a/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala b/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala index 65c9f2d2..e3fb334b 100644 --- a/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala +++ b/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala @@ -34,7 +34,7 @@ private[spark] class PersistentMemoryHandler( val path_list: List[String], val shuffleId: String, val maxStages: Int = 1000, - val maxShuffles: Int = 1000, + val numMaps: Int = 1000, var poolSize: Long = -1) extends Logging { // need to use a locked file to get which pmem device should be used. 
val pmMetaHandler: PersistentMemoryMetaHandler = new PersistentMemoryMetaHandler(root_dir)
@@ -43,7 +43,7 @@ private[spark] class PersistentMemoryHandler(
     //this shuffleId haven't been written before, choose a new device
     val path_array_list = new java.util.ArrayList[String](path_list.asJava)
     device = pmMetaHandler.getUnusedDevice(path_array_list)
-    logInfo("This a new shuffleBlock, find an unused device:" + device + ", numMaps of this stage is " + maxShuffles)
+    logInfo("This is a new shuffleBlock; found an unused device: " + device + ", numMaps of this stage is " + numMaps)
 
     val dev = Paths.get(device)
     if (Files.isDirectory(dev)) {
@@ -54,10 +54,10 @@ private[spark] class PersistentMemoryHandler(
       poolSize = 0
     }
   } else {
-    logInfo("This a recently opened shuffleBlock, use the original device:" + device + ", numMaps of this stage is " + maxShuffles)
+    logInfo("This is a recently opened shuffleBlock; reusing the original device: " + device + ", numMaps of this stage is " + numMaps)
   }
 
-  val pmpool = new PersistentMemoryPool(device, maxStages, maxShuffles, poolSize)
+  val pmpool = new PersistentMemoryPool(device, maxStages, numMaps, poolSize)
   var rkey: Long = 0
 
   def getDevice(): String = {
@@ -93,13 +93,13 @@ private[spark] class PersistentMemoryHandler(
     pmpool.getReducePartitionSize(stageId, shuffleId, partitionId)
   }
 
-  def setPartition(numPartitions: Int, blockId: String, buf: PmemBuffer, clean: Boolean): Long = {
+  def setPartition(numPartitions: Int, blockId: String, buf: PmemBuffer, clean: Boolean, numMaps: Int = 1): Long = {
     val (blockType, stageId, shuffleId, partitionId) = getBlockDetail(blockId)
     var ret_addr: Long = 0
     if (blockType == "shuffle") {
-      ret_addr = pmpool.setMapPartition(numPartitions, stageId, shuffleId, partitionId, buf, clean)
+      ret_addr = pmpool.setMapPartition(numPartitions, stageId, shuffleId, partitionId, buf, clean, numMaps)
     } else if (blockType == "reduce_spill") {
-      ret_addr = pmpool.setReducePartition(10000000/*TODO: should be incrementable*/, stageId, partitionId, buf, clean)
+      ret_addr = pmpool.setReducePartition(10000, stageId, partitionId, buf, clean, numMaps)
     }
     ret_addr
   }
@@ -113,13 +113,21 @@ private[spark] class PersistentMemoryHandler(
     if (blockType == "shuffle") {
       pmpool.getMapPartition(stageId, shuffleId, partitionId)
     } else if (blockType == "reduce_spill") {
-      logInfo("getReducePartition: partitionId is " + partitionId)
       pmpool.getReducePartition(stageId, shuffleId, partitionId)
     } else {
       new Array[Byte](0)
    }
   }
 
+  def deletePartition(blockId: String): Unit = {
+    val (blockType, stageId, shuffleId, partitionId) = getBlockDetail(blockId)
+    if (blockType == "shuffle") {
+      pmpool.deleteMapPartition(stageId, shuffleId, partitionId)
+    } else if (blockType == "reduce_spill") {
+      pmpool.deleteReducePartition(stageId, shuffleId, partitionId)
+    }
+  }
+
   def getPartitionManagedBuffer(blockId: String): ManagedBuffer = {
     new PmemManagedBuffer(this, blockId)
   }

diff --git a/src/main/scala/org/apache/spark/storage/pmof/PmemBlockObjectStream.scala b/src/main/scala/org/apache/spark/storage/pmof/PmemBlockObjectStream.scala
index 1ee5a614..3169e847 100644
--- a/src/main/scala/org/apache/spark/storage/pmof/PmemBlockObjectStream.scala
+++ b/src/main/scala/org/apache/spark/storage/pmof/PmemBlockObjectStream.scala
@@ -60,7 +60,7 @@ private[spark] class PmemBlockObjectStream(
   var objStream: SerializationStream = _
   var wrappedStream: OutputStream = _
   val bytesStream: OutputStream = new PmemOutputStream(
-    persistentMemoryWriter, numPartitions, blockId.name)
+    persistentMemoryWriter, numPartitions, blockId.name, numMaps)
   var inputStream: InputStream = _
 
   override def write(key: Any, value: Any): Unit = {

diff --git a/src/main/scala/org/apache/spark/storage/pmof/PmemInputStream.scala b/src/main/scala/org/apache/spark/storage/pmof/PmemInputStream.scala
index e3bb8e86..c1388e54 100644
--- a/src/main/scala/org/apache/spark/storage/pmof/PmemInputStream.scala
+++ b/src/main/scala/org/apache/spark/storage/pmof/PmemInputStream.scala
@@ -6,14 +6,14 @@ import org.apache.spark.internal.Logging
 import scala.util.control.Breaks._
 
 class PmemInputStream(
-  persistentMemoryWriter: PersistentMemoryHandler,
+  persistentMemoryHandler: PersistentMemoryHandler,
   blockId: String
   ) extends InputStream with Logging {
   val buf = new PmemBuffer()
   var index: Int = 0
   var remaining: Int = 0
-  var available_bytes: Int = persistentMemoryWriter.getPartitionSize(blockId).toInt
-  val blockInfo: Array[(Long, Int)] = persistentMemoryWriter.getPartitionBlockInfo(blockId)
+  var available_bytes: Int = persistentMemoryHandler.getPartitionSize(blockId).toInt
+  val blockInfo: Array[(Long, Int)] = persistentMemoryHandler.getPartitionBlockInfo(blockId)
 
   def loadNextStream(): Int = {
     if (index >= blockInfo.length)
@@ -71,4 +71,9 @@ class PmemInputStream(
   override def close(): Unit = {
     buf.close()
   }
+
+  def deleteBlock(): Unit = {
+    // release the pmem partition backing this block
+    persistentMemoryHandler.deletePartition(blockId)
+  }
 }

diff --git a/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala b/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
index 392145b1..9e6173a2 100644
--- a/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
+++ b/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
@@ -7,7 +7,8 @@ import org.apache.spark.internal.Logging
 class PmemOutputStream(
   persistentMemoryWriter: PersistentMemoryHandler,
   numPartitions: Int,
-  blockId: String
+  blockId: String,
+  numMaps: Int
   ) extends OutputStream with Logging {
   val buf = new PmemBuffer()
   var set_clean = true
@@ -24,7 +25,7 @@
   }
 
   override def flush(): Unit = {
-    persistentMemoryWriter.setPartition(numPartitions, blockId, buf, set_clean)
+    persistentMemoryWriter.setPartition(numPartitions, blockId, buf, set_clean, numMaps)
     if (set_clean == true) {
       set_clean = false
     }

diff --git a/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala b/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
index 2b1f4148..0febd721 100644
--- a/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
+++ b/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
@@ -356,6 +356,7 @@ private[spark] class PmemExternalSorter[K, V, C](
       if (inObjStream == null) {
         if (inStream != null) {
           inStream.close()
+          inStream.asInstanceOf[PmemInputStream].deleteBlock()
         }
         return null
       }
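[Editor's note] To close, a sketch of how the refactored native API composes
end to end, using only the PMPool methods declared in PersistentMemoryPool.h
above. The device path, sizes, and ids are illustrative, and this assumes the
library was built with the make script above:

    #include "PersistentMemoryPool.h"
    #include <cstring>

    int main() {
      // illustrative values: one devdax device, 64MB pool, 10 stages, 4 maps
      PMPool pool("/dev/dax0.0", 10, 4, 64L * 1024 * 1024);

      char payload[128];
      memset(payload, 'x', sizeof(payload));

      // write map output for shuffle_1_2_3 (8 partitions, 4 maps), then query it
      pool.setMapPartition(8, 1, 2, 3, sizeof(payload), payload, true, 4);
      long size = pool.getMapPartitionSize(1, 2, 3);  // expect 128

      // release the pmem blocks once the partition has been consumed
      pool.deleteMapPartition(1, 2, 3);
      return size == 128 ? 0 : 1;
    }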