1111import com .dylibso .chicory .runtime .Memory ;
1212import com .dylibso .chicory .wasm .types .FunctionType ;
1313import com .dylibso .chicory .wasm .types .ValType ;
14- import com .fasterxml .jackson .core .JsonGenerator ;
1514import com .fasterxml .jackson .core .JsonProcessingException ;
1615import com .fasterxml .jackson .databind .JavaType ;
17- import com .fasterxml .jackson .databind .JsonDeserializer ;
18- import com .fasterxml .jackson .databind .JsonMappingException ;
19- import com .fasterxml .jackson .databind .JsonSerializer ;
2016import com .fasterxml .jackson .databind .ObjectMapper ;
21- import com .fasterxml .jackson .databind .SerializerProvider ;
2217import com .fasterxml .jackson .databind .module .SimpleModule ;
2318import com .google .protobuf .Struct ;
2419import dev .openfeature .contrib .providers .flagd .FlagdOptions ;
2520import dev .openfeature .contrib .providers .flagd .resolver .Resolver ;
2621import dev .openfeature .contrib .providers .flagd .resolver .common .FlagdProviderEvent ;
27- import dev .openfeature .contrib .providers .flagd .resolver .process .model . FeatureFlag ;
28- import dev .openfeature .contrib .providers .flagd .resolver .process .storage . StorageQueryResult ;
22+ import dev .openfeature .contrib .providers .flagd .resolver .process .jackson . ImmutableMetadataDeserializer ;
23+ import dev .openfeature .contrib .providers .flagd .resolver .process .jackson . LayeredEvalContextSerializer ;
2924import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueuePayload ;
3025import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .QueueSource ;
3126import dev .openfeature .contrib .providers .flagd .resolver .process .storage .connector .file .FileQueueSource ;
3833import dev .openfeature .sdk .ProviderEvent ;
3934import dev .openfeature .sdk .Structure ;
4035import dev .openfeature .sdk .Value ;
41- import java .io .IOException ;
42- import java .util .List ;
36+ import dev .openfeature .sdk .exceptions .GeneralError ;
4337import java .util .Map ;
4438import java .util .function .Consumer ;
4539import lombok .extern .slf4j .Slf4j ;
@@ -54,9 +48,8 @@ public class InProcessWasmResolver implements Resolver {
5448
5549 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
5650
57- {
58-
59- // Register this module with your ObjectMapper
51+ static {
52+ // Register custom serializers/deserializers with the ObjectMapper
6053 SimpleModule module = new SimpleModule ();
6154 module .addDeserializer (ImmutableMetadata .class , new ImmutableMetadataDeserializer ());
6255 module .addSerializer (LayeredEvaluationContext .class , new LayeredEvalContextSerializer ());
@@ -66,6 +59,7 @@ public class InProcessWasmResolver implements Resolver {
6659 private final Consumer <FlagdProviderEvent > onConnectionEvent ;
6760 private final String scope ;
6861 private final QueueSource connector ;
62+ private Thread stateWatcher ;
6963 private final ExportFunction validationMode ;
7064 private ExportFunction updateStore ;
7165 private ExportFunction alloc ;
@@ -109,14 +103,13 @@ public InProcessWasmResolver(FlagdOptions options, Consumer<FlagdProviderEvent>
109103 public void init () throws Exception {
110104
111105 connector .init ();
112- final Thread stateWatcher = new Thread (() -> {
113- try {
114- var streamPayloads = connector . getStreamQueue ();
115- while ( true ) {
106+ this . stateWatcher = new Thread (() -> {
107+ var streamPayloads = connector . getStreamQueue ();
108+ while (! Thread . currentThread (). isInterrupted ()) {
109+ try {
116110 final QueuePayload payload = streamPayloads .take ();
117111 switch (payload .getType ()) {
118112 case DATA :
119- List <String > changedFlagsKeys ;
120113 var data = payload .getFlagData ().getBytes ();
121114 long dataPtr = alloc .apply (data .length )[0 ];
122115 memory .write ((int ) dataPtr , data );
@@ -146,26 +139,30 @@ public void init() throws Exception {
146139 default :
147140 log .warn (String .format ("Payload with unknown type: %s" , payload .getType ()));
148141 }
142+ } catch (InterruptedException e ) {
143+ log .debug ("Storage state watcher interrupted" , e );
144+ Thread .currentThread ().interrupt ();
145+ break ;
146+ } catch (JsonProcessingException e ) {
147+ log .error ("Error processing flag data, skipping update" , e );
149148 }
150- } catch (InterruptedException e ) {
151- log .warn ("Storage state watcher interrupted" , e );
152- Thread .currentThread ().interrupt ();
153- } catch (JsonMappingException e ) {
154- throw new RuntimeException (e );
155- } catch (JsonProcessingException e ) {
156- throw new RuntimeException (e );
157149 }
158150 });
159- stateWatcher .setDaemon (true );
160- stateWatcher .start ();
151+ this . stateWatcher .setDaemon (true );
152+ this . stateWatcher .start ();
161153 }
162154
163155 /**
164156 * Shutdown in-process resolver.
165157 *
166158 * @throws InterruptedException if stream can't be closed within deadline.
167159 */
168- public void shutdown () throws InterruptedException {}
160+ public void shutdown () throws InterruptedException {
161+ if (stateWatcher != null ) {
162+ stateWatcher .interrupt ();
163+ }
164+ connector .shutdown ();
165+ }
169166
170167 /**
171168 * Resolve a boolean flag.
@@ -253,62 +250,16 @@ private <T> ProviderEvaluation<T> resolve(Class<T> type, String key, EvaluationC
253250 ProviderEvaluation <T > providerEvaluation = OBJECT_MAPPER .readValue (result , javaType );
254251
255252 return providerEvaluation ;
253+ } catch (JsonProcessingException e ) {
254+ throw new GeneralError ("Error deserializing WASM evaluation result" , e );
256255 } catch (Exception e ) {
257- throw new RuntimeException ( e );
256+ throw new GeneralError ( "Error during WASM evaluation" , e );
258257 } finally {
259258 dealloc .apply (flagPtr , flagBytes .length );
260259 dealloc .apply (ctxPtr , ctxBytes .length );
261260 }
262261 }
263262
264- private ImmutableMetadata getFlagMetadata (StorageQueryResult storageQueryResult ) {
265- ImmutableMetadata .ImmutableMetadataBuilder metadataBuilder = ImmutableMetadata .builder ();
266- for (Map .Entry <String , Object > entry :
267- storageQueryResult .getFlagSetMetadata ().entrySet ()) {
268- addEntryToMetadataBuilder (metadataBuilder , entry .getKey (), entry .getValue ());
269- }
270-
271- if (scope != null ) {
272- metadataBuilder .addString ("scope" , scope );
273- }
274-
275- FeatureFlag flag = storageQueryResult .getFeatureFlag ();
276- if (flag != null ) {
277- for (Map .Entry <String , Object > entry : flag .getMetadata ().entrySet ()) {
278- addEntryToMetadataBuilder (metadataBuilder , entry .getKey (), entry .getValue ());
279- }
280- }
281-
282- return metadataBuilder .build ();
283- }
284-
285- private void addEntryToMetadataBuilder (
286- ImmutableMetadata .ImmutableMetadataBuilder metadataBuilder , String key , Object value ) {
287- if (value instanceof Number ) {
288- if (value instanceof Long ) {
289- metadataBuilder .addLong (key , (Long ) value );
290- return ;
291- } else if (value instanceof Double ) {
292- metadataBuilder .addDouble (key , (Double ) value );
293- return ;
294- } else if (value instanceof Integer ) {
295- metadataBuilder .addInteger (key , (Integer ) value );
296- return ;
297- } else if (value instanceof Float ) {
298- metadataBuilder .addFloat (key , (Float ) value );
299- return ;
300- }
301- } else if (value instanceof Boolean ) {
302- metadataBuilder .addBoolean (key , (Boolean ) value );
303- return ;
304- } else if (value instanceof String ) {
305- metadataBuilder .addString (key , (String ) value );
306- return ;
307- }
308- throw new IllegalArgumentException (
309- "The type of the Metadata entry with key " + key + " and value " + value + " is not supported" );
310- }
311-
312263 private Structure parseSyncContext (Struct syncContext ) {
313264 if (syncContext != null ) {
314265 try {
@@ -319,36 +270,4 @@ private Structure parseSyncContext(Struct syncContext) {
319270 }
320271 return new ImmutableStructure ();
321272 }
322-
323- // Implement a custom deserializer for ImmutableMetadata
324- public class ImmutableMetadataDeserializer extends JsonDeserializer <ImmutableMetadata > {
325- @ Override
326- public ImmutableMetadata deserialize (
327- com .fasterxml .jackson .core .JsonParser p , com .fasterxml .jackson .databind .DeserializationContext ctxt )
328- throws IOException {
329- // Deserialize into a Map or DTO, then use the builder
330- Map <String , Object > map = p .readValueAs (Map .class );
331- ImmutableMetadata .ImmutableMetadataBuilder builder = ImmutableMetadata .builder ();
332- for (Map .Entry <String , Object > entry : map .entrySet ()) {
333- builder .addString (entry .getKey (), entry .getValue ().toString ());
334- }
335- return builder .build ();
336- }
337- }
338-
339- public class LayeredEvalContextSerializer extends JsonSerializer <LayeredEvaluationContext > {
340- @ Override
341- public void serialize (LayeredEvaluationContext ctx , JsonGenerator gen , SerializerProvider serializers )
342- throws IOException {
343- gen .writeStartObject ();
344-
345- // Use the keySet and getValue to stream the entries
346- for (String key : ctx .keySet ()) {
347- Object value = ctx .getValue (key );
348- gen .writeObjectField (key , value );
349- }
350-
351- gen .writeEndObject ();
352- }
353- }
354273}
0 commit comments