Changes from all commits (40 commits)
b6c7cd3
app.scala test from data frame to graph
YaphetKG Aug 28, 2020
846ccef
first pass of kgx to graph parsing
YaphetKG Sep 10, 2020
661841e
path edits
YaphetKG Sep 14, 2020
43b0ca3
cypher timings
YaphetKG Sep 15, 2020
9e47308
caching node and edge tables once read into dataframe
Sep 21, 2020
c34a5b6
git ignore
YaphetKG Sep 25, 2020
8981ab9
conveting t2 to sbt based project, some dir moving and more structure…
YaphetKG Sep 25, 2020
5763270
Merge branch 'test-time' of https://github.com/YaphetKG/t2 into test-…
YaphetKG Sep 25, 2020
a436af6
removing kgx dependency
YaphetKG Sep 25, 2020
bc3e628
kgx meta downloader back to downloading metadata
YaphetKG Sep 25, 2020
8e9ab59
read me edits
YaphetKG Sep 25, 2020
6838fc1
Merge pull request #1 from YaphetKG/test-time
YaphetKG Sep 25, 2020
6bacaf4
Create scala.yml
YaphetKG Sep 25, 2020
c61c38e
adding programmatic session building and testing submit
YaphetKG Oct 2, 2020
cdc4451
play framework wrapping B)
YaphetKG Oct 6, 2020
0050cd2
reverting test model in kgxmetadata
YaphetKG Oct 6, 2020
bc66c0a
more config
YaphetKG Oct 6, 2020
4c7be0e
Merge pull request #3 from YaphetKG/programmatic-session
YaphetKG Oct 7, 2020
2084843
testing out spark
YaphetKG Oct 8, 2020
ea0612b
removing checkpoint
YaphetKG Oct 9, 2020
46bf73a
removing broadcast
YaphetKG Oct 9, 2020
8ca79ed
JSON Parsing to match Neo4j http output
YaphetKG Oct 16, 2020
02ca22a
driver mem and cores conf
YaphetKG Oct 16, 2020
7b8c1a4
Merge pull request #4 from YaphetKG/singleton
YaphetKG Oct 16, 2020
d30af24
start web app script and readme
YaphetKG Oct 16, 2020
9a79b30
Merge pull request #5 from YaphetKG/singleton
YaphetKG Oct 16, 2020
194fe9f
Update README.md
YaphetKG Oct 16, 2020
e0e3c8a
Update t2
YaphetKG Oct 16, 2020
6690d23
some performance testing
YaphetKG Oct 19, 2020
9203b67
Merge branch 'master' into jsonPerformance
YaphetKG Oct 19, 2020
ea0e969
adding k8s artifacts and config
YaphetKG Oct 19, 2020
0e458bb
adding tini to build
YaphetKG Oct 23, 2020
bf30a91
testing out driver host parameter for k8s
YaphetKG Oct 23, 2020
c11ca28
parameterize namespace and image for k8s
YaphetKG Oct 23, 2020
996a83e
namespace typo readme
YaphetKG Oct 23, 2020
98ba086
parameterizing the spark master
YaphetKG Oct 23, 2020
8d52091
line ending
YaphetKG Oct 23, 2020
c413ffe
scala version fix, and remove scala auto inclusion to avoid conflicts…
YaphetKG Oct 29, 2020
7cb0f83
some basic k8s working version
YaphetKG Oct 29, 2020
ff290d7
Update README.md
YaphetKG Oct 29, 2020
21 changes: 21 additions & 0 deletions .github/workflows/scala.yml
@@ -0,0 +1,21 @@
name: Scala CI

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Run tests
run: sbt test
11 changes: 11 additions & 0 deletions .gitignore
@@ -0,0 +1,11 @@
.cache-main
.classpath
.project
.settings
.idea
.cache-tests
RUNNING_PID
catalog-v001.xml
project/project
project/target
target
50 changes: 28 additions & 22 deletions README.md
Expand Up @@ -2,40 +2,46 @@

## Install

On osx, running JDK 1.8.0
On Linux/OSX, running JDK 1.8.0

Untar in ~/app and rename to spark and hadoop

[spark](https://www.apache.org/dyn/closer.lua/spark/spark-2.4.4/spark-2.4.4-bin-without-hadoop-scala-2.12.tgz)

[hadoop](https://archive.apache.org/dist/hadoop/common/hadoop-2.6.5/hadoop-2.6.5.tar.gz)

Create a python3.7+ virtual environment
## Building
```
$ bin/t2 build jar
```

Clone [kgx](https://github.com/NCATS-Tangerine/kgx) inside this repo, overwriting the existing kgx directory.
## Running

Then restore kgx/kgx/transformer.py with `git checkout kgx/kgx/transformer.py`

## Running
#### Settings

Import some robokop data via kgx
```
t2 robokop import
```
In a shell:
```
t2 spark master start
```
In another shell
```bash
export ALLOWED_HOST_NAME=localhost
export APP_SECRET=superSecretkey
export KGX_VERSION=v0.1
export EXECUTOR_CORES=1
export DRIVER_CORES=1
export EXECUTOR_MEM=4g
export DRIVER_MEM=2g
export SPARK_SCRATCH_DIR=/tmp
export SPARK_DEPLOY_MODE=client
export SPARK_DRIVER_HOST=localhost
export SPARK_KUBERNETES_NAMESPACE=default
export SPARK_MASTER=spark://localhost:7077
export SPARK_KUBERNETES_CONTAINER_IMAGE=renciorg/t2:spark-2.4.4-2.12.8
export SPARK_DRIVER_MAXRESULTSIZE=2G
```
t2 spark worker start
```
In another shell
```
t2 spark shell
```
Then, at the scala prompt:

#### Starting services

```
:load load.scala
bin/t2 spark master start
bin/t2 spark worker start
bin/t2 runWebServerLocal
```
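The environment variables listed above are presumably resolved by the web app at startup. A minimal sketch of env-with-default resolution (variable names mirror the list above; the defaults and the `SparkSettings` object are assumptions for illustration, not code from this PR):

```scala
object SparkSettings {
  // Resolve one setting from an environment map, falling back to a default.
  def resolve(env: Map[String, String], key: String, default: String): String =
    env.getOrElse(key, default)

  // Collect the Spark-related settings the web app would hand to SparkConf.
  def sparkSettings(env: Map[String, String]): Map[String, String] = Map(
    "spark.master"          -> resolve(env, "SPARK_MASTER", "spark://localhost:7077"),
    "spark.executor.memory" -> resolve(env, "EXECUTOR_MEM", "4g"),
    "spark.driver.memory"   -> resolve(env, "DRIVER_MEM", "2g"),
    "spark.driver.host"     -> resolve(env, "SPARK_DRIVER_HOST", "localhost")
  )
}
```

In the running app these values would come from `sys.env` rather than an explicit map.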

36 changes: 0 additions & 36 deletions app.scala

This file was deleted.

48 changes: 48 additions & 0 deletions app/controllers/CypherQueryController.scala
@@ -0,0 +1,48 @@
package controllers

import javax.inject.{Inject, Singleton}
import org.renci.t2.core.Core
import org.renci.t2.util.{KGXMetaData, Version}
import play.api.mvc.{AnyContent, BaseController, ControllerComponents, Request}
import models.CypherQuery
import play.api.libs.json.Reads._
import play.api.libs.json._
import services.T2Service

/**
* This controller creates an `Action` to handle HTTP requests to the
* application's home page.
*/
@Singleton
class CypherQueryController @Inject()(val controllerComponents: ControllerComponents, t2Service: T2Service) extends BaseController {

implicit val queryReads: Reads[CypherQuery] = (JsPath \ "query").read[String].map(CypherQuery(_))

def runQuery() = Action(parse.json) { request =>
val queryBody = request.body.validate[CypherQuery]
queryBody.fold(
errors => {
BadRequest(Json.obj("message" -> JsError.toJson(errors)))
},
query => {
val res = t2Service.runCypher(cypher=query.query)
Ok(res)
}
)
}

def runQueryOld() = Action (parse.json) { request =>
val queryBody = request.body.validate[CypherQuery]
queryBody.fold(
errors => {
BadRequest(Json.obj("message" -> JsError.toJson(errors)))
},
query => {
val res = t2Service.runCypherOld(cypher=query.query)
Ok(res)
}
)
}


}
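The controller's `validate`/`fold` pattern — errors folded into a `BadRequest`, a parsed query folded into `Ok` — can be sketched with plain `Either`, independent of the Play API (the `CypherQueryStub` type and the non-empty check are illustrative assumptions, not this PR's validation rules):

```scala
final case class CypherQueryStub(query: String)

object QueryValidation {
  // Accept only non-empty query strings, mirroring the controller's validate step.
  def validate(raw: String): Either[String, CypherQueryStub] =
    if (raw.trim.nonEmpty) Right(CypherQueryStub(raw.trim))
    else Left("query must not be empty")

  // Fold both branches into a response string, like BadRequest vs Ok.
  def respond(raw: String): String =
    validate(raw).fold(err => "400: " + err, q => "200: ran " + q.query)
}
```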
39 changes: 39 additions & 0 deletions app/controllers/IndexController.scala
@@ -0,0 +1,39 @@
package controllers

import javax.inject._
import play.api._
import play.api.mvc._
import play.api.libs.json.Reads._
import play.api.libs.json._
import org.renci.t2.core.Core
import org.renci.t2.util.{KGXMetaData, Version}
import play.api.libs.json.JsValue
import services.T2Service
/**
* This controller creates an `Action` to handle HTTP requests to the
* application's home page.
*/
@Singleton
class IndexController @Inject()(val controllerComponents: ControllerComponents, t2Service: T2Service) extends BaseController {

/**
* Create an Action to render an HTML page.
*
* The configuration in the `routes` file means that this method
* will be called when the application receives a `GET` request with
* a path of `/`.
*/
def index() = Action { implicit request: Request[AnyContent] =>
val kgxServerRoot = "https://stars.renci.org/var/kgx_data" //Config.getConfig("kgxFiles.host")
val kgxFilesGrabber: KGXMetaData = new KGXMetaData(kgxServerRoot)
val versionMetadata: Version = kgxFilesGrabber.getVersionData("v0.1")
Ok(versionMetadata.version)
}

def runCount() = Action { implicit request: Request[AnyContent] =>
val core:Core = t2Service.initializeT2Core()
val graph = core.makeGraph("v0.1")
core.runCypherAndShow(cypherQuery = "MATCH (c) return count(c)", graph = graph)
Ok("count is done")
}
}
5 changes: 5 additions & 0 deletions app/models/CypherQuery.scala
@@ -0,0 +1,5 @@
package models


case class CypherQuery(query: String)

23 changes: 23 additions & 0 deletions app/org/renci/t2/app/App.scala
@@ -0,0 +1,23 @@
package org.renci.t2.app

import org.apache.spark.SparkConf
import org.renci.t2.core.Core


object App {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf()
sparkConf.setMaster(
"spark://localhost:7077"
).setAppName(
"t2-cli"
).set(
"spark.executor.memory", "4g"
)
val core:Core = new Core(sparkConf, "https://stars.renci.org/var/kgx_data")
val graph = core.makeGraph("v0.1")
core.runCypherAndShow(cypherQuery = "MATCH (c) return count(c)", graph = graph)

}

}
90 changes: 90 additions & 0 deletions app/org/renci/t2/core/Core.scala
@@ -0,0 +1,90 @@
package org.renci.t2.core

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.opencypher.morpheus.api.MorpheusSession
import org.opencypher.morpheus.api.io.MorpheusElementTable
import org.opencypher.morpheus.impl.table.SparkTable
import org.opencypher.okapi.relational.api.graph.RelationalCypherGraph
import org.renci.t2.parser.{KGXEdgesFileReader, KGXNodesFileReader}
import org.renci.t2.util.{KGXMetaData, Version, logger, Neo4jJsonCaster}



class Core(sparkConf: SparkConf , kgxFilesAddress: String) {

private val morpheusSession: MorpheusSession = this.createMorpheusSession(sparkConf)

def makeGraph(version: String):RelationalCypherGraph[SparkTable.DataFrameTable] = {
val kgxServerRoot = this.kgxFilesAddress
val kgxFilesGrabber: KGXMetaData = new KGXMetaData(kgxServerRoot)
val versionMetadata: Version = kgxFilesGrabber.getVersionData(version)
var allNodeTables: Seq[MorpheusElementTable] = Seq[MorpheusElementTable]()
var allEdgeTables: Seq[MorpheusElementTable] = Seq[MorpheusElementTable]()
for (nodeFileName <- versionMetadata.nodeFiles) {
val fileUrl = kgxFilesGrabber.serverRootUrl + "/" + nodeFileName
logger.debug("Grabbing node file from: " + fileUrl)
val startTime = System.currentTimeMillis()
val nodeElementTables = KGXNodesFileReader.createElementTables(fileUrl, this.morpheusSession)
val timeDiff = System.currentTimeMillis() - startTime
logger.debug("Converting " + fileUrl + " to morpheus table took : " + timeDiff + " (ms)")
// Add the node element tables for this file to the accumulated sequence
allNodeTables = allNodeTables ++ nodeElementTables
}
for (edgeFileName <- versionMetadata.edgeFiles) {
val fileUrl = kgxFilesGrabber.serverRootUrl + "/" + edgeFileName
logger.debug("Grabbing edge file from: " + fileUrl)
val startTime = System.currentTimeMillis()
val edgeElementTables = KGXEdgesFileReader.createElementTables(fileUrl, this.morpheusSession)
val timeDiff = System.currentTimeMillis() - startTime
logger.debug("Converting " + fileUrl + " to morpheus table took : " + timeDiff + " (ms)")
allEdgeTables = allEdgeTables ++ edgeElementTables
}
val allElements: Seq[MorpheusElementTable] = allNodeTables ++ allEdgeTables
val graph = this.morpheusSession.readFrom(allElements(0), allElements.slice(1, allElements.length): _*)
graph.cache()
graph
}

def runCypherAndShow(cypherQuery: String, graph: RelationalCypherGraph[SparkTable.DataFrameTable]) : Unit = {
val start = System.currentTimeMillis()
graph.cypher(cypherQuery).records.table.df.show
logger.info("Running query: " + cypherQuery + " took " + (System.currentTimeMillis() - start) + " ms")
}

def runCypherAndReturnJsonString(cypherQuery: String, graph: RelationalCypherGraph[SparkTable.DataFrameTable]) : String = {
val start = System.currentTimeMillis()
val records = graph.cypher(cypherQuery).records
val response = Neo4jJsonCaster.convertRecordsToJson(records)
logger.info("Running query: " + cypherQuery + " took " + (System.currentTimeMillis() - start) + " ms")
response
}

def createMorpheusSession(sparkConf: SparkConf): MorpheusSession = {
val sparkSession: SparkSession = SparkSession.builder
.config(sparkConf)
.getOrCreate()
val morpheusSession: MorpheusSession = MorpheusSession.create(sparkSession)
morpheusSession
}

def runAndReturnJSONOld(cypherQuery: String, graph: RelationalCypherGraph[SparkTable.DataFrameTable]): String = {
val start = System.currentTimeMillis()
val response = graph.cypher(cypherQuery).records.table.df.toJSON.collect.mkString("[", "," , "]" )
logger.info("Running query: " + cypherQuery + " took " + (System.currentTimeMillis() - start) + " ms")
response
}

}
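`Core` times each phase by taking `System.currentTimeMillis()` before and after the work. A small generic helper (hypothetical — not part of this PR) captures the same pattern once:

```scala
object Timing {
  // Run a block, returning its result together with the elapsed milliseconds.
  def timed[A](block: => A): (A, Long) = {
    val start = System.currentTimeMillis()
    val result = block
    (result, System.currentTimeMillis() - start)
  }
}
```

With such a helper, `makeGraph` could be timed as `val (graph, ms) = Timing.timed(core.makeGraph("v0.1"))`.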
47 changes: 47 additions & 0 deletions app/org/renci/t2/parser/KGXEdgesFileReader.scala
@@ -0,0 +1,47 @@
package org.renci.t2.parser

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.opencypher.morpheus.api.MorpheusSession
import org.opencypher.morpheus.api.io.MorpheusElementTable
import org.opencypher.okapi.api.io.conversion.{ElementMapping, RelationshipMappingBuilder}

object KGXEdgesFileReader extends KGXFileReader {
override def createElementTables(filePath: String, session: MorpheusSession): Seq[MorpheusElementTable] = {
/**
* Split the edge DataFrame by `edge_label` and build one Morpheus
* element table per relationship type.
*/
val edgeDF: DataFrame = this.convertKGXFileToDataFrame(filePath, session=session)
val edgeTypes = edgeDF.select(col("edge_label")).distinct.collect()
var elementTables = Seq[MorpheusElementTable]()
for (edgeType <- edgeTypes) {
val edgeTypeStr: String = edgeType.get(0).asInstanceOf[String]
var filtered_edges = edgeDF.filter(edgeDF.col("edge_label") === edgeTypeStr)
val edgesTableSchema = filtered_edges.schema

// Morpheus converts the source, target, and id keys to Long internally.
// Duplicate these columns so the original string identifiers survive as properties.
filtered_edges = filtered_edges
.withColumn("_source_id", filtered_edges.col("subject"))
.withColumn("_target_id", filtered_edges.col("object"))
.withColumn("_id", filtered_edges.col("id"))

// Map the duplicated columns to Morpheus relationship identifiers.
val relationshipMapping: ElementMapping = RelationshipMappingBuilder.create(
sourceIdKey = "_id",
sourceStartNodeKey = "_source_id",
sourceEndNodeKey = "_target_id",
relType = edgeTypeStr,
properties = edgesTableSchema.map(property => property.name).toSet[String]
)
// Cache and force materialization before handing the DataFrame to Morpheus.
filtered_edges.cache()
filtered_edges.count()
val edgeTable = MorpheusElementTable.create(relationshipMapping, filtered_edges)

elementTables = elementTables ++ Seq(edgeTable)
}
// Give back element tables
elementTables
}
}
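The reader's distinct-then-filter loop over `edge_label` is, in effect, a group-by over edge rows. With plain Scala collections (a simplified stand-in for the DataFrame operations; `EdgeRow` and its field names are illustrative, not the KGX schema) the split looks like:

```scala
final case class EdgeRow(id: String, subject: String, obj: String, edgeLabel: String)

object EdgeSplit {
  // Partition edges by label, one group per relationship type,
  // mirroring the distinct + filter loop in KGXEdgesFileReader.
  def byLabel(edges: Seq[EdgeRow]): Map[String, Seq[EdgeRow]] =
    edges.groupBy(_.edgeLabel)
}
```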