Skip to content

Commit d7b0668

Browse files
committed
bulk sync
1 parent cc96d23 commit d7b0668

File tree

2 files changed

+22
-20
lines changed

2 files changed

+22
-20
lines changed

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import attr
66
import elasticsearch
77
from elasticsearch_dsl import Q, Search
8+
from elasticsearch import helpers
89

910
from stac_fastapi.elasticsearch import serializers
1011
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
@@ -289,3 +290,22 @@ def prep_update_collection(self, collection_id: str):
289290
except elasticsearch.exceptions.NotFoundError:
290291
raise NotFoundError(f"Collection {collection_id} not found")
291292

293+
def delete_collection(self, collection_id: str):
294+
try:
295+
_ = self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
296+
except elasticsearch.exceptions.NotFoundError:
297+
raise NotFoundError(f"Collection {collection_id} not found")
298+
self.client.delete(index=COLLECTIONS_INDEX, id=collection_id)
299+
300+
def bulk_sync(self, processed_items):
301+
"""Database logic for bulk insertion."""
302+
actions = [
303+
{
304+
"_index": ITEMS_INDEX,
305+
"_id": mk_item_id(item["id"], item["collection"]),
306+
"_source": item,
307+
}
308+
for item in processed_items
309+
]
310+
helpers.bulk(self.client, actions)
311+

stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/transactions.py

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
from typing import Optional
66

77
import attr
8-
import elasticsearch
9-
from elasticsearch import helpers
108
from overrides import overrides
119

1210
from stac_fastapi.elasticsearch.config import ElasticsearchSettings
@@ -111,11 +109,7 @@ def update_collection(
111109
@overrides
112110
def delete_collection(self, collection_id: str, **kwargs) -> stac_types.Collection:
113111
"""Delete collection."""
114-
try:
115-
_ = self.client.get(index=COLLECTIONS_INDEX, id=collection_id)
116-
except elasticsearch.exceptions.NotFoundError:
117-
raise NotFoundError(f"Collection {collection_id} not found")
118-
self.client.delete(index=COLLECTIONS_INDEX, id=collection_id)
112+
self.database.delete_collection(collection_id=collection_id)
119113
return None
120114

121115

@@ -136,18 +130,6 @@ def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item:
136130
item = self.database.prep_create_item(item=item, base_url=base_url)
137131
return item
138132

139-
def bulk_sync(self, processed_items):
140-
"""Elasticsearch bulk insertion."""
141-
actions = [
142-
{
143-
"_index": ITEMS_INDEX,
144-
"_id": mk_item_id(item["id"], item["collection"]),
145-
"_source": item,
146-
}
147-
for item in processed_items
148-
]
149-
helpers.bulk(self.client, actions)
150-
151133
@overrides
152134
def bulk_item_insert(
153135
self, items: Items, chunk_size: Optional[int] = None, **kwargs
@@ -163,6 +145,6 @@ def bulk_item_insert(
163145
self.preprocess_item(item, base_url) for item in items.items.values()
164146
]
165147

166-
self.bulk_sync(processed_items)
148+
self.database.bulk_sync(processed_items)
167149

168150
return f"Successfully added {len(processed_items)} Items."

0 commit comments

Comments
 (0)