RabbitMQ

Remote Procedure Calls with RabbitMQ

In this article, we’re going to use RabbitMQ to build a Remote Produce Call (RPC) system. Incoming HTTP calls from clients will be proxied to the broker. The asynchronous response from the message-based communication will finally be mapped to the request and returned to the client.

RabbitMQ is an open-source mesage broker. Producers send messages to exchanges on the broker. The broker maps exchanges to queues by applying filters by queue name or other attributes. Consuming services register themselves and receive messages whenever a new delivery is sent to a queue.

This allows us to decouple our microservice architecture into a set of independent services, not knowing each other. This has several benefits compared to synchronous HTTP requests.

A synchronous call chain can fail because of timeouts, not responding services or other failures.

In contrast, the message-based communication is more robust as messages are sent to the broker where they reside until a consumer service registers. Thereby, the asynchronous delivery still suceeds even if a microservice is currently down or not processing any requests at the moment.

What if we need to call a REST endpoint on a remote server and wait for the result? This pattern is commonly known as Remote Procedure Call or RPC.

in the following I would like to show how we have solved various challenges. We will see

  • how to redirect synchronous HTTP requests to asynchronous message-based communication,
  • how to wait for a response from a RPC call and
  • how to reply to the client.

RPC: Proxy HTTP to Message-based Communication

Typically, we receive HTTP requests from a frontend or REST client (for testing purposes). As usual, a Gateway Microservice represents the interface to the outside world. All other microservices are initially only accessible internally.

However, since the internal communication between the microservices is message-based via the broker, the gateway service must redirect the HTTP-based requests to this technology. Finally, it receives a response to the initial request and sends it back to the client.

Our Remote procedure call (RPC) will work like this:

  • When the gateway starts up, it creates a callback queue named, here “gateway_rpc_response”.
  • For an RPC request, the gateway sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • The request is sent to the queue as defined in the proxy mapping.
  • Some other microservices is waiting for requests on that queue. When a request appears, it processes it and sends a message with the result back, using the queue from the reply_to field.
  • The gateway waits for deliveries on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the client.

Let’s look at this RPC communication in detail. The following code snippets are in Golang sometimes using the amqp package from github.com/streadway/amqp.

Remote Procedure Call with RabbitMQ

Initially, the gateway receives the HTTP request from the client. Then, we verify the validity of the user’s token (actually, this is done by a seperate Access-Control microservice).

Internally, the HTTP routes are mapped to queues by their names. A router middleware handles the proxy functionality. Here, requests to the GET orders endpoints are redirected to the queue “orders.getAll”.

v1.GET("/orders", proxy("orders.getAll"), requireToken)

First, we ensure that the target queue exists on the broker.

queue, err := c.Channel.QueueDeclare("orders.getAll", durable, autoDelete, exclusive, noWait, nil)  

Second, we map all HTTP headers, parameters and the body to amqp headers, properties and payload.

Third, the message with this data is published to the broker with a unique Correlation ID and a ReplyTo parameter.

uuid := CorrelationID(uuid.New().String()) # uuid v4

In order to receive a response, the gateway needs to send a ‘callback’ queue address (ReplyTo) with the request.

err := e.Publish(exchange, routingKey, payload, broker.Publishing{
   ReplyTo:       rpc.queue,
   CorrelationID: string(corrID),
   Header:        header,
})

Additionally, we create an entry with this Correlation ID in a RPC registry and wait in a separate thread (channel) for a response with this ID.

message, err := rpc.awaitDelivery(ctx, corrID)

func (c *controller) awaitDelivery(ctx) {
go c.awaitWithTimeout(ctx, deliveryChan, responseChan, errChan)

select {
case <-ctx.Done():
    return nil, errors.BadGateway(errTimeout)
case response := <-responseChan:
    return response, nil
case err := <-errChan:
    return nil, err
}
}

In our case, all microservices can send responses to the gateway on a predefined queue called “gateway.rpc.response”.

var (
  durable    = true
  autoDelete = false
  noWait     = false
exclusive = false
name = "gateway.rpc.response"
)

queue, err := c.Channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, nil)   

Whenever the gateway receives a message from this queue from the broker, we first verify that its Correlation ID is known (exists in the registry). If we receive an unknown correlation id, we may safely discard the message – it doesn’t belong to our requests. Otherwise, we can match a request with a response and acknowledge its delivery to the broker.

By removing entries from the registry for received replies, we also prevent duplicate processing of multiple delivered messages.

Additionally, the waiting is provided with a timeout so that the client only has to wait a predefined maximum time for a response. In case of a timeout, a BadGateway error is returned. This is similar to standard HTTP communication.

Finally, the response is processed and a reply is sent to the client with an HTTP status code and body in JSON as contained in the response.

response, err := events.RPC(exchange, routingKey, body, header)
if err != nil {
   return errors.InternalServer(err)
}

return ctx.JSON(response.Status(), response.Body())

The benefit of this setup is that the gateway only has to create a single callback queue independent of the number of queues or microservices.