1
0
Fork 0
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

157 lines
5.4 KiB

using Scheduler.Backgrounding;
using WebAPI.Interfaces;
namespace WebAPI.Services
{
/// <summary>
/// Keeps a pool of crawl tasks keyed by id and runs them in batches from a background
/// <see cref="BGTaskLoop{T}"/> that is started in the constructor; every result returned by a task
/// is re-published through <see cref="OnResult"/>.
/// </summary>
/// <typeparam name="BGTaskResult">Result type produced by each crawl task.</typeparam>
internal class CrawlScheduler<BGTaskResult> : ICrawlScheduler<BGTaskResult>
{
    /// <summary>
    /// Interval handed to the background loop. A single loop taking longer than this is reported as a warning.
    /// </summary>
    private static readonly TimeSpan LoopInterval = TimeSpan.FromSeconds(1);

    private readonly Dictionary<int, Func<Task<BGTaskResult>>> _taskPool = new();
    private readonly BGTaskLoop<List<BGTaskResult>> _taskLooper;
    // TODO: persist this log to the DB from time to time, so it survives a server restart and can be
    // reused for future calculations & benchmarking. Nothing in this class removes entries, so until
    // then it grows by one entry per started loop.
    private readonly List<BGTaskInfo> _log = new();
    private readonly object _poolLock = new();
    private readonly object _logLock = new();

    /// <summary>Raised once for every result returned by a crawl task.</summary>
    public event EventHandler<BGTaskResult>? OnResult;

    /// <summary>Creates the scheduler and immediately starts its background loop.</summary>
    public CrawlScheduler()
    {
        _taskLooper = new BGTaskLoop<List<BGTaskResult>>(new ParallelExecuter(this), LoopInterval);
        // Subscribe before Start() so no start/end notification can be missed.
        _taskLooper.OnExecuteStart += OnBGLoopStarted;
        _taskLooper.OnExecuteEnd += OnBGLoopEnded;
        _taskLooper.Start();
    }

    /// <summary> Adds a task to the task pool of the scheduler (thread safe) </summary>
    /// <param name="id">Key of the task; must not already be present in the pool.</param>
    /// <param name="task">Factory that starts one crawl run; <c>null</c> is rejected.</param>
    /// <returns>
    /// <c>true</c> if the task was added; <c>false</c> if <paramref name="task"/> is <c>null</c>
    /// or <paramref name="id"/> is already taken.
    /// </returns>
    public bool AddTask(int id, Func<Task<BGTaskResult>> task)
    {
        if (task is null)
        {
            return false;
        }
        lock (_poolLock)
        {
            // TryAdd performs the "already present?" check and the insert in a single lookup.
            return _taskPool.TryAdd(id, task);
        }
    }

    /// <summary> Removes a task from the task pool of the scheduler (thread safe) </summary>
    /// <param name="id">Key of the task to remove.</param>
    /// <returns><c>true</c> if a task with <paramref name="id"/> was present and removed.</returns>
    /// <remarks>A task that already belongs to the batch currently executing still finishes that run.</remarks>
    public bool RemoveTask(int id)
    {
        lock (_poolLock)
        {
            return _taskPool.Remove(id);
        }
    }

    /// <summary>Records the start of a background loop in <see cref="_log"/>.</summary>
    private void OnBGLoopStarted(object? sender, BGTaskStartEvent e)
    {
        lock (_logLock)
        {
            _log.Add(new BGTaskInfo(e.IntervalID, e.BGTaskGuid, e.At));
        }
    }

    /// <summary>
    /// Marks the matching log entry as finished and raises <see cref="OnResult"/> for each returned item.
    /// </summary>
    private void OnBGLoopEnded(object? sender, BGTaskEndEvent<List<BGTaskResult>> e)
    {
        // Defensive: treat a missing result list as "no results" instead of throwing.
        var results = e.Result ?? new List<BGTaskResult>();
        var bgTaskId = e.GetBGTaskID();

        BGTaskInfo? bgTaskInfo;
        lock (_logLock)
        {
            // The log only ever grows, and the matching start entry was appended recently,
            // so search from the newest end instead of scanning the whole history.
            bgTaskInfo = _log.FindLast(x => x.GetBGTaskID() == bgTaskId);
        }

        if (bgTaskInfo is not null)
        {
            bgTaskInfo.Finish(e.At);
            Console.WriteLine($"[{bgTaskId}] finished. returned: {results.Count} list items");
        }
        else
        {
            Console.WriteLine($"[{bgTaskId}] tried to finish, though did not find bgTaskInfo:");
        }

        foreach (var result in results)
        {
            OnResult?.Invoke(this, result);
        }
    }

    /// <summary>
    /// Executes multiple tasks in parallel and tries to balance the load. Each loop runs a slice of the
    /// pool sized so that, if the pool does not change, every task is visited within roughly
    /// <see cref="MAX_SEC_FOR_COMPLETE_LOOP"/> loops.
    /// </summary>
    private class ParallelExecuter : IBGTaskLoopable<List<BGTaskResult>>
    {
        private const int MAX_SEC_FOR_COMPLETE_LOOP = 10;
        private readonly CrawlScheduler<BGTaskResult> _host;
        // Ids (not delegates) are queued, so tasks removed from or replaced in the pool after being
        // queued are resolved against the pool's current contents when their batch is taken.
        private readonly Queue<int> _q = new();
        private int _bufferSize = 10;
        private TimeSpan _lastLoopDuration = TimeSpan.Zero;
        private readonly object _resultsLock = new();

        internal ParallelExecuter(CrawlScheduler<BGTaskResult> host)
        {
            _host = host;
        }

        /// <summary>
        /// Runs one batch of up to <see cref="_bufferSize"/> pooled tasks in parallel and returns
        /// the results of those that completed successfully.
        /// </summary>
        async Task<List<BGTaskResult>> IBGTaskLoopable<List<BGTaskResult>>.OnBGTaskLoopAsync()
        {
            // Stopwatch is monotonic, unlike DateTime.Now, so clock adjustments cannot skew the duration.
            var stopwatch = System.Diagnostics.Stopwatch.StartNew();
            UpdateBufferSize();
            UpdateQueue();
            var batch = TakeBatch();
            var results = new List<BGTaskResult>(batch.Count);

            await Parallel.ForEachAsync(batch, async (task, _) =>
            {
                BGTaskResult result;
                try
                {
                    result = await task();
                }
                catch (Exception ex)
                {
                    // Handle failures per task: an exception escaping the body would make
                    // Parallel.ForEachAsync stop and drop the rest of this batch.
                    Console.WriteLine($"[ERROR] crawl task failed: {ex.GetType().Name}: {ex.Message}");
                    return;
                }
                lock (_resultsLock)
                {
                    results.Add(result);
                }
            });

            _lastLoopDuration = stopwatch.Elapsed;
            return results;
        }

        /// <summary>
        /// Resizes the batch to ceil(poolSize / MAX_SEC_FOR_COMPLETE_LOOP) and warns if the previous
        /// loop ran longer than the loop interval.
        /// </summary>
        private void UpdateBufferSize()
        {
            int suggestedCallsPerSec;
            lock (_host._poolLock)
            {
                suggestedCallsPerSec = (int)Math.Ceiling((double)_host._taskPool.Count / MAX_SEC_FOR_COMPLETE_LOOP);
            }
            if (_lastLoopDuration > LoopInterval)
            {
                Console.WriteLine("[WARNING] last loop duration > 1 sec!");
            }
            if (suggestedCallsPerSec != _bufferSize)
            {
                _bufferSize = suggestedCallsPerSec;
                Console.WriteLine("[INTRADESTING]: _bufferSize was updated!");
            }
        }

        /// <summary> if Queue is empty => fetch all task ids from _host and enqueue them once </summary>
        private void UpdateQueue()
        {
            if (_q.Count != 0)
            {
                return;
            }
            lock (_host._poolLock)
            {
                foreach (int id in _host._taskPool.Keys)
                {
                    _q.Enqueue(id);
                }
            }
        }

        /// <summary>
        /// Dequeues ids until <see cref="_bufferSize"/> live tasks are collected or the queue is empty.
        /// Ids that are no longer in the host's pool are skipped.
        /// </summary>
        private List<Func<Task<BGTaskResult>>> TakeBatch()
        {
            var batch = new List<Func<Task<BGTaskResult>>>();
            lock (_host._poolLock)
            {
                while (batch.Count < _bufferSize && _q.TryDequeue(out int id))
                {
                    if (_host._taskPool.TryGetValue(id, out var task))
                    {
                        batch.Add(task);
                    }
                }
            }
            return batch;
        }
    }
}
}