为了账号安全,请及时绑定邮箱和手机立即绑定

.net多线程

.net多线程

侃侃尔雅 2018-12-05 11:05:39
我开发一个服务软件A,A接收从通信软件B发过来的包含很多个设备信息的数据包,B发送数据包的频率很快(2s);我现在用一个单线程在服务软件A中对B发过来的数据包进行解析,先把解析出来的设备信息保存在内存中的一个集合S中(只维持每个设备最新的一条数据),然后再把解析出来的数据发给不同的客户端C(有多个C1,C2,C3。。。。。)。测试后发现在A中处理包的速度跟不上B发送包的速度,会导致B发过来的很多包得不到及时的处理。我考虑过用多线程,但还没有什么思路,如果用多线程的话我这个内存的集合S要怎么来管理。另外,整个过程对数据的实时性要求很高(就是B发送包后,在客户端C中就能很快的接收到该包),望各位大虾指教!
查看完整描述

2 回答

?
呼如林

TA贡献1798条经验 获得超3个赞

典型的生产、消费模型,建议用 Queue<T> 非线程安全,需要自己同步。

 1 public class Message
 2     {
 3         private string iD;
 4 
 5         public string ID
 6         {
 7             get { return iD; }
 8             set { iD = value; }
 9         }
10 
11         private int number;
12 
13         public int Number
14         {
15             get { return number; }
16             set { number = value; }
17         }        
18     }
 1 ///<summary>
 2 /// 信息处理单元,封装了线程、状态和同步对象 
 3 ///</summary>
 4     class Worker
 5     {
 6         public AutoResetEvent State = new AutoResetEvent(false);
 7 
 8         public Thread Thread = null;
 9 
10         ///<summary>
11 /// 表示线程是否在运行着
12 ///</summary>
13         public volatile bool IsBusy = false;
14     }
  1 public class MessageScheduler
  2     {
  3         #region Private Fields
  4 
  5         ///<summary>
  6 /// 线程对象列表
  7 ///</summary>
  8         private List<Worker> workerList = new List<Worker>();
  9 
 10         ///<summary>
 11 /// 消息队列
 12 ///</summary>
 13         private Queue<Message> messageList = new Queue<Message>();
 14 
 15         ///<summary>
 16 /// 消息队列同步对象
 17 ///</summary>
 18         private readonly object syncObject = new object();
 19 
 20         ///<summary>
 21 /// 状态
 22 ///</summary>
 23         private volatile bool running;
 24 
 25         #endregion
 26 
 27 
 28         #region Constructors
 29 
 30         ///<summary>
 31 /// 创建调度器(线程数量不要太多,否则线程的切换损耗很大)
 32 ///</summary>
 33 ///<param name="threadCount">要创建的线程数量</param>
 34         public MessageScheduler(int threadCount)
 35         {
 36             running = false;
 37             InitializeWorkerCount(threadCount);
 38         }
 39 
 40         #endregion
 41 
 42         #region Private Methods
 43 
 44         ///<summary>
 45 /// 创建线程
 46 ///</summary>
 47 ///<param name="threadCount">要创建线程的数量</param>
 48         private void InitializeWorkerCount(int threadCount)
 49         {
 50             Worker worker;
 51             for (int i = 0; i < threadCount; i++)
 52             {
 53                 worker = new Worker();
 54                 worker.Thread = new Thread(new ParameterizedThreadStart(this.ProcessMessage));
 55                 worker.Thread.IsBackground = true;
 56                 workerList.Add(worker);
 57             }
 58         }
 59 
 60         ///<summary>
 61 /// 消息处理程序
 62 ///</summary>
 63 ///<param name="param"></param>
 64         private void ProcessMessage(object param)
 65         {
 66             Worker worker = param as Worker;
 67             Message message = null;
 68             worker.State.WaitOne();
 69             while (true)
 70             {
 71                 //消息处理程序一定不要放到lock里面,否则多线程的性能可能会比单线程性能还低(线程切换会带来损耗)
 72                 lock (this.syncObject)
 73                 {
 74                     if (messageList.Count > 0)
 75                     {
 76                         worker.IsBusy = true;
 77                         message = messageList.Dequeue();
 78                     }
 79                 }
 80                 
 81                 if (message != null)
 82                 {
 83                     //进行消息处理,可能比较消耗CPU和时间。
 84 //这里仅仅输出消息的Number
 85                     Console.WriteLine(message.Number);
 86 
 87                 }
 88                 else
 89                 {
 90                     worker.IsBusy = false;
 91                     worker.State.WaitOne();
 92                 }
 93             }
 94         }
 95 
 96         #endregion
 97 
 98         #region Public Methods
 99 
100         ///<summary>
101 /// 启动调度器
102 ///</summary>
103         public void Start()
104         {
105             running = true;
106             foreach (Worker worker in workerList)
107             {
108                 worker.Thread.Start(worker);
109             }
110         }
111 
112         ///<summary>
113 /// 停止调度器
114 ///</summary>
115         public void Stop()
116         {
117             running = false;
118             foreach (Worker worker in workerList)
119             {
120                 worker.Thread.Abort();
121             }
122         }
123 
124         ///<summary>
125 /// 消息调度
126 ///</summary>
127 ///<param name="message"></param>
128         public void DoWork(Message message)
129         {
130             if (!running)
131             {
132                 return;
133             }
134             lock (this.syncObject)
135             {
136                 this.messageList.Enqueue(message);
137             }
138             foreach (Worker worker in this.workerList)
139             {
140                 //如果某线程处于等待,则通知继续
141                 if (!worker.IsBusy)
142                 {
143                     worker.State.Set();
144                 }
145             }
146         }
147 
148         #endregion      
149     }
 1 class Program
 2     {
 3         static MessageScheduler scheduler = new MessageScheduler(5);
 4         
 5         static void Main(string[] args)
 6         {
 7             scheduler.Start();
 8             System.Threading.Thread thread = new System.Threading.Thread(CreateMessage);
 9             thread.IsBackground = true;
10             thread.Start();
11             Console.ReadLine();
12         }
13 
14         static void CreateMessage()
15         {
16             Message message;
17             int i = 0;
18             while (true)
19             {
20                 message = new Message();
21                 message.ID = Guid.NewGuid().ToString();
22                 message.Number = i;
23                 scheduler.DoWork(message);
24                 i++;
25                 System.Threading.Thread.Sleep(1);                
26             }
27         }
28     }




查看完整回答
反对 回复 2018-12-09
?
qq_笑_17

TA贡献1818条经验 获得超7个赞

首先要肯定你的问题出在哪里。

是在解析数据上还是在发送到C哪里。

根据你说的情况,我初步估计你在把数据发给C时用的同步方法。

也就是说你需要等到C接收完数据,你才会处理下一次发送或接收。

你只需发送到C时用异步发送即可


查看完整回答
反对 回复 2018-12-09
  • 2 回答
  • 0 关注
  • 538 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信