我使用最新版本的MongoDB(在win64服务器上)和c#驱动程序。我有一个windows服务,每分钟做800读取和更新,几分钟后,当前使用的线程超过200,然后每一个mongodb调用给出这个错误:
System.IO.IOException: Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. ---> System.Net.Sockets.SocketException: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
我对正在读取的字段有一个索引,所以这不是问题。下面是read的代码:
public static UserUpdateMongo Find(int userId, long deviceId)
{
return Collection().Find(
Query.And(
Query.EQ("UserId", userId),
Query.EQ("DeviceId", deviceId))).FirstOrDefault();
}
我像这样实例化连接:
var settings = new MongoServerSettings
{
Server = new MongoServerAddress(segments[0], Convert.ToInt32(segments[1])),MaxConnectionPoolSize = 1000};
Server = MongoServer.Create(settings);
}
是我做错了什么还是c#驱动程序有问题?帮助! !
c#驱动程序有一个连接池,默认连接池的最大大小是100。因此,从单个c#客户端进程到mongod的连接不应该超过100个。c#驱动程序的1.1版本在重载情况下偶尔会出现问题,其中一个连接上的错误可能会导致大量的断开连接和连接。您可以通过查看服务器日志来判断是否发生了这种情况,每次打开或关闭连接时都会写入日志条目。如果是这样,你可以试试本周发布的1.2 c#驱动程序吗?
您本不应该创建挂起更新队列。连接池通过限制并发请求的数量来充当排序队列。
如果你能在服务器日志中找到任何东西,请告诉我,如果还有什么我可以帮助你的。
解决方案是停止在每个线程上保存记录,并开始将它们添加到内存中的"pending to save"列表中。然后有一个单独的线程,并处理所有保存到mongodb同步。我不知道为什么异步调用会导致c#驱动程序出错,但现在工作得很好。下面是一些示例代码,如果其他人遇到这个问题:
public static class UserUpdateSaver
{
public static List<UserUpdateView> PendingUserUpdates;
public static void Initialize()
{
PendingUserUpdates = new List<UserUpdateView>();
var saveUserUpdatesTime = Convert.ToInt32(ConfigurationBL.ReadApplicationValue("SaveUserUpdatesTime"));
LogWriter.Write("Setting up timer to save user updates every " + saveUserUpdatesTime + " seconds", LoggingEnums.LogEntryType.Warning);
var worker = new BackgroundWorker();
worker.DoWork += delegate(object s, DoWorkEventArgs args)
{
while (true)
{//process pending user updates every x seconds.
Thread.Sleep(saveUserUpdatesTime * 1000);
ProcessPendingUserUpdates();
}
};
worker.RunWorkerAsync();
}
public static void AddUserUpdateToSave(UserUpdateView userUpdate)
{
Monitor.Enter(PendingUserUpdates);
PendingUserUpdates.Add(userUpdate);
Monitor.Exit(PendingUserUpdates);
}
private static void ProcessPendingUserUpdates()
{
//get pending user updates.
var pendingUserUpdates = new List<UserUpdateView>(PendingUserUpdates);
if (pendingUserUpdates.Count > 0)
{
var startDate = DateTime.Now;
foreach (var userUpdate in pendingUserUpdates)
{
try
{
UserUpdateStore.Update(userUpdate);
}
catch (Exception exc)
{
LogWriter.WriteError(exc);
}
finally
{
Monitor.Enter(PendingUserUpdates);
PendingUserUpdates.Remove(userUpdate);
Monitor.Exit(PendingUserUpdates);
}
}
var duration = DateTime.Now.Subtract(startDate);
LogWriter.Write(String.Format("Processed {0} user updates in {1} seconds",
pendingUserUpdates.Count, duration.TotalSeconds), LoggingEnums.LogEntryType.Warning);
}
else
{
LogWriter.Write("No user updates to process", LoggingEnums.LogEntryType.Warning);
}
}
}
您听说过消息队列吗?您可以放置一堆盒子来处理这样的负载,并使用消息队列机制将数据保存到mongodb。但是,在这种情况下,您的消息队列必须能够运行并发发布订阅。一个免费的消息队列(在我看来非常好)是使用RabbitMQ的MassTransit。
工作流程如下:1. 在消息队列中发布数据;2. 一旦它在那里,启动尽可能多的盒子你想要的订阅者保存和处理你的mongo数据。