Creating a simple Message Bus: Episode 1

In my perpetual quest to be a better engineer and to understand tools/architectures better, I decided to start building stuff.

A message bus, a database, a reverse proxy… whatever.
Just build something I’m interested in learning more about.

To keep this from feeling like a huge task, I committed myself to building stuff in the simplest way possible. No fancy shenanigans.

Start small and simple. Add more along the way.

I’ll be working with Go, not because I’m a Go expert, but because I like it and I feel like it helps with my productivity. I’ll probably be learning more about Go along the way. Two birds with one stone kind of thing.

I also want to point out that I write each post right after I’m done with a portion of the code, sort of journaling my way through it.

That means the code could be incomplete or might not work (as I’m writing this, I’m thinking “but that’s what tests are for”, but let’s leave the tests for some other time). It also means I’ll probably be jumping between files a lot.

I wanted to start with a message bus.

Let’s define it and start this series of posts by creating the project structure and maybe a bit more.

A message bus is a messaging system that allows different systems to communicate with each other by sending and receiving messages.

So this message bus is a system (also called the broker) that allows senders (also called producers) to send messages (just data; it could contain anything) to receivers (also called consumers).

In other words,

A producer prepares a message, points it at the broker, and says “here, please deliver this message to this destination”.
The broker gets the message and delivers it to one or more consumers that are subscribed to said destination.
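
To make that flow a bit more concrete, here is a minimal in-memory sketch of the idea. It is not the broker this series builds (that one will talk over the network); the Broker type, the Subscribe and Publish methods, and the callback-based consumers are all hypothetical names I made up for illustration.

```go
package main

import "fmt"

// Message is a simplified stand-in: some data plus a destination.
type Message struct {
	Topic string
	Data  []byte
}

// Broker is a hypothetical in-memory broker: it maps each topic to the
// consumers subscribed to it.
type Broker struct {
	subscribers map[string][]func(Message)
}

func NewBroker() *Broker {
	return &Broker{subscribers: make(map[string][]func(Message))}
}

// Subscribe registers a consumer callback for a topic.
func (b *Broker) Subscribe(topic string, consumer func(Message)) {
	b.subscribers[topic] = append(b.subscribers[topic], consumer)
}

// Publish delivers the message to every consumer of its topic and
// returns how many consumers received it.
func (b *Broker) Publish(msg Message) int {
	consumers := b.subscribers[msg.Topic]
	for _, consume := range consumers {
		consume(msg)
	}
	return len(consumers)
}

func main() {
	broker := NewBroker()
	broker.Subscribe("sales", func(m Message) {
		fmt.Printf("consumer A got %q\n", m.Data)
	})
	broker.Subscribe("sales", func(m Message) {
		fmt.Printf("consumer B got %q\n", m.Data)
	})

	delivered := broker.Publish(Message{Topic: "sales", Data: []byte("hey")})
	fmt.Println("delivered to", delivered, "consumers")
}
```

The producer only needs to know the topic name, never the consumers themselves; a message published to a topic nobody subscribed to is simply delivered to zero consumers.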


Project layout

So we have three actors: a broker, consumer and producer.

Let’s start by creating an empty project structure. I’ll call the go module mbus. Short and nice.

# Create the dir and cd into it
mkdir mbus
cd mbus

# Create the go module
go mod init mbus

# Create the project layout
mkdir cmd internal build
mkdir cmd/{broker,producer,consumer}
mkdir internal/{broker,producer,consumer}

Our base project layout is created.

To make our lives easier, let’s create a very simple Makefile

.PHONY: all
all: clean build

.PHONY: build
build:
	go build -o build/broker cmd/broker/broker.go
	go build -o build/producer cmd/producer/producer.go
	go build -o build/consumer cmd/consumer/consumer.go

.PHONY: clean
clean:
	rm -f build/broker
	rm -f build/consumer
	rm -f build/producer

So running make in the command line will rebuild our project. You could use something like gowatch, but again I’m keeping it simple.

Message structure

Let’s define what “message” is in our application.

It needs to have some data: it could be JSON, it could be a Base64-encoded image… we don’t know and we don’t care.
It needs to have some sort of destination name, so we know where to send it. In the “message bus” world, it’s often called a “topic”, or a “routing key” if you want to sound like a real nerd. I like “routing key”, but let’s use topic since it’s shorter.

The message will be our contract between all parties, so let’s call it apiv1 and put it inside internal, like so

mkdir internal/apiv1
touch internal/apiv1/message.go

// internal/apiv1/message.go

package apiv1

type Message struct {
	Data  []byte
	Len   int
	Topic string
}

func NewMessage(topic string, data []byte) *Message {
	return &Message{
		Data:  data,
		Len:   len(data),
		Topic: topic,
	}
}

Nice and simple.

The Len field is something we might not use. A Go slice already carries its own length (len(msg.Data)), but having it as an explicit field could come in handy once the message travels over the wire. We’ll see; if we don’t need it, we can just remove it later on.
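
Here’s a small sketch of how Len relates to the slice’s own length. Message and NewMessage are copied from above so the example runs on its own; the main function is just for illustration.

```go
package main

import "fmt"

// Message and NewMessage are copied from internal/apiv1/message.go so
// that this example is self-contained.
type Message struct {
	Data  []byte
	Len   int
	Topic string
}

func NewMessage(topic string, data []byte) *Message {
	return &Message{
		Data:  data,
		Len:   len(data),
		Topic: topic,
	}
}

func main() {
	msg := NewMessage("sales", []byte("hey"))

	// At construction time, Len mirrors len(msg.Data).
	fmt.Println(msg.Topic, msg.Len, len(msg.Data)) // sales 3 3

	// Len is a plain int: it does not follow later changes to Data.
	msg.Data = append(msg.Data, '!')
	fmt.Println(msg.Len, len(msg.Data)) // 3 4
}
```

So as long as Data is only set through NewMessage, the two stay in sync; if Data is modified afterwards, Len goes stale.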

Now, let’s create the “producer” part of the app and call it a day.

Producing messages

If you remember from our intro, a producer is very simple: it has a message and a topic, and it just sends them off to a broker.

Knowing that, let’s create a command line app that will accept a host and port pair to point it to the broker, a topic and a message.

// cmd/producer/producer.go
package main

import (
	"flag"
	"log"

	"mbus/internal/producer"
)

var (
	brokerHost string
	brokerPort string
	topic      string
	message    string
)

func main() {
	parseFlags()

	// producer.New is the constructor defined in internal/producer.
	client := producer.New(brokerHost, brokerPort)

	err := client.Publish(topic, message)
	if err != nil {
		log.Fatal(err)
	}
}

func parseFlags() {
	flag.StringVar(&brokerHost, "host", "127.0.0.1", "Broker host")
	flag.StringVar(&brokerPort, "port", "9990", "Broker port")
	flag.StringVar(&topic, "topic", "", "Topic to produce the message for")
	flag.StringVar(&message, "message", "", "The message contents")

	flag.Parse()

	// Both the topic and the message are required.
	if topic == "" {
		log.Fatal("please provide a topic")
	}

	if message == "" {
		log.Fatal("please provide a message to be sent")
	}
}

Here’s what’s happening:

Parsing flags to get the command line arguments.
Creating a client using the mbus/internal/producer package (which we’ll create after this).
Publishing the message to the topic, using the client.
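
As a side note, here’s a sketch of an alternative way to do the flag parsing step, assuming we’d want to test it later. It is not what the producer uses: the options struct and the parseArgs function are hypothetical names, and it relies on flag.NewFlagSet from the standard library instead of the global flag set, returning an error instead of exiting.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// options is a hypothetical struct grouping the producer's flags.
type options struct {
	host, port, topic, message string
}

// parseArgs parses args with its own FlagSet instead of the global one,
// and returns an error instead of exiting the program.
func parseArgs(args []string) (options, error) {
	var o options
	fs := flag.NewFlagSet("producer", flag.ContinueOnError)
	fs.StringVar(&o.host, "host", "127.0.0.1", "Broker host")
	fs.StringVar(&o.port, "port", "9990", "Broker port")
	fs.StringVar(&o.topic, "topic", "", "Topic to produce the message for")
	fs.StringVar(&o.message, "message", "", "The message contents")

	if err := fs.Parse(args); err != nil {
		return o, err
	}
	if o.topic == "" {
		return o, errors.New("please provide a topic")
	}
	if o.message == "" {
		return o, errors.New("please provide a message to be sent")
	}
	return o, nil
}

func main() {
	o, err := parseArgs([]string{"-topic", "sales", "-message", "hey"})
	fmt.Println(o.host, o.port, o.topic, o.message, err) // 127.0.0.1 9990 sales hey <nil>

	_, err = parseArgs([]string{"-message", "hey"})
	fmt.Println(err) // please provide a topic
}
```

The global-variable version above is shorter, which is why I’m sticking with it for now.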

The interesting stuff is in internal/producer/producer.go, which we’ll create in a minute. First, I want to show you what a Producer looks like.

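// internal/producer/producer.go
package producer

// Imports used by all the snippets of this file shown in this post.
import (
	"errors"
	"net"

	"mbus/internal/apiv1"
	"mbus/internal/shared/encoder"
)

type Producer struct {
	host string
	port string

	conn net.Conn

	encoder encoder.Encoder
}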

The first two fields tell us where the broker is on our network.
The third field represents the TCP connection to the broker.
The last one is the encoder. More on this below.

In order for us to send a Message object down the wire, we need to properly encode it to binary. We have a bunch of options in Go, but I’ll go with msgpack. (Official Website)

The encoder.Encoder is an interface so we can swap out the msgpack implementation with another one.

I’m used to a lot of OOP, so the encoder is a field inside the producer (composition), but I realize that’s maybe not the best way to do things all the time.

But it works for now, so let’s leave it be.

# Create a shared folder for all shared things
mkdir -p internal/shared/encoder

The Encoder interface is pretty simple:

// internal/shared/encoder/encoder.go

package encoder

import "mbus/internal/apiv1"

type Encoder interface {
	Encode(*apiv1.Message) ([]byte, error)
}

Let’s create a msgpack encoder, but first let’s install the msgpack package:

go get -u github.com/vmihailenco/msgpack

// internal/shared/encoder/msgpack.go
package encoder

import (
	"mbus/internal/apiv1"

	"github.com/vmihailenco/msgpack"
)

type MsgpackEncoder struct {
}

func (e *MsgpackEncoder) Encode(msg *apiv1.Message) ([]byte, error) {
	data, err := msgpack.Marshal(msg)
	if err != nil {
		return nil, err
	}

	return data, nil
}

Pretty simple stuff.
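
To show what “swapping out the implementation” could look like, here’s a sketch of a second encoder built on the standard library’s encoding/json. JSONEncoder is a hypothetical name and not part of the project; Message and Encoder are copied locally (instead of importing apiv1 and encoder) so the example runs on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message stands in for apiv1.Message so the example is self-contained.
type Message struct {
	Data  []byte
	Len   int
	Topic string
}

// Encoder has the same shape as the interface in internal/shared/encoder.
type Encoder interface {
	Encode(*Message) ([]byte, error)
}

// JSONEncoder is a hypothetical alternative to MsgpackEncoder.
type JSONEncoder struct{}

func (e *JSONEncoder) Encode(msg *Message) ([]byte, error) {
	return json.Marshal(msg)
}

func main() {
	// Code that holds an Encoder does not care which implementation it gets.
	var enc Encoder = &JSONEncoder{}

	data, err := enc.Encode(&Message{Data: []byte("hey"), Len: 3, Topic: "sales"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(data)) // {"Data":"aGV5","Len":3,"Topic":"sales"}
}
```

Note that encoding/json writes []byte fields as Base64 strings, which is why Data shows up as "aGV5" rather than "hey".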

Now let’s get back to our producer by creating a constructor method:

// internal/producer/producer.go

func New(host, port string) *Producer {
	return &Producer{
		host:    host,
		port:    port,
		conn:    nil,
		encoder: &encoder.MsgpackEncoder{},
	}
}

Here, we create a new Producer and use MsgpackEncoder for encoding.

Now, let’s add a method to the Producer so we can start publishing messages:

// internal/producer/producer.go
func (c *Producer) Publish(topic, message string) error {
	// Open the TCP connection to the broker.
	err := c.connect()
	if err != nil {
		return err
	}

	msg := apiv1.NewMessage(topic, []byte(message))

	// Encode the message to bytes before sending it down the wire.
	data, err := c.encoder.Encode(msg)
	if err != nil {
		return err
	}

	n, err := c.conn.Write(data)
	if err != nil {
		return err
	}

	if n != len(data) {
		return errors.New("could not write all data")
	}

	return nil
}

func (c *Producer) connect() error {
	conn, err := net.Dial("tcp", net.JoinHostPort(c.host, c.port))
	if err != nil {
		// Report the failure instead of silently leaving c.conn nil.
		return err
	}

	c.conn = conn
	return nil
}

Again very simple.

We connect to the broker, create a Message object, encode it, and send it to the broker using the connection established.

That’s it. Producer part done. I told you the producer is the easiest one.

Next one will be the broker.

But first, let’s at least manually test (since we don’t have unit tests, lazy me) that our producer is actually sending stuff somewhere.

For that, we can use netcat. Run this command in another terminal:

nc -l -p 9990 -t 127.0.0.1

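This will tell netcat (nc) to listen for TCP connections on port 9990 of the local machine (TCP is netcat’s default). The flags differ between netcat variants; with the OpenBSD version, for example, the equivalent is nc -l 127.0.0.1 9990. Kind of like a temporary test double for our broker 😁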
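
If netcat isn’t available, roughly the same test double can be sketched in Go with the standard library. This is only an illustration, not part of the project: listenOnce and send are hypothetical helpers, and send plays the producer’s role by writing raw bytes instead of a msgpack-encoded message.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// listenOnce is a hypothetical helper: it accepts a single TCP connection
// on addr, reads until the client closes it, and sends the received bytes
// on the returned channel. The returned string is the actual listen address.
func listenOnce(addr string) (string, <-chan []byte, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return "", nil, err
	}

	received := make(chan []byte, 1)
	go func() {
		defer ln.Close()
		conn, err := ln.Accept()
		if err != nil {
			received <- nil
			return
		}
		defer conn.Close()
		data, _ := io.ReadAll(conn)
		received <- data
	}()

	return ln.Addr().String(), received, nil
}

// send plays the producer's role: connect, write, and close.
func send(addr string, data []byte) error {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return err
	}
	defer conn.Close()
	_, err = conn.Write(data)
	return err
}

func main() {
	// Port 0 asks the OS for any free port; the producer defaults to 9990.
	addr, received, err := listenOnce("127.0.0.1:0")
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}

	if err := send(addr, []byte("hey")); err != nil {
		fmt.Println("send failed:", err)
		return
	}

	fmt.Printf("test double received %q\n", <-received) // test double received "hey"
}
```

To use it against the real producer, the listener would be started on 127.0.0.1:9990 and the producer binary run in another terminal.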

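Now, let’s compile our app and run the producer. Since cmd/broker/broker.go and cmd/consumer/consumer.go don’t exist yet, make would stop at the first go build, so for now let’s build just the producer:

go build -o build/producer cmd/producer/producer.go
./build/producer -topic sales -message hey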

You should see something printed on the terminal where you ran nc (the msgpack-encoded message, so expect a few unreadable bytes around the topic and the text).

Done, test coverage 100%.

Jokes aside, we’ll probably add tests in another episode.

But for now, we’ll call it a day.

Like I said at the start of this post, I’m just starting out, so I don’t really know where I’m going with this, and that’s part of the fun. But it also means you could find the code not working or incomplete.

Either way, if you find a mistake or have some feedback, I’d love to hear it.

Until then, see you in another episode!