HistoryTaskService.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. using DevExpress.Utils.Extensions;
  2. using MySqlX.XDevAPI.Common;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Configuration;
  6. using System.Globalization;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading;
  11. using System.Threading.Tasks;
  12. using XdCxRhDW.Dto;
  13. namespace XdCxRhDW.X2D1TaskServer.Service
  14. {
  /// <summary>
  /// Executes an X2D1 historical-data task: walks the capture directories that fall inside the
  /// task's time range, groups the <c>*.dat</c> capture files by capture time, and for every
  /// complete group (ch1, ch2 and ch3 files) uploads the files to the positioning platform,
  /// runs signal detection and parameter estimation, and posts one positioning request per
  /// detected time slot.
  /// </summary>
  public class HistoryTaskService
  {
      // Root URL of the positioning platform's HTTP API; always ends with "api/".
      private readonly string baseUrl;

      // Cancellation source of the most recently started task; null until StartAsync is called.
      CancellationTokenSource cts;

      /// <summary>
      /// Builds the API base URL from the <c>PosPlatformAddr</c> application setting
      /// (e.g. <c>http://127.0.0.1:8091</c> or <c>http://127.0.0.1:8091/</c>).
      /// </summary>
      /// <exception cref="ConfigurationErrorsException">The setting is not present.</exception>
      public HistoryTaskService()
      {
          var configured = ConfigurationManager.AppSettings["PosPlatformAddr"];
          if (configured == null)
          {
              // AppSettings returns null for a missing key; fail with a message that names the key
              // instead of a bare NullReferenceException.
              throw new ConfigurationErrorsException("AppSettings key 'PosPlatformAddr' is not configured.");
          }

          var posPlatformAddr = configured.Trim();
          this.baseUrl = posPlatformAddr.EndsWith("/")
              ? posPlatformAddr + "api/"
              : posPlatformAddr + "/api/";
      }

      /// <summary>
      /// Starts processing the task on a background thread and returns immediately.
      /// Call <see cref="Stop"/> to request cancellation.
      /// </summary>
      /// <param name="dto">Task description: capture directory, date-directory format, time range and satellite codes.</param>
      public void StartAsync(X2D1HistoryTaskHandleDto dto)
      {
          cts = new CancellationTokenSource();
          LogHelper.Info($"接收到开始执行任务[{dto.TaskName}],ID={dto.ID}");
          // The "view interfaces" entry at the top-right of the positioning platform lists every
          // API the platform exposes (viewable in a browser).
          Task.Run(async () =>
          {
              // formatFlag: 0 = no date sub-directories, 1 = one directory per day, 2 = one directory per hour.
              int formatFlag;
              // preTime is the directory cursor. It is aligned to the start of the day/hour so that
              // stepping by a whole day/hour cannot jump past EndTime and skip the last directory.
              DateTime preTime;
              if (string.IsNullOrWhiteSpace(dto.DateDirFormat))
              {
                  // No date directory: process the capture directory once, then stop.
                  formatFlag = 0;
                  preTime = dto.StartTime;
              }
              else if (dto.DateDirFormat.EndsWith("dd", StringComparison.OrdinalIgnoreCase))
              {
                  // After one directory is done, move on to the next day's directory.
                  formatFlag = 1;
                  preTime = dto.StartTime.Date;
              }
              else if (dto.DateDirFormat.EndsWith("HH", StringComparison.OrdinalIgnoreCase))
              {
                  // After one directory is done, move on to the next hour's directory.
                  formatFlag = 2;
                  preTime = dto.StartTime.Date.AddHours(dto.StartTime.Hour);
              }
              else
              {
                  LogHelper.Error($"【任务{dto.ID}】执行异常,不支持的日期目录格式");
                  return;
              }

              string uploadUrl = baseUrl + "File/UploadFileAsync";
              string cgUrl = baseUrl + "DetectCg/CpuCgMultiCalc";

              while (!cts.IsCancellationRequested && preTime <= dto.EndTime)
              {
                  string filesDir = null;
                  try
                  {
                      // DateTime.ToString(null/"") falls back to the general "G" format, which is not
                      // a directory name, so the capture directory itself is used when there is no
                      // date-directory format.
                      filesDir = formatFlag == 0
                          ? dto.CapDir
                          : Path.Combine(dto.CapDir, preTime.ToString(dto.DateDirFormat)); // e.g. yyyyMMdd
                      LogHelper.Info($"【任务{dto.ID}】正在处理[{filesDir}]目录中的数据...");
                      if (!Directory.Exists(filesDir))
                      {
                          LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]不存在,跳过此目录");
                          ResetTime(formatFlag, ref preTime);
                          continue;
                      }

                      // Materialise once so the directory is not enumerated twice.
                      List<string> files = Directory.EnumerateFiles(filesDir, "*.dat").ToList();
                      if (files.Count == 0)
                      {
                          LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]中没有文件,跳过此目录");
                          ResetTime(formatFlag, ref preTime);
                          continue;
                      }

                      // One group per capture time, processed in chronological order.
                      var groups = files
                          .Select(f => FileToHistoryFile(dto, f))
                          .GroupBy(m => m.CapTime)
                          .OrderBy(m => m.Key);
                      foreach (var item in groups)
                      {
                          if (cts.IsCancellationRequested) break;
                          var finfos = item.ToList();
                          var capTime = item.Key;
                          // Files whose name could not be parsed carry the default CapTime and are dropped here too.
                          if (capTime < dto.StartTime) continue;
                          if (finfos.Count < 3)
                          {
                              LogHelper.Warning($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻文件数量只有{finfos.Count}个,跳过此组数据");
                              continue;
                          }
                          // Ultra-shortwave signal (ch1).
                          var dinfo = finfos.FirstOrDefault(m => m.Ch == 1);
                          if (dinfo == null)
                          {
                              LogHelper.Warning($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻未找到超短波信号ch1文件,跳过此组数据");
                              continue;
                          }
                          // Main satellite (ch2).
                          var minfo = finfos.FirstOrDefault(m => m.Ch == 2);
                          if (minfo == null)
                          {
                              LogHelper.Warning($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻未找到主星信号ch2文件,跳过此组数据");
                              continue;
                          }
                          // Adjacent satellite 1 (ch3). Logged as a warning like the other missing-channel cases.
                          var ninfo = finfos.FirstOrDefault(m => m.Ch == 3);
                          if (ninfo == null)
                          {
                              LogHelper.Warning($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻未找到邻星信号ch3文件,跳过此组数据");
                              continue;
                          }
                          try
                          {
                              string mainFile = await HttpHelper.UploadFileAsync(minfo.FilePath, uploadUrl, token: cts.Token); // main-satellite file
                              string adjaFile = await HttpHelper.UploadFileAsync(ninfo.FilePath, uploadUrl, token: cts.Token); // adjacent-satellite file
                              string cdbFile = await HttpHelper.UploadFileAsync(dinfo.FilePath, uploadUrl, token: cts.Token);  // ultra-shortwave file

                              DetectDto detectDto = new DetectDto()
                              {
                                  file1 = cdbFile,           // detection is run on the uplink leakage signal
                                  dmcType = DmcType.Ky5758,  // for uplink signals the algorithm currently supports only energy-based Ky or IBS detection
                                  fsHz = minfo.FsHz,
                              };
                              var deteResp = await HttpHelper.PostRequestAsync<List<DetectResDto>>(baseUrl + "DetectCg/DetectCalc", detectDto, token: cts.Token);
                              if (deteResp.code != 200)
                              {
                                  LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻信号检测出错.{deteResp.msg}");
                                  continue;
                              }
                              LogHelper.Info($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻信号检测完成,共{deteResp.data.Count}个时隙");
                              // TODO (open question from the original author): how should zero padding be handled?
                              var smps = deteResp.data.Select(m => new SmpPosition(m.Start, m.Length)).ToList();

                              // Parameter estimation: main satellite vs. ultra-shortwave.
                              var cgDto = new CpuCgMultiDto()
                              {
                                  dtCenter = 260000,
                                  dtRange = 60000,
                                  file1 = mainFile,
                                  file2 = cdbFile,
                                  samplingRate = minfo.FsHz,
                                  smpPositions = smps,
                                  snrThreshold = 15,
                              };
                              var result1 = await HttpHelper.PostRequestAsync<List<CpuCgResDto>>(cgUrl, cgDto, token: cts.Token);
                              if (result1.code != 200)
                              {
                                  // Report the message of the request that actually failed.
                                  LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻主星超短CPU参估出错.{result1.msg}");
                                  continue;
                              }
                              LogHelper.Info($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻主星超短CPU参估完成.");

                              // Parameter estimation: adjacent satellite vs. ultra-shortwave.
                              cgDto = new CpuCgMultiDto()
                              {
                                  dtCenter = 260000,
                                  dtRange = 60000,
                                  file1 = adjaFile,
                                  file2 = cdbFile,
                                  samplingRate = minfo.FsHz,
                                  smpPositions = smps,
                                  snrThreshold = 15,
                              };
                              var result2 = await HttpHelper.PostRequestAsync<List<CpuCgResDto>>(cgUrl, cgDto, token: cts.Token);
                              if (result2.code != 200)
                              {
                                  // Report the message of the request that actually failed.
                                  LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻邻星超短CPU参估出错.{result2.msg}");
                                  continue;
                              }
                              LogHelper.Info($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻邻星超短CPU参估完成");

                              var data1 = result1.data;
                              var data2 = result2.data;
                              // The three result lists are indexed in parallel below, so they must be the same length.
                              if (data1.Count != data2.Count || data1.Count != deteResp.data.Count)
                              {
                                  LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻参估结果个数和检测结果个数不匹配");
                                  continue;
                              }
                              for (int i = 0; i < data1.Count; i++)
                              {
                                  try
                                  {
                                      if (cts.IsCancellationRequested) break;
                                      X2D1NoXlNoParlPosDto x2D1 = new X2D1NoXlNoParlPosDto()
                                      {
                                          TaskID = dto.ID,
                                          // Slot time = capture time + sample offset converted to seconds.
                                          SigTime = minfo.CapTime.AddSeconds(data1[i].Smpstart / minfo.FsHz),
                                          MainCode = minfo.SatId,
                                          AdjaCode = ninfo.SatId,
                                          SxDto = data1[i].Dt,
                                          SxDfo = data1[i].Df,
                                          SxSnr = data1[i].Snr,
                                          XdDto = data2[i].Dt,
                                          XdDfo = data2[i].Df,
                                          XdSnr = data2[i].Snr,
                                          SatTxLon = minfo.CapLon,
                                          SatTxLat = minfo.CapLat,
                                          CdbTxLon = minfo.CapLon,
                                          CdbTxLat = minfo.CapLat,
                                          FreqDown = minfo.FreqHz,
                                          FreqUp = dinfo.FreqHz,
                                          CheckRes = new CheckResDto()
                                          {
                                              FileName = Path.GetFileName(dinfo.FilePath),
                                              ModRate = deteResp.data[i].ModRate,
                                              ModType = deteResp.data[i].ModType,
                                              SmpCount = deteResp.data[i].Length,
                                              SmpStart = deteResp.data[i].Start,
                                              UserName = deteResp.data[i].UserName,
                                              PosCheckType = deteResp.data[i].DmcType.GetEnumByDisplayName<EnumPosCheckTypeDto>(),
                                          }
                                      };
                                      // Pass the token like every other request so Stop() also aborts an in-flight positioning call.
                                      var result = await HttpHelper.PostRequestAsync<PosResDto>(baseUrl + "Pos/PosX2D1NoXlNoParAsync", x2D1, token: cts.Token);
                                      if (result.code != 200)
                                      {
                                          LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻时隙位置{data1[i].Smpstart}定位异常.{result.msg}");
                                      }
                                  }
                                  catch (OperationCanceledException) when (cts.IsCancellationRequested)
                                  {
                                      // Stop() was requested: not an error, just leave the slot loop.
                                      break;
                                  }
                                  catch (Exception ex)
                                  {
                                      // A failing slot must not prevent the remaining slots from being positioned.
                                      LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻时隙位置{data1[i].Smpstart}定位异常", ex);
                                  }
                              }
                          }
                          catch (OperationCanceledException) when (cts.IsCancellationRequested)
                          {
                              // Stop() was requested while a request was in flight: leave the group loop quietly.
                              break;
                          }
                          catch (Exception ex)
                          {
                              LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻文件处理异常", ex);
                              continue;
                          }
                      }
                      ResetTime(formatFlag, ref preTime);
                      LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]中的数据处理完成");
                  }
                  catch (Exception ex)
                  {
                      LogHelper.Error($"【任务{dto.ID}】目录[{filesDir}]中的数据处理出错,跳过此目录", ex);
                      ResetTime(formatFlag, ref preTime);
                  }
              }
              LogHelper.Info($"【任务{dto.ID}】数据处理完成,任务结束");
          });
      }

      /// <summary>
      /// Advances the directory cursor to the next directory to process.
      /// </summary>
      /// <param name="formatFlag">0 = no date directories, 1 = daily directories, anything else = hourly directories.</param>
      /// <param name="time">Cursor to advance; set to <see cref="DateTime.MaxValue"/> when there is nothing further to visit.</param>
      private void ResetTime(int formatFlag, ref DateTime time)
      {
          switch (formatFlag)
          {
              case 0:
                  // Single directory: push the cursor past any EndTime so the processing loop ends.
                  time = DateTime.MaxValue;
                  break;
              case 1:
                  time = time.AddHours(24);
                  break;
              default:
                  time = time.AddHours(1);
                  break;
          }
      }

      /// <summary>
      /// Parses a capture file name into a <c>HistoryFile</c>.
      /// Expected name layout:
      /// <c>2024_01_31_10_01_51_000000000_ch11_-1__Nxx.xxxxxx_Exx.xxxxxx_xxxxx.xxxHz_xxx.xxxMHz_ch1_xd1.dat</c>.
      /// </summary>
      /// <param name="dto">Task description; supplies the main/adjacent satellite codes.</param>
      /// <param name="filePath">Full path of the capture file.</param>
      /// <returns>
      /// The parsed file info. When the leading timestamp cannot be parsed only <c>FilePath</c> is set;
      /// numeric fields that fail to parse are left at 0.
      /// </returns>
      private HistoryFile FileToHistoryFile(X2D1HistoryTaskHandleDto dto, string filePath)
      {
          HistoryFile historyFile = new HistoryFile();
          historyFile.FilePath = filePath;
          var fileName = Path.GetFileNameWithoutExtension(filePath);
          var strs = fileName.Split(new string[] { "_", "MHz", "Hz", "ch" }, StringSplitOptions.RemoveEmptyEntries);

          // Capture time: the first six tokens. File names are machine-generated, so parse with the
          // invariant culture rather than whatever culture/calendar the host happens to use.
          var datestr = string.Join("_", strs.Take(6));
          DateTime dateTime;
          bool istime = DateTime.TryParseExact(datestr, "yyyy_MM_dd_HH_mm_ss", CultureInfo.InvariantCulture, DateTimeStyles.None, out dateTime);
          if (!istime) return historyFile;
          historyFile.CapTime = dateTime;

          // A successful timestamp parse implies at least six tokens, so the indexes below are in range.
          const NumberStyles floatStyle = NumberStyles.Float | NumberStyles.AllowThousands;

          // Capture channel: second token from the end.
          int ch;
          int.TryParse(strs[strs.Length - 2], NumberStyles.Integer, CultureInfo.InvariantCulture, out ch);
          // Capture frequency in MHz: third token from the end.
          double freqMHz;
          double.TryParse(strs[strs.Length - 3], floatStyle, CultureInfo.InvariantCulture, out freqMHz);
          // Sampling rate in Hz: fourth token from the end.
          double fsHz;
          double.TryParse(strs[strs.Length - 4], floatStyle, CultureInfo.InvariantCulture, out fsHz);

          historyFile.FreqHz = freqMHz * 1e6;
          historyFile.Ch = ch;
          historyFile.FsHz = fsHz;
          // NOTE(review): the N../E.. tokens of the file name are not parsed here, so CapLat/CapLon are
          // not populated by this method — confirm whether they are filled in elsewhere.
          if (ch == 2)
          {
              historyFile.SatId = dto.MainSatCode;
          }
          else if (ch == 3)
          {
              historyFile.SatId = dto.AdjaSatCode;
          }
          return historyFile;
      }

      /// <summary>
      /// Requests cancellation of the most recently started task. Does nothing if no task was started.
      /// </summary>
      public void Stop()
      {
          cts?.Cancel();
      }
  }
  293. }