我开发一个服务软件A,A接收从通信软件B发过来的包含很多个设备信息的数据包,B发送数据包的频率很快(2s);我现在用一个单线程在服务软件A中对B发过来的数据包进行解析,先把解析出来的设备信息保存在内存中的一个集合S中(只维持每个设备最新的一条数据),然后再把解析出来的数据发给不同的客户端C(有多个C1,C2,C3。。。。。)。测试后发现在A中处理包的速度跟不上B发送包的速度,会导致B发过来的很多包得不到及时的处理。我考虑过用多线程,但还没有什么思路,如果用多线程的话我这个内存的集合S要怎么来管理。另外,整个过程对数据的实时性要求很高(就是B发送包后,在客户端C中就能很快的接收到该包),望各位大虾指教!
2 回答
呼如林
TA贡献1798条经验 获得超3个赞
典型的生产、消费模型,建议用 Queue<T> 非线程安全,需要自己同步。
1 public class Message 2 { 3 private string iD; 4 5 public string ID 6 { 7 get { return iD; } 8 set { iD = value; } 9 } 10 11 private int number; 12 13 public int Number 14 { 15 get { return number; } 16 set { number = value; } 17 } 18 }
1 ///<summary> 2 /// 信息处理单元,封装了线程、状态和同步对象 3 ///</summary> 4 class Worker 5 { 6 public AutoResetEvent State = new AutoResetEvent(false); 7 8 public Thread Thread = null; 9 10 ///<summary> 11 /// 表示线程是否在运行着 12 ///</summary> 13 public volatile bool IsBusy = false; 14 }
1 public class MessageScheduler 2 { 3 #region Private Fields 4 5 ///<summary> 6 /// 线程对象列表 7 ///</summary> 8 private List<Worker> workerList = new List<Worker>(); 9 10 ///<summary> 11 /// 消息队列 12 ///</summary> 13 private Queue<Message> messageList = new Queue<Message>(); 14 15 ///<summary> 16 /// 消息队列同步对象 17 ///</summary> 18 private readonly object syncObject = new object(); 19 20 ///<summary> 21 /// 状态 22 ///</summary> 23 private volatile bool running; 24 25 #endregion 26 27 28 #region Constructors 29 30 ///<summary> 31 /// 创建调度器(线程数量不要太多,否则线程的切换损耗很大) 32 ///</summary> 33 ///<param name="threadCount">要创建的线程数量</param> 34 public MessageScheduler(int threadCount) 35 { 36 running = false; 37 InitializeWorkerCount(threadCount); 38 } 39 40 #endregion 41 42 #region Private Methods 43 44 ///<summary> 45 /// 创建线程 46 ///</summary> 47 ///<param name="threadCount">要创建线程的数量</param> 48 private void InitializeWorkerCount(int threadCount) 49 { 50 Worker worker; 51 for (int i = 0; i < threadCount; i++) 52 { 53 worker = new Worker(); 54 worker.Thread = new Thread(new ParameterizedThreadStart(this.ProcessMessage)); 55 worker.Thread.IsBackground = true; 56 workerList.Add(worker); 57 } 58 } 59 60 ///<summary> 61 /// 消息处理程序 62 ///</summary> 63 ///<param name="param"></param> 64 private void ProcessMessage(object param) 65 { 66 Worker worker = param as Worker; 67 Message message = null; 68 worker.State.WaitOne(); 69 while (true) 70 { 71 //消息处理程序一定不要放到lock里面,否则多线程的性能可能会比单线程性能还低(线程切换会带来损耗) 72 lock (this.syncObject) 73 { 74 if (messageList.Count > 0) 75 { 76 worker.IsBusy = true; 77 message = messageList.Dequeue(); 78 } 79 } 80 81 if (message != null) 82 { 83 //进行消息处理,可能比较消耗CPU和时间。 84 //这里仅仅输出消息的Number 85 Console.WriteLine(message.Number); 86 87 } 88 else 89 { 90 worker.IsBusy = false; 91 worker.State.WaitOne(); 92 } 93 } 94 } 95 96 #endregion 97 98 #region Public Methods 99 100 ///<summary> 101 /// 启动调度器 102 ///</summary> 103 public void Start() 104 { 105 running = true; 106 foreach (Worker worker in workerList) 107 { 108 worker.Thread.Start(worker); 109 } 110 } 111 112 ///<summary> 113 /// 停止调度器 114 ///</summary> 115 public void Stop() 116 { 117 running = false; 118 foreach (Worker worker in workerList) 119 { 120 worker.Thread.Abort(); 121 } 122 } 123 124 ///<summary> 125 /// 消息调度 126 ///</summary> 127 ///<param name="message"></param> 128 public void DoWork(Message message) 129 { 130 if (!running) 131 { 132 return; 133 } 134 lock (this.syncObject) 135 { 136 this.messageList.Enqueue(message); 137 } 138 foreach (Worker worker in this.workerList) 139 { 140 //如果某线程处于等待,则通知继续 141 if (!worker.IsBusy) 142 { 143 worker.State.Set(); 144 } 145 } 146 } 147 148 #endregion 149 }
1 class Program 2 { 3 static MessageScheduler scheduler = new MessageScheduler(5); 4 5 static void Main(string[] args) 6 { 7 scheduler.Start(); 8 System.Threading.Thread thread = new System.Threading.Thread(CreateMessage); 9 thread.IsBackground = true; 10 thread.Start(); 11 Console.ReadLine(); 12 } 13 14 static void CreateMessage() 15 { 16 Message message; 17 int i = 0; 18 while (true) 19 { 20 message = new Message(); 21 message.ID = Guid.NewGuid().ToString(); 22 message.Number = i; 23 scheduler.DoWork(message); 24 i++; 25 System.Threading.Thread.Sleep(1); 26 } 27 } 28 }
qq_笑_17
TA贡献1818条经验 获得超7个赞
首先要肯定你的问题出在哪里。
是在解析数据上还是在发送到C哪里。
根据你说的情况,我初步估计你在把数据发给C时用的同步方法。
也就是说你需要等到C接收完数据,你才会处理下一次发送或接收。
你只需发送到C时用异步发送即可
- 2 回答
- 0 关注
- 538 浏览
添加回答
举报
0/150
提交
取消