Elegant Architecture

Architecture, Functional Languages, Style

RabbitMQ and F# - Part 2

Notes about RabbitMQ: you should have one connection per application and one channel per thread (http://stackoverflow.com/a/10501593).

I was able to build and run my sender and receiver, both using my client.

However, something odd happened. The receiver was only printing out every other message that the sender sent.

Here’s the secret of what’s happening:

OH SNAP! There are two consumers on the queue and RabbitMQ is splitting the messages evenly between the two consumers.
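To make the "every other message" symptom concrete, here is a minimal sketch that simulates round-robin dispatch in plain F#. It does not talk to RabbitMQ at all; `dispatchRoundRobin` and the consumer names are hypothetical, made up for this illustration, and it assumes at least one consumer is passed in.

```fsharp
// Simulation only: no RabbitMQ involved.
// dispatchRoundRobin is a hypothetical helper that hands message i
// to consumer (i mod number of consumers), the way a broker that
// alternates between consumers would.
let dispatchRoundRobin (consumers: string list) (messages: string list) =
    messages
    |> List.mapi (fun i message ->
        List.item (i % List.length consumers) consumers, message)
    |> List.groupBy fst
    |> List.map (fun (consumer, deliveries) -> consumer, List.map snd deliveries)

let deliveries =
    dispatchRoundRobin
        [ "Receiver"; "Sender's hidden consumer" ]
        [ "msg 1"; "msg 2"; "msg 3"; "msg 4" ]

// Print what each consumer ended up with.
for (consumer, received) in deliveries do
    printfn "%s got %A" consumer received
```

With two consumers, the Receiver ends up with msg 1 and msg 3 only, which matches what I was seeing.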

My suspicion is that the Sender is also opening up a consumer. I can verify this easily by starting only the Sender and looking at the RabbitMQ console:

Sure enough, there’s one consumer! So the Sender is also opening up a consumer and reading messages from the queue.

The culprit is almost certainly this bit of code:

let consumer = new QueueingBasicConsumer(channel)
channel.BasicConsume(queueName, true, consumer) |> ignore

{Name = queueName;
Read = (fun () -> readFromQueue consumer queueName);
Publish = (publishToQueue channel queueName)}

I create a consumer and attach it to the queue every time a request is made to open a queue. My assumption had been that a message would only be read from the queue when consumer.Queue.Dequeue() was called. This is a fairly obvious error, in hindsight. Reading the documentation further, I see that the consumer sets up a subscription to the queue and messages are delivered to it automatically, which makes this a push pattern. Because I pass true for the noAck argument of BasicConsume, those messages count as delivered as soon as they are pushed, even if Dequeue() is never called. To do a pull pattern, I would just call BasicGet for the queue.

A basic get example, in C#:

BasicGetResult result = channel.BasicGet(queueName, noAck);

I do want subscriptions eventually, so that this library is useful in my future projects. For now, though, my goal is a simple, functioning library, so I will switch my code over to use the basic get.

let connectToQueue connection (channel:IModel) queueName =
    declareQueue channel queueName |> ignore

    {Name = queueName;
    Read = (fun () ->
                    let ea = channel.BasicGet(queueName, true)
                    if ea <> null then
                        let body = ea.Body
                        let message = Encoding.UTF8.GetString(body)
                        message
                    else
                        "");
    Publish = (publishToQueue channel queueName)}

The Read function now does a BasicGet and decodes the message.

The result:

No more extra consumer!

I really don’t like the part where I return “” if there is nothing in the queue. F# already has a great way of handling that: the option type. So I’ll change the Read function to return a string option, which will change my code to:

Read = (fun () ->
                let ea = channel.BasicGet(queueName, true)
                if ea <> null then
                    let body = ea.Body
                    let message = Encoding.UTF8.GetString(body)
                    Some message
                else
                    None);

This is good, because it will force developers using this function to deal with both the case where a message is on the queue and the case where there is no message on the queue.
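As a rough sketch of what that looks like for a caller, the snippet below pattern matches on the result of a read. To keep it runnable without a broker, it assumes an in-memory System.Collections.Generic.Queue as a stand-in for the RabbitMQ channel; `makeReader` and `describe` are hypothetical helpers that are not part of my library.

```fsharp
open System.Collections.Generic

// Same shape as the new Read field: unit -> string option.
// The in-memory queue stands in for the RabbitMQ channel
// (an assumption for illustration only).
let makeReader (backing: Queue<string>) : unit -> string option =
    fun () ->
        if backing.Count > 0 then Some (backing.Dequeue())
        else None

// Hypothetical helper: the compiler warns if either case is left out.
let describe (read: unit -> string option) =
    match read () with
    | Some message -> sprintf "Received: %s" message
    | None -> "Queue is empty"

let backing = Queue<string>()
backing.Enqueue("hello")
let read = makeReader backing

printfn "%s" (describe read)   // first read finds the message
printfn "%s" (describe read)   // second read finds nothing
```

An empty string can no longer be mistaken for "no message": a caller that ignores the None case gets an incomplete-match warning from the compiler.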

Here’s the current complete code for my simple F# library:

namespace RabbitMQ.FSharp

open RabbitMQ.Client
open RabbitMQ.Client.Events
open System.Text

module Client =
    // Read returns None when the queue is empty, Some message otherwise
    type Queue = { Name: string; Read: unit -> string option; Publish: string -> unit }

    let openConnection address =
        let factory = new ConnectionFactory(HostName = address)
        factory.CreateConnection()

    // connection needs a type annotation: F# cannot resolve the call to
    // CreateModel() on a value whose type is not yet known
    let openChannel (connection:IConnection) = connection.CreateModel()

    let declareQueue (channel:IModel) queueName = channel.QueueDeclare( queueName, false, false, false, null )

    // Left over from the subscription (push) version; no longer used by connectToQueue
    let readFromQueue (consumer:QueueingBasicConsumer) queueName =
        let ea = consumer.Queue.Dequeue()
        let body = ea.Body
        let message = Encoding.UTF8.GetString(body)
        message

    let publishToQueue (channel:IModel) queueName (message:string) =
        let body = Encoding.UTF8.GetBytes(message)
        channel.BasicPublish("", queueName, null, body)

    // connection needs no type annotation here: it is currently unused, so F# leaves it generic
    let connectToQueue connection (channel:IModel) queueName =
        declareQueue channel queueName |> ignore

        {Name = queueName;
        Read = (fun () ->
                        // Pull a single message; BasicGet returns null when the queue is empty
                        let ea = channel.BasicGet(queueName, true)
                        if ea <> null then
                            let body = ea.Body
                            let message = Encoding.UTF8.GetString(body)
                            Some message
                        else
                            None);
        Publish = (publishToQueue channel queueName)}