Run scheduled tasks

Task Scheduler Feature

When integrating with external or internal systems, or building scheduling engines and notifications, it becomes necessary to execute certain tasks at predefined moments in time. For example:
  • Check the state of an analytical report that is being prepared by an external system.
  • Check SEVIS web services for the processing state of submitted requests.
  • Issue a heartbeat message for monitoring.
  • Pull new exceptions and status reports from the monitored subject.

While doing this, we should also take care of:
  • handling any possible exceptions without affecting the rest of the application;
  • rolling back any changes when an exception happens, to ensure a consistent state;
  • allowing a task to run again immediately if there is more work to be processed;
  • leveraging the automatic lifetime management, inversion of control and deterministic disposal provided by the inversion of control container.

The Task Scheduler Feature of the Lokad.CQRS App Engine provides a foundation for implementing such functionality. It is based on experience with enterprise integration against unreliable web services and with implementing various scheduling processes.

Each task is implemented as a simple class that is handled by the infrastructure and, when executed, returns the interval of time until the next run (if the interval is zero, the engine will attempt to rerun the task immediately). The App Engine takes care of instantiating tasks, running them, handling failures and scheduling the execution of multiple tasks.
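
To illustrate the zero-interval convention, here is a minimal sketch of a task that drains a backlog in small batches and asks for an immediate rerun while work remains. It is not taken from the Lokad.CQRS codebase: IScheduledTask is redeclared locally only so that the snippet compiles on its own, and DrainBacklogTask, the batch size and the in-memory queue are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Local stand-in for the Lokad.CQRS interface, declared here only to keep the sketch self-contained.
public interface IScheduledTask
{
    TimeSpan Execute();
}

// Hypothetical task: processes at most BatchSize items per run.
public sealed class DrainBacklogTask : IScheduledTask
{
    const int BatchSize = 10;
    readonly Queue<string> _backlog;

    public DrainBacklogTask(Queue<string> backlog)
    {
        _backlog = backlog;
    }

    // Exposed only so the sketch can be observed from the outside.
    public int Processed { get; private set; }

    public TimeSpan Execute()
    {
        for (var i = 0; i < BatchSize && _backlog.Count > 0; i++)
        {
            _backlog.Dequeue(); // real processing of the item would go here
            Processed++;
        }

        // Zero asks the engine to rerun right away; otherwise wait before polling again.
        return _backlog.Count > 0 ? TimeSpan.Zero : TimeSpan.FromSeconds(30);
    }
}
```

Keeping each run small means that a failure rolls back only one batch, while the zero interval still lets the backlog drain without waiting for the next polling period.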

The Lokad.CQRS App Engine can have multiple task scheduler instances per worker; each one is executed as a separate engine thread with the proper startup and shutdown sequences.

Note that if the App Engine has multiple worker role instances running, each instance will execute its tasks independently.

Guidance

Implementation

Tasks are coded in .NET in a way that is similar to message handlers. Below is a snippet from Sample-02 of the Lokad.CQRS codebase. It sends a random payment message every 3 seconds.
public sealed class SendPaymentsSometimes : IScheduledTask
{
  readonly IMessageClient _sender;

  public SendPaymentsSometimes(IMessageClient sender)
  {
    _sender = sender;
  }

  public TimeSpan Execute()
  {
    var amount = (Rand.NextDouble()*100).Round(1);
    _sender.Send(new SendPaymentMessage(amount));
    // sleep for 3 seconds before the next run
    return 3.Seconds();
  }
}

In this case we are implementing the IScheduledTask interface defined in the Lokad.Cqrs.Default namespace. The App Engine configuration snippet could look like this:

builder
  .RunTasks(m => { m.WithDefaultInterfaces().InCurrentAssembly(); })

Alternatively, you can keep your task implementations decoupled from the Lokad.Cqrs assemblies by implementing your own interface and wiring it like this:
/// <summary>
/// Custom interface for the task scheduler feature of Lokad.CQRS App Engine
/// </summary>
public interface IHappenRegularly
{
  /// <summary>
  /// Executes some event that happens at the predefined moments of time
  /// </summary>
  /// <returns>Amount of time to sleep till the next run</returns>
  TimeSpan Happen();
}

Then the configuration will change to:
builder
  .RunTasks(c => c
    .AdaptTasks<IHappenRegularly>(h => h.Happen())
    .InAssemblyOf<IHappenRegularly>())

Advanced Features

Normally you would not need to worry about such things. Yet, by default, the task scheduler also:
  • automatically resolves task implementations against the current IoC container (injecting all dependencies) in a separate lifetime scope;
  • executes tasks within an explicitly defined transaction scope.

This ensures that you can employ exactly the same development model for tasks as for message handlers (i.e., letting the infrastructure take care of NHibernate sessions, transactions and flushing).
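
The combination of a separate lifetime scope and a transaction scope can be pictured with the sketch below. It is an illustration under stated assumptions, not the actual Lokad.CQRS implementation: TaskRunnerSketch and RunOnce are hypothetical names, beginScope stands in for opening an IoC lifetime scope, and the failure handling (log and retry after a fixed delay) is an assumed policy. Only System.Transactions.TransactionScope is a real framework type here.

```csharp
using System;
using System.Transactions;

public static class TaskRunnerSketch
{
    // Runs a single task execution and returns how long the scheduler should sleep.
    // 'beginScope' stands in for opening a fresh IoC lifetime scope;
    // 'execute' resolves the task from that scope and runs it.
    public static TimeSpan RunOnce<TScope>(
        Func<TScope> beginScope,
        Func<TScope, TimeSpan> execute,
        TimeSpan delayOnFailure)
        where TScope : IDisposable
    {
        try
        {
            using (var scope = beginScope())         // disposed deterministically after the run
            using (var tx = new TransactionScope())  // ambient transaction for the task
            {
                var sleep = execute(scope);
                tx.Complete();                       // commit only if the task did not throw
                return sleep;
            }
        }
        catch (Exception ex)
        {
            // The transaction was disposed without Complete(), so enlisted changes roll back.
            // The exception is contained here and does not affect the rest of the application.
            Console.Error.WriteLine(ex);
            return delayOnFailure;
        }
    }
}
```

Because the task never creates its own scope or transaction, the same task class can be exercised in a unit test by calling it directly with hand-built dependencies.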

It is important to note that, by default, tasks are created as transient instances for each execution. This does not create performance problems in real-world enterprise applications (Autofac IoC does a great job here) and simplifies the development model (e.g., with NHibernate).

Should you need to keep in-memory state between executions of the same task, implement a controller, register it in the container as a singleton and inject it into the task via constructor injection.
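
A minimal sketch of that arrangement follows. HeartbeatCounter and HeartbeatTask are hypothetical names, and the container wiring is left out so the snippet stays self-contained; the point is only that the task itself holds no state of its own.

```csharp
using System;
using System.Threading;

// Hypothetical singleton controller: holds state that must survive between task executions.
public sealed class HeartbeatCounter
{
    int _count;

    // Thread-safe, because several scheduler threads may share the same singleton.
    public int Increment()
    {
        return Interlocked.Increment(ref _count);
    }

    public int Current
    {
        get { return Volatile.Read(ref _count); }
    }
}

// The task is created anew for each execution, so it keeps no fields of its own
// apart from the injected dependencies.
public sealed class HeartbeatTask
{
    readonly HeartbeatCounter _counter;

    public HeartbeatTask(HeartbeatCounter counter)
    {
        _counter = counter;
    }

    public TimeSpan Execute()
    {
        var number = _counter.Increment();
        Console.WriteLine("Heartbeat #" + number);
        return TimeSpan.FromSeconds(5);
    }
}
```

With Autofac, the counter would typically be registered with SingleInstance() while the task keeps its default transient registration, so every new HeartbeatTask receives the same HeartbeatCounter.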

Implementing Time-Based Scheduler

The Task Scheduler feature serves as a logical and reliable foundation for more specific and flexible time-based schedulers that leverage the frameworks and functionality of your solution.

For example, here is how the project-specific scheduling of SalescastProject is implemented on top of the Task Scheduler.

In our database we store integration-specific schedules that are associated to the integration projects. Database persistence is defined by the following entity of NHibernateModule:
public class ScheduleEntity
{
  //.. project specific fields skipped

  public virtual DateTime? NextRun { get; set; }

  public virtual TimeSpan ScheduleTime { get; set; }
  public virtual ScheduleFrequency ScheduleFrequency { get; set; }

  public virtual string Title { get; set; }
  public virtual byte[] PayloadData { get; set; }
  public virtual string PayloadType { get; set; }
}

Here, ScheduleFrequency is just an enum defining basic scheduling rules (more could be added as needed):
public enum ScheduleFrequency
{
  None,
  Daily,
  Weekly,
  WorkDays,
  Every,
  Monthly
}

PayloadData and PayloadType (in our case) contain the data and the type name of the message that will be dispatched to the message queue once the schedule condition is satisfied. This flexible approach allows us to schedule any action that can be defined and executed within our problem domain.
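
The idea of storing a message as a data and type pair can be sketched as follows. This is only an illustration: System.Text.Json stands in for whatever serializer the solution actually uses, and RefreshReportMessage, PayloadSketch, Pack and Unpack are hypothetical names that do not come from the Lokad.CQRS codebase.

```csharp
using System;
using System.Text.Json;

// Hypothetical message type used only for this sketch.
public sealed class RefreshReportMessage
{
    public string ReportName { get; set; }
}

public static class PayloadSketch
{
    // Turns a message into the two values stored on the schedule entity.
    public static (byte[] Data, string Type) Pack(object message)
    {
        var type = message.GetType();
        var data = JsonSerializer.SerializeToUtf8Bytes(message, type);
        // The assembly-qualified name lets Type.GetType find the type again later.
        return (data, type.AssemblyQualifiedName);
    }

    // Rebuilds the message from the stored values when the schedule fires.
    public static object Unpack(byte[] data, string typeName)
    {
        var type = Type.GetType(typeName, throwOnError: true);
        return JsonSerializer.Deserialize(data, type);
    }
}
```

Since the scheduler only sees bytes and a type name, new kinds of scheduled actions can be introduced by adding message types and handlers, without changing the schedule table.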

For example, this is how we define a task that will be executed every 2 minutes:
var schedule = new ScheduleEntity
{
  ScheduleFrequency = ScheduleFrequency.Every,
  ScheduleTime = 2.Minutes(),
  PayloadData = payload.Data,
  PayloadType = payload.Type,
  ScheduleId = scheduleId,  
  Title = "Run every 2 minutes"
};
_session.Save(schedule);

Here is a task that will run daily at 4 AM (UTC, since the scheduler below compares against SystemUtil.UtcNow) until dismissed:
var schedule = new ScheduleEntity
{
  ScheduleFrequency = ScheduleFrequency.Daily,
  ScheduleTime = 4.Hours(),
  PayloadData = payload.Data,
  PayloadType = payload.Type,
  ScheduleId = scheduleId,  
  Title = "Run daily at 4AM"
};
_session.Save(schedule);

Or a task that will run once on December 21, 2012:
var schedule = new ScheduleEntity
{
  NextRun = new DateTime(2012, 12, 21),
  PayloadData = payload.Data,
  PayloadType = payload.Type,
  ScheduleId = scheduleId,  
  Title = "The world is supposed to be ending now"
};
_session.Save(schedule);

The actual implementation of the process looks like this:
[UsedImplicitly]
public sealed class CalendarSchedulerTask : IHappenRegularly
{
  static readonly TimeSpan Precision = 1.Minutes();
  readonly IBusClient _client;
  readonly IMessageSerializer _serializer;
  readonly ISession _session;

  public CalendarSchedulerTask(ISession session, IBusClient client, IMessageSerializer serializer)
  {
    _session = session;
    _client = client;
    _serializer = serializer;
  }


  public TimeSpan Happen()
  {
    // check for the pending items
    var currentTime = SystemUtil.UtcNow;
    var end = currentTime + Precision;

    var next = _session
      .Linq<ScheduleEntity>()
      .Where(e => (e.NextRun == null) || (e.NextRun < end))
      .Take(10)
      .ToArray();

    var messages = new List<IMessage>();

    foreach (var entity in next)
    {
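      // item that is due: NextRun was already set and falls within the window, so dispatch its payload
      if (entity.NextRun.HasValue)
      {
        messages.Add(_serializer.Deserialize(entity.PayloadData, entity.PayloadType));
      }
      // in every case (including new items that have no NextRun yet), compute the next run time
      entity.NextRun = GetNextRunTime(entity.ScheduleFrequency, entity.ScheduleTime, currentTime);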
    }

    if (messages.Any())
    {
      _client.Send(messages.ToArray());
    }

    return Precision;
  }

  static DateTime GetNextRunTime(ScheduleFrequency frequency, TimeSpan offset, DateTime now)
  {
    switch (frequency)
    {
      case ScheduleFrequency.Monthly:
        // every month with the offset
        var nextMonthStarts = now.Date.AddDays((now.Day - 1)*-1).AddMonths(1);
        return nextMonthStarts + offset;
      case ScheduleFrequency.Daily:
        // every day with the offset
        var tomorrowStarts = now.Date + 1.Days();
        return tomorrowStarts + offset;
      case ScheduleFrequency.Weekly:
        // week begins on Monday; DayOfWeek.Sunday is 0, so shift it to count days since Monday
        var daysSinceMonday = ((int) now.DayOfWeek + 6) % 7;
        var nextWeekStarts = now.Date.AddDays(7 - daysSinceMonday);
        return nextWeekStarts + offset;
      case ScheduleFrequency.Every:
        return now + offset;
      case ScheduleFrequency.None:
      default:
        // one-time or unhandled schedules: park the item far in the future so it does not fire again
        return new DateTime(9999, 1, 1);
    }
  }
}

Notes:
  • More custom schedule types can be added to the mix as needed (WorkDays, for instance, is declared in the enum but not handled by GetNextRunTime, so it falls through to the default branch);
  • Although a custom serializer is used in this sample, it is generally recommended to use the same serializer that is used for message handling and exposed in the IoC container under the IMessageSerializer interface.
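
As an example of adding a schedule type, the sketch below shows one possible next-run calculation for work days. It assumes that work days are Monday to Friday, ignores public holidays, and mirrors the Daily rule in that it always moves to a later day; WorkDaysSketch and NextWorkDay are hypothetical names.

```csharp
using System;

public static class WorkDaysSketch
{
    // Returns the start of the next work day (Monday to Friday) plus the offset.
    public static DateTime NextWorkDay(DateTime now, TimeSpan offset)
    {
        var day = now.Date.AddDays(1);

        // Skip over the weekend; holidays are deliberately ignored in this sketch.
        while (day.DayOfWeek == DayOfWeek.Saturday || day.DayOfWeek == DayOfWeek.Sunday)
        {
            day = day.AddDays(1);
        }

        return day + offset;
    }
}
```

For a schedule with a 4-hour offset, a run on Friday, April 8, 2011 would be followed by Monday, April 11, 2011 at 04:00. Such a method could back an additional case in the switch statement of the scheduler task.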

Last edited Apr 11, 2011 at 10:33 PM by AlexandrYZ, version 2
