Elegant Architecture

Architecture, Functional Languages, Style

RabbitMQ and F# - Part 3

I now have a functioning RabbitMQ Library! Though, there is a lot more to be done to make it satisfactory.

There are two problems:

  1. Missing the Queue Consumer functionality. This makes it a lot easier to deal with RabbitMQ so I definitely want to get this in.
  2. I’m not happy with using the record type to capture the Read and Publish functions for a queue. After all, how often are you going to be writing to and reading from a specific queue in the same process?

I will start by fixing #2, as that is bothering me the most. The design, at present, has this flow:

  1. Connect to a RabbitMQ server
  2. Open a channel
  3. Request a connection to a queue
  4. Receive a function for writing to the queue and a function for reading from the queue

The first question I have is: how often are you going to be reading from and writing to the same queue in the same code? Probably not very often. Which means that, most of the time, only half of what I am returning is useful. We can simplify this. Rather than build the Read and Write functions for you, they should be built only when you need them.

I will change the design so that rather than opening a queue and getting back two functions. There will be two functions which can write or read to any queue.

My next design approach will be to update the function which creates a channel to now also return two functions: one function will be for using the channel to write to a queue, the function is for using the channel to read from a queue. Now if you want to setup a publishing function to “MyQueue” you use the returned write function and partial application to build the writeToMyQueue function.

I’ve modified my readFromQueue function so that it now takes a channel and a queue name and returns a function which will read one message from the queue:

1
2
3
4
5
6
7
8
9
10
11
let readFromQueue (channel:IModel) queueName =
    declareQueue channel queueName |> ignore

    fun () ->
        let ea = channel.BasicGet(queueName, true)
        if ea <> null then
            let body = ea.Body
            let message = Encoding.UTF8.GetString(body)
            Some message
        else
            None

Then I made this new function:

1
2
let createQueueFuntions channel =
    (readFromQueue channel, publishToQueue channel)

Which will take a channel and return two functions. One for writing to a specific queue and one for reading from a specific queue. You can then use these two functions to connect to queues as you need. Once a developer reaches this point, all he or she needs to think about are: queue names, do I want to read from this queue, and do I want to write to this queue.

For example, in my receiver the code is now:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[<EntryPoint>]
let main argv =
    let connection = openConnection "localhost"
    let myChannel = openChannel connection
    let (readFrom,_) = createQueueFuntions myChannel

    let readFromHelloQueue = readFrom "hello"

    while true do
        let message = readFromHelloQueue()
        match message with
        | Some(s) -> printfn "%s" s
        | _ -> ()

    0 // return an integer exit code

I don’t care about writing to queues at all, so I completely ignore the write function which createQueueFunctions returns.

The Sender code now looks like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[<EntryPoint>]
let main argv =
    let connection = openConnection "localhost"
    let channel = openChannel connection
    let (_,writeTo) = createQueueFuntions channel

    let writeToHelloQueue = writeTo "hello"

    let mutable i = 0
    while true do
        i <- i + 1
        let message = sprintf "%d,test" ((i + 1) % 10)  // send a message with a number from 0 to 9 along with some text
        printfn "Sending: %s" message
        message |> writeToHelloQueue
        System.Threading.Thread.Sleep(1000)

    0 // return an integer exit code

I find that this design is a lot better. Functions for reading and writing are only created when you need to read or write to a queue. The channel object can now safely be ignored after the initial setup, instead everything boils down to: what do I want to do with this queue. When you write with this framework, you now are no longer concerned with what objects you have at hand and what you can do with those objects; you are just concerned with what you want to do.

The complete source as of this point:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
namespace RabbitMQ.FSharp

open RabbitMQ.Client
open RabbitMQ.Client.Events
open System.Text

module Client =
    type Queue = { Name: string; Read: unit -> string option; Publish: string -> unit }

    let openConnection address =
        let factory = new ConnectionFactory(HostName = address)
        factory.CreateConnection()

    // I need to declare the type for connection because F# can't infer types on classes
    let openChannel (connection:IConnection) = connection.CreateModel()

    let declareQueue (channel:IModel) queueName = channel.QueueDeclare( queueName, false, false, false, null )

    let readFromQueue (channel:IModel) queueName =
        declareQueue channel queueName |> ignore

        fun () ->
            let ea = channel.BasicGet(queueName, true)
            if ea <> null then
                let body = ea.Body
                let message = Encoding.UTF8.GetString(body)
                Some message
            else
                None

    let publishToQueue (channel:IModel) queueName (message:string) =
        declareQueue channel queueName |> ignore
        let body = Encoding.UTF8.GetBytes(message)
        channel.BasicPublish("", queueName, null, body)

    let createQueueFuntions channel =
        (readFromQueue channel, publishToQueue channel)

    let connectToQueue connection (channel:IModel) queueName =
        declareQueue channel queueName |> ignore

        {Name = queueName;
        Read = (fun () ->
                        let ea = channel.BasicGet(queueName, true)
                        if ea <> null then
                            let body = ea.Body
                            let message = Encoding.UTF8.GetString(body)
                            Some message
                        else
                            None);
        Publish = (publishToQueue channel queueName)}