• Home
  • Module code
  • wte
  • wte.scripts.timed_tasks

Source code for wte.scripts.timed_tasks

# -*- coding: utf-8 -*-
"""
###########################################################
:mod:`wte.scripts.timed_tasks` -- Script to run timed tasks
###########################################################

The :mod:`~wte.scripts.timed_tasks` module provides the functionality for
running :class:`~wte.models.TimedTask`. Only a single instance of it should
ever be run as otherwise it is possible that tasks are run multiple times.

.. moduleauthor:: Mark Hall <mark.hall@work.room3b.eu>
"""
import logging
import transaction

from pyramid.paster import (get_appsettings, setup_logging)
from random import randint
from pywebtools.sqlalchemy import DBSession
from sqlalchemy import (engine_from_config, and_, func)
from threading import Thread

from wte.models import (Base, TimedTask, Part)


[docs]def init(subparsers): """Initialises the :class:`~argparse.ArgumentParser`, adding the "run-timed-tasks" command that runs :func:`~wte.scripts.timed_tasks.run_timed_tasks`. """ parser = subparsers.add_parser('run-timed-tasks', help='Run all timed tasks that are due') parser.add_argument('configuration', help='WTE configuration file') parser.set_defaults(func=run_timed_tasks)
[docs]def run_timed_tasks(args): """Runs all timed tasks where the timestamp is in the past and the status is "ready". All :class:`~wte.models.TimedTask` that are to be run are given a unique "run-{random-number}" ``status`` to uniquely identify them for this run. Individual task runners are then responsible for setting that status to "completed" after the task completes successfully or to "failed" if it failed. All task runners are run in independent :class:`threading.Thread`\ s. After all :class:`~threading.Thread` complete, any :class:`~wte.models.TimedTask` that still have the unique "run-{random-number}" status are automatically set to the "failed" status. """ settings = get_appsettings(args.configuration) setup_logging(args.configuration) engine = engine_from_config(settings, 'sqlalchemy.') DBSession.configure(bind=engine) Base.metadata.bind = engine dbsession = DBSession() tasks = dbsession.query(TimedTask).filter(and_(TimedTask.timestamp <= func.now(), TimedTask.status == 'ready')) rnd = randint(0, 1000000) with transaction.manager: tasks.update({TimedTask.status: 'running-%i' % (rnd)}, synchronize_session=False) tasks = dbsession.query(TimedTask).filter(TimedTask.status == 'running-%i' % (rnd)) task_count = tasks.count() if task_count > 0: logging.getLogger('wte').info('Running %i tasks' % (task_count)) threads = [] for task in tasks: if task.name == 'change_status': threads.append(Thread(None, target=run_change_status, args=(task.id,))) for thread in threads: thread.start() for thread in threads: thread.join() dbsession.flush() failed_count = tasks.count() if failed_count > 0: logging.getLogger('wte').error('%i tasks failed' % (failed_count)) else: logging.getLogger('wte').info('All tasks completed') with transaction.manager: tasks.update({TimedTask.status: 'failed'})
[docs]def run_change_status(task_id): """Run the status change task for the :class:`~wte.models.TimedTask` with the id ``task_id``. It changes the :class:`~wte.models.TimedTask`\ s status to the value of the "target_status" key in the :attr:`~wte.models.TimedTask.options`. """ dbsession = DBSession() task = dbsession.query(TimedTask).filter(TimedTask.id == task_id).first() if task: part = dbsession.query(Part).filter(Part.id == task.part_id).first() if part and 'target_status' in task.options: with transaction.manager: dbsession.add(part) dbsession.add(task) part.status = task.options['target_status'] with transaction.manager: dbsession.add(task) task.status = 'completed'