Phoenix PubSub

Using Phoenix.PubSub to send messages across processes

You have a message

Coming from a Ruby background, I have always held Sandi Metz and her books in high regard. In the OOP world we tend to solve problems by trying to model our business domain using objects. Sandi keeps reminding us that objects are important but what is even more important is the messages they send to each other.

You don't send messages because you have objects, you have objects because you send messages.

-- Sandi Metz, Practical Object-Oriented Design in Ruby: An Agile Primer

Messages are the cornerstone of communication between Elixir processes too. Processes send messages to other processes' mailboxes. Even though Elixir is functional, there is an interesting parallel between the two paradigmes. In both it is the messages that fundamentally drive the design of our systems.

In its most basic form, we send messages to Elixir processes by providing the PID of the receiving process and the message:

send a_pid, "a message"

It might be as simple as it gets, but it implies that:

Let's assume the points above don't hold true. We want to deliver a message to an unknown number of recipients. Moreover, we don't know who the recipients are going to be. What now?

Meet PubSub

PubSub, short for publish-subscribe, is a messaging pattern in which instead of sending messages directly to receivers we publish them to a topic. Receivers, on the other hand, subscribe to topics they are interested in and get notified whenever a new message has been published.

PubSub has some serious advantages. The producer of the message doesn't need to know how many subscribers there are and how to reach them. On the other hand, subscribers can be easily added and removed without the need for the producer to change.

We can easily implement this style of communication between Elixir processes with Phoenix.PubSub.

Although it has "Phoenix" in its name, Phoenix.PubSub is a stand-alone library and can be used in plain Elixir applications. Phoenix is mentioned due to the fact that the library is an integral part of the framework -- PubSub powers channels, one of Phoenix's killer features.

Enough theory though, let's see PubSub in action.

Fridges are getting smarter

We'll pretend we are writing code for a smart fridge capable of tracking what items are being put in or removed from it. The imaginary fridge will in fact be a simple module producing events into a PubSub topic which will, in turn, be consumed by some subscribers. To start, let's generate a new mix project. The project is going to be an application as we'll need a supervisor to supervise our PubSub process.

$ mix new pub_sub_demo --sup
$ cd pub_sub_demo

Add Phoenix.PubSub to mix.exs:

# mix.exs

defp deps do
  [
    {:phoenix_pubsub, "~> 1.0.2"}
  ]
end

And fetch the dependencies:

$ mix deps.get

There is one last piece of setup that we need to do -- PubSub needs to be put in the application's supervision tree. Open up application.ex and add the following:

# lib/pub_sub_demo/application.ex

children = [
  %{
    id: Phoenix.PubSub.PG2,
    start: {Phoenix.PubSub.PG2, :start_link, [:fridge, []]}
  }
]

Here we add a new child spec to the main application's supervisor telling it to start Phoenix.PubSub.PG2, a PubSub adapter built on top of pg2 (pg2 is an Erlang module implementing distributed named process groups and also an example of how Elixir benefits from the tried and tested Erlang's ecosystem). The adapter is started with two arguments, the first one is the name of the process, which in our case is going to be :fridge. The second argument is a keyword list of options which we are leaving empty here.

With these in place our application now has a working PubSub instance. It is not very useful without any messages though. Let's rectify this.

Publisher and subscriber

Create the following module in lib/pub_sub_demo/fridge.ex:

# lib/pub_sub_demo/fridge.ex

defmodule PubSubDemo.Fridge do
  @pubsub_name :fridge
  @pubsub_topic "fridge_updates"

  def take(product, quantity) when is_binary(product) and is_integer(quantity) do
    Phoenix.PubSub.broadcast(@pubsub_name, @pubsub_topic, {:take, product, quantity})
  end

  def return(product, quantity) when is_binary(product) and is_integer(quantity) do
    Phoenix.PubSub.broadcast(@pubsub_name, @pubsub_topic, {:return, product, quantity})
  end
end

Here we create a module which simulates taking things out of the fridge and putting them back in. It implements two functions which broadcast messages to the PubSub process we started in application.ex above. All messages are published to fridge_updates topic.

We are able to publish messages to a PubSub topic now but don't have a subscriber listening for these messages yet. We'll create a shopping list process which will listen for messages and update its own state whenever an item has been removed from or returned to the fridge.

# lib/pub_sub_demo/shopping_list.ex

defmodule PubSubDemo.ShoppingList do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def get() do
    GenServer.call(__MODULE__, :get)
  end

  def init(_) do
    Phoenix.PubSub.subscribe(:fridge, "fridge_updates")
    {:ok, %{}}
  end 

  def handle_call(:get, _, state) do
    {:reply, state, state}
  end

  def handle_info({:take, product, quantity}, state) do
    IO.puts("Adding #{product} (#{quantity}) to shopping list")

    updated_state = state
      |> Map.update(product, quantity, &(&1 + quantity))

    {:noreply, updated_state}
  end
  
  def handle_info({:return, product, quantity}, state) do
    IO.puts("Removing #{product} (#{quantity}) from shopping list")

    updated_state = state
      |> Map.update(product, 0, &(&1 - quantity))
      |> Enum.reject(fn({_, v}) -> v <= 0 end)
      |> Map.new

    {:noreply, updated_state}
  end
end

Here, the ShoppingList is a GenServer process which subscribes to the fridge_updates topic in its init method. PubSub will send messages to the process any time a new message is published. We handle these messages in two clauses of the handle_info function -- one is responsible for adding to the shopping list when a product is removed from the fridge, the other for removing items from the shopping list when something is put back. These clauses will update the shopping list and print corresponding messages to the console.

Now that we have a product and a subscriber we can test them in IEx.

$ iex -S mix

Let's start the shopping list process first and verify it starts empty:

iex(1)> PubSubDemo.ShoppingList.start_link()
iex(2)> PubSubDemo.ShoppingList.get()
%{}

We'll now take a couple of products out of the fridge:

iex(3)> PubSubDemo.Fridge.take("eggs", 4)
Adding eggs (4) to shopping list
:ok
iex(4)> PubSubDemo.Fridge.take("tomatoes", 2)
Adding tomatoes (2) to shopping list
:ok
iex(5)> PubSubDemo.ShoppingList.get()
%{"eggs" => 4, "tomatoes" => 2}
iex(6)> PubSubDemo.Fridge.return("eggs", 1) # Oops, too many eggs
Removing eggs (1) from shopping list
:ok
iex(7)> PubSubDemo.ShoppingList.get()
%{"eggs" => 3, "tomatoes" => 2}

And we can see PubSub works as expected. The fridge does not even know the shopping list exists, whereas the list picks up all messages published by the fridge and updates its own state accordingly.

We can easily imagine other processes subscribing to the updates that the fridge produces such as a calorie counter or a process counting the number of times the fridge has been opened. The point is, any of these services can access the fridge data by subscribing to a PubSub topic. Most importantly, the fridge code doesn't need to change at all!

Going distributed

Whilst the example above gives us an idea of how Phoenix.PubSub works, it doesn't touch on one of its greatest features. When discussing Phoenix.PubSub.PG2 we did mentioned that pg2 is an Erlang module for distributed processes. Therefore, we can slightly modify the previous example to see that PubSub can even work across separate nodes!

We are going to start the shopping list as we did before. Notice, however, that this time we are going to use an additional --sname flag when starting IEx. The flag assigns a name to the node so that it can then be referenced from elsewhere.

# Terminal 1

$ iex --sname node1@localhost -S mix
iex(node1@localhost)1> PubSubDemo.ShoppingList.start_link()
iex(node1@localhost)2> PubSubDemo.ShoppingList.get()
%{}

We have got a node with an empty shopping list subscribed to the fridge_updates channel. Let's open another terminal session now and start IEx there too. This will start a separate node, which is where first we'll connect to the node we have created in the other terminal session and then we we'll remove an item from the fridge:

# Terminal 2

$ iex --sname node2@localhost -S mix
iex(node2@locahost)1> Node.connect(:node1@localhost)
iex(node2@locahost)2> PubSubDemo.Fridge.take("eggplant", 1)
# Terminal 1

Adding eggplant (1) to shopping list
iex(node1@localhost)3> PubSubDemo.ShoppingList.get()
%{"eggplant" => 1}

Looking back at the first terminal window we can see that the shopping list process received the message about an eggplant being removed from the fridge. Notice how the message was broadcast across two different nodes. In our examples both nodes are located on the same machine but, with some modifications to the way IEx is started, they might as well be located on two different continents!

Redis adapter

Cross-node communication is easy with Phoenix.PubSub.PG2 but it requires the ability for nodes to talk directly to each other. This is not always possible. Heroku is a popular hosting solution with good support for Elixir but by default it firewalls dynos from each other making direct node-to-node communication impossible. They do offer a service called Private Spaces which removes this limitation but it is expensive.

Fortunately, Phoenix.PubSub can be used with a Redis adapter which uses Redis instead of pg2 and does not require direct node-to-node access. Nodes can be firewalled from each other and all will work fine as long as they have access to the same Redis instance.

We'll demonstrate this by going back to our fridge example and replacing the pg2 adapter with a Redis adapter. First update your mix.exs file as follows:

# mix.exs

defp deps do
  [
    {:phoenix_pubsub, "~> 1.0.2"},
    {:phoenix_pubsub_redis, "~> 2.1.0"},
  ]
  end

Here we add the Redis adapter. It is not a part of the main Phoenix.PubSub library and needs to be pulled in separately. Remember to run mix deps.get.

Next, let's update application.ex:

# mix.exs

children = [
  %{
    id: Phoenix.PubSub.Redis,
    start: {Phoenix.PubSub.Redis, :start_link, [:fridge, [
      pool_size: 1,
      node_name: "name"
    ]]}
  },
]

Here we replace the pg2 adapter with the Redis adapter. We need to pass two additional options: pool_size and node_name. Typically, we would also specify Redis host and port by passing host and port options but here we are going to run Redis locally so we can let it use the default 127.0.0.1:6379.

Now, make sure Redis is started and open redis-cli in a separate terminal window. Issue the monitor command to see messages flowing into Redis.

$ redis-cli
127.0.0.1:6379> monitor
OK

Now, let's repeat the first example again. We'll take a product out of the fridge and verify that the shopping list has been correctly updated.

$ iex -S mix
iex(1)> PubSubDemo.ShoppingList.start_link()
iex(2)> PubSubDemo.Fridge.take("cheesecake", 2)
Adding cheesecake (2) to shopping list
:ok
iex(3)> PubSubDemo.ShoppingList.get()
%{"cheesecake" => 2}

PubSub functionality works just as it used to. Let's see what happened in Redis.

1531591626.755149 [0 127.0.0.1:51531] "SUBSCRIBE" "phx:fridge_updates"
1531591654.936677 [0 127.0.0.1:51532] "PUBLISH" "phx:fridge_updates" "\x83h\aa\x01m\x00\x00\x00\x18\x15\x1a\xad/+\xde9\x11\x86\x8e\xd1!Y\x89Td\x9b\x9bU\xb9\x0f\bm!d\x00\x03nila\x01d\x00\x04nonem\x00\x00\x00\bshoppingh\x03d\x00\x04takem\x00\x00\x00\ncheesecakea\x02"

Redis received two commands. First one was issued by the adapter when the application was booting up. The second one is the fridge publishing a message about cheesecake. It is not very readable because Redis adapter uses :erlang.term_to_binary/1 behind the scenes but you can see the word 'cheesecake' in there.

We can clearly see that the entire PubSub mechanism works as it used to but messages are passed through Redis now. We could run this example on two firewalled nodes and it would still work.

PubSub and Phoenix

PubSub is an integral part of the Phoenix framework. As mentioned before, it powers Phoenix's channels. It is worth adding that if you are planning to use PubSub with Phoenix you don't need manually put it in the supervision tree as we did in our examples. PubSub adapter is configured in the endpoint instead.

Summary

We have discussed how the publish-subscribe pattern differs from direct process-to-process communication and saw how it can be implemented in Elixir applications with Phoenix.PubSub. Additionally, we have built a couple of simple examples using two different adapters that Phoenix.PubSub supports and touched upon how both adapters let us publish messages not only locally but also across Elixir nodes.

Phoenix.PubSub solves a non-trivial problem of distributed communication and provides an elegant solution wrapped up in an easy to use API. Moreover, with its PG2 adapter it is also a great example of how Elixir leverages its Erlang background to put powerful and battle-tested tools at our disposal. As with all things, PubSub is not a tool for every job but it is definitely one worth knowing about.