1010from overrides import overrides
1111
1212from stac_fastapi .elasticsearch .config import ElasticsearchSettings
13- from stac_fastapi .elasticsearch .core import COLLECTIONS_INDEX , ITEMS_INDEX
13+ from stac_fastapi .elasticsearch .core import COLLECTIONS_INDEX , ITEMS_INDEX , mk_item_id
1414from stac_fastapi .elasticsearch .serializers import CollectionSerializer , ItemSerializer
1515from stac_fastapi .elasticsearch .session import Session
1616from stac_fastapi .extensions .third_party .bulk_transactions import (
@@ -42,29 +42,39 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
4242 if item ["type" ] == "FeatureCollection" :
4343 bulk_client = BulkTransactionsClient ()
4444 processed_items = [
45- bulk_client ._preprocess_item (item , base_url )
46- for item in item ["features" ]
45+ bulk_client .preprocess_item (item , base_url ) for item in item ["features" ]
4746 ]
4847 return_msg = f"Successfully added { len (processed_items )} items."
4948 bulk_client .bulk_sync (processed_items )
5049
5150 return return_msg
52-
53- # If a single item is posted
54- if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
55- raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
56-
57- if self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
58- raise ConflictError (
59- f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
51+ else :
52+ # TODO
53+ if self .client .exists (
54+ index = ITEMS_INDEX , id = mk_item_id (item ["id" ], item ["collection" ])
55+ ):
56+ raise ConflictError (
57+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
58+ )
59+
60+ # todo: check if collection exists, but cache
61+ if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
62+ raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
63+
64+ item = BulkTransactionsClient ().preprocess_item (item , base_url )
65+
66+ es_resp = self .client .index (
67+ index = ITEMS_INDEX ,
68+ id = mk_item_id (item ["id" ], item ["collection" ]),
69+ document = item ,
6070 )
6171
62- data = ItemSerializer .stac_to_db (item , base_url )
72+ if (meta := es_resp .get ("meta" )) and meta .get ("status" ) == 409 :
73+ raise ConflictError (
74+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
75+ )
6376
64- self .client .index (
65- index = ITEMS_INDEX , doc_type = "_doc" , id = item ["id" ], document = data
66- )
67- return ItemSerializer .db_to_stac (item , base_url )
77+ return item
6878
6979 @overrides
7080 def update_item (self , item : stac_types .Item , ** kwargs ) -> stac_types .Item :
@@ -75,14 +85,11 @@ def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
7585
7686 if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
7787 raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
78- if not self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
79- raise NotFoundError (
80- f"Item { item ['id' ]} in collection { item ['collection' ]} doesn't exist"
81- )
88+
89+ # todo: index instead of delete and create
8290 self .delete_item (item ["id" ], item ["collection" ])
8391 self .create_item (item , ** kwargs )
84- # self.client.update(index=ITEMS_INDEX,doc_type='_doc',id=model["id"],
85- # body=model)
92+ # self.client.update(index=ITEMS_INDEX,id=item["id"], body=item)
8693 return ItemSerializer .db_to_stac (item , base_url )
8794
8895 @overrides
@@ -91,10 +98,12 @@ def delete_item(
9198 ) -> stac_types .Item :
9299 """Delete item."""
93100 try :
94- _ = self .client .get (index = ITEMS_INDEX , id = item_id )
101+ self .client .delete (index = ITEMS_INDEX , id = mk_item_id ( item_id , collection_id ) )
95102 except elasticsearch .exceptions .NotFoundError :
96- raise NotFoundError (f"Item { item_id } not found" )
97- self .client .delete (index = ITEMS_INDEX , doc_type = "_doc" , id = item_id )
103+ raise NotFoundError (
104+ f"Item { item_id } in collection { collection_id } not found"
105+ )
106+ return None
98107
99108 @overrides
100109 def create_collection (
@@ -109,9 +118,9 @@ def create_collection(
109118
110119 if self .client .exists (index = COLLECTIONS_INDEX , id = collection ["id" ]):
111120 raise ConflictError (f"Collection { collection ['id' ]} already exists" )
121+
112122 self .client .index (
113123 index = COLLECTIONS_INDEX ,
114- doc_type = "_doc" ,
115124 id = collection ["id" ],
116125 document = collection ,
117126 )
@@ -139,7 +148,8 @@ def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collecti
139148 _ = self .client .get (index = COLLECTIONS_INDEX , id = collection_id )
140149 except elasticsearch .exceptions .NotFoundError :
141150 raise NotFoundError (f"Collection { collection_id } not found" )
142- self .client .delete (index = COLLECTIONS_INDEX , doc_type = "_doc" , id = collection_id )
151+ self .client .delete (index = COLLECTIONS_INDEX , id = collection_id )
152+ return None
143153
144154
145155@attr .s
@@ -153,38 +163,45 @@ def __attrs_post_init__(self):
153163 settings = ElasticsearchSettings ()
154164 self .client = settings .create_client
155165
156- def _preprocess_item (self , model : stac_types .Item , base_url ) -> stac_types .Item :
166+ def preprocess_item (self , item : stac_types .Item , base_url ) -> stac_types .Item :
157167 """Preprocess items to match data model."""
158- if not self .client .exists (index = COLLECTIONS_INDEX , id = model ["collection" ]):
159- raise ForeignKeyError (f"Collection { model ['collection' ]} does not exist" )
168+ if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
169+ raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
160170
161- if self .client .exists (index = ITEMS_INDEX , id = model ["id" ]):
171+ if self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
162172 raise ConflictError (
163- f"Item { model ['id' ]} in collection { model ['collection' ]} already exists"
173+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
164174 )
165175
166- item = ItemSerializer .stac_to_db (model , base_url )
167- return item
176+ return ItemSerializer .stac_to_db (item , base_url )
168177
169178 def bulk_sync (self , processed_items ):
170179 """Elasticsearch bulk insertion."""
171- actions = [{"_index" : ITEMS_INDEX , "_source" : item } for item in processed_items ]
180+ actions = [
181+ {
182+ "_index" : ITEMS_INDEX ,
183+ "_id" : mk_item_id (item ["id" ], item ["collection" ]),
184+ "_source" : item ,
185+ }
186+ for item in processed_items
187+ ]
172188 helpers .bulk (self .client , actions )
173189
174190 @overrides
175191 def bulk_item_insert (
176192 self , items : Items , chunk_size : Optional [int ] = None , ** kwargs
177193 ) -> str :
178194 """Bulk item insertion using es."""
179- try :
180- base_url = str (kwargs ["request" ].base_url )
181- except Exception :
195+ request = kwargs .get ("request" )
196+ if request :
197+ base_url = str (request .base_url )
198+ else :
182199 base_url = ""
200+
183201 processed_items = [
184- self ._preprocess_item (item , base_url ) for item in items .items .values ()
202+ self .preprocess_item (item , base_url ) for item in items .items .values ()
185203 ]
186- return_msg = f"Successfully added { len (processed_items )} items."
187204
188205 self .bulk_sync (processed_items )
189206
190- return return_msg
207+ return f"Successfully added { len ( processed_items ) } Items."
0 commit comments