33#include < unistd.h>
44#include < signal.h>
55#include < iostream>
6+ #include < stdlib.h>
67
78class test_publish ;
89
@@ -30,12 +31,14 @@ class test_connect : public pc::manager_sub
3031 // construct publishers on addition of new symbols
3132 void on_add_symbol ( pc::manager *, pc::price * ) override ;
3233
34+ // when we receive a new slot
35+ void on_slot_publish ( pc::manager * ) override ;
36+
3337 // have we received an on_init() callback yet
3438 bool get_is_init () const ;
3539
3640 void teardown ();
3741
38- private:
3942 test_publish *pub1_; // SYMBOL1 publisher
4043 test_publish *pub2_; // SYMBOL2 publisher
4144};
@@ -44,8 +47,7 @@ class test_connect : public pc::manager_sub
4447class test_publish : public pc ::request_sub,
4548 public pc::request_sub_i<pc::product>,
4649 public pc::request_sub_i<pc::price>,
47- public pc::request_sub_i<pc::price_init>,
48- public pc::request_sub_i<pc::price_sched>
50+ public pc::request_sub_i<pc::price_init>
4951{
5052public:
5153 test_publish ( pc::price *sym, int64_t px, uint64_t sprd );
@@ -57,12 +59,18 @@ class test_publish : public pc::request_sub,
5759 // callback for on-chain aggregate price update
5860 void on_response ( pc::price*, uint64_t ) override ;
5961
60- // callback for when to submit new price on-chain
61- void on_response ( pc::price_sched *, uint64_t ) override ;
62-
6362 // callback for re-initialization of price account (with diff. exponent)
6463 void on_response ( pc::price_init *, uint64_t ) override ;
6564
65+ // user-facing method to update the price this publisher will send
66+ void update_price ( int64_t px_, uint64_t sprd_ );
67+
68+ // the pyth price publisher for this symbol
69+ pc::price *sym_;
70+
71+ // indicates if there is an update pending which should be sent in the next slot
72+ bool has_update_pending;
73+
6674private:
6775 void unsubscribe ();
6876
@@ -71,25 +79,23 @@ class test_publish : public pc::request_sub,
7179 uint64_t sprd_; // confidence interval or bid-ask spread
7280 double expo_; // price exponent
7381 uint64_t sid1_; // subscription id for prices
74- uint64_t sid2_; // subscription id for scheduling
75- uint64_t sid3_; // subscription id for scheduling
82+ uint64_t sid2_; // subscription id for products
7683 uint64_t rcnt_; // price receive count
7784};
7885
7986test_publish::test_publish ( pc::price *sym, int64_t px, uint64_t sprd )
80- : sub_( this ),
87+ : sym_( sym ),
88+ has_update_pending( false ),
89+ sub_( this ),
8190 px_( px ),
8291 sprd_( sprd ),
8392 rcnt_( 0UL )
8493{
8594 // add subscriptions for price updates from block chain
8695 sid1_ = sub_.add ( sym );
8796
88- // add subscription for price scheduling
89- sid2_ = sub_.add ( sym->get_sched () );
90-
9197 // add subscription for product updates
92- sid3_ = sub_.add ( sym->get_product () );
98+ sid2_ = sub_.add ( sym->get_product () );
9399
94100 // get price exponent for this symbol
95101 int64_t expo = sym->get_price_exponent ();
@@ -170,6 +176,34 @@ void test_connect::on_add_symbol( pc::manager *, pc::price *sym )
170176 }
171177}
172178
179+ // Send any pending updates when a new slot is published.
180+ void test_connect::on_slot_publish ( pc::manager * )
181+ {
182+ // Collect all the prices that are pending updates
183+ std::vector<pc::price*> updates;
184+ if ( pub1_ && pub1_->has_update_pending ) {
185+ updates.emplace_back ( pub1_->sym_ );
186+ }
187+ if ( pub2_ && pub2_->has_update_pending ) {
188+ updates.emplace_back ( pub2_->sym_ );
189+ }
190+
191+ // Do nothing if there are no pending updates
192+ if ( updates.empty () ) {
193+ return ;
194+ }
195+
196+ // Send the batch price update
197+ if ( !pc::price::send ( updates.data (), updates.size ()) ) {
198+ PC_LOG_ERR ( " batch send failed" ).end ();
199+ }
200+
201+ // Mark the updates as completed
202+ updates.clear ();
203+ pub1_->has_update_pending = false ;
204+ pub2_->has_update_pending = false ;
205+ }
206+
173207test_publish::~test_publish ()
174208{
175209 unsubscribe ();
@@ -179,7 +213,6 @@ void test_publish::unsubscribe()
179213{
180214 // unsubscribe to callbacks
181215 sub_.del ( sid1_ ); // unsubscribe price updates
182- sub_.del ( sid2_ ); // unsubscribe price schedule updates
183216}
184217
185218void test_publish::on_response ( pc::product *prod, uint64_t )
@@ -264,56 +297,6 @@ void test_publish::on_response( pc::price *sym, uint64_t )
264297 }
265298}
266299
267- void test_publish::on_response ( pc::price_sched *ptr, uint64_t sub_id )
268- {
269- // check if currently in error
270- pc::price *sym = ptr->get_price ();
271- if ( sym->get_is_err () ) {
272- PC_LOG_ERR ( " aggregate price in error" )
273- .add ( " err" , sym->get_err_msg () )
274- .end ();
275- unsubscribe ();
276- return ;
277- }
278-
279- // submit next price to block chain for this symbol
280- if ( sym->update ( px_, sprd_, pc::symbol_status::e_trading ) ) {
281- double price = expo_ * (double )px_;
282- double spread = expo_ * (double )sprd_;
283- PC_LOG_INF ( " submit price to block-chain" )
284- .add ( " symbol" , sym->get_symbol () )
285- .add ( " price_type" , pc::price_type_to_str ( sym->get_price_type () ) )
286- .add ( " price" , price )
287- .add ( " spread" , spread )
288- .add ( " slot" , sym->get_manager ()->get_slot () )
289- .add ( " sub_id" , sub_id )
290- .end ();
291- // increase price
292- px_ += static_cast < int64_t >( sprd_ );
293- } else if ( !sym->has_publisher () ) {
294- PC_LOG_WRN ( " missing publish permission" )
295- .add ( " symbol" , sym->get_symbol () )
296- .add ( " price_type" , pc::price_type_to_str ( sym->get_price_type () ) )
297- .end ();
298- // should work once publisher has been permissioned
299- } else if ( !sym->get_is_ready_publish () ) {
300- PC_LOG_WRN ( " not ready to publish next price - check rpc / pyth_tx connection" )
301- .add ( " symbol" , sym->get_symbol () )
302- .add ( " price_type" , pc::price_type_to_str ( sym->get_price_type () ) )
303- .end ();
304- // likely that pyth_tx not yet connected
305- } else if ( sym->get_is_err () ) {
306- PC_LOG_WRN ( " block-chain error" )
307- .add ( " symbol" , sym->get_symbol () )
308- .add ( " price_type" , pc::price_type_to_str ( sym->get_price_type () ) )
309- .add ( " err_msg" , sym->get_err_msg () )
310- .end ();
311- unsubscribe ();
312- // either bad config or on-chain program problem - cant continue as is
313- // could try calling reset_err() and continue once error is resolved
314- }
315- }
316-
317300void test_publish::on_response ( pc::price_init *ptr, uint64_t )
318301{
319302 pc::price *sym = ptr->get_price ();
@@ -323,6 +306,12 @@ void test_publish::on_response( pc::price_init *ptr, uint64_t )
323306 .end ();
324307}
325308
309+ // Updates the price value stored locally, without sending the update.
310+ void test_publish::update_price ( int64_t px_, uint64_t sprd_ ) {
311+ sym_->update_no_send ( px_, sprd_, pc::symbol_status::e_trading, false );
312+ has_update_pending = true ;
313+ }
314+
326315std::string get_rpc_host ()
327316{
328317 return " localhost" ;
@@ -364,6 +353,11 @@ int usage()
364353 return 1 ;
365354}
366355
356+ int64_t random_value ( )
357+ {
358+ return rand () % 10 + 1 ;
359+ }
360+
367361int main (int argc, char ** argv)
368362{
369363 // unpack options
@@ -429,6 +423,15 @@ int main(int argc, char** argv)
429423 // and requests to submit price
430424 while ( do_run && !mgr.get_is_err () ) {
431425 mgr.poll ( do_wait );
426+
427+ // Submit new price updates
428+ if ( sub.pub1_ != nullptr ) {
429+ sub.pub1_ ->update_price ( random_value () , uint64_t ( random_value () ) );
430+ }
431+ if ( sub.pub2_ != nullptr ) {
432+ sub.pub2_ ->update_price ( random_value () , uint64_t ( random_value () ) );
433+ }
434+
432435 }
433436
434437 // report any errors on exit
0 commit comments