如何使用 Erlang 以分布式方式读取二进制文件



我有几个中等大小(1 到 5 GB)的二进制文件,我想使用 Erlang 读取和处理这些文件。

每个文件都有不同大小的记录,即一条记录为 200 kb,但另一条记录的大小可能是 800 kb。记录大小可以通过读取记录的前几个字节来获得。由于它是一个二进制文件,因此两条记录之间没有分隔符。

为了

处理这些文件,我们可以编写多线程程序,但只是为了好玩,我想使用 Erlang 并行处理文件。

我是 Erlang

的新手,所以我不知道如何将文件拆分为块并将这些块传递给 Erlang 进程。

谁能给出一些想法?

我已经在其他编程语言中做过很多次了,在 Erlang 中这是一个有趣的学习练习。

基本策略是告诉每个进程其起始字节偏移量、记录大小和要读取的记录数。(如果一个文件中有不同大小的记录,则可以传递记录大小列表,如 [200, 800, 800, 800, 800, 200, 100] )。工作线程独立处理其块并将结果返回给父级。

我认为这很容易,你可以自己弄清楚。你需要把erlang:spawnfile:openfile:readfile:position作为主要部分。但是,如果你想要剧透,这是我对一个模块的实现,该模块从二进制文件中读取数字并使用多个进程找到平均值。 (我不是 Erlang 专家,所以可能有更好的方法可以做到这一点。

-module(average).
-export([write_file/3, read_file/4]).
-export([read_file_worker/5]).
write_file(Filename, BlockSize, ListOfNumbers) ->
    BS = BlockSize*8,
    BinData = [<<X:BS>> || X <- ListOfNumbers],
    {ok, IoDevice} = file:open(Filename, [write, raw, binary]),
    file:write(IoDevice, BinData),
    file:close(IoDevice).
read_file(Filename, BlocksPerProcess, BlockSize, TotalBlockCount) ->
    {ok, SpawnCount} = read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, 0, 0),
    {ok, Sum} = read_file_listener(SpawnCount, 0),
    io:format("Total sum: ~p~nNumbers seen: ~p~nAverage: ~p~n", [Sum, TotalBlockCount, Sum/TotalBlockCount]).
read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, BlockOffset, SpawnCount) when BlockOffset < TotalBlockCount ->
    Offset = BlockOffset * BlockSize,
    MaxBlocks = min(BlocksPerProcess, TotalBlockCount - BlockOffset),
    spawn(?MODULE, read_file_worker, [self(), Filename, Offset, BlockSize, MaxBlocks]),
    read_file_spawner(Filename, BlocksPerProcess, BlockSize, TotalBlockCount, BlockOffset + BlocksPerProcess, SpawnCount + 1);
read_file_spawner(_Filename, _BlocksPerProcess, _BlockSize, _TotalBlockCount, _BlockOffset, SpawnCount) ->
    {ok, SpawnCount}.
read_file_listener(0, Accum) ->
    {ok, Accum};
read_file_listener(SpawnCount, Accum) ->
    receive
    {ok, Number} ->
        io:format("Got ~p~n", [Number]),
        read_file_listener(SpawnCount - 1, Accum + Number)
    end.
read_file_worker(MasterPid, Filename, Offset, BlockSize, MaxBlocks) ->
    {ok, IoDevice} = file:open(Filename, [read, raw, binary]),
    {ok, Offset} = file:position(IoDevice, {bof, Offset}),
    {ok, Sum} = read_file_worker_loop(IoDevice, BlockSize, 0, MaxBlocks, 0),
    MasterPid ! {ok, Sum}.
read_file_worker_loop(IoDevice, BlockSize, BlocksRead, MaxBlocks, Accum) when BlocksRead < MaxBlocks ->
    {ok, BinData} = file:read(IoDevice, BlockSize),
    Number = binary:decode_unsigned(BinData),
    read_file_worker_loop(IoDevice, BlockSize, BlocksRead + 1, MaxBlocks, Accum + Number);
read_file_worker_loop(_IoDevice, _BlockSize, _BlocksRead, _MaxBlocks, Accum)  ->
    {ok, Accum}.

在这里,它正在运行,使用 10 个进程从文件中读取 5 个数字:

12> Numbers = [1,1,2,4,6,10,10,1,0,500].                                  
[1,1,2,4,6,10,10,1,0,500]
13> average:write_file("/tmp/test.binary", 32, Numbers).                  
ok
14> average:read_file("/tmp/file.binary", 2, 32, length(Numbers)).        
Got 500
Got 11
Got 6
Got 16
Got 2
Total sum: 535
Numbers seen: 10
Average: 53.5
ok

为什么不为每个文件启动一个进程?我看不到将小文件拆分为块的价值。您的文件已经很小了。这也可以在许多计算机上并行化。

相关内容

  • 没有找到相关文章

最新更新