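I have a method in my application that reads data from HBase. It uses the scan method to query HBase. I want to write unit tests for this functionality, so I need to mock the HBase calls. How can I do that? I am using Mockito for mocking.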
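If you are using Mockito, you can stub your classes so that they return whatever you want.
Suppose you have a class named HBaseHelper with a method named getData() that uses a scanner to retrieve data from HBase. Now suppose another class has a method named useData(), as follows: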
public String useData() {
    String data = hbaseHelper.getData();
    // ... Do things with data
    return data;
}
With Mockito, you can do something like this in your test to return dummy data and test the method that consumes it:
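import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

import org.junit.Test;
import org.mockito.Mock;

// @Mock fields are only populated if the test class runs with
// MockitoJUnitRunner (or calls MockitoAnnotations.initMocks(this)).
@Mock
HBaseHelper hbaseHelper;

@Test
public void testFoo() {
    // Stub the HBase call so that no cluster is needed.
    when(hbaseHelper.getData()).thenReturn("hello world");
    // useData() must be called on an object that holds this mock.
    assertEquals("hello world", useData());
}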
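The same stubbing idea can be sketched in Python for code that reads HBase through happybase (the client library mentioned later in this thread). This is only a sketch: read_rows, my_table, and the row contents are made-up names for illustration, and the only assumption about happybase is that Connection.table(name) returns a table whose scan(row_prefix=...) yields (row_key, data) pairs.

```python
from unittest import mock


def read_rows(connection, table_name, row_prefix=None):
    """Collect the (row_key, data) pairs yielded by a happybase-style scan into a dict."""
    table = connection.table(table_name)
    return dict(table.scan(row_prefix=row_prefix))


# Stand-in for happybase.Connection: no Thrift server or cluster is contacted.
fake_connection = mock.MagicMock()
fake_connection.table.return_value.scan.return_value = iter([
    (b"row-1", {b"cf:greeting": b"hello world"}),
    (b"row-2", {b"cf:greeting": b"goodbye"}),
])

rows = read_rows(fake_connection, "my_table", row_prefix=b"row-")
print(rows[b"row-1"][b"cf:greeting"])  # prints b'hello world'

# The mock records its calls, so the scan arguments can be verified as well.
fake_connection.table.assert_called_once_with("my_table")
fake_connection.table.return_value.scan.assert_called_once_with(row_prefix=b"row-")
```

Passing the connection in as a parameter is what makes the stub possible; code that creates its own connection internally would need mock.patch instead.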
In Java, you can use HBaseTestingUtility like this to run an in-process mini cluster:
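import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;

// Fields and methods below belong inside the test class.
private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
    // Start the mini cluster once for all tests in the class.
    TEST_UTIL.startMiniCluster();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
    EnvironmentEdgeManager.reset();
}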
In addition, you may need a Thrift server in order to use some client libraries:
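import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.thrift.ThriftServer;

// Fields and methods below belong inside the test class.
private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();
// Static because they are assigned from the static setup method.
private static ThriftServer thriftServer;
private static Thread thriftServerThread;
private static int port;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
    TEST_UTIL.startMiniCluster();

    // Command-line style arguments for the Thrift server.
    final List<String> args = new ArrayList<>();
    port = HBaseTestingUtility.randomFreePort();
    args.add("-" + ThriftServer.PORT_OPTION);
    args.add(String.valueOf(port));
    args.add("-infoport");
    int infoPort = HBaseTestingUtility.randomFreePort();
    args.add(String.valueOf(infoPort));
    args.add("start");

    thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
    // Run the server on a daemon thread so that setup can return.
    // Depending on the HBase version, doMain() may not be public, in which
    // case the test has to live in the org.apache.hadoop.hbase.thrift package.
    thriftServerThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                thriftServer.doMain(args.toArray(new String[args.size()]));
            } catch (Exception e) {
                // Surface startup failures instead of swallowing them.
                throw new RuntimeException(e);
            }
        }
    });
    thriftServerThread.setDaemon(true);
    thriftServerThread.start();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
    EnvironmentEdgeManager.reset();
}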
Using pyspark via py4j:
def setUp(self):
    super(StreamingTest, self).setUp()
    # --- hbase configuration ---
    # Load the class through the JVM's context class loader (via py4j).
    hbase_testing_utility_clz = (
        self.sparkStreamingContext._jvm.java.lang.Thread.currentThread()
        .getContextClassLoader()
        .loadClass('org.apache.hadoop.hbase.HBaseTestingUtility')
    )
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()
    self._hbaseTestingUtility.startMiniCluster()

def tearDown(self):
    if self._hbaseTestingUtility is not None:
        self._hbaseTestingUtility.shutdownMiniCluster()
If you need a Thrift server (for example, to use the happybase client library):
import threading  # at module level

def setUp(self):
    super(StreamingTest, self).setUp()
    jvm = self.sparkStreamingContext._jvm
    gateway = self.sparkStreamingContext.sparkContext._gateway
    class_loader = jvm.java.lang.Thread.currentThread().getContextClassLoader()

    # --- hbase configuration ---
    hbase_testing_utility_clz = class_loader.loadClass('org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()
    self._hbaseTestingUtility.getConfiguration().setBoolean("hbase.table.sanity.checks", False)  # for thrift
    self._hbaseTestingUtility.startMiniCluster()

    # --- thrift server configuration ---
    thrift_server_clz = class_loader.loadClass('org.apache.hadoop.hbase.thrift.ThriftServer')

    # make thrift server instance: ThriftServer(Configuration) via reflection
    cArgs = gateway.new_array(jvm.java.lang.Class, 1)
    cArgs[0] = self._hbaseTestingUtility.getConfiguration().getClass()
    iArgs = gateway.new_array(jvm.java.lang.Object, 1)
    iArgs[0] = self._hbaseTestingUtility.getConfiguration()
    self._thriftServer = thrift_server_clz.getDeclaredConstructor(cArgs).newInstance(iArgs)

    # prepare server start arguments
    tArgs = gateway.new_array(jvm.java.lang.String, 5)
    port = self._hbaseTestingUtility.randomFreePort()
    self.thrift_port = port
    tArgs[0] = "-port"
    tArgs[1] = str(port)
    tArgs[2] = "-infoport"
    info_port = self._hbaseTestingUtility.randomFreePort()
    tArgs[3] = str(info_port)
    tArgs[4] = "start"

    # look up doMain(String[]) and make it callable from outside its class
    mArgs = gateway.new_array(jvm.java.lang.Class, 1)
    mArgs[0] = tArgs.getClass()
    method = thrift_server_clz.getDeclaredMethod('doMain', mArgs)
    method.setAccessible(True)
    args = gateway.new_array(jvm.java.lang.Object, 1)
    args[0] = tArgs

    # start server in separate thread
    self.thrift_server_thread = threading.Thread(target=method.invoke, args=[self._thriftServer, args])
    self.thrift_server_thread.daemon = True
    self.thrift_server_thread.start()
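Because the Thrift server is started on a background thread, a test may try to connect before the port is actually listening. One possible guard is a small polling helper like the sketch below. wait_for_port is a hypothetical name, not part of HBase, py4j, or happybase, and the timeout values are arbitrary.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.2):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or refused): wait a little and retry.
            time.sleep(interval)
    return False
```

At the end of setUp this could be used as self.assertTrue(wait_for_port("localhost", self.thrift_port)), after which a client such as happybase.Connection("localhost", port=self.thrift_port) should be able to connect.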
Of course, the HBase and Thrift jars have to be passed via spark-submit:
--jars path/to/jar1.jar,path/to/jar2.jar --conf spark.driver.userClassPathFirst=true
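If the test run is launched from a script, the argument list can be assembled programmatically so that the --jars value stays a clean comma-separated list. This is a minimal sketch: spark_submit_args and streaming_test.py are hypothetical names, and the jar paths are the same placeholders as above.

```python
def spark_submit_args(script, jars):
    """Build a spark-submit argument list with a comma-separated --jars value."""
    # Drop blank entries so that no stray commas end up in the value.
    jars = [j.strip() for j in jars if j.strip()]
    return [
        "spark-submit",
        "--jars", ",".join(jars),
        "--conf", "spark.driver.userClassPathFirst=true",
        script,
    ]


args = spark_submit_args("streaming_test.py", ["path/to/jar1.jar", "path/to/jar2.jar"])
print(" ".join(args))
```

The resulting list can be handed to subprocess.run(args) as is, which avoids shell quoting issues.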