Piscal Queue working more resilient to error
This commit is contained in:
@@ -2,168 +2,111 @@
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using Hangfire;
|
||||
using LeafWeb.Core.DAL;
|
||||
using LeafWeb.Core.Entities;
|
||||
using LeafWeb.Core.Remote;
|
||||
using NLog;
|
||||
|
||||
namespace LeafWeb.Web.Services
|
||||
{
|
||||
public class PiscalQueueManager : IDisposable
|
||||
public class PiscalQueueManager : PiscalQueueBase
|
||||
{
|
||||
private readonly DataService _dataService;
|
||||
private readonly PiscalService _piscalService;
|
||||
private readonly EmailNotificationService _emailService;
|
||||
|
||||
public PiscalQueueManager(DataService dataService, PiscalService piscalService, EmailNotificationService emailService)
|
||||
{
|
||||
_dataService = dataService;
|
||||
_piscalService = piscalService;
|
||||
_emailService = emailService;
|
||||
}
|
||||
|
||||
public PiscalQueueManager() : this(new DataService(), new PiscalService(), new EmailNotificationService()) {}
|
||||
|
||||
private static readonly object ProcessQueueLock = new object();
|
||||
|
||||
public void ProcessQueue()
|
||||
{
|
||||
var logger = LogManager.GetCurrentClassLogger();
|
||||
|
||||
// prevent multiple entry into processing the queue
|
||||
if (Monitor.TryEnter(ProcessQueueLock))
|
||||
{
|
||||
logger.Trace("ProcessQueue entered");
|
||||
Logger.Trace("ProcessQueue entered");
|
||||
|
||||
try
|
||||
{
|
||||
ProcessRunning(logger);
|
||||
UpdateRunning();
|
||||
|
||||
ProcessQueue(logger);
|
||||
StartNextPending();
|
||||
}
|
||||
finally
|
||||
{
|
||||
logger.Trace("ProcessQueue exit");
|
||||
Logger.Trace("ProcessQueue exit");
|
||||
|
||||
Monitor.Exit(ProcessQueueLock);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.Trace("ProcessQueue locked, queue already processing");
|
||||
Logger.Trace("ProcessQueue locked, queue already processing");
|
||||
}
|
||||
}
|
||||
|
||||
private void ProcessQueue(ILogger logger)
|
||||
|
||||
private void StartNextPending()
|
||||
{
|
||||
var runningLeafInputs = _dataService.GetLeafInputs(LeafInputStatusType.Running).ToList();
|
||||
var runningLeafInputs = DataService.GetRunningLeafInputs().ToList();
|
||||
if (runningLeafInputs.Any())
|
||||
{
|
||||
logger.Trace("Leaf input currently running , don't enqueue any new");
|
||||
Logger.Trace("Leaf input currently running , don't enqueue any new");
|
||||
return;
|
||||
}
|
||||
|
||||
var pending =
|
||||
_dataService
|
||||
var pendingInput =
|
||||
DataService
|
||||
.GetLeafInputs(LeafInputStatusType.Pending)
|
||||
.OrderBy(l => l.StatusHistory.Min(sh => sh.DateTime))
|
||||
.FirstOrDefault();
|
||||
|
||||
if (pending == null)
|
||||
if (pendingInput == null)
|
||||
{
|
||||
logger.Trace("No pending leaf input");
|
||||
Logger.Trace("No pending leaf input");
|
||||
return;
|
||||
}
|
||||
|
||||
logger.Info("LeafInputFile: {0}, Start", pending.Id);
|
||||
try
|
||||
{
|
||||
_piscalService.Run(pending);
|
||||
}
|
||||
catch (PiscalClientException ex)
|
||||
{
|
||||
var errorMessage =
|
||||
$"LeafInputFile: {pending.Id}\r\nProcessRunning Exception: {ex.Message}"
|
||||
+ $"\r\nInnerException: {ex.InnerException}\r\nStackTrace: {ex.StackTrace}";
|
||||
logger.Error(errorMessage);
|
||||
BackgroundJob.Enqueue(() => _emailService.SendAdministratorMessage("LeafWeb ProcessQueue Exception", errorMessage));
|
||||
|
||||
_dataService.SetLeafInputStatus(pending, LeafInputStatusType.Exception, "Error occurred starting LeafInput", ex.Message);
|
||||
logger.Info("LeafInputFile: {0}, Cleanup", pending.Id);
|
||||
_piscalService.Cleanup(pending);
|
||||
|
||||
// TODO: re-queue?
|
||||
//_dataService.SetLeafInputFileStatus(queuedFile, LeafInputStatusType.Queued, "Re-queuing LeafInput");
|
||||
}
|
||||
logger.Trace("LeafInputFile: {0}, Set Pending", pending.Id);
|
||||
_dataService.SetLeafInputStatus(pending, LeafInputStatusType.Running);
|
||||
var pendingInputId = pendingInput.Id;
|
||||
Logger.Info("LeafInput: {0}, Starting", pendingInputId);
|
||||
DataService.SetLeafInputStatus(pendingInput, LeafInputStatusType.Starting);
|
||||
BackgroundJob.Enqueue<StartPending>(c => c.DoWork(pendingInputId));
|
||||
}
|
||||
|
||||
private void ProcessRunning(ILogger logger)
|
||||
private void UpdateRunning()
|
||||
{
|
||||
var running = _dataService.GetLeafInputs(LeafInputStatusType.Running).ToList();
|
||||
var running = DataService.GetLeafInputs(LeafInputStatusType.Running).ToList();
|
||||
foreach (var leafInput in running)
|
||||
{
|
||||
try
|
||||
{
|
||||
var status = _piscalService.GetStatus(leafInput);
|
||||
var status = PiscalService.GetStatus(leafInput);
|
||||
var leafInputId = leafInput.Id;
|
||||
switch (status)
|
||||
{
|
||||
case PiscalStatus.NotStarted:
|
||||
logger.Warn("LeafInputFile: {0}, Not Started, re-queueing", leafInput.Id);
|
||||
// if it's not started, try to requeue the process - this is unusual state
|
||||
_dataService.SetLeafInputStatus(leafInput, LeafInputStatusType.Pending);
|
||||
// if it's not started - this is unusual state
|
||||
Logger.Warn("LeafInput: {0}, Piscal Not Started", leafInput.Id);
|
||||
break;
|
||||
|
||||
case PiscalStatus.Running:
|
||||
logger.Trace("LeafInputFile: {0}, Running", leafInput.Id);
|
||||
Logger.Trace("LeafInput: {0}, Piscal Running", leafInput.Id);
|
||||
// continue running
|
||||
break;
|
||||
|
||||
case PiscalStatus.Complete:
|
||||
logger.Info("LeafInputFile: {0}, Complete", leafInput.Id);
|
||||
// collect the leaf output
|
||||
var leafOutputFiles = _piscalService.RetrieveOutputFiles(leafInput).ToList();
|
||||
// TODO: change to "retrieving output"?
|
||||
DataService.SetLeafInputStatus(leafInput, LeafInputStatusType.Finishing);
|
||||
|
||||
logger.Trace("LeafInputFile: {0}, saving output files", leafInput.Id);
|
||||
foreach (var outputFile in leafOutputFiles)
|
||||
_dataService.AddLeafOutputFile(outputFile);
|
||||
|
||||
logger.Info("LeafInputFile: {0}, output files: {1}", leafInput.Id,
|
||||
string.Join(", ", leafOutputFiles.Select(o => o.Filename)));
|
||||
|
||||
// update db
|
||||
logger.Trace("LeafInputFile: {0}, Set Complete", leafInput.Id);
|
||||
_dataService.SetLeafInputStatus(leafInput, LeafInputStatusType.Complete);
|
||||
|
||||
BackgroundJob.Enqueue(() => _emailService.SendLeafWebComplete(leafInput.Id));
|
||||
|
||||
// remove working data from the server
|
||||
logger.Info("LeafInputFile: {0}, Cleanup", leafInput.Id);
|
||||
_piscalService.Cleanup(leafInput);
|
||||
var retrieveFilesId = BackgroundJob.Enqueue<RetrieveOutputFiles>(s => s.DoWork(leafInputId));
|
||||
BackgroundJob.ContinueWith<Cleanup>(retrieveFilesId, s => s.DoWork(leafInputId));
|
||||
BackgroundJob.ContinueWith<EmailNotificationService>(retrieveFilesId,
|
||||
email => email.SendLeafWebComplete(leafInputId));
|
||||
BackgroundJob.ContinueWith<SetComplete>(retrieveFilesId, s => s.DoWork(leafInputId));
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (PiscalClientException ex)
|
||||
{
|
||||
PiscalExceptionNotify(ex, leafInput);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var errorMessage =
|
||||
$"LeafInputFile: {leafInput.Id}\r\nProcessRunning Exception: {ex.Message}"
|
||||
+ $"\r\nInnerException: {ex.InnerException}\r\nStackTrace: {ex.StackTrace}";
|
||||
logger.Error(errorMessage);
|
||||
BackgroundJob.Enqueue(() => _emailService.SendAdministratorMessage("LeafWeb ProcessRunning Exception", errorMessage));
|
||||
|
||||
_dataService.SetLeafInputStatus(leafInput, LeafInputStatusType.Exception, "Error occurred processing LeafInput", ex.Message);
|
||||
|
||||
// TODO: Send email
|
||||
logger.Info("LeafInputFile: {0}, Cleanup", leafInput.Id);
|
||||
_piscalService.Cleanup(leafInput);
|
||||
// TODO: re-queue ?
|
||||
var errorMessage = FormatException(ex, leafInput.Id);
|
||||
Logger.Error(errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_dataService.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user