From patchwork Wed Jun 22 22:53:13 2011
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Michael-Doyle Hudson
X-Patchwork-Id: 2187
X-Launchpad-Project: lava-scheduler
X-Launchpad-Branch: ~linaro-validation/lava-scheduler/trunk
X-Launchpad-Branch-Revision-Number: 5
X-Launchpad-Notification-Type: branch-revision
To: Linaro Patch Tracker
From: noreply@launchpad.net
Subject: [Branch ~linaro-validation/lava-scheduler/trunk] Rev 5: first cut at
 a scheduler daemon: this one just uses the filesystem as its datastore
Message-Id: <20110622225313.6818.78445.launchpad@loganberry.canonical.com>
Date: Wed, 22 Jun 2011 22:53:13 -0000
Reply-To: noreply@launchpad.net
Sender: bounces@canonical.com

Merge authors:
  Michael Hudson-Doyle (mwhudson)
Related merge proposals:
  https://code.launchpad.net/~mwhudson/lava-scheduler/daemon-v0/+merge/65287
  proposed by: Michael Hudson-Doyle (mwhudson)
  review: Approve - Paul Larson (pwlars)
------------------------------------------------------------
revno: 5 [merge]
committer: Michael-Doyle Hudson
branch nick: trunk
timestamp: Thu 2011-06-23 10:51:42 +1200
message:
  first cut at a scheduler daemon: this one just uses the filesystem as
  its datastore
added:
  fake-dispatcher
  lava-scheduler-daemon.tac
  lava_scheduler_daemon/
  lava_scheduler_daemon/__init__.py
  lava_scheduler_daemon/board.py
  lava_scheduler_daemon/jobsource.py
  lava_scheduler_daemon/service.py
  lava_scheduler_daemon/tests/
  lava_scheduler_daemon/tests/__init__.py
  lava_scheduler_daemon/tests/test_board.py
modified:
  .bzrignore
--- lp:lava-scheduler
https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk

You are subscribed to branch lp:lava-scheduler.
To unsubscribe from this branch go to
https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk/+edit-subscription

=== modified file '.bzrignore'
--- .bzrignore	2011-06-07 06:10:56 +0000
+++ .bzrignore	2011-06-21 00:58:19 +0000
@@ -1,1 +1,3 @@
 *.egg-info
+./twistd.pid
+./_trial_temp

=== added file 'fake-dispatcher'
--- fake-dispatcher	1970-01-01 00:00:00 +0000
+++ fake-dispatcher	2011-06-15 04:57:02 +0000
@@ -0,0 +1,6 @@
+#!/bin/sh
+echo starting processing $1
+echo error >&2
+sleep 10
+cat $1
+echo ending

=== added file 'lava-scheduler-daemon.tac'
--- lava-scheduler-daemon.tac	1970-01-01 00:00:00 +0000
+++ lava-scheduler-daemon.tac	2011-06-20 01:08:32 +0000
@@ -0,0 +1,22 @@
+import logging
+import sys
+
+from twisted.application import service
+from twisted.application import internet
+from twisted.python import filepath
+from twisted.internet import reactor
+
+from lava_scheduler_daemon.service import BoardSet
+from lava_scheduler_daemon.jobsource import DirectoryJobSource
+
+application = service.Application("lava scheduler daemon")
+
+source = DirectoryJobSource(filepath.FilePath('/tmp/lava-jobs'))
+board_set = BoardSet(source, 'fake-dispatcher', reactor)
+board_set.setServiceParent(application)
+
+logger = logging.getLogger('')
+handler = logging.StreamHandler(sys.stdout)
+handler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
+logger.addHandler(handler)
+logger.setLevel(logging.DEBUG)
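
The .tac above is a standard Twisted application file, so the daemon is
presumably started with `twistd -ny lava-scheduler-daemon.tac` (the new
`./twistd.pid` ignore entry points the same way).  As a rough sketch of
what twistd does with the `application` object, assuming the default
reactor -- this is not part of the patch:

    # Roughly what `twistd -ny lava-scheduler-daemon.tac` amounts to:
    # adapt the application to IService, start it (which starts the
    # BoardSet child service and its polling loop), and stop it cleanly
    # at reactor shutdown.
    from twisted.application import service
    from twisted.internet import reactor

    svc = service.IService(application)  # `application` from the .tac
    svc.startService()
    reactor.addSystemEventTrigger('before', 'shutdown', svc.stopService)
    reactor.run()
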
=== added directory 'lava_scheduler_daemon'
=== added file 'lava_scheduler_daemon/__init__.py'
=== added file 'lava_scheduler_daemon/board.py'
--- lava_scheduler_daemon/board.py	1970-01-01 00:00:00 +0000
+++ lava_scheduler_daemon/board.py	2011-06-21 03:10:04 +0000
@@ -0,0 +1,223 @@
+import json
+import os
+import tempfile
+import logging
+
+from twisted.internet.protocol import ProcessProtocol
+from twisted.internet import defer
+
+
+logger = logging.getLogger(__name__)
+
+
+class DispatcherProcessProtocol(ProcessProtocol):
+
+    logger = logger.getChild('DispatcherProcessProtocol')
+
+    def __init__(self, deferred):
+        self.deferred = deferred
+
+    def connectionMade(self):
+        fd, self._logpath = tempfile.mkstemp()
+        self._output = os.fdopen(fd, 'wb')
+
+    def outReceived(self, text):
+        self._output.write(text)
+
+    errReceived = outReceived
+
+    def _cleanUp(self, result):
+        os.unlink(self._logpath)
+        return result
+
+    def processEnded(self, reason):
+        # This discards the process exit value.
+        self._output.close()
+        self.deferred.callback(self._logpath)
+        self.deferred.addCallback(self._cleanUp)
+
+
+class Job(object):
+
+    logger = logger.getChild('Job')
+
+    def __init__(self, json_data, dispatcher, reactor):
+        self.json_data = json_data
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self._json_file = None
+
+    def run(self):
+        d = defer.Deferred()
+        fd, self._json_file = tempfile.mkstemp()
+        with os.fdopen(fd, 'wb') as f:
+            json.dump(self.json_data, f)
+        self.reactor.spawnProcess(
+            DispatcherProcessProtocol(d), self.dispatcher,
+            args=[self.dispatcher, self._json_file],
+            childFDs={0:0, 1:'r', 2:'r'})
+        d.addBoth(self._exited)
+        return d
+
+    def _exited(self, log_file_path):
+        self.logger.info("job finished on %s", self.json_data['target'])
+        if self._json_file is not None:
+            os.unlink(self._json_file)
+        return log_file_path
+
+
+class Board(object):
+    """
+    A board runs jobs.  A board can be in four main states:
+
+    * stopped (S)
+      * the board is not looking for or processing jobs
+    * checking (C)
+      * a call to check for a new job is in progress
+    * waiting (W)
+      * no job was found by the last call to getJobForBoard and so the
+        board is waiting for a while before calling again.
+    * running (R)
+      * a job is running (or a job has completed but the call to
+        jobCompleted on the job source has not yet returned)
+
+    In addition, because we can't stop a job instantly, nor safely abort
+    a check for a new job (if getJobForBoard returns a job, it has
+    already been marked as started), there are variations on the
+    'checking' and 'running' states -- 'checking with stop requested'
+    (C+S) and 'running with stop requested' (R+S).  Even this is a
+    little simplistic, as there is the possibility of .start() being
+    called before the process of stopping completes, but we deal with
+    that by deferring any actions taken by .start() until the board has
+    really stopped.
+
+    Events that cause state transitions are:
+
+    * start() is called.  We cheat and pretend that this can only happen
+      in the stopped state, by stopping first, and then move into the C
+      state.
+
+    * stop() is called.  If we are in the C or R state we move to C+S or
+      R+S respectively.  If we are in S, C+S or R+S, we stay there.  If
+      we are in W, we move straight to S.
+
+    * getJobForBoard() returns a job.  We can only be in C or C+S here,
+      and move into R or R+S respectively.
+
+    * getJobForBoard() indicates that there is no job to perform.  Again
+      we can only be in C or C+S, and move into W or S respectively.
+
+    * a job completes (i.e. the call to jobCompleted() on the source
+      returns).  We can only be in R or R+S, and move to C or S
+      respectively.
+
+    * the timer implied by being in state W expires.  We move into C.
+
+    The cheating around start means that interleaving start() and stop()
+    calls may not always do what you expect, so please don't do that.
+    """
+ """ + + logger = logger.getChild('Board') + + job_cls = Job + + def __init__(self, source, board_name, dispatcher, reactor, job_cls=None): + self.source = source + self.board_name = board_name + self.dispatcher = dispatcher + self.reactor = reactor + if job_cls is not None: + self.job_cls = job_cls + self.running_job = None + self._check_call = None + self._stopping_deferreds = [] + self.logger = self.logger.getChild(board_name) + self.checking = False + + def _state_name(self): + if self.running_job: + state = "R" + elif self._check_call: + assert not self._stopping_deferreds + state = "W" + elif self.checking: + state = "C" + else: + assert not self._stopping_deferreds + state = "S" + if self._stopping_deferreds: + state += "+S" + return state + + def start(self): + self.logger.debug("start requested") + self.stop().addCallback(self._start) + + def _start(self, ignored): + self.logger.debug("starting") + self._stopping_deferreds = [] + self._checkForJob() + + def stop(self): + self.logger.debug("stopping") + if self._check_call is not None: + self._check_call.cancel() + self._check_call = None + + if self.running_job is not None or self.checking: + self.logger.debug("job running; deferring stop") + self._stopping_deferreds.append(defer.Deferred()) + return self._stopping_deferreds[-1] + else: + self.logger.debug("stopping immediately") + return defer.succeed(None) + + def _checkForJob(self): + self.logger.debug("checking for job") + self._check_call = None + self.checking = True + self.source.getJobForBoard(self.board_name).addCallbacks( + self._maybeStartJob, self._ebCheckForJob) + + def _ebCheckForJob(self, result): + self.logger.exception(result.value) + self._maybeStartJob(None) + + def _finish_stop(self): + self.logger.debug( + "calling %s deferreds returned from stop()", + len(self._stopping_deferreds)) + for d in self._stopping_deferreds: + d.callback(None) + self._stopping_deferreds = [] + + def _maybeStartJob(self, json_data): + self.checking = False + if json_data is None: + self.logger.debug("no job found") + if self._stopping_deferreds: + self._finish_stop() + else: + self._check_call = self.reactor.callLater( + 10, self._checkForJob) + return + self.logger.debug("starting job") + self.running_job = self.job_cls( + json_data, self.dispatcher, self.reactor) + d = self.running_job.run() + d.addCallbacks(self._cbJobFinished, self._ebJobFinished) + + def _cbJobFinished(self, log_file_path): + self.logger.info("reporting job completed") + self.source.jobCompleted( + self.board_name, log_file_path). addCallback( + self._cbJobCompleted) + + def _ebJobFinished(self, result): + self.logger.exception(result.value) + self._checkForJob() + + def _cbJobCompleted(self, result): + self.running_job = None + if self._stopping_deferreds: + self._finish_stop() + else: + self._checkForJob() === added file 'lava_scheduler_daemon/jobsource.py' --- lava_scheduler_daemon/jobsource.py 1970-01-01 00:00:00 +0000 +++ lava_scheduler_daemon/jobsource.py 2011-06-21 03:15:13 +0000 @@ -0,0 +1,95 @@ +import json +import logging + +from twisted.internet import defer + +from zope.interface import ( + implements, + Interface, + ) + +logger = logging.getLogger(__name__) + + +class IJobSource(Interface): + + def getBoardList(): + """Get the list of currently configured board names.""" + + def getJobForBoard(board_name): + """Return the json data of a job for board_name to run. + + The job should be marked as started before it is returned. 
+ """ + + def jobCompleted(board_name, log_file_path): + """Mark the job currently running on `board_name` as completed.""" + + +class DirectoryJobSource(object): + + implements(IJobSource) + + logger = logger.getChild('DirectoryJobSource') + + def __init__(self, directory): + self.directory = directory + if not self.directory.isdir(): + self.logger.critical("%s is not a directory", self.directory) + raise RuntimeError("%s must be a directory" % self.directory) + boards = self.directory.child('boards') + if not boards.isdir(): + self.logger.critical("%s is not a directory", boards) + raise RuntimeError("%s must be a directory" % boards) + for subdir in 'incoming', 'completed', 'broken': + subdir = self.directory.child(subdir) + if not subdir.isdir(): + subdir.createDirectory() + self.logger.info("starting to look for jobs in %s", self.directory) + + def _getBoardList(self): + return self.directory.child('boards').listdir() + + def getBoardList(self): + return defer.maybeDeferred(self._getBoardList) + + def _jsons(self, kind): + files = self.directory.child(kind).globChildren("*.json") + for json_file in files: + yield (json.load(json_file.open()), json_file) + + def _board_dir(self, board_name): + return self.directory.child('boards').child(board_name) + + def _getJobForBoard(self, board_name): + self.logger.debug('getting job for %s', board_name) + board_dir = self._board_dir(board_name) + if board_dir.listdir() != []: + self.logger.debug('board %s busy', board_name) + return None + for json_data, json_file in self._jsons('incoming'): + self.logger.debug('considering %s for %s', json_file, board_name) + if json_data['target'] == board_name: + self.logger.debug('running %s on %s', json_file, board_name) + json_file.moveTo(board_dir.child(json_file.basename())) + return json_data + else: + return None + + def getJobForBoard(self, board_name): + return defer.maybeDeferred(self._getJobForBoard, board_name) + + def _jobCompleted(self, board_name, log_file_path): + [json_file] = self._board_dir(board_name).children() + completed = self.directory.child('completed') + counter = 0 + while True: + fname = '%03d%s' % (counter, json_file.basename()) + if not completed.child(fname).exists(): + break + counter += 1 + json_file.moveTo(completed.child(fname)) + + def jobCompleted(self, board_name, log_file_path): + return defer.maybeDeferred( + self._jobCompleted, board_name, log_file_path) === added file 'lava_scheduler_daemon/service.py' --- lava_scheduler_daemon/service.py 1970-01-01 00:00:00 +0000 +++ lava_scheduler_daemon/service.py 2011-06-17 02:44:13 +0000 @@ -0,0 +1,56 @@ +import logging + +from twisted.application.service import Service +from twisted.internet import defer +from twisted.internet.task import LoopingCall + +from lava_scheduler_daemon.board import Board + + +logger = logging.getLogger(__name__) + + +class BoardSet(Service): + + logger = logger.getChild('BoardSet') + + def __init__(self, source, dispatcher, reactor): + self.source = source + self.boards = {} + self.dispatcher = dispatcher + self.reactor = reactor + self._update_boards_call = LoopingCall(self._updateBoards) + self._update_boards_call.clock = reactor + + def _updateBoards(self): + self.logger.info("Refreshing board list") + return self.source.getBoardList().addCallback(self._cbUpdateBoards) + + def _cbUpdateBoards(self, board_names): + self.logger.info("New board list %s", board_names) + new_boards = {} + for board_name in board_names: + if board_name in self.boards: + new_boards[board_name] = 
=== added file 'lava_scheduler_daemon/service.py'
--- lava_scheduler_daemon/service.py	1970-01-01 00:00:00 +0000
+++ lava_scheduler_daemon/service.py	2011-06-17 02:44:13 +0000
@@ -0,0 +1,56 @@
+import logging
+
+from twisted.application.service import Service
+from twisted.internet import defer
+from twisted.internet.task import LoopingCall
+
+from lava_scheduler_daemon.board import Board
+
+
+logger = logging.getLogger(__name__)
+
+
+class BoardSet(Service):
+
+    logger = logger.getChild('BoardSet')
+
+    def __init__(self, source, dispatcher, reactor):
+        self.source = source
+        self.boards = {}
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self._update_boards_call = LoopingCall(self._updateBoards)
+        self._update_boards_call.clock = reactor
+
+    def _updateBoards(self):
+        self.logger.info("Refreshing board list")
+        return self.source.getBoardList().addCallback(self._cbUpdateBoards)
+
+    def _cbUpdateBoards(self, board_names):
+        self.logger.info("New board list %s", board_names)
+        new_boards = {}
+        for board_name in board_names:
+            if board_name in self.boards:
+                new_boards[board_name] = self.boards.pop(board_name)
+            else:
+                new_boards[board_name] = Board(
+                    self.source, board_name, self.dispatcher, self.reactor)
+                new_boards[board_name].start()
+        for board in self.boards.values():
+            board.stop()
+        self.boards = new_boards
+
+    def startService(self):
+        self._update_boards_call.start(20)
+
+    def stopService(self):
+        self._update_boards_call.stop()
+        ds = []
+        dead_boards = []
+        for board in self.boards.itervalues():
+            ds.append(board.stop().addCallback(dead_boards.append))
+        self.logger.info(
+            "waiting for %s boards", len(self.boards) - len(dead_boards))
+        return defer.gatherResults(ds)
+
+
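
The interesting part of BoardSet is _cbUpdateBoards: on each 20-second
poll, boards present in both the old and new lists are kept (so their
running jobs are undisturbed), new names get a freshly started Board,
and names that have disappeared are stopped.  A sketch of that behaviour
(not patch code) driven by a deterministic clock; ListSource is an
illustrative stand-in:

    from twisted.internet import defer
    from twisted.internet.task import Clock

    from lava_scheduler_daemon.service import BoardSet


    class ListSource(object):
        """Stand-in source: a mutable board list and an empty job queue."""
        def __init__(self, names):
            self.names = names

        def getBoardList(self):
            return defer.succeed(list(self.names))

        def getJobForBoard(self, board_name):
            return defer.succeed(None)

        def jobCompleted(self, board_name, log_file_path):
            return defer.succeed(None)


    clock = Clock()
    source = ListSource(['panda01', 'panda02'])
    board_set = BoardSet(source, 'fake-dispatcher', clock)
    board_set.startService()   # the LoopingCall fires immediately
    assert sorted(board_set.boards) == ['panda01', 'panda02']

    source.names = ['panda02', 'panda03']
    clock.advance(20)          # next poll reconciles the board set
    assert sorted(board_set.boards) == ['panda02', 'panda03']
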
=== added directory 'lava_scheduler_daemon/tests'
=== added file 'lava_scheduler_daemon/tests/__init__.py'
=== added file 'lava_scheduler_daemon/tests/test_board.py'
--- lava_scheduler_daemon/tests/test_board.py	1970-01-01 00:00:00 +0000
+++ lava_scheduler_daemon/tests/test_board.py	2011-06-21 03:10:04 +0000
@@ -0,0 +1,184 @@
+from collections import defaultdict
+import logging
+
+from twisted.internet import defer
+from twisted.internet.task import Clock
+from twisted.trial.unittest import TestCase
+
+from lava_scheduler_daemon.board import Board
+
+def stub_method(method_name):
+    def method_impl(self, board_name, *args):
+        assert board_name not in self._requests[method_name], (
+            'overlapping call to %s on %s' % (method_name, board_name))
+        d = self._requests[method_name][board_name] = defer.Deferred()
+        def _remove_request(result):
+            del self._requests[method_name][board_name]
+            return result
+        d.addBoth(_remove_request)
+        self._calls[board_name][method_name].append(args)
+        return d
+    return method_impl
+
+
+class TestJobSource(object):
+
+    def __init__(self):
+        self._calls = defaultdict(lambda: defaultdict(list))
+        self._requests = defaultdict(dict)
+
+    jobCompleted = stub_method('jobCompleted')
+    getJobForBoard = stub_method('getJobForBoard')
+
+    def _completeCall(self, method_name, board_name, result):
+        self._requests[method_name][board_name].callback(result)
+
+class TestJob(object):
+
+    def __init__(self, json_data, dispatcher, reactor):
+        self.json_data = json_data
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self.deferred = defer.Deferred()
+
+    def run(self):
+        return self.deferred
+
+
+class AppendingHandler(logging.Handler):
+
+    def __init__(self, target_list):
+        logging.Handler.__init__(self)
+        self.target_list = target_list
+
+    def emit(self, record):
+        self.target_list.append((record.levelno, self.format(record)))
+
+
+class TestBoard(TestCase):
+
+    def setUp(self):
+        TestCase.setUp(self)
+        self.clock = Clock()
+        self.source = TestJobSource()
+        self._log_messages = []
+        self._handler = AppendingHandler(self._log_messages)
+        self.addCleanup(self._checkNoLogs)
+
+    def _checkNoLogs(self):
+        warnings = [message for (level, message) in self._log_messages
+                    if level >= logging.WARNING]
+        if warnings:
+            self.fail("Logged warnings: %s" % warnings)
+
+    def make_board(self, board_name):
+        board = Board(self.source, board_name, 'script', self.clock, TestJob)
+        board.logger.addHandler(self._handler)
+        board.logger.setLevel(logging.DEBUG)
+        return board
+
+    def test_initial_state_is_stopped(self):
+        b = self.make_board('board')
+        self.assertEqual('S', b._state_name())
+
+    def test_start_checks(self):
+        b = self.make_board('board')
+        b.start()
+        self.assertEqual('C', b._state_name())
+
+    def test_no_job_waits(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.assertEqual('W', b._state_name())
+
+    def test_actual_job_runs(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual('R', b._state_name())
+
+    def test_completion_calls_jobCompleted(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.assertEqual(
+            1, len(self.source._calls['board']['jobCompleted']))
+
+    def test_still_running_during_jobCompleted(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.assertEqual('R', b._state_name())
+
+    def test_check_again_on_completion(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual('C', b._state_name())
+
+    def test_stop_while_checking_moves_to_check_plus_stop(self):
+        b = self.make_board('board')
+        b.start()
+        b.stop()
+        self.assertEqual('C+S', b._state_name())
+
+    def test_stop_while_checking_no_job_stops(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_stop_while_checking_actual_job_runs(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual(0, len(stop_results))
+        self.assertEqual('R+S', b._state_name())
+
+    def test_stop_while_checking_actual_job_stops_on_complete(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback(None)
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_stop_while_running_job_stops_on_complete(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual('R', b._state_name())
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        b.running_job.deferred.callback(None)
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_wait_expires_check_again(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.clock.advance(10000)  # hack: the delay should be config data
+        self.assertEqual('C', b._state_name())
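
The tests are written for Twisted's trial runner (hence the TestCase
import from twisted.trial and the new ./_trial_temp ignore entry), so
they would presumably be run with something like `trial
lava_scheduler_daemon`.  Because the Board is driven by a Clock rather
than the real reactor, every state transition above is exercised
synchronously, including the last test's deliberately oversized advance
past the hard-coded 10-second retry delay.
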