我遇到了Mule ESB独立的FTP轮询问题:应用程序运行了几天没有问题,然后FTP轮询停止,没有给出警告或错误。
日志显示FTP轮询的活动迹象,直到它停止。之后什么也没有,但是其他连接器仍然是活动的(主要是SFTP轮询)。我在运行时启用了DEBUG日志,以查看是否仍有活动,并且相应的连接器线程完全沉默,就像停止或阻塞一样。
最后,重新启动应用程序暂时解决了这个问题,但我试图理解为什么会发生这种情况,以避免再次面对它。我怀疑是FTP连接器线程停止或被阻塞,阻止了进一步的轮询。这可能是由我们用来防止轮询后文件删除的扩展ftpmessagerreceiver(覆盖postProcess()函数)引起的。然而,在这个组件和基本FTP接收器和连接器的源代码中,我看不出它是如何发生的。
知道为什么投票会突然停止而不抛出错误吗?
下面是当前的连接器配置:
<ftp:connector name="nonDeletingFtpConnector" doc:name="FTP"
pollingFrequency="${frequency}"
validateConnections="true">
<reconnect frequency="${frequency}" count="${count}"/>
<service-overrides messageReceiver="my.comp.NonDeletingFtpMessageReceiver" />
</ftp:connector>
和相应的端点:
<ftp:inbound-endpoint host="${ftp.source.host}"
port="${ftp.source.port}"
path="${ftp.source.path}"
user="${ftp.source.login}"
responseTimeout="10000"
password="${ftp.source.password}"
connector-ref="archivingFtpConnector"
pollingFrequency="${ftp.default.polling.frequency}">
<file:filename-wildcard-filter pattern="*.zip"/>
</ftp:inbound-endpoint>
messagerreceiver代码:
public class NonDeletingFtpMessageReceiver extends FtpMessageReceiver {
public NonDeletingFtpMessageReceiver(Connector connector, FlowConstruct flowConstruct, InboundEndpoint endpoint, long frequency) throws CreateException {
super(connector, flowConstruct, endpoint, frequency);
}
@Override
protected void postProcess(FTPClient client, FTPFile file, MuleMessage message) throws Exception {
//do nothing
}
}
正如你所看到的,我们定义了一个FtpMessageReceiver来避免在poll上删除文件(这是在流中进一步完成的),但是在代码中,我看不出跳过super.postProcess()调用(负责删除文件)可能会导致问题。
我查看的ftpmessagerreceiver源代码:https://github.com/mulesoft/mule/blob/mule-3.5.0/transports/ftp/src/main/java/org/mule/transport/ftp/FtpMessageReceiver.java
技术配置:
- Mule Standalone 3.5.0
- Ubuntu 14.04.2 LTS
- Java OpenJDK Runtime Environment (IcedTea 2.5.6) (7u79-2.5.6-0ubuntu1.14.04.1)
任何帮助都会很感激。
正如评论中讨论的那样,这个错误更多地与Apache FTP客户端联系在一起,因此我在这里创建了一个特定的帖子。
下面是找到的解决方案:使用自定义FtpConnectionFactory正确配置客户端,超时值> 0。通过这种方式,挂起被中断并抛出超时异常。public class SafeFtpConnectionFactory extends FtpConnectionFactory{
//define a default timeout
public static int defaultTimeout = 60000;
public static synchronized int getDefaultTimeout() {
return defaultTimeout;
}
public static synchronized void setDefaultTimeout(int defaultTimeout) {
SafeFtpConnectionFactory.defaultTimeout = defaultTimeout;
}
public SafeFtpConnectionFactory(EndpointURI uri) {
super(uri);
}
@Override
protected FTPClient createFtpClient() {
FTPClient client = super.createFtpClient();
//Define the default timeout here, which will be used by the socket by default,
//instead of the 0 timeout hanging indefinitely
client.setDefaultTimeout(getDefaultTimeout());
return client;
}
}
然后将其连接到连接器:
<ftp:connector name="archivingFtpConnector" doc:name="FTP"
pollingFrequency="${frequency}"
validateConnections="true"
connectionFactoryClass="my.comp.SafeFtpConnectionFactory">
<reconnect frequency="${reconnection.frequency}" count="${reconnection.attempt}"/>
</ftp:connector>
如果另一个答案有任何明显的变化,我会尽量更新这个答案。