Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 121 additions & 72 deletions request_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ class RequestDecomposition(BaseModel):
optionalPatterns: list = []
values: list = []
insertPattern: list = []
subDecompositions: list = []


##################
# QUERY HANDLING #
##################

def constructGraphFromKnowledgeNetwork(query: str, requester_id: str, gaps_enabled) -> tuple[Graph, list]:
# TEST query
#query = "SELECT * WHERE {?s ?p ?o}"
# first parse the query
try:
parsed_query = parseQuery(query)
Expand Down Expand Up @@ -84,35 +87,41 @@ def constructGraphFromKnowledgeNetwork(query: str, requester_id: str, gaps_enabl

# decompose the query algebra and get the main BGP pattern, possible OPTIONAL patterns and possible VALUES statements
try:
query_decomposition = RequestDecomposition()
query_decomposition = decomposeRequest(algebra['p']['p'], query_decomposition)
logger.debug(f"Query decomposition VALUES is: {query_decomposition.values}")
query_decomposition = decomposeRequest(algebra['p'], RequestDecomposition())
except Exception as e:
raise Exception(f"Could not decompose query to get graph patterns, {e}")

# deal with multiple VALUES clauses, combine them and delete incorrect combinations
query_decomposition = combineValuesStatements(query_decomposition)

# now show the derived query decomposition
showRequestDecomposition(query_decomposition, prologue.namespace_manager)

# if there are multiple VALUES clauses, combine them and delete incorrect combinations
if len(query_decomposition.values) > 1:
logger.info(f"Now combining the VALUES statements")
query_decomposition = combineValuesStatements(query_decomposition)
# now show the query decomposition after values application
showRequestDecomposition(query_decomposition, prologue.namespace_manager)

# search bindings in the knowledge network for the graph patterns and build a local graph of them
# build up a graph (and optionally knowledge gaps) by executing the decomposition on the knowledge network
graph = Graph()
knowledge_gaps = []
graph, knowledge_gaps = buildGraphFromDecomposition(graph, query_decomposition, requester_id, gaps_enabled, knowledge_gaps)

if len(query_decomposition.mainPattern) > 0:
# first, ask the main graph pattern and add the bindings to the graph
logger.info('Main graph pattern is being asked from the knowledge network!')
logger.info(f"Knowledge network successfully responded to all the ask patterns!")

return graph, knowledge_gaps


def buildGraphFromDecomposition(graph: Graph,
decomposition: RequestDecomposition,
requester_id: str,
gaps_enabled: bool,
knowledge_gaps: list) -> tuple[Graph, list]:

# first, ask the main graph pattern and add the bindings to the graph
if len(decomposition.mainPattern) > 0:
logger.info('A main graph pattern is being asked from the knowledge network!')
try:
pattern = query_decomposition.mainPattern
pattern = decomposition.mainPattern
logger.info(f"Pattern that is asked: {pattern}")
bindings = [{}]
if len(query_decomposition.values) > 0:
bindings = query_decomposition.values[0]
if len(decomposition.values) > 0:
bindings = decomposition.values[0]
logger.info(f"Bindings that accompany the ASK: {bindings}")
answer = knowledge_network.askPatternAtKnowledgeNetwork(requester_id, pattern, bindings, gaps_enabled)
logger.info(f"Received answer from the knowledge network: {answer}")
Expand All @@ -132,20 +141,28 @@ def constructGraphFromKnowledgeNetwork(query: str, requester_id: str, gaps_enabl
raise Exception(f"An error occurred when contacting the knowledge network: {e}")
logger.info(f"Knowledge network successfully responded to the main graph pattern!")

# second, loop over the optional graph patterns and add the bindings to the graph
try:
logger.info('Optional graph patterns are being asked from the knowledge network!')
for pattern in query_decomposition.optionalPatterns:
logger.info(f"Pattern that is asked: {pattern}")
answer = knowledge_network.askPatternAtKnowledgeNetwork(requester_id, pattern, [{}], gaps_enabled)
logger.info(f'Received answer from the knowledge network: {answer}')
# extend the graph with the triples and values in the bindings
graph = buildGraphFromTriplesAndBindings(graph, pattern, answer["bindingSet"])
except Exception as e:
raise Exception(f"An error occurred when contacting the knowledge network: {e}")
logger.info(f"Knowledge network successfully responded to all the ask patterns!")
else:
logger.info(f"No main graph pattern is derived from the query, so the result is empty!")
# second, loop over the optional graph patterns and add the bindings to the graph
try:
for pattern in decomposition.optionalPatterns:
logger.info('An optional graph pattern is being asked from the knowledge network!')
logger.info(f"Pattern that is asked: {pattern}")
answer = knowledge_network.askPatternAtKnowledgeNetwork(requester_id, pattern, [{}], gaps_enabled)
logger.info(f'Received answer from the knowledge network: {answer}')
# extend the graph with the triples and values in the bindings
graph = buildGraphFromTriplesAndBindings(graph, pattern, answer["bindingSet"])
logger.info(f"Knowledge network successfully responded to an optional graph pattern!")
except Exception as e:
raise Exception(f"An error occurred when contacting the knowledge network: {e}")

# third, check if there sub-decompositions and loop over them to extend the graph and optionally knowledge gaps
try:
if len(decomposition.subDecompositions) > 0:
for decomp in decomposition.subDecompositions:
logger.info(f"A sub decomposition is being handled!")
graph, knowledge_gaps = buildGraphFromDecomposition(graph, decomp, requester_id, gaps_enabled, knowledge_gaps)
logger.info(f"The sub decomposition has successfully been handled!")
except Exception as e:
raise Exception(f"An error occurred when contacting the knowledge network: {e}")

return graph, knowledge_gaps

Expand Down Expand Up @@ -177,20 +194,15 @@ def checkAndDecomposeUpdate(update: str) -> RequestDecomposition:

# decompose the request algebra and get the INSERT and WHERE part (if present) of the request
try:
update_decomposition = RequestDecomposition()
update_decomposition = decomposeRequest(algebra[0], update_decomposition)
update_decomposition = decomposeRequest(algebra[0], RequestDecomposition())
except Exception as e:
raise Exception(f"Could not decompose update request to get INSERT or WHERE graph pattern, {e}")

# now show the derived query decomposition
# deal with multiple VALUES clauses, combine them and delete incorrect combinations
update_decomposition = combineValuesStatements(update_decomposition)

# now show the derived request decomposition
showRequestDecomposition(update_decomposition, prologue.namespace_manager)

# if there are multiple VALUES clauses, combine them and delete incorrect combinations
if len(update_decomposition.values) > 1:
logger.info(f"Now combining the VALUES statements")
update_decomposition = combineValuesStatements(update_decomposition)
# now show the query decomposition after values application
showRequestDecomposition(update_decomposition, prologue.namespace_manager)

return update_decomposition

Expand Down Expand Up @@ -286,12 +298,24 @@ def decomposeRequest(algebra: dict, decomposition: RequestDecomposition) -> Requ
logger.debug(f"Value clause after transforming to JSON is: {values_clause}")
decomposition.values.append(values_clause)
case "Filter":
if not str(algebra['expr']).startswith("Builtin"):
# it is a filter with a value for a variable, so this does not contain triples to be added to the graph pattern
decomposition = decomposeRequest(algebra['p'], decomposition)
else:
# it is either a filter_exists or a filter_not_exists
raise Exception(f"Unsupported construct type {str(algebra['expr']).split('{')[0]} in construct type {type}. Please contact the endpoint administrator to implement this!")
filter_type = algebra['expr'].name
logger.debug(f"Filter expression is {filter_type}")
match filter_type:
case "RelationalExpression":
# it is a filter that checks a relation between a variable and a value => this can be ignored, continue with the rest
decomposition = decomposeRequest(algebra['p'], decomposition)
case "ConditionalAndExpression":
# it is a filter that checks multiple conditions in an AND setting => this can be ignored, continue with the rest
decomposition = decomposeRequest(algebra['p'], decomposition)
case "Builtin_isBLANK":
# it is a filter that checks whether the argument is a blank node => this can be ignored, continue with the rest
decomposition = decomposeRequest(algebra['p'], decomposition)
case _:
raise Exception(f"Unsupported construct type {filter_type}. Please contact the endpoint administrator to implement this!")
case "Union":
# two new sub decompositions should be derived
decomposition.subDecompositions.append(decomposeRequest(algebra['p1'], RequestDecomposition()))
decomposition.subDecompositions.append(decomposeRequest(algebra['p2'], RequestDecomposition()))
case "Join":
# both parts should be added to the same main graph pattern
decomposition = decomposeRequest(algebra['p1'], decomposition)
Expand All @@ -301,6 +325,15 @@ def decomposeRequest(algebra: dict, decomposition: RequestDecomposition) -> Requ
decomposition = decomposeRequest(algebra['p1'], decomposition)
# part p2 is an optional part which is BGP and its triples should be added as optional graph pattern
decomposition.optionalPatterns.append(algebra['p2']['triples'])
case "Distinct":
# the distinct contains a part p that should be further processed
decomposition = decomposeRequest(algebra['p'], decomposition)
case "Project":
# the project contains a part p that should be further processed
decomposition = decomposeRequest(algebra['p'], decomposition)
case "Slice":
# the slice contains a part p that should be further processed
decomposition = decomposeRequest(algebra['p'], decomposition)
case "Extend":
# the extend contains a part p that should be further processed
decomposition = decomposeRequest(algebra['p'], decomposition)
Expand Down Expand Up @@ -337,29 +370,39 @@ def filterBindingsOnPatternVariables(bindings: list , pattern: list) -> list:


def combineValuesStatements(decomposition: RequestDecomposition) -> RequestDecomposition:
# derive all combinations of VALUES clause elements
values_combinations = list(itertools.product(*decomposition.values))
correct_values_combinations = []
# delete all incorrect value combinations
for values_combination in values_combinations:
# check if this values combination is correct
correct = True
correct_values_combination = {}
# loop over the elements in the values combination
for element in values_combination:
for key in element.keys():
logger.debug(f"Value for {key} is: {element[key]}")
if key not in correct_values_combination.keys():
correct_values_combination[key] = element[key]
else:
if correct_values_combination[key] != element[key]:
# this values combination is inconsistent, because one key must only have one value
correct = False
if correct:
correct_values_combinations.append(correct_values_combination)

decomposition.values = [correct_values_combinations]
logger.debug(f"Values after combining are: {decomposition.values}")
# if there are VALUES clause elements, combine them
if len(decomposition.values) > 1:
logger.info(f"Now combining the VALUES statements")
# derive all combinations of VALUES clause elements
logger.debug(f"Query decomposition VALUES is: {decomposition.values}")
values_combinations = list(itertools.product(*decomposition.values))
correct_values_combinations = []
# delete all incorrect value combinations
for values_combination in values_combinations:
# check if this values combination is correct
correct = True
correct_values_combination = {}
# loop over the elements in the values combination
for element in values_combination:
for key in element.keys():
logger.debug(f"Value for {key} is: {element[key]}")
if key not in correct_values_combination.keys():
correct_values_combination[key] = element[key]
else:
if correct_values_combination[key] != element[key]:
# this values combination is inconsistent, because one key must only have one value
correct = False
if correct:
correct_values_combinations.append(correct_values_combination)

decomposition.values = [correct_values_combinations]
logger.debug(f"Values after combining are: {decomposition.values}")

# if there are sub decompositions, do check if they have VALUES statements to be combined
subDecomp_combined_values = []
for decomp in decomposition.subDecompositions:
subDecomp_combined_values.append(combineValuesStatements(decomp))
decomposition.subDecompositions = subDecomp_combined_values

return decomposition

Expand Down Expand Up @@ -392,7 +435,8 @@ def showRequestDecomposition(qd: RequestDecomposition, nm: NamespaceManager):
bound_triple += element.n3(namespace_manager = nm) + " "
bound_triple += "\n"
pattern += bound_triple
logger.info(f"Derived the following main graph pattern from the request:\n{pattern}")
if pattern != "":
logger.info(f"Derived the following main graph pattern from the request:\n{pattern}")

for p in qd.optionalPatterns:
pattern = ""
Expand Down Expand Up @@ -427,4 +471,9 @@ def showRequestDecomposition(qd: RequestDecomposition, nm: NamespaceManager):
bound_triple += element.n3(namespace_manager = nm) + " "
bound_triple += "\n"
pattern += bound_triple
logger.info(f"Derived the following insert pattern from the request:\n{pattern}")
if pattern != "":
logger.info(f"Derived the following insert pattern from the request:\n{pattern}")

for decomp in qd.subDecompositions:
showRequestDecomposition(decomp,nm)

Loading