diff --git a/spark.conf.example b/spark.conf.example
index dccfbf3e..fedce213 100644
--- a/spark.conf.example
+++ b/spark.conf.example
@@ -9,8 +9,8 @@
 spark.shuffle.pmof.enable_rdma true
 spark.shuffle.pmof.enable_pmem true
 
 # for persistent memory
-spark.shuffle.pmof.max_stage_num 100
 spark.shuffle.pmof.pmem_list /dev/dax0.0,/dev/dax1.0
+spark.shuffle.spill.pmof.MemoryThreshold 16777216
 
 # for rdma
 spark.shuffle.pmof.server_buffer_nums 32
diff --git a/src/main/cpp/PersistentMemoryPool.cpp b/src/main/cpp/PersistentMemoryPool.cpp
new file mode 100644
index 00000000..93353aaf
--- /dev/null
+++ b/src/main/cpp/PersistentMemoryPool.cpp
@@ -0,0 +1,143 @@
+#include "PersistentMemoryPool.h"
+#include <libpmemobj.h>
+#include <sys/stat.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <iostream>
+
+PMPool::PMPool(const char* dev, int maxStage, int maxMap, long size):
+    maxStage(maxStage),
+    maxMap(maxMap),
+    stop(false),
+    dev(dev),
+    worker(&PMPool::process, this) {
+
+  const char *pool_layout_name = "pmem_spark_shuffle";
+  cout << "PMPOOL is " << dev << endl;
+  // A devdax device can be opened directly; for an fsdax path the pool file
+  // may not exist yet, in which case we fall back to creating it.
+  pmpool = pmemobj_open(dev, pool_layout_name);
+  if (pmpool == NULL) {
+    pmpool = pmemobj_create(dev, pool_layout_name, size, S_IRUSR | S_IWUSR);
+  }
+  if (pmpool == NULL) {
+    cerr << "Failed to create pool, kill process, errmsg: " << pmemobj_errormsg() << endl;
+    exit(-1);
+  }
+
+  stageArrayRoot = POBJ_ROOT(pmpool, struct StageArrayRoot);
+}
+
+PMPool::~PMPool() {
+  while(request_queue.size() > 0) {
+    fprintf(stderr, "%s request queue size is %d\n", dev, request_queue.size());
+    sleep(1);
+  }
+  fprintf(stderr, "%s request queue size is %d\n", dev, request_queue.size());
+  stop = true;
+  worker.join();
+  pmemobj_close(pmpool);
+}
+
+long PMPool::getRootAddr() {
+  return (long)pmpool;
+}
+
+void PMPool::process() {
+  Request *cur_req;
+  while(!stop) {
+    cur_req = (Request*)request_queue.dequeue();
+    if (cur_req != nullptr) {
+      cur_req->exec();
+    }
+  }
+}
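+
+// Writes are funneled through the single worker thread: the caller enqueues a
+// WriteRequest and blocks in getResult() until it has been executed, so all
+// pmem transactions on this pool are serialized. The read paths further below
+// execute synchronously on the calling thread.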
+long PMPool::setMapPartition(
+    int partitionNum,
+    int stageId,
+    int mapId,
+    int partitionId,
+    long size,
+    char* data,
+    bool clean,
+    int numMaps) {
+  WriteRequest write_request(this, maxStage, numMaps, partitionNum, stageId, 0, mapId, partitionId, size, data, clean);
+  request_queue.enqueue((void*)&write_request);
+  return write_request.getResult();
+}
+
+long PMPool::setReducePartition(
+    int partitionNum,
+    int stageId,
+    int partitionId,
+    long size,
+    char* data,
+    bool clean,
+    int numMaps) {
+  WriteRequest write_request(this, maxStage, 1, partitionNum, stageId, 1, 0, partitionId, size, data, clean);
+  request_queue.enqueue((void*)&write_request);
+
+  return write_request.getResult();
+}
+
+long PMPool::getMapPartition(
+    MemoryBlock* mb,
+    int stageId,
+    int mapId,
+    int partitionId ) {
+  ReadRequest read_request(this, mb, stageId, 0, mapId, partitionId);
+  read_request.exec();
+  return read_request.getResult();
+}
+
+long PMPool::getReducePartition(
+    MemoryBlock* mb,
+    int stageId,
+    int mapId,
+    int partitionId ) {
+  ReadRequest read_request(this, mb, stageId, 1, mapId, partitionId);
+  read_request.exec();
+  return read_request.getResult();
+}
+
+long PMPool::getMapPartitionBlockInfo(BlockInfo *blockInfo, int stageId, int mapId, int partitionId) {
+  MetaRequest meta_request(this, blockInfo, stageId, 0, mapId, partitionId);
+  meta_request.exec();
+  return meta_request.getResult();
+}
+
+long PMPool::getReducePartitionBlockInfo(BlockInfo *blockInfo, int stageId, int mapId, int partitionId) {
+  MetaRequest meta_request(this, blockInfo, stageId, 1, mapId, partitionId);
+  meta_request.exec();
+  return meta_request.getResult();
+}
+
+long PMPool::getMapPartitionSize(int stageId, int mapId, int partitionId) {
+  SizeRequest size_request(this, stageId, 0, mapId, partitionId);
+  size_request.exec();
+  return size_request.getResult();
+}
+
+long PMPool::getReducePartitionSize(int stageId, int mapId, int partitionId) {
+  SizeRequest size_request(this, stageId, 1, mapId, partitionId);
+  size_request.exec();
+  return size_request.getResult();
+}
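+
+// Deletes go through the worker thread as well; getResult() blocks until the
+// DeleteRequest has been processed, so the call is effectively synchronous.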
+long PMPool::deleteMapPartition(int stageId, int mapId, int partitionId) {
+  DeleteRequest delete_request(this, stageId, 0, mapId, partitionId);
+  request_queue.enqueue((void*)&delete_request);
+  return delete_request.getResult();
+}
+
+long PMPool::deleteReducePartition(int stageId, int mapId, int partitionId) {
+  DeleteRequest delete_request(this, stageId, 1, mapId, partitionId);
+  request_queue.enqueue((void*)&delete_request);
+  return delete_request.getResult();
+}
diff --git a/src/main/cpp/PersistentMemoryPool.h b/src/main/cpp/PersistentMemoryPool.h
index 504be055..a48e3d75 100644
--- a/src/main/cpp/PersistentMemoryPool.h
+++ b/src/main/cpp/PersistentMemoryPool.h
@@ -1,18 +1,9 @@
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
+#ifndef PMPOOL_H
+#define PMPOOL_H
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <iostream>
-#include <string>
 #include <thread>
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <libpmemobj.h>
 #include "WorkQueue.h"
+#include "Request.h"
 
 /* This class is used to make PM as a huge pool
  * All shuffle files will be stored in the same pool
@@ -42,518 +33,39 @@ using namespace std;
 
-#define TOID_ARRAY_TYPE(x) TOID(x)
-#define TOID_ARRAY(x) TOID_ARRAY_TYPE(TOID(x))
-#define TYPENUM 2
-
-POBJ_LAYOUT_BEGIN(PersistentMemoryStruct);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArrayRoot);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArray);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArrayItem);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct TypeArray);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct TypeArrayItem);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct MapArray);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct MapArrayItem);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionArray);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionArrayItem);
-POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionBlock);
-POBJ_LAYOUT_END(PersistentMemoryStruct);
-
-struct StageArrayRoot {
-  TOID(struct StageArray) stageArray;
-};
-
-struct StageArray {
-  int size;
-  TOID(struct StageArrayItem) items[];
-};
-
-struct StageArrayItem {
-  TOID(struct TypeArray) typeArray;
-};
-
-struct TypeArray {
-  int size;
-  TOID(struct TypeArrayItem) items[];
-};
-
-struct TypeArrayItem {
-  TOID(struct MapArray) mapArray;
-};
-
-struct MapArray {
-  int size;
-  TOID(struct MapArrayItem) items[];
-};
-
-struct MapArrayItem {
-  TOID(struct PartitionArray) partitionArray;
-};
-
-struct PartitionArray {
-  int size;
-  TOID(struct PartitionArrayItem) items[];
-};
-
-struct PartitionArrayItem {
-  TOID(struct PartitionBlock) first_block;
-  TOID(struct PartitionBlock) last_block;
-  long partition_size;
-  int numBlocks;
-};
-
-struct PartitionBlock {
-  TOID(struct PartitionBlock) next_block;
-  long data_size;
-  PMEMoid data;
-};
-
-struct MemoryBlock {
-  char* buf;
-  int len;
-  MemoryBlock() {
-  }
-
-  ~MemoryBlock() {
-    delete[] buf;
-  }
-};
-
-struct BlockInfo {
-  long* data;
-  BlockInfo() {
-  }
-
-  ~BlockInfo() {
-    delete data;
-  }
-};
-
 class PMPool {
 public:
   PMEMobjpool *pmpool;
+
+  std::thread worker;
+  WorkQueue<void*> request_queue;
+  bool stop;
+
   TOID(struct StageArrayRoot) stageArrayRoot;
   int maxStage;
   int maxMap;
-  std::mutex pmem_index_lock;
-  int core_s;
-  int core_e;
-  std::thread worker;
-  bool stop;
   const char* dev;
-  WorkQueue<void*> request_queue;
+
   PMPool(const char* dev, int maxStage, int maxMap, long size);
   ~PMPool();
   long getRootAddr();
-  long setMapPartition(int partitionNum, int stageId, int mapId, int partitionId, long size, char* data, bool clean);
-  long setReducePartition(int partitionNum, int stageId, int partitionId, long size, char* data, bool clean);
+
+  long setMapPartition(int partitionNum, int stageId, int mapId, int partitionId, long size, char* data, bool clean, int numMaps);
+  long setReducePartition(int partitionNum, int stageId, int partitionId, long size, char* data, bool clean, int numMaps);
+
   long getMapPartition(MemoryBlock* mb, int stageId, int mapId, int partitionId);
   long getReducePartition(MemoryBlock* mb, int stageId, int mapId, int partitionId);
-  long getPartition(MemoryBlock* mb, int stageId, int typeId, int mapId, int partitionId);
-  int getMapPartitionBlockInfo(BlockInfo *block_info, int stageId, int mapId, int partitionId);
-  int getReducePartitionBlockInfo(BlockInfo *block_info, int stageId, int mapId, int partitionId);
+
+  long getMapPartitionBlockInfo(BlockInfo *block_info, int stageId, int mapId, int partitionId);
+  long getReducePartitionBlockInfo(BlockInfo *block_info, int stageId, int mapId, int partitionId);
+
   long getMapPartitionSize(int stageId, int mapId, int partitionId);
   long getReducePartitionSize(int stageId, int mapId, int partitionId);
+
+  long deleteMapPartition(int stageId, int mapId, int partitionId);
+  long deleteReducePartition(int stageId, int mapId, int partitionId);
+
+private:
   void process();
-  TOID(struct PartitionArrayItem) getPartitionBlock(int stageId, int typeId, int mapId, int partitionId);
 };
-
-class Request {
-public:
-  // add lock to make this request blocked
-  std::mutex mtx;
-  std::condition_variable cv;
-  bool processed;
-  std::unique_lock<std::mutex> lck;
-
-  // add lock to make func blocked
-  std::mutex block_mtx;
-  std::condition_variable block_cv;
-  bool committed;
-  std::unique_lock<std::mutex> block_lck;
-
-  int maxStage;
-  int maxMap;
-  int partitionNum;
-  int stageId;
-  int typeId;
-  int mapId;
-  int partitionId;
-  long size;
-  char *data;
-  bool set_clean;
-  char* data_addr;
-  PMPool* pmpool_ptr;
-
-  Request(PMPool* pmpool_ptr,
-      int maxStage,
-      int maxMap,
-      int partitionNum,
-      int stageId,
-      int typeId,
-      int mapId,
-      int partitionId,
-      long size,
-      char* data,
-      bool set_clean);
-  ~Request();
-  void setPartition();
-  long getResult();
-};
-
-PMPool::PMPool(const char* dev, int maxStage, int maxMap, long size):
-    maxStage(maxStage),
-    maxMap(maxMap),
-    stop(false),
-    dev(dev),
-    worker(&PMPool::process, this) {
-
-  const char *pool_layout_name = "pmem_spark_shuffle";
-  cout << "PMPOOL is " << dev << endl;
-  // if this is a fsdax device
-  // we need to create
-  // if this is a devdax device
-
-  pmpool = pmemobj_open(dev, pool_layout_name);
-  if (pmpool == NULL) {
-    cout << "Failed to open dev, try to create, errmsg: " << pmemobj_errormsg() << endl;
-    pmpool = pmemobj_create(dev, pool_layout_name, size, S_IRUSR | S_IWUSR);
-  }
-  if (pmpool == NULL) {
-    cerr << "Failed to create pool, kill process, errmsg: " << pmemobj_errormsg() << endl;
-    exit(-1);
-  }
-
-  stageArrayRoot = POBJ_ROOT(pmpool, struct StageArrayRoot);
-}
-
-PMPool::~PMPool() {
-  while(request_queue.size() > 0) {
-    fprintf(stderr, "%s request queue size is %d\n", dev, 
request_queue.size()); - sleep(1); - } - fprintf(stderr, "%s request queue size is %d\n", dev, request_queue.size()); - stop = true; - worker.join(); - pmemobj_close(pmpool); -} - -long PMPool::getRootAddr() { - return (long)pmpool; -} - -void PMPool::process() { - Request *cur_req; - while(!stop) { - cur_req = (Request*)request_queue.dequeue(); - if (cur_req != nullptr) { - cur_req->setPartition(); - } - } -} - -long PMPool::setMapPartition( - int partitionNum, - int stageId, - int mapId, - int partitionId, - long size, - char* data, - bool clean) { - Request write_request(this, maxStage, maxMap, partitionNum, stageId, 0, mapId, partitionId, size, data, clean); - request_queue.enqueue((void*)&write_request); - return write_request.getResult(); -} - -long PMPool::setReducePartition( - int partitionNum, - int stageId, - int partitionId, - long size, - char* data, - bool clean) { - Request write_request(this, maxStage, 1, partitionNum, stageId, 1, 0, partitionId, size, data, clean); - request_queue.enqueue((void*)&write_request); - return write_request.getResult(); -} - -long PMPool::getMapPartition( - MemoryBlock* mb, - int stageId, - int mapId, - int partitionId ) { - return getPartition(mb, stageId, 0, mapId, partitionId); -} - -long PMPool::getReducePartition( - MemoryBlock* mb, - int stageId, - int mapId, - int partitionId ) { - return getPartition(mb, stageId, 1, mapId, partitionId); -} - -long PMPool::getPartition( - MemoryBlock* mb, - int stageId, - int typeId, - int mapId, - int partitionId ) { - //taskset(core_s, core_e); - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, typeId, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - long data_length = D_RO(partitionArrayItem)->partition_size; - mb->buf = new char[data_length](); - long off = 0; - TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block; - - char* data_addr; - while(!TOID_IS_NULL(partitionBlock)) { - data_addr = (char*)pmemobj_direct(D_RO(partitionBlock)->data); - //printf("getPartition data_addr: %p\n", data_addr); - - memcpy(mb->buf + off, data_addr, D_RO(partitionBlock)->data_size); - off += D_RO(partitionBlock)->data_size; - partitionBlock = D_RO(partitionBlock)->next_block; - } - - //printf("getPartition length is %d\n", data_length); - return data_length; -} - -int PMPool::getMapPartitionBlockInfo( - BlockInfo* block_info, - int stageId, - int mapId, - int partitionId) { - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, 0, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block; - - int numBlocks = D_RO(partitionArrayItem)->numBlocks; - block_info->data = new long[numBlocks * 2](); - int i = 0; - - while(!TOID_IS_NULL(partitionBlock)) { - block_info->data[i++] = (long)pmemobj_direct(D_RO(partitionBlock)->data); - block_info->data[i++] = D_RO(partitionBlock)->data_size; - partitionBlock = D_RO(partitionBlock)->next_block; - } - - return numBlocks * 2; -} - -int PMPool::getReducePartitionBlockInfo( - BlockInfo* block_info, - int stageId, - int mapId, - int partitionId) { - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, 1, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block; - - int numBlocks = D_RO(partitionArrayItem)->numBlocks; - block_info->data = new long[numBlocks * 
2](); - int i = 0; - - while(!TOID_IS_NULL(partitionBlock)) { - block_info->data[i++] = (long)pmemobj_direct(D_RO(partitionBlock)->data); - block_info->data[i++] = D_RO(partitionBlock)->data_size; - partitionBlock = D_RO(partitionBlock)->next_block; - } - - return numBlocks * 2; -} - -long PMPool::getMapPartitionSize( - int stageId, - int mapId, - int partitionId ) { - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, 0, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - long data_length = D_RO(partitionArrayItem)->partition_size; - return data_length; -} - -long PMPool::getReducePartitionSize( - int stageId, - int mapId, - int partitionId ) { - TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock(stageId, 1, mapId, partitionId); - if (TOID_IS_NULL(partitionArrayItem)) return -1; - long data_length = D_RO(partitionArrayItem)->partition_size; - return data_length; -} - -TOID(struct PartitionArrayItem) PMPool::getPartitionBlock(int stageId, int typeId, int mapId, int partitionId) { - if(TOID_IS_NULL(D_RO(stageArrayRoot)->stageArray)){ - fprintf(stderr, "get Partition of %d_%d_%d failed: stageArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - TOID(struct StageArrayItem) stageArrayItem = D_RO(D_RO(stageArrayRoot)->stageArray)->items[stageId]; - if(TOID_IS_NULL(stageArrayItem) || TOID_IS_NULL(D_RO(stageArrayItem)->typeArray)) { - fprintf(stderr, "get Partition of %d_%d_%d failed: stageArrayItem OR typeArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - TOID(struct TypeArray) typeArray = D_RO(stageArrayItem)->typeArray; - TOID(struct TypeArrayItem) typeArrayItem = D_RO(D_RO(stageArrayItem)->typeArray)->items[typeId]; - if(TOID_IS_NULL(typeArrayItem) || TOID_IS_NULL(D_RO(typeArrayItem)->mapArray)) { - fprintf(stderr, "get Partition of %d_%d_%d failed: typeArrayItem OR mapArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - TOID(struct MapArray) mapArray = D_RO(typeArrayItem)->mapArray; - TOID(struct MapArrayItem) mapArrayItem = D_RO(D_RO(typeArrayItem)->mapArray)->items[mapId]; - if(TOID_IS_NULL(mapArrayItem) || TOID_IS_NULL(D_RO(mapArrayItem)->partitionArray)){ - fprintf(stderr, "get Partition of %d_%d_%d failed: mapArrayItem OR partitionArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - TOID(struct PartitionArrayItem) partitionArrayItem = D_RO(D_RO(mapArrayItem)->partitionArray)->items[partitionId]; - if(TOID_IS_NULL(partitionArrayItem) || TOID_IS_NULL(D_RO(partitionArrayItem)->first_block)) { - fprintf(stderr, "get Partition of %d_%d_%d failed: partitionArrayItem OR partitionBlock none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); - return TOID_NULL(struct PartitionArrayItem); - } - - return partitionArrayItem; -} - -Request::Request(PMPool* pmpool_ptr, - int maxStage, - int maxMap, - int partitionNum, - int stageId, - int typeId, - int mapId, - int partitionId, - long size, - char* data, - bool set_clean): - pmpool_ptr(pmpool_ptr), - maxStage(maxStage), - maxMap(maxMap), - partitionNum(partitionNum), - stageId(stageId), - typeId(typeId), - mapId(mapId), - partitionId(partitionId), - size(size), - data(data), - data_addr(nullptr), - set_clean(set_clean), - processed(false), - committed(false), - lck(mtx), 
block_lck(block_mtx) { -} - -Request::~Request() { -} - -long Request::getResult() { - { - while (!processed) { - usleep(5); - } - //cv.wait(lck, [&]{return processed;}); - } - //fprintf(stderr, "get Result for %d_%d_%d\n", stageId, mapId, partitionId); - if (data_addr == nullptr) - return -1; - return (long)data_addr; -} - -void Request::setPartition() { - TX_BEGIN(pmpool_ptr->pmpool) { - //taskset(core_s, core_e); - //cout << this << " enter setPartition tx" << endl; - //fprintf(stderr, "request for %d_%d_%d\n", stageId, mapId, partitionId); - TX_ADD(pmpool_ptr->stageArrayRoot); - // add a lock flag - - if (TOID_IS_NULL(D_RO(pmpool_ptr->stageArrayRoot)->stageArray)) { - D_RW(pmpool_ptr->stageArrayRoot)->stageArray = TX_ZALLOC(struct StageArray, sizeof(struct StageArray) + maxStage * sizeof(struct StageArrayItem)); - } - - TX_ADD_FIELD(pmpool_ptr->stageArrayRoot, stageArray); - TOID(struct StageArrayItem) *stageArrayItem = &(D_RW(D_RW(pmpool_ptr->stageArrayRoot)->stageArray)->items[stageId]); - if (TOID_IS_NULL(*stageArrayItem)) { - *stageArrayItem = TX_ZNEW(struct StageArrayItem); - } - TX_ADD(*stageArrayItem); - if (TOID_IS_NULL(D_RO(*stageArrayItem)->typeArray)) { - D_RW(*stageArrayItem)->typeArray = TX_ZALLOC(struct TypeArray, sizeof(struct TypeArray) + TYPENUM * sizeof(struct TypeArrayItem)); - } - - TX_ADD_FIELD(*stageArrayItem, typeArray); - TOID(struct TypeArrayItem) *typeArrayItem = &(D_RW(D_RW(*stageArrayItem)->typeArray)->items[typeId]); - if (TOID_IS_NULL(*typeArrayItem)) { - *typeArrayItem = TX_ZNEW(struct TypeArrayItem); - } - TX_ADD(*typeArrayItem); - if (TOID_IS_NULL(D_RO(*typeArrayItem)->mapArray)) { - D_RW(*typeArrayItem)->mapArray = TX_ZALLOC(struct MapArray, sizeof(struct MapArray) + maxMap * sizeof(struct MapArrayItem)); - } - - TX_ADD_FIELD(*typeArrayItem, mapArray); - TOID(struct MapArrayItem) *mapArrayItem = &(D_RW(D_RW(*typeArrayItem)->mapArray)->items[mapId]); - if (TOID_IS_NULL(*mapArrayItem)) { - *mapArrayItem = TX_ZNEW(struct MapArrayItem); - } - TX_ADD(*mapArrayItem); - if (TOID_IS_NULL(D_RO(*mapArrayItem)->partitionArray)) { - D_RW(*mapArrayItem)->partitionArray = TX_ZALLOC(struct PartitionArray, sizeof(struct PartitionArray) + partitionNum * sizeof(struct PartitionArrayItem)); - } - - TX_ADD_FIELD(*mapArrayItem, partitionArray); - TOID(struct PartitionArrayItem) *partitionArrayItem = &(D_RW(D_RW(*mapArrayItem)->partitionArray)->items[partitionId]); - if (TOID_IS_NULL(*partitionArrayItem)) { - *partitionArrayItem = TX_ZNEW(struct PartitionArrayItem); - } - TX_ADD(*partitionArrayItem); - TX_ADD_FIELD(*partitionArrayItem, partition_size); - - TOID(struct PartitionBlock) *partitionBlock = &(D_RW(*partitionArrayItem)->first_block); - TOID(struct PartitionBlock) *last_partitionBlock = &(D_RW(*partitionArrayItem)->last_block); - if (set_clean == false) { - // jump to last block - D_RW(*partitionArrayItem)->partition_size += size; - D_RW(*partitionArrayItem)->numBlocks += 1; - if (!TOID_IS_NULL(*last_partitionBlock)) { - partitionBlock = &(D_RW(*last_partitionBlock)->next_block); - } - } else { - //TODO: we should remove unused blocks - D_RW(*partitionArrayItem)->partition_size = size; - D_RW(*partitionArrayItem)->numBlocks = 1; - } - - TX_ADD_DIRECT(partitionBlock); - *partitionBlock = TX_ZALLOC(struct PartitionBlock, 1); - D_RW(*partitionBlock)->data = pmemobj_tx_zalloc(size, 0); - - D_RW(*partitionBlock)->data_size = size; - D_RW(*partitionBlock)->next_block = TOID_NULL(struct PartitionBlock); - D_RW(*partitionArrayItem)->last_block = *partitionBlock; - - 
data_addr = (char*)pmemobj_direct(D_RW(*partitionBlock)->data);
-    //printf("setPartition data_addr: %p\n", data_addr);
-    pmemobj_tx_add_range_direct((const void *)data_addr, size);
-
-    memcpy(data_addr, data, size);
-  } TX_ONCOMMIT {
-    committed = true;
-    block_cv.notify_all();
-  } TX_ONABORT {
-    fprintf(stderr, "set Partition of %d_%d_%d failed, type is %d, partitionNum is %d. Error: %s\n", stageId, mapId, partitionId, typeId, partitionNum, pmemobj_errormsg());
-    exit(-1);
-  } TX_END
-
-  block_cv.wait(block_lck, [&]{return committed;});
-  //fprintf(stderr, "request committed %d_%d_%d\n", stageId, mapId, partitionId);
-
-  processed = true;
-  //cv.notify_all();
-}
+#endif
diff --git a/src/main/cpp/Request.cpp b/src/main/cpp/Request.cpp
new file mode 100644
index 00000000..fbb120f6
--- /dev/null
+++ b/src/main/cpp/Request.cpp
@@ -0,0 +1,340 @@
+#include "Request.h"
+#include "PersistentMemoryPool.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <libpmemobj.h>
+
+/****** Request ******/
+TOID(struct PartitionArrayItem) Request::getPartitionBlock() {
+  if(TOID_IS_NULL(D_RO(pmpool_ptr->stageArrayRoot)->stageArray)){
+    fprintf(stderr, "get Partition of %d_%d_%d failed: stageArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+    return TOID_NULL(struct PartitionArrayItem);
+  }
+
+  /** Walk the chained stage arrays when stageId exceeds the default maxStage. **/
+  TOID(struct StageArray) stageArray = D_RO(pmpool_ptr->stageArrayRoot)->stageArray;
+  int stageArrayIndex = stageId / pmpool_ptr->maxStage;
+  int stageItemIndex = stageId % pmpool_ptr->maxStage;
+  for (int i = 0; i < stageArrayIndex; i++) {
+    if (TOID_IS_NULL(D_RO(stageArray)->nextStageArray)) {
+      fprintf(stderr, "get Partition of %d_%d_%d failed: stageArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+      return TOID_NULL(struct PartitionArrayItem);
+    }
+    stageArray = D_RO(stageArray)->nextStageArray;
+  }
+  /** get done **/
+
+  TOID(struct StageArrayItem) stageArrayItem = D_RO(stageArray)->items[stageItemIndex];
+  if(TOID_IS_NULL(stageArrayItem) || TOID_IS_NULL(D_RO(stageArrayItem)->typeArray)) {
+    fprintf(stderr, "get Partition of %d_%d_%d failed: stageArrayItem OR typeArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+    return TOID_NULL(struct PartitionArrayItem);
+  }
+
+  TOID(struct TypeArray) typeArray = D_RO(stageArrayItem)->typeArray;
+  TOID(struct TypeArrayItem) typeArrayItem = D_RO(D_RO(stageArrayItem)->typeArray)->items[typeId];
+  if(TOID_IS_NULL(typeArrayItem) || TOID_IS_NULL(D_RO(typeArrayItem)->mapArray)) {
+    fprintf(stderr, "get Partition of %d_%d_%d failed: typeArrayItem OR mapArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+    return TOID_NULL(struct PartitionArrayItem);
+  }
+
+  TOID(struct MapArray) mapArray = D_RO(typeArrayItem)->mapArray;
+  TOID(struct MapArrayItem) mapArrayItem = D_RO(D_RO(typeArrayItem)->mapArray)->items[mapId];
+  if(TOID_IS_NULL(mapArrayItem) || TOID_IS_NULL(D_RO(mapArrayItem)->partitionArray)){
+    fprintf(stderr, "get Partition of %d_%d_%d failed: mapArrayItem OR partitionArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId);
+    return TOID_NULL(struct PartitionArrayItem);
+  }
+
+  /** Walk the chained partition arrays when partitionId exceeds numPartitions. **/
+  TOID(struct PartitionArray) partitionArray = D_RO(mapArrayItem)->partitionArray;
+  int partitionNum = D_RO(partitionArray)->numPartitions;
+  int partitionArrayIndex = partitionId / partitionNum;
+  int partitionItemIndex = partitionId % partitionNum;
+  for (int i = 0; i < 
partitionArrayIndex; i++) { + if (TOID_IS_NULL(D_RO(partitionArray)->nextPartitionArray)) { + fprintf(stderr, "get Partition of %d_%d_%d failed: partitionArray none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); + return TOID_NULL(struct PartitionArrayItem); + } + partitionArray = D_RO(partitionArray)->nextPartitionArray; + } + /** append get done **/ + + TOID(struct PartitionArrayItem) partitionArrayItem = D_RO(partitionArray)->items[partitionItemIndex]; + if(TOID_IS_NULL(partitionArrayItem) || TOID_IS_NULL(D_RO(partitionArrayItem)->first_block)) { + fprintf(stderr, "get Partition of %d_%d_%d failed: partitionArrayItem OR partitionBlock none Exists, type id %d.\n", stageId, mapId, partitionId, typeId); + return TOID_NULL(struct PartitionArrayItem); + } + + return partitionArrayItem; +} + +TOID(struct PartitionArrayItem) Request::getOrCreatePartitionBlock(int maxStage, int maxMap, int partitionNum) { + if (TOID_IS_NULL(D_RO(pmpool_ptr->stageArrayRoot)->stageArray)) { + D_RW(pmpool_ptr->stageArrayRoot)->stageArray = TX_ZALLOC(struct StageArray, sizeof(struct StageArray) + maxStage * sizeof(struct StageArrayItem)); + } + + /** Do append and get if stageId is greater than default maxStages.**/ + TX_ADD_FIELD(pmpool_ptr->stageArrayRoot, stageArray); + TOID(struct StageArray) *stageArray = &(D_RW(pmpool_ptr->stageArrayRoot)->stageArray); + int stageArrayIndex = stageId / maxStage; + int stageItemIndex = stageId % maxStage; + for (int i = 0; i < stageArrayIndex; i++) { + if (TOID_IS_NULL(D_RO(*stageArray)->nextStageArray)) { + D_RW(*stageArray)->nextStageArray = TX_ZALLOC(struct StageArray, sizeof(struct StageArray) + maxStage * sizeof(struct StageArrayItem)); + } + stageArray = &(D_RW(*stageArray)->nextStageArray); + TX_ADD(*stageArray); + } + /** append get done **/ + + TOID(struct StageArrayItem) *stageArrayItem = &(D_RW(*stageArray)->items[stageItemIndex]); + if (TOID_IS_NULL(*stageArrayItem)) { + *stageArrayItem = TX_ZNEW(struct StageArrayItem); + } + TX_ADD(*stageArrayItem); + if (TOID_IS_NULL(D_RO(*stageArrayItem)->typeArray)) { + D_RW(*stageArrayItem)->typeArray = TX_ZALLOC(struct TypeArray, sizeof(struct TypeArray) + TYPENUM * sizeof(struct TypeArrayItem)); + } + + TX_ADD_FIELD(*stageArrayItem, typeArray); + TOID(struct TypeArrayItem) *typeArrayItem = &(D_RW(D_RW(*stageArrayItem)->typeArray)->items[typeId]); + if (TOID_IS_NULL(*typeArrayItem)) { + *typeArrayItem = TX_ZNEW(struct TypeArrayItem); + } + TX_ADD(*typeArrayItem); + if (TOID_IS_NULL(D_RO(*typeArrayItem)->mapArray)) { + D_RW(*typeArrayItem)->mapArray = TX_ZALLOC(struct MapArray, sizeof(struct MapArray) + maxMap * sizeof(struct MapArrayItem)); + } + + TX_ADD_FIELD(*typeArrayItem, mapArray); + TOID(struct MapArrayItem) *mapArrayItem = &(D_RW(D_RW(*typeArrayItem)->mapArray)->items[mapId]); + if (TOID_IS_NULL(*mapArrayItem)) { + *mapArrayItem = TX_ZNEW(struct MapArrayItem); + } + TX_ADD(*mapArrayItem); + if (TOID_IS_NULL(D_RO(*mapArrayItem)->partitionArray)) { + D_RW(*mapArrayItem)->partitionArray = TX_ZALLOC(struct PartitionArray, sizeof(struct PartitionArray) + partitionNum * sizeof(struct PartitionArrayItem)); + TX_ADD_FIELD(*mapArrayItem, partitionArray); + D_RW(D_RW(*mapArrayItem)->partitionArray)->numPartitions = partitionNum; + } else { + TX_ADD_FIELD(*mapArrayItem, partitionArray); + } + TOID(struct PartitionArray) *partitionArray = &(D_RW(*mapArrayItem)->partitionArray); + + /** Do append and get if stageId is greater than default maxStages.**/ + int partitionArrayIndex = partitionId / partitionNum; + int 
partitionItemIndex = partitionId % partitionNum; + + for (int i = 0; i < partitionArrayIndex; i++) { + if (TOID_IS_NULL(D_RO(*partitionArray)->nextPartitionArray)) { + D_RW(*partitionArray)->nextPartitionArray = TX_ZALLOC(struct PartitionArray, sizeof(struct PartitionArray) + partitionNum * sizeof(struct PartitionArrayItem)); + TX_ADD_FIELD(*partitionArray, nextPartitionArray); + D_RW(D_RW(*partitionArray)->nextPartitionArray)->numPartitions = partitionNum; + } else { + TX_ADD_FIELD(*partitionArray, nextPartitionArray); + } + partitionArray = &(D_RW(*partitionArray)->nextPartitionArray); + } + /** append get done **/ + + TOID(struct PartitionArrayItem) *partitionArrayItem = &(D_RW(*partitionArray)->items[partitionItemIndex]); + if (TOID_IS_NULL(*partitionArrayItem)) { + *partitionArrayItem = TX_ZNEW(struct PartitionArrayItem); + } + return *partitionArrayItem; +} + +/****** WriteRequest ******/ +void WriteRequest::exec() { + setPartition(); +} + +long WriteRequest::getResult() { + while (!processed) { + usleep(5); + } + //cv.wait(lck, [&]{return processed;}); + //fprintf(stderr, "get Result for %d_%d_%d\n", stageId, mapId, partitionId); + return (long)data_addr; +} + +void WriteRequest::setPartition() { + TX_BEGIN(pmpool_ptr->pmpool) { + TX_ADD(pmpool_ptr->stageArrayRoot); + + TOID(struct PartitionArrayItem) partitionArrayItem = getOrCreatePartitionBlock(maxStage, maxMap, partitionNum); + TX_ADD(partitionArrayItem); + TX_ADD_FIELD(partitionArrayItem, partition_size); + + TOID(struct PartitionBlock) *partitionBlock = &(D_RW(partitionArrayItem)->first_block); + TOID(struct PartitionBlock) *last_partitionBlock = &(D_RW(partitionArrayItem)->last_block); + if (set_clean == false) { + // jump to last block + D_RW(partitionArrayItem)->partition_size += size; + D_RW(partitionArrayItem)->numBlocks += 1; + if (!TOID_IS_NULL(*last_partitionBlock)) { + partitionBlock = &(D_RW(*last_partitionBlock)->next_block); + } + } else { + TOID(struct PartitionBlock) curPartitionBlock = *partitionBlock; + TOID(struct PartitionBlock) nextPartitionBlock = curPartitionBlock; + + //remove original blocks per set_clean is true + while(!TOID_IS_NULL(curPartitionBlock)) { + nextPartitionBlock = D_RO(curPartitionBlock)->next_block; + pmemobj_tx_free(D_RO(curPartitionBlock)->data); + TX_FREE(curPartitionBlock); + curPartitionBlock = nextPartitionBlock; + } + + D_RW(partitionArrayItem)->partition_size = size; + D_RW(partitionArrayItem)->numBlocks = 1; + } + + TX_ADD_DIRECT(partitionBlock); + *partitionBlock = TX_ZALLOC(struct PartitionBlock, 1); + D_RW(*partitionBlock)->data = pmemobj_tx_zalloc(size, 0); + + D_RW(*partitionBlock)->data_size = size; + D_RW(*partitionBlock)->next_block = TOID_NULL(struct PartitionBlock); + D_RW(partitionArrayItem)->last_block = *partitionBlock; + + data_addr = (char*)pmemobj_direct(D_RW(*partitionBlock)->data); + //printf("setPartition data_addr: %p\n", data_addr); + pmemobj_tx_add_range_direct((const void *)data_addr, size); + + memcpy(data_addr, data, size); + } TX_ONCOMMIT { + committed = true; + block_cv.notify_all(); + } TX_ONABORT { + fprintf(stderr, "set Partition of %d_%d_%d failed, type is %d, partitionNum is %d, maxStage is %d, maxMap is %d. 
Error: %s\n", stageId, mapId, partitionId, typeId, partitionNum, maxStage, maxMap, pmemobj_errormsg());
+    exit(-1);
+  } TX_END
+
+  block_cv.wait(block_lck, [&]{return committed;});
+  //fprintf(stderr, "request committed %d_%d_%d\n", stageId, mapId, partitionId);
+
+  processed = true;
+  //cv.notify_all();
+}
+
+/****** ReadRequest ******/
+void ReadRequest::exec() {
+  getPartition();
+}
+
+long ReadRequest::getResult() {
+  return data_length;
+}
+
+void ReadRequest::getPartition() {
+  //taskset(core_s, core_e);
+  TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock();
+  if (TOID_IS_NULL(partitionArrayItem)) return;
+  data_length = D_RO(partitionArrayItem)->partition_size;
+  mb->buf = new char[data_length]();
+  long off = 0;
+  TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block;
+
+  char* data_addr;
+  while(!TOID_IS_NULL(partitionBlock)) {
+    data_addr = (char*)pmemobj_direct(D_RO(partitionBlock)->data);
+    //printf("getPartition data_addr: %p\n", data_addr);
+
+    memcpy(mb->buf + off, data_addr, D_RO(partitionBlock)->data_size);
+    off += D_RO(partitionBlock)->data_size;
+    partitionBlock = D_RO(partitionBlock)->next_block;
+  }
+
+  //printf("getPartition length is %d\n", data_length);
+}
+
+/****** MetaRequest ******/
+void MetaRequest::exec() {
+  getPartitionBlockInfo();
+}
+
+long MetaRequest::getResult() {
+  return array_length;
+}
+
+void MetaRequest::getPartitionBlockInfo() {
+  TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock();
+  if (TOID_IS_NULL(partitionArrayItem)) return;
+  TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block;
+  if (TOID_IS_NULL(partitionBlock)) return;
+
+  int numBlocks = D_RO(partitionArrayItem)->numBlocks;
+  array_length = numBlocks * 2;
+  block_info->data = new long[array_length]();
+  int i = 0;
+
+  while(!TOID_IS_NULL(partitionBlock)) {
+    block_info->data[i++] = (long)pmemobj_direct(D_RO(partitionBlock)->data);
+    block_info->data[i++] = D_RO(partitionBlock)->data_size;
+    partitionBlock = D_RO(partitionBlock)->next_block;
+  }
+}
+
+/****** SizeRequest ******/
+void SizeRequest::exec() {
+  getPartitionSize();
+}
+
+long SizeRequest::getResult() {
+  return data_length;
+}
+
+void SizeRequest::getPartitionSize() {
+  TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock();
+  if (TOID_IS_NULL(partitionArrayItem)) return;
+  data_length = D_RO(partitionArrayItem)->partition_size;
+}
+
+
+/****** DeleteRequest ******/
+void DeleteRequest::exec() {
+  deletePartition();
+}
+
+long DeleteRequest::getResult() {
+  while (!processed) {
+    usleep(5);
+  }
+  return ret;
+}
+
+void DeleteRequest::deletePartition() {
+  // Resolve the item before entering the transaction: returning from inside
+  // TX_BEGIN would leave the transaction open, and an early return must still
+  // set processed so getResult() does not spin forever.
+  TOID(struct PartitionArrayItem) partitionArrayItem = getPartitionBlock();
+  if (TOID_IS_NULL(partitionArrayItem)) {
+    processed = true;
+    return;
+  }
+  TX_BEGIN(pmpool_ptr->pmpool) {
+    TX_ADD(partitionArrayItem);
+    TOID(struct PartitionBlock) partitionBlock = D_RO(partitionArrayItem)->first_block;
+    TOID(struct PartitionBlock) nextPartitionBlock = partitionBlock;
+
+    while(!TOID_IS_NULL(partitionBlock)) {
+      nextPartitionBlock = D_RO(partitionBlock)->next_block;
+      pmemobj_tx_free(D_RO(partitionBlock)->data);
+      TX_FREE(partitionBlock);
+      partitionBlock = nextPartitionBlock;
+    }
+    //TX_FREE(partitionArrayItem);
+    //*(&partitionArrayItem) = TOID_NULL(struct PartitionArrayItem);
+    D_RW(partitionArrayItem)->first_block = TOID_NULL(struct PartitionBlock);
+    D_RW(partitionArrayItem)->last_block = TOID_NULL(struct PartitionBlock);
+    D_RW(partitionArrayItem)->partition_size = 0;
+    D_RW(partitionArrayItem)->numBlocks = 0;
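+    // The PartitionArrayItem itself is intentionally kept (see the commented
+    // TX_FREE above); only its block list and counters are reset, so the slot
+    // can be reused by a later write to the same partition.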
+  } TX_ONCOMMIT {
+    committed = true;
+    block_cv.notify_all();
+  } TX_ONABORT {
+    fprintf(stderr, "delete Partition of %d_%d_%d failed, type is %d. Error: %s\n", stageId, mapId, partitionId, typeId, pmemobj_errormsg());
+    exit(-1);
+  } TX_END
+
+  block_cv.wait(block_lck, [&]{return committed;});
+  //fprintf(stderr, "request committed %d_%d_%d\n", stageId, mapId, partitionId);
+
+  processed = true;
+}
diff --git a/src/main/cpp/Request.h b/src/main/cpp/Request.h
new file mode 100644
index 00000000..9c8235e8
--- /dev/null
+++ b/src/main/cpp/Request.h
@@ -0,0 +1,248 @@
+#ifndef PMPOOL_REQUEST_H
+#define PMPOOL_REQUEST_H
+
+#include <libpmemobj.h>
+#include <mutex>
+#include <condition_variable>
+
+#define TOID_ARRAY_TYPE(x) TOID(x)
+#define TOID_ARRAY(x) TOID_ARRAY_TYPE(TOID(x))
+#define TYPENUM 2
+
+POBJ_LAYOUT_BEGIN(PersistentMemoryStruct);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArrayRoot);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArray);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct StageArrayItem);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct TypeArray);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct TypeArrayItem);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct MapArray);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct MapArrayItem);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionArray);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionArrayItem);
+POBJ_LAYOUT_TOID(PersistentMemoryStruct, struct PartitionBlock);
+POBJ_LAYOUT_END(PersistentMemoryStruct);
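+
+/* On-pmem index hierarchy rooted at StageArrayRoot:
+ *   StageArray (chained via nextStageArray, maxStage items per link)
+ *     -> TypeArray (TYPENUM = 2: index 0 = shuffle/map data, 1 = reduce spill)
+ *       -> MapArray -> PartitionArray (chained via nextPartitionArray)
+ *         -> PartitionArrayItem -> linked list of PartitionBlocks holding data.
+ */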
+
+struct StageArrayRoot {
+  TOID(struct StageArray) stageArray;
+};
+
+struct StageArray {
+  int size;
+  TOID(struct StageArray) nextStageArray;
+  TOID(struct StageArrayItem) items[];
+};
+
+struct StageArrayItem {
+  TOID(struct TypeArray) typeArray;
+};
+
+struct TypeArray {
+  int size;
+  TOID(struct TypeArrayItem) items[];
+};
+
+struct TypeArrayItem {
+  TOID(struct MapArray) mapArray;
+};
+
+struct MapArray {
+  int size;
+  TOID(struct MapArrayItem) items[];
+};
+
+struct MapArrayItem {
+  TOID(struct PartitionArray) partitionArray;
+};
+
+struct PartitionArray {
+  int size;
+  int numPartitions;
+  TOID(struct PartitionArray) nextPartitionArray;
+  TOID(struct PartitionArrayItem) items[];
+};
+
+struct PartitionArrayItem {
+  TOID(struct PartitionBlock) first_block;
+  TOID(struct PartitionBlock) last_block;
+  long partition_size;
+  int numBlocks;
+};
+
+struct PartitionBlock {
+  TOID(struct PartitionBlock) next_block;
+  long data_size;
+  PMEMoid data;
+};
+
+struct MemoryBlock {
+  char* buf = nullptr;
+  int len;
+  MemoryBlock() {
+  }
+
+  ~MemoryBlock() {
+    delete[] buf;
+  }
+};
+
+struct BlockInfo {
+  long* data = nullptr;
+  BlockInfo() {
+  }
+
+  ~BlockInfo() {
+    delete[] data;
+  }
+};
+
+using namespace std;
+class PMPool;
+class Request {
+public:
+  Request(PMPool* pmpool_ptr, int stageId, int typeId, int mapId, int partitionId):
+    pmpool_ptr(pmpool_ptr),
+    stageId(stageId),
+    typeId(typeId),
+    mapId(mapId),
+    partitionId(partitionId),
+    processed(false),
+    committed(false),
+    lck(mtx), block_lck(block_mtx) {
+  }
+  virtual ~Request(){}
+  virtual void exec() = 0;
+  virtual long getResult() = 0;
+protected:
+  // lock used to block getResult() until the request has been processed
+  std::mutex mtx;
+  std::condition_variable cv;
+  bool processed;
+  std::unique_lock<std::mutex> lck;
+
+  // lock used to block the worker until the transaction has committed
+  std::mutex block_mtx;
+  std::condition_variable block_cv;
+  bool committed;
+  std::unique_lock<std::mutex> block_lck;
+
+  int stageId;
+  int typeId;
+  int mapId;
+  int partitionId;
+  PMPool* pmpool_ptr;
+
+  TOID(struct PartitionArrayItem) getOrCreatePartitionBlock(int maxStage, int maxMap, int partitionNum);
+  TOID(struct PartitionArrayItem) getPartitionBlock();
+};
+
+class WriteRequest : public Request {
+public:
+  char* data_addr; //Pmem Block Addr
+  WriteRequest(PMPool* pmpool_ptr,
+      int maxStage,
+      int maxMap,
+      int partitionNum,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId,
+      long size,
+      char* data,
+      bool set_clean):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId),
+    maxStage(maxStage),
+    maxMap(maxMap),
+    partitionNum(partitionNum),
+    size(size),
+    data(data),
+    data_addr(nullptr),
+    set_clean(set_clean){
+  }
+  ~WriteRequest(){}
+  void exec();
+  long getResult();
+private:
+  int maxStage;
+  int maxMap;
+  int partitionNum;
+
+  long size;
+  char *data;
+  bool set_clean;
+
+  void setPartition();
+};
+
+class ReadRequest : public Request {
+public:
+  ReadRequest(PMPool* pmpool_ptr,
+      MemoryBlock *mb,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId), mb(mb){
+  }
+  ~ReadRequest(){}
+  void exec();
+  long getResult();
+private:
+  MemoryBlock *mb;
+  long data_length = -1;
+  void getPartition();
+};
+
+class MetaRequest : public Request {
+public:
+  MetaRequest(PMPool* pmpool_ptr,
+      BlockInfo* block_info,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId), block_info(block_info){
+  }
+  ~MetaRequest(){}
+  void exec();
+  long getResult();
+private:
+  BlockInfo* block_info;
+  long array_length = -1;
+  void getPartitionBlockInfo();
+};
+
+class SizeRequest: public Request {
+public:
+  SizeRequest(PMPool* pmpool_ptr,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId){
+  }
+  ~SizeRequest(){}
+  void exec();
+  long getResult();
+private:
+  long data_length = -1;
+  void getPartitionSize();
+};
+
+class DeleteRequest: public Request {
+public:
+  DeleteRequest(PMPool* pmpool_ptr,
+      int stageId,
+      int typeId,
+      int mapId,
+      int partitionId):
+    Request(pmpool_ptr, stageId, typeId, mapId, partitionId){
+  }
+  ~DeleteRequest(){}
+  void exec();
+  long getResult();
+private:
+  long ret = -1;
+  void deletePartition();
+};
+
+#endif
diff --git a/src/main/cpp/WorkQueue.h b/src/main/cpp/WorkQueue.h
index 625dcccb..99a017df 100644
--- a/src/main/cpp/WorkQueue.h
+++ b/src/main/cpp/WorkQueue.h
@@ -1,5 +1,4 @@
 #ifndef WORKQUEUE_H
-  std::unique_lock<std::mutex> lck;
 #define WORKQUEUE_H
 
 #include <iostream>
@@ -7,19 +6,18 @@
 #include <queue>
 #include <mutex>
 #include <condition_variable>
 #include <thread>
+#include <unistd.h>
 
 template <class T> class WorkQueue{
 public:
   typedef T queue_type;
   std::queue<queue_type> _queue;
   std::mutex _queue_lock;
-  std::mutex cond_lock;
-  //std::mutex::scoped_lock scope_cond_lock;
-  std::condition_variable m_cond;
-  std::unique_lock<std::mutex> unique_lock;
+  //std::mutex cond_lock;
+  //std::condition_variable m_cond;
+  //std::unique_lock<std::mutex> unique_lock;
 
-  //WorkQueue():scope_cond_lock(cond_lock){}
-  WorkQueue():unique_lock(cond_lock){}
+  WorkQueue(){}
 
   void enqueue( queue_type _work ){
     std::lock_guard<std::mutex> guard(_queue_lock);
@@ -52,8 +50,8 @@ template <class T> class WorkQueue{
     return this->_queue.size();
   }
 
-  void wake_all(){
+  /*void wake_all(){
     m_cond.notify_all();
-  }
+  }*/
 };
 #endif
diff --git a/src/main/cpp/lib_jni_pmdk.cpp b/src/main/cpp/lib_jni_pmdk.cpp
index 46767f0b..2a8095e5 100644
--- a/src/main/cpp/lib_jni_pmdk.cpp
+++ b/src/main/cpp/lib_jni_pmdk.cpp
@@ -11,26 +11,26 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_
 }
 
 JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeSetMapPartition
-  (JNIEnv *env, jclass obj, jlong pmpool, jint 
partitionNum, jint stageId, jint mapId, jint partitionId, long pmBuffer, jboolean set_clean) { + (JNIEnv *env, jclass obj, jlong pmpool, jint partitionNum, jint stageId, jint mapId, jint partitionId, long pmBuffer, jboolean set_clean, jint numMaps) { int size = ((PmemBuffer*)pmBuffer)->getRemaining(); char* buf = ((PmemBuffer*)pmBuffer)->getDataForFlush(size); if (buf == nullptr) { return -1; } //printf("nativeSetMapPartition for shuffle_%d_%d_%d\n", stageId, mapId, partitionId); - long addr = ((PMPool*)pmpool)->setMapPartition(partitionNum, stageId, mapId, partitionId, size, buf, set_clean); + long addr = ((PMPool*)pmpool)->setMapPartition(partitionNum, stageId, mapId, partitionId, size, buf, set_clean, numMaps); return addr; } JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeSetReducePartition - (JNIEnv *env, jclass obj, jlong pmpool, jint partitionNum, jint stageId, jint partitionId, long pmBuffer, jboolean clean) { + (JNIEnv *env, jclass obj, jlong pmpool, jint partitionNum, jint stageId, jint partitionId, long pmBuffer, jboolean clean, jint numMaps) { int size = ((PmemBuffer*)pmBuffer)->getRemaining(); char* buf = ((PmemBuffer*)pmBuffer)->getDataForFlush(size); if (buf == nullptr) { return -1; } //printf("nativeSetMapPartition for spill_%d_%d\n", stageId, partitionId); - long addr = ((PMPool*)pmpool)->setReducePartition(partitionNum, stageId, partitionId, size, buf, clean); + long addr = ((PMPool*)pmpool)->setReducePartition(partitionNum, stageId, partitionId, size, buf, clean, numMaps); return addr; } @@ -86,6 +86,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_ return ((PMPool*)pmpool)->getReducePartitionSize(stageId, mapId, partitionId); } +JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeDeleteMapPartition + (JNIEnv *env, jclass obj, jlong pmpool, jint stageId, jint mapId, jint partitionId) { + return ((PMPool*)pmpool)->deleteMapPartition(stageId, mapId, partitionId); + } + +JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeDeleteReducePartition + (JNIEnv *env, jclass obj, jlong pmpool, jint stageId, jint mapId, jint partitionId) { + return ((PMPool*)pmpool)->deleteReducePartition(stageId, mapId, partitionId); + } + JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeCloseDevice (JNIEnv *env, jclass obj, jlong pmpool) { delete (PMPool*)pmpool; diff --git a/src/main/cpp/lib_jni_pmdk.h b/src/main/cpp/lib_jni_pmdk.h index e258e8f3..1b9bf0c6 100644 --- a/src/main/cpp/lib_jni_pmdk.h +++ b/src/main/cpp/lib_jni_pmdk.h @@ -18,18 +18,18 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_ /* * Class: lib_jni_pmdk * Method: nativeSetMapPartition - * Signature: (JIIIIJZ)J + * Signature: (JIIIIJZI)J */ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeSetMapPartition - (JNIEnv *, jclass, jlong, jint, jint, jint, jint, jlong, jboolean); + (JNIEnv *, jclass, jlong, jint, jint, jint, jint, jlong, jboolean, jint); /* * Class: lib_jni_pmdk * Method: nativeSetReducePartition - * Signature: (JIIIIJZ)J + * Signature: (JIIIIJZI)J */ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeSetReducePartition - (JNIEnv *, jclass, jlong, jint, jint, jint, jlong, jboolean); + (JNIEnv *, jclass, jlong, jint, jint, jint, jlong, jboolean, jint); /* * Class: lib_jni_pmdk @@ -65,7 +65,7 @@ JNIEXPORT jlongArray JNICALL 
Java_org_apache_spark_storage_pmof_PersistentMemory
 
 /*
  * Class:     lib_jni_pmdk
- * Method:    nativeReduceMapPartitionBlockInfo
+ * Method:    nativeGetReducePartitionBlockInfo
  * Signature: (JIII)[J
  */
 JNIEXPORT jlongArray JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetReducePartitionBlockInfo
@@ -87,6 +87,22 @@
 JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetReducePartitionSize
   (JNIEnv *, jclass, jlong, jint, jint, jint);
 
+/*
+ * Class:     lib_jni_pmdk
+ * Method:    nativeDeleteMapPartition
+ * Signature: (JIII)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeDeleteMapPartition
+  (JNIEnv *, jclass, jlong, jint, jint, jint);
+
+/*
+ * Class:     lib_jni_pmdk
+ * Method:    nativeDeleteReducePartition
+ * Signature: (JIII)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeDeleteReducePartition
+  (JNIEnv *, jclass, jlong, jint, jint, jint);
+
 /*
  * Class:     lib_jni_pmdk
  * Method:    nativeGetRoot
diff --git a/src/main/cpp/make b/src/main/cpp/make
index 6971fe72..925e5088 100755
--- a/src/main/cpp/make
+++ b/src/main/cpp/make
@@ -1,2 +1,2 @@
 #!/usr/bin/sh
-g++ lib_jni_pmdk.cpp -o libjnipmdk.so -I$JAVA_HOME -I$JAVA_HOME/include/linux -I$JAVA_HOME/include -std=c++11 -lpmemobj -lpthread -fPIC -shared -g
+g++ lib_jni_pmdk.cpp PersistentMemoryPool.cpp Request.cpp -o libjnipmdk.so -I$JAVA_HOME -I$JAVA_HOME/include/linux -I$JAVA_HOME/include -std=c++11 -lpmemobj -lpthread -fPIC -shared -g
diff --git a/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java b/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java
index 42cc46d2..a96c3fad 100644
--- a/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java
+++ b/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryPool.java
@@ -6,14 +6,16 @@ public class PersistentMemoryPool {
     System.load("/usr/local/lib/libjnipmdk.so");
   }
   private static native long nativeOpenDevice(String path, int maxStage, int maxMap, long size);
-  private static native long nativeSetMapPartition(long deviceHandler, int numPartitions, int stageId, int mapId, int partutionId, long pmemBufferHandler, boolean clean);
-  private static native long nativeSetReducePartition(long deviceHandler, int numPartitions, int stageId, int partutionId, long pmemBufferHandler, boolean clean);
+  private static native long nativeSetMapPartition(long deviceHandler, int numPartitions, int stageId, int mapId, int partitionId, long pmemBufferHandler, boolean clean, int numMaps);
+  private static native long nativeSetReducePartition(long deviceHandler, int numPartitions, int stageId, int partitionId, long pmemBufferHandler, boolean clean, int numMaps);
   private static native byte[] nativeGetMapPartition(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native byte[] nativeGetReducePartition(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native long[] nativeGetMapPartitionBlockInfo(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native long[] nativeGetReducePartitionBlockInfo(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native long nativeGetMapPartitionSize(long deviceHandler, int stageId, int mapId, int partitionId);
   private static native long nativeGetReducePartitionSize(long deviceHandler, int stageId, int mapId, int partitionId);
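+  // The two delete natives below release the pmem blocks backing a single
+  // partition (implemented by DeleteRequest in Request.cpp).
+  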
private static native long nativeDeleteMapPartition(long deviceHandler, int stageId, int mapId, int partitionId); + private static native long nativeDeleteReducePartition(long deviceHandler, int stageId, int mapId, int partitionId); private static native long nativeGetRoot(long deviceHandler); private static native int nativeCloseDevice(long deviceHandler); @@ -32,12 +34,12 @@ public class PersistentMemoryPool { this.deviceHandler = nativeOpenDevice(path, max_stages_num, max_shuffles_num, pool_size); } - public long setMapPartition(int partitionNum, int stageId, int shuffleId, int partitionId, PmemBuffer buf, boolean set_clean) { - return nativeSetMapPartition(this.deviceHandler, partitionNum, stageId, shuffleId, partitionId, buf.getNativeObject(), set_clean); + public long setMapPartition(int partitionNum, int stageId, int shuffleId, int partitionId, PmemBuffer buf, boolean set_clean, int numMaps) { + return nativeSetMapPartition(this.deviceHandler, partitionNum, stageId, shuffleId, partitionId, buf.getNativeObject(), set_clean, numMaps); } - public long setReducePartition(int partitionNum, int stageId, int partitionId, PmemBuffer buf, boolean set_clean) { - return nativeSetReducePartition(this.deviceHandler, partitionNum, stageId, partitionId, buf.getNativeObject(), set_clean); + public long setReducePartition(int partitionNum, int stageId, int partitionId, PmemBuffer buf, boolean set_clean, int numMaps) { + return nativeSetReducePartition(this.deviceHandler, partitionNum, stageId, partitionId, buf.getNativeObject(), set_clean, numMaps); } public byte[] getMapPartition(int stageId, int shuffleId, int partitionId) { @@ -64,6 +66,14 @@ public long getReducePartitionSize(int stageId, int shuffleId, int partitionId) return nativeGetReducePartitionSize(this.deviceHandler, stageId, shuffleId, partitionId); } + public long deleteMapPartition(int stageId, int shuffleId, int partitionId) { + return nativeDeleteMapPartition(this.deviceHandler, stageId, shuffleId, partitionId); + } + + public long deleteReducePartition(int stageId, int shuffleId, int partitionId) { + return nativeDeleteReducePartition(this.deviceHandler, stageId, shuffleId, partitionId); + } + public long getRootAddr() { return nativeGetRoot(this.deviceHandler); } diff --git a/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleReader.scala b/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleReader.scala index 1ce41994..f9291a49 100644 --- a/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleReader.scala +++ b/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleReader.scala @@ -100,7 +100,7 @@ private[spark] class PmemShuffleReader[K, C]( // Create an ExternalSorter to sort the data. val sorter = new PmemExternalSorter[K, C, C](context, handle, ordering = Some(keyOrd), serializer = dep.serializer) - logInfo("call PmemExternalSorter.insertAll for shuffle_0_" + handle.shuffleId + "_[" + startPartition + "," + endPartition + "]" ) + logDebug("call PmemExternalSorter.insertAll for shuffle_0_" + handle.shuffleId + "_[" + startPartition + "," + endPartition + "]" ) sorter.insertAll(aggregatedIter) // Use completion callback to stop sorter if task was finished/cancelled. 
context.addTaskCompletionListener(_ => { diff --git a/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala b/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala index 6ae4d47f..6d30b2b3 100644 --- a/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala +++ b/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala @@ -52,10 +52,6 @@ private[spark] class PmemShuffleWriter[K, V, C]( val enable_rdma: Boolean = conf.getBoolean("spark.shuffle.pmof.enable_rdma", defaultValue = true) val enable_pmem: Boolean = conf.getBoolean("spark.shuffle.pmof.enable_pmem", defaultValue = true) - val maxPoolSize: Long = conf.getLong("spark.shuffle.pmof.pmpool_size", defaultValue = 1073741824) - val maxStages: Int = conf.getInt("spark.shuffle.pmof.max_stage_num", defaultValue = 1000) - val maxMaps: Int = numMaps - val partitionLengths: Array[Long] = Array.fill[Long](numPartitions)(0) var set_clean: Boolean = true private var sorter: PmemExternalSorter[K, V, _] = null @@ -107,16 +103,24 @@ private[spark] class PmemShuffleWriter[K, V, C]( partitionBufferArray(partitionId).maybeSpill(force = true) } } - val data_addr_map = Array.ofDim[(Long, Int)](numPartitions, 1) + + var numSpilledPartitions = 0 + while( numSpilledPartitions < numPartitions && partitionBufferArray(numSpilledPartitions).ifSpilled ) { + numSpilledPartitions += 1 + } + val data_addr_map = Array.ofDim[(Long, Int)](numSpilledPartitions, 1) var output_str : String = "" - for (i <- 0 until numPartitions) { + for (i <- 0 until numSpilledPartitions) { data_addr_map(i) = partitionBufferArray(i).getPartitionMeta.map{info => (info._1, info._2)} writeMetrics.incRecordsWritten(partitionBufferArray(i).records) partitionLengths(i) = partitionBufferArray(i).size output_str += "\tPartition " + i + ": " + partitionLengths(i) + ", records: " + partitionBufferArray(i).records + "\n" partitionBufferArray(i).close() } + for (i <- numSpilledPartitions until numPartitions) { + partitionBufferArray(i).close() + } logDebug("shuffle_" + dep.shuffleId + "_" + mapId + ": \n" + output_str) diff --git a/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala b/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala index 65c9f2d2..e3fb334b 100644 --- a/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala +++ b/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala @@ -34,7 +34,7 @@ private[spark] class PersistentMemoryHandler( val path_list: List[String], val shuffleId: String, val maxStages: Int = 1000, - val maxShuffles: Int = 1000, + val numMaps: Int = 1000, var poolSize: Long = -1) extends Logging { // need to use a locked file to get which pmem device should be used. 
val pmMetaHandler: PersistentMemoryMetaHandler = new PersistentMemoryMetaHandler(root_dir)
@@ -43,7 +43,7 @@
     //this shuffleId haven't been written before, choose a new device
     val path_array_list = new java.util.ArrayList[String](path_list.asJava)
     device = pmMetaHandler.getUnusedDevice(path_array_list)
-    logInfo("This a new shuffleBlock, find an unused device:" + device + ", numMaps of this stage is " + maxShuffles)
+    logInfo("This is a new shuffleBlock, found an unused device:" + device + ", numMaps of this stage is " + numMaps)
 
     val dev = Paths.get(device)
     if (Files.isDirectory(dev)) {
@@ -54,10 +54,10 @@
       poolSize = 0
     }
   } else {
-    logInfo("This a recently opened shuffleBlock, use the original device:" + device + ", numMaps of this stage is " + maxShuffles)
+    logInfo("This is a recently opened shuffleBlock, using the original device:" + device + ", numMaps of this stage is " + numMaps)
   }
 
-  val pmpool = new PersistentMemoryPool(device, maxStages, maxShuffles, poolSize)
+  val pmpool = new PersistentMemoryPool(device, maxStages, numMaps, poolSize)
   var rkey: Long = 0
 
   def getDevice(): String = {
@@ -93,13 +93,13 @@
     pmpool.getReducePartitionSize(stageId, shuffleId, partitionId)
   }
 
-  def setPartition(numPartitions: Int, blockId: String, buf: PmemBuffer, clean: Boolean): Long = {
+  def setPartition(numPartitions: Int, blockId: String, buf: PmemBuffer, clean: Boolean, numMaps: Int = 1): Long = {
     val (blockType, stageId, shuffleId, partitionId) = getBlockDetail(blockId)
     var ret_addr: Long = 0
     if (blockType == "shuffle") {
-      ret_addr = pmpool.setMapPartition(numPartitions, stageId, shuffleId, partitionId, buf, clean)
+      ret_addr = pmpool.setMapPartition(numPartitions, stageId, shuffleId, partitionId, buf, clean, numMaps)
     } else if (blockType == "reduce_spill") {
-      ret_addr = pmpool.setReducePartition(10000000/*TODO: should be incrementable*/, stageId, partitionId, buf, clean)
+      ret_addr = pmpool.setReducePartition(10000, stageId, partitionId, buf, clean, numMaps)
     }
     ret_addr
   }
@@ -113,13 +113,21 @@
     if (blockType == "shuffle") {
       pmpool.getMapPartition(stageId, shuffleId, partitionId)
     } else if (blockType == "reduce_spill") {
-      logInfo("getReducePartition: partitionId is " + partitionId)
       pmpool.getReducePartition(stageId, shuffleId, partitionId)
     } else {
       new Array[Byte](0)
     }
   }
 
+  def deletePartition(blockId: String): Unit = {
+    val (blockType, stageId, shuffleId, partitionId) = getBlockDetail(blockId)
+    if (blockType == "shuffle") {
+      pmpool.deleteMapPartition(stageId, shuffleId, partitionId)
+    } else if (blockType == "reduce_spill") {
+      pmpool.deleteReducePartition(stageId, shuffleId, partitionId)
+    }
+  }
+
   def getPartitionManagedBuffer(blockId: String): ManagedBuffer = {
     new PmemManagedBuffer(this, blockId)
   }
diff --git a/src/main/scala/org/apache/spark/storage/pmof/PmemBlockObjectStream.scala b/src/main/scala/org/apache/spark/storage/pmof/PmemBlockObjectStream.scala
index 1ee5a614..3169e847 100644
--- a/src/main/scala/org/apache/spark/storage/pmof/PmemBlockObjectStream.scala
+++ b/src/main/scala/org/apache/spark/storage/pmof/PmemBlockObjectStream.scala
@@ -60,7 +60,7 @@
   var objStream: SerializationStream = _
   var wrappedStream: OutputStream = _
   val bytesStream: OutputStream = new PmemOutputStream(
-    persistentMemoryWriter, numPartitions, blockId.name)
+    persistentMemoryWriter, numPartitions, blockId.name, numMaps)
   var inputStream: InputStream = _
 
   override def write(key: Any, value: Any): Unit = {
diff --git a/src/main/scala/org/apache/spark/storage/pmof/PmemInputStream.scala b/src/main/scala/org/apache/spark/storage/pmof/PmemInputStream.scala
index e3bb8e86..c1388e54 100644
--- a/src/main/scala/org/apache/spark/storage/pmof/PmemInputStream.scala
+++ b/src/main/scala/org/apache/spark/storage/pmof/PmemInputStream.scala
@@ -6,14 +6,14 @@
 import org.apache.spark.internal.Logging
 import scala.util.control.Breaks._
 
 class PmemInputStream(
-  persistentMemoryWriter: PersistentMemoryHandler,
+  persistentMemoryHandler: PersistentMemoryHandler,
   blockId: String
   ) extends InputStream with Logging {
   val buf = new PmemBuffer()
   var index: Int = 0
   var remaining: Int = 0
-  var available_bytes: Int = persistentMemoryWriter.getPartitionSize(blockId).toInt
-  val blockInfo: Array[(Long, Int)] = persistentMemoryWriter.getPartitionBlockInfo(blockId)
+  var available_bytes: Int = persistentMemoryHandler.getPartitionSize(blockId).toInt
+  val blockInfo: Array[(Long, Int)] = persistentMemoryHandler.getPartitionBlockInfo(blockId)
 
   def loadNextStream(): Int = {
     if (index >= blockInfo.length)
@@ -71,4 +71,9 @@
   override def close(): Unit = {
     buf.close()
   }
+
+  def deleteBlock(): Unit = {
+    // release the pmem partition backing this block
+    persistentMemoryHandler.deletePartition(blockId)
+  }
 }
diff --git a/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala b/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
index 392145b1..9e6173a2 100644
--- a/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
+++ b/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala
@@ -7,7 +7,8 @@
 class PmemOutputStream(
   persistentMemoryWriter: PersistentMemoryHandler,
   numPartitions: Int,
-  blockId: String
+  blockId: String,
+  numMaps: Int
   ) extends OutputStream with Logging {
   val buf = new PmemBuffer()
   var set_clean = true
@@ -24,7 +25,7 @@
   }
 
   override def flush(): Unit = {
-    persistentMemoryWriter.setPartition(numPartitions, blockId, buf, set_clean)
+    persistentMemoryWriter.setPartition(numPartitions, blockId, buf, set_clean, numMaps)
     if (set_clean == true) {
       set_clean = false
     }
diff --git a/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala b/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
index 2b1f4148..0febd721 100644
--- a/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
+++ b/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala
@@ -356,6 +356,7 @@
       if (inObjStream == null) {
         if (inStream != null) {
           inStream.close()
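+          // The spilled block has been fully read; free its backing pmem partition.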
+          inStream.asInstanceOf[PmemInputStream].deleteBlock()
         }
         return null
       }