读取线程中的解析消息



如果标题有点模糊,请原谅。我会试着更好地解释我正在努力实现的目标。

有一个叫做parsebytes的函数,它是我实现的外部接口的一部分。它需要一个字节数组和一个长度。这个特定程序中的所有解析都在一个线程上运行,所以我想尽快从parsebytes中获取数据,这样它就可以返回到获取更多的数据。我的伪代码方法是:创建一个外部运行的线程(ParserThreadClass)。每次调用parsebytes时,通过循环遍历所有字节并执行byteQueue.add(bytes[i]),将字节放入ParserThreadClass中的队列中。此代码由synchronized(byteQueue)包围实际上,这应该会释放parsebytes,以便返回并获取更多数据。

在这种情况下,我的ParserThreadClass也在运行。这是run()函数中的代码

while (!shutdown) //while the thread is still running
{
synchronized (byteQueue) 
{
bytes.addAll(byteQueue);  //an arraylist
byteQueue.clear();
}
parseMessage();   //this will take the bytes arraylist and build an xml message.
}

我是不是效率太低了?如果是的话,有人能告诉我应该如何解决这个问题吗?

这就是我以前试图解决这个问题的方法。基本上,您有一个生产者线程,就像这里一样,它读取文件并将项目放入队列。然后您有一个工作线程,它从队列中读取并处理它们。代码如下,但它看起来与您正在做的基本相同。我发现这几乎没有加快速度,因为相对于磁盘读取,我每行需要做的处理非常快。如果你必须进行的解析非常密集,或者块非常大,你可以通过这种方式加快速度。但是,如果它非常小,就不要期望看到太多性能改进,因为这个过程是受IO限制的。在这些情况下,您需要并行化磁盘访问,而这在一台机器上是无法实现的。

public static LinkedBlockingQueue<Pair<String, String>> mappings;
public static final Pair<String, String> end =
new Pair<String, String>("END", "END");
public static AtomicBoolean done;
public static NpToEntityMapping mapping;
public static Set<String> attested_nps;
public static Set<Entity> possible_entities;
public static class ProducerThread implements Runnable {
private File f;
public ProducerThread(File f) {
this.f = f;
}
public void run() {
try {
BufferedReader reader = new BufferedReader(new FileReader(f));
String line;
while ((line = reader.readLine()) != null) {
String entities = reader.readLine();
String np = line.trim();
mappings.put(new Pair<String, String>(np, entities));
}
reader.close();
for (int i=0; i<num_threads; i++) {
mappings.put(end);
}
} catch (InterruptedException e) {
System.out.println("Producer thread interrupted");
} catch (IOException e) {
System.out.println("Producer thread threw IOException");
}
}
}
public static class WorkerThread implements Runnable {
private Dictionary dict;
private EntityFactory factory;
public WorkerThread(Dictionary dict, EntityFactory factory) {
this.dict = dict;
this.factory = factory;
}
public void run() {
try {
while (!done.get()) {
Pair<String, String> np_ent = mappings.take();
if (np_ent == end) {
done.set(false);
continue;
}
String entities = np_ent.getRight();
String np = np_ent.getLeft().toLowerCase();
if (attested_nps == null || attested_nps.contains(np)) {
int np_index = dict.getIndex(np);
HashSet<Entity> entity_set = new HashSet<Entity>();
for (String entity : entities.split(", ")) {
Entity e = factory.createEntity(entity.trim());
if (possible_entities != null) {
possible_entities.add(e);
}
entity_set.add(e);
}
mapping.put(np_index, entity_set);
}
}
} catch (InterruptedException e) {
System.out.println("Worker thread interrupted");
}
}
}

编辑:

以下是启动生产者和工作线程的主线程的代码:

Thread producer = new Thread(new ProducerThread(f), "Producer");
producer.start();
ArrayList<Thread> workers = new ArrayList<Thread>();
for (int i=0; i<num_threads; i++) {
workers.add(new Thread(new WorkerThread(dict, factory), "Worker"));
}
for (Thread t : workers) {
t.start();
}
try {
producer.join();
for (Thread t : workers) {
t.join();
}
} catch (InterruptedException e) {
System.out.println("Main thread interrupted...");
}

在生产者线程中完成的工作也应该在主线程中完成,这样就不需要在主代码中启动和连接另一个线程。不过,一定要在浏览文件之前启动工作线程,并在完成工作后加入它们。不过,我不确定这种方式和我在这里的方式之间的性能差异。

最新更新