Transactions, Multi & Bulk Operations

For simple CRUD, the repo functions are enough. But some operations need atomicity (all-or-nothing), multi-step pipelines, or bulk efficiency. Kura provides transactions, multi, and bulk operations for these cases.

Transactions

Wrap multiple operations in a transaction — if any step fails, everything rolls back:

blog_repo:transaction(fun() ->
    CS1 = user:changeset(#{}, #{<<"username">> => <<"alice">>,
                                <<"email">> => <<"alice@example.com">>,
                                <<"password_hash">> => <<"hashed">>}),
    {ok, User} = blog_repo:insert(CS1),

    CS2 = post:changeset(#{}, #{<<"title">> => <<"Welcome">>,
                                <<"body">> => <<"Hello world">>,
                                <<"user_id">> => maps:get(id, User)}),
    {ok, _Post} = blog_repo:insert(CS2),
    ok
end).

If the second insert fails, the user creation is rolled back too. blog_repo:transaction/1 returns {ok, ReturnValue} on success, where ReturnValue is the fun's return value (ok in the example above), or {error, Reason} on failure.
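
Because the result is a tagged tuple, callers typically branch on it. The following is a minimal sketch that assumes only the two return shapes described above; the module name tx_response and the shape of the response bodies are illustrative and not part of Kura or Nova.

```erlang
-module(tx_response).
-export([to_response/1]).

%% Map the result of blog_repo:transaction/1 to a Nova-style response tuple.
%% Only the two documented shapes are handled; any other value raises a
%% function_clause error, which is deliberate in this sketch.
to_response({ok, Value}) ->
    {json, #{result => Value}};
to_response({error, Reason}) ->
    %% Hypothetical error body: Reason is formatted for display only.
    Msg = iolist_to_binary(io_lib:format("~p", [Reason])),
    {status, 422, #{}, #{error => Msg}}.
```

A controller could then end with tx_response:to_response(blog_repo:transaction(Fun)).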

Multi: named transaction pipelines

For complex multi-step operations, kura_multi provides a pipeline where each step has a name and can reference results from previous steps:

M = kura_multi:new(),

%% Step 1: Create a user
M1 = kura_multi:insert(M, create_user,
    user:changeset(#{}, #{<<"username">> => <<"alice">>,
                          <<"email">> => <<"alice@example.com">>,
                          <<"password_hash">> => <<"hashed">>})),

%% Step 2: Create a first draft, using the user ID from step 1
M2 = kura_multi:insert(M1, create_draft,
    fun(#{create_user := User}) ->
        post:changeset(#{}, #{<<"title">> => <<"My First Draft">>,
                              <<"body">> => <<"Coming soon...">>,
                              <<"user_id">> => maps:get(id, User)})
    end),

%% Step 3: Run a custom function
M3 = kura_multi:run(M2, send_welcome,
    fun(#{create_user := User}) ->
        logger:info("Welcome ~s!", [maps:get(username, User)]),
        {ok, sent}
    end),

%% Execute everything atomically
case blog_repo:multi(M3) of
    {ok, #{create_user := User, create_draft := Post, send_welcome := sent}} ->
        logger:info("User ~p created with draft post ~p",
                    [maps:get(id, User), maps:get(id, Post)]);
    {error, FailedStep, FailedValue, _Completed} ->
        logger:error("Multi failed at step ~p: ~p", [FailedStep, FailedValue])
end.

Multi API

Function                          Purpose
kura_multi:new()                  Create a new multi
kura_multi:insert(M, Name, CS)    Insert a record (changeset or fun returning changeset)
kura_multi:update(M, Name, CS)    Update a record
kura_multi:delete(M, Name, CS)    Delete a record
kura_multi:run(M, Name, Fun)      Run a custom function

Steps that take a fun receive a map of all completed steps so far:

fun(#{step1 := Result1, step2 := Result2}) -> ...
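
To make the step contract concrete, here is a toy, database-free model of a named pipeline. It is not Kura's implementation: the module multi_model and its run/1 function are hypothetical and only mirror the behaviour described in this section, where each step fun receives the map of completed results and the first failure stops the pipeline.

```erlang
-module(multi_model).
-export([run/1]).

%% Steps is a list of {Name, Fun} pairs. Each Fun takes the map of results
%% completed so far and returns {ok, Value} or {error, Reason}.
%% No database or rollback is involved; this models only the data flow.
run(Steps) ->
    run(Steps, #{}).

run([], Completed) ->
    {ok, Completed};
run([{Name, Fun} | Rest], Completed) ->
    case Fun(Completed) of
        {ok, Value} ->
            %% Record the result under the step name and continue.
            run(Rest, Completed#{Name => Value});
        {error, Reason} ->
            %% Stop at the first failure and report what had completed.
            {error, Name, Reason, Completed}
    end.
```

For example, multi_model:run([{a, fun(_) -> {ok, 1} end}, {b, fun(#{a := A}) -> {ok, A + 1} end}]) evaluates to {ok, #{a => 1, b => 2}}.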

Error handling

When a multi fails, you get the name of the failed step, the error value, and a map of steps that completed before the failure:

case blog_repo:multi(M) of
    {ok, Results} ->
        %% All steps succeeded, Results is a map of step_name => result
        ok;
    {error, FailedStep, FailedValue, CompletedSteps} ->
        %% FailedStep: atom name of the step that failed
        %% FailedValue: the error (e.g., a changeset with errors)
        %% CompletedSteps: map of steps that succeeded (then rolled back)
        ok
end.
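
One way to use the four-element error tuple is to fold it into a single log line. The module multi_report below is a hypothetical helper, not part of Kura; it assumes only the two result shapes shown above.

```erlang
-module(multi_report).
-export([describe/1]).

%% Build a one-line binary summary of a multi result.
describe({ok, Results}) ->
    fmt("ok: ~p step(s) committed", [maps:size(Results)]);
describe({error, FailedStep, FailedValue, Completed}) ->
    %% Completed holds the steps that ran before the failure; their
    %% database effects were rolled back along with the transaction.
    RolledBack = lists:sort(maps:keys(Completed)),
    fmt("failed at ~p: ~p (rolled back: ~p)",
        [FailedStep, FailedValue, RolledBack]).

%% Format to a flat binary so the result can be logged or returned as JSON.
fmt(Format, Args) ->
    iolist_to_binary(io_lib:format(Format, Args)).
```

The step names are sorted so the summary is stable regardless of map ordering.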

Bulk operations

insert_all — batch inserts

Insert many records at once:

Posts = [
    #{title => <<"Post 1">>, body => <<"Body 1">>, status => <<"draft">>, user_id => 1},
    #{title => <<"Post 2">>, body => <<"Body 2">>, status => <<"draft">>, user_id => 1},
    #{title => <<"Post 3">>, body => <<"Body 3">>, status => <<"published">>, user_id => 2}
],
{ok, 3} = blog_repo:insert_all(post, Posts).

insert_all bypasses changesets — it inserts raw maps directly. Use it for imports and seeding where you trust the data. The return value is the number of rows inserted.
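
For very large imports it can help to split the input into batches before calling insert_all. The sketch below is plain Erlang with no Kura calls; the module name batching is hypothetical, and whether Kura already splits large lists internally is not stated here, so treat the batch size as a tuning assumption.

```erlang
-module(batching).
-export([chunks/2]).

%% Split List into sublists of at most Size elements, preserving order.
chunks(List, Size) when is_list(List), is_integer(Size), Size > 0 ->
    chunks(List, Size, []).

chunks([], _Size, Acc) ->
    lists:reverse(Acc);
chunks(List, Size, Acc) ->
    {Batch, Rest} = take(Size, List, []),
    chunks(Rest, Size, [Batch | Acc]).

%% Take up to N elements from the front of a list.
%% Returns {Taken, Remaining}; never fails on short lists.
take(0, Rest, Acc) ->
    {lists:reverse(Acc), Rest};
take(_N, [], Acc) ->
    {lists:reverse(Acc), []};
take(N, [H | T], Acc) ->
    take(N - 1, T, [H | Acc]).
```

Each batch can then be passed to blog_repo:insert_all(post, Batch). For instance, batching:chunks([1, 2, 3, 4, 5], 2) returns [[1, 2], [3, 4], [5]].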

update_all — batch updates

Update many records matching a query:

%% Publish all drafts
Q = kura_query:from(post),
Q1 = kura_query:where(Q, {status, <<"draft">>}),
{ok, Count} = blog_repo:update_all(Q1, #{status => <<"published">>}).

update_all returns the count of rows affected. It applies the updates in a single SQL statement.

delete_all — batch deletes

Delete all records matching a query:

%% Delete all archived posts
Q = kura_query:from(post),
Q1 = kura_query:where(Q, {status, <<"archived">>}),
{ok, Count} = blog_repo:delete_all(Q1).

Upserts with on_conflict

Import data without failing on duplicates:

%% Insert a tag, do nothing if it already exists
CS = tag:changeset(#{}, #{<<"name">> => <<"erlang">>}),
{ok, Tag} = blog_repo:insert(CS, #{on_conflict => {name, nothing}}).

The on_conflict option controls what happens when a unique constraint is violated:

%% Do nothing on conflict (skip the row)
#{on_conflict => {name, nothing}}

%% Replace all fields on conflict
#{on_conflict => {name, replace_all}}

%% Replace specific fields on conflict
#{on_conflict => {name, {replace, [updated_at]}}}

%% Use a named constraint instead of a field
#{on_conflict => {{constraint, <<"tags_name_key">>}, nothing}}

Practical example: importing posts

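import_posts(Posts) ->
    %% Insert each post; a row whose title already exists is skipped.
    %% Note: lists:foreach/2 discards the return value of every insert.
    lists:foreach(fun(PostData) ->
        CS = post:changeset(#{}, PostData),
        blog_repo:insert(CS, #{on_conflict => {title, nothing}})
    end, Posts).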
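
Since lists:foreach/2 ignores what each insert returns, validation failures in import_posts/1 go unnoticed. A possible variant that tallies outcomes is sketched below. The module import_tally is hypothetical, and the insert call is passed in as a fun so the sketch stays independent of the repo; it assumes each insert returns {ok, _} or {error, _}.

```erlang
-module(import_tally).
-export([run/2]).

%% Insert is a fun/1 that performs one insert and returns {ok, _} or
%% {error, Reason}, for example a wrapper around blog_repo:insert/2.
%% Returns {OkCount, Errors}, where Errors pairs each failed item with
%% its reason, in input order.
run(Insert, Items) when is_function(Insert, 1), is_list(Items) ->
    {Ok, Errors} = lists:foldl(
        fun(Item, {OkAcc, ErrAcc}) ->
            case Insert(Item) of
                {ok, _} -> {OkAcc + 1, ErrAcc};
                {error, Reason} -> {OkAcc, [{Item, Reason} | ErrAcc]}
            end
        end,
        {0, []},
        Items),
    {Ok, lists:reverse(Errors)}.
```

In the blog application, Insert could be fun(PostData) -> blog_repo:insert(post:changeset(#{}, PostData), #{on_conflict => {title, nothing}}) end.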

Putting it all together

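A controller action that publishes a post and notifies subscribers in a single multi. The notify step runs only if the update succeeded, but a broadcast that has already been sent is not undone if the transaction later rolls back: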

publish(#{bindings := #{<<"id">> := Id}}) ->
    case blog_repo:get(post, binary_to_integer(Id)) of
        {ok, #{status := draft} = Post} ->
            M = kura_multi:new(),
            M1 = kura_multi:update(M, publish_post,
                post:changeset(Post, #{<<"status">> => <<"published">>})),
            M2 = kura_multi:run(M1, notify,
                fun(#{publish_post := Published}) ->
                    nova_pubsub:broadcast(posts, "post_published", Published),
                    {ok, notified}
                end),
            case blog_repo:multi(M2) of
                {ok, #{publish_post := Published}} ->
                    {json, post_to_json(Published)};
                {error, _Step, _Value, _} ->
                    {status, 422, #{}, #{error => <<"failed to publish">>}}
            end;
        {ok, _} ->
            {status, 422, #{}, #{error => <<"only drafts can be published">>}};
        {error, not_found} ->
            {status, 404, #{}, #{error => <<"post not found">>}}
    end.
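
A database rollback cannot recall a message that was already broadcast, so one alternative is to keep only the update in the multi and notify after it has returned {ok, ...}. The sketch below shows that ordering under stated assumptions: the module publish_flow is hypothetical, and both the multi execution and the notification are passed in as funs rather than calling blog_repo or nova_pubsub directly.

```erlang
-module(publish_flow).
-export([publish/2]).

%% RunMulti: fun/0 that executes the multi, for example
%%           fun() -> blog_repo:multi(M1) end.
%% Notify:   fun/1 called with the published post, and only after the
%%           multi has returned successfully.
publish(RunMulti, Notify) when is_function(RunMulti, 0), is_function(Notify, 1) ->
    case RunMulti() of
        {ok, #{publish_post := Published}} = Ok ->
            _ = Notify(Published),
            Ok;
        {error, _Step, _Value, _Completed} = Error ->
            %% Nothing was committed, so nothing is announced.
            Error
    end.
```

The trade-off is that the notification is no longer part of the multi's result map, and a crash between commit and broadcast would leave the post published without a notification.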

With transactions and bulk operations covered, let's prepare the application for deployment.