无法停止Hadoop IPC服务



我正在使用Hadoop IPC创建一个序列号生成服务,但是当程序退出时无法停止服务器。有人能帮帮我吗?

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import dk.aau.cs.cloudetl.common.CEConstants;
import dk.aau.cs.cloudetl.hadoop.fs.FSUtil;
/**
 * RPC service that hands out sequence numbers by name and keeps the counters
 * in a sequence file between runs.
 *
 * <p>The instance is itself a thread: {@link #run()} loads the stored
 * counters, starts the RPC server and then idles until {@link #stopServer()}
 * clears the {@code running} flag. Start-up, {@link #stopServer()} and
 * {@link #nextSeq(Text)} all synchronize on this object, so the counter map
 * and the {@code started} flag are only touched under that lock.
 */
public class SequenceServer extends Thread implements  ClientProtocol {
    /** Next value to hand out for each sequence name. */
    Map<String, Integer> seqMap = new HashMap<String, Integer>();
    Configuration conf;
    /** RPC server; stays {@code null} if the constructor failed to create it. */
    Server server;
    /** Location of the persisted counters. */
    Path seqFile;
    /** Cleared by {@link #stopServer()} to end the idle loop in {@link #run()}. */
    private volatile boolean running = true;
    /** True once the counters were loaded and the RPC server started; guarded by {@code this}. */
    private boolean started = false;

    /**
     * Creates the RPC server bound to the local host name and
     * {@code CEConstants.SEQ_SERVER_PORT}. A failure is only printed; the
     * {@code server} field is then left {@code null}.
     *
     * @param conf Hadoop configuration used for the RPC server and file access
     */
    public SequenceServer(Configuration conf) {
        try {
            this.conf = conf;
            this.seqFile = new Path(CEConstants.META_DIR + Path.SEPARATOR
                    + "cloudETL.seq");
            InetAddress addr = InetAddress.getLocalHost();
            server = RPC.getServer(this, addr.getHostName(),CEConstants.SEQ_SERVER_PORT, 5, true, conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Loads the counters, starts the RPC server and idles until
     * {@link #stopServer()} is called. Does nothing if {@link #stopServer()}
     * already ran before this thread got to start the server.
     */
    @Override
    public void run() {
        try {
            synchronized (this) {
                if (!running) {
                    // stopServer() won the race with start(): never start a
                    // server that has already been asked to stop.
                    return;
                }
                readSeqsFromHDFS();
                server.start();
                started = true;
            }

            System.out.println("=============Start==============");
            while(running){
                sleep(5000);
            }
            System.out.println("=============END==============");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads all (name, value) pairs from {@code seqFile} into {@code seqMap}
     * if the file exists. Despite the method name, the file is opened through
     * {@code FileSystem.getLocal}.
     */
    private void readSeqsFromHDFS() {
        try {
            FileSystem fs = FileSystem.getLocal(conf);
            if (fs.exists(seqFile)) {
                SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                        seqFile, conf);
                try {
                    Text key = new Text();
                    IntWritable value = new IntWritable();
                    while (reader.next(key, value)) {
                        seqMap.put(key.toString(), value.get());
                    }
                } finally {
                    // Release the file handle even if a record fails to parse.
                    reader.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes {@code seqMap} to a temporary {@code tmp.seq} next to
     * {@code seqFile} and then hands both paths to {@code FSUtil.replaceFile}
     * (presumably swapping the temporary file into place; see that helper).
     */
    private void writeSeqsToHDFS() {
        try {
            FileSystem fs = FileSystem.getLocal(conf);
            Path tmp = new Path(seqFile.getParent() + Path.SEPARATOR
                    + "tmp.seq");
            SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                    tmp, Text.class, IntWritable.class);
            try {
                for (Entry<String, Integer> entry : seqMap.entrySet()) {
                    writer.append(new Text(entry.getKey()),
                            new IntWritable(entry.getValue()));
                }
            } finally {
                // Always close; a failed append skips the replace step below.
                writer.close();
            }
            FSUtil.replaceFile(new File(tmp.toString()),
                    new File(seqFile.toString()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops the RPC server, persists the counters and lets {@link #run()}
     * finish. The counters are written only if {@link #run()} had loaded
     * them, so an early stop cannot replace the stored file with an empty one.
     */
    synchronized public void stopServer() {
        // Clear the flag first so run() terminates even if a step below fails.
        running = false;
        try {
            if (server != null) {
                System.out.println(server.getNumOpenConnections() );
                server.stop();
            }
            if (started) {
                writeSeqsToHDFS();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns this protocol's {@code versionID}; both arguments are ignored.
     */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return versionID;
    }

    /**
     * Returns the current value of the named sequence and advances it by
     * {@code CEConstants.SEQ_INCR_DELTA}. An unknown name starts at 0.
     *
     * @param name sequence name
     * @return the value reserved for the caller
     */
    @Override
    synchronized public IntWritable nextSeq(Text name) {
        String seqName = name.toString();
        Integer current = seqMap.get(seqName);
        if (current == null) {
            seqMap.put(seqName, Integer.valueOf(CEConstants.SEQ_INCR_DELTA));
            return new IntWritable(0);
        }
        int ret = current;
        seqMap.put(seqName, ret + CEConstants.SEQ_INCR_DELTA);
        return new IntWritable(ret);
    }

    /** Starts the server thread and immediately requests shutdown. */
    public static void main(String[] args) {
        SequenceServer server = new SequenceServer(new Configuration());
        server.start();
        server.stopServer();
    }
}

我还有另一个客户端程序,用来获取唯一的序列号,这里就不贴出来了。


谢谢你的回答。正如你所说的,我知道问题所在。然而,我目前的问题是无法停止RPC服务器。由于RPC服务器以守护模式运行,即使运行stop(),也无法退出。你可以试试:

import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
/**
 * Minimal reproduction: creates a Hadoop RPC server on port 16000, starts it
 * and stops it again straight away.
 */
public class Test {
    Server server;

    /**
     * Builds the RPC server for the local host name. An {@link IOException}
     * is only printed, in which case {@code server} stays {@code null}.
     */
    public Test() {
        try {
            String hostName = InetAddress.getLocalHost().getHostName();
            Configuration configuration = new Configuration();
            server = RPC.getServer(this, hostName, 16000, 5, true, configuration);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Starts the RPC server. */
    public void start() {
        server.start();
    }

    /** Stops the RPC server. */
    public void stop() {
        server.stop();
    }

    /** Starts the server and stops it immediately. */
    public static void main(String[] args) {
        Test instance = new Test();
        instance.start();
        instance.stop();
    }
}

谢谢!但它仍然不起作用。你能试试我的例子吗?您只需复制并保存为Test.java,然后运行它。您将看到它无法退出主线程。

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
/** RPC protocol exposing a single sequence-number call. */
interface MyProtocal extends org.apache.hadoop.ipc.VersionedProtocol {
    /** Protocol version; interface fields are implicitly public, static and final. */
    long versionID = 1L;

    /**
     * Returns the next sequence value for the given name.
     *
     * @param name sequence name
     * @return the sequence value
     */
    IntWritable nextSeq(Text name);
}
/**
 * Self-contained reproduction: an RPC server implementing {@link MyProtocal}
 * plus an in-process client ({@link SEQ}) that calls it once.
 */
public class Test implements MyProtocal {
    Server server;

    /**
     * Builds the RPC server for the local host name on port 16000. An
     * {@link IOException} is only printed, in which case {@code server}
     * stays {@code null}.
     */
    public Test() {
        try {
            InetAddress addr = InetAddress.getLocalHost();
            server = RPC.getServer(this, addr.getHostName(), 16000, 5, true,
                    new Configuration());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the RPC server. */
    public void start() {
        server.start();
    }

    /** Stops the RPC server. */
    public void stop() {
        server.stop();
    }

    /** Returns {@code versionID}; both arguments are ignored. */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return versionID;
    }

    /** Always answers with the fixed value 999. */
    @Override
    public IntWritable nextSeq(Text name) {
        return new IntWritable(999);
    }

    /** Client side: obtains a proxy for {@link MyProtocal} on port 16000. */
    static class SEQ {
        MyProtocal client;

        /**
         * Waits for a proxy to the server on the local host. Any failure is
         * only printed, in which case {@code client} stays {@code null}.
         */
        public SEQ() {
            InetAddress addr;
            try {
                addr = InetAddress.getLocalHost();
                client = (MyProtocal) RPC.waitForProxy(MyProtocal.class,
                        MyProtocal.versionID,
                        new InetSocketAddress(addr.getHostName(), 16000),
                        new Configuration());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /** Calls {@code nextSeq("aa")} through the proxy and prints the result. */
        public void print() {
            System.out.println(client.nextSeq(new Text("aa")).get());
        }

        /** Releases the proxy. */
        public void stop() {
            RPC.stopProxy(client);
        }
    }

    /**
     * Starts the server, performs one client call and shuts both sides down.
     * The shutdown calls sit in {@code finally} blocks so that a failing
     * client call cannot leave the proxy or the server running.
     */
    public static void main(String[] args) {
        Test server = new Test();
        server.start();
        try {
            SEQ seq = new SEQ();
            try {
                seq.print();
            } finally {
                seq.stop();
            }
        } finally {
            server.stop();
        }
    }
}

你的设计有问题。为什么需要在单独的线程中运行这段程序?

它已经在主线程中运行,RPC服务器也在一个单独的线程中运行。

我的建议是去掉你自己的线程,直接调用去掉 while() 循环后的 run 方法,并在其之后停止服务器。

一般原则:应实现 Runnable 接口,而不是继承 Thread 类。

如果你一定要保留这个多余的线程,那么请在 main 方法中调用 server.join(),并在 run 方法的末尾调用 stopServer()。

相关内容

  • 没有找到相关文章

最新更新