123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using XdCxRhDW.Dto;
using XdCxRhDW.UI.Lib;
namespace X2D1TaskServer.Service
{
    /// <summary>
    /// Real-time task service. Each started task runs a background loop that scans a capture
    /// directory for <c>*.dat</c> files, groups them by capture time and hands every group to
    /// the positioning pipeline until the task is cancelled.
    /// </summary>
    public class TaskRealService : TaskService, BaseTaskI
    {
        /// <summary>
        /// Cancellation sources of the started tasks, keyed by task ID.
        /// A concurrent dictionary is used because the map is touched both by the callers of
        /// <see cref="StartAsync"/> / <see cref="Stop"/> and by the background loop itself.
        /// </summary>
        private readonly ConcurrentDictionary<int, CancellationTokenSource> dicCts =
            new ConcurrentDictionary<int, CancellationTokenSource>();

        /// <summary>
        /// Creates the service. No state other than the field initializers is set up here.
        /// </summary>
        public TaskRealService()
        {
        }

        /// <summary>
        /// Starts a real-time task. The work runs on a background task (fire-and-forget); this
        /// method returns as soon as the task has been queued.
        /// </summary>
        /// <param name="dto">
        /// Task description: ID, capture directory, date sub-directory format, satellite codes and signals.
        /// </param>
        public void StartAsync(X2D1TaskHandleDto dto)
        {
            var cts = new CancellationTokenSource();

            // Register the source for this ID. A source already registered under the same ID is
            // replaced but NOT cancelled; the loop below detects the replacement while it waits
            // for a directory to appear.
            // NOTE(review): cancelling the old source here would make the old loop call StopTask,
            // which unregisters by ID and would therefore drop the new run — left unchanged.
            dicCts[dto.ID] = cts;

            Task.Run(async () =>
            {
                if (!Directory.Exists(dto.CapDir))
                {
                    await StopTask(dto.ID, EnumTaskStopType.Error, $"文件路径[{dto.CapDir}]不存在,任务结束");
                    return;
                }

                // A null format is legal (it means "no date sub-directory", handled further down),
                // so it must not be dereferenced here.
                if (dto.DateDirFormat != null && dto.DateDirFormat.Contains("\\"))
                {
                    await StopTask(dto.ID, EnumTaskStopType.Error, $"子目录日期格式不能包含多级目录格式");
                    return;
                }

                await LogUI.Info($"【任务{dto.ID}】开始执行...");

                string checkFileTypeStr = "上行信号";
                if (_config.checkFileType != 0)
                {
                    checkFileTypeStr = "主星下行信号";
                }
                await LogUI.Info($"【任务{dto.ID}】检测类型={checkFileTypeStr}");

                string threadStr = _config.threadCount == 0 ? "不限制" : _config.threadCount.ToString();
                await LogUI.Info($"【任务{dto.ID}】线程个数={threadStr}");

                bool canConnected = CanGetSatIdFromMySql(dto.ID);

                // Log the forwarding delay used for each signal: the per-signal value when one
                // exists for the satellite, otherwise the configured value, otherwise 0.
                foreach (var taskSig in dto.Sigs)
                {
                    double? delay1 = taskSig.SigDelay.FirstOrDefault(p => p.SatInfoSatCode == dto.MainSatCode)?.Delay;
                    double? delay2 = taskSig.SigDelay.FirstOrDefault(p => p.SatInfoSatCode == dto.AdjaSatCode)?.Delay;
                    if (delay1 == null)
                    {
                        delay1 = _config.mainSatDelay;
                    }
                    if (delay2 == null)
                    {
                        delay2 = _config.adjaSatDelay;
                    }
                    if (delay1 == null)
                    {
                        delay1 = 0;
                    }
                    if (delay2 == null)
                    {
                        delay2 = 0;
                    }
                    await LogUI.Info($"【任务{dto.ID}】信号[{taskSig.FreqUp / 1e6:f3}MHz],主星{dto.MainSatCode}转发时延{delay1}us");
                    await LogUI.Info($"【任务{dto.ID}】信号[{taskSig.FreqUp / 1e6:f3}MHz],邻星{dto.AdjaSatCode}转发时延{delay2}us");
                }

                // Start two capture periods in the past.
                DateTime preTime = DateTime.Now.AddSeconds(-_config.capSeconds * 2);

                // 0 = no date sub-directory, 1 = format ends with a day field, 2 = format ends with an hour field.
                int formatFlag;
                if (string.IsNullOrWhiteSpace(dto.DateDirFormat))
                {
                    // No date sub-directory: files are read straight from CapDir.
                    // NOTE(review): the original note says the task stops once the directory has
                    // been processed, but no such stop is visible in the loop below — confirm.
                    formatFlag = 0;
                }
                else if (dto.DateDirFormat.EndsWith("dd", StringComparison.OrdinalIgnoreCase))
                {
                    // After a directory is processed, move on to the next day's directory.
                    formatFlag = 1;
                }
                else if (dto.DateDirFormat.EndsWith("HH", StringComparison.OrdinalIgnoreCase))
                {
                    // After a directory is processed, move on to the next hour's directory.
                    formatFlag = 2;
                }
                else
                {
                    await StopTask(dto.ID, EnumTaskStopType.Error, $"执行异常,不支持的日期目录格式");
                    return;
                }

                while (!cts.IsCancellationRequested)
                {
                    string filesDir = dto.CapDir;
                    try
                    {
                        if (formatFlag != 0)
                        {
                            // e.g. yyyy_MM_dd_HH
                            filesDir = Path.Combine(dto.CapDir, preTime.ToString(dto.DateDirFormat));
                        }

                        // Wait for the directory to appear; give up on it when the wall-clock hour changes.
                        bool doNextHour = false;
                        while (!Directory.Exists(filesDir))
                        {
                            if (cts.IsCancellationRequested) return;

                            // Exit silently when this run's source is no longer the one registered
                            // for the task ID (stopped, or superseded by a newer start).
                            CancellationTokenSource registered;
                            if (!dicCts.TryGetValue(dto.ID, out registered) || !ReferenceEquals(registered, cts)) return;

                            await LogUI.Info($"【任务{dto.ID}】目录[{filesDir}]不存在,等待5秒...");
                            await Task.Delay(5000, cts.Token);
                            if (DateTime.Now.Hour != preTime.Hour)
                            {
                                ResetTime(formatFlag, ref preTime);
                                doNextHour = true;
                                break;
                            }
                        }
                        if (cts.IsCancellationRequested) return;
                        if (doNextHour)
                        {
                            continue;
                        }

                        await LogUI.Info($"【任务{dto.ID}】正在扫描[{filesDir}]目录...");
                        Stopwatch sw = Stopwatch.StartNew();
                        List<string> files = Directory.GetFiles(filesDir, "*.dat").ToList();
                        sw.Stop();
                        await LogUI.Info($"【任务{dto.ID}】扫描[{filesDir}]目录完成,耗时{sw.ElapsedMilliseconds}ms");

                        // Poll every 10 seconds until the directory contains at least one file,
                        // or until the wall-clock hour changes.
                        while (true)
                        {
                            if (cts.IsCancellationRequested) return;
                            if (!files.Any())
                            {
                                await LogUI.Info($"【任务{dto.ID}】目录[{filesDir}]中没有文件,等待10秒...");
                                if (DateTime.Now.Hour != preTime.Hour)
                                {
                                    ResetTime(formatFlag, ref preTime);
                                    doNextHour = true;
                                    break;
                                }
                                await Task.Delay(10000, cts.Token);
                                files = Directory.GetFiles(filesDir, "*.dat").ToList();
                                continue;
                            }
                            break;
                        }
                        if (cts.IsCancellationRequested) return;

                        bool isLocal = IsLocal(filesDir);

                        // Convert file names to HistoryFile entries (null entries are dropped) and
                        // group them by capture time, oldest first.
                        List<IGrouping<DateTime, HistoryFile>> groups = files
                            .Select(f => FileToHistoryFile(dto, f, preTime, canConnected, cts.Token))
                            .Where(p => p != null)
                            .GroupBy(m => m.CapTime)
                            .OrderBy(m => m.Key)
                            .ToList();
                        if (cts.IsCancellationRequested) break;

                        bool hasFile = false;
                        if (groups.Any())
                        {
                            var lastTime = groups.Last().Key;
                            foreach (var sameTimeFiles in groups)
                            {
                                if (cts.IsCancellationRequested) return;

                                var capTime = sameTimeFiles.First().CapTime;
                                if ((DateTime.Now - capTime).TotalMinutes > 10)
                                {
                                    // More than 10 minutes behind: skip this capture time.
                                    preTime = capTime;
                                    await LogUI.Warning($"【任务{dto.ID}】处理速度过慢,丢弃数据");
                                    continue;
                                }

                                // Only the newest capture time is waited on before it is processed.
                                if (capTime == lastTime)
                                {
                                    await WaitFileEnd(dto.ID, sameTimeFiles.First(), _config.capSeconds, cts);
                                }
                                await LogUI.Info($"【任务{dto.ID}】[{capTime:yyyyMMddHHmmss}]时刻文件采集完成");

                                if (!CanOpenFile(dto.ID, sameTimeFiles.First()))
                                {
                                    continue; // the file cannot be opened
                                }
                                hasFile = true;

                                var xdgbInfos = sameTimeFiles.GroupBy(m => m.XdIndex);
                                var splitXdgbInfos = SplitFreqFiles(xdgbInfos);

                                // The items of splitXdgbInfos are processed in parallel.
                                List<Task> listTask = new List<Task>();
                                foreach (var item in splitXdgbInfos)
                                {
                                    if (cts.IsCancellationRequested) return;
                                    listTask.Add(GetPosTask(dto, item, isLocal, cts));
                                }
                                await Task.WhenAll(listTask);
                                preTime = capTime;
                            }
                        }

                        if (!hasFile || preTime.Minute == 59)
                        {
                            await LogUI.Info($"【任务{dto.ID}】缺少{preTime:yyyyMMddHHmmss}时刻之后的采集文件,等待5秒...");

                            // A real-time task without files has to consider jumping to the next
                            // hour's directory: when the current time is no longer inside the hour
                            // of preTime, restart from the top of the current hour.
                            var time = new DateTime(preTime.Year, preTime.Month, preTime.Day, preTime.Hour, 0, 0);
                            var now = DateTime.Now;
                            if ((int)(now - time).TotalHours != 0)
                            {
                                preTime = new DateTime(now.Year, now.Month, now.Day, now.Hour, 0, 0);
                            }
                            await Task.Delay(5000, cts.Token);
                        }
                        else
                        {
                            await LogUI.Info($"【任务{dto.ID}】目录[{filesDir}]本次扫描的数据处理完成");
                        }
                    }
                    catch (TaskCanceledException ex)
                    {
                        // Raised by the cancellable delays above; the loop condition ends the loop afterwards.
                        await LogUI.Warning($"【任务{dto.ID}】目录[{filesDir}]中的数据处理结束,用户手动终止", ex);
                        ResetTime(formatFlag, ref preTime);
                    }
                    catch (Exception ex)
                    {
                        // Any other failure skips the current directory and keeps the task alive.
                        await LogUI.Error($"【任务{dto.ID}】目录[{filesDir}]中的数据处理出错,跳过此目录", ex);
                        ResetTime(formatFlag, ref preTime);
                    }
                }

                await StopTask(dto.ID, EnumTaskStopType.Properly, "数据处理完成,任务结束");
            }, cts.Token);
        }

        /// <summary>
        /// Requests cancellation of a started task and unregisters its cancellation source.
        /// </summary>
        /// <param name="taskID">ID of the task to stop.</param>
        /// <returns><c>true</c> when a source was registered for the ID; otherwise <c>false</c>.</returns>
        public bool Stop(int taskID)
        {
            CancellationTokenSource cts;
            if (!dicCts.TryRemove(taskID, out cts))
            {
                return false;
            }

            cts?.Cancel();
            return true;
        }

        /// <summary>
        /// Logs the stop reason, posts the stop notification to <c>Task/StopTask</c> under the
        /// configured base URL, and unregisters the task's cancellation source.
        /// </summary>
        /// <param name="taskID">ID of the task being stopped.</param>
        /// <param name="type">Stop type; <see cref="EnumTaskStopType.Properly"/> is logged as info, anything else as error.</param>
        /// <param name="stopReason">Text written to the log and sent with the notification.</param>
        public async Task StopTask(int taskID, EnumTaskStopType type, string stopReason)
        {
            await Task.Delay(2000);

            if (type == EnumTaskStopType.Properly)
            {
                await LogUI.Info($"【任务{taskID}】{stopReason}");
            }
            else
            {
                await LogUI.Error($"【任务{taskID}】{stopReason}");
            }

            TaskStopHandleDto stopDto = new TaskStopHandleDto()
            {
                ID = taskID,
                StopType = type,
                TaskType = EnumTaskTypeDto.Real,
                StopReason = stopReason
            };
            var stopResp = await HttpHelper.PostRequestAsync(_config.baseUrl + "Task/StopTask", stopDto);
            if (stopResp.code != 200)
            {
                await LogUI.Error($"【任务{taskID}】停止异常.{stopResp.msg}");
            }

            // Removal is by ID only, whichever source is currently registered under it.
            CancellationTokenSource removed;
            dicCts.TryRemove(taskID, out removed);
        }
    }
}
|