HistoryTaskService.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. using DevExpress.Utils.Extensions;
  2. using MySqlX.XDevAPI.Common;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Configuration;
  6. using System.Globalization;
  7. using System.IO;
  8. using System.Linq;
  9. using System.Text;
  10. using System.Threading;
  11. using System.Threading.Tasks;
  12. using XdCxRhDW.Dto;
  13. namespace XdCxRhDW.X2D1TaskServer.Service
  14. {
  15. public class HistoryTaskService
  16. {
  17. private readonly string baseUrl;
  18. CancellationTokenSource cts;
  19. public HistoryTaskService()
  20. {
  21. var posPlatformAddr = ConfigurationManager.AppSettings["PosPlatformAddr"].Trim();//like http://127.0.0.1:8091 or http://127.0.0.1:8091/
  22. if (posPlatformAddr.EndsWith("/"))
  23. this.baseUrl = posPlatformAddr + "api/";
  24. else
  25. this.baseUrl = posPlatformAddr + "/api/";
  26. }
  27. public void StartAsync(X2D1HistoryTaskHandleDto dto)
  28. {
  29. cts = new CancellationTokenSource();
  30. if (!Directory.Exists(dto.CapDir))
  31. {
  32. StopTask(dto.ID, $"文件路径[{dto.CapDir}]不存在,任务结束");
  33. return;
  34. }
  35. LogHelper.Info($"【任务{dto.ID}】开始执行...");
  36. bool canConnected = CanGetSatIdFromMySql();
  37. if (!canConnected)
  38. {
  39. LogHelper.Warning("无法连接MySql查询卫星编号,将使用任务中提供的卫星");
  40. }
  41. //点击定位平台右上角查看接口可以在浏览器中查看平台提供的所有接口详细信息
  42. Task.Run(async () =>
  43. {
  44. DateTime preTime = dto.StartTime;
  45. int formatFlag;
  46. if (string.IsNullOrWhiteSpace(dto.DateDirFormat))
  47. {
  48. //没有日期目录,处理完目录中的数据后停止任务
  49. formatFlag = 0;
  50. }
  51. else if (dto.DateDirFormat.ToLower().EndsWith("dd"))
  52. {
  53. //处理完一个目录后跳转到第二天的目录
  54. formatFlag = 1;
  55. }
  56. else if (dto.DateDirFormat.ToUpper().EndsWith("HH"))
  57. {
  58. //处理完一个目录后跳转到下一个小时的目录
  59. formatFlag = 2;
  60. }
  61. else
  62. {
  63. StopTask(dto.ID, $"执行异常,不支持的日期目录格式");
  64. return;
  65. }
  66. while (!cts.IsCancellationRequested && preTime <= dto.EndTime)
  67. {
  68. string filesDir = dto.CapDir;
  69. try
  70. {
  71. if (formatFlag != 0)
  72. {
  73. filesDir = Path.Combine(dto.CapDir, $"{preTime.ToString(dto.DateDirFormat)}");//yyyyMMdd
  74. }
  75. LogHelper.Info($"【任务{dto.ID}】正在处理[{filesDir}]目录中的数据...");
  76. IEnumerable<string> files;
  77. if (!Directory.Exists(filesDir))
  78. {
  79. LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]不存在,跳过此目录");
  80. ResetTime(formatFlag, ref preTime);
  81. continue;
  82. }
  83. files = Directory.EnumerateFiles(filesDir, "*.dat");
  84. if (!files.Any())
  85. {
  86. LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]中没有文件,跳过此目录");
  87. ResetTime(formatFlag, ref preTime);
  88. continue;
  89. }
  90. IOrderedEnumerable<IGrouping<DateTime, HistoryFile>> groups = null;
  91. groups = files.Select(f => FileToHistoryFile(dto, f, canConnected)).GroupBy(m => m.CapTime).OrderBy(m => m.Key);
  92. foreach (var item in groups)
  93. {
  94. if (cts.IsCancellationRequested) break;
  95. var finfos = item.ToList();
  96. var capTime = finfos.First().CapTime;
  97. if (capTime < dto.StartTime) continue;
  98. if (capTime > dto.EndTime) break;
  99. if (finfos.Count < 3)
  100. {
  101. LogHelper.Warning($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻文件数量只有{finfos.Count}个,跳过此组数据");
  102. continue;
  103. }
  104. //超短波信号
  105. var dinfo = finfos.FirstOrDefault(m => m.Ch == 1);
  106. if (dinfo == null)
  107. {
  108. LogHelper.Warning($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻未找到超短波信号ch1文件,跳过此组数据");
  109. continue;
  110. }
  111. //主星
  112. var minfo = finfos.FirstOrDefault(m => m.Ch == 2);
  113. if (minfo == null)
  114. {
  115. LogHelper.Warning($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻未找到主星信号ch2文件,跳过此组数据");
  116. continue;
  117. }
  118. //邻1
  119. var ninfo = finfos.FirstOrDefault(m => m.Ch == 3);
  120. if (ninfo == null)
  121. {
  122. LogHelper.Info($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻未找到邻星信号ch3文件,跳过此组数据");
  123. continue;
  124. }
  125. try
  126. {
  127. string mainFile = await HttpHelper.UploadFileAsync(minfo.FilePath, baseUrl + "File/UploadFileAsync", token: cts.Token);//主星文件
  128. string adjaFile = await HttpHelper.UploadFileAsync(ninfo.FilePath, baseUrl + "File/UploadFileAsync", token: cts.Token);//邻星文件
  129. string cdbFile = await HttpHelper.UploadFileAsync(dinfo.FilePath, baseUrl + "File/UploadFileAsync", token: cts.Token);//超短文件
  130. DetectDto detectDto = new DetectDto()
  131. {
  132. file1 = cdbFile,//11局使用上行泄露信号进行检测
  133. dmcType = DmcType.Ky5758,//上行信号目前算法只能使用基于能量的Ky或IBS检测
  134. fsHz = minfo.FsHz,
  135. };
  136. var deteResp = await HttpHelper.PostRequestAsync<List<DetectResDto>>(baseUrl + "DetectCg/DetectCalc", detectDto, token: cts.Token);
  137. if (deteResp.code != 200)
  138. {
  139. LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻信号检测出错.{deteResp.msg}");
  140. continue;
  141. }
  142. LogHelper.Info($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻信号检测完成,共{deteResp.data.Count}个时隙");
  143. var smps = deteResp.data.Select(m => new SmpPosition(m.Start, m.Length)).ToList();//怎么补0?
  144. var cgDto = new CpuCgMultiDto()
  145. {
  146. dtCenter = 260000,
  147. dtRange = 60000,
  148. file1 = mainFile,
  149. file2 = cdbFile,
  150. samplingRate = minfo.FsHz,
  151. smpPositions = smps,
  152. snrThreshold = 15,
  153. };
  154. var result1 = await HttpHelper.PostRequestAsync<List<CpuCgResDto>>(baseUrl + "DetectCg/CpuCgMultiCalc", cgDto, token: cts.Token);
  155. if (result1.code != 200)
  156. {
  157. LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻主星超短CPU参估出错.{deteResp.msg}");
  158. continue;
  159. }
  160. LogHelper.Info($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻主星超短CPU参估完成.");
  161. cgDto = new CpuCgMultiDto()
  162. {
  163. dtCenter = 260000,
  164. dtRange = 60000,
  165. file1 = adjaFile,
  166. file2 = cdbFile,
  167. samplingRate = minfo.FsHz,
  168. smpPositions = smps,
  169. snrThreshold = 15,
  170. };
  171. var result2 = await HttpHelper.PostRequestAsync<List<CpuCgResDto>>(baseUrl + "DetectCg/CpuCgMultiCalc", cgDto, token: cts.Token);
  172. if (result2.code != 200)
  173. {
  174. LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻邻星超短CPU参估出错.{deteResp.msg}");
  175. continue;
  176. }
  177. LogHelper.Info($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻邻星超短CPU参估完成");
  178. var data1 = result1.data;
  179. var data2 = result2.data;
  180. if (data1.Count != data2.Count || data1.Count != deteResp.data.Count)
  181. {
  182. LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻参估结果个数和检测结果个数不匹配");
  183. continue;
  184. }
  185. for (int i = 0; i < data1.Count; i++)
  186. {
  187. try
  188. {
  189. if (cts.IsCancellationRequested) break;
  190. X2D1NoXlNoParlPosDto x2D1 = new X2D1NoXlNoParlPosDto()
  191. {
  192. TaskID = dto.ID,
  193. SigTime = minfo.CapTime.AddSeconds(data1[i].Smpstart / minfo.FsHz),
  194. MainCode = minfo.SatId,
  195. AdjaCode = ninfo.SatId,
  196. SxDto = data1[i].Dt,
  197. SxDfo = data1[i].Df,
  198. SxSnr = data1[i].Snr,
  199. XdDto = data2[i].Dt,
  200. XdDfo = data2[i].Df,
  201. XdSnr = data2[i].Snr,
  202. SatTxLon = minfo.CapLon,
  203. SatTxLat = minfo.CapLat,
  204. CdbTxLon = minfo.CapLon,
  205. CdbTxLat = minfo.CapLat,
  206. FreqDown = minfo.FreqHz,
  207. FreqUp = dinfo.FreqHz,
  208. CheckRes = new CheckResDto()
  209. {
  210. FileName = Path.GetFileName(dinfo.FilePath),
  211. ModRate = deteResp.data[i].ModRate,
  212. ModType = deteResp.data[i].ModType,
  213. SmpCount = deteResp.data[i].Length,
  214. SmpStart = deteResp.data[i].Start,
  215. UserName = deteResp.data[i].UserName,
  216. PosCheckType = deteResp.data[i].DmcType.GetEnumByDisplayName<EnumPosCheckTypeDto>(),
  217. }
  218. };
  219. var result = await HttpHelper.PostRequestAsync<PosResDto>(baseUrl + "Pos/PosX2D1NoXlNoParAsync", x2D1);
  220. if (result.code != 200)
  221. {
  222. LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻时隙位置{data1[i].Smpstart}定位异常.{result.msg}");
  223. }
  224. }
  225. catch (Exception ex)
  226. {
  227. LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻时隙位置{data1[i].Smpstart}定位异常", ex);
  228. }
  229. }
  230. LogHelper.Info($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻定位完成");
  231. }
  232. catch (Exception ex)
  233. {
  234. LogHelper.Error($"【任务{dto.ID}】{capTime:yyyyMMddHHmmss}时刻文件处理异常", ex);
  235. continue;
  236. }
  237. }
  238. ResetTime(formatFlag, ref preTime);
  239. LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]中的数据处理完成");
  240. }
  241. catch (Exception ex)
  242. {
  243. LogHelper.Error($"【任务{dto.ID}】目录[{filesDir}]中的数据处理出错,跳过此目录", ex);
  244. ResetTime(formatFlag, ref preTime);
  245. }
  246. }
  247. StopTask(dto.ID, $"数据处理完成,任务结束");
  248. }, cts.Token);
  249. }
  250. private void ResetTime(int formatFlag, ref DateTime time)
  251. {
  252. if (formatFlag == 0)
  253. {
  254. time = DateTime.MaxValue;
  255. }
  256. else if (formatFlag == 1)
  257. {
  258. time = time.AddHours(24);
  259. }
  260. else
  261. {
  262. time = time.AddHours(1);
  263. }
  264. }
  265. private HistoryFile FileToHistoryFile(X2D1HistoryTaskHandleDto dto, string filePath, bool canConnected)
  266. { //读取采集文件
  267. //2024_01_31_10_01_51_000000000_ch11_-1__Nxx.xxxxxx_Exx.xxxxxx_xxxxx.xxxHz_xxx.xxxMHz_ch1_xd1.dat
  268. HistoryFile historyFile = new HistoryFile();
  269. historyFile.FilePath = filePath;
  270. var fileName = Path.GetFileNameWithoutExtension(filePath);
  271. var strs = fileName.Split(new string[] { "_", "MHz", "Hz", "ch" }, StringSplitOptions.RemoveEmptyEntries);
  272. //采集时间
  273. var datestr = string.Join("_", strs.Take(6));
  274. DateTime dateTime;
  275. bool istime = DateTime.TryParseExact(datestr, "yyyy_MM_dd_HH_mm_ss", null, DateTimeStyles.None, out dateTime);
  276. if (!istime) return historyFile;
  277. historyFile.CapTime = dateTime;
  278. //采集通道
  279. var chstr = strs.Skip(strs.Length - 2).Take(1).First();
  280. int ch;
  281. int.TryParse(chstr, out ch);
  282. //采集频点
  283. var freqstr = strs.Skip(strs.Length - 3).Take(1).First();
  284. double freqMHz;
  285. double.TryParse(freqstr, out freqMHz);
  286. //采样率
  287. var fsstr = strs.Skip(strs.Length - 4).Take(1).First();
  288. double fsHz;
  289. double.TryParse(fsstr, out fsHz);
  290. historyFile.FreqHz = freqMHz * 1e6;
  291. historyFile.Ch = ch;
  292. historyFile.FsHz = fsHz;
  293. if (canConnected)
  294. historyFile.SatId = GetSatId(historyFile.FreqHz);
  295. if (ch == 2)
  296. {
  297. if (historyFile.SatId == 0)
  298. {
  299. LogHelper.Warning($"【任务{dto.ID}】主星使用任务中的卫星[{dto.MainSatCode}]");
  300. historyFile.SatId = dto.MainSatCode;
  301. }
  302. }
  303. else if (ch == 3)
  304. {
  305. if (historyFile.SatId == 0)
  306. {
  307. LogHelper.Warning($"【任务{dto.ID}】邻星使用任务中的卫星[{dto.AdjaSatCode}]");
  308. historyFile.SatId = dto.AdjaSatCode;
  309. }
  310. }
  311. return historyFile;
  312. }
  313. int GetSatId(double freqdown)
  314. {
  315. int satId = 0;
  316. string sql = $"select 卫星ID from freguencysatid where 下行 = '{freqdown}'and 洋区 = 'I' LIMIT 1";
  317. try
  318. {
  319. var res = MySqlTools.ExecuteScalar(System.Data.CommandType.Text, sql);
  320. if (res == null || res == DBNull.Value)
  321. {
  322. LogHelper.Error($"下行频点{freqdown * 1e-6}未能从MySql中找到对应的卫星编号.SQL={sql}");
  323. }
  324. bool isInt = int.TryParse($"{res}", out satId);
  325. if (!isInt)
  326. {
  327. //System.Configuration.ConfigurationManager.ConnectionStrings["MySql"].ConnectionString
  328. LogHelper.Error($"卫星编号转换失败,obj={res}");
  329. }
  330. }
  331. catch (Exception ex)
  332. {
  333. LogHelper.Error($"下行频点{freqdown * 1e-6}MySql查询卫星编号异常.ConnectionString={System.Configuration.ConfigurationManager.ConnectionStrings["MySql"].ConnectionString}", ex);
  334. }
  335. return satId;
  336. }
  337. bool CanGetSatIdFromMySql()
  338. {
  339. string sql = $"select 卫星ID from freguencysatid where 下行 = '{0}'and 洋区 = 'I' LIMIT 1";
  340. try
  341. {
  342. var res = MySqlTools.ExecuteScalar(System.Data.CommandType.Text, sql);
  343. return true;
  344. }
  345. catch
  346. {
  347. return false;
  348. }
  349. }
  350. void StopTask(int taskID, string stopReason)
  351. {
  352. TaskStopHandleDto stopDto = new TaskStopHandleDto() { ID = taskID, StopReason = stopReason };
  353. if (string.IsNullOrWhiteSpace(stopReason))
  354. LogHelper.Info($"【任务{taskID}】正常结束");
  355. else
  356. LogHelper.Error($"【任务{taskID}】{stopReason}");
  357. var stopResp = HttpHelper.PostRequestAsync(baseUrl + "Task/StopTask", stopDto).Result;
  358. if (stopResp.code != 200)
  359. {
  360. LogHelper.Error($"【任务{taskID}】停止异常.{stopResp.msg}");
  361. return;
  362. }
  363. }
  364. public void Stop()
  365. {
  366. cts?.Cancel();
  367. }
  368. }
  369. }