using Scheduler.Backgrounding;
using WebAPI.Interfaces;

namespace WebAPI.Services
{
    // NOTE(review): the generic type arguments in the reviewed copy of this file were
    // lost. They are reconstructed here from usage; `CrawlResult` is a stand-in for the
    // element type produced by a crawl task - replace it with the type that
    // ICrawlScheduler actually declares before merging.

    /// <summary>
    /// Keeps a pool of crawl tasks keyed by id and hands them to a background loop
    /// that runs them in batches. Every result produced by a batch is published
    /// through <see cref="OnResult"/>.
    /// </summary>
    internal class CrawlScheduler : ICrawlScheduler
    {
        private readonly Dictionary<int, Func<Task<CrawlResult>>> _taskPool = new();
        private readonly BGTaskLoop<List<CrawlResult>> _taskLooper;

        // TODO: Log list that should be saved to the DB from time to time
        // and could be reused for future calculations & benchmarking (server restart).
        // NOTE(review): entries are only ever added, so this list grows for the
        // lifetime of the scheduler.
        private readonly List<BGTaskInfo> _log = new();

        private readonly object _poolLock = new();
        private readonly object _logLock = new();

        /// <summary>Raised once for every result item returned by a finished batch.</summary>
        public event EventHandler<CrawlResult>? OnResult;

        /// <summary>
        /// Creates the background loop around a <see cref="ParallelExecuter"/>, subscribes to
        /// its start/end events and starts it immediately. The one-second
        /// <see cref="TimeSpan"/> is passed to the loop (presumably its interval - confirm
        /// against <c>BGTaskLoop</c>).
        /// </summary>
        public CrawlScheduler()
        {
            _taskLooper = new BGTaskLoop<List<CrawlResult>>(new ParallelExecuter(this), TimeSpan.FromSeconds(1));
            _taskLooper.OnExecuteStart += OnBGLoopStarted;
            _taskLooper.OnExecuteEnd += OnBGLoopEnded;
            _taskLooper.Start();
        }

        /// <summary>Adds a task to the task pool of the scheduler (thread safe).</summary>
        /// <param name="id">Key under which the task is stored.</param>
        /// <param name="task">Factory that starts the crawl and returns its result.</param>
        /// <returns>
        /// <c>true</c> if the task was added; <c>false</c> if <paramref name="task"/> is
        /// <c>null</c> or a task with the same <paramref name="id"/> already exists.
        /// </returns>
        public bool AddTask(int id, Func<Task<CrawlResult>> task)
        {
            if (task is null)
            {
                return false;
            }

            lock (_poolLock)
            {
                // TryAdd does the existence check and the insert in a single lookup.
                return _taskPool.TryAdd(id, task);
            }
        }

        /// <summary>Removes a task from the task pool of the scheduler (thread safe).</summary>
        /// <param name="id">Key of the task to remove.</param>
        /// <returns><c>true</c> if a task with that id existed and was removed.</returns>
        public bool RemoveTask(int id)
        {
            lock (_poolLock)
            {
                return _taskPool.Remove(id);
            }
        }

        /// <summary>Records a log entry for the loop iteration that just started.</summary>
        private void OnBGLoopStarted(object? sender, BGTaskStartEvent e)
        {
            lock (_logLock)
            {
                _log.Add(new BGTaskInfo(e.IntervalID, e.BGTaskGuid, e.At));
            }
        }

        /// <summary>
        /// Marks the matching log entry as finished (if one is found) and raises
        /// <see cref="OnResult"/> for every item in the iteration's result list.
        /// </summary>
        private void OnBGLoopEnded(object? sender, BGTaskEndEvent<List<CrawlResult>> e)
        {
            BGTaskInfo? bgTaskInfo;
            lock (_logLock)
            {
                bgTaskInfo = _log.FirstOrDefault(x => x.GetBGTaskID() == e.GetBGTaskID());
            }

            if (bgTaskInfo != null)
            {
                bgTaskInfo.Finish(e.At);
                Console.WriteLine($"[{e.GetBGTaskID()}] finished. returned: {e.Result.Count} list items");
            }
            else
            {
                Console.WriteLine($"[{e.GetBGTaskID()}] tried to finish, though did not find bgTaskInfo:");
            }

            // Results are published whether or not the log entry was found.
            foreach (var r in e.Result)
            {
                OnResult?.Invoke(this, r);
            }
        }

        /// <summary>
        /// Executes multiple tasks in parallel and tries to balance the load.
        /// </summary>
        /// <remarks>
        /// Each loop iteration runs at most <c>_bufferSize</c> tasks. The queue is refilled
        /// from the host's pool only once it has been drained, so every pooled task gets
        /// one turn per cycle.
        /// NOTE(review): <c>_q</c>, <c>_bufferSize</c> and <c>_lastLoopDuration</c> are not
        /// synchronized; this assumes the loop never runs two iterations concurrently.
        /// </remarks>
        private class ParallelExecuter : IBGTaskLoopable<List<CrawlResult>>
        {
            // The batch size is the pool count divided by this value (rounded up), so a
            // full pass over the pool takes about this many loop iterations.
            private const int MAX_SEC_FOR_COMPLETE_LOOP = 10;

            // NOTE(review): currently not referenced anywhere in this class.
            private const int MIN_INTERVAL = 1;

            private readonly CrawlScheduler _host;

            // Holds task ids rather than delegates so that a task removed from the pool
            // after it was queued is skipped instead of being executed anyway.
            private readonly Queue<int> _q = new();

            private int _bufferSize = 10;
            private TimeSpan _lastLoopDuration = TimeSpan.Zero;
            private readonly object _resultsLock = new();

            internal ParallelExecuter(CrawlScheduler host)
            {
                _host = host;
            }

            /// <summary>
            /// Runs one batch of queued tasks in parallel and returns the results of the
            /// tasks that completed successfully.
            /// </summary>
            /// <returns>The collected results, in completion order.</returns>
            async Task<List<CrawlResult>> IBGTaskLoopable<List<CrawlResult>>.OnBGTaskLoopAsync()
            {
                // Monotonic timer: unaffected by DST or system clock adjustments.
                var stopwatch = System.Diagnostics.Stopwatch.StartNew();

                UpdateBufferSize();
                UpdateQueue();

                var batch = DequeueBatch();
                var results = new List<CrawlResult>(batch.Count);

                await Parallel.ForEachAsync(batch, async (task, _) =>
                {
                    // Failures are isolated per task: one failing crawl must not abort the
                    // rest of the batch or discard the results already collected.
                    try
                    {
                        var result = await task();
                        lock (_resultsLock)
                        {
                            results.Add(result);
                        }
                    }
                    catch (Exception ex)
                    {
                        //TODO: Log
                        Console.WriteLine($"[ERROR] crawl task failed: {ex}");
                    }
                });

                _lastLoopDuration = stopwatch.Elapsed;
                return results;
            }

            /// <summary>
            /// Takes up to <c>_bufferSize</c> ids from the queue and resolves them against
            /// the host's pool, skipping ids whose task has been removed in the meantime.
            /// </summary>
            private List<Func<Task<CrawlResult>>> DequeueBatch()
            {
                var batch = new List<Func<Task<CrawlResult>>>(Math.Min(_q.Count, _bufferSize));

                lock (_host._poolLock)
                {
                    while (batch.Count < _bufferSize && _q.TryDequeue(out int id))
                    {
                        if (_host._taskPool.TryGetValue(id, out var task))
                        {
                            batch.Add(task);
                        }
                    }
                }

                return batch;
            }

            /// <summary>
            /// Recomputes the batch size from the current pool size and warns when the
            /// previous iteration took longer than one second.
            /// </summary>
            private void UpdateBufferSize()
            {
                int suggstCallsPerSec;
                lock (_host._poolLock)
                {
                    suggstCallsPerSec = (int)Math.Ceiling((double)_host._taskPool.Count / MAX_SEC_FOR_COMPLETE_LOOP);
                }

                if (_lastLoopDuration.TotalSeconds > 1)
                {
                    Console.WriteLine("[WARNING] last loop duration > 1 sec!");
                }

                if (suggstCallsPerSec != _bufferSize)
                {
                    _bufferSize = suggstCallsPerSec;
                    Console.WriteLine("[INTRADESTING]: _bufferSize was updated!");
                }
            }

            /// <summary>
            /// If the queue is empty, enqueues the id of every task currently in the
            /// host's pool once.
            /// </summary>
            private void UpdateQueue()
            {
                if (_q.Count != 0)
                {
                    return;
                }

                lock (_host._poolLock)
                {
                    foreach (int id in _host._taskPool.Keys)
                    {
                        _q.Enqueue(id);
                    }
                }
            }
        }
    }
}