-
Notifications
You must be signed in to change notification settings - Fork 9
RabbitHub Users Guide
RabbitHub allows a webhook to be created to forward Rabbitmq messages to a subscribing application. RabbitHub also provides api's to publish messages to Rabbitmq exchanges.
This requires the following components to be created:
- Rabbitmq Requirements
- Rabbitmq Exchange
- Rabbitmq Queue
- Rabbitmq Binding
- Creating a RabbitHub Subscriber
- Rest Service to for subscribing application to receive messages
- RabbitHub Subscriber configuration
- Publishing Messages to Rabbitmq
For detailed information on how to work with Rabbitmq please see https://www.rabbitmq.com/getstarted.html and https://www.rabbitmq.com/documentation.html.
In General Rabbitmq is based on the AMQP Protocol and follows this basic pattern:
- Messages are published to an Exchange
- An Exchange has 1 or more bindings to Queues
- Messages are routed to Queues based on the bindings
- Subscribers listen to queues for messages
Queues are worker queues, which means that if a queue has more than 1 subscriber, each message will be delivered to 1 and only 1 subscriber. For a Publish Subscribe scenario, each subscriber must have its own worker queue
RabbitHub acts as the message consumer for worker queues and then forwards the message via HTTP to the subscriber who listens for messages via a rest service.
RabbitHub also allows for the publishing of messages to Rabbitmq via a Rest API
The RabbitHub api supports the creation of an Exchange:
Default Vhost
PUT http://guest:guest@localhost:15670/endpoint/x/xMyExchangeName?amqp.exchange_type=direct
Note: default exchange type is 'fanout'.
Other Vhost
PUT http://guest:guest@localhost:15670/myvhost/endpoint/x/xMyExchangeName?amqp.exchange_type=direct
You can also use the Rabbitmq_management plugin api (http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html)
Default Vhost (%2F)
PUT http://guest:guest@localhost:15672/api/exchanges/%2f/xMyExchangeName
Payload:
"{""type"":""direct"",""durable"":true}"
Other Vhost
PUT http://guest:guest@localhost:15672/api/exchanges/myvhost/xMyExchangeName
Payload:
"{""type"":""direct"",""durable"":true}"
This can also be done via the Rabbitmq Management UI, and command line and other Rabbitmq supplied methods.
The RabbitHub api supports the creation of an Exchange:
Default Vhost
PUT http://guest:guest@localhost:15670/endpoint/q/qMyQueueName
Other Vhost
PUT http://guest:guest@localhost:15670/myvhost/endpoint/x/qMyQueueName
You can also use the Rabbitmq_management plugin api (http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html)
Default Vhost (%2F)
PUT http://guest:guest@localhost:15672/api/queues/%2f/qMyQueueName
Payload:
"{"auto_delete":false,"durable":true,"arguments":[]}"
Other Vhost
PUT http://guest:guest@localhost:15672/api/queues/myvhost/qMyQueueName
Payload:
"{"auto_delete":false,"durable":true,"arguments":[]}"
This can also be done via the Rabbitmq Management UI, and command line and other Rabbitmq supplied methods.
The Rabbitmq_management plugin api (http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html) supplies the ability to create bindings between an Exchange and a Queue or an Exchange and an Exchange.
Explanations of Bindings can be found here https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html and https://www.rabbitmq.com/getstarted.html
Default Vhost (%2F)
PUT http://guest:guest@localhost:15672/api/bindings/%2F/e/xMyExchangeName/q/qMyQueueName
Payload:
"{"routing_key":"my_routing_key","arguments":{}}"
Other Vhost
PUT http://guest:guest@localhost:15672/api/bindings/myvhost/e/xMyExchangeName/q/qMyQueueName
Payload:
"{"routing_key":"my_routing_key","arguments":{}}"
Payload for Headers Exchange Binding:
"{"arguments": { "key1": "A", "key2": "B", "x-match": "any" }}"
This can also be done via the Rabbitmq Management UI, command line and other Rabbitmq supplied methods.
For RabbitHub to create a webhook and push messages from the configured Queue to a Subscriber two steps are required.
To create a RabbitHub subscriber a restful api must be created that implements the following methods:
This will receive a message that the callback URL has been subscribed to as the body of the POST. The following HTTP Headers may be set when the the message is POSTed to this URL:
| HTTP Header | Description |
| x-amqp-routing-key=[mykey1] | The routing key used to bind the queue to the exchange, if fanout, the hub.topic value |
| x-message-id=[rabbithub_msgid-tSCazoU9gWZr4Ie1_8DBHg] | If configured, RabbitHub will generate a unique message id at the time the message is published, this is sent to the subscriber |
| x-correlation-id=[1234321] | If configured, and if the publisher sends this id as a header when posting the message to RabbitHub, this will also be passed to the subscriber. |
| x-rabbithub-msg_header=[] | If a headers exchange is used, the binding arguments will be passed as comma delimited key value pairs in this header |
| x-amqp-redelivered=[true] | If the message has been redelivered, this is set to true |
| x-amqp-redelivered-count=[3] | redeliver count based on rabbitmq x-death properties |
| x-amqp-queue=[q2] | Name of Queue |
| x-amqp-exchange=[x2] | Name of Exchange |
| content-type=[application/json] | content-type of the message published to RabbitHub |
(Note: Some of these are sent based on configuration or exchange type, please see TODO:Need Link for more information.)
This will receive validation requests for creating a subscription that will forward messages to this URL. The method will receive a GET request with the following query parameter hub.challenge=token The api must return the token as the body of the response to validate that it is ok to create a subscription to this URL. This request is only made when a subscription request is sent to RabbitHub, this is not called for each message.
Excerpt from a Spring Java Project as a sample of functionality, this code is NOT production ready!
/*
* POST Rest Resource to receive messages from RabbitHub subscriber resource 's1'
*/
@RequestMapping(value="/rabbithub/s1", method=RequestMethod.POST)
public ResponseEntity<Void> recvRabbbitHubMessage_s1(@RequestBody String msg, @RequestHeader HttpHeaders headers ) throws UnsupportedEncodingException {
System.out.println(dateFormat.format(new Date())+": Post received s1"+"--Received Msg="+msg);
String decodedValue1 = URLDecoder.decode(msg, StandardCharsets.UTF_8.name());
System.out.println(dateFormat.format(new Date())+": Post received for rabbithub/s1"+"--Received Msg decoded="+decodedValue1);
System.out.println(dateFormat.format(new Date())+": headers: "+headers);
System.out.println(dateFormat.format(new Date())+": good msg return 200");
return new ResponseEntity<Void>(HttpStatus.OK);
}
/*
* GET Rest Resource for responding to RabbitHub security check with provided token for s1 for node testing
*/
@RequestMapping(value="/rabbithub/s1", method=RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE )
public ResponseEntity<String> setupSubscription_s1(@RequestParam(value="hub.challenge") String token, @RequestHeader HttpHeaders headers, HttpServletRequest request) {
System.out.println(dateFormat.format(new Date())+": Subscriber Validation Check rabbithub/s1: Request query string="+request.getQueryString()+" Respond with token="+token);
System.out.println(dateFormat.format(new Date())+": headers: "+headers);
ResponseEntity<String> re = new ResponseEntity<String>(token, HttpStatus.OK);
return re;
}
The RabbitHub API provides the following to create subscriptions to a queue:
POST http://guest:guest@localhost:15672/subscribe/q/qMyQueueName
| Parameter | Description |
| hub.mode* | 'subscribe' to create a new subscription, 'unsubscribe' to deactivate an existing subscription |
| hub.topic* | A filter for selecting a subset of messages |
| hub.verify* | The subscription verification mode for this request (the value may be either “sync” or “async”). Refer to the PubSubHubBub specification for additional details. |
| hub.callback* | callback url of the subscribers Rest Service |
| hub.lease_seconds | Subscriber-provided lease duration in seconds. After this time, the subscription will be terminated. The default lease is approximately 30 days, and the maximum lease is approximately 1000 years. Refer to the PubSubHubBub specification for additional information. |
| hub.max_tps | Simple throttling mechanism to limit the Maximum Transactions Per Second that can be sent to a subscriber. See Max TPS section for details |
| hub.ha_mode | Ability to set HA Mode for a consumer for an individual subscription overriding the enviornment variable settings. See High Availability Consumers section for details |
| hub.basic_auth | allows the setting of basic auth credentials for calling the subscriber. The value must be the base64 of user:pass. |
| hub.app_name | allows the setting of an application name for the subscriber. |
| hub.contact_name | allows the setting of contact name for the subscriber. |
| hub.phone | allows the setting of a phone number for the subscriber. |
| hub.email | allows the setting of an email address for the subscriber. |
| hub.description | allows the setting of a description for the subscriber. |
*Required
Content-Type: application/x-www-form-urlencoded
"hub.mode=subscribe&hub.callback=http://10.1.1.8:4567/sub1&<br>hub.topic=foo&hub.verify=sync&hub.lease_seconds=86400"
Content-Type: application/json
Required Fields
{
"hub": {
"callback": "http://10.1.1.8:4567/sub1",
"topic": "foo",
"mode": "subscribe",
"verify": "sync",
}
}
All Options
{
"hub": {
"callback": "http://10.1.1.8:4567/sub1",
"topic": "foo",
"lease_seconds": 1234,
"mode": "subscribe",
"verify": "sync",
"max_tps": 3,
"ha_mode": "all",
"basic_auth": "base64ofuser:pass",
"contact": {
"app_name": "My App Name",
"contact_name": "My Name",
"phone": "111-111-1111",
"email": "me@email.com",
"description": "this is my subscriber app"
}
}
}
A Unique Subscriber is defined by
- vhost
- resource type (queue/exchange)
- resource name
- callback url
- topic
It is also possible to subscribe directly to an Exchange with RabbitHub. In this case RabbitHub generates a queue name and creates the queue using the ``hub.topic`` parameter as the routing key for the binding. This will only work with Fanout exchange (routing key ignored), Topic exchange and Direct exchange. It is not possible to subscribe directly to a header exchange since the binding will not work. When you delete this subscription, it will delete the queue as well.
POST http://guest:guest@localhost:15672/subscribe/x/xMyExchangeName
all other options listed aboce for subscribing to a queue apply here as well.
Publishing a message to a Rabbitmq Exchange is the same api as creating the Exchange, except that you use a POST instead of a PUT.
Default Vhost
POST http://guest:guest@localhost:15670/endpoint/x/xMyExchangeName?hub.topic=blue&hub.persistmsg=true
Note: default exchange type is 'fanout'.
Other Vhost
POST http://guest:guest@localhost:15670/myvhost/endpoint/x/xMyExchangeName?hub.topic=blue&hub.persistmsg=true
Payload: Is the message body
| Query Parameter | Description |
| hub.topic=blue | value is the routing key to be used |
| hub.persistmsg=true | true: persist message, false: do not persist message (default) |
If a message is published with a `content-type` header, this will be passed to the susbscriber.
For example if you publish a message with `content-type=[application/json]` The the subscriber will also get the same header 'content-type=[application/json]'
A Rabbitmq_management plugin has been created for RabbitHub. Please refer to https://github.com/gfiehler/rabbithub_management