Best way to process the lines of a file with multiple threads



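I have several separate files. I want to process every line of each file (sequentially and independently), and I want it to be fast.

So I wrote code that reads a large chunk of the file into a buffer in RAM; multiple threads then compete to read lines from the buffer and process them. The pseudocode is as follows: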

do {
    do {
        fread(buffer, 500MB, 1, file);
        // creating threads
        // let the threads compete to read from buffer and PROCESS independently
        // end of threads
    } while ( EOF not reached );
    file = nextfile;
} while ( there is another file to read );

Or this:

void mt_ReadAndProcess() {
    lock();
    fread(buffer, 50MB, 1, file);
    if (EOF reached)
        file = nextfile;
    unlock();
    process();
}

main() {
    // create multi threads
    // call mt_ReadAndProcess() with multi threads
}

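The processing step is expensive in terms of time.

Is there a better way to do this? Either a faster way to read the files, or a better way to process them with multiple threads?

Thanks, everyone,

Amir.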

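Why would you have the threads "compete to read from the buffer"? The data can easily be partitioned by the thread that does the reading, as it is read. Contending for data from a buffer gains you nothing, and will likely waste both CPU time and wall-clock time.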
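To illustrate the partitioning point, here is a minimal sketch that assumes the reading thread has already loaded a chunk of the file into memory. The helper name `partition_lines` and its parameters are hypothetical and not part of the question's code. It cuts the buffer into slices that end on line boundaries, so each worker can be handed its own range and never has to contend for data.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: split buf[0..len) into at most nparts slices that
 * each end on a line boundary (or at the end of the buffer).
 * starts[] must have room for nparts + 1 entries; slice i is
 * buf[starts[i] .. starts[i + 1]). Returns the number of slices produced. */
size_t partition_lines(const char *buf, size_t len, size_t nparts, size_t *starts)
{
    size_t count = 0;
    size_t pos = 0;

    assert(nparts > 0);
    starts[0] = 0;

    while (pos < len && count < nparts) {
        /* Aim for an even share of the bytes that are still unassigned. */
        size_t end = pos + (len - pos) / (nparts - count);

        /* If the target is not already just past a newline, extend the
         * slice to the end of the line it falls in. */
        if (end < len && (end == pos || buf[end - 1] != '\n')) {
            const char *nl = memchr(buf + end, '\n', len - end);
            end = nl ? (size_t)(nl - buf) + 1 : len;
        }

        starts[++count] = end;
        pos = end;
    }
    return count;
}
```

Worker `i` would then process `buf + starts[i]` up to `buf + starts[i + 1]` without any locking, since no two slices overlap. Slices can be uneven when line lengths vary a lot, so this is a starting point rather than a balanced scheduler.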

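Since you're processing line by line, just read lines from the file and pass each buffer to the worker threads by pointer.

Assuming you're running on a POSIX-compliant system, something like this: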

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define MAX_LINE_LEN 1024
#define NUM_THREADS 8

// linePipe holds pointers to lines sent to
// worker threads
static int linePipe[ 2 ];

// bufferPipe holds pointers to buffers returned
// from worker threads and used to read data
static int bufferPipe[ 2 ];

// in both pipes, index 0 is the read end and index 1 is the write end

// thread function that actually does the work
void *threadFunc( void *arg )
{
    const char *linePtr;

    for ( ;; )
    {
        // get a pointer to a line from the read end of the line pipe
        read( linePipe[ 0 ], &linePtr, sizeof( linePtr ) );

        // end loop on NULL linePtr value
        if ( !linePtr )
        {
            break;
        }

        // process line

        // return the buffer through the write end of the buffer pipe
        write( bufferPipe[ 1 ], &linePtr, sizeof( linePtr ) );
    }

    return( NULL );
}

int main( int argc, char **argv )
{
    pipe( linePipe );
    pipe( bufferPipe );

    // create buffers and load them into the buffer pipe for reading
    for ( int ii = 0; ii < ( 2 * NUM_THREADS ); ii++ )
    {
        char *buffer = malloc( MAX_LINE_LEN );
        write( bufferPipe[ 1 ], &buffer, sizeof( buffer ) );
    }

    pthread_t tids[ NUM_THREADS ];
    for ( int ii = 0; ii < NUM_THREADS; ii++ )
    {
        pthread_create( &( tids[ ii ] ), NULL, threadFunc, NULL );
    }

    FILE *fp = ...

    for ( ;; )
    {
        char *linePtr;

        // get the pointer to a free buffer from the buffer pipe
        read( bufferPipe[ 0 ], &linePtr, sizeof( linePtr ) );

        // read a line from the current file into the buffer
        // (lines longer than MAX_LINE_LEN - 1 are split by fgets)
        char *result = fgets( linePtr, MAX_LINE_LEN, fp );
        if ( result )
        {
            // send the line to the worker threads
            write( linePipe[ 1 ], &linePtr, sizeof( linePtr ) );
        }
        else
        {
            // end of the current file: put the unused buffer back,
            // then either end the loop or open another file
            write( bufferPipe[ 1 ], &linePtr, sizeof( linePtr ) );
            fclose( fp );
            fp = fopen( ... );
            if ( !fp )
            {
                break;
            }
        }
    }

    // clean up and exit

    // send NULL to cause worker threads to stop
    char *nullPtr = NULL;
    for ( int ii = 0; ii < NUM_THREADS; ii++ )
    {
        write( linePipe[ 1 ], &nullPtr, sizeof( nullPtr ) );
    }

    // wait for worker threads to stop
    for ( int ii = 0; ii < NUM_THREADS; ii++ )
    {
        pthread_join( tids[ ii ], NULL );
    }

    return( 0 );
}
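For readers who want something they can compile as-is, here is a stripped-down sketch of the same pointer-through-a-pipe idea. Everything named `demo_*`, `send_ptr`, and `total_line_length` is hypothetical, and `strlen` stands in for the real per-line processing. It relies on POSIX guaranteeing that a `write` of no more than `PIPE_BUF` bytes to a pipe is atomic, so pointer-sized messages from the producer are never interleaved; it also assumes, as is the case in practice on common systems, that each pointer-sized `read` returns one whole pointer.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>
#include <unistd.h>

enum { DEMO_MAX_THREADS = 16 };

/* Hypothetical demo state: one pipe carrying `const char *` values. */
static int demo_pipe[2];               /* [0] = read end, [1] = write end */
static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static size_t demo_total;

static void *demo_worker(void *arg)
{
    const char *line;
    (void)arg;

    /* Each read takes one pointer off the pipe; NULL means "stop". */
    while (read(demo_pipe[0], &line, sizeof line) == (ssize_t)sizeof line
           && line != NULL) {
        size_t n = strlen(line);       /* stand-in for expensive processing */
        pthread_mutex_lock(&demo_lock);
        demo_total += n;
        pthread_mutex_unlock(&demo_lock);
    }
    return NULL;
}

/* Write one pointer value into the pipe. */
static void send_ptr(const char *p)
{
    ssize_t n = write(demo_pipe[1], &p, sizeof p);
    assert(n == (ssize_t)sizeof p);
    (void)n;
}

/* Hand every line to a pool of workers and return the summed length.
 * Entries in lines[] must be non-NULL, since NULL is the stop marker. */
size_t total_line_length(const char **lines, size_t nlines, int nthreads)
{
    pthread_t tids[DEMO_MAX_THREADS];

    assert(nthreads > 0 && nthreads <= DEMO_MAX_THREADS);
    demo_total = 0;
    if (pipe(demo_pipe) != 0)
        return 0;

    for (int i = 0; i < nthreads; i++)
        pthread_create(&tids[i], NULL, demo_worker, NULL);
    for (size_t i = 0; i < nlines; i++)
        send_ptr(lines[i]);
    for (int i = 0; i < nthreads; i++)
        send_ptr(NULL);                /* one stop marker per worker */
    for (int i = 0; i < nthreads; i++)
        pthread_join(tids[i], NULL);

    close(demo_pipe[0]);
    close(demo_pipe[1]);
    return demo_total;
}
```

Calling `total_line_length(lines, 3, 4)` on three strings returns the sum of their lengths regardless of which worker handled which line. Error handling is kept minimal here; real code would check the results of `pthread_create` and `read` more carefully.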
