TskHandler.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. using Ips.Library.Basic;
  2. using Ips.Sps.Scheduling.Entities;
  3. using Ips.Sps.Scheduling.SigAds;
  4. using Ips.Sps.Scheduling.SigLocs;
  5. using Ips.Sps.Scheduling.SigProces;
  6. using System;
  7. using System.Collections.Concurrent;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Text;
  11. using System.Threading.Tasks;
  12. namespace Ips.Sps.Scheduling
  13. {
  14. public class TskHandler
  15. {
  16. public TskHandler(RunTsk tsk)
  17. {
  18. _tsk = tsk;
  19. _cts = new CancellationTokenSource();
  20. }
  21. public RunTsk _tsk { get; private set; }
  22. private CancellationTokenSource _cts;
  23. private volatile bool _isRunning;
  24. public bool IsRunning => _isRunning;
  25. public bool IsCancelled => _cts.IsCancellationRequested;
  26. Task sigAdTsk, sigProcessTsk, sigLocTsk;
  27. BlockingCollection<AdSigGroup> adGroupQueue = new BlockingCollection<AdSigGroup>();
  28. BlockingCollection<ProcSigGroup> procGroupQueue = new BlockingCollection<ProcSigGroup>();
  29. public Task Start()
  30. {
  31. return Task.Run(() =>
  32. {
  33. try
  34. {
  35. //启动信号采集
  36. sigAdTsk = Task.Run(StartAdTsk);
  37. //启动信号处理
  38. sigProcessTsk = Task.Run(StartProcTsk);
  39. //启动信号定位
  40. sigLocTsk = Task.Run(StartLocTsk);
  41. //等待信号处理完成
  42. Task.WaitAll(sigAdTsk, sigProcessTsk, sigLocTsk);
  43. }
  44. finally
  45. {
  46. _isRunning = false;
  47. }
  48. });
  49. }
  50. public void Stop()
  51. {
  52. _cts.Cancel();
  53. }
  54. public void WaitStop()
  55. {
  56. while (_isRunning)
  57. {
  58. Task.Delay(100).Wait();
  59. }
  60. }
  61. protected virtual void StartAdTsk()
  62. {
  63. try
  64. {
  65. var worker = AdWorkerFactory.Create(_tsk);
  66. if (worker != null)
  67. {
  68. worker.AdSigGroupStart += (sender, arg) =>
  69. {
  70. adGroupQueue.Add(arg.AdGroup);
  71. };
  72. worker.Execute(_cts.Token);
  73. }
  74. }
  75. catch (Exception ex)
  76. {
  77. IpsLogger.Error($"任务【{_tsk.TskId}-{_tsk.TskName}】采集线程异常",ex);
  78. }
  79. finally
  80. {
  81. adGroupQueue.CompleteAdding();
  82. }
  83. IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】采集线程退出!");
  84. }
  85. protected virtual void StartProcTsk()
  86. {
  87. while (!_cts.IsCancellationRequested && !adGroupQueue.IsCompleted)
  88. {
  89. if (adGroupQueue.TryTake(out AdSigGroup adGroup, Timeout.Infinite, _cts.Token))
  90. {
  91. ProcSigGroup procSigGroup = new ProcSigGroup(_tsk.TskId, adGroup.GroupTime);
  92. while (!adGroup.GroupSigList.IsCompleted)
  93. {
  94. if (adGroup.GroupSigList.TryTake(out AdSigResult adSigResult, Timeout.Infinite, _cts.Token))
  95. {
  96. var procWorker = ProcWorkerFactory.Create(_tsk, adSigResult.Sig.SigType);
  97. procWorker.ProcSigCompleted += (sender, arg) =>
  98. {
  99. procSigGroup.ProcSigList.Add(arg.Result);
  100. };
  101. procWorker.Execute(adSigResult, _cts.Token);
  102. IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】,信号时间:{adSigResult.SigTime:yyyy-MM-dd HH:mm:ss},信号频点:{adSigResult.Sig.SigFreq},文件数量:{adSigResult.PasList.Count}");
  103. }
  104. }
  105. procGroupQueue.Add(procSigGroup);
  106. }
  107. }
  108. procGroupQueue.CompleteAdding();
  109. IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】处理线程退出!");
  110. }
  111. protected virtual void StartLocTsk()
  112. {
  113. while (!_cts.IsCancellationRequested && !procGroupQueue.IsCompleted)
  114. {
  115. if (procGroupQueue.TryTake(out ProcSigGroup procGroup, Timeout.Infinite, _cts.Token))
  116. {
  117. var worker = LocWorkerFactory.Create(_tsk);
  118. worker.Execute(procGroup, _cts.Token);
  119. }
  120. }
  121. }
  122. }
  123. }