3333import org .apache .pulsar .common .schema .KeyValueEncodingType ;
3434import org .apache .pulsar .common .schema .SchemaType ;
3535import org .apache .pulsar .functions .api .Record ;
36+ import org .skyscreamer .jsonassert .JSONAssert ;
37+ import org .skyscreamer .jsonassert .JSONCompareMode ;
3638import org .testng .annotations .DataProvider ;
3739import org .testng .annotations .Test ;
3840
@@ -99,8 +101,12 @@ public GenericObject getValue() {
99101 "keyIgnore" , "true" ), null );
100102 Pair <String , String > pair = elasticSearchSink .extractIdAndDocument (genericObjectRecord );
101103 assertEquals (pair .getLeft (), "1" );
102- assertEquals (pair .getRight (), "{\" c\" :\" 1\" ,\" d\" :1,\" e\" :{\" a\" :\" a\" ,\" b\" :true,\" d\" :1.0,"
103- + "\" f\" :1.0,\" i\" :1,\" l\" :10}}" );
104+ JSONAssert .assertEquals (
105+ pair .getRight (),
106+ "{\" c\" :\" 1\" ,\" d\" :1,\" e\" :{\" a\" :\" a\" ,\" b\" :true,\" d\" :1.0,"
107+ + "\" f\" :1.0,\" i\" :1,\" l\" :10}}" ,
108+ JSONCompareMode .STRICT
109+ );
104110 elasticSearchSink .close ();
105111
106112 // two fields PK
@@ -112,9 +118,20 @@ public GenericObject getValue() {
112118 "schemaEnable" , "true" ,
113119 "keyIgnore" , "true" ), null );
114120 Pair <String , String > pair2 = elasticSearchSink2 .extractIdAndDocument (genericObjectRecord );
115- assertEquals (pair2 .getLeft (), "[\" 1\" ,1]" );
116- assertEquals (pair2 .getRight (), "{\" c\" :\" 1\" ,\" d\" :1,\" e\" :{\" a\" :\" a\" ,\" b\" :true,\" d\" :1.0,"
117- + "\" f\" :1.0,\" i\" :1,\" l\" :10}}" );
121+
122+ // NON_EXTENSIBLE is NOT extensible and does NOT have strict ordering so both
123+ // possibilities ["1",1] and [1,"1"] will pass
124+ JSONAssert .assertEquals (
125+ pair2 .getLeft (),
126+ "[\" 1\" ,1]" ,
127+ JSONCompareMode .NON_EXTENSIBLE
128+ );
129+ JSONAssert .assertEquals (
130+ pair2 .getRight (),
131+ "{\" c\" :\" 1\" ,\" d\" :1,\" e\" :{\" a\" :\" a\" ,\" b\" :true,\" d\" :1.0,"
132+ + "\" f\" :1.0,\" i\" :1,\" l\" :10}}" ,
133+ JSONCompareMode .STRICT
134+ );
118135 elasticSearchSink2 .close ();
119136
120137 // default config with null PK => indexed with auto generated _id
@@ -124,8 +141,12 @@ public GenericObject getValue() {
124141 "compatibilityMode" , "ELASTICSEARCH" ), null );
125142 Pair <String , String > pair3 = elasticSearchSink3 .extractIdAndDocument (genericObjectRecord );
126143 assertNull (pair3 .getLeft ());
127- assertEquals (pair3 .getRight (), "{\" c\" :\" 1\" ,\" d\" :1,\" e\" :{\" a\" :\" a\" ,\" b\" :true,\" d\" :1.0,"
128- + "\" f\" :1.0,\" i\" :1,\" l\" :10}}" );
144+ JSONAssert .assertEquals (
145+ pair3 .getRight (),
146+ "{\" c\" :\" 1\" ,\" d\" :1,\" e\" :{\" a\" :\" a\" ,\" b\" :true,\" d\" :1.0,"
147+ + "\" f\" :1.0,\" i\" :1,\" l\" :10}}" ,
148+ JSONCompareMode .STRICT
149+ );
129150 elasticSearchSink3 .close ();
130151
131152 // default config with null PK + null value
0 commit comments