|
@@ -0,0 +1,253 @@
|
|
|
+using System;
|
|
|
+using System.Collections.Generic;
|
|
|
+using System.Linq;
|
|
|
+using System.Text;
|
|
|
+using System.Threading;
|
|
|
+using System.Threading.Tasks;
|
|
|
+using static Org.BouncyCastle.Math.EC.ECCurve;
|
|
|
+using XdCxRhDW.Dto;
|
|
|
+using System.Configuration;
|
|
|
+using System.Diagnostics;
|
|
|
+using System.IO;
|
|
|
+
|
|
|
+namespace X2D1TaskServer.Service
|
|
|
+{
|
|
|
+ /// <summary>
|
|
|
+ /// 实时任务服务
|
|
|
+ /// </summary>
|
|
|
+
|
|
|
+ public class TaskRealService : TaskService, BaseTaskI
|
|
|
+ {
|
|
|
+ Dictionary<int, CancellationTokenSource> dicCts = new Dictionary<int, CancellationTokenSource>();
|
|
|
+
|
|
|
+ public TaskRealService()
|
|
|
+ {
|
|
|
+ }
|
|
|
+ /// <summary>
|
|
|
+ /// 启动实时任务
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="dto"></param>
|
|
|
+ public void StartAsync(X2D1TaskHandleDto dto)
|
|
|
+ {
|
|
|
+ var cts = new CancellationTokenSource();
|
|
|
+ if (dicCts.ContainsKey(dto.ID))
|
|
|
+ dicCts[dto.ID] = cts;
|
|
|
+ else
|
|
|
+ dicCts.Add(dto.ID, cts);
|
|
|
+
|
|
|
+ Task.Run(async () =>
|
|
|
+ {
|
|
|
+ if (!Directory.Exists(dto.CapDir))
|
|
|
+ {
|
|
|
+ await StopTask(dto.ID, EnumTaskStopType.Error, $"文件路径[{dto.CapDir}]不存在,任务结束");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (dto.DateDirFormat.Contains("\\"))
|
|
|
+ {
|
|
|
+ await StopTask(dto.ID, EnumTaskStopType.Error, $"子目录日期格式不能包含多级目录格式");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】开始执行...");
|
|
|
+
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】定位时差系数={_config.posDtoFactor}");
|
|
|
+ string checkFileTypeStr = "上行信号";
|
|
|
+ if (_config.checkFileType != 0)
|
|
|
+ checkFileTypeStr = "主星下行信号";
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】检测类型={checkFileTypeStr}");
|
|
|
+ string threadStr = _config.threadCount == 0 ? "不限制" : _config.threadCount.ToString();
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】线程个数={threadStr}");
|
|
|
+ bool canConnected = CanGetSatIdFromMySql(dto.ID);
|
|
|
+ foreach (var taskSig in dto.Sigs)
|
|
|
+ {
|
|
|
+ double? delay1 = taskSig.SigDelay.FirstOrDefault(p => p.SatInfoSatCode == dto.MainSatCode)?.Delay;
|
|
|
+ double? delay2 = taskSig.SigDelay.FirstOrDefault(p => p.SatInfoSatCode == dto.AdjaSatCode)?.Delay;
|
|
|
+ if (delay1 == null)
|
|
|
+ delay1 = _config.mainSatDelay;
|
|
|
+ if (delay2 == null)
|
|
|
+ delay2 = _config.adjaSatDelay;
|
|
|
+ if (delay1 == null) delay1 = 0;
|
|
|
+ if (delay2 == null) delay2 = 0;
|
|
|
+
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】信号[{taskSig.FreqUp / 1e6:f3}MHz],主星{dto.MainSatCode}转发时延{delay1}us");
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】信号[{taskSig.FreqUp / 1e6:f3}MHz],邻星{dto.AdjaSatCode}转发时延{delay2}us");
|
|
|
+ }
|
|
|
+
|
|
|
+ DateTime preTime = DateTime.Now.AddSeconds(-_config.capSeconds * 2);
|
|
|
+ int formatFlag;
|
|
|
+ if (string.IsNullOrWhiteSpace(dto.DateDirFormat))
|
|
|
+ {
|
|
|
+ //没有日期目录,处理完目录中的数据后停止任务
|
|
|
+ formatFlag = 0;
|
|
|
+ }
|
|
|
+ else if (dto.DateDirFormat.ToLower().EndsWith("dd"))
|
|
|
+ {
|
|
|
+ //处理完一个目录后跳转到第二天的目录
|
|
|
+ formatFlag = 1;
|
|
|
+ }
|
|
|
+ else if (dto.DateDirFormat.ToUpper().EndsWith("HH"))
|
|
|
+ {
|
|
|
+ //处理完一个目录后跳转到下一个小时的目录
|
|
|
+ formatFlag = 2;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ await StopTask(dto.ID, EnumTaskStopType.Error, $"执行异常,不支持的日期目录格式");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ while (!cts.IsCancellationRequested)
|
|
|
+ {
|
|
|
+ string filesDir = dto.CapDir;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ if (formatFlag != 0)
|
|
|
+ {
|
|
|
+ filesDir = Path.Combine(dto.CapDir, $"{preTime.ToString(dto.DateDirFormat)}");//yyyy_MM_dd_HH
|
|
|
+ }
|
|
|
+ bool doNextHour = false;
|
|
|
+ while (!Directory.Exists(filesDir))
|
|
|
+ {
|
|
|
+ if (cts.IsCancellationRequested) return;
|
|
|
+ if (!dicCts.Values.Contains(cts)) return;
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]不存在,等待5秒...");
|
|
|
+ await Task.Delay(5000, cts.Token);
|
|
|
+ if (DateTime.Now.Hour != preTime.Hour)
|
|
|
+ {
|
|
|
+ ResetTime(formatFlag, ref preTime);
|
|
|
+ doNextHour = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (cts.IsCancellationRequested) return;
|
|
|
+ if (doNextHour)
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】正在扫描[{filesDir}]目录...");
|
|
|
+ Stopwatch sw = new Stopwatch();
|
|
|
+ sw.Start();
|
|
|
+ List<string> files = Directory.GetFiles(filesDir, "*.dat").ToList();
|
|
|
+ sw.Stop();
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】扫描[{filesDir}]目录完成,耗时{sw.ElapsedMilliseconds}ms");
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ if (cts.IsCancellationRequested) return;
|
|
|
+ if (!files.Any())
|
|
|
+ {
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]中没有文件,等待10秒...");
|
|
|
+ if (DateTime.Now.Hour != preTime.Hour)
|
|
|
+ {
|
|
|
+ ResetTime(formatFlag, ref preTime);
|
|
|
+ doNextHour = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ await Task.Delay(10000, cts.Token);
|
|
|
+ files = Directory.GetFiles(filesDir, "*.dat").ToList();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (cts.IsCancellationRequested) return;
|
|
|
+ List<IGrouping<DateTime, HistoryFile>> groups = null;
|
|
|
+ bool isLocal = IsLocal(filesDir);
|
|
|
+ groups = files.Select(f => FileToHistoryFile(dto, f, preTime, canConnected, cts.Token))
|
|
|
+ .Where(p => p != null)
|
|
|
+ .GroupBy(m => m.CapTime)
|
|
|
+ .OrderBy(m => m.Key).ToList();
|
|
|
+ if (cts.IsCancellationRequested) break;
|
|
|
+ bool hasFile = false;
|
|
|
+ if (groups.Any())
|
|
|
+ {
|
|
|
+ var lastTime = groups.Last().Key;
|
|
|
+ foreach (var sameTimeFiles in groups)
|
|
|
+ {
|
|
|
+ if (cts.IsCancellationRequested) return;
|
|
|
+ var capTime = sameTimeFiles.First().CapTime;
|
|
|
+ if ((DateTime.Now - capTime).TotalMinutes > 10)
|
|
|
+ {
|
|
|
+ preTime = capTime;
|
|
|
+ await LogHelper.Warning($"【任务{dto.ID}】处理速度过慢,丢弃数据");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (capTime == lastTime)
|
|
|
+ {
|
|
|
+ await WaitFileEnd(dto.ID, sameTimeFiles.First(), _config.capSeconds, cts);
|
|
|
+ }
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】[{capTime:yyyyMMddHHmmss}]时刻文件采集完成");
|
|
|
+ if (!CanOpenFile(dto.ID, sameTimeFiles.First()))
|
|
|
+ continue;//文件无法打开
|
|
|
+ hasFile = true;
|
|
|
+ var xdgbInfos = sameTimeFiles.GroupBy(m => m.XdIndex);
|
|
|
+ var splitXdgbInfos = SplitFreqFiles(xdgbInfos);
|
|
|
+ List<Task> listTask = new List<Task>();
|
|
|
+ foreach (var item in splitXdgbInfos)//splitXdgbInfos中的数据并行处理
|
|
|
+ {
|
|
|
+ if (cts.IsCancellationRequested) return;
|
|
|
+ var task = GetPosTask(dto, item, isLocal, cts);
|
|
|
+ listTask.Add(task);
|
|
|
+ }
|
|
|
+ await Task.WhenAll(listTask);
|
|
|
+ preTime = capTime;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!hasFile || preTime.Minute == 59)
|
|
|
+ {
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】缺少{preTime:yyyyMMddHHmmss}时刻之后的采集文件,等待5秒...");
|
|
|
+ //实时任务没有文件需要考虑跳转到下一个小时的目录
|
|
|
+ var time = new DateTime(preTime.Year, preTime.Month, preTime.Day, preTime.Hour, 0, 0);
|
|
|
+ var now = DateTime.Now;
|
|
|
+ if ((int)(now - time).TotalHours != 0)
|
|
|
+ {
|
|
|
+ preTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0);
|
|
|
+ }
|
|
|
+ await Task.Delay(5000, cts.Token);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ await LogHelper.Info($"【任务{dto.ID}】目录[{filesDir}]本次扫描的数据处理完成");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (TaskCanceledException ex)
|
|
|
+ {
|
|
|
+ await LogHelper.Warning($"【任务{dto.ID}】目录[{filesDir}]中的数据处理结束,用户手动终止", ex);
|
|
|
+ ResetTime(formatFlag, ref preTime);
|
|
|
+ }
|
|
|
+ catch (Exception ex)
|
|
|
+ {
|
|
|
+ await LogHelper.Error($"【任务{dto.ID}】目录[{filesDir}]中的数据处理出错,跳过此目录", ex);
|
|
|
+ ResetTime(formatFlag, ref preTime);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ await StopTask(dto.ID, EnumTaskStopType.Properly, "数据处理完成,任务结束");
|
|
|
+ }, cts.Token);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void Stop(int taskID)
|
|
|
+ {
|
|
|
+ if (!dicCts.ContainsKey(taskID)) return;
|
|
|
+ dicCts[taskID]?.Cancel();
|
|
|
+ dicCts.Remove(taskID);
|
|
|
+ }
|
|
|
+ public async Task StopTask(int taskID, EnumTaskStopType type, string stopReason)
|
|
|
+ {
|
|
|
+ await Task.Delay(2000);
|
|
|
+ if (type == EnumTaskStopType.Properly)
|
|
|
+ {
|
|
|
+ await LogHelper.Info($"【任务{taskID}】{stopReason}");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ await LogHelper.Error($"【任务{taskID}】{stopReason}");
|
|
|
+ }
|
|
|
+ TaskStopHandleDto stopDto = new TaskStopHandleDto() { ID = taskID, StopType = type, TaskType = EnumTaskTypeDto.Real, StopReason = stopReason };
|
|
|
+ var stopResp = await HttpHelper.PostRequestAsync(_config.baseUrl + "Task/StopTask", stopDto);
|
|
|
+ if (stopResp.code != 200)
|
|
|
+ {
|
|
|
+ await LogHelper.Error($"【任务{taskID}】停止异常.{stopResp.msg}");
|
|
|
+ }
|
|
|
+ if (dicCts.ContainsKey(taskID))
|
|
|
+ dicCts.Remove(taskID);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|