我正在编写一个程序,使用 Erlang 多处理解决生产者-消费者问题,其中一个进程负责处理我生产/消费的缓冲区以及许多生产者和许多消费者进程。为了简化起见,我假设生产者/消费者不知道他的操作失败了(由于缓冲区限制,无法生产或使用),但服务器已准备好执行此操作。
我的代码是:
服务器代码
server(Buffer, Capacity, CountPid) ->
receive
%% PRODUCER
{Pid, produce, InputList} ->
NumberProduce = lists:flatlength(InputList),
case canProduce(Buffer, NumberProduce, Capacity) of
true ->
NewBuffer = append(InputList, Buffer),
CountPid ! lists:flatlength(InputList),
Pid ! ok,
server(NewBuffer,Capacity, CountPid);
false ->
Pid ! tryagain,
server(Buffer, Capacity, CountPid)
end;
%% CONSUMER
{Pid, consume, Number} ->
case canConsume(Buffer, Number) of
true ->
Data = lists:sublist(Buffer, Number),
NewBuffer = lists:subtract(Buffer, Data),
Pid ! {ok, Data},
server(NewBuffer, Capacity,CountPid);
false ->
Pid ! tryagain,
server(Buffer, Capacity, CountPid)
end
end.
生产者和消费者
producer(ServerPid) ->
X = rand:uniform(9),
ToProduce = [rand:uniform(500) || _ <- lists:seq(1, X)],
ServerPid ! {self(),produce,ToProduce},
producer(ServerPid).
consumer(ServerPid) ->
X = rand:uniform(9),
ServerPid ! {self(),consume,X},
consumer(ServerPid).
启动和辅助功能(我附上,因为我不知道我的问题到底在哪里)
spawnProducers(Number, ServerPid) ->
case Number of
0 -> io:format("Spawned producers");
N ->
spawn(zad2,producer,[ServerPid]),
spawnProducers(N - 1,ServerPid)
end.
spawnConsumers(Number, ServerPid) ->
case Number of
0 -> io:format("Spawned producers");
N ->
spawn(zad2,consumer,[ServerPid]),
spawnProducers(N - 1,ServerPid)
end.
start(ProdsNumber, ConsNumber) ->
CountPid = spawn(zad2, count, [0,0]),
ServerPid = spawn(zad2,server,[[],20, CountPid]),
spawnProducers(ProdsNumber, ServerPid),
spawnConsumers(ConsNumber, ServerPid).
canProduce(Buffer, Number, Capacity) ->
lists:flatlength(Buffer) + Number =< Capacity.
canConsume(Buffer, Number) ->
lists:flatlength(Buffer) >= Number.
append([H|T], Tail) ->
[H|append(T, Tail)];
append([], Tail) ->
Tail.
我正在尝试使用这样的过程计算元素的数量,每当生成元素时,服务器都会向其发送消息。
count(N, ThousandsCounter) ->
receive
X ->
if
N >= 1000 ->
io:format("Yeah! We have produced ~p elements!~n", [ThousandsCounter]),
count(0, ThousandsCounter + 1000);
true -> count(N + X, ThousandsCounter)
end
end.
我希望这个程序能够正常工作,这意味着:它产生元素,产生的元素的增加取决于时间,如 f(t) = kt,k 常数,我拥有的流程越多,生产速度就越快。
实际问题
I 启动程序:
erl
c(zad2)
zad2:start(5,5)
程序的行为方式:
- 生产持续的时间越长,单位时间内生产的元素就越少(例如,在第一秒 10000,在接下来的 5000 中,在第 10 秒 1000 等。
- 我拥有的流程越多,生产速度就越慢,在 start(10,10) 中,我需要等待大约一秒钟才能获得前一千个,而对于 start(2,2) 20000 几乎立即出现
- start(100,100) 让我重新启动计算机(我在 Ubuntu 上工作),因为使用了整个 CPU,并且没有内存可供我打开终端和终止 erlang 机器
为什么我的程序没有像我预期的那样运行?我是否在 Erlang 编程方面做错了什么,或者这是操作系统或其他问题?
上面写的生产者/1和消费者/1函数从不等待任何东西 - 它们只是循环和循环,用消息轰炸服务器。服务器的消息队列正在迅速填满,Erlang VM 将尝试尽可能多地增长,窃取您的所有内存,循环进程将窃取所有内核上的所有可用 CPU 时间。