HBase applications: unit testing by mocking HBase



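My application has a method that reads data from HBase using a scan. I want to write unit tests for this functionality, so I need to mock the HBase calls. How can I do that? I am using Mockito for mocking.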

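If you are using Mockito, you can stub your classes so that they return whatever you want.

Suppose you have a class named HBaseHelper with a method getData() that uses a scanner to retrieve data from HBase. Now suppose another class has a method useData() like this: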

public String useData() {
  String data = hbaseHelper.getData();
  // ... Do things with data
  return data;
}

With Mockito, your test can then return dummy data from the stub and exercise the method that consumes it:

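import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
import org.junit.Test;
import org.mockito.Mock;

// Initialize @Mock fields first, e.g. with @RunWith(MockitoJUnitRunner.class),
// and inject the mock into the object whose useData() is under test.
@Mock
HBaseHelper hbaseHelper;

@Test
public void testFoo() {
  // Stub the HBase-backed call so no cluster is needed.
  when(hbaseHelper.getData()).thenReturn("hello world");
  assertEquals("hello world", useData());
}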
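The same stubbing idea carries over to Python code that reaches HBase through a client such as happybase. The following is only a sketch under stated assumptions: `count_rows_with_prefix` is a hypothetical function under test, and the mock stands in for a happybase-style table whose `scan()` yields `(row_key, data)` tuples. No real HBase connection is made.

```python
from unittest import mock


def count_rows_with_prefix(table, prefix):
    """Hypothetical code under test: count rows whose key starts with prefix."""
    return sum(1 for _key, _data in table.scan(row_prefix=prefix))


def test_count_rows_with_prefix():
    # Stand-in for a happybase-style table object (assumption, not a real client).
    table = mock.Mock()
    table.scan.return_value = iter([
        (b"user-1", {b"cf:name": b"alice"}),
        (b"user-2", {b"cf:name": b"bob"}),
    ])

    assert count_rows_with_prefix(table, b"user-") == 2
    # Check that the code under test issued the scan we expected.
    table.scan.assert_called_once_with(row_prefix=b"user-")
```

Passing the table in as an argument is the design choice that makes this work: the test controls what the scan returns, just as the Mockito stub controls getData().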

In Java you can instead use HBaseTestingUtility to start an in-process mini cluster:

private static final HBaseTestingUtility TEST_UTIL =
  new HBaseTestingUtility();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
  TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
  // Start the mini cluster once for all tests in the class.
  TEST_UTIL.startMiniCluster();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
  TEST_UTIL.shutdownMiniCluster();
  EnvironmentEdgeManager.reset();
}

You may also need a Thrift server in order to use some client libraries:

private static final HBaseTestingUtility TEST_UTIL =
  new HBaseTestingUtility();
// Static so that the static @BeforeClass method can assign them.
private static ThriftServer thriftServer;
private static Thread thriftServerThread;
private static int port;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
  TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
  TEST_UTIL.startMiniCluster();
  // Build the Thrift server command line using free ports.
  final List<String> args = new ArrayList<>();
  port = HBaseTestingUtility.randomFreePort();
  args.add("-" + ThriftServer.PORT_OPTION);
  args.add(String.valueOf(port));
  args.add("-infoport");
  int infoPort = HBaseTestingUtility.randomFreePort();
  args.add(String.valueOf(infoPort));
  args.add("start");
  thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
  // Run the server in a daemon thread so it does not block the tests.
  thriftServerThread = new Thread(new Runnable() {
    @Override
    public void run() {
      try {
        // Assumption: doMain(String[]) is accessible from this test;
        // its visibility and signature vary by HBase version.
        thriftServer.doMain(args.toArray(new String[args.size()]));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  });
  thriftServerThread.setDaemon(true);
  thriftServerThread.start();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
  TEST_UTIL.shutdownMiniCluster();
  EnvironmentEdgeManager.reset();
}

Using pyspark through py4j:

def setUp(self):
    super(StreamingTest, self).setUp()
    # --- hbase configuration ---
    jvm = self.sparkStreamingContext._jvm
    class_loader = jvm.java.lang.Thread.currentThread().getContextClassLoader()
    # Load the class through the context class loader so jars passed to Spark are visible.
    hbase_testing_utility_clz = class_loader.loadClass(
        'org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()
    self._hbaseTestingUtility.startMiniCluster()

def tearDown(self):
    if self._hbaseTestingUtility is not None:
        self._hbaseTestingUtility.shutdownMiniCluster()

If you need a Thrift server (for example, to use the happybase client library):

import threading

def setUp(self):
    super(StreamingTest, self).setUp()
    jvm = self.sparkStreamingContext._jvm
    gateway = self.sparkStreamingContext.sparkContext._gateway
    class_loader = jvm.java.lang.Thread.currentThread().getContextClassLoader()
    # --- hbase configuration ---
    hbase_testing_utility_clz = class_loader.loadClass(
        'org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()
    self._hbaseTestingUtility.getConfiguration().setBoolean("hbase.table.sanity.checks", False)  # for thrift
    self._hbaseTestingUtility.startMiniCluster()
    # --- thrift server configuration ---
    thrift_server_clz = class_loader.loadClass(
        'org.apache.hadoop.hbase.thrift.ThriftServer')
    # make thrift server instance through its (Configuration) constructor
    cArgs = gateway.new_array(jvm.java.lang.Class, 1)
    cArgs[0] = self._hbaseTestingUtility.getConfiguration().getClass()
    iArgs = gateway.new_array(jvm.java.lang.Object, 1)
    iArgs[0] = self._hbaseTestingUtility.getConfiguration()
    self._thriftServer = thrift_server_clz \
        .getDeclaredConstructor(cArgs) \
        .newInstance(iArgs)
    # prepare server start arguments
    tArgs = gateway.new_array(jvm.java.lang.String, 5)
    port = self._hbaseTestingUtility.randomFreePort()
    self.thrift_port = port
    tArgs[0] = "-port"
    tArgs[1] = str(port)
    tArgs[2] = "-infoport"
    info_port = self._hbaseTestingUtility.randomFreePort()
    tArgs[3] = str(info_port)
    tArgs[4] = "start"
    # look up doMain(String[]) reflectively and make it callable
    mArgs = gateway.new_array(jvm.java.lang.Class, 1)
    mArgs[0] = tArgs.getClass()
    method = thrift_server_clz.getDeclaredMethod('doMain', mArgs)
    method.setAccessible(True)
    args = gateway.new_array(jvm.java.lang.Object, 1)
    args[0] = tArgs
    # start server in separate thread
    self.thrift_server_thread = threading.Thread(target=method.invoke, args=[self._thriftServer, args])
    self.thrift_server_thread.daemon = True
    self.thrift_server_thread.start()
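
Because the Thrift server is started in a background thread, setUp can return before the port is actually accepting connections. One way to guard against that is to poll the port before the first client connects. The helper below is a minimal sketch using only the standard library; the name `wait_for_port` and its default timeout values are assumptions for illustration, not part of HBase or happybase.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.2):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or refused); wait briefly and retry.
            time.sleep(interval)
    return False
```

In the setUp above this could be called as `wait_for_port("localhost", self.thrift_port)` right after the thread is started, failing the test early if it returns False.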

Of course, the HBase and Thrift jars must be passed to spark-submit:

--jars path/to/jar1.jar,path/to/jar2.jar --conf spark.driver.userClassPathFirst=true
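
The `--jars` option takes one comma-separated list, and a stray space or trailing comma is easy to introduce when the list is typed by hand. If the test run is launched from a script, the arguments can be assembled programmatically. This is a small sketch; `spark_submit_jar_args` is a hypothetical helper name and the jar paths are the same placeholders used above.

```python
def spark_submit_jar_args(jar_paths, user_classpath_first=True):
    """Build the jar-related spark-submit arguments from a list of jar paths."""
    # Drop empty entries so the joined list has no stray commas.
    jars = [p.strip() for p in jar_paths if p and p.strip()]
    args = []
    if jars:
        args += ["--jars", ",".join(jars)]
    if user_classpath_first:
        args += ["--conf", "spark.driver.userClassPathFirst=true"]
    return args


if __name__ == "__main__":
    print(" ".join(spark_submit_jar_args(["path/to/jar1.jar", "path/to/jar2.jar"])))
```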
