a red and white rocket ship flying through the sky

How to use Datastar with Elixir Phoenix

Step 1: Add Datastar

Add the Datastar JS file to the root layout:

<script type="module" src="https://cdn.jsdelivr.net/gh/starfederation/datastar@v1.0.0-beta.2/bundles/datastar.js"></script>

Step 2: Allow Server Sent Events

By default, Phoenix does not allow server-sent events.
We need to register the MIME type in the config.exs file:

# Map the text/event-stream MIME type to the "sse" format name.
# NOTE(review): the mime dependency usually has to be recompiled after changing
# this setting (e.g. `mix deps.clean mime --build`) — confirm against the MIME docs.
config :mime, :types, %{
  "text/event-stream" => ["sse"]
}

And then add it in our router

# "sse" is the format name mapped to text/event-stream in the :mime config.
plug :accepts, ["html", "sse"]

Step 3: Create SSE helpers to manage Datastar responses

defmodule DatastarElixir do
  @moduledoc """
  An Elixir SDK for Datastar, providing Server-Sent Events (SSE) utilities for Phoenix applications.
  Conforms to https://data-star.dev/reference/sse_events.

  The `merge_*`, `remove_*`, `execute_script/2` and `toast/2` functions only *build* an SSE
  message: a binary made of an `event:` line, zero or more `data:` lines and a terminating
  blank line. Use `send_chunk/2` (gzip-compressed) or `flush/2` (uncompressed) to write
  built messages to a connection.
  """

  import Plug.Conn
  alias DatastarElixir.Constants

  @allowed_opts [
    :selector, :merge_mode, :settle_duration, :use_view_transition,
    :fragments, :signals, :only_if_missing, :paths, :auto_remove,
    :attributes, :script
  ]

  # SSE treats CR, LF and CRLF as line terminators, so all three must be removed
  # from a value that has to stay on a single `data:` line.
  @line_breaks ["\r\n", "\r", "\n"]

  @type send_result :: {:ok, Plug.Conn.t()} | {:error, term(), Plug.Conn.t()}

  @doc """
  Builds a merge-fragments event.

  `content` is converted with `Phoenix.HTML.Safe.to_iodata/1`, flattened to a binary and
  stripped of line breaks so that it fits on one `data: fragments` line. `opts` are emitted
  as `data:` lines before the fragment (see `build_message_from_opts/2`).
  """
  @spec merge_fragments(term(), keyword()) :: String.t()
  def merge_fragments(content, opts \\ [use_view_transition: true]) do
    fragments =
      content
      |> Phoenix.HTML.Safe.to_iodata()
      |> IO.iodata_to_binary()
      |> single_line()

    Constants.event_type_merge_fragments()
    |> event_header()
    |> build_message_from_opts(opts)
    |> add_data_row(:fragments, fragments)
    |> add_message_break()
  end

  @doc """
  Builds a remove-fragments event with one `data: selector` line per selector.

  `selectors` may be a single binary or a list of binaries.
  """
  @spec remove_fragments(String.t() | [String.t()], keyword()) :: String.t()
  def remove_fragments(selectors, opts \\ []) do
    Constants.event_type_remove_fragments()
    |> event_header()
    |> build_message_from_opts(opts)
    |> add_data_rows(:selector, selectors)
    |> add_message_break()
  end

  @doc """
  Builds a merge-signals event.

  `signals` is converted with `to_string/1` and stripped of line breaks.
  """
  @spec merge_signals(term(), keyword()) :: String.t()
  def merge_signals(signals, opts \\ []) do
    signals = signals |> to_string() |> single_line()

    Constants.event_type_merge_signals()
    |> event_header()
    |> build_message_from_opts(opts)
    |> add_data_row(:signals, signals)
    |> add_message_break()
  end

  @doc """
  Builds a remove-signals event with one `data: paths` line per path.

  `paths` may be a single binary or a list of binaries.
  """
  @spec remove_signals(String.t() | [String.t()], keyword()) :: String.t()
  def remove_signals(paths, opts \\ []) do
    Constants.event_type_remove_signals()
    |> event_header()
    |> build_message_from_opts(opts)
    |> add_data_rows(:paths, paths)
    |> add_message_break()
  end

  @doc """
  Builds an execute-script event.

  Line breaks inside `script` are replaced by a single space (not removed) so that
  statements separated only by a newline do not get glued together.
  """
  @spec execute_script(term(), keyword()) :: String.t()
  def execute_script(script, opts \\ []) do
    script = script |> to_string() |> single_line(" ")

    Constants.event_type_execute_script()
    |> event_header()
    |> build_message_from_opts(opts)
    |> add_data_row(:script, script)
    |> add_message_break()
  end

  @doc """
  Builds a merge-fragments event whose default options append `content` to `#toasts`.

  Identical to `merge_fragments/2` apart from the default `opts`.
  """
  @spec toast(term(), keyword()) :: String.t()
  def toast(content, opts \\ [selector: "#toasts", merge_mode: "append", use_view_transition: true]) do
    merge_fragments(content, opts)
  end

  @doc """
  Appends a `data: <name> <value>` line to `rows`.

  `key` must be one of the option atoms known to this module; `value` is interpolated
  as-is, so the caller is responsible for keeping it on one line.
  """
  @spec add_data_row(String.t(), atom(), term()) :: String.t()
  def add_data_row(rows, key, value), do: "#{rows}\ndata: #{get_key_name(key)} #{value}"

  defp get_key_name(:selector), do: "selector"
  defp get_key_name(:merge_mode), do: "mergeMode"
  defp get_key_name(:settle_duration), do: "settleDuration"
  defp get_key_name(:use_view_transition), do: "useViewTransition"
  defp get_key_name(:fragments), do: "fragments"
  defp get_key_name(:signals), do: "signals"
  defp get_key_name(:only_if_missing), do: "onlyIfMissing"
  defp get_key_name(:paths), do: "paths"
  defp get_key_name(:auto_remove), do: "autoRemove"
  defp get_key_name(:attributes), do: "attributes"
  defp get_key_name(:script), do: "script"

  @doc """
  Appends one `data:` line per entry of the keyword list `opts` to `rows`.

  Values are converted with `to_string/1` and stripped of line breaks. Keys that are not
  in the allowed option list are skipped with a warning.
  """
  @spec build_message_from_opts(String.t(), keyword()) :: String.t()
  def build_message_from_opts(rows, opts) when is_list(opts) do
    Enum.reduce(opts, rows, fn {key, value}, acc ->
      if key in @allowed_opts do
        add_data_row(acc, key, value |> to_string() |> single_line())
      else
        IO.warn("Ignoring invalid option key: #{inspect(key)}")
        acc
      end
    end)
  end

  @doc """
  Terminates `message` with the blank line that ends an SSE event.
  """
  @spec add_message_break(String.t()) :: String.t()
  def add_message_break(message), do: "#{message}\n\n"

  @doc """
  Returns `conn.assigns` with every `{key, value}` pair of `assigns` put on top.

  `assigns` may be a map or a keyword list; its entries win over existing keys.
  """
  @spec build_assigns(Plug.Conn.t(), Enumerable.t()) :: map()
  def build_assigns(conn, assigns), do: Enum.into(assigns, conn.assigns)

  @doc """
  Sends one message, or a list of messages joined together, as a single gzip-compressed chunk.

  The chunked response (status 200, SSE headers, `content-encoding: gzip`) is started on
  first use. Returns `{:ok, conn}` or `{:error, reason, conn}`.

  NOTE(review): every chunk is gzipped independently; confirm that the target clients
  decode a response made of several concatenated gzip members.
  """
  @spec send_chunk(String.t() | [String.t()], Plug.Conn.t()) :: send_result()
  def send_chunk(messages, conn) when is_list(messages) do
    messages
    |> Enum.join()
    |> send_compressed_chunk(conn)
  end

  def send_chunk(message, conn), do: send_compressed_chunk(message, conn)

  @doc """
  Sends `message` as an uncompressed chunk, starting the chunked SSE response if needed.

  Returns `{:ok, conn}` or `{:error, reason, conn}`.
  """
  @spec flush(iodata(), Plug.Conn.t()) :: send_result()
  def flush(message, conn) do
    conn = prepare_uncompressed_sse(conn)

    conn
    |> chunk(message)
    |> handle_chunk_result(conn)
  end

  defp send_compressed_chunk(message, conn) do
    conn = prepare_sse(conn)

    conn
    |> chunk(:zlib.gzip(message))
    |> handle_chunk_result(conn)
  end

  # Headers can only be set before the chunked response has been started.
  defp prepare_sse(%Plug.Conn{state: :chunked} = conn), do: conn

  defp prepare_sse(conn) do
    conn
    |> put_resp_header("content-encoding", "gzip")
    |> common_sse_headers()
    |> send_chunked(200)
  end

  defp prepare_uncompressed_sse(%Plug.Conn{state: :chunked} = conn), do: conn

  defp prepare_uncompressed_sse(conn) do
    conn
    |> common_sse_headers()
    |> send_chunked(200)
  end

  defp common_sse_headers(conn) do
    conn
    |> put_resp_header("content-type", "text/event-stream")
    |> put_resp_header("cache-control", "no-cache")
    |> put_resp_header("connection", "keep-alive")
  end

  # `chunk/2` does not hand the conn back on failure, so the conn the chunk was
  # attempted on is passed in explicitly to build the error tuple.
  defp handle_chunk_result({:ok, conn}, _attempted_conn), do: {:ok, conn}
  defp handle_chunk_result({:error, reason}, conn), do: {:error, reason, conn}

  defp event_header(event_type), do: "event: #{event_type}"

  # Adds one data row per value; accepts a single value or a list of values.
  defp add_data_rows(message, key, values) do
    values
    |> List.wrap()
    |> Enum.reduce(message, fn value, acc ->
      add_data_row(acc, key, single_line(value))
    end)
  end

  # Replaces every SSE line terminator in `value` so it stays on one `data:` line.
  defp single_line(value, replacement \\ ""), do: String.replace(value, @line_breaks, replacement)
end

Save this file as myapp_web/sse.ex, renaming the module to match your application (the examples below import it as OtestWeb.SSE).

And then, in the myapp_web.ex file, import it in the controller section so that it's available to all the controllers:

def controller do
    ...

      import Plug.Conn
      import OtestWeb.SSE

     ...
  end

Step 4: Use it in controllers

defmodule OtestWeb.TeacherController do
  @moduledoc """
  Teacher management actions.

  `index/2` renders the full page; every other action answers with a Datastar
  fragment (the teacher form, or the refreshed teacher list via `send_full_page/1`).
  Creating, updating and deleting require the current admin to have the
  `"SUPERADMIN"` role.
  """

  use OtestWeb, :controller
  alias OtestWeb.TeacherHTML, as: H

  alias Otest.Accounts
  alias Otest.Accounts.Admin
  alias Phoenix.Component, as: C

  @doc """
  Renders the teacher list page.
  """
  def index(conn, _params) do
    teachers = list_indexed_teachers()

    render(conn, :index,
      teachers: teachers,
      data_count: length(teachers),
      main_page: "teachers",
      datastar: true
    )
  end

  @doc """
  Sends an empty "Add Teacher" form as a fragment.
  """
  def new(conn, _) do
    changeset = Admin.registration_changeset(%Admin{}, %{})
    send_teacher_form(conn, changeset, new_form_assigns())
  end

  @doc """
  Registers a teacher; on validation failure re-sends the form with its errors.
  """
  def create(conn, %{"admin" => params}) do
    if superadmin?(conn) do
      case Accounts.register_teacher(params) do
        {:ok, _teacher} ->
          send_full_page(conn)

        {:error, changeset} ->
          send_teacher_form(conn, changeset, new_form_assigns())
      end
    else
      text(conn, "Unauthorized Attempt")
    end
  end

  @doc """
  Sends the "Edit Teacher" form for the admin with the given `id` as a fragment.
  """
  def edit(conn, %{"id" => id}) do
    teacher = Accounts.get_admin!(id)
    changeset = Admin.registration_changeset(teacher, %{})
    send_teacher_form(conn, changeset, edit_form_assigns(teacher))
  end

  @doc """
  Updates a teacher; on validation failure re-sends the form with its errors.
  """
  def update(conn, %{"id" => id, "admin" => params}) do
    if superadmin?(conn) do
      teacher = Accounts.get_admin!(id)

      case Accounts.update_teacher(teacher, params) do
        {:ok, _teacher} ->
          send_full_page(conn)

        {:error, changeset} ->
          send_teacher_form(conn, changeset, edit_form_assigns(teacher))
      end
    else
      text(conn, "Unauthorized Attempt")
    end
  end

  @doc """
  Deletes a teacher and re-sends the teacher list.

  Failures are answered with a plain-text response, like the other actions: a
  controller action has to return a conn, so the LiveView-style `{:noreply, conn}`
  tuple cannot be used here.
  """
  def delete(conn, %{"id" => id}) do
    if superadmin?(conn) do
      teacher = Accounts.get_admin!(id)

      case Accounts.delete_teacher(teacher) do
        {:ok, _teacher} ->
          send_full_page(conn)

        _ ->
          text(conn, "Failed to delete teacher.")
      end
    else
      text(conn, "Unauthorized attempt")
    end
  end

  @doc """
  Sends the freshly loaded teacher list as a fragment.
  """
  def send_full_page(conn) do
    teachers = list_indexed_teachers()
    assigns = build_assigns(conn, %{teachers: teachers, data_count: length(teachers)})

    merge_fragments(conn, H.index(assigns), use_view_transition: true)
  end

  # Loads all teachers and tags each one with its 1-based position under `:index`.
  defp list_indexed_teachers do
    Accounts.list_teachers()
    |> Enum.with_index(1)
    |> Enum.map(fn {teacher, index} -> Map.put(teacher, :index, index) end)
  end

  defp superadmin?(conn), do: conn.assigns.current_admin.role == "SUPERADMIN"

  # Renders the teacher form for `changeset` and sends it as a fragment.
  defp send_teacher_form(conn, changeset, form_assigns) do
    assigns = build_assigns(conn, Map.put(form_assigns, :form, C.to_form(changeset)))
    merge_fragments(conn, H.teacher_form(assigns))
  end

  defp new_form_assigns do
    %{
      form_action: "/admins/teachers",
      form_method: "post",
      form_title: "Add Teacher"
    }
  end

  defp edit_form_assigns(teacher) do
    %{
      form_action: "/admins/teachers/#{teacher.id}",
      form_method: "put",
      form_title: "Edit Teacher"
    }
  end
end

Chunked Response

The above file (sse.ex) now supports chunked responses, with a slight change in the API. Instead of

merge_fragments(conn, H.index(assigns), use_view_transition: true)

You now need to use

merge_fragments(chunkify(conn), H.index(assigns), use_view_transition: true)

This allows you to send multiple responses, like so:

conn
    |> chunkify()
    |> merge_fragments(H.teacher_form(assigns))
		|> merge_signals(...)
		|> execute_script(...)

Creating a long lived SSE connection in Phoenix

defmodule MyAppWeb.SSEController do
  @moduledoc """
  Keeps an SSE connection open and forwards PubSub messages to the client.

  The request process subscribes to the `"unique_string"` topic and writes every
  message it receives to the chunked response until the client goes away.
  """

  use MyAppWeb, :controller

  @doc """
  Starts the event stream and blocks in the forwarding loop.

  Returns the conn once writing a chunk fails.
  """
  def index(conn, _params) do
    # Must be the same PubSub server the broadcasts are sent on.
    :ok = Phoenix.PubSub.subscribe(MyApp.PubSub, "unique_string")

    conn
    |> put_resp_header("cache-control", "no-cache")
    |> put_resp_header("connection", "keep-alive")
    |> put_resp_header("content-type", "text/event-stream; charset=utf-8")
    |> put_resp_header("access-control-allow-origin", "*")
    |> send_chunked(200)
    |> sse_loop()
  end

  # Forwards mailbox messages to the client until a chunk can no longer be written.
  defp sse_loop(conn) do
    receive do
      # Presumably the adapter's notification that response data was sent; not an event.
      {:plug_conn, :sent} ->
        sse_loop(conn)

      msg when is_binary(msg) or is_list(msg) ->
        case chunk(conn, msg) do
          {:ok, conn} -> sse_loop(conn)
          # The chunk could not be written (e.g. the client disconnected): stop looping.
          {:error, _reason} -> conn
        end

      # Anything that is not iodata cannot be written as a chunk; drop it instead of crashing.
      _other ->
        sse_loop(conn)
    end
  end
end

Then from anywhere send a broadcast like so

Phoenix.PubSub.broadcast(MyApp.PubSub, "unique_string", msg)
  • The msg should be a datastar compatible message
This doc is made using Smplr