Simple task scheduler deployed in a cluster
Working in a startup presents a lot of challenges, with the biggest one being time to market. This drives the development team to build things quickly while maintaining quality.
One such need that comes up is a simple task scheduler.
A task scheduler has many uses. You might want to:
- Send recurring emails at set intervals
- Perform heavy computations on a set schedule
- Run one-off tasks, such as deleting customer data at off-peak times
There could be many reasons why you need a task scheduler, and these needs tend to grow over time. Ideally, you should consider using RabbitMQ or Kafka (depending on your specific needs) for message queuing. However, if you need a quick solution, you can opt for a cron job.
One problem with cron jobs, or with schedulers living inside the application process, shows up when you distribute your application across a cluster: every running instance fires the same job, so the same email goes out once per instance.
Here, I am presenting a simple solution, a hack really, that works for a startup and is cluster-friendly. Each task runs exactly once, and the only infrastructure it needs is a database, which almost every application already has.
So let's get down to business.
You'll need a database table with the following columns:

| Column | Type |
|---|---|
| id | primary key |
| scheduled_time | timestamp |
| status | enum (WAITING / IN_PROGRESS / DONE) |
| start_time | timestamp |
| version | int |
| meta_data | jsonb (or string) |
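As a concrete sketch, assuming PostgreSQL, the table could look like the following. The table name `jobs` and the index are my own additions; adapt the types to your database.

```sql
-- A minimal sketch of the jobs table, assuming PostgreSQL.
CREATE TABLE jobs (
    id             bigserial   PRIMARY KEY,
    scheduled_time timestamptz NOT NULL,
    status         text        NOT NULL DEFAULT 'WAITING',  -- WAITING / IN_PROGRESS / DONE
    start_time     timestamptz,
    version        int         NOT NULL DEFAULT 0,
    meta_data      jsonb
);

-- The polling query filters on status and scheduled_time,
-- so an index on that pair keeps each poll cheap.
CREATE INDEX jobs_status_scheduled_idx ON jobs (status, scheduled_time);
```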
Now your pseudocode boils down to these simple steps:
```
jobs = getJobs(status = 'WAITING' && scheduled_time < time.now)

for job in jobs:
    // Optimistic claim: only one instance can win this update.
    success = UPDATE status = 'IN_PROGRESS',
                     start_time = time.now,
                     version = job.version + 1
              WHERE id = job.id
                AND status = 'WAITING'
                AND version = job.version

    if success:
        job_type = job.getTypeFromMetaData()
        switch(job_type):
            ...
        UPDATE status = 'DONE' WHERE id = job.id
```
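In real SQL, the claim step could look like this sketch, where `$1` and `$2` stand for the `id` and `version` the poller read (PostgreSQL-style placeholders, my assumption):

```sql
-- Claim one job. Exactly one instance succeeds, because the WHERE clause
-- re-checks both the status and the version the poller last saw.
UPDATE jobs
SET    status     = 'IN_PROGRESS',
       start_time = now(),
       version    = version + 1
WHERE  id      = $1
  AND  status  = 'WAITING'
  AND  version = $2;
-- An affected-row count of 0 means another instance got there first.
```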
To explain the important bits of the pseudocode:

- The `version` column ensures that only one of several concurrent update calls succeeds.
- Updating `start_time` marks when the job was started.
- The `status` field tracks where the job is in its lifecycle.
I like to keep the status as `DONE` instead of deleting the rows right away; the finished rows double as a log of how the jobs went. You could perform a bulk delete once `start_time` is more than a month old, or whatever suits you.
We need just one more piece of code that rescues jobs left hanging when an application instance was killed mid-run.
```
// Reset stale jobs
jobs = getJobs(status = 'IN_PROGRESS'
               && time.now - start_time > stale_time)

for job in jobs:
    UPDATE status = 'WAITING',
           version = job.version + 1
    WHERE id = job.id
      AND version = job.version

// Clean old jobs
DELETE jobs WHERE time.now - scheduled_time > '3 months'
```
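Both maintenance steps collapse into single statements in SQL. Again a sketch: the 15-minute staleness threshold is an assumption, and you may want to restrict the delete to `DONE` rows.

```sql
-- Put stale jobs back in the queue. Because this is one atomic statement,
-- the per-row version check from the pseudocode is folded into the UPDATE.
UPDATE jobs
SET    status  = 'WAITING',
       version = version + 1
WHERE  status = 'IN_PROGRESS'
  AND  start_time < now() - interval '15 minutes';

-- Clean old jobs, mirroring the 3-month window above.
DELETE FROM jobs
WHERE  scheduled_time < now() - interval '3 months';
```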
Here we are putting jobs back into `WAITING` when their status never reached `DONE` within the time we expect them to complete. Consider it a TTL. We could also add a heartbeat column if we expect some tasks to run for a long time.
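A heartbeat variant could look like this. This is entirely a sketch; the column name and the 5-minute threshold are my own assumptions.

```sql
ALTER TABLE jobs ADD COLUMN heartbeat timestamptz;

-- Each worker touches its row every few seconds while its job runs.
UPDATE jobs SET heartbeat = now() WHERE id = $1;

-- The reset query then keys off the heartbeat instead of start_time,
-- so long-running but healthy jobs are never reclaimed.
UPDATE jobs
SET    status  = 'WAITING',
       version = version + 1
WHERE  status = 'IN_PROGRESS'
  AND  heartbeat < now() - interval '5 minutes';
```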
Run these pieces of code at a set interval: every minute, every 5 or 10 minutes, or whatever you think is best.
This solution is language agnostic, hence the pseudocode and plain SQL instead of implementation code.