123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Ips.Library.Basic;
using Ips.Sps.Scheduling.Entities;
using Ips.Sps.Scheduling.SigAds;
using Ips.Sps.Scheduling.SigLocs;
using Ips.Sps.Scheduling.SigProces;
namespace Ips.Sps.Scheduling
{
    /// <summary>
    /// Runs a single <see cref="RunTsk"/> as a three-stage pipeline:
    /// acquisition (<see cref="StartAdTsk"/>) feeds <c>adGroupQueue</c>,
    /// processing (<see cref="StartProcTsk"/>) drains it and feeds <c>procGroupQueue</c>,
    /// and location (<see cref="StartLocTsk"/>) drains that. All stages share one
    /// cancellation token that is triggered by <see cref="Stop"/>.
    /// </summary>
    public class TskHandler
    {
        private readonly CancellationTokenSource _cts;

        // Hand-off queues between the stages. The producing stage calls CompleteAdding()
        // when it exits so that the consuming stage's blocking TryTake can return.
        private readonly BlockingCollection<AdSigGroup> adGroupQueue = new BlockingCollection<AdSigGroup>();
        private readonly BlockingCollection<ProcSigGroup> procGroupQueue = new BlockingCollection<ProcSigGroup>();

        private volatile bool _isRunning;
        private Task sigAdTsk, sigProcessTsk, sigLocTsk;

        /// <summary>Creates a handler for <paramref name="tsk"/> with a fresh cancellation source.</summary>
        /// <param name="tsk">The task description handed to the worker factories.</param>
        public TskHandler(RunTsk tsk)
        {
            _tsk = tsk;
            _cts = new CancellationTokenSource();
        }

        /// <summary>The task this handler executes.</summary>
        public RunTsk _tsk { get; private set; }

        /// <summary>True from the call to <see cref="Start"/> until all three stages have exited.</summary>
        public bool IsRunning => _isRunning;

        /// <summary>True once <see cref="Stop"/> has been called.</summary>
        public bool IsCancelled => _cts.IsCancellationRequested;

        /// <summary>
        /// Starts the three pipeline stages on the thread pool.
        /// </summary>
        /// <returns>
        /// A task that completes when all three stages have exited. It faults if any
        /// stage lets an exception escape.
        /// </returns>
        public Task Start()
        {
            // Raise the flag before scheduling anything: the original never set it, so
            // IsRunning was always false and WaitStop() returned immediately.
            _isRunning = true;

            return Task.Run(() =>
            {
                try
                {
                    // Start signal acquisition.
                    sigAdTsk = Task.Run(StartAdTsk);
                    // Start signal processing.
                    sigProcessTsk = Task.Run(StartProcTsk);
                    // Start signal location.
                    sigLocTsk = Task.Run(StartLocTsk);
                    // Wait for every stage to finish.
                    Task.WaitAll(sigAdTsk, sigProcessTsk, sigLocTsk);
                }
                finally
                {
                    _isRunning = false;
                }
            });
        }

        /// <summary>Requests cancellation of all stages. Does not wait for them to exit.</summary>
        public void Stop()
        {
            _cts.Cancel();
        }

        /// <summary>
        /// Blocks the calling thread until the pipeline started by <see cref="Start"/> has
        /// finished, polling <see cref="IsRunning"/> every 100 ms. It does not request
        /// cancellation itself.
        /// </summary>
        public void WaitStop()
        {
            while (_isRunning)
            {
                Thread.Sleep(100);
            }
        }

        /// <summary>
        /// Acquisition stage: creates the acquisition worker for the task, forwards every
        /// group it announces into <c>adGroupQueue</c>, and runs it until it returns.
        /// Exceptions are logged and swallowed; the queue is always marked complete.
        /// </summary>
        protected virtual void StartAdTsk()
        {
            try
            {
                var worker = AdWorkerFactory.Create(_tsk);
                if (worker != null)
                {
                    worker.AdSigGroupStart += (sender, arg) =>
                    {
                        adGroupQueue.Add(arg.AdGroup);
                    };
                    worker.Execute(_cts.Token);
                }
            }
            catch (Exception ex)
            {
                IpsLogger.Error($"任务【{_tsk.TskId}-{_tsk.TskName}】采集线程异常",ex);
            }
            finally
            {
                // Lets the processing stage observe IsCompleted and leave its loop.
                adGroupQueue.CompleteAdding();
            }
            IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】采集线程退出!");
        }

        /// <summary>
        /// Processing stage: takes acquisition groups from <c>adGroupQueue</c>, runs a
        /// processing worker over every signal in the group, collects the results into a
        /// <see cref="ProcSigGroup"/>, and queues that group for the location stage.
        /// </summary>
        /// <remarks>
        /// <c>procGroupQueue</c> is marked complete on every exit path, so the location
        /// stage is not left blocked when this stage fails. Cancellation via
        /// <see cref="Stop"/> is a normal exit; any other exception is logged and rethrown.
        /// </remarks>
        protected virtual void StartProcTsk()
        {
            try
            {
                while (!_cts.IsCancellationRequested && !adGroupQueue.IsCompleted)
                {
                    // Blocks until an item arrives; returns false once the queue is
                    // completed and empty; throws OperationCanceledException on cancel.
                    if (!adGroupQueue.TryTake(out AdSigGroup adGroup, Timeout.Infinite, _cts.Token))
                    {
                        continue;
                    }

                    ProcSigGroup procSigGroup = new ProcSigGroup(_tsk.TskId, adGroup.GroupTime);
                    while (!adGroup.GroupSigList.IsCompleted)
                    {
                        if (!adGroup.GroupSigList.TryTake(out AdSigResult adSigResult, Timeout.Infinite, _cts.Token))
                        {
                            continue;
                        }

                        var procWorker = ProcWorkerFactory.Create(_tsk, adSigResult.Sig.SigType);
                        procWorker.ProcSigCompleted += (sender, arg) =>
                        {
                            procSigGroup.ProcSigList.Add(arg.Result);
                        };
                        procWorker.Execute(adSigResult, _cts.Token);
                        IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】,信号时间:{adSigResult.SigTime:yyyy-MM-dd HH:mm:ss},信号频点:{adSigResult.Sig.SigFreq},文件数量:{adSigResult.PasList.Count}");
                    }

                    procGroupQueue.Add(procSigGroup);
                }
            }
            catch (OperationCanceledException) when (_cts.IsCancellationRequested)
            {
                // Stop() was called while blocked in TryTake or inside a worker: normal shutdown.
            }
            catch (Exception ex)
            {
                IpsLogger.Error($"任务【{_tsk.TskId}-{_tsk.TskName}】处理线程异常", ex);
                throw; // keep the Start() task faulted, as before
            }
            finally
            {
                procGroupQueue.CompleteAdding();
            }
            IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】处理线程退出!");
        }

        /// <summary>
        /// Location stage: takes processed groups from <c>procGroupQueue</c> and runs a
        /// location worker on each one until the queue is completed or cancellation is
        /// requested.
        /// </summary>
        /// <remarks>
        /// Cancellation via <see cref="Stop"/> is a normal exit; any other exception is
        /// logged and rethrown.
        /// </remarks>
        protected virtual void StartLocTsk()
        {
            try
            {
                while (!_cts.IsCancellationRequested && !procGroupQueue.IsCompleted)
                {
                    if (procGroupQueue.TryTake(out ProcSigGroup procGroup, Timeout.Infinite, _cts.Token))
                    {
                        var worker = LocWorkerFactory.Create(_tsk);
                        worker.Execute(procGroup, _cts.Token);
                    }
                }
            }
            catch (OperationCanceledException) when (_cts.IsCancellationRequested)
            {
                // Stop() was called while blocked in TryTake or inside the worker: normal shutdown.
            }
            catch (Exception ex)
            {
                IpsLogger.Error($"任务【{_tsk.TskId}-{_tsk.TskName}】定位线程异常", ex);
                throw; // keep the Start() task faulted, as before
            }
            IpsLogger.Info($"任务【{_tsk.TskId}-{_tsk.TskName}】定位线程退出!");
        }
    }
}
|