66from urllib .parse import urljoin
77
88import attr
9- import elasticsearch
10- from elasticsearch_dsl import Q , Search
119from fastapi import HTTPException
1210from overrides import overrides
1311
1715from stac_pydantic .shared import MimeTypes
1816
1917from stac_fastapi .elasticsearch import serializers
20- from stac_fastapi .elasticsearch .config import ElasticsearchSettings
18+ from stac_fastapi .elasticsearch .database_logic import DatabaseLogic
2119from stac_fastapi .elasticsearch .session import Session
2220
2321# from stac_fastapi.elasticsearch.types.error_checks import ErrorChecks
2422from stac_fastapi .types .core import BaseCoreClient
25- from stac_fastapi .types .errors import NotFoundError
2623from stac_fastapi .types .stac import Collection , Collections , Item , ItemCollection
2724
2825logger = logging .getLogger (__name__ )
2926
3027NumType = Union [float , int ]
3128
32- ITEMS_INDEX = "stac_items"
33- COLLECTIONS_INDEX = "stac_collections"
34-
35-
36- def mk_item_id (item_id : str , collection_id : str ):
37- """Make the Elasticsearch document _id value from the Item id and collection."""
38- return f"{ item_id } |{ collection_id } "
39-
4029
4130@attr .s
4231class CoreCrudClient (BaseCoreClient ):
@@ -49,25 +38,14 @@ class CoreCrudClient(BaseCoreClient):
4938 collection_serializer : Type [serializers .Serializer ] = attr .ib (
5039 default = serializers .CollectionSerializer
5140 )
52- settings = ElasticsearchSettings ()
53- client = settings .create_client
41+ database = DatabaseLogic ()
5442
5543 @overrides
5644 def all_collections (self , ** kwargs ) -> Collections :
5745 """Read all collections from the database."""
5846 base_url = str (kwargs ["request" ].base_url )
59- try :
60- collections = self .client .search (
61- index = COLLECTIONS_INDEX , query = {"match_all" : {}}
62- )
63- except elasticsearch .exceptions .NotFoundError :
64- raise NotFoundError ("No collections exist" )
65- serialized_collections = [
66- self .collection_serializer .db_to_stac (
67- collection ["_source" ], base_url = base_url
68- )
69- for collection in collections ["hits" ]["hits" ]
70- ]
47+ serialized_collections = self .database .get_all_collections (base_url = base_url )
48+
7149 links = [
7250 {
7351 "rel" : Relations .root .value ,
@@ -94,12 +72,8 @@ def all_collections(self, **kwargs) -> Collections:
9472 def get_collection (self , collection_id : str , ** kwargs ) -> Collection :
9573 """Get collection by id."""
9674 base_url = str (kwargs ["request" ].base_url )
97- try :
98- collection = self .client .get (index = COLLECTIONS_INDEX , id = collection_id )
99- except elasticsearch .exceptions .NotFoundError :
100- raise NotFoundError (f"Collection { collection_id } not found" )
101-
102- return self .collection_serializer .db_to_stac (collection ["_source" ], base_url )
75+ collection = self .database .find_collection (collection_id = collection_id )
76+ return self .collection_serializer .db_to_stac (collection , base_url )
10377
10478 @overrides
10579 def item_collection (
@@ -108,24 +82,10 @@ def item_collection(
10882 """Read an item collection from the database."""
10983 links = []
11084 base_url = str (kwargs ["request" ].base_url )
111- search = Search (using = self .client , index = "stac_items" )
11285
113- collection_filter = Q (
114- "bool" , should = [ Q ( "match_phrase" , ** { "collection" : collection_id })]
86+ serialized_children , count = self . database . get_item_collection (
87+ collection_id = collection_id , limit = limit , base_url = base_url
11588 )
116- search = search .query (collection_filter )
117- try :
118- count = search .count ()
119- except elasticsearch .exceptions .NotFoundError :
120- raise NotFoundError ("No items exist" )
121- # search = search.sort({"id.keyword" : {"order" : "asc"}})
122- search = search .query ()[0 :limit ]
123- collection_children = search .execute ().to_dict ()
124-
125- serialized_children = [
126- self .item_serializer .db_to_stac (item ["_source" ], base_url = base_url )
127- for item in collection_children ["hits" ]["hits" ]
128- ]
12989
13090 context_obj = None
13191 if self .extension_is_enabled ("ContextExtension" ):
@@ -146,15 +106,8 @@ def item_collection(
146106 def get_item (self , item_id : str , collection_id : str , ** kwargs ) -> Item :
147107 """Get item by item id, collection id."""
148108 base_url = str (kwargs ["request" ].base_url )
149- try :
150- item = self .client .get (
151- index = ITEMS_INDEX , id = mk_item_id (item_id , collection_id )
152- )
153- except elasticsearch .exceptions .NotFoundError :
154- raise NotFoundError (
155- f"Item { item_id } does not exist in Collection { collection_id } "
156- )
157- return self .item_serializer .db_to_stac (item ["_source" ], base_url )
109+ item = self .database .get_one_item (item_id = item_id , collection_id = collection_id )
110+ return self .item_serializer .db_to_stac (item , base_url )
158111
159112 @staticmethod
160113 def _return_date (interval_str ):
@@ -238,125 +191,63 @@ def get_search(
238191
239192 return resp
240193
241- @staticmethod
242- def bbox2poly (b0 , b1 , b2 , b3 ):
243- """Transform bbox to polygon."""
244- poly = [[[b0 , b1 ], [b2 , b1 ], [b2 , b3 ], [b0 , b3 ], [b0 , b1 ]]]
245- return poly
246-
247- def post_search (self , search_request : Search , ** kwargs ) -> ItemCollection :
194+ def post_search (self , search_request , ** kwargs ) -> ItemCollection :
248195 """POST search catalog."""
249196 base_url = str (kwargs ["request" ].base_url )
250- search = (
251- Search ()
252- .using (self .client )
253- .index (ITEMS_INDEX )
254- .sort (
255- {"properties.datetime" : {"order" : "desc" }},
256- {"id" : {"order" : "desc" }},
257- {"collection" : {"order" : "desc" }},
258- )
259- )
197+ search = self .database .create_search_object ()
260198
261199 if search_request .query :
262200 if type (search_request .query ) == str :
263201 search_request .query = json .loads (search_request .query )
264202 for (field_name , expr ) in search_request .query .items ():
265203 field = "properties__" + field_name
266204 for (op , value ) in expr .items ():
267- if op != "eq" :
268- key_filter = {field : {f"{ op } " : value }}
269- search = search .query (Q ("range" , ** key_filter ))
270- else :
271- search = search .query ("match_phrase" , ** {field : value })
205+ search = self .database .create_query_filter (
206+ search = search , op = op , field = field , value = value
207+ )
272208
273209 if search_request .ids :
274- id_list = []
275- for item_id in search_request .ids :
276- id_list .append (Q ("match_phrase" , ** {"id" : item_id }))
277- id_filter = Q ("bool" , should = id_list )
278- search = search .query (id_filter )
210+ search = self .database .search_ids (
211+ search = search , item_ids = search_request .ids
212+ )
279213
280214 if search_request .collections :
281- collection_list = []
282- for collection_id in search_request .collections :
283- collection_list .append (
284- Q ("match_phrase" , ** {"collection" : collection_id })
285- )
286- collection_filter = Q ("bool" , should = collection_list )
287- search = search .query (collection_filter )
215+ search = self .database .search_collections (
216+ search = search , collection_ids = search_request .collections
217+ )
288218
289219 if search_request .datetime :
290220 datetime_search = self ._return_date (search_request .datetime )
291- if "eq" in datetime_search :
292- search = search .query (
293- "match_phrase" , ** {"properties__datetime" : datetime_search ["eq" ]}
294- )
295- else :
296- search = search .filter (
297- "range" , properties__datetime = {"lte" : datetime_search ["lte" ]}
298- )
299- search = search .filter (
300- "range" , properties__datetime = {"gte" : datetime_search ["gte" ]}
301- )
221+ search = self .database .search_datetime (
222+ search = search , datetime_search = datetime_search
223+ )
302224
303225 if search_request .bbox :
304226 bbox = search_request .bbox
305227 if len (bbox ) == 6 :
306228 bbox = [bbox [0 ], bbox [1 ], bbox [3 ], bbox [4 ]]
307- poly = self .bbox2poly (bbox [0 ], bbox [1 ], bbox [2 ], bbox [3 ])
308-
309- bbox_filter = Q (
310- {
311- "geo_shape" : {
312- "geometry" : {
313- "shape" : {"type" : "polygon" , "coordinates" : poly },
314- "relation" : "intersects" ,
315- }
316- }
317- }
318- )
319- search = search .query (bbox_filter )
229+
230+ search = self .database .search_bbox (search = search , bbox = bbox )
320231
321232 if search_request .intersects :
322- intersect_filter = Q (
323- {
324- "geo_shape" : {
325- "geometry" : {
326- "shape" : {
327- "type" : search_request .intersects .type .lower (),
328- "coordinates" : search_request .intersects .coordinates ,
329- },
330- "relation" : "intersects" ,
331- }
332- }
333- }
233+ self .database .search_intersects (
234+ search = search , intersects = search_request .intersects
334235 )
335- search = search .query (intersect_filter )
336236
337237 if search_request .sortby :
338238 for sort in search_request .sortby :
339239 if sort .field == "datetime" :
340240 sort .field = "properties__datetime"
341241 field = sort .field + ".keyword"
342- search = search .sort ({field : {"order" : sort .direction }})
242+ search = self .database .sort_field (
243+ search = search , field = field , direction = sort .direction
244+ )
343245
344- try :
345- count = search .count ()
346- except elasticsearch .exceptions .NotFoundError :
347- raise NotFoundError ("No items exist" )
348-
349- # search = search.sort({"id.keyword" : {"order" : "asc"}})
350- search = search .query ()[0 : search_request .limit ]
351- response = search .execute ().to_dict ()
352-
353- if len (response ["hits" ]["hits" ]) > 0 :
354- response_features = [
355- self .item_serializer .db_to_stac (item ["_source" ], base_url = base_url )
356- for item in response ["hits" ]["hits" ]
357- ]
358- else :
359- response_features = []
246+ count = self .database .search_count (search = search )
247+
248+ response_features = self .database .execute_search (
249+ search = search , limit = search_request .limit , base_url = base_url
250+ )
360251
361252 # if self.extension_is_enabled("FieldsExtension"):
362253 # if search_request.query is not None:
@@ -384,7 +275,7 @@ def post_search(self, search_request: Search, **kwargs) -> ItemCollection:
384275 else :
385276 limit = 10
386277 response_features = response_features [0 :limit ]
387- limit = 10
278+
388279 context_obj = None
389280 if self .extension_is_enabled ("ContextExtension" ):
390281 context_obj = {
0 commit comments