Elixir: Domain Driven Design with Actor Model
Couple of months back, I was listening to Eric Evan’s pod cast on Software engineering radio where he mentioned that actor model is a great fit for modeling aggregate root in domain driven design, I was like what, wait? ain’t OO languages the only way to model concepts like aggregate, sagas, domain service, repositories or bounded Contexts in Domain driven design ?
After hours of googling and days of reading and weeks of asking questions on forums, I realized that modeling DDD concepts is more explicit and easy in Elixir because of the language constructs it provides. For example using GenServer for aggregate root, OTP application for each Bounded Context, putting all of these Bounded Context together as an umbrella App.
What is an Aggregate Root ?
Essentially aggregate is pattern in domain driven design, Aggregate is a cluster of your domain objects which can be treated as a singe unit.
Let’s say, you are writing a blog engine and you have identified some domain models like Blog, Post, Comment, and Like.
A Blog contains multiple Posts, a post contains multiple Comments and Likes, while defining the relationship between domain models, it is a good practice to define the relationship in such a way that the object graph is acyclic, that is, there are no cycles. for example, a blog might have posts but the post should not contain a reference to the blog, similarly a blog might contain comments but the comment should not contain a reference to the post.
Blog, Post, Comment, and Likes can be considered as a collection of objects which define an aggregate in our application. Next thing is to identify a model among these models and promote it to be root (called aggregate root) and make sure that all the operations/access on these models are happening through the aggregate root.
In our case, we can promote the Blog model to be aggregate root, and make sure that we only access Posts, like Blog.Post or Comments like Blog.Post.Comment, this approach guarantees that our models are being modified only via our aggregate roots and we are not breaking any invariant.
DDD and Phoenix 1.3
If you have read about latest changes in Phoenix framework (1.3), you would realize that it forces developer to think about their models upfront and it encourages the concepts like Bounded Context and aggregate, wherein user need to specify context (more in line with Bounded context in DDD) to access resource (there are no models anymore). For example Blog, Post, Comment and Likes can be grouped together and accessed by using a Blog Context object. For detailed information take a look at this keynote by Chris McCord at ElixirConf 2017.
Phoenix framework is awesome, but it is not my application ( look here if not convinced), I like to think of Phoenix as only a web interface for my application and should only be responsible for exposing endpoints to interact with my application with minimal business logic. Following this approach means that I am going to move out my business logic from Phoenix app and put it somewhere else, to achieve this I will use Umbrella project and cut out our Bounded Context in our application into different OTP application within single Umbrella project.
Aggregate root as Elixir Process
Let’s consider an application where we are storing and retrieving user information such as name, age, gender and educational details, so we have following models.
NOTE: If you want to look the the entire repository, you can find it here.
- BasicInfo module containing name, age and gender fields.
defmodule Bhaduli.User.BasicInfo do
defstruct [name: nil, age: nil, gender: nil]
def new(basic_info) do
%Bhaduli.User.BasicInfo{name: basic_info.name, age: basic_info.age, gender: basic_info.gender}
end
end
- EducationalDetails module containing graduation, senior_secondary and intermediate fields
defmodule Bhaduli.User.EducationalDetails do
defstruct graduation: nil, senior_secondary: nil, intermediate: nildef new(educational_details) do
struct(Bhaduli.User.EducationalDetails, educational_details)
end
end
- User module which contains user_id, basicInfo and EducationalDetails model.
defmodule Bhaduli.User doalias Bhaduli.User.{BasicInfo, EducationalDetails}defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]...
User Module as Aggregate root
I have got three models, basicInfo, educationalDetails and User. I am going to make User as my aggregate root and expose methods to save and retrieve EducationalDetails and BasicInfo as shown below.
defmodule Bhaduli.User doalias Bhaduli.User.{BasicInfo, EducationalDetails}defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]def update(id, %BasicInfo{} = basic_info) do
enddef update(id, %EducationalDetails{} = educational_details) do
enddef get_basic_info(id) do
enddef get_educational_details(id) do
enddef get(id) do
end
So we have got basic skeleton for User aggregate, but still we haven’t really done anything interesting. Like I said earlier I want to model my aggregate root as a long running elixir process and way to do that is to use GenServer.
But before we do that I want to explain how entire flow should work. Each user will be an Elixir process, for example
- If a new user is created with user_id “Erin” than we would create a new process with user_id “Erin”.
- If we try to get user (user_id “Mustang”) for the first time , we will fetch it from database and spawn a new process with given user_id. Now we will have a total of two user processes one each for “Erin” and “Mustang”.
- If an existing user (user_id “Envy”) is updated and a process for that user has not been created yet than we are going to spawn new process and load its state from Database and update it. Now we will have three processes each for “Erin”, “Mustang” and “Envy”.
- We will keep one process for each user and keep it loaded in memory. If user process is already running we will read from it and return otherwise we go to database and spawn a new process.
- We will add a supervisor for user process so that every time a process crashes we create a new one and rehydrate it from database.
Process Registry
Even though I have outlined overall flow of application, there are few things that needs to be addressed first.
- Whenever a new process is created, we get back a PID. Since we are creating one process for each user, soon we will have thousands of processes . Anyone who want to get user data will have to know PID of each user process in order to interact with it.
- Having a supervisor for user process makes our application fault tolerant but also adds a complexity because every time a process crashes supervisor is going to create a new one and this new process will have a different PID than the one that had crashed.
Though we can hand-roll our process registration mechanism,but it will soon get hard to track all of these processes as we intend to create thousands of them. On top of it, since supervisor is going to create a new process for every crashed one, propagating new process Id to the client would add more complexity.
Luckily we have a module in Elixir called Registry, which allows us to map PID to any given string/atom, so that we don’t have to deal with cryptic PIDs. In our case we want to client to be able to interact with our application only by using user_id, so we would be mapping PID to user_id.
Registry can be started with Registry.start_link/3, once registry is started we need some kind of mechanism which will map our user_id to the PID of spawned process and way to do that is to implement a function that returns a :via
tuple, based on a value we provide.
Our via_tuple
function, along with the Registry module, will allows us to spawn a process for a given user_id and make subsequent calls on the process using the same id, rather than a PID.
Our :via_tuple function
def via_tuple(user_id) do
{:via, Registry, {:user_process_registry, user_id}}
end
GenServer provides the plumbing to make use of this :via
tuple and automatically handle the registration of our name (user_id) to the actual process.
we will use this via_tuple function while spawning a new user process(User module)
#User.exdef start_link(name) do
GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
end
Aggregate Root as GenServer
In order to make User aggregate an Elixir process, I am going to use GenServer behavior in User module.
In our main application file, start the registry. Here we are starting the registry by giving it a unique name as `user_process_registry` and supervising it.
defmodule Bhaduli do
use Applicationdef start(_type, _args) do
import Supervisor.Spec, warn: falsechildren = [
supervisor(Registry, [:unique, :user_process_registry]),
supervisor(Bhaduli.UserSupervisor, [])
]
opts = [strategy: :one_for_one, name: Bhaduli.Supervisor]
Supervisor.start_link(children, opts)
end
end
User Supervisor, we are defining a Supervisor for user module by giving it a simple_one_for_one strategy and module name.
defmodule Bhaduli.UserSupervisor do
use Supervisordef start_link do
Supervisor.start_link(__MODULE__,nil, name: :user_sup)
enddef init(_) do
children = [worker(Bhaduli.User, [])]
supervise(children, strategy: :simple_one_for_one)
end
end
Now, we will use GenServer behaviour in User module and alias BasicInfo and Educational Details.
defmodule Bhaduli.User do
use GenServeralias Bhaduli.User.{BasicInfo, EducationalDetails}...
Next we will define a User struct to contain user_id, basic_info and educational_details.
defmodule Bhaduli.User do
use GenServeralias Bhaduli.User.{BasicInfo, EducationalDetails}@registry :user_process_registrydefstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]
Let’s write a start_link/3 method in User module which will be called by User supervisor while starting User process. Here we are passing three arguments to start_link function
- __MODULE__, this is the module name.
- User struct as initial argument.
- Third parameter in alias for the spawned process, which allows us to access this process with given alias, we are using
via_tuple
function because GenServer provides the plumbing to make use of this:via
tuple and automatically handle the registration of our alias (user_id) to the actual process.
#User.exdef start_link(name) do
GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
end
Now comes the interesting part, we are defining a init callback which will be called when GenServer is started. It is this method which will be responsible for loading the User state from database.
#User.exdef init(%Bhaduli.User{} = user) do
GenServer.cast(self(), {:populate_user})
{:ok, user}
end
Inside init we are calling handle_callback
function, this callback goes to UserRepository and fetches the data from database, If a user is found it is returned otherwise and error is returned.
#User.exdef handle_cast({:populate_user}, user) do
case Bhaduli.UserRepository.get(user.user_id) do
{:error, _} -> {:noreply, user}
{:ok, user} -> {:noreply, user}
end
end
It is important to note that GenServer won’t start until init method returns. for this reason we are using handle_cast
method instead of handle_call
because handle_cast
can load data asynchronously and init method don’t have to wait for it to finish.
Once we have done all this plumbing we can get the PID of any process for a given user_id from Registry as shown below.
[{pid, _}] = Registry.lookup(@registry, id)
Getting basic_info for a given user_id
def get_basic_info(id) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.call(pid, {:basic_info})
end
We are using handle_call
for getting basic_info as we are going to wait for reply. Interesting thing to note here is that we are not making database call (Guess why ?)
def handle_call({:basic_info}, pid, state) do
{:reply, state.basic_info, state}
end
Similarly for educational_details
def get_educational_details(id) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.call(pid, {:educational_details})
enddef handle_call({:educational_details}, pid, state) do
{:reply, state.educational_details, state}
end
For updating basic_info and educational details we are going to use handle_cast as we don’t need to wait for response from GenServer callback.
def update(id, %BasicInfo{} = basic_info) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.cast(pid, {:update_basic_info, basic_info})
enddef update(id, %EducationalDetails{} = educational_details) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.cast(pid, {:update_educational_details, educational_details})
end#Callbacksdef handle_cast({:update_basic_info, basic_info}, user) do
user = %Bhaduli.User{user | basic_info: basic_info}
{:noreply, user}
enddef handle_cast({:update_educational_details, educational_details}, user) do
user = %Bhaduli.User{user | educational_details: educational_details}
{:noreply, user}
end
Putting it all together.
#User.ex
defmodule Bhaduli.User do
use GenServer
alias Bhaduli.User.{BasicInfo, EducationalDetails}
@registry :user_process_registry
defstruct [user_id: nil, basic_info: %Bhaduli.User.BasicInfo{}, educational_details: %Bhaduli.User.EducationalDetails{}]def start_link(name) do
GenServer.start_link(__MODULE__, %Bhaduli.User{user_id: name}, name: via_tuple(name))
enddef init(%Bhaduli.User{} = user) do
GenServer.cast(self(), {:populate_user})
{:ok, user}
enddef update(id, %BasicInfo{} = basic_info) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.cast(pid, {:update_basic_info, basic_info})
enddef update(id, %EducationalDetails{} = educational_details) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.cast(pid, {:update_educational_details, educational_details})
enddef get_basic_info(id) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.call(pid, {:basic_info})
enddef get_educational_details(id) do
[{pid, _}] = Registry.lookup(@registry, id)
GenServer.call(pid, {:educational_details})
enddef get(id) do
case Registry.lookup(@registry, id) do
[{pid, _}] -> Registry.lookup(@registry, id)
GenServer.call(pid, {})
[] -> {:error, :not_found}
end
enddef handle_cast({:update_basic_info, basic_info}, user) do
user = %Bhaduli.User{user | basic_info: basic_info}
{:noreply, user}
enddef handle_cast({:update_educational_details, educational_details}, user) do
user = %Bhaduli.User{user | educational_details: educational_details}
{:noreply, user}
enddef handle_cast({:populate_user}, user) do
case Bhaduli.UserRepository.get(user.user_id) do
{:error, _} -> {:noreply, user}
{:ok, user} -> {:noreply, user}
enddef handle_cast({:update_basic_info, basic_info}, user) do
user = %Bhaduli.User{user | basic_info: basic_info}
{:noreply, user}
enddef handle_cast({:update_educational_details, educational_details}, user) do
user = %Bhaduli.User{user | educational_details: educational_details}
{:noreply, user}
endenddef handle_call({:basic_info}, pid, state) do
{:reply, state.basic_info, state}
enddef handle_call({:educational_details}, pid, state) do
{:reply, state.educational_details, state}
enddef handle_call({}, pid, state) do
{:reply, state, state}
enddef via_tuple(user_id) do
{:via, Registry, {:user_process_registry, user_id}}
endend
Code for this can be found here. This application is part of an Umbrella project and in next post I’ll show how this application is used from a Phoenix application.
About Author
About Author :
Naveen Negi is software engineer based out of Wolfsburg Germany. He believes in software craftsmanship. He is also an avid blogger, reader, and a polyglot developer. In his free time he tries to explore different spiritual paradigm and the one which entices him most is Samakhya school of philosophy.
LinkedIn profile: http://linkedin.com/in/nav301186