Skip to content

Commit

Permalink
#66 Added first implementation of LuceneTable
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisjstevo committed Apr 9, 2021
1 parent c29abb0 commit 0e7c2ab
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 21 deletions.
21 changes: 0 additions & 21 deletions SAMPLE-MSG.md

This file was deleted.

17 changes: 17 additions & 0 deletions vuu/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
<version>0.1-SNAPSHOT</version>
</parent>

<properties>
<lucene.version>8.8.1</lucene.version>
</properties>

<artifactId>vuu</artifactId>
<name>vuu</name>
<inceptionYear>2014</inceptionYear>
Expand All @@ -20,6 +24,19 @@
<version>0.1-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${lucene.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.lucene/lucene-queryparser -->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
<version>${lucene.version}</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions vuu/src/main/scala/io/venuu/vuu/api/TableDef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ object Fields {
val All = List("*")
}



object VisualLinks {
def apply(link: Link*): VisualLinks = {
new VisualLinks(link.toList)
Expand Down Expand Up @@ -71,7 +73,12 @@ case class Link(fromColumn: String, toTable: String, toColumn: String)
case class VisualLinks(links: List[Link])
case class Indices(indices: Index*)
case class Index(column: String)
case class IndexFilePath(path: String)

trait CleanupPolicy

object DeleteIndexOnShutdown extends CleanupPolicy
object PreserveIndexOnShutdown extends CleanupPolicy

case class AvailableViewPortVisualLink(parentVpId: String, link: Link){
override def toString: String = "(" + parentVpId.split("-").last + ")" + link.fromColumn + " to " + link.toTable + "." + link.toColumn
Expand Down Expand Up @@ -100,6 +107,12 @@ class TableDef(val name: String, val keyField: String, val columns: Array[Column

}

class LuceneTableDef(name: String, keyField: String, columns: Array[Column], joinFields: Seq[String],
autosubscribe: Boolean = false, links: VisualLinks = VisualLinks(),
val indexPath: IndexFilePath, val cleanupPolicy: CleanupPolicy)
extends TableDef(name, keyField, columns, joinFields, false, links, Indices()){
}

trait JoinType

object LeftOuterJoin extends JoinType
Expand Down
134 changes: 134 additions & 0 deletions vuu/src/main/scala/io/venuu/vuu/core/table/LuceneTable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package io.venuu.vuu.core.table
import io.venuu.toolbox.collection.array.ImmutableArray
import io.venuu.toolbox.jmx.MetricsProvider
import io.venuu.toolbox.lifecycle.LifecycleContainer
import io.venuu.vuu.api.{LuceneTableDef, TableDef}
import io.venuu.vuu.core.index.IndexedField
import io.venuu.vuu.provider.JoinTableProvider
import io.venuu.vuu.viewport.RowProcessor
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, Field, TextField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term}
import org.apache.lucene.search.{IndexSearcher, MatchAllDocsQuery, TermQuery}
import org.apache.lucene.store.MMapDirectory

import java.nio.file.Paths

class LuceneTableData(val tableDef: LuceneTableDef){

private final val rowKeyFieldName = "rowKey"
private final val lastUpdateFieldName = "lastUpdate"

private val index = new MMapDirectory(Paths.get(tableDef.indexPath.path))

private val analyzer = new StandardAnalyzer
private val config = new IndexWriterConfig(analyzer)
private val indexWriter = new IndexWriter(index, config)

private val commitEveryCount = 20
@volatile
private var updateCount: Long = 0

//lazy val reader = DirectoryReader.open(index)
//lazy val searcher = new IndexSearcher(reader)

private def rowUpdateAsDocument(rowKey: String, rowUpdate: RowWithData, timeStamp: Long): Unit = {

val document = new Document

//val config = new IndexWriterConfig(analyzer)
//val indexWriter = new IndexWriter(index, config)

document.add(new TextField(rowKeyFieldName, rowKey, Field.Store.YES))
document.add(new TextField(lastUpdateFieldName, timeStamp.toString, Field.Store.YES))

tableDef.columns.foreach( c => {
rowUpdate.get(c) match {
case null =>
case datum: Any =>
val asString = datum.toString
document.add(new TextField(c.name, asString, Field.Store.YES))
}

})

indexWriter.addDocument(document)
if(commitEveryCount % commitEveryCount == 0){
indexWriter.flush()
indexWriter.commit()
}
}

def processUpdate(rowKey: String, rowUpdate: RowWithData, timeStamp: Long): Unit = {
updateCount += 1
rowUpdateAsDocument(rowKey, rowUpdate, timeStamp)
}
def processDelete(rowKey: String): Unit = {
// indexWriter.deleteDocuments(new Term(rowKeyFieldName, rowKey))
// indexWriter.flush()
// indexWriter.commit()
}

private def toTerm(field:String, value:String): Term = {
new Term(field, value)
}

def loadDocument(rowKey: String): Document = {
val reader = DirectoryReader.open(index)
val searcher = new IndexSearcher(reader)
val query = new TermQuery(toTerm(rowKeyFieldName, rowKey))
//val query = new MatchAllDocsQuery

val topDocs = searcher.search(query, 1)
if(topDocs.totalHits.value == 0l){
null
}else{
searcher.doc(topDocs.scoreDocs(0).doc)
}
}

}

class LuceneTable(val tableDef: LuceneTableDef, val joinProvider: JoinTableProvider)(implicit val metrics: MetricsProvider, val lifecycle: LifecycleContainer) extends DataTable with KeyedObservableHelper[RowKeyUpdate] {

private val luceneData = new LuceneTableData(tableDef)

override def indexForColumn(column: Column): Option[IndexedField[_]] = ???

override def getTableDef: TableDef = tableDef

override def processUpdate(rowKey: String, rowUpdate: RowWithData, timeStamp: Long): Unit = {
luceneData.processUpdate(rowKey, rowUpdate, timeStamp)
}

override def processDelete(rowKey: String): Unit = {
luceneData.processDelete(rowKey)
}

override def name: String = tableDef.name

override def notifyListeners(rowKey: String, isDelete: Boolean = false) = {
getObserversByKey(rowKey).foreach(obs => {
obs.onUpdate(new RowKeyUpdate(rowKey, this, isDelete))
})
}

override def linkableName: String = tableDef.name

override def readRow(key: String, columns: List[String], processor: RowProcessor): Unit = ???

override def primaryKeys: ImmutableArray[String] = ???

override def pullRow(key: String, columns: List[Column]): RowData = ???

override def pullRow(key: String): RowData = ???

override def pullRowAsArray(key: String, columns: List[Column]): Array[Any] = {
luceneData.loadDocument(key) match {
case null => Array()
case doc: Document =>
columns.map( c => doc.getField(c.name).stringValue() ).toArray
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package io.venuu.vuu.core.table.lucene

import io.venuu.toolbox.jmx.MetricsProviderImpl
import io.venuu.toolbox.lifecycle.LifecycleContainer
import io.venuu.toolbox.time.TestFriendlyClock
import io.venuu.toolbox.time.TimeIt.timeIt
import io.venuu.vuu.api.{DeleteIndexOnShutdown, IndexFilePath, LuceneTableDef, TableDef, VisualLinks}
import io.venuu.vuu.core.table.{Columns, LuceneTable, RowWithData}
import io.venuu.vuu.provider.JoinTableProviderImpl
import io.venuu.vuu.viewport.TestTimeStamp
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, DoublePoint, Field, IntPoint, LongPoint, TextField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term}
import org.apache.lucene.search.IndexSearcher
import org.apache.lucene.store.MMapDirectory
import org.scalatest.{BeforeAndAfterAll, Ignore}
import org.scalatest.featurespec.AnyFeatureSpec
import org.scalatest.matchers.should.Matchers

import java.nio.file.Paths

// https://www.xspdf.com/help/52563961.html

//https://stackoverflow.com/questions/15728569/multiple-field-query-handling-in-lucene#15730250
//https://lucenetutorial.com/lucene-query-syntax.html
//http://makble.com/lucene-field-stringfield-vs-textfield


class LuceneTableTest extends AnyFeatureSpec with Matchers with BeforeAndAfterAll {

private final val indexPath = "target/test/LuceneTableTest"

override protected def beforeAll() = {
Paths.get(indexPath).toAbsolutePath.toFile.mkdir()
}

override protected def afterAll(): Unit = {
Paths.get(indexPath).toAbsolutePath.toFile.deleteOnExit()
}

Feature("check creating indexed tables in Lucene"){

Scenario("Check creation and retrieval of data"){

implicit val clock = new TestFriendlyClock(TestTimeStamp.EPOCH_DEFAULT)
implicit val lifeCycle = new LifecycleContainer
implicit val metrics = new MetricsProviderImpl

val joinTableProvider = new JoinTableProviderImpl()

val tableDef = new LuceneTableDef("executions", "executionId",
Columns.fromNames("executionId:String", "ric:String", "quantity:Int", "price:Double", "lastExchange:String"),
Seq("executionId"), false, VisualLinks(), IndexFilePath("target/test"), DeleteIndexOnShutdown)

val table = new LuceneTable(tableDef, joinTableProvider)

val rowCount = 1000

val (millis, _ ) = timeIt {
(0 to rowCount).foreach(i => {
val execId = "exec" + i
val ric = "FOO"
val quantity = i * 100
val price = (100 - i).toDouble
val lastExchange = "XLON"

table.processUpdate(execId, RowWithData(execId, Map(
"executionId" -> execId,
"ric" -> ric,
"quantity" -> quantity,
"price" -> price,
"lastExchange" -> lastExchange
)), clock.now())
})
}

val costPerRow = (millis.toDouble / rowCount.toDouble)

val ratePerSecond = (1000).toDouble / costPerRow.toDouble
val ratePerMinute = (1000 * 60).toDouble / costPerRow.toDouble

println(s"time for $rowCount: " + millis + " millis, per row cost: " + costPerRow + " per sec: " + ratePerSecond + " per min:" + ratePerMinute)

val array = table.pullRowAsArray("exec0", tableDef.columns.toList)

array.size should equal(5)
}

ignore("Check Creation of Index Table"){

val index = new MMapDirectory(Paths.get("target/lucene-index"))
//val index = new RAMDirectory()

val analyzer = new StandardAnalyzer
val config = new IndexWriterConfig(analyzer)
val indexWriter = new IndexWriter(index, config)

var document = new Document
document.add(new TextField("id", "100", Field.Store.YES))
document.add(new LongPoint("id_as_int", 100l))
document.add(new TextField("name", "John Doe", Field.Store.YES))
document.add(new TextField("address", "80 Summer Hill", Field.Store.YES))
document.add(new IntPoint("age", 10))
document.add(new DoublePoint("price", 101.23))
indexWriter.updateDocument(new Term("id", "100"), document)

val document2 = new Document
document2.add(new TextField("id", "101", Field.Store.YES))
document.add(new LongPoint("id_as_int", 101l))
document2.add(new TextField("name", "Chris Stevenson", Field.Store.YES))
document2.add(new TextField("address", "20 Bunker Road", Field.Store.YES))
document2.add(new IntPoint("age", 20))
document2.add(new DoublePoint("price", 50.01))
indexWriter.updateDocument(new Term("id", "101"), document2)
//indexWriter.addDocument(document2)

val document3 = new Document
document3.add(new TextField("id", "102", Field.Store.YES))
document.add(new LongPoint("id_as_int", 102l))
document3.add(new TextField("name", "Steve The Hero", Field.Store.YES))
document3.add(new TextField("address", "50 Rompile Road", Field.Store.YES))
document3.add(new IntPoint("age", 30))
document3.add(new DoublePoint("price", 34.01))
indexWriter.updateDocument(new Term("id", "102"), document3)
//indexWriter.addDocument(document3)

indexWriter.flush()
indexWriter.close()

val reader = DirectoryReader.open(index)
val searcher = new IndexSearcher(reader)

//val query = new TermQuery(new Term("name", "chris"))

val query = IntPoint.newExactQuery("age", 20)
//val query = new MatchAllDocsQuery

val topDocs = searcher.search(query, 10)

println("TopHits:" + topDocs.totalHits.value )

// Display addresses
for (scoreDoc <- topDocs.scoreDocs) {
document = searcher.doc(scoreDoc.doc)
System.out.println("record=>" + document.get("name") + " " + document.get("address")
+ " " + document.get("age") + " " + document.get("id"))
}

//val textQuery = new QueryParser("name", analyzer).parse("chris OR steve")
//val textQuery = new QueryParser("age", analyzer).parse("age:>10")

val rangeQuery = IntPoint.newRangeQuery("age", 10, 30)

val topDocs2 = searcher.search(rangeQuery, 10)

println("TopHits:" + topDocs2.totalHits.value )

// Display addresses
for (scoreDoc <- topDocs2.scoreDocs) {
document = searcher.doc(scoreDoc.doc)
System.out.println("next record=>" + document.get("name") + " " + document.get("address")
+ " " + document.get("age") + " " + document.get("id"))
}

}
}


}

0 comments on commit 0e7c2ab

Please sign in to comment.