@@ -34,6 +34,7 @@ def __init__(self, client: Any, index_operations: IndexOperations):
3434 self .client = client
3535 self .index_operations = index_operations
3636 self .datetime_manager = DatetimeIndexManager (client , index_operations )
37+ self .index_selector = DatetimeBasedIndexSelector (client )
3738
3839 @property
3940 def use_datetime (self ) -> bool :
@@ -74,6 +75,14 @@ async def create_simple_index(self, client: Any, collection_id: str) -> str:
7475 """
7576 return await self .index_operations .create_simple_index (client , collection_id )
7677
78+ async def refresh_cache (self ) -> None :
79+ """Refresh the index selector cache.
80+
81+ This method refreshes the cached index information used for
82+ datetime-based index selection.
83+ """
84+ await self .index_selector .refresh_cache ()
85+
7786 async def get_target_index (
7887 self , collection_id : str , product : Dict [str , Any ]
7988 ) -> str :
@@ -86,9 +95,8 @@ async def get_target_index(
8695 Returns:
8796 str: Target index name for the product.
8897 """
89- index_selector = DatetimeBasedIndexSelector (self .client )
9098 return await self ._get_target_index_internal (
91- index_selector , collection_id , product , check_size = True
99+ collection_id , product , check_size = True
92100 )
93101
94102 async def prepare_bulk_actions (
@@ -109,17 +117,16 @@ async def prepare_bulk_actions(
109117 raise HTTPException (status_code = status .HTTP_400_BAD_REQUEST , detail = msg )
110118
111119 items .sort (key = lambda item : item ["properties" ][self .primary_datetime_name ])
112- index_selector = DatetimeBasedIndexSelector (self .client )
113120
114- await self ._ensure_indexes_exist (index_selector , collection_id , items )
121+ await self ._ensure_indexes_exist (collection_id , items )
115122 await self ._check_and_handle_oversized_index (
116- index_selector , collection_id , items
123+ collection_id , items
117124 )
118125
119126 actions = []
120127 for item in items :
121128 target_index = await self ._get_target_index_internal (
122- index_selector , collection_id , item , check_size = False
129+ collection_id , item , check_size = False
123130 )
124131 actions .append (
125132 {
@@ -133,15 +140,13 @@ async def prepare_bulk_actions(
133140
134141 async def _get_target_index_internal (
135142 self ,
136- index_selector ,
137143 collection_id : str ,
138144 product : Dict [str , Any ],
139145 check_size : bool = True ,
140146 ) -> Optional [str ]:
141147 """Get target index with size checking internally.
142148
143149 Args:
144- index_selector: Index selector instance.
145150 collection_id (str): Collection identifier.
146151 product (Dict[str, Any]): Product data.
147152 check_size (bool): Whetheru to check index size limits.
@@ -159,16 +164,16 @@ async def _get_target_index_internal(
159164 else product_datetimes .start_datetime
160165 )
161166
162- target_index = await index_selector .select_indexes (
167+ target_index = await self . index_selector .select_indexes (
163168 [collection_id ], primary_datetime_value , for_insertion = True
164169 )
165- all_indexes = await index_selector .get_collection_indexes (collection_id )
170+ all_indexes = await self . index_selector .get_collection_indexes (collection_id )
166171
167172 if not all_indexes :
168173 target_index = await self .datetime_manager .handle_new_collection (
169174 collection_id , self .primary_datetime_name , product_datetimes
170175 )
171- await index_selector .refresh_cache ()
176+ await self .refresh_cache ()
172177 return target_index
173178
174179 all_indexes = sorted (
@@ -186,7 +191,7 @@ async def _get_target_index_internal(
186191 product_datetimes ,
187192 all_indexes [0 ][0 ],
188193 )
189- await index_selector .refresh_cache ()
194+ await self .refresh_cache ()
190195 return alias
191196
192197 if target_index != all_indexes [- 1 ][0 ][self .primary_datetime_name ]:
@@ -213,7 +218,7 @@ async def _get_target_index_internal(
213218 product_datetimes ,
214219 aliases_dict ,
215220 )
216- await index_selector .refresh_cache ()
221+ await self .refresh_cache ()
217222 return target_index
218223
219224 for item in all_indexes :
@@ -229,16 +234,15 @@ async def _get_target_index_internal(
229234 return None
230235
231236 async def _ensure_indexes_exist (
232- self , index_selector , collection_id : str , items : List [Dict [str , Any ]]
237+ self , collection_id : str , items : List [Dict [str , Any ]]
233238 ):
234239 """Ensure necessary indexes exist for the items.
235240
236241 Args:
237- index_selector: Index selector instance.
238242 collection_id (str): Collection identifier.
239243 items (List[Dict[str, Any]]): List of items to process.
240244 """
241- all_indexes = await index_selector .get_collection_indexes (collection_id )
245+ all_indexes = await self . index_selector .get_collection_indexes (collection_id )
242246
243247 if not all_indexes :
244248 first_item = items [0 ]
@@ -260,18 +264,17 @@ async def _ensure_indexes_exist(
260264 collection_id ,
261265 ** index_params ,
262266 )
263- await index_selector .refresh_cache ()
267+ await self .refresh_cache ()
264268
265269 async def _check_and_handle_oversized_index (
266- self , index_selector , collection_id : str , items : List [Dict [str , Any ]]
270+ self , collection_id : str , items : List [Dict [str , Any ]]
267271 ) -> None :
268272 """Check if index is oversized and create new index if needed.
269273
270274 Checks if the index where the first item would be inserted is oversized.
271275 If so, creates a new index starting from the next day.
272276
273277 Args:
274- index_selector: Index selector instance.
275278 collection_id (str): Collection identifier.
276279 items (List[Dict[str, Any]]): List of items to process.
277280
@@ -280,10 +283,10 @@ async def _check_and_handle_oversized_index(
280283 """
281284 first_item = items [0 ]
282285 first_item_index = await self ._get_target_index_internal (
283- index_selector , collection_id , first_item , check_size = False
286+ collection_id , first_item , check_size = False
284287 )
285288
286- all_indexes = await index_selector .get_collection_indexes (collection_id )
289+ all_indexes = await self . index_selector .get_collection_indexes (collection_id )
287290 all_indexes = sorted (
288291 all_indexes , key = lambda x : x [0 ][self .primary_datetime_name ]
289292 )
@@ -313,7 +316,7 @@ async def _check_and_handle_oversized_index(
313316 product_datetimes ,
314317 all_indexes [- 1 ][0 ],
315318 )
316- await index_selector .refresh_cache ()
319+ await self .refresh_cache ()
317320
318321
319322class SimpleIndexInserter (BaseIndexInserter ):
0 commit comments