Pub/Sub and Real-Time Feed

In the WebSockets chapter we used nova_pubsub to broadcast comments. Now let's dive deeper into Nova's pub/sub system and build a real-time feed for our blog — live notifications when posts are published and comments are added.

How nova_pubsub works

Nova's pub/sub is built on OTP's pg module (process groups). It starts automatically with Nova — no configuration needed. Any Erlang process can join channels, and messages are delivered to all members.

%% Join a channel
nova_pubsub:join(channel_name).

%% Leave a channel
nova_pubsub:leave(channel_name).

%% Broadcast to all members on all nodes
nova_pubsub:broadcast(channel_name, Topic, Payload).

%% Broadcast to members on the local node only
nova_pubsub:local_broadcast(channel_name, Topic, Payload).

%% Get all members of a channel
nova_pubsub:get_members(channel_name).

%% Get members on the local node
nova_pubsub:get_local_members(channel_name).

Channels are atoms. Topics can be lists or binaries. Payloads can be anything.

Message format

When a process receives a pub/sub message, it arrives as:

{nova_pubsub, Channel, SenderPid, Topic, Payload}

In a gen_server, handle this in handle_info/2. In a WebSocket handler, use websocket_info/2.

Building the real-time feed

Notification WebSocket handler

Create src/controllers/blog_feed_handler.erl:

-module(blog_feed_handler).
-behaviour(nova_websocket).

-export([
         init/1,
         websocket_handle/2,
         websocket_info/2
        ]).

init(State) ->
    nova_pubsub:join(posts),
    nova_pubsub:join(comments),
    {ok, State}.

websocket_handle({text, <<"ping">>}, State) ->
    {reply, {text, <<"pong">>}, State};
websocket_handle(_Frame, State) ->
    {ok, State}.

websocket_info({nova_pubsub, Channel, _Sender, Topic, Payload}, State) ->
    Msg = thoas:encode(#{
        channel => Channel,
        event => list_to_binary(Topic),
        data => Payload
    }),
    {reply, {text, Msg}, State};
websocket_info(_Info, State) ->
    {ok, State}.

On connect, the handler joins both the posts and comments channels. Any pub/sub message is encoded as JSON and forwarded to the client.

Broadcasting from controllers

Update the posts controller to broadcast on changes:

create(#{params := Params}) ->
    CS = post:changeset(#{}, Params),
    case blog_repo:insert(CS) of
        {ok, Post} ->
            nova_pubsub:broadcast(posts, "post_created", post_to_json(Post)),
            {json, 201, #{}, post_to_json(Post)};
        {error, #kura_changeset{} = CS1} ->
            {json, 422, #{}, #{errors => changeset_errors_to_json(CS1)}}
    end;

Do the same for updates and deletes:

%% After a successful update:
nova_pubsub:broadcast(posts, "post_updated", post_to_json(Updated)),

%% After a successful delete:
nova_pubsub:broadcast(posts, "post_deleted", #{id => binary_to_integer(Id)}),

And for comments:

%% After creating a comment:
nova_pubsub:broadcast(comments, "comment_created", comment_to_json(Comment)),

Adding the route

{"/feed", blog_feed_handler, #{protocol => ws}}

Client-side JavaScript

const ws = new WebSocket("ws://localhost:8080/feed");

ws.onmessage = (event) => {
    const msg = JSON.parse(event.data);
    console.log(`[${msg.channel}] ${msg.event}:`, msg.data);

    switch (msg.event) {
        case "post_created":
            // Add the new post to the feed
            break;
        case "post_updated":
            // Update the post in the feed
            break;
        case "post_deleted":
            // Remove the post from the feed
            break;
        case "comment_created":
            // Append the new comment
            break;
    }
};

// Keep-alive
setInterval(() => ws.send("ping"), 30000);

Per-post comment feeds

For a live comment section on a specific post, use dynamic channel names:

-module(blog_post_comments_handler).
-behaviour(nova_websocket).

-export([init/1, websocket_handle/2, websocket_info/2]).

init(#{bindings := #{<<"post_id">> := PostId}} = State) ->
    Channel = list_to_atom("post_comments_" ++ binary_to_list(PostId)),
    nova_pubsub:join(Channel),
    {ok, State#{channel => Channel}};
init(State) ->
    {ok, State}.

websocket_handle(_Frame, State) ->
    {ok, State}.

websocket_info({nova_pubsub, _Channel, _Sender, _Topic, Payload}, State) ->
    {reply, {text, thoas:encode(Payload)}, State};
websocket_info(_Info, State) ->
    {ok, State}.

Route:

{"/posts/:post_id/comments/ws", blog_post_comments_handler, #{protocol => ws}}

When creating a comment, broadcast to the post-specific channel:

Channel = list_to_atom("post_comments_" ++ integer_to_list(PostId)),
nova_pubsub:broadcast(Channel, "new_comment", comment_to_json(Comment)).

Using pub/sub in gen_servers

Any Erlang process can join a channel. This is useful for background workers like search indexing:

-module(blog_search_indexer).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_info/2, handle_cast/2, handle_call/3]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    nova_pubsub:join(posts),
    {ok, #{}}.

handle_info({nova_pubsub, posts, _Sender, "post_created", Post}, State) ->
    logger:info("Indexing new post: ~p", [maps:get(title, Post)]),
    %% Add to search index
    {noreply, State};
handle_info({nova_pubsub, posts, _Sender, "post_deleted", #{id := Id}}, State) ->
    logger:info("Removing post ~p from index", [Id]),
    %% Remove from search index
    {noreply, State};
handle_info(_Info, State) ->
    {noreply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_call(_Req, _From, State) ->
    {reply, ok, State}.

Add it to your supervisor to start automatically.

Distributed pub/sub

nova_pubsub works across Erlang nodes. If you have multiple instances connected in a cluster, broadcast/3 delivers to all members on all nodes.

For local-only messaging (e.g., clearing a local cache):

nova_pubsub:local_broadcast(posts, "cache_invalidated", #{id => PostId}).

Organizing channels and topics

%% Different channels for different domains
nova_pubsub:join(posts).
nova_pubsub:join(comments).
nova_pubsub:join(users).
nova_pubsub:join(system).

%% Topics within channels for filtering
nova_pubsub:broadcast(posts, "created", Post).
nova_pubsub:broadcast(posts, "published", Post).
nova_pubsub:broadcast(comments, "created", Comment).
nova_pubsub:broadcast(users, "logged_in", #{username => User}).
nova_pubsub:broadcast(system, "deploy", #{version => <<"1.2.0">>}).

Processes can join multiple channels and pattern match on channel and topic in their handlers.


Next, let's look at transactions, multi, and bulk operations for atomic and efficient data operations.