| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 | using Ips.Library.Basic;using Ips.Sps.Scheduling.Entities;using Ips.Sps.Scheduling.SigAds;using Ips.Sps.Scheduling.SigLocs;using Ips.Sps.Scheduling.SigProces;using System;using System.Collections.Concurrent;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace Ips.Sps.Scheduling{    public class TskHandler    {        public TskHandler(RunTsk tsk)        {            _tsk = tsk;            _cts = new CancellationTokenSource();        }        public RunTsk _tsk { get; private set; }        private CancellationTokenSource _cts;        private volatile bool _isRunning;        public bool IsRunning => _isRunning;        public bool IsCancelled => _cts.IsCancellationRequested;        Task sigAdTsk, sigProcessTsk, sigLocTsk;        BlockingCollection<AdSigGroup> adGroupQueue = new BlockingCollection<AdSigGroup>();        BlockingCollection<ProcSigGroup> procGroupQueue = new BlockingCollection<ProcSigGroup>();        public Task Start()        {            return Task.Run(() =>            {                try                {                    //启动信号采集                    sigAdTsk = Task.Run(StartAdTsk);                    //启动信号处理                    sigProcessTsk = Task.Run(StartProcTsk);                    //启动信号定位                    sigLocTsk = Task.Run(StartLocTsk);                    //等待信号处理完成                    Task.WaitAll(sigAdTsk, sigProcessTsk, sigLocTsk);                }                finally                {                    _isRunning = false;                }            });        }        public void Stop()        {            _cts.Cancel();        }        public void WaitStop()        {            while (_isRunning)            {                Task.Delay(100).Wait();            }        }        protected virtual void StartAdTsk()        {            try            {                var worker = AdWorkerFactory.Create(_tsk);                if (worker != null)                {                    worker.AdSigGroupStart += (sender, arg) =>                    {                        adGroupQueue.Add(arg.AdGroup);                    };                    worker.Execute(_cts.Token);                }            }            catch (Exception ex)            {                IpsLogger.Error($"任务【{_tsk.TskId}-{_tsk.TskName}】采集线程异常",ex);            }            finally            {                adGroupQueue.CompleteAdding();            }            IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】采集线程退出!");        }        protected virtual void StartProcTsk()        {            while (!_cts.IsCancellationRequested && !adGroupQueue.IsCompleted)            {                if (adGroupQueue.TryTake(out AdSigGroup adGroup, Timeout.Infinite, _cts.Token))                {                    ProcSigGroup procSigGroup = new ProcSigGroup(_tsk.TskId, adGroup.GroupTime);                    while (!adGroup.GroupSigList.IsCompleted)                    {                        if (adGroup.GroupSigList.TryTake(out AdSigResult adSigResult, Timeout.Infinite, _cts.Token))                        {                            var procWorker = ProcWorkerFactory.Create(_tsk, adSigResult.Sig.SigType);                            procWorker.ProcSigCompleted += (sender, arg) =>                            {                                procSigGroup.ProcSigList.Add(arg.Result);                            };                            procWorker.Execute(adSigResult, _cts.Token);                            IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】,信号时间:{adSigResult.SigTime:yyyy-MM-dd HH:mm:ss},信号频点:{adSigResult.Sig.SigFreq},文件数量:{adSigResult.PasList.Count}");                        }                    }                    procGroupQueue.Add(procSigGroup);                }            }            procGroupQueue.CompleteAdding();            IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】处理线程退出!");        }        protected virtual void StartLocTsk()        {            while (!_cts.IsCancellationRequested && !procGroupQueue.IsCompleted)            {                if (procGroupQueue.TryTake(out ProcSigGroup procGroup, Timeout.Infinite, _cts.Token))                {                    var worker = LocWorkerFactory.Create(_tsk);                    worker.Execute(procGroup, _cts.Token);                }            }        }    }}
 |