Skip to content
Merged
76 changes: 1 addition & 75 deletions markdown/solace-masterclass/solace-masterclass.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,81 +448,7 @@ This **Order-Confirmed** needs to be subscribed by the **Order Service**. Follow
![order-service-all-order-updates-queue.png](img/retail-domain-usecase/order-service-all-order-updates-queue.png)
> aside positive As we incorporate new features, we will update this queue's subscriptions with additional event topics.

* Once the consumer is created, navigate to the **Runtime** tab and push the updates to the event broker
* Navigate to the **Order-Service** by simply selecting the folder at the location:
**solace-masterclass/retail-domain/order-service**
> aside negative If you need any assistance in this, please feel free to reach out to the Solace instructors nearby.

* Open the file: **com.solace.acme.store.orderservice.service.SolaceEventPublisher.java** and make the below updates to
the file :
* In the method **connectToBroker**, add in the code snippet before the return statement :
```Java
final PersistentMessageReceiver orderUpdatesEventReceiver = messagingService.createPersistentMessageReceiverBuilder().build(Queue.durableExclusiveQueue(configProperties.getOrderUpdatesQueueName()));
orderUpdatesEventReceiver.setReceiveFailureListener(failedReceiveEvent -> System.out.println("### FAILED RECEIVE EVENT " + failedReceiveEvent));
orderUpdatesEventReceiver.start();
orderUpdatesEventReceiver.receiveAsync(buildOrdersUpdatesEventHandler(orderUpdatesEventReceiver));
```

> aside positive This code snippet creates and configures a receiver for subscribing to the persistent messages
> attracted in the **all-order-updates** queue.
> The receiver links an asynchronous callback handler which processes the event

* Introduce the below two methods in the same class :
```Java
private MessageReceiver.MessageHandler buildOrdersUpdatesEventHandler(final PersistentMessageReceiver orderUpdatesEventReceiver) {
return (inboundMessage -> {
try {
final String inboundTopic = inboundMessage.getDestinationName();
log.info("Processing message on incoming topic :{} with payload:{}", inboundTopic, inboundMessage.getPayloadAsString());
boolean eventProcessed = processOrderUpdate(inboundTopic, inboundMessage.getPayloadAsString());
if (eventProcessed) {
orderUpdatesEventReceiver.ack(inboundMessage);
}
} catch (RuntimeException runtimeException) {
log.error("Runtime exception encountered while processing incoming event payload :{} on topic:{}. Error is :", inboundMessage.getPayloadAsString(), inboundMessage.getDestinationName(), runtimeException);
}
});
}
```

> aside positive This code snippet is the handler for the events which are being consumed by the above-configured
> receiver.

```Java
private boolean processOrderUpdate(final String eventTopic, final String eventJson) {
try {
if (eventTopic.contains("order")) {
final Order order = objectMapper.readValue(eventJson, Order.class);
final String incomingOrderId = order.getId();
Order orderObjectFromCache = OrderCache.getInstance().getOrderMap().get(incomingOrderId);
orderObjectFromCache.setState(Order.OrderState.VALIDATED);
OrderCache.getInstance().getOrderMap().put(incomingOrderId, orderObjectFromCache);
} else if (eventTopic.contains("payment")) {
final Payment payment = objectMapper.readValue(eventJson, Payment.class);
final String incomingOrderId = payment.getOrderId();
Order orderObjectFromCache = OrderCache.getInstance().getOrderMap().get(incomingOrderId);
orderObjectFromCache.setState(Order.OrderState.PAYMENT_PROCESSED);
OrderCache.getInstance().getOrderMap().put(incomingOrderId, orderObjectFromCache);
} else if (eventTopic.contains("shipment")) {
final Shipping shipment = objectMapper.readValue(eventJson, Shipping.class);
final String incomingOrderId = shipment.getOrderId();
Order orderObjectFromCache = OrderCache.getInstance().getOrderMap().get(incomingOrderId);
orderObjectFromCache.setState(Order.OrderState.SHIPPED);
OrderCache.getInstance().getOrderMap().put(incomingOrderId, orderObjectFromCache);
}
return true;
} catch (JsonProcessingException jsonProcessingException) {
log.error("Error encountered while processing event:{}, exception:", eventJson, jsonProcessingException);
return false;
}
}
```

> aside positive This code snippet identifies the topic which the event from the queue was published on and depending on
> the **object type (Order, Payment or Shipment)** implements an appropriate business logic

* In the terminal for the **Order Service**, stop the service if running and execute the command:
**mvn clean spring-boot:run**
* Once the consumer is created, navigate to the **Runtime** tab and push the updates to the event broker.
* Publish a few more orders from the **Order-Service** and see it being processed in the console logs.
* Go back to the **Order-Service,** and you will observe that the status of the newly created order is showing as
VALIDATED as the orders are processed by the **Inventory-FraudCheck-Service**
Expand Down
Loading