44import logging
55import datetime
66import socket
7- from threading import Timer
7+ from threading import Timer , Lock
88from enum import Enum
99from elasticsearch import helpers as eshelpers
1010from elasticsearch import Elasticsearch , RequestsHttpConnection
@@ -197,6 +197,7 @@ def __init__(self,
197197
198198 self ._client = None
199199 self ._buffer = []
200+ self ._buffer_lock = Lock ()
200201 self ._timer = None
201202 self ._index_name_func = CMRESHandler ._INDEX_FREQUENCY_FUNCION_DICT [self .index_name_frequency ]
202203 self .serializer = CMRESSerializer ()
@@ -285,13 +286,16 @@ def flush(self):
285286
286287 if self ._buffer :
287288 try :
289+ with self ._buffer_lock :
290+ logs_buffer = self ._buffer
291+ self ._buffer = []
288292 actions = (
289293 {
290294 '_index' : self ._index_name_func .__func__ (self .es_index_name ),
291295 '_type' : self .es_doc_type ,
292296 '_source' : log_record
293297 }
294- for log_record in self . _buffer
298+ for log_record in logs_buffer
295299 )
296300 eshelpers .bulk (
297301 client = self .__get_es_client (),
@@ -301,7 +305,6 @@ def flush(self):
301305 except Exception as exception :
302306 if self .raise_on_indexing_exceptions :
303307 raise exception
304- self ._buffer = []
305308
306309 def close (self ):
307310 """ Flushes the buffer and release any outstanding resource
@@ -327,7 +330,8 @@ def emit(self, record):
327330 if key not in CMRESHandler .__LOGGING_FILTER_FIELDS :
328331 rec [key ] = "" if value is None else value
329332 rec [self .default_timestamp_field_name ] = self .__get_es_datetime_str (record .created )
330- self ._buffer .append (rec )
333+ with self ._buffer_lock :
334+ self ._buffer .append (rec )
331335
332336 if len (self ._buffer ) >= self .buffer_size :
333337 self .flush ()
0 commit comments