Skip to content

Commit

Permalink
#989 Added first basic working impl of a virtualized session table
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisjstevo committed Dec 29, 2023
1 parent 1e9ca7b commit f6480a4
Show file tree
Hide file tree
Showing 38 changed files with 315 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//import org.finos.vuu.core.module.FieldDefString
//import org.finos.vuu.core.module.basket.BasketModule.{BasketConstituentColumnNames => BC}
//import org.finos.vuu.core.module.basket.provider.BasketConstituentProvider
//import org.finos.vuu.core.table.{Column, Columns, SimpleDataTable, ViewPortColumnCreator}
//import org.finos.vuu.core.table.{Column, Columns, InMemDataTable, ViewPortColumnCreator}
//import org.scalatest.BeforeAndAfter
//import org.scalatest.featurespec.AnyFeatureSpec
//import org.scalatest.matchers.should.Matchers
Expand All @@ -32,7 +32,7 @@
// VisualLinks(),
// joinFields = BC.RicBasketId
// )
// val table = new SimpleDataTable(tableDef, joinProvider)
// val table = new InMemDataTable(tableDef, joinProvider)
// val provider = new BasketConstituentProvider(table)
// val columns: Array[Column] = provider.table.getTableDef.columns
// val headers: Array[String] = columns.map(_.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//import org.finos.vuu.core.module.FieldDefString
//import org.finos.vuu.core.module.basket.BasketModule.{BasketColumnNames => B}
//import org.finos.vuu.core.module.basket.provider.BasketProvider
//import org.finos.vuu.core.table.{Column, Columns, SimpleDataTable, ViewPortColumnCreator}
//import org.finos.vuu.core.table.{Column, Columns, InMemDataTable, ViewPortColumnCreator}
//import org.scalatest.BeforeAndAfter
//import org.scalatest.featurespec.AnyFeatureSpec
//import org.scalatest.matchers.should.Matchers
Expand All @@ -30,7 +30,7 @@
// VisualLinks(),
// joinFields = B.Id
// )
// val table = new SimpleDataTable(tableDef, joinProvider)
// val table = new InMemDataTable(tableDef, joinProvider)
// val provider = new BasketProvider(table)
// val columns: Array[Column] = provider.table.getTableDef.columns
// val headers: Array[String] = columns.map(_.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.text.AsciiUtil
import org.finos.toolbox.time.TestFriendlyClock
import org.finos.vuu.api.TableDef
import org.finos.vuu.core.table.{Columns, SimpleDataTable, ViewPortColumnCreator}
import org.finos.vuu.core.table.{Columns, InMemDataTable, ViewPortColumnCreator}
import org.scalatest.featurespec.AnyFeatureSpec
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -46,7 +46,7 @@ class SimulatedPricesProviderTest extends AnyFeatureSpec with Matchers {

val pricesDef = getDef

val table = new SimpleDataTable(pricesDef, joinProvider)
val table = new InMemDataTable(pricesDef, joinProvider)

val provider = new SimulatedPricesProvider(table)

Expand Down
6 changes: 3 additions & 3 deletions plugin/ignite-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package org.finos.vuu.feature.ignite.table

import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.table.SimpleSessionDataTable
import org.finos.vuu.core.table.InMemSessionDataTable
import org.finos.vuu.feature.ignite.api.IgniteSessionTableDef
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.provider.JoinTableProvider

class IgniteVirtualizedSessionTable(clientSessionId: ClientSessionId, sessionTableDef: IgniteSessionTableDef, joinTableProvider: JoinTableProvider)(implicit metrics: MetricsProvider, clock: Clock) extends SimpleSessionDataTable(clientSessionId, sessionTableDef, joinTableProvider)(metrics, clock) {
class IgniteVirtualizedSessionTable(clientSessionId: ClientSessionId, sessionTableDef: IgniteSessionTableDef, joinTableProvider: JoinTableProvider)(implicit metrics: MetricsProvider, clock: Clock) extends InMemSessionDataTable(clientSessionId, sessionTableDef, joinTableProvider)(metrics, clock) {


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.finos.vuu.table.virtualized

import org.finos.vuu.core.table.RowData
import org.finos.vuu.table.virtualized.cache.GuavaRollingRowDataCache

trait RollingCache[KEY, VALUE] {
def put(key: KEY, v: VALUE): Unit
def get(key: KEY): Option[VALUE]
def remove(key: KEY): Unit
def removeAll(): Unit
}

object RowDataCache{
def apply(cacheSize: Int): RollingCache[String, RowData] = {
new GuavaRollingRowDataCache(cacheSize)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.finos.vuu.table.virtualized

import org.finos.toolbox.collection.window.{ArrayBackedMovingWindow, MovingWindow}

object RollingKeysWindow {
def apply(cacheSize: Int): MovingWindow[String] = {
new ArrayBackedMovingWindow[String](cacheSize)
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
package org.finos.vuu.table.virtualized

case class VirtualizedRange(from: Int, to: Int, size: Int)
case class VirtualizedRange(from: Int, to: Int){
def contains(index: Int): Boolean = {
index >= from && index < to
}
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,69 @@
package org.finos.vuu.table.virtualized

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.collection.array.ImmutableArray
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.SessionTableDef
import org.finos.vuu.core.table.{SimpleSessionDataTable, TablePrimaryKeys}
import org.finos.vuu.core.table.{InMemSessionDataTable, RowWithData, TableData, TablePrimaryKeys}
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.provider.JoinTableProvider
import org.finos.vuu.viewport.ViewPortRange

class VirtualizedSessionTable(clientSessionId: ClientSessionId, sessionTableDef: SessionTableDef, joinTableProvider: JoinTableProvider)(implicit metrics: MetricsProvider, clock: Clock) extends SimpleSessionDataTable(clientSessionId, sessionTableDef, joinTableProvider) {
class VirtualizedSessionTable(clientSessionId: ClientSessionId,
sessionTableDef: SessionTableDef,
joinTableProvider: JoinTableProvider,
val cacheSize: Int = 10_000)
(implicit metrics: MetricsProvider, clock: Clock) extends InMemSessionDataTable(clientSessionId, sessionTableDef, joinTableProvider) with StrictLogging {

@volatile private var pendingRange: Option[VirtualizedRange] = None
@volatile private var pendingData: Option[VirtualizedSessionTableData] = None
@volatile private var dataSetSize: Int = 0
@volatile private var range = VirtualizedRange(0, 0)

@volatile private var theData: Option[VirtualizedSessionTableData] = None
@volatile private var range: Option[VirtualizedRange] = None
override def primaryKeys: TablePrimaryKeys = super.primaryKeys

def withBatch(range: VirtualizedRange, block: (VirtualizedSessionTable) => Unit): Unit = {
startBatch(range)
block.apply(this)
endBatch()
override protected def createDataTableData(): TableData = {
new VirtualizedSessionTableData(cacheSize)
}

def startBatch(range: VirtualizedRange): Unit = {
pendingRange = Some(range)
pendingData = Some(new VirtualizedSessionTableData())
def processUpdateForIndex(index: Int, rowKey: String, rowData: RowWithData, timeStamp: Long): Unit = {
if(isInCurrentRange(index)){
data.setKeyAt(index, rowKey)
super.processUpdate(rowKey, rowData, timeStamp)
}
}
def endBatch(): Unit = {
range = pendingRange
pendingRange = None
theData = pendingData
pendingData = None

def processDeleteForIndex(index: Int, rowKey: String, rowData: RowWithData, timeStamp: Long): Unit = {
super.processUpdate(rowKey, rowData, timeStamp)
}
def length: Int = range match {
case Some(r) => r.size
case None => 0

/**
* Set the total data set size after gathering the results.
*
* @param size
*/
def setSize(size: Int): Unit = {
dataSetSize = size
this.data match {
case virtData: VirtualizedSessionTableData => virtData.setLength(size)
case _ =>
logger.error("Trying to set range on non-virtualized data, something has gone bad.")
}
}
override def processUpdate(rowKey: String, rowData: RowWithData, timeStamp: Long): Unit = super.processUpdate(rowKey, rowData, timeStamp)

override def processDelete(rowKey: String): Unit = super.processDelete(rowKey)

def setRange(range: VirtualizedRange): Unit = {
this.range = range
this.data match {
case virtData: VirtualizedSessionTableData => virtData.setRangeForKeys(range)
case _ =>
logger.error("Trying to set range on non-virtualized data, something has gone bad.")
}
}

private def isInCurrentRange(index: Int): Boolean = {
range.contains(index)
}

override def primaryKeys: TablePrimaryKeys = super.primaryKeys
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,48 @@
package org.finos.vuu.table.virtualized

import org.finos.toolbox.collection.array.ImmutableArray
import org.finos.vuu.core.table.{RowData, RowWithData, TableData, TablePrimaryKeys}
import org.finos.toolbox.collection.window.MovingWindow
import org.finos.vuu.core.table.{EmptyRowData, RowData, RowWithData, TableData, TablePrimaryKeys}

class VirtualizedSessionTableData extends TableData {
class VirtualizedSessionTableData(cacheSize: Int) extends TableData {

private val primaryKeys: VirtualizedTableKeys = VirtualizedTableKeys(VirtualizedRange(0, 0, 0), Array())
final val rowCache: RollingCache[String, RowData] = RowDataCache(cacheSize)
final val keysWindow: MovingWindow[String] = RollingKeysWindow(cacheSize)

override def dataByKey(key: String): RowData = ???
@volatile var length: Int = 0

override def update(key: String, update: RowWithData): TableData = ???
override def dataByKey(key: String): RowData = {
rowCache.get(key) match {
case Some(row) => row
case None => EmptyRowData
}
}

override def delete(key: String): TableData = ???
override def update(key: String, update: RowWithData): TableData = {
rowCache.put(key, update)
this
}

override def deleteAll(): TableData = ???
override def primaryKeyValues: TablePrimaryKeys = ???
override def delete(key: String): TableData = {
rowCache.remove(key)
this
}

override def deleteAll(): TableData = {
rowCache.removeAll()
this
}
override def setKeyAt(index: Int, key: String): Unit = {
keysWindow.setAtIndex(index, key)
}

def setRangeForKeys(range: VirtualizedRange): Unit = {
keysWindow.setRange(range.from, range.to)
}

def setLength(length: Int): Unit = {
this.length = length
}

override def primaryKeyValues: TablePrimaryKeys = VirtualizedTableKeys(window = keysWindow, dataSize = length)
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
package org.finos.vuu.table.virtualized

import org.finos.toolbox.collection.window.ArrayBackedMovingWindow
import org.finos.toolbox.collection.window.MovingWindow
import org.finos.vuu.core.table.TablePrimaryKeys

case class VirtualizedTableKeys(range: VirtualizedRange, data: Array[String], cacheSize: Int = 10_000) extends TablePrimaryKeys{

private val window = new ArrayBackedMovingWindow[String](10_000)

window.setRange(range.from, range.to)

(range.from until range.to).foreach(i => window.setAtIndex(i, data(i)))

def length: Int = range.size
case class VirtualizedTableKeys(window: MovingWindow[String], dataSize: Int) extends TablePrimaryKeys{
override def set(index: Int, key: String): TablePrimaryKeys = throw new Exception("Cannot mutate virtualized keys")
def length: Int = dataSize
def getAtIndex(index:Int): Option[String] = window.getAtIndex(index)
override def add(key: String): TablePrimaryKeys = ???
override def +(key: String): TablePrimaryKeys = ???
override def remove(key: String): TablePrimaryKeys = ???
override def -(key: String): TablePrimaryKeys = ???
override def +(key: String): TablePrimaryKeys = throw new Exception("Cannot mutate virtualized keys")
override def remove(key: String): TablePrimaryKeys = throw new Exception("Cannot mutate virtualized keys")
override def -(key: String): TablePrimaryKeys = throw new Exception("Cannot mutate virtualized keys")
override def sliceTableKeys(from: Int, until: Int): TablePrimaryKeys = ???
override def get(index: Int): String = ???
override def iterator: Iterator[String] = ???
override def get(index: Int): String = window.getAtIndex(index) match {
case Some(s) => s
case None => null
}
override def iterator: Iterator[String] = window.iterator
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.finos.vuu.table.virtualized.cache

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.core.table.RowData
import org.finos.vuu.table.virtualized.RollingCache

class GuavaRollingRowDataCache(val cacheSize: Int) extends RollingCache[String, RowData] with StrictLogging {

val cache: Cache[String, RowData] = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
.removalListener((notification: RemovalNotification[String, RowData]) => logger.info(s"Removing rowCache key: ${notification.getKey} value:${notification.getValue}"))
.build()

override def put(key: String, v: RowData): Unit = {
cache.put(key, v)
}

override def get(key: String): Option[RowData] = {
Option(cache.getIfPresent(key))
}
override def removeAll(): Unit = cache.invalidateAll()

override def remove(key: String): Unit = cache.invalidate(key)
}
Loading

0 comments on commit f6480a4

Please sign in to comment.