1010from overrides import overrides
1111
1212from stac_fastapi .elasticsearch .config import ElasticsearchSettings
13- from stac_fastapi .elasticsearch .core import COLLECTIONS_INDEX , ITEMS_INDEX
13+ from stac_fastapi .elasticsearch .core import COLLECTIONS_INDEX , ITEMS_INDEX , mk_item_id
1414from stac_fastapi .elasticsearch .serializers import CollectionSerializer , ItemSerializer
1515from stac_fastapi .elasticsearch .session import Session
1616from stac_fastapi .extensions .third_party .bulk_transactions import (
@@ -42,29 +42,38 @@ def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
4242 if item ["type" ] == "FeatureCollection" :
4343 bulk_client = BulkTransactionsClient ()
4444 processed_items = [
45- bulk_client ._preprocess_item (item , base_url )
46- for item in item ["features" ]
45+ bulk_client .preprocess_item (item , base_url ) for item in item ["features" ]
4746 ]
4847 return_msg = f"Successfully added { len (processed_items )} items."
4948 bulk_client .bulk_sync (processed_items )
5049
5150 return return_msg
52-
53- # If a single item is posted
54- if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
55- raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
56-
57- if self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
58- raise ConflictError (
59- f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
51+ else :
52+ # todo: check if collection exists, but cache
53+ if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
54+ raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
55+
56+ if self .client .exists (
57+ index = ITEMS_INDEX , id = mk_item_id (item ["id" ], item ["collection" ])
58+ ):
59+ raise ConflictError (
60+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
61+ )
62+
63+ item = BulkTransactionsClient ().preprocess_item (item , base_url )
64+
65+ es_resp = self .client .index (
66+ index = ITEMS_INDEX ,
67+ id = mk_item_id (item ["id" ], item ["collection" ]),
68+ document = item ,
6069 )
6170
62- data = ItemSerializer .stac_to_db (item , base_url )
71+ if (meta := es_resp .get ("meta" )) and meta .get ("status" ) == 409 :
72+ raise ConflictError (
73+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
74+ )
6375
64- self .client .index (
65- index = ITEMS_INDEX , doc_type = "_doc" , id = item ["id" ], document = data
66- )
67- return ItemSerializer .db_to_stac (item , base_url )
76+ return item
6877
6978 @overrides
7079 def update_item (self , item : stac_types .Item , ** kwargs ) -> stac_types .Item :
@@ -75,14 +84,11 @@ def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
7584
7685 if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
7786 raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
78- if not self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
79- raise NotFoundError (
80- f"Item { item ['id' ]} in collection { item ['collection' ]} doesn't exist"
81- )
87+
88+ # todo: index instead of delete and create
8289 self .delete_item (item ["id" ], item ["collection" ])
8390 self .create_item (item , ** kwargs )
84- # self.client.update(index=ITEMS_INDEX,doc_type='_doc',id=model["id"],
85- # body=model)
91+ # self.client.update(index=ITEMS_INDEX,id=item["id"], body=item)
8692 return ItemSerializer .db_to_stac (item , base_url )
8793
8894 @overrides
@@ -91,10 +97,12 @@ def delete_item(
9197 ) -> stac_types .Item :
9298 """Delete item."""
9399 try :
94- _ = self .client .get (index = ITEMS_INDEX , id = item_id )
100+ self .client .delete (index = ITEMS_INDEX , id = mk_item_id ( item_id , collection_id ) )
95101 except elasticsearch .exceptions .NotFoundError :
96- raise NotFoundError (f"Item { item_id } not found" )
97- self .client .delete (index = ITEMS_INDEX , doc_type = "_doc" , id = item_id )
102+ raise NotFoundError (
103+ f"Item { item_id } in collection { collection_id } not found"
104+ )
105+ return None
98106
99107 @overrides
100108 def create_collection (
@@ -109,9 +117,9 @@ def create_collection(
109117
110118 if self .client .exists (index = COLLECTIONS_INDEX , id = collection ["id" ]):
111119 raise ConflictError (f"Collection { collection ['id' ]} already exists" )
120+
112121 self .client .index (
113122 index = COLLECTIONS_INDEX ,
114- doc_type = "_doc" ,
115123 id = collection ["id" ],
116124 document = collection ,
117125 )
@@ -139,7 +147,8 @@ def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collecti
139147 _ = self .client .get (index = COLLECTIONS_INDEX , id = collection_id )
140148 except elasticsearch .exceptions .NotFoundError :
141149 raise NotFoundError (f"Collection { collection_id } not found" )
142- self .client .delete (index = COLLECTIONS_INDEX , doc_type = "_doc" , id = collection_id )
150+ self .client .delete (index = COLLECTIONS_INDEX , id = collection_id )
151+ return None
143152
144153
145154@attr .s
@@ -153,38 +162,45 @@ def __attrs_post_init__(self):
153162 settings = ElasticsearchSettings ()
154163 self .client = settings .create_client
155164
156- def _preprocess_item (self , model : stac_types .Item , base_url ) -> stac_types .Item :
165+ def preprocess_item (self , item : stac_types .Item , base_url ) -> stac_types .Item :
157166 """Preprocess items to match data model."""
158- if not self .client .exists (index = COLLECTIONS_INDEX , id = model ["collection" ]):
159- raise ForeignKeyError (f"Collection { model ['collection' ]} does not exist" )
167+ if not self .client .exists (index = COLLECTIONS_INDEX , id = item ["collection" ]):
168+ raise ForeignKeyError (f"Collection { item ['collection' ]} does not exist" )
160169
161- if self .client .exists (index = ITEMS_INDEX , id = model ["id" ]):
170+ if self .client .exists (index = ITEMS_INDEX , id = item ["id" ]):
162171 raise ConflictError (
163- f"Item { model ['id' ]} in collection { model ['collection' ]} already exists"
172+ f"Item { item ['id' ]} in collection { item ['collection' ]} already exists"
164173 )
165174
166- item = ItemSerializer .stac_to_db (model , base_url )
167- return item
175+ return ItemSerializer .stac_to_db (item , base_url )
168176
169177 def bulk_sync (self , processed_items ):
170178 """Elasticsearch bulk insertion."""
171- actions = [{"_index" : ITEMS_INDEX , "_source" : item } for item in processed_items ]
179+ actions = [
180+ {
181+ "_index" : ITEMS_INDEX ,
182+ "_id" : mk_item_id (item ["id" ], item ["collection" ]),
183+ "_source" : item ,
184+ }
185+ for item in processed_items
186+ ]
172187 helpers .bulk (self .client , actions )
173188
174189 @overrides
175190 def bulk_item_insert (
176191 self , items : Items , chunk_size : Optional [int ] = None , ** kwargs
177192 ) -> str :
178193 """Bulk item insertion using es."""
179- try :
180- base_url = str (kwargs ["request" ].base_url )
181- except Exception :
194+ request = kwargs .get ("request" )
195+ if request :
196+ base_url = str (request .base_url )
197+ else :
182198 base_url = ""
199+
183200 processed_items = [
184- self ._preprocess_item (item , base_url ) for item in items .items .values ()
201+ self .preprocess_item (item , base_url ) for item in items .items .values ()
185202 ]
186- return_msg = f"Successfully added { len (processed_items )} items."
187203
188204 self .bulk_sync (processed_items )
189205
190- return return_msg
206+ return f"Successfully added { len ( processed_items ) } Items."
0 commit comments