using Ips.Library.Basic; using Ips.Sps.Scheduling.Entities; using Ips.Sps.Scheduling.SigAds; using Ips.Sps.Scheduling.SigLocs; using Ips.Sps.Scheduling.SigProces; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Ips.Sps.Scheduling { public class TskHandler { public TskHandler(RunTsk tsk) { _tsk = tsk; _cts = new CancellationTokenSource(); } public RunTsk _tsk { get; private set; } private CancellationTokenSource _cts; private volatile bool _isRunning; public bool IsRunning => _isRunning; public bool IsCancelled => _cts.IsCancellationRequested; Task sigAdTsk, sigProcessTsk, sigLocTsk; BlockingCollection adGroupQueue = new BlockingCollection(); BlockingCollection procGroupQueue = new BlockingCollection(); public Task Start() { return Task.Run(() => { try { //启动信号采集 sigAdTsk = Task.Run(StartAdTsk); //启动信号处理 sigProcessTsk = Task.Run(StartProcTsk); //启动信号定位 sigLocTsk = Task.Run(StartLocTsk); //等待信号处理完成 Task.WaitAll(sigAdTsk, sigProcessTsk, sigLocTsk); } finally { _isRunning = false; } }); } public void Stop() { _cts.Cancel(); } public void WaitStop() { while (_isRunning) { Task.Delay(100).Wait(); } } protected virtual void StartAdTsk() { try { var worker = AdWorkerFactory.Create(_tsk); if (worker != null) { worker.AdSigGroupStart += (sender, arg) => { adGroupQueue.Add(arg.AdGroup); }; worker.Execute(_cts.Token); } } catch (Exception ex) { IpsLogger.Error($"任务【{_tsk.TskId}-{_tsk.TskName}】采集线程异常",ex); } finally { adGroupQueue.CompleteAdding(); } IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】采集线程退出!"); } protected virtual void StartProcTsk() { while (!_cts.IsCancellationRequested && !adGroupQueue.IsCompleted) { if (adGroupQueue.TryTake(out AdSigGroup adGroup, Timeout.Infinite, _cts.Token)) { ProcSigGroup procSigGroup = new ProcSigGroup(_tsk.TskId, adGroup.GroupTime); while (!adGroup.GroupSigList.IsCompleted) { if (adGroup.GroupSigList.TryTake(out AdSigResult adSigResult, Timeout.Infinite, _cts.Token)) { var procWorker = ProcWorkerFactory.Create(_tsk, adSigResult.Sig.SigType); procWorker.ProcSigCompleted += (sender, arg) => { procSigGroup.ProcSigList.Add(arg.Result); }; procWorker.Execute(adSigResult, _cts.Token); IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】,信号时间:{adSigResult.SigTime:yyyy-MM-dd HH:mm:ss},信号频点:{adSigResult.Sig.SigFreq},文件数量:{adSigResult.PasList.Count}"); } } procGroupQueue.Add(procSigGroup); } } procGroupQueue.CompleteAdding(); IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】处理线程退出!"); } protected virtual void StartLocTsk() { while (!_cts.IsCancellationRequested && !procGroupQueue.IsCompleted) { if (procGroupQueue.TryTake(out ProcSigGroup procGroup, Timeout.Infinite, _cts.Token)) { var worker = LocWorkerFactory.Create(_tsk); worker.Execute(procGroup, _cts.Token); } } } } }