How to use Datastar with Elixir Phoenix
Step 1: Add Datastar
Add the Datastar JS bundle to your root layout:
<script type="module" src="https://cdn.jsdelivr.net/gh/starfederation/datastar@v1.0.0-beta.2/bundles/datastar.js"></script>
Step 2: Allow Server Sent Events
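By default, Phoenix does not know about the Server-Sent Events content type.
We need to register the MIME type in the config.exs file (after changing it, recompile the mime dependency with `mix deps.clean mime --build`):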
# Map the text/event-stream MIME type to the short format name "sse",
# which is the name later passed to `plug :accepts` in the router.
config :mime, :types, %{
"text/event-stream" => ["sse"]
}
Then accept the new format in the router pipeline:
plug :accepts, ["html", "sse"]
Step 3: Create SSE helpers to manage Datastar responses
defmodule DatastarElixir do
  @moduledoc """
  An Elixir SDK for Datastar, providing Server-Sent Events (SSE) utilities for Phoenix applications.
  Conforms to https://data-star.dev/reference/sse_events.

  The `merge_*`, `remove_*`, `execute_script/2` and `toast/2` functions only *build*
  an SSE message: a binary made of an `event:` line, one `data:` line per option or
  payload line, and a terminating blank line. Nothing is written to the connection
  until a message (or a list of messages) is passed to `send_chunk/2` or `flush/2`.
  """

  import Plug.Conn

  alias DatastarElixir.Constants

  # Option keys that may be turned into `data:` rows; anything else is ignored
  # with a warning by `build_message_from_opts/2`.
  @allowed_opts [
    :selector,
    :merge_mode,
    :settle_duration,
    :use_view_transition,
    :fragments,
    :signals,
    :only_if_missing,
    :paths,
    :auto_remove,
    :attributes,
    :script
  ]

  @doc """
  Builds a merge-fragments message for `content`.

  `content` is anything implementing `Phoenix.HTML.Safe` (for example a rendered
  function component). Each line of the rendered HTML becomes its own
  `data: fragments` row so that whitespace between lines is preserved rather than
  being glued together.
  """
  @spec merge_fragments(Phoenix.HTML.Safe.t(), keyword()) :: String.t()
  def merge_fragments(content, opts \\ [use_view_transition: true]) do
    "event: #{Constants.event_type_merge_fragments()}"
    |> build_message_from_opts(opts)
    |> add_data_rows(:fragments, to_html_binary(content))
    |> add_message_break()
  end

  @doc """
  Builds a remove-fragments message with one `data: selector` row per selector.

  `selectors` may be a single selector string or a list of them.
  """
  @spec remove_fragments(String.t() | [String.t()], keyword()) :: String.t()
  def remove_fragments(selectors, opts \\ []) do
    message = build_message_from_opts("event: #{Constants.event_type_remove_fragments()}", opts)

    selectors
    |> List.wrap()
    |> Enum.map(&String.replace(&1, "\n", ""))
    |> Enum.reduce(message, fn selector, acc -> add_data_row(acc, :selector, selector) end)
    |> add_message_break()
  end

  @doc """
  Builds a merge-signals message.

  `signals` is converted with `to_string/1` and flattened onto a single line.
  """
  @spec merge_signals(String.Chars.t(), keyword()) :: String.t()
  def merge_signals(signals, opts \\ []) do
    signals = signals |> to_string() |> String.replace("\n", "")

    "event: #{Constants.event_type_merge_signals()}"
    |> build_message_from_opts(opts)
    |> add_data_row(:signals, signals)
    |> add_message_break()
  end

  @doc """
  Builds a remove-signals message with one `data: paths` row per path.

  `paths` may be a single path string or a list of them.
  """
  @spec remove_signals(String.t() | [String.t()], keyword()) :: String.t()
  def remove_signals(paths, opts \\ []) do
    message = build_message_from_opts("event: #{Constants.event_type_remove_signals()}", opts)

    paths
    |> List.wrap()
    |> Enum.map(&String.replace(&1, "\n", ""))
    |> Enum.reduce(message, fn path, acc -> add_data_row(acc, :paths, path) end)
    |> add_message_break()
  end

  @doc """
  Builds an execute-script message.

  Each line of `script` becomes its own `data: script` row. Joining lines with a
  space instead would let a `//` comment swallow the rest of the script.
  """
  @spec execute_script(String.Chars.t(), keyword()) :: String.t()
  def execute_script(script, opts \\ []) do
    "event: #{Constants.event_type_execute_script()}"
    |> build_message_from_opts(opts)
    |> add_data_rows(:script, to_string(script))
    |> add_message_break()
  end

  @doc """
  Builds a merge-fragments message that, by default, appends `content` to the
  `#toasts` element.

  Identical to `merge_fragments/2` apart from the default options.
  """
  @spec toast(Phoenix.HTML.Safe.t(), keyword()) :: String.t()
  def toast(
        content,
        opts \\ [selector: "#toasts", merge_mode: "append", use_view_transition: true]
      ) do
    merge_fragments(content, opts)
  end

  @doc """
  Appends a single `data: <name> <value>` row to `rows`.

  `key` must be one of the known option atoms; it is translated to the camelCase
  name used on the wire.
  """
  @spec add_data_row(String.t(), atom(), String.Chars.t()) :: String.t()
  def add_data_row(rows, key, value), do: "#{rows}\ndata: #{get_key_name(key)} #{value}"

  # Appends one data row per line of `value`. Blank lines are dropped; a value
  # with no content at all still produces a single (empty) row.
  defp add_data_rows(rows, key, value) do
    case String.split(value, ~r/\r?\n/, trim: true) do
      [] -> add_data_row(rows, key, "")
      lines -> Enum.reduce(lines, rows, &add_data_row(&2, key, &1))
    end
  end

  # Renders any Phoenix.HTML.Safe value to a plain binary.
  defp to_html_binary(content) do
    content |> Phoenix.HTML.Safe.to_iodata() |> IO.iodata_to_binary()
  end

  defp get_key_name(:selector), do: "selector"
  defp get_key_name(:merge_mode), do: "mergeMode"
  defp get_key_name(:settle_duration), do: "settleDuration"
  defp get_key_name(:use_view_transition), do: "useViewTransition"
  defp get_key_name(:fragments), do: "fragments"
  defp get_key_name(:signals), do: "signals"
  defp get_key_name(:only_if_missing), do: "onlyIfMissing"
  defp get_key_name(:paths), do: "paths"
  defp get_key_name(:auto_remove), do: "autoRemove"
  defp get_key_name(:attributes), do: "attributes"
  defp get_key_name(:script), do: "script"

  @doc """
  Appends one data row to `rows` for every allowed `{key, value}` pair in `opts`.

  Values are converted with `to_string/1` and stripped of newlines. Keys that are
  not in the allowed list are skipped and reported through `IO.warn/1`.
  """
  @spec build_message_from_opts(String.t(), keyword()) :: String.t()
  def build_message_from_opts(rows, opts) when is_list(opts) do
    Enum.reduce(opts, rows, fn {key, value}, acc ->
      if key in @allowed_opts do
        value = value |> to_string() |> String.replace("\n", "")
        add_data_row(acc, key, value)
      else
        IO.warn("Ignoring invalid option key: #{inspect(key)}")
        acc
      end
    end)
  end

  @doc """
  Terminates `message` with the blank line that ends an SSE event.
  """
  @spec add_message_break(String.t()) :: String.t()
  def add_message_break(message), do: "#{message}\n\n"

  @doc """
  Returns `conn.assigns` with every `{key, value}` pair of `assigns` put on top,
  so entries in `assigns` win over existing connection assigns.
  """
  @spec build_assigns(Plug.Conn.t(), Enumerable.t()) :: map()
  def build_assigns(conn, assigns), do: Enum.into(assigns, conn.assigns)

  @doc """
  Gzip-compresses a message (or a list of messages, concatenated in order) and
  writes it to `conn` as one chunk.

  On the first call the response is started as a chunked `text/event-stream` with
  `content-encoding: gzip`. Returns `{:ok, conn}` or `{:error, reason, conn}`.

  Headers are only set when the response is started, so do not mix `send_chunk/2`
  and `flush/2` on the same connection.
  """
  @spec send_chunk(String.t() | [String.t()], Plug.Conn.t()) ::
          {:ok, Plug.Conn.t()} | {:error, term(), Plug.Conn.t()}
  def send_chunk(messages, conn) when is_list(messages) do
    send_compressed_chunk(Enum.join(messages, ""), conn)
  end

  def send_chunk(message, conn) do
    send_compressed_chunk(message, conn)
  end

  @doc """
  Writes `message` to `conn` as one uncompressed chunk, starting a chunked
  `text/event-stream` response first if none is in progress.

  Returns `{:ok, conn}` or `{:error, reason, conn}`.
  """
  @spec flush(iodata(), Plug.Conn.t()) ::
          {:ok, Plug.Conn.t()} | {:error, term(), Plug.Conn.t()}
  def flush(message, conn) do
    conn
    |> prepare_uncompressed_sse()
    |> write_chunk(message)
  end

  defp send_compressed_chunk(message, conn) do
    conn
    |> prepare_sse()
    |> write_chunk(:zlib.gzip(message))
  end

  # Writes one chunk and normalises the result. The error tuple carries the
  # connection we tried to write to, because `chunk/2` itself only returns the
  # reason on failure.
  defp write_chunk(conn, data) do
    case chunk(conn, data) do
      {:ok, conn} -> {:ok, conn}
      {:error, reason} -> {:error, reason, conn}
    end
  end

  # Starts a gzip-encoded chunked SSE response unless one is already in progress.
  defp prepare_sse(%Plug.Conn{state: :chunked} = conn), do: conn

  defp prepare_sse(conn) do
    conn
    |> put_resp_header("content-encoding", "gzip")
    |> common_sse_headers()
    |> send_chunked(200)
  end

  # Starts a plain chunked SSE response unless one is already in progress.
  defp prepare_uncompressed_sse(%Plug.Conn{state: :chunked} = conn), do: conn

  defp prepare_uncompressed_sse(conn) do
    conn
    |> common_sse_headers()
    |> send_chunked(200)
  end

  defp common_sse_headers(conn) do
    conn
    |> put_resp_header("content-type", "text/event-stream")
    |> put_resp_header("cache-control", "no-cache")
    |> put_resp_header("connection", "keep-alive")
  end
end
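Save this file as myapp_web/sse.ex. The import below refers to the module as OtestWeb.SSE, so rename `DatastarElixir` to match your own application's namespace.
Then, in the myapp_web.ex file, import it in the controller section so that it's available to all the controllers: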
# Excerpt of the `controller` definition in myapp_web.ex; `...` stands for the
# lines that are already there and stay unchanged.
def controller do
...
import Plug.Conn
# Import the SSE helpers so controllers can call them unqualified.
import OtestWeb.SSE
...
end
Step 4: Use it in controllers
defmodule OtestWeb.TeacherController do
  @moduledoc """
  CRUD actions for teachers, rendered through Datastar fragments.

  `index/2` renders the full HTML page. Every other action answers with a
  merge-fragments response: either the teacher form or the refreshed teacher list.

  `merge_fragments/2,3` and `build_assigns/2` are not defined here; they are
  expected to be imported by `use OtestWeb, :controller`.
  """

  use OtestWeb, :controller

  alias Otest.Accounts
  alias Otest.Accounts.Admin
  alias OtestWeb.TeacherHTML, as: H
  alias Phoenix.Component, as: C

  @teachers_path "/admins/teachers"

  @doc "Renders the teacher list as a regular HTML page."
  def index(conn, _params) do
    teachers = list_indexed_teachers()

    render(conn, :index,
      teachers: teachers,
      data_count: length(teachers),
      main_page: "teachers",
      datastar: true
    )
  end

  @doc "Sends an empty teacher form as a fragment."
  def new(conn, _) do
    changeset = Admin.registration_changeset(%Admin{}, %{})
    send_new_form(conn, changeset)
  end

  @doc """
  Registers a teacher. Only admins with the `"SUPERADMIN"` role may do so.

  On success the refreshed list is sent; on validation failure the form is sent
  back with the changeset errors.
  """
  def create(conn, %{"admin" => params}) do
    if superadmin?(conn) do
      case Accounts.register_teacher(params) do
        {:ok, _teacher} -> send_full_page(conn)
        {:error, changeset} -> send_new_form(conn, changeset)
      end
    else
      text(conn, "Unauthorized Attempt")
    end
  end

  @doc "Sends the teacher form pre-filled for the teacher with the given `id`."
  def edit(conn, %{"id" => id}) do
    teacher = Accounts.get_admin!(id)
    changeset = Admin.registration_changeset(teacher, %{})
    send_edit_form(conn, teacher, changeset)
  end

  @doc """
  Updates a teacher. Only admins with the `"SUPERADMIN"` role may do so.

  On success the refreshed list is sent; on validation failure the form is sent
  back with the changeset errors.
  """
  def update(conn, %{"id" => id, "admin" => params}) do
    if superadmin?(conn) do
      teacher = Accounts.get_admin!(id)

      case Accounts.update_teacher(teacher, params) do
        {:ok, _teacher} -> send_full_page(conn)
        {:error, changeset} -> send_edit_form(conn, teacher, changeset)
      end
    else
      text(conn, "Unauthorized Attempt")
    end
  end

  @doc """
  Deletes a teacher. Only admins with the `"SUPERADMIN"` role may do so.

  Every branch answers with the refreshed list. A controller action has to
  return a connection, so failures set an error flash on the conn instead of
  returning a LiveView-style `{:noreply, conn}` tuple.
  """
  def delete(conn, %{"id" => id}) do
    if superadmin?(conn) do
      teacher = Accounts.get_admin!(id)

      case Accounts.delete_teacher(teacher) do
        {:ok, _teacher} ->
          send_full_page(conn)

        _ ->
          conn
          |> put_flash(:error, "Failed to delete teacher.")
          |> send_full_page()
      end
    else
      conn
      |> put_flash(:error, "Unauthorized attempt")
      |> send_full_page()
    end
  end

  @doc """
  Sends the current teacher list as a fragment, using a view transition.

  The template receives `conn.assigns` plus `:teachers` and `:data_count`.
  """
  def send_full_page(conn) do
    teachers = list_indexed_teachers()
    assigns = build_assigns(conn, %{teachers: teachers, data_count: length(teachers)})

    merge_fragments(conn, H.index(assigns), use_view_transition: true)
  end

  # Loads all teachers and tags each one with its 1-based position under :index.
  defp list_indexed_teachers do
    Accounts.list_teachers()
    |> Enum.with_index(1)
    |> Enum.map(fn {teacher, index} -> Map.put(teacher, :index, index) end)
  end

  # True when the signed-in admin has the SUPERADMIN role.
  defp superadmin?(conn), do: conn.assigns.current_admin.role == "SUPERADMIN"

  # Sends the form configured for creating a teacher.
  defp send_new_form(conn, changeset) do
    send_form(conn, changeset, @teachers_path, "post", "Add Teacher")
  end

  # Sends the form configured for editing `teacher`.
  defp send_edit_form(conn, teacher, changeset) do
    send_form(conn, changeset, "#{@teachers_path}/#{teacher.id}", "put", "Edit Teacher")
  end

  # Renders the teacher form for `changeset` and sends it as a fragment.
  defp send_form(conn, changeset, action, method, title) do
    assigns =
      build_assigns(conn, %{
        form: C.to_form(changeset),
        form_action: action,
        form_method: method,
        form_title: title
      })

    merge_fragments(conn, H.teacher_form(assigns))
  end
end
Chunked Response
The above file (sse.ex) now supports chunked responses, with a slight change in the API. Instead of
merge_fragments(conn, H.index(assigns), use_view_transition: true)
You now need to use
merge_fragments(chunkify(conn), H.index(assigns), use_view_transition: true)
This allows you to send multiple responses, like so:
conn
|> chunkify()
|> merge_fragments(H.teacher_form(assigns))
|> merge_signals(...)
|> execute_script(...)
Creating a long lived SSE connection in Phoenix
defmodule MyAppWeb.SSEController do
  @moduledoc """
  Keeps a Server-Sent Events response open and forwards every message that is
  broadcast on the `"unique_string"` PubSub topic to the client.
  """

  use MyAppWeb, :controller

  @topic "unique_string"

  @doc """
  Subscribes the request process to the topic, starts a chunked
  `text/event-stream` response and then blocks, relaying messages, until a
  write to the client fails.
  """
  def index(conn, _params) do
    # Must be the same PubSub server the broadcasts are sent through,
    # otherwise no message ever reaches this process.
    :ok = Phoenix.PubSub.subscribe(MyApp.PubSub, @topic)

    conn
    |> put_resp_header("cache-control", "no-cache")
    |> put_resp_header("connection", "keep-alive")
    |> put_resp_header("content-type", "text/event-stream; charset=utf-8")
    |> put_resp_header("access-control-allow-origin", "*")
    |> send_chunked(200)
    |> sse_loop()
  end

  # Relays iodata messages from the mailbox to the client as chunks.
  # Returns the conn once a chunk can no longer be written.
  defp sse_loop(conn) do
    receive do
      {:plug_conn, :sent} ->
        sse_loop(conn)

      msg when is_binary(msg) or is_list(msg) ->
        case chunk(conn, msg) do
          {:ok, conn} -> sse_loop(conn)
          # The write failed (presumably the client went away): stop looping
          # so the action returns instead of spinning forever.
          {:error, _reason} -> conn
        end

      _other ->
        # Not something that can be written as a chunk. Consume it so it does
        # not pile up in the mailbox, and keep waiting.
        sse_loop(conn)
    end
  end
end
Then, from anywhere in your application, send a broadcast like so:
Phoenix.PubSub.broadcast(MyApp.PubSub, "unique_string", msg)
- The msg must be a Datastar-compatible SSE message, such as the string built by one of the helper functions from Step 3.
This doc is made using
Smplr