Skip to content

Commit

Permalink
FOGL-7814: Move readings in south plugin ingest. (#1096)
Browse files Browse the repository at this point in the history
* FOGL-7814: Move readings in south plugin ingest.
- Added function to transfer readings vector in reading_set class.
- Added function to remove and return a reading from reading vector in reading_set class.
- Changed plugin ingest for poll and async using moveAllReadings function added to reading_set class.

Signed-off-by: Himanshu Vimal <[email protected]>
  • Loading branch information
cyberwalk3r authored Jul 18, 2023
1 parent 8eadda1 commit b93b276
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 36 deletions.
5 changes: 5 additions & 0 deletions C/common/include/reading_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ class ReadingSet {
// Return the reference of readings
std::vector<Reading *>* getAllReadingsPtr() { return &m_readings; };

// Remove readings from reading set and return reference to readings
std::vector<Reading *>* moveAllReadings();
// Delete a reading from reading set and return pointer of deleted reading
Reading* removeReading(unsigned long id);

// Return the reading id of the last data element
unsigned long getLastId() const { return m_last_id; };
unsigned long getReadingId(uint32_t pos);
Expand Down
30 changes: 30 additions & 0 deletions C/common/reading_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,36 @@ ReadingSet::clear()
m_readings.clear();
}

/**
* Remove readings from the vector and return a reference to new vector
* containing readings*
*/
std::vector<Reading*>* ReadingSet::moveAllReadings()
{
std::vector<Reading*>* transferredPtr = new std::vector<Reading*>(std::move(m_readings));
m_count = 0;
m_last_id = 0;
m_readings.clear();

return transferredPtr;
}

/**
* Remove reading from vector based on index and return its pointer
*/
Reading* ReadingSet::removeReading(unsigned long id)
{
if (id >= m_readings.size()) {
return nullptr;
}

Reading* reading = m_readings[id];
m_readings.erase(m_readings.begin() + id);
m_count--;

return reading;
}

/**
* Return the ID of the nth reading in the reading set
*
Expand Down
7 changes: 2 additions & 5 deletions C/services/south/ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1003,11 +1003,8 @@ void Ingest::useFilteredData(OUTPUT_HANDLE *outHandle,
}
else
{
ingest->m_data = new std::vector<Reading *>;
for (auto & r : readingSet->getAllReadings())
{
ingest->m_data->emplace_back(new Reading(*r)); // Need to copy reading objects here, since "del readingSet" below would remove encapsulated reading objects also
}
// move reading vector to ingest
ingest->m_data = readingSet->moveAllReadings();
}
}
readingSet->clear();
Expand Down
47 changes: 16 additions & 31 deletions C/services/south/south.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,23 +202,16 @@ void doIngest(Ingest *ingest, Reading reading)

void doIngestV2(Ingest *ingest, ReadingSet *set)
{
std::vector<Reading *> *vec = set->getAllReadingsPtr();
std::vector<Reading *> *vec2 = new std::vector<Reading *>;
if (!vec)
{
Logger::getLogger()->info("%s:%d: V2 async ingest method: vec is NULL", __FUNCTION__, __LINE__);
return;
}
else
{
// TODO Remove the need for this copy of all the readings
for (auto & r : *vec)
{
Reading *r2 = new Reading(*r); // Need to copy reading objects here, since "del set" below would remove encapsulated reading objects also
vec2->emplace_back(r2);
}
}
Logger::getLogger()->debug("%s:%d: V2 async ingest method returned: vec->size()=%d", __FUNCTION__, __LINE__, vec->size());
std::vector<Reading *> *vec = set->getAllReadingsPtr();
if (!vec)
{
Logger::getLogger()->info("%s:%d: V2 async ingest method: vec is NULL", __FUNCTION__, __LINE__);
return;
}
// move reading vector from set to new vector vec2
std::vector<Reading *> *vec2 = set->moveAllReadings();

Logger::getLogger()->debug("%s:%d: V2 async ingest method returned: vec->size()=%d", __FUNCTION__, __LINE__, vec->size());

ingest->ingest(vec2);
delete vec2; // each reading object inside vector has been allocated on heap and moved to Ingest class's internal queue
Expand Down Expand Up @@ -565,25 +558,17 @@ void SouthService::start(string& coreAddress, unsigned short corePort)
if (set)
{
std::vector<Reading *> *vec = set->getAllReadingsPtr();
std::vector<Reading *> *vec2 = new std::vector<Reading *>;
if (!vec)
{
Logger::getLogger()->info("%s:%d: V2 poll method: vec is NULL", __FUNCTION__, __LINE__);
continue;
}
else
{
for (auto & r : *vec)
{
Reading *r2 = new Reading(*r); // Need to copy reading objects here, since "del set" below would remove encapsulated reading objects
vec2->emplace_back(r2);
}
}

ingest.ingest(vec2);
pollCount += (int) vec2->size();
delete vec2; // each reading object inside vector has been allocated on heap and moved to Ingest class's internal queue
delete set;
// move reading vector from set to vec2
std::vector<Reading *> *vec2 = set->moveAllReadings();
ingest.ingest(vec2);
pollCount += (int) vec2->size();
delete vec2; // each reading object inside vector has been allocated on heap and moved to Ingest class's internal queue
delete set;
}
}
throttlePoll();
Expand Down

0 comments on commit b93b276

Please sign in to comment.