我正在使用Hadoop IPC创建一个序列号生成服务,但是当程序退出时无法停止服务器。有人能帮帮我吗?
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import dk.aau.cs.cloudetl.common.CEConstants;
import dk.aau.cs.cloudetl.hadoop.fs.FSUtil;
/**
 * RPC service that hands out integer sequence values per sequence name.
 *
 * <p>Each name starts at 0 and advances by {@code CEConstants.SEQ_INCR_DELTA} on
 * every {@link #nextSeq(Text)} call. The current values are loaded from
 * {@code seqFile} when the service starts and written back when it stops.
 *
 * <p>Lifecycle: {@link #run()} loads the values, starts the RPC server, blocks
 * until {@link #stopServer()} is called, and then performs the shutdown itself.
 * Keeping start and stop on the same thread guarantees that the RPC server is
 * never stopped before it has been started, which the previous
 * "start thread, immediately call stopServer()" sequence could not guarantee.
 *
 * <p>Locking: {@code seqMap} is guarded by {@code this}; the lifecycle flags are
 * guarded by {@code lifecycleLock}. When both are needed, {@code lifecycleLock}
 * is always taken first.
 */
public class SequenceServer extends Thread implements ClientProtocol {
    /** Next value to hand out for each sequence name; guarded by {@code this}. */
    Map<String, Integer> seqMap = new HashMap<String, Integer>();
    Configuration conf;
    /** Hadoop RPC server created in the constructor and started by {@link #run()}. */
    Server server;
    /** File the sequence values are loaded from and persisted to. */
    Path seqFile;

    /** Guards {@code running}, {@code runner} and {@code shutDown}. */
    private final Object lifecycleLock = new Object();
    /** False once a stop has been requested. */
    private boolean running = true;
    /** Thread that entered {@link #run()}, or null if run() was never entered. */
    private Thread runner;
    /** True once the RPC server has been stopped and the values persisted. */
    private boolean shutDown = false;

    /**
     * Creates the RPC server bound to the local host name and
     * {@code CEConstants.SEQ_SERVER_PORT}. The server is not started here.
     *
     * @param conf Hadoop configuration used for the RPC server and file access
     * @throws IllegalStateException if the local host cannot be resolved or the
     *         RPC server cannot be created; previously this was only printed and
     *         left {@code server} null, which failed later with an NPE
     */
    public SequenceServer(Configuration conf) {
        this.conf = conf;
        this.seqFile = new Path(CEConstants.META_DIR + Path.SEPARATOR
                + "cloudETL.seq");
        try {
            InetAddress addr = InetAddress.getLocalHost();
            // NOTE(review): 5 and true are presumably the handler count and the
            // verbose flag -- confirm against the RPC.getServer signature in use.
            server = RPC.getServer(this, addr.getHostName(),
                    CEConstants.SEQ_SERVER_PORT, 5, true, conf);
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Unable to create the sequence RPC server", e);
        }
    }

    /**
     * Loads the persisted values, starts the RPC server and blocks until
     * {@link #stopServer()} is called; the shutdown is then performed on this
     * thread before the method returns.
     */
    @Override
    public void run() {
        synchronized (lifecycleLock) {
            runner = Thread.currentThread();
        }
        try {
            readSeqsFromHDFS();
            server.start();
            System.out.println("=============Start==============");
            synchronized (lifecycleLock) {
                // Wait for an explicit wake-up instead of polling every 5 seconds,
                // so a stop request takes effect immediately.
                while (running) {
                    lifecycleLock.wait();
                }
            }
            System.out.println("=============END==============");
        } catch (InterruptedException e) {
            // Treat interruption as a stop request but keep the flag visible.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            shutdown();
        }
    }

    /**
     * Loads all (name, value) pairs from {@code seqFile} into {@code seqMap}.
     * Does nothing when the file does not exist; I/O errors are printed.
     */
    private void readSeqsFromHDFS() {
        try {
            // NOTE(review): despite the method name this uses the local file
            // system (FileSystem.getLocal) -- confirm that is intended.
            FileSystem fs = FileSystem.getLocal(conf);
            if (!fs.exists(seqFile)) {
                return;
            }
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqFile,
                    conf);
            try {
                Text key = new Text();
                IntWritable value = new IntWritable();
                synchronized (this) {
                    while (reader.next(key, value)) {
                        seqMap.put(key.toString(), Integer.valueOf(value.get()));
                    }
                }
            } finally {
                // The reader was previously never closed.
                reader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes a snapshot of {@code seqMap} to a temporary file next to
     * {@code seqFile} and then replaces {@code seqFile} with it through
     * {@code FSUtil.replaceFile}. I/O errors are printed; the replacement is
     * skipped when writing fails.
     */
    private void writeSeqsToHDFS() {
        Map<String, Integer> snapshot;
        synchronized (this) {
            // Copy under the monitor so RPC handlers that are still inside
            // nextSeq cannot modify the map while it is being iterated.
            snapshot = new HashMap<String, Integer>(seqMap);
        }
        try {
            FileSystem fs = FileSystem.getLocal(conf);
            Path tmp = new Path(seqFile.getParent() + Path.SEPARATOR
                    + "tmp.seq");
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                    tmp, Text.class, IntWritable.class);
            try {
                for (Entry<String, Integer> entry : snapshot.entrySet()) {
                    writer.append(new Text(entry.getKey()),
                            new IntWritable(entry.getValue().intValue()));
                }
            } finally {
                // Close even when append fails so the file handle is released.
                writer.close();
            }
            FSUtil.replaceFile(new File(tmp.toString()),
                    new File(seqFile.toString()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops the RPC server and persists the values, at most once. Waiters in
     * {@link #stopServer()} are woken when this completes, whether or not it
     * succeeded.
     */
    private void shutdown() {
        synchronized (lifecycleLock) {
            if (shutDown) {
                return;
            }
            try {
                System.out.println(server.getNumOpenConnections());
                server.stop();
                writeSeqsToHDFS();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                running = false;
                shutDown = true;
                lifecycleLock.notifyAll();
            }
        }
    }

    /**
     * Requests the service to stop and returns once the RPC server has been
     * stopped and the values have been persisted.
     *
     * <p>If {@link #run()} is executing (or this thread has been started), the
     * shutdown is left to that thread and this call blocks until it is done.
     * Otherwise the shutdown is performed by the caller. If the calling thread
     * is interrupted while waiting, the interrupt flag is restored and the
     * method returns early.
     */
    public void stopServer() {
        synchronized (lifecycleLock) {
            running = false;
            lifecycleLock.notifyAll();
            boolean runnerOwnsShutdown = (runner != null || isAlive())
                    && runner != Thread.currentThread();
            if (runnerOwnsShutdown) {
                try {
                    while (!shutDown) {
                        lifecycleLock.wait();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return;
            }
        }
        shutdown();
    }

    /**
     * Returns {@code versionID} regardless of the arguments.
     */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return versionID;
    }

    /**
     * Returns the current value of the named sequence and advances it by
     * {@code CEConstants.SEQ_INCR_DELTA}. A name seen for the first time
     * yields 0.
     *
     * @param name sequence name
     * @return the value before the increment
     */
    @Override
    synchronized public IntWritable nextSeq(Text name) {
        String seqName = name.toString();
        Integer current = seqMap.get(seqName);
        int ret = (current == null) ? 0 : current.intValue();
        seqMap.put(seqName, Integer.valueOf(ret + CEConstants.SEQ_INCR_DELTA));
        return new IntWritable(ret);
    }

    /**
     * Demo entry point: starts the service and stops it again right away.
     * {@link #stopServer()} blocks until the service thread has finished its
     * shutdown.
     */
    public static void main(String[] args) {
        SequenceServer server = new SequenceServer(new Configuration());
        server.start();
        server.stopServer();
    }
}
我还有一个客户端程序用来获取唯一的序列号,这里就不贴出来了。
谢谢你的回答。正如你所说,我明白问题出在哪里了。不过,我目前的问题是无法停止RPC服务器。由于RPC服务器以守护模式运行,即使调用了stop(),程序也无法退出。你可以试试:
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
/**
 * Smallest reproduction of the shutdown problem: creates a Hadoop RPC server
 * for this object on the local host name and port 16000, starts it and stops
 * it again.
 */
public class Test {
    /** RPC server created in the constructor; stays null if creation failed. */
    Server server;

    /**
     * Creates (but does not start) the RPC server. An {@link IOException}
     * during creation is printed and otherwise ignored.
     */
    public Test() {
        try {
            String hostName = InetAddress.getLocalHost().getHostName();
            Configuration configuration = new Configuration();
            // NOTE(review): 5 and true are presumably the handler count and the
            // verbose flag -- confirm against the RPC.getServer signature in use.
            server = RPC.getServer(this, hostName, 16000, 5, true,
                    configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the RPC server. */
    public void start() {
        server.start();
    }

    /** Stops the RPC server. */
    public void stop() {
        server.stop();
    }

    /** Creates the server, starts it and stops it immediately. */
    public static void main(String[] args) {
        Test demo = new Test();
        demo.start();
        demo.stop();
    }
}
谢谢!但它仍然不起作用。你能试试我的例子吗?您只需复制并保存为Test.java,然后运行它。您将看到它无法退出主线程。
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
/**
 * RPC protocol shared by the server and the client proxy in this example.
 */
interface MyProtocal extends org.apache.hadoop.ipc.VersionedProtocol {
    /** Protocol version the client passes when it creates its proxy. */
    long versionID = 1L;

    /**
     * Returns a sequence value for the given name.
     *
     * @param name sequence name
     * @return the sequence value
     */
    IntWritable nextSeq(Text name);
}
/**
 * Self-contained reproduction: an RPC server implementing {@code MyProtocal}
 * on the local host name and port 16000, plus a nested client that calls it
 * once.
 */
public class Test implements MyProtocal {
    /** RPC server created in the constructor; stays null if creation failed. */
    Server server;

    /**
     * Creates (but does not start) the RPC server. An {@link IOException}
     * during creation is printed and otherwise ignored.
     */
    public Test() {
        try {
            InetAddress addr = InetAddress.getLocalHost();
            // NOTE(review): 5 and true are presumably the handler count and the
            // verbose flag -- confirm against the RPC.getServer signature in use.
            server = RPC.getServer(this, addr.getHostName(), 16000, 5, true,
                    new Configuration());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the RPC server. */
    public void start() {
        server.start();
    }

    /** Stops the RPC server. */
    public void stop() {
        server.stop();
    }

    /** Returns {@code versionID} regardless of the arguments. */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return versionID;
    }

    /** Always returns 999, whatever the name. */
    @Override
    public IntWritable nextSeq(Text name) {
        return new IntWritable(999);
    }

    /** Client side of the example: holds an RPC proxy for {@code MyProtocal}. */
    static class SEQ {
        /** RPC proxy; stays null if it could not be created. */
        MyProtocal client;

        /**
         * Obtains a proxy for the server on the local host name, port 16000.
         * Any exception is printed and otherwise ignored.
         */
        public SEQ() {
            InetAddress addr;
            try {
                addr = InetAddress.getLocalHost();
                client = (MyProtocal) RPC.waitForProxy(MyProtocal.class,
                        MyProtocal.versionID,
                        new InetSocketAddress(addr.getHostName(), 16000),
                        new Configuration());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /** Requests the value for the sequence "aa" and prints it. */
        public void print() {
            System.out.println(client.nextSeq(new Text("aa")).get());
        }

        /** Releases the proxy; does nothing if it was never created. */
        public void stop() {
            if (client != null) {
                RPC.stopProxy(client);
            }
        }
    }

    /**
     * Starts the server, performs one client call, then releases the client
     * and stops the server. The cleanup runs in {@code finally} blocks so a
     * failing client call no longer leaves the proxy and the server running.
     */
    public static void main(String[] args) {
        Test server = new Test();
        server.start();
        try {
            SEQ seq = new SEQ();
            try {
                seq.print();
            } finally {
                seq.stop();
            }
        } finally {
            server.stop();
        }
    }
}
你的设计有问题。为什么需要在单独的线程中运行这段程序?
程序本身已经在主线程中运行,而RPC服务器也在它自己的线程中运行。
我的建议是:去掉你自己的线程,直接调用run方法(并去掉其中的while()循环),然后在它返回之后停止服务器。
一般原则:应当实现Runnable,而不是继承Thread。
如果你一定要保留这个多余的线程,那么请在main方法中调用server.join(),并在run方法的末尾调用stopServer()。