使用 Apache Camel/Smallrye/reactive 流 - 如何将"publisher"连接到跨 JVM 的"subscriber"?



下面是我对Apache Camel反应式流解决方案的尝试,该解决方案将发布者连接到订阅者(骆驼路由的代码如下所示(跨JVM的

为了使通信能够跨越 JVM,似乎需要一个"代理"服务器。 因此,我已经实现了Artemis代理并相应地修改了application.properties文件(根据我对如何做到这一点的最佳理解(。

此外,为了缩小焦点,选择使用小黑麦放大器连接器。

问题:

订阅者应接收并记录字符串值(来自正文(:

-
-
-
:blahblahblah
:blahblahblah
:blahblahblah
-
-
-

--相反,它记录值,如下所示:

-
-
-
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]
-
-
-

问题:

为什么发布者发送的有效负载没有到达订阅服务器,我可以修改哪些代码/配置来修复它?

提前感谢任何帮助!

---

"发布者"路线

package aaa.bbb.ccc.jar;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@ApplicationScoped
public class CamelPub extends RouteBuilder {
@Inject
CamelContext ctx;
CamelReactiveStreamsService crss;
static int x = 0;
@Outgoing("data")
public Publisher<Exchange> source() {
return crss.from("seda:thesource");
}
@Override
public void configure() {
crss = CamelReactiveStreams.get(ctx);
from("timer://thetimer?period=1000")
.process(new Processor() {
@Override
public void process(Exchange msg) throws Exception {
msg.getIn().setBody("blahblahblah"); //(Integer.toString(x++));
}
})
.log("....... PUB ....... camelpub - body: ${body}")
.to("direct:thesource");
}
}

microprofile-config.properties - publisher

injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8084
server.host=0.0.0.0
mp.messaging.outgoing.data.connector=smallrye-amqp
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=5672
mp.messaging.outgoing.data.username=artuser
mp.messaging.outgoing.data.password=artpassword
mp.messaging.outgoing.data.endpoint-uri:seda:thesource
mp.messaging.outgoing.data.broadcast=true
mp.messaging.outgoing.data.durable=true

相关控制台日志摘录(?( - 发布者

...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelpub ---
2019.12.17 22:26:34 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:26:35 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:26:35 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding:  [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:26:36 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:26:36 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:26:36 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:26:37 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesource is using shared queue: seda://thesource with size: 1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: timer://thetimer?period=1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.191 seconds
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelPub] with qualifiers [@Any @Default]
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelPub
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: seda://thesource
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting method aaa.bbb.ccc.jar.CamelPub#source to sink data
2019.12.17 22:26:37 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container e71e38c0-91ec-4758-a310-55f1368c6a9c initialized
2019.12.17 22:26:37 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:26:37 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:26:37 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@57fbc06f
2019.12.17 22:26:38 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0x52928b67, L:/0:0:0:0:0:0:0:0:8084]
2019.12.17 22:26:38 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8084 (and all other host addresses) in 3668 milliseconds.
2019.12.17 22:26:39 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 1
2019.12.17 22:26:40 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 2
2019.12.17 22:26:41 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 3
2019.12.17 22:26:42 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 4
...

"订阅者"路由

package aaa.bbb.ccc.jar;
import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import javax.enterprise.context.ApplicationScoped;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;
@ApplicationScoped
public class CamelSub extends RouteBuilder {
public CamelSub() throws Exception {
}
@Inject
CamelContext ctx;
CamelReactiveStreamsService crss;
@Incoming("data")
public Subscriber<String> sink() {
return crss.subscriber("seda:thesink", String.class);
}    
@Override
public void configure() {
crss = CamelReactiveStreams.get(ctx);
from("seda:thesink")
.convertBodyTo(String.class)
.log("ooooooo SUB ooooooo camelsub - body: ${body}");
}
}

microprofile-config.properties - subscriber

injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8082
server.host=0.0.0.0
mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.username=artuser
mp.messaging.incoming.data.password=artpassword
mp.messaging.incoming.data.endpoint-uri:seda:thesink
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=true

相关控制台日志摘录(?( - 订阅者

...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelsub ---
2019.12.17 22:28:09 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:28:10 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding:  [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:28:11 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:28:11 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:28:11 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesink is using shared queue: seda://thesink with size: 1000
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: seda://thesink
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.173 seconds
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelSub] with qualifiers [@Any @Default]
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelSub
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:28:12 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: reactive-streams://ID-LAPTOP-4LR4PMVQ-1576639692145-0-1
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Attempt to resolve aaa.bbb.ccc.jar.CamelSub#sink
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting aaa.bbb.ccc.jar.CamelSub#sink to `data` (org.eclipse.microprofile.reactive.streams.operators.core.PublisherBuilderImpl@3eda0aeb)
2019.12.17 22:28:12 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container c1eaa1fb-486c-4b95-b56b-0f1a7b88f741 initialized
2019.12.17 22:28:12 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:28:12 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:28:12 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@77f905e3
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0xd8f72801, L:/0:0:0:0:0:0:0:0:8082]
2019.12.17 22:28:12 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8082 (and all other host addresses) in 3310 milliseconds.
2019.12.17 22:28:13 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
2019.12.17 22:28:14 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
2019.12.17 22:28:15 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]    
...

注意:上面的输出应该显示数字...而不是,例如,"交换[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]"等... :-(

基本上相同的Maven Pom.xml对于每个

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>aaa.bbb.ccc</groupId>
<artifactId>[NOTE: essentially same pom.xml for both camelpub or camelsub]</artifactId>
<version>1.0</version>
<properties>
<helidonVersion>1.4.0</helidonVersion>
<package>aaa.bbb.ccc.jar</package>
<failOnMissingWebXml>false</failOnMissingWebXml>
<mpVersion>3.2</mpVersion>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<libs.classpath.prefix>libs</libs.classpath.prefix>
<mainClass>io.helidon.microprofile.server.Main</mainClass>
<jersey.version>2.29</jersey.version>
<copied.libs.dir>${project.build.directory}/${libs.classpath.prefix}</copied.libs.dir>
<camelversion>3.0.0</camelversion>
</properties>
<dependencies>
<dependency>
<groupId>org.eclipse.microprofile</groupId>
<artifactId>microprofile</artifactId>
<version>${mpVersion}</version>
<type>pom</type>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.reactive.messaging</groupId>
<artifactId>microprofile-reactive-messaging-api</artifactId>
<version>1.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>8.0</version>
<type>jar</type>
</dependency> 
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camelversion}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-reactive-streams</artifactId>
<version>${camelversion}</version>
</dependency>       
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-cdi</artifactId>
<version>${camelversion}</version>
</dependency>                              
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>1.0.8</version>
</dependency>          
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-amqp</artifactId>
<version>1.0.8</version>
</dependency>          
<dependency>
<groupId>javax.enterprise</groupId>
<artifactId>cdi-api</artifactId>
<version>2.0</version>
</dependency>          
<dependency>
<groupId>io.helidon</groupId>
<artifactId>helidon-bom</artifactId>
<version>${helidonVersion}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jandex</artifactId>
<version>2.1.1.Final</version>
<scope>runtime</scope>
<optional>true</optional>            
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>javax.activation-api</artifactId>
<version>1.2.0</version>
<scope>runtime</scope>            
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-binding</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>io.helidon.microprofile.bundles</groupId>
<artifactId>helidon-microprofile-3.0</artifactId>
<version>${helidonVersion}</version>
</dependency>
</dependencies>
<build>
<finalName>[NOTE: essentially same pom.xml for both camelpub or camelsub]</finalName>
<plugins>                             
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>${libs.classpath.prefix}</classpathPrefix>
<mainClass>${mainClass}</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${copied.libs.dir}</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<excludeScope>test</excludeScope>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

docker-compose.yml (Artemis(

# A docker compose file to start an Artemis AMQP broker
# more details on https://github.com/vromero/activemq-artemis-docker.
version: '2'
services:
artemis:
image: vromero/activemq-artemis:2.8.0-alpine
ports:
- "8161:8161"
- "61616:61616"
- "5672:5672"
environment:
ARTEMIS_USERNAME: artuser
ARTEMIS_PASSWORD: artpassword

使用的技术

java 8
apache camel
smallrye
artemis
reactive streams/programming

(使用此链接作为资源:https://smallrye.io/smallrye-reactive-messaging/(

您在帖子中提到的问题是一个相当常见的用例,具有一些明确定义的模式来解决问题,在这种情况下,这将合理地涉及设置某种异步消息传递中间件,例如Apache ActiveMQ,RabbitMQ,Apache Kafka等。 这样做为您提供了一种解耦驼峰上下文的完美方法,如文章为什么要使用多个 Camel 上下文中所述?消息通道 EIP(EIP = 企业集成模式(的 Apache Camel 文档中进一步解释了此概念。

我在你上面的帖子中看到你似乎在尝试使用骆驼SEDA。其文档页面指出:

请注意,队列仅在单个 CamelContext 中可见。如果你 想要跨 CamelContext 实例进行通信(例如, 在 Web 应用程序之间进行通信(,请参阅 VM 组件。

此组件不实现任何类型的持久性或恢复, 如果 VM 在尚未处理消息时终止。如果你 需要持久性、可靠性或分布式 SEDA ,请尝试使用其中之一 JMS 或 ActiveMQ。

Camel虚拟机组件在这里也不适合您,因为您的多个Camel上下文分布在不同的服务器上。VM 组件可以在多个 Camel 上下文之间运行,但它们必须都在同一个 JVM 中运行才能相互通信。

由于这些原因,我认为在这种情况下使用某种消息传递中间件没有任何方法。

既然你提到了流媒体,像Apache Kafka这样的东西可能是一个不错的选择。我以前没有处理过这个问题,也无法进一步评论它,但我找到了一篇文章,其中有一个家伙谈论它(参见 Apache Kafka 的反应式流(。Camel有一个Kafka组件,可以用来将所有东西连接在一起。

不是一个通用的简单骆驼答案,但 RSocket 通过网络(在 TCP 套接字、HTTP Websockets 等之上(实现了 RX 编程模型。

https://github.com/rsocket/rsocket-java

它得到了Spring Boot等应用程序框架的良好支持。 但这不是您要求的简单工作示例。

因此,事实证明,将方法签名从...

@Outgoing("data")
public Publisher<Exchange> source() {
...
}

自。。。

@Outgoing("data")
public Publisher<String> source() {
...
}

修复了此问题,以便订阅服务器现在接收并记录发布者发送的值/有效负载

最新更新