Skip to content

Commit fd47ede

Browse files
authored
Merge pull request #32 from SOFTNETWORK-APP/feature/licensing
- add support for watchers - add support for enrich policies - add transform model - add materialized view model - add licensing - add REPL client
2 parents 4c70892 + c7f3d6a commit fd47ede

336 files changed

Lines changed: 29567 additions & 3456 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 127 additions & 1601 deletions
Large diffs are not rendered by default.

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package app.softnetwork.elastic.sql.bridge
1818

19-
import app.softnetwork.elastic.sql.PainlessContext
19+
import app.softnetwork.elastic.sql.{PainlessContext, PainlessContextType}
2020
import app.softnetwork.elastic.sql.`type`.SQLTemporal
2121
import app.softnetwork.elastic.sql.query.{
2222
Asc,
@@ -100,7 +100,10 @@ object ElasticAggregation {
100100
having: Option[Criteria],
101101
bucketsDirection: Map[String, SortOrder],
102102
allAggregations: Map[String, SQLAggregation]
103-
)(implicit timestamp: Long): ElasticAggregation = {
103+
)(implicit
104+
timestamp: Long,
105+
contextType: PainlessContextType
106+
): ElasticAggregation = {
104107
import sqlAgg._
105108
val sourceField = identifier.path
106109

@@ -153,14 +156,14 @@ object ElasticAggregation {
153156
buildScript: (String, Script) => Aggregation
154157
): Aggregation = {
155158
if (transformFuncs.nonEmpty) {
156-
val context = PainlessContext()
159+
val context = PainlessContext(context = contextType)
157160
val scriptSrc = identifier.painless(Some(context))
158161
val script = now(Script(s"$context$scriptSrc").lang("painless"))
159162
buildScript(aggName, script)
160163
} else {
161164
aggType match {
162165
case th: WindowFunction if th.shouldBeScripted =>
163-
val context = PainlessContext()
166+
val context = PainlessContext(context = contextType)
164167
val scriptSrc = th.identifier.painless(Some(context))
165168
val script = now(Script(s"$context$scriptSrc").lang("painless"))
166169
buildScript(aggName, script)
@@ -348,7 +351,10 @@ object ElasticAggregation {
348351
having: Option[Criteria],
349352
nested: Option[NestedElement],
350353
allElasticAggregations: Seq[ElasticAggregation]
351-
)(implicit timestamp: Long): Seq[Aggregation] = {
354+
)(implicit
355+
timestamp: Long,
356+
contextType: PainlessContextType = PainlessContextType.Query
357+
): Seq[Aggregation] = {
352358
for (tree <- buckets) yield {
353359
val treeNodes =
354360
tree.sortBy(_.level).reverse.foldLeft(Seq.empty[NodeAggregation]) { (current, node) =>
@@ -364,7 +370,7 @@ object ElasticAggregation {
364370

365371
val aggScript =
366372
if (!bucket.isBucketScript && bucket.shouldBeScripted) {
367-
val context = PainlessContext()
373+
val context = PainlessContext(context = contextType)
368374
val painless = bucket.painless(Some(context))
369375
Some(now(Script(s"$context$painless").lang("painless")))
370376
} else {

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticBridge.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package app.softnetwork.elastic.sql.bridge
1818

19+
import app.softnetwork.elastic.sql.PainlessContextType
1920
import app.softnetwork.elastic.sql.operator.AND
2021
import app.softnetwork.elastic.sql.query.{
2122
BetweenExpr,
@@ -36,16 +37,16 @@ import app.softnetwork.elastic.sql.query.{
3637
Predicate
3738
}
3839
import com.sksamuel.elastic4s.ElasticApi._
39-
import com.sksamuel.elastic4s.requests.common.FetchSourceContext
4040
import com.sksamuel.elastic4s.requests.searches.queries.{InnerHit, Query}
4141

42-
import scala.annotation.tailrec
43-
4442
case class ElasticBridge(filter: ElasticFilter) {
4543
def query(
4644
innerHitsNames: Set[String] = Set.empty,
4745
currentQuery: Option[ElasticBoolQuery]
48-
)(implicit timestamp: Long): Query = {
46+
)(implicit
47+
timestamp: Long,
48+
contextType: PainlessContextType = PainlessContextType.Query
49+
): Query = {
4950
filter match {
5051
case boolQuery: ElasticBoolQuery =>
5152
import boolQuery._

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticCriteria.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616

1717
package app.softnetwork.elastic.sql.bridge
1818

19+
import app.softnetwork.elastic.sql.PainlessContextType
1920
import app.softnetwork.elastic.sql.query.Criteria
2021
import com.sksamuel.elastic4s.requests.searches.queries.Query
2122

2223
case class ElasticCriteria(criteria: Criteria) {
2324

2425
def asQuery(group: Boolean = true, innerHitsNames: Set[String] = Set.empty)(implicit
25-
timestamp: Long
26+
timestamp: Long,
27+
contextType: PainlessContextType = PainlessContextType.Query
2628
): Query = {
2729
val query = criteria.boolQuery.copy(group = group)
2830
query

bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 93 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@ import app.softnetwork.elastic.sql.`type`.{
2323
SQLTemporal,
2424
SQLVarchar
2525
}
26+
import app.softnetwork.elastic.sql.config.ElasticSqlConfig
2627
import app.softnetwork.elastic.sql.function.aggregate.COUNT
2728
import app.softnetwork.elastic.sql.function.geo.{Distance, Meters}
2829
import app.softnetwork.elastic.sql.operator._
2930
import app.softnetwork.elastic.sql.query._
31+
import com.fasterxml.jackson.databind.JsonNode
32+
import com.fasterxml.jackson.databind.node.NullNode
3033
import com.sksamuel.elastic4s.ElasticApi
3134
import com.sksamuel.elastic4s.ElasticApi._
32-
import com.sksamuel.elastic4s.requests.common.FetchSourceContext
35+
import com.sksamuel.elastic4s.json.JacksonBuilder
3336
import com.sksamuel.elastic4s.requests.script.Script
3437
import com.sksamuel.elastic4s.requests.script.ScriptType.Source
3538
import com.sksamuel.elastic4s.requests.searches.aggs.{
@@ -51,17 +54,30 @@ import scala.language.implicitConversions
5154

5255
package object bridge {
5356

54-
def now(script: Script)(implicit timestamp: Long): Script = {
57+
lazy val sqlConfig: ElasticSqlConfig = ElasticSqlConfig()
58+
59+
def now(script: Script)(implicit
60+
timestamp: Long,
61+
contextType: PainlessContextType = PainlessContextType.Query
62+
): Script = {
5563
if (!script.script.contains("params.__now__")) {
5664
return script
5765
}
58-
script.param("__now__", timestamp)
66+
contextType match {
67+
case PainlessContextType.Query => script.param("__now__", timestamp)
68+
case PainlessContextType.Transform =>
69+
script.param("__now__", sqlConfig.transformLastUpdatedColumnName)
70+
case _ => script
71+
}
5972
}
6073

6174
implicit def requestToNestedFilterAggregation(
6275
request: SingleSearch,
6376
innerHitsName: String
64-
)(implicit timestamp: Long): Option[FilterAggregation] = {
77+
)(implicit
78+
timestamp: Long,
79+
contextType: PainlessContextType = PainlessContextType.Query
80+
): Option[FilterAggregation] = {
6581
val having: Option[Query] =
6682
request.having.flatMap(_.criteria) match {
6783
case Some(f) =>
@@ -137,7 +153,10 @@ package object bridge {
137153

138154
implicit def requestToFilterAggregation(
139155
request: SingleSearch
140-
)(implicit timestamp: Long): Option[FilterAggregation] =
156+
)(implicit
157+
timestamp: Long,
158+
contextType: PainlessContextType = PainlessContextType.Query
159+
): Option[FilterAggregation] =
141160
request.having.flatMap(_.criteria) match {
142161
case Some(f) =>
143162
val boolQuery = Option(ElasticBoolQuery(group = true))
@@ -155,7 +174,10 @@ package object bridge {
155174
implicit def requestToRootAggregations(
156175
request: SingleSearch,
157176
aggregations: Seq[ElasticAggregation]
158-
)(implicit timestamp: Long): Seq[AbstractAggregation] = {
177+
)(implicit
178+
timestamp: Long,
179+
contextType: PainlessContextType = PainlessContextType.Query
180+
): Seq[AbstractAggregation] = {
159181
val notNestedAggregations = aggregations.filterNot(_.nested)
160182

161183
val notNestedBuckets = request.bucketTree.filterNot(_.bucket.nested)
@@ -207,7 +229,10 @@ package object bridge {
207229
implicit def requestToScopedAggregations(
208230
request: SingleSearch,
209231
aggregations: Seq[ElasticAggregation]
210-
)(implicit timestamp: Long): Seq[NestedAggregation] = {
232+
)(implicit
233+
timestamp: Long,
234+
contextType: PainlessContextType = PainlessContextType.Query
235+
): Seq[NestedAggregation] = {
211236
// Group nested aggregations by their nested path
212237
val nestedAggregations: Map[String, Seq[ElasticAggregation]] = aggregations
213238
.filter(_.nested)
@@ -413,7 +438,8 @@ package object bridge {
413438
}
414439

415440
implicit def requestToElasticSearchRequest(request: SingleSearch)(implicit
416-
timestamp: Long
441+
timestamp: Long,
442+
contextType: PainlessContextType = PainlessContextType.Query
417443
): ElasticSearchRequest =
418444
ElasticSearchRequest(
419445
request.sql,
@@ -431,7 +457,10 @@ package object bridge {
431457

432458
implicit def requestToSearchRequest(
433459
request: SingleSearch
434-
)(implicit timestamp: Long): SearchRequest = {
460+
)(implicit
461+
timestamp: Long,
462+
contextType: PainlessContextType = PainlessContextType.Query
463+
): SearchRequest = {
435464
import request._
436465

437466
val aggregations = request.aggregates.map(
@@ -491,7 +520,7 @@ package object bridge {
491520
case Nil => _search
492521
case _ =>
493522
_search scriptfields scriptFields.map { field =>
494-
val context = PainlessContext()
523+
val context = PainlessContext(context = contextType)
495524
val script = field.painless(Some(context))
496525
scriptField(
497526
field.scriptName,
@@ -512,7 +541,7 @@ package object bridge {
512541
case Some(o) if aggregates.isEmpty && buckets.isEmpty =>
513542
_search sortBy o.sorts.map { sort =>
514543
if (sort.isScriptSort) {
515-
val context = PainlessContext()
544+
val context = PainlessContext(context = contextType)
516545
val painless = sort.field.painless(Some(context))
517546
val painlessScript = s"$context$painless"
518547
val script =
@@ -571,7 +600,10 @@ package object bridge {
571600

572601
implicit def requestToMultiSearchRequest(
573602
request: MultiSearch
574-
)(implicit timestamp: Long): MultiSearchRequest = {
603+
)(implicit
604+
timestamp: Long,
605+
contextType: PainlessContextType = PainlessContextType.Query
606+
): MultiSearchRequest = {
575607
MultiSearchRequest(
576608
request.requests.map(implicitly[SearchRequest](_))
577609
)
@@ -582,7 +614,10 @@ package object bridge {
582614
doubleOp: Double => A
583615
): A = n.toEither.fold(longOp, doubleOp)
584616

585-
implicit def expressionToQuery(expression: GenericExpression)(implicit timestamp: Long): Query = {
617+
implicit def expressionToQuery(expression: GenericExpression)(implicit
618+
timestamp: Long,
619+
contextType: PainlessContextType = PainlessContextType.Query
620+
): Query = {
586621
import expression._
587622
if (isAggregation)
588623
return matchAllQuery()
@@ -592,7 +627,7 @@ package object bridge {
592627
case _ => true
593628
}))
594629
) {
595-
val context = PainlessContext()
630+
val context = PainlessContext(context = contextType)
596631
val script = painless(Some(context))
597632
return scriptQuery(
598633
now(Script(script = s"$context$script").lang("painless").scriptType("source"))
@@ -810,7 +845,7 @@ package object bridge {
810845
case NE | DIFF => not(rangeQuery(identifier.name) gte script lte script)
811846
}
812847
case _ =>
813-
val context = PainlessContext()
848+
val context = PainlessContext(context = contextType)
814849
val script = painless(Some(context))
815850
scriptQuery(
816851
now(
@@ -821,7 +856,7 @@ package object bridge {
821856
)
822857
}
823858
case _ =>
824-
val context = PainlessContext()
859+
val context = PainlessContext(context = contextType)
825860
val script = painless(Some(context))
826861
scriptQuery(
827862
now(
@@ -884,7 +919,10 @@ package object bridge {
884919

885920
implicit def betweenToQuery(
886921
between: BetweenExpr
887-
)(implicit timestamp: Long): Query = {
922+
)(implicit
923+
timestamp: Long,
924+
contextType: PainlessContextType = PainlessContextType.Query
925+
): Query = {
888926
import between._
889927
// Geo distance special case
890928
identifier.functions.headOption match {
@@ -1007,6 +1045,40 @@ package object bridge {
10071045
)
10081046
}
10091047

1048+
implicit def queryToJson(
1049+
query: Query
1050+
): JsonNode = {
1051+
JacksonBuilder.toNode(
1052+
SearchBodyBuilderFn(
1053+
ElasticApi.search("") query {
1054+
query
1055+
}
1056+
).value
1057+
) match {
1058+
case Left(node: JsonNode) =>
1059+
if (node.has("query")) {
1060+
node.get("query")
1061+
} else {
1062+
node
1063+
}
1064+
case Right(_) => NullNode.instance
1065+
}
1066+
}
1067+
1068+
implicit def criteriaToQuery(criteria: Criteria)(implicit
1069+
timestamp: Long,
1070+
contextType: PainlessContextType = PainlessContextType.Query
1071+
): Query = {
1072+
ElasticCriteria(criteria).asQuery()
1073+
}
1074+
1075+
implicit def criteriaToNode(criteria: Criteria)(implicit
1076+
timestamp: Long,
1077+
contextType: PainlessContextType = PainlessContextType.Query
1078+
): JsonNode = {
1079+
queryToJson(criteriaToQuery(criteria))
1080+
}
1081+
10101082
implicit def filterToQuery(
10111083
filter: ElasticFilter
10121084
): ElasticBridge = {
@@ -1015,7 +1087,10 @@ package object bridge {
10151087

10161088
implicit def sqlQueryToAggregations(
10171089
query: SelectStatement
1018-
)(implicit timestamp: Long): Seq[ElasticAggregation] = {
1090+
)(implicit
1091+
timestamp: Long,
1092+
contextType: PainlessContextType = PainlessContextType.Query
1093+
): Seq[ElasticAggregation] = {
10191094
import query._
10201095
statement
10211096
.map {

0 commit comments

Comments
 (0)