C#窗口线程服务创建问题



我对电子邮件发送窗口服务有问题。服务每延迟三分钟就会启动一次,并从数据库中获取要发送的消息,然后开始发送

        MessageFilesHandler MFHObj = new MessageFilesHandler();
        List<Broadcostmsg> imidiateMsgs = Manager.GetImidiateBroadCastMsgs(conString);
        if (imidiateMsgs.Count > 0)
        {
           // WriteToFileImi(strLog);
            Thread imMsgThread = new Thread(new             ParameterizedThreadStart(MFHObj.SendImidiatBroadcast));              
            imMsgThread.IsBackground = true;
            imMsgThread.Start(imidiateMsgs);
        }

这会将消息发送到较大的列表,并且需要很长时间才能完成向较大列表的发送。现在,当on消息仍在发送,并且服务获得要发送的新消息时,问题就出现了,上一次发送被取消,开始发送新消息,尽管我使用的是线程,但每次服务获得消息发送时,都会启动一个新线程。你能帮助我在代码中出错的地方吗。

我认为您在一个循环中使用代码,该循环WAITS用于新消息,您管理了这些等待吗??让我们看看:

while(imidiateMsgs.Count == 0)
{
    //Wait for new Message
}
//Now you have a new message Here
//Make a new thread to process message

有不同的等待方法,我建议使用BlockingQueues:

公共区域:

BlockingCollection<Broadcostmsg> imidiateMsgs = new BlockingCollection<Broadcostmsg>();

在您的消费者(生成消息的线程)中:

SendImidiatBroadcast = imidiateMsgs.Take();//this will wait for new message
//Now you have a new message Here
//Make a new thread to process message

在生产者(回答消息的线程):

imidiateMsgs.Add(SendImidiatBroadcast);

您必须使用线程池来每次生成新线程来回答消息,而不是每次初始化新线程。

看起来需要构建一个消费者-生产者队列。其中生产者将继续向列表中添加消息,消费者将从该列表中选择项目并对其进行一些操作我唯一担心的是,你每次都会创建一个新的线程来发送电子邮件,而不是从线程池中挑选线程。若您继续创建越来越多的线程,应用程序的性能将由于上下文切换造成的开销而降低。

如果你使用的是.Net框架工作4.0,那就很容易了。您可以使用System.Collections.Concurrent.CurrentQueue对项目进行排队和出队。它是线程安全的,因此不需要锁定对象。使用"任务"处理您的邮件。

BlockingCollection在其构造函数中采用IProducerConsumerCollection,或者如果您调用其空构造函数,则默认情况下它将使用ConcurrentQueue。

因此,将您的信息排入队列。

//define a blocking collectiom
var blockingCollection = new BlockingCollection<string>();
//Producer
Task.Factory.StartNew(() =>
{
    while (true)
    {
        blockingCollection.Add("value" + count);
        count++;                    
    }
});
//consumer
//GetConsumingEnumerable would wait until it find some item for work
// its similar to while(true) loop that we put inside consumer queue
Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        Console.WriteLine("Worker 1: " + value);
    }                
});

更新

由于您使用的是FrameWork 3.5。我建议你看看Joseph Albahari对消费者/生产者队列的实现。这是你能发现的最好的一个。

直接从上面的链接中获取代码

public class PCQueue
{
  readonly object _locker = new object();
  Thread[] _workers;
  Queue<Action> _itemQ = new Queue<Action>();
  public PCQueue (int workerCount)
  {
    _workers = new Thread [workerCount];
    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
      (_workers [i] = new Thread (Consume)).Start();
  }
  public void Shutdown (bool waitForWorkers)
  {
    // Enqueue one null item per worker to make each exit.
    foreach (Thread worker in _workers)
      EnqueueItem (null);
    // Wait for workers to finish
    if (waitForWorkers)
      foreach (Thread worker in _workers)
        worker.Join();
  }
  public void EnqueueItem (Action item)
  {
    lock (_locker)
    {
      _itemQ.Enqueue (item);           // We must pulse because we're
      Monitor.Pulse (_locker);         // changing a blocking condition.
    }
  }
  void Consume()
  {
    while (true)                        // Keep consuming until
    {                                   // told otherwise.
      Action item;
      lock (_locker)
      {
        while (_itemQ.Count == 0) Monitor.Wait (_locker);
        item = _itemQ.Dequeue();
      }
      if (item == null) return;         // This signals our exit.
      item();                           // Execute item.
    }
  }
}

这种方法的优点是可以控制为优化性能而需要创建的线程数。使用线程池方法,尽管它是安全的,但您不能控制可以同时创建的线程数量。

最新更新