TaskRealService.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.IO;
  5. using System.Linq;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using XdCxRhDW.Dto;
  9. using XdCxRhDW.UI.Lib;
  10. namespace X2D1TaskServer.Service
  11. {
  12. /// <summary>
  13. /// 实时任务服务
  14. /// </summary>
  15. public class TaskRealService : TaskService, BaseTaskI
  16. {
  // Cancellation sources registered by StartAsync, keyed by task ID.
  // Entries are removed again by Stop and StopTask.
  private readonly Dictionary<int, CancellationTokenSource> dicCts = new Dictionary<int, CancellationTokenSource>();

  /// <summary>
  /// Creates the service. Nothing is initialized here beyond the field initializers.
  /// </summary>
  public TaskRealService()
  {
  }
  /// <summary>
  /// Starts a real-time task. A new <see cref="CancellationTokenSource"/> is registered
  /// under <c>dto.ID</c> (replacing any existing entry) and the processing loop is launched
  /// with <see cref="Task.Run(Func{Task}, CancellationToken)"/>; this method returns without
  /// awaiting it.
  /// The loop repeatedly resolves the directory to scan (the capture directory itself, or a
  /// date-formatted sub-directory of it), lists its <c>*.dat</c> files, groups them by capture
  /// time and processes each group until cancellation is requested.
  /// </summary>
  /// <param name="dto">Task description; this method reads ID, CapDir, DateDirFormat, Sigs,
  /// MainSatCode and AdjaSatCode from it and passes it on to the processing helpers.</param>
  public void StartAsync(X2D1TaskHandleDto dto)
  {
      var cts = new CancellationTokenSource();
      // Indexer assignment adds the entry or replaces an existing one for the same ID.
      // NOTE(review): dicCts is a plain Dictionary that is also read inside the background
      // task below while Stop/StopTask may modify it - confirm whether locking is needed.
      dicCts[dto.ID] = cts;
      Task.Run(async () =>
      {
          if (!Directory.Exists(dto.CapDir))
          {
              await StopTask(dto.ID, EnumTaskStopType.Error, $"文件路径[{dto.CapDir}]不存在,任务结束");
              return;
          }
          // A null/blank format is a supported mode (see formatFlag == 0 below), so it must not
          // be dereferenced here without a null check.
          if (dto.DateDirFormat != null && dto.DateDirFormat.Contains("\\"))
          {
              await StopTask(dto.ID, EnumTaskStopType.Error, $"子目录日期格式不能包含多级目录格式");
              return;
          }
          await LogUI.Info($"【任务{dto.ID}】开始执行...");
          string checkFileTypeStr = "上行信号";
          if (_config.checkFileType != 0)
              checkFileTypeStr = "主星下行信号";
          await LogUI.Info($"【任务{dto.ID}】检测类型={checkFileTypeStr}");
          string threadStr = _config.threadCount == 0 ? "不限制" : _config.threadCount.ToString();
          await LogUI.Info($"【任务{dto.ID}】线程个数={threadStr}");
          bool canConnected = CanGetSatIdFromMySql(dto.ID);

          // Log the delay used per signal: the signal's own value, else the configured value, else 0.
          foreach (var taskSig in dto.Sigs)
          {
              double? delay1 = taskSig.SigDelay.FirstOrDefault(p => p.SatInfoSatCode == dto.MainSatCode)?.Delay;
              double? delay2 = taskSig.SigDelay.FirstOrDefault(p => p.SatInfoSatCode == dto.AdjaSatCode)?.Delay;
              if (delay1 == null)
                  delay1 = _config.mainSatDelay;
              if (delay2 == null)
                  delay2 = _config.adjaSatDelay;
              if (delay1 == null) delay1 = 0;
              if (delay2 == null) delay2 = 0;
              await LogUI.Info($"【任务{dto.ID}】信号[{taskSig.FreqUp / 1e6:f3}MHz],主星{dto.MainSatCode}转发时延{delay1}us");
              await LogUI.Info($"【任务{dto.ID}】信号[{taskSig.FreqUp / 1e6:f3}MHz],邻星{dto.AdjaSatCode}转发时延{delay2}us");
          }

          // Start two capture periods in the past.
          DateTime preTime = DateTime.Now.AddSeconds(-_config.capSeconds * 2);

          // formatFlag: 0 = no date sub-directory, 1 = format ends with "dd", 2 = format ends with "HH"
          // (both suffixes are matched case-insensitively, as in the original ToLower/ToUpper checks).
          int formatFlag;
          if (string.IsNullOrWhiteSpace(dto.DateDirFormat))
          {
              // No date sub-directory: the capture directory itself is scanned.
              formatFlag = 0;
          }
          else if (dto.DateDirFormat.EndsWith("dd", StringComparison.OrdinalIgnoreCase))
          {
              // Per-day directories: move on to the next day's directory after finishing one.
              formatFlag = 1;
          }
          else if (dto.DateDirFormat.EndsWith("HH", StringComparison.OrdinalIgnoreCase))
          {
              // Per-hour directories: move on to the next hour's directory after finishing one.
              formatFlag = 2;
          }
          else
          {
              await StopTask(dto.ID, EnumTaskStopType.Error, $"执行异常,不支持的日期目录格式");
              return;
          }

          while (!cts.IsCancellationRequested)
          {
              string filesDir = dto.CapDir;
              try
              {
                  if (formatFlag != 0)
                  {
                      filesDir = Path.Combine(dto.CapDir, preTime.ToString(dto.DateDirFormat));//yyyy_MM_dd_HH
                  }

                  // Wait for the directory to appear; give up on it when the wall-clock hour changes.
                  bool doNextHour = false;
                  while (!Directory.Exists(filesDir))
                  {
                      if (cts.IsCancellationRequested) return;
                      // This task's source is no longer registered (removed or replaced): stop silently.
                      if (!dicCts.Values.Contains(cts)) return;
                      await LogUI.Info($"【任务{dto.ID}】目录[{filesDir}]不存在,等待5秒...");
                      await Task.Delay(5000, cts.Token);
                      if (DateTime.Now.Hour != preTime.Hour)
                      {
                          ResetTime(formatFlag, ref preTime);
                          doNextHour = true;
                          break;
                      }
                  }
                  if (cts.IsCancellationRequested) return;
                  if (doNextHour)
                  {
                      continue;
                  }

                  await LogUI.Info($"【任务{dto.ID}】正在扫描[{filesDir}]目录...");
                  Stopwatch sw = Stopwatch.StartNew();
                  List<string> files = Directory.GetFiles(filesDir, "*.dat").ToList();
                  sw.Stop();
                  await LogUI.Info($"【任务{dto.ID}】扫描[{filesDir}]目录完成,耗时{sw.ElapsedMilliseconds}ms");

                  // Poll every 10 seconds until the directory contains files or the hour changes.
                  while (true)
                  {
                      if (cts.IsCancellationRequested) return;
                      if (!files.Any())
                      {
                          await LogUI.Info($"【任务{dto.ID}】目录[{filesDir}]中没有文件,等待10秒...");
                          if (DateTime.Now.Hour != preTime.Hour)
                          {
                              ResetTime(formatFlag, ref preTime);
                              // NOTE(review): doNextHour is not consulted after this loop, so execution
                              // falls through with an empty file list (kept as in the original).
                              doNextHour = true;
                              break;
                          }
                          await Task.Delay(10000, cts.Token);
                          files = Directory.GetFiles(filesDir, "*.dat").ToList();
                          continue;
                      }
                      break;
                  }
                  if (cts.IsCancellationRequested) return;

                  bool isLocal = IsLocal(filesDir);
                  // Convert the file names, drop those the converter rejects (null), and group by
                  // capture time in ascending order.
                  List<IGrouping<DateTime, HistoryFile>> groups = files
                      .Select(f => FileToHistoryFile(dto, f, preTime, canConnected, cts.Token))
                      .Where(p => p != null)
                      .GroupBy(m => m.CapTime)
                      .OrderBy(m => m.Key).ToList();
                  if (cts.IsCancellationRequested) break;

                  bool hasFile = false;
                  if (groups.Any())
                  {
                      var lastTime = groups.Last().Key;
                      foreach (var sameTimeFiles in groups)
                      {
                          if (cts.IsCancellationRequested) return;
                          var capTime = sameTimeFiles.First().CapTime;
                          // Data older than 10 minutes is skipped so processing can catch up.
                          if ((DateTime.Now - capTime).TotalMinutes > 10)
                          {
                              preTime = capTime;
                              await LogUI.Warning($"【任务{dto.ID}】处理速度过慢,丢弃数据");
                              continue;
                          }
                          // Only the newest group is passed to WaitFileEnd before being processed.
                          if (capTime == lastTime)
                          {
                              await WaitFileEnd(dto.ID, sameTimeFiles.First(), _config.capSeconds, cts);
                          }
                          await LogUI.Info($"【任务{dto.ID}】[{capTime:yyyyMMddHHmmss}]时刻文件采集完成");
                          if (!CanOpenFile(dto.ID, sameTimeFiles.First()))
                              continue;// the file cannot be opened
                          hasFile = true;
                          var xdgbInfos = sameTimeFiles.GroupBy(m => m.XdIndex);
                          var splitXdgbInfos = SplitFreqFiles(xdgbInfos);
                          List<Task> listTask = new List<Task>();
                          foreach (var item in splitXdgbInfos)// the entries of splitXdgbInfos are processed in parallel
                          {
                              if (cts.IsCancellationRequested) return;
                              var task = GetPosTask(dto, item, isLocal, cts);
                              listTask.Add(task);
                          }
                          await Task.WhenAll(listTask);
                          preTime = capTime;
                      }
                  }

                  if (!hasFile || preTime.Minute == 59)
                  {
                      await LogUI.Info($"【任务{dto.ID}】缺少{preTime:yyyyMMddHHmmss}时刻之后的采集文件,等待5秒...");
                      // A real-time task without files has to consider jumping to the next hour's directory.
                      var time = new DateTime(preTime.Year, preTime.Month, preTime.Day, preTime.Hour, 0, 0);
                      var now = DateTime.Now;
                      if ((int)(now - time).TotalHours != 0)
                      {
                          preTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0);
                      }
                      await Task.Delay(5000, cts.Token);
                  }
                  else
                  {
                      await LogUI.Info($"【任务{dto.ID}】目录[{filesDir}]本次扫描的数据处理完成");
                  }
              }
              catch (OperationCanceledException ex) when (cts.IsCancellationRequested)
              {
                  // Only reported as a user-initiated stop when this task's own source was cancelled;
                  // any other cancellation exception is handled as an error below.
                  await LogUI.Warning($"【任务{dto.ID}】目录[{filesDir}]中的数据处理结束,用户手动终止", ex);
                  ResetTime(formatFlag, ref preTime);
              }
              catch (Exception ex)
              {
                  await LogUI.Error($"【任务{dto.ID}】目录[{filesDir}]中的数据处理出错,跳过此目录", ex);
                  ResetTime(formatFlag, ref preTime);
              }
          }
          await StopTask(dto.ID, EnumTaskStopType.Properly, "数据处理完成,任务结束");
      }, cts.Token);
  }
  /// <summary>
  /// Requests cancellation of the task registered under <paramref name="taskID"/> and
  /// removes its entry from the cancellation-source dictionary.
  /// </summary>
  /// <param name="taskID">ID the task was registered under.</param>
  /// <returns><c>false</c> when no entry exists for the ID; otherwise the result of removing the entry.</returns>
  public bool Stop(int taskID)
  {
      CancellationTokenSource source;
      if (!dicCts.TryGetValue(taskID, out source))
      {
          return false;
      }

      // A null entry is tolerated: it is simply removed without cancelling anything.
      if (source != null)
      {
          source.Cancel();
      }

      return dicCts.Remove(taskID);
  }
  /// <summary>
  /// Reports that a task has stopped: waits 2 seconds, logs the reason (as info for a
  /// <c>Properly</c> stop, as an error otherwise), posts a <c>TaskStopHandleDto</c> to
  /// <c>Task/StopTask</c> under the configured base URL, and removes the task's
  /// cancellation source from the dictionary.
  /// </summary>
  /// <param name="taskID">ID of the task that stopped.</param>
  /// <param name="type">Stop type; selects the log level and is forwarded in the request.</param>
  /// <param name="stopReason">Reason text that is logged and forwarded in the request.</param>
  /// <returns>A task that completes once the stop has been reported and the entry removed.</returns>
  public async Task StopTask(int taskID, EnumTaskStopType type, string stopReason)
  {
      await Task.Delay(2000);
      try
      {
          if (type == EnumTaskStopType.Properly)
          {
              await LogUI.Info($"【任务{taskID}】{stopReason}");
          }
          else
          {
              await LogUI.Error($"【任务{taskID}】{stopReason}");
          }
          TaskStopHandleDto stopDto = new TaskStopHandleDto() { ID = taskID, StopType = type, TaskType = EnumTaskTypeDto.Real, StopReason = stopReason };
          var stopResp = await HttpHelper.PostRequestAsync(_config.baseUrl + "Task/StopTask", stopDto);
          if (stopResp.code != 200)
          {
              await LogUI.Error($"【任务{taskID}】停止异常.{stopResp.msg}");
          }
      }
      finally
      {
          // Always unregister the task, even when logging or the HTTP call throws; the
          // exception still propagates to the caller. Remove is a no-op for a missing key.
          dicCts.Remove(taskID);
      }
  }
  239. }
  240. }