在分布式环境中,Apache Thrift RPC的Java方法实现



我项目的简要说明:我正在编写一个名为" engrightingsnode"的Java类,该类别在有"管理节点"的分布式环境中起作用,就像服务存储库一样,接收和存储和存储其他节点和调度的信息(主机端口号和提供)注册服务提供的方法RPC。如果节点可以回答RPC,则打开节俭插座,并在调用节点和答案节点之间建立连接,并且答案节点返回结果。

我正在使用Apache Thrift作为RPC的IDL和框架。

现在问题。我的engeringsNodeHandler类实现了一个简单的旧货接口,其中包含单个方法" gethello(user)"(用户是包含节点名称的struct,它是engeringsNode类的构造函数的参数)。当一个连接到管理节点的GreetingsNode X制作该方法的RPC时,另一个注册的engeringsNode必须用消息" Hello x"回答。

我不正确理解如何在返回结果的处理程序的一部分中实现部分,因此我无法理解如何编写Junit测试,该测试应该检查方法实现是否正常。

一个断言 assertequals(client.gethello(user).getMessage(),"你好约翰·doe")

会起作用,但在我的情况下,我不明白我应该放置客户的一部分...

招待服务的代码:

struct Message {
    1: string message   
}  
struct User {
    1: string name
}
service GreetingsService {
    Message getHello(1: User user)
}

engeringServiceHandler的代码必须实现engeringsService方法gethello()

public class GreetingsServiceHandler implements GreetingsService.Iface {
private static Random random = new Random(10);
private ManagementService.Client managementClient;
private GreetingsService.Client helloClient;
@Override
public Message getHello(User user) throws TException {
    Message answer = null;
    // class ServiceProvider is generated by thrift, part of ManagementService thrift service 
    ServiceProvider provider = null; 
    List<ServiceProvider>providers = managementClient.getProvidersForService(user.name);
    if (providers.isEmpty())
        throw new NoProviderAvailableException(); //separate file contains Exception
    else {
        provider = providers.get(random.nextInt(providers.size()));
        //connection between nodes is established here
        TTransport helloTransport = new TSocket(provider.getHostName(), provider.getPort());
        TProtocol helloProtocol = new TBinaryProtocol(helloTransport);
        helloClient = new GreetingsService.Client(helloProtocol);
        helloTransport.open();
        // here lies my problem
        answer = helloClient.getHello(user);
        //if I use this instead, then helloClient variable is clearly not used, but of course I need it to answer the method call
        answer = answer.setMessage("Ciao " + user.getName() + ", welcome among us!");
    }
    return answer;
}

和engeringsnode代码如下:

public class GreetingsNode implements NodeIface {
private ThriftServer helloServer;
private ManagementService.Client managementClient;
private NodeManifest nodeManifest;
private User user;
private String name;
public GreetingsNode(NodeManifest nodeManifest, String name) {
    this.nodeManifest = nodeManifest;
    this.helloServer = new ThriftServer(GreetingsServiceHandler.class);
    this.name = name;
}
@Override
public void turnOn() throws TException {
    helloServer.start();
    TSocket helloServerTransport = new TSocket("localhost", Constants.SERVER_PORT);
    TBinaryProtocol helloServerProtocol = new TBinaryProtocol(helloServerTransport);
    managementClient = new ManagementService.Client(helloServerProtocol);
    this.setUser(new User(name));
    helloServerTransport.open();
    helloServer = new ThriftServer(GreetingsServiceHandler.class);
    //portNegotiator is a class described in a separate file, that handles the registration of other nodes to the managementNode. NodeManifest is a file generated by thrift, part of managementService thrift file, describing a struct that contains hostname and port number of nodes.
    PortNegotiator negotiator = new PortNegotiator(managementClient);
    negotiator.negotiate(nodeManifest, helloServer);
}
@Override
public void turnOff() {
    helloServer.stop();
}
public User getUser() {
    return user;
}
public void setUser(User user) {
    this.user = user;
}

处理程序中的基本方法inmp非常简单,类似以下的操作(免责声明:未测试):

@Override
public Message getHello(User user) throws TException {
    Message answer = new Message();
    answer = answer.setMessage("Ciao " + user.getName() + ", welcome among us!");
    return answer;
}

如果我使用它,则显然不会使用helloclient变量,但是我当然需要它来回答方法调用

当一个连接到管理节点的问候节点X制作该方法的RPC时,另一个注册的engeringsnode必须用消息" Hello x"回答。

如果这意味着我们想要一个呼叫序列,例如 client => servera => server b ,那么这也是可能的,只需要轻微的修改。从上面的基本示例开始,我们相应地增强了代码:

private Message callTheOtherNode(User user) {
  // class ServiceProvider is generated by Thrift, 
  // part of ManagementService Thrift service 
  ServiceProvider provider = null; 
  List<ServiceProvider>providers = managementClient.getProvidersForService(user.name);
  if (providers.isEmpty())
      throw new NoProviderAvailableException(); //separate file contains Exception
  provider = providers.get(random.nextInt(providers.size()));
  //connection between nodes is established here
  TTransport helloTransport = new TSocket(provider.getHostName(), provider.getPort());
  TProtocol helloProtocol = new TBinaryProtocol(helloTransport);
  helloClient = new GreetingsService.Client(helloProtocol);
  helloTransport.open();
  return helloClient.getHello(user);
}
@Override
public Message getHello(User user) throws TException {
    Message answer = callTheOtherNode(user);
    return answer;
}

当然,称为"其他节点"需要实际上对请求做点事

最新更新