You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
# сървърpid=spawn(fn->receivedo{:call,{pid,ref},msg}-># Do the worksend(pid,{:reply,ref,msg}endend)# клиентref=Process.monitor(pid)send(pid,{:call,{self(),ref},msg})receivedo{^ref,reply}->Process.demonitor(ref,[:flush])reply{:DOWN,^ref,:process,^pid,status}->exit(status)after5_000->Process.demonitor(ref,[:flush]):timeout# will retryend
Къде е проблемът?
# сървърpid=spawn(fn->receivedo{:call,{pid,ref},msg}->Process.sleep(6_000)send(pid,{:reply,ref,msg}endend)# клиентref=Process.monitor(pid)send(pid,{:call,{self(),ref},msg})receivedo{^ref,reply}->Process.demonitor(ref,[:flush])reply{:DOWN,^ref,:process,^pid,status}->exit(status)after5_000->Process.demonitor(ref,[:flush]):timeout# will retryend
Late reply
Обработката на съобщението отнема по-дълго от времето, което клиентът очаква.
Все пак съобщението е обработено и отговорът е изпратен.
Това съобщение никога няма да бъде прочетено от клиента, защото ref няма да се съпостави с никой бъдещ make_ref().
alias - псевдоним на PID, който може да се използва вместо PID в send/2 и да живее по-кратко от процеса.
Псевдонимът е от тип ref. От OTP/24 ref винаги е валидна дестинация при изпращане на съобщения.
Ако ref не е псевдоним на процес или е деактивиран псевдоним, съобщенията drop-ват без грешки (silently).
OTP/24 - Process alias
pid=spawn(fn->receivedo{:call,ref_alias,msg}->Process.sleep(6_000)send(ref_alias,{:reply,ref_alias,msg}endend)ref_alias=Process.monitor(pid,alias: :demonitor)send(pid,{:call,ref_alias,msg})receivedo{^ref_alias,reply}->Process.demonitor(ref_alias,[:flush])reply{:DOWN,^ref_alias,:process,^pid,status}->exit(status)after5_000->Process.demonitor(ref_alias,[:flush]):timeout# will retryend
Обработка на съобщения
Какво се случва, когато дадено съобщение не е съпоставено с никой шаблон в receive?
A: Съобщението никога не може да бъде съпоставено и се изтрива.
B: Съобщението се връща в пощенската кутия и ще се опита да се съпостави в бъдеще?
Съобщенията, които не се съпоставят с никой шаблон се добавят в Save Queue.
Когато някое съобщение се съпостави, съобщенията от Save Queue се добавят обратно към началото пощенската кутия.
Защо?
Actor model
Всичко е Actor (подобно на "всичко е обект").
Един Actor може:
Да изпраща краен брой съобщения до други Actor-и;
Да създава краен брой нови Actor-и;
Да дефинира поведение, което ще се изпълни, кога се получи ново съобщение.
defmoduleReceiverdodefimportant_first()doProcess.sleep(100)receivedo{:high_priority,pid,ref,msg}->send(pid,{ref,:high})endreceivedo{:low_priority,pid,ref,msg}->send(pid,{ref,:low})endimportant_first()endendpid=spawn(Receiver,:important_first,[])send(pid,{:low_priority,self(),make_ref(),:hello})send(pid,{:low_priority,self(),make_ref(),:hello})send(pid,{:high_priority,self(),make_ref(),:HELLO})flush()# {#Reference<0.2940497199.2689335302.9720>, :high}# {#Reference<0.2940497199.2689335302.9704>, :low}# Блокирали сме в първия receive, второто low не се изпълнява
defmoduleLooperdodefloop(expected)doreceivedo^expectedwhenrem(expected,1000)==0->IO.puts("[#{DateTime.utc_now}] Received #{expected}")loop(expected-1)loop(expected-1)^expected->loop(expected-1)endendendpid=spawn(Looper,:loop,[1_000_000])fori<-1..1_000_000,do: send(pid,i)# [2023-03-20 17:26:44.533652Z] Received 1000000# [2023-03-20 17:26:49.362306Z] Received 999000# [2023-03-20 17:26:54.228245Z] Received 998000# [2023-03-20 17:26:59.092978Z] Received 997000# [2023-03-20 17:27:03.910761Z] Received 996000# Read 4k messages in 20 seconds, 996k to go
Selective Receive - Best Case
defmoduleLooperdodefloop(expected)doreceivedo^expectedwhenrem(expected,1000)==0->IO.puts("[#{DateTime.utc_now}] Received #{expected}")loop(expected-1)^expected->loop(expected-1)endendendpid=spawn(Looper,:loop,[1_000_000])fori<-1_000_000..1,do: send(pid,i)# [2023-03-20 17:28:32.210224Z] Received 1000000# [2023-03-20 17:28:32.214683Z] Received 999000# [2023-03-20 17:28:33.217271Z] Received 1000# Read 1M messages in 1 second
Много съобщения
Съобщенията се прочитат в реда, в който са получени.
Проблем: 10 милиона съобщения са получени преди съобщението, което чакаме.
a computer or computer program which manages access to a centralized resource or service in a network.
network computer, computer program, or device that processes requests from a client
a computer, a device or a program that is dedicated to managing network resources. They are called that because they “serve” another computer, device, or program called “client” to which they provide functionality
Какво е GenServer?
OTP behaviour:
Вие имплементирате няколко функции, чието поведение се комбинира с кода на GenServer, за да получите крайния резултат.
Абстракция на процес, който ви позволява да:
Държите и "променяте" (чрез рекурсия) състояние;
Изпращате и обработва синхронни съобщения (call и handle_call);
Изпращате и обработва асинхронни съобщения (cast и handle_cast);
Обработвате всички останали съобщения (handle_info);
Стартирате и спирате процеса чрез определени функции.
Какво е GenServer?
Процесите, които не си комуникират с други процеси, са с ограничена употреба.
Абстракцията, която позволява лесна комуникация и поддръжка на състояние, ще бъде полезна.
Съвместимост с Hot Code Swapping чрез GenServer.code_change/3 (няма да го разглеждаме).
Не е сървър, който слуша на някой порт и обработва TCP/UDP заявки.
Примери за GenServer в Elixir
Ecto DBConnection - връзката с база данни
Supervisor - процесът, който стартира, спира, свързва се и наблюдава други процеси.
Phoenix Channel - процесът, който обработва WebSocket съобщенията с един клиент.
Ranch Server - TCP Socket Acceptor.
Ако обработва HTTP (или просто TCP) заявка, заявка към база данни или стартирате ваш процес под Supervisor, то част от изпълнението на кода най-вероятно минава през GenServer.
GenServer.start_link/3
start_link/3 - стартира процеса и връща {:ok, pid} или грешка.
Приема като аргументи:
Името на модул, имплементиращ GenServer;
Аргумент, който ще бъде подаден на init функцията;
Списък с настройки (име на процеса и други)
В нашия модул имплементираме функция start_link, която извиква GenServer.start_link с подходящи аргументи.
Имплементацията в нашия модул на terminate/2 е опционална.
GenServer.stop/2
Синхронно спира GenServer.
Приема аргументи pid и reason.
Извиква terminate/2, за терминира процеса.
Връща :ok ако terminate/2 завърши с reason.
Извиква exit, в противен случай.
MyModule.handle_info/2
Обработва всички съобщения, изпратени към процеса чрез send.
Форматът на съобщението не е от значение.
Обработва :EXIT (ако trap_exit: true) и :DOWN съобщенията.
Не може да връща резултат към изпращача.
Добра идея е винаги да имаме и catch-all handle_info/2, за да обработваме неочаквани съобщения.
В противен случай неочакваното съобщение ще принтира грешка.
[error] MyModule #PID<0.702.0> received unexpected message in handle_info/2: :hello
Имплементира се в нашия модул.
defmoduleMyModuledouseGenServerdefstart_link(),do: GenServer.start_link(__MODULE__,[])defhandle_info(msg,state)doIO.puts("Got message: #{inspect(msg)}"){:noreply,state}endend{:ok,pid}=MyModule.start_link()send(pid,:hello)send(pid,%{a: "this is a map"})# Got message: :hello# Got message: %{a: "this is a map"}
MyModule.init/1
Използва се за инициализиране на състоянието на процеса.
Изпълнява се в GenServer процеса (self() връща pid на новия процес).
Приема втория аргумент на GenServer.start_link/{2,3}.
Докато init/1 завърши, процесът, който стартира GenServer-а, блокира.
Стартирането и инициализирането на GenServer е синхронна операция.
Според върната стойност от handle_call/3, то поведението е едно от следните:
Връща отговор към клиента: {:reply, ...};
Не връща отговор към клиента: {:noreply, ...};
Спира процеса: {:stop, ...};
Всеки вариант има няколко подварианта (timeout, hibernate, continue).
Single process bottleneck
Всеки GenServer е един процес и целият код в един процес е последователен.
Съобщенията се обработват последователно, едно по едно.
Не е проблем, ако обработката на съобщенията е много бърза.
Проблем, ако обработката на съобщения е бавна.
Идеи?
defhandle_call({:get_fib,n},from,state)do# Стартирай Task, който ще продължи да живее и работи и след като# тази функция е приключила. Използвай GenServer.reply/2, за да# върнеш резултата от Task-а на клиента.Task.start(fn->fib=calculate_fib(n)GenServer.reply(from,fib)end)# Не връщай нищо. Клиентът ще продължава да чака отговор.# Отговорът ще бъде върнат от стартирания Task.{:noreply,state}end