MongoDB C#驱动程序不释放连接然后错误



我使用最新版本的MongoDB(在win64服务器上)和c#驱动程序。我有一个windows服务,每分钟做800读取和更新,几分钟后,当前使用的线程超过200,然后每一个mongodb调用给出这个错误:

System.IO.IOException: Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. ---> System.Net.Sockets.SocketException: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
   at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)
   at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)

我对正在读取的字段有一个索引,所以这不是问题。下面是read的代码:

public static UserUpdateMongo Find(int userId, long deviceId)
{
    return Collection().Find(
        Query.And(
            Query.EQ("UserId", userId),
            Query.EQ("DeviceId", deviceId))).FirstOrDefault();
}

我像这样实例化连接:

var settings = new MongoServerSettings
{
    Server = new MongoServerAddress(segments[0], Convert.ToInt32(segments[1])),MaxConnectionPoolSize = 1000};
    Server = MongoServer.Create(settings);
}

是我做错了什么还是c#驱动程序有问题?帮助! !

c#驱动程序有一个连接池,默认连接池的最大大小是100。因此,从单个c#客户端进程到mongod的连接不应该超过100个。c#驱动程序的1.1版本在重载情况下偶尔会出现问题,其中一个连接上的错误可能会导致大量的断开连接和连接。您可以通过查看服务器日志来判断是否发生了这种情况,每次打开或关闭连接时都会写入日志条目。如果是这样,你可以试试本周发布的1.2 c#驱动程序吗?

您本不应该创建挂起更新队列。连接池通过限制并发请求的数量来充当排序队列。

如果你能在服务器日志中找到任何东西,请告诉我,如果还有什么我可以帮助你的。

解决方案是停止在每个线程上保存记录,并开始将它们添加到内存中的"pending to save"列表中。然后有一个单独的线程,并处理所有保存到mongodb同步。我不知道为什么异步调用会导致c#驱动程序出错,但现在工作得很好。下面是一些示例代码,如果其他人遇到这个问题:

public static class UserUpdateSaver
    {
        public static List<UserUpdateView> PendingUserUpdates;
        public static void Initialize()
        {
            PendingUserUpdates = new List<UserUpdateView>();
            var saveUserUpdatesTime = Convert.ToInt32(ConfigurationBL.ReadApplicationValue("SaveUserUpdatesTime"));
            LogWriter.Write("Setting up timer to save user updates every " + saveUserUpdatesTime + " seconds", LoggingEnums.LogEntryType.Warning);
            var worker = new BackgroundWorker();
            worker.DoWork += delegate(object s, DoWorkEventArgs args)
            {
                while (true)
                {//process pending user updates every x seconds.
                    Thread.Sleep(saveUserUpdatesTime * 1000);
                    ProcessPendingUserUpdates();
                }
            };
            worker.RunWorkerAsync();
        }
        public static void AddUserUpdateToSave(UserUpdateView userUpdate)
        {
            Monitor.Enter(PendingUserUpdates);
            PendingUserUpdates.Add(userUpdate);
            Monitor.Exit(PendingUserUpdates);
        }
        private static void ProcessPendingUserUpdates()
        {
            //get pending user updates.
            var pendingUserUpdates = new List<UserUpdateView>(PendingUserUpdates);
            if (pendingUserUpdates.Count > 0)
            {
                var startDate = DateTime.Now;
                foreach (var userUpdate in pendingUserUpdates)
                {
                    try
                    {
                        UserUpdateStore.Update(userUpdate);
                    }
                    catch (Exception exc)
                    {
                        LogWriter.WriteError(exc);
                    }
                    finally
                    {
                        Monitor.Enter(PendingUserUpdates);
                        PendingUserUpdates.Remove(userUpdate);
                        Monitor.Exit(PendingUserUpdates);
                    }
                }
                var duration = DateTime.Now.Subtract(startDate);
                LogWriter.Write(String.Format("Processed {0} user updates in {1} seconds",
                    pendingUserUpdates.Count, duration.TotalSeconds), LoggingEnums.LogEntryType.Warning);
            }
            else
            {
                LogWriter.Write("No user updates to process", LoggingEnums.LogEntryType.Warning);
            }
        }
    }

您听说过消息队列吗?您可以放置一堆盒子来处理这样的负载,并使用消息队列机制将数据保存到mongodb。但是,在这种情况下,您的消息队列必须能够运行并发发布订阅。一个免费的消息队列(在我看来非常好)是使用RabbitMQ的MassTransit。

工作流程如下:1. 在消息队列中发布数据;2. 一旦它在那里,启动尽可能多的盒子你想要的订阅者保存和处理你的mongo数据。

最新更新