@@ -78,7 +78,7 @@ private ElasticLowLevelClient CreateNewElasticLowLevelClient(Uri elasticSearchEn
7878 var singleNode = new SingleNodeConnectionPool ( _optionsMonitor . CurrentValue . ElasticsearchEndpoint ) ;
7979
8080 var cc = new ConnectionConfiguration ( singleNode , new ElasticsearchJsonNetSerializer ( ) )
81- // .ServerCertificateValidationCallback((obj, cert, chain, policyerrors) => true)
81+ . ServerCertificateValidationCallback ( ( obj , cert , chain , policyerrors ) => true )
8282 . EnableHttpPipelining ( )
8383 . EnableHttpCompression ( )
8484 . ThrowExceptions ( ) ;
@@ -88,15 +88,15 @@ private ElasticLowLevelClient CreateNewElasticLowLevelClient(Uri elasticSearchEn
8888
8989 private void Initialize ( )
9090 {
91- //setup a flag in config to chose
91+ //TODO: setup a flag in config to chose
9292 //SetupObserver();
9393 SetupObserverBatchy ( ) ;
9494 }
9595
9696
9797 private void SetupObserver ( )
9898 {
99- _scribeProcessor = a => WriteDirectlyToES ( a ) ;
99+ _scribeProcessor = async ( a ) => await WriteDirectlyToES ( a ) ;
100100
101101 //this._queueToBePosted.GetConsumingEnumerable()
102102 //.ToObservable(Scheduler.Default)
@@ -128,6 +128,9 @@ private async Task WriteDirectlyToES(JObject jo)
128128 }
129129 }
130130
131+ //POST /_bulk? filter_path = items.*.error
132+ private static Dictionary < string , object > filter_path = new Dictionary < string , object > ( ) { { "filter_path" , "items.*.error" } } ;
133+
131134 private async Task WriteDirectlyToESAsBatch ( IEnumerable < JObject > jos )
132135 {
133136 if ( ! jos . Any ( ) )
@@ -141,7 +144,7 @@ private async Task WriteDirectlyToESAsBatch(IEnumerable<JObject> jos)
141144
142145 _ = Client . BulkPutAsync < VoidResponse > ( Index , DocumentType ,
143146 PostData . MultiJson ( bbo . ToArray ( ) ) ,
144- new BulkRequestParameters { Refresh = Refresh . False } )
147+ new BulkRequestParameters { Refresh = Refresh . False , QueryString = filter_path } )
145148 . ContinueWith ( x =>
146149 {
147150 if ( x . IsFaulted )
0 commit comments