我在玩Vert。对于基于事件循环的服务器来说,这与线程/连接模型相反。
public void start(Future<Void> fut) {
vertx
.createHttpServer()
.requestHandler(r -> {
LocalDateTime start = LocalDateTime.now();
System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME));
final MyModel model = new MyModel();
try {
for(int i=0;i<10000000;i++){
//some simple operation
}
model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
r.response().end(
new Gson().toJson(model)
);
})
.listen(4568, result -> {
if (result.succeeded()) {
fut.complete();
} else {
fut.fail(result.cause());
}
});
System.out.println("Server started ..");
}
- 我只是想模拟一个长时间运行的请求处理程序,以了解这个模型是如何工作的。 我观察到的是所谓的事件循环被阻塞,直到我的第一个请求完成。无论花费多少时间,后续请求都不会被处理,直到前一个请求完成。
- 显然我在这里少了一块,这就是我在这里的问题。
根据目前的答案编辑:
- 不接受所有的请求被认为是异步的吗?如果是新的只有在清除前一个连接时才能接受连接关闭,它是如何异步的?
- 假设一个典型的请求需要100毫秒到1秒(根据请求的类型和性质)。它的意思是事件循环在前一个请求之前无法接受新连接结束(即使它在一秒钟内结束)。如果我是一个程序员必须考虑所有这些并将这些请求处理程序推送到工作线程,那么它与线程/连接有什么不同模型?
- 我只是想了解这个模型如何更好地从传统的线程/conn服务器模型?假设没有I/O操作或所有的I/O操作都是异步处理的?怎么解呢C10k问题,当它不能并行启动所有并发请求并且必须等到前一个请求终止时?
即使我决定把所有这些操作推到一个工作线程(池),然后我回到同样的问题,不是吗?线程之间的上下文切换?为赏金编辑和顶部这个问题
- 不完全理解这个模型是如何被声明为异步的
- 绿色。x有一个异步JDBC客户端(异步是关键字),我试图适应RXJava。
- 这是一个代码示例(相关部分)
server.requestStream () .toObservable()。订阅(请求-> {
LocalDateTime start = LocalDateTime.now(); System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME)); jdbc.getConnectionObservable().subscribe( conn -> { // Now chain some statements using flatmap composition Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'"); // Subscribe to the final result resa.subscribe(resultSet -> { req.response().end(resultSet.getRows().toString()); System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)); }, err -> { System.out.println("Database problem"); err.printStackTrace(); }); }, // Could not connect err -> { err.printStackTrace(); } ); }); server.listen(4568);
- select查询返回完整的表转储大约需要3秒。
- 当我触发并发请求时(只尝试了2个),我看到第二个请求完全等待第一个请求完成。如果JDBC选择是异步的,那么让框架在等待选择查询返回任何东西的同时处理第二个连接,这不是一个公平的期望吗?
实际上,X事件循环是许多平台上存在的经典事件循环。当然,大多数解释和文档都可以找到Node.js,因为它是基于这种架构模式的最流行的框架。看看Node.js事件循环机制的一个或多或少的好解释。绿色。x教程在"不要给我们打电话,我们会给你打电话"one_answers"Verticles"之间也有很好的解释。
编辑你的更新:
首先,当你使用事件循环时,主线程应该非常快地处理所有请求。你不应该在这个循环中做任何长时间的工作。当然,您不应该等待对数据库调用的响应。—异步调度-给result分配一个回调(处理程序)-回调将在工作线程中执行,而不是事件循环线程。例如,这个回调将向套接字返回一个响应。因此,您在事件循环中的操作应该只是调度所有带回调的异步操作,然后进入下一个请求,而不等待任何结果。
假设一个典型的请求需要100毫秒到1秒(根据请求的类型和性质)。
在这种情况下,你的请求有一些计算昂贵的部分或访问IO -你在事件循环中的代码不应该等待这些操作的结果。
我只是想了解这个模型如何更好地从传统的线程/conn服务器模型?假设没有I/O操作或者所有I/O操作都是异步处理的?
当您有太多并发请求和传统编程模型时,您将为每个请求创建线程。这个线程将做什么?它们将主要等待IO操作(例如,来自数据库的结果)。这是对资源的浪费。在我们的事件循环模型中,您有一个主线程来调度操作,并为长任务预先分配了一定数量的工作线程。+这些工人实际上都不等待响应,他们只是可以在等待IO结果时执行另一个代码(它可以通过回调或定期检查当前正在进行的IO作业状态来实现)。我建议您通过Java NIO和Java NIO 2来了解如何在框架内实际实现这种异步IO。绿线也是一个非常相关的概念,这很好理解。绿色线程和协程是一种阴影事件循环,它们试图达到同样的目的——更少的线程,因为我们可以重用系统线程,而绿色线程等待某事。
它如何解决c10k问题,当它不能启动所有并发请求并行,必须等到前一个终止?
可以肯定的是,我们不会在主线程中等待发送前一个请求的响应。获取请求,调度长/IO任务执行,下一个请求。
即使我决定把所有这些操作推到一个工作线程(池),然后我回到同样的问题,不是吗?线程之间的上下文切换?
如果你让一切都正确-不。更重要的是,您将获得良好的数据局部性和执行流预测。一个CPU核心将执行您的短事件循环和调度异步工作,而不需要上下文切换,仅此而已。其他内核调用数据库并返回响应,仅此而已。在回调之间切换或检查不同通道的IO状态实际上不需要任何系统线程的上下文切换——它实际上在一个工作线程中工作。因此,我们每个核心有一个工作线程,这个系统线程等待/检查来自多个数据库连接的结果可用性。重温Java NIO概念,了解它是如何以这种方式工作的。(NIO - proxy-server的经典示例,可以接受许多并行连接(数千),代理请求到其他远程服务器,侦听响应并将响应发送回客户端,所有这些都使用一个或两个线程)
关于您的代码,我为您做了一个示例项目,以演示一切都按预期工作:
public class MyFirstVerticle extends AbstractVerticle {
@Override
public void start(Future<Void> fut) {
JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
.put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver")
.put("max_pool_size", 30));
client.getConnection(conn -> {
if (conn.failed()) {throw new RuntimeException(conn.cause());}
final SQLConnection connection = conn.result();
// create a table
connection.execute("create table test(id int primary key, name varchar(255))", create -> {
if (create.failed()) {throw new RuntimeException(create.cause());}
});
});
vertx
.createHttpServer()
.requestHandler(r -> {
int requestId = new Random().nextInt();
System.out.println("Request " + requestId + " received");
client.getConnection(conn -> {
if (conn.failed()) {throw new RuntimeException(conn.cause());}
final SQLConnection connection = conn.result();
connection.execute("insert into test values ('" + requestId + "', 'World')", insert -> {
// query some data with arguments
connection
.queryWithParams("select * from test where id = ?", new JsonArray().add(requestId), rs -> {
connection.close(done -> {if (done.failed()) {throw new RuntimeException(done.cause());}});
System.out.println("Result " + requestId + " returned");
r.response().end("Hello");
});
});
});
})
.listen(8080, result -> {
if (result.succeeded()) {
fut.complete();
} else {
fut.fail(result.cause());
}
});
}
}
@RunWith(VertxUnitRunner.class)
public class MyFirstVerticleTest {
private Vertx vertx;
@Before
public void setUp(TestContext context) {
vertx = Vertx.vertx();
vertx.deployVerticle(MyFirstVerticle.class.getName(),
context.asyncAssertSuccess());
}
@After
public void tearDown(TestContext context) {
vertx.close(context.asyncAssertSuccess());
}
@Test
public void testMyApplication(TestContext context) {
for (int i = 0; i < 10; i++) {
final Async async = context.async();
vertx.createHttpClient().getNow(8080, "localhost", "/",
response -> response.handler(body -> {
context.assertTrue(body.toString().contains("Hello"));
async.complete();
})
);
}
}
}
输出:
Request 1412761034 received
Request -1781489277 received
Request 1008255692 received
Request -853002509 received
Request -919489429 received
Request 1902219940 received
Request -2141153291 received
Request 1144684415 received
Request -1409053630 received
Request -546435082 received
Result 1412761034 returned
Result -1781489277 returned
Result 1008255692 returned
Result -853002509 returned
Result -919489429 returned
Result 1902219940 returned
Result -2141153291 returned
Result 1144684415 returned
Result -1409053630 returned
Result -546435082 returned
所以,我们接受一个请求-调度一个请求到数据库,转到下一个请求,我们消耗所有的请求,只有当数据库的一切都完成时才为每个请求发送响应。
关于你的代码样本,我看到两个可能的问题-首先,它看起来像你没有close()
连接,这是重要的返回到池。其次,您的池是如何配置的?如果只有一个空闲连接,这些请求将序列化等待这个连接。
我建议您为两个请求添加一些时间戳打印,以找到序列化的位置。你有一些东西使事件循环中的调用被阻塞。还是……检查您是否在测试中并行发送请求。
这是如何异步的?答案就在你的问题中
我观察到的是所谓的事件循环被阻塞,直到我的第一个请求完成。不管花了多少时间,都是后续的在前一个请求完成之前,请求不会被处理
这个想法不是为每个HTTP请求提供新的服务,而是使用被长时间运行的任务阻塞的相同线程。
事件循环的目标是节省上下文从一个线程切换到另一个线程的时间,并在任务使用IO/Network活动时利用理想的CPU时间。如果在处理您的请求时,它必须进行其他IO/网络操作,例如:在此期间从远程MongoDB实例获取数据,您的线程将不会被阻塞,相反,另一个请求将由同一线程提供,这是事件循环模型的理想用例(考虑到您有并发请求来到您的服务器)。
如果你有长时间运行的任务,不涉及网络/IO操作,你应该考虑使用线程池代替,如果你阻塞你的主事件循环线程本身,其他请求将被延迟。也就是说,对于长时间运行的任务,你可以为服务器的响应支付上下文切换的代价。
编辑:服务器处理请求的方式各不相同:
1)为每个传入请求生成一个新线程(在此模型中,上下文切换将很高,并且每次生成一个新线程都有额外的成本)
2)使用线程池为请求提供服务(相同的线程集将用于服务请求,额外的请求将排队)
3)使用事件循环(单个线程处理所有请求)。忽略上下文切换。因为会有一些线程在运行,例如:排队接收请求)
首先,上下文切换并不坏,它需要保持应用程序服务器的响应性,但是,如果并发请求的数量过高(大约超过10k),过多的上下文切换可能是一个问题。如果你想更详细地了解,我建议你阅读C10K文章
假设一个典型的请求花费在100毫秒到1秒之间请求的种类和性质)。它的意思是,事件循环不能接受一个新的连接,直到前一个请求完成(甚至?如果它在一秒钟内结束)。
如果您需要响应大量并发请求(超过10k),我会认为超过500ms是一个较长的运行操作。其次,就像我说的,有一些线程/上下文切换涉及到,例如:排队传入的请求,但是,线程之间的上下文切换将大大减少,因为一次线程太少。第三,如果在解析第一个请求时涉及到网络/IO操作,那么第二个请求将有机会在第一个请求被解析之前被解析,这就是该模型发挥作用的地方。
如果我作为一个程序员不得不思考通过所有这些并将这些请求处理程序推送到工作线程,那么它与线程/连接模型有什么不同呢?
Vertx试图给你最好的线程和事件循环,所以,作为程序员,你可以调用如何使你的应用程序在两种情况下都有效,即长时间运行的操作,有和没有网络/IO操作。
我只是想了解这个模型如何从一个更好的传统的线程/连接服务器模型?假设没有I/O操作或所有的I/O操作都是异步处理的?怎么解c10k呢问题,当它不能并行地启动所有并发请求时要等到前一个结束吗?
以上的解释应该能回答这个问题。
即使我决定把所有这些操作都推给一个worker线程(池),然后我回到同样的问题,不是吗?上下文在线程之间切换?
就像我说的,两者都有优点和缺点,vertx给你两个模型,根据你的用例,你必须选择最适合你的场景。
在这些类型的处理引擎中,您应该将长时间运行的任务转换为异步执行的操作,这是一种执行此操作的方法,以便关键线程可以尽快完成并返回执行另一个任务。也就是说,任何IO操作都被传递给框架,当IO完成时,框架会回叫你。
框架是异步的,因为它支持生成和运行这些异步任务,但它不会将您的代码从同步更改为异步。