From patchwork Thu Aug 18 04:44:13 2011 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Michael-Doyle Hudson X-Patchwork-Id: 3498 Return-Path: X-Original-To: patchwork@peony.canonical.com Delivered-To: patchwork@peony.canonical.com Received: from fiordland.canonical.com (fiordland.canonical.com [91.189.94.145]) by peony.canonical.com (Postfix) with ESMTP id D215A23F6B for ; Thu, 18 Aug 2011 04:44:15 +0000 (UTC) Received: from mail-ew0-f52.google.com (mail-ew0-f52.google.com [209.85.215.52]) by fiordland.canonical.com (Postfix) with ESMTP id A9FD6A18294 for ; Thu, 18 Aug 2011 04:44:15 +0000 (UTC) Received: by ewy28 with SMTP id 28so891823ewy.11 for ; Wed, 17 Aug 2011 21:44:15 -0700 (PDT) Received: by 10.213.3.155 with SMTP id 27mr1566481ebn.18.1313642655206; Wed, 17 Aug 2011 21:44:15 -0700 (PDT) X-Forwarded-To: linaro-patchwork@canonical.com X-Forwarded-For: patch@linaro.org linaro-patchwork@canonical.com Delivered-To: patches@linaro.org Received: by 10.213.102.5 with SMTP id e5cs55601ebo; Wed, 17 Aug 2011 21:44:14 -0700 (PDT) Received: by 10.227.113.71 with SMTP id z7mr195798wbp.77.1313642654033; Wed, 17 Aug 2011 21:44:14 -0700 (PDT) Received: from indium.canonical.com (indium.canonical.com [91.189.90.7]) by mx.google.com with ESMTPS id fs19si2340698wbb.22.2011.08.17.21.44.13 (version=TLSv1/SSLv3 cipher=OTHER); Wed, 17 Aug 2011 21:44:14 -0700 (PDT) Received-SPF: pass (google.com: best guess record for domain of bounces@canonical.com designates 91.189.90.7 as permitted sender) client-ip=91.189.90.7; Authentication-Results: mx.google.com; spf=pass (google.com: best guess record for domain of bounces@canonical.com designates 91.189.90.7 as permitted sender) smtp.mail=bounces@canonical.com Received: from ackee.canonical.com ([91.189.89.26]) by indium.canonical.com with esmtp (Exim 4.71 #1 (Debian)) id 1QtuSj-0005fw-Bw for ; Thu, 18 Aug 2011 04:44:13 +0000 Received: from ackee.canonical.com (localhost [127.0.0.1]) by ackee.canonical.com (Postfix) with ESMTP id 4BE10E03A6 for ; Thu, 18 Aug 2011 04:44:13 +0000 (UTC) MIME-Version: 1.0 X-Launchpad-Project: lava-scheduler X-Launchpad-Branch: ~linaro-validation/lava-scheduler/trunk X-Launchpad-Message-Rationale: Subscriber X-Launchpad-Branch-Revision-Number: 66 X-Launchpad-Notification-Type: branch-revision To: Linaro Patch Tracker From: noreply@launchpad.net Subject: [Branch ~linaro-validation/lava-scheduler/trunk] Rev 66: The scheduler now runs a subprocess which runs the dispatcher and reports Message-Id: <20110818044413.19141.77917.launchpad@ackee.canonical.com> Date: Thu, 18 Aug 2011 04:44:13 -0000 Reply-To: noreply@launchpad.net Sender: bounces@canonical.com Errors-To: bounces@canonical.com Precedence: bulk X-Generated-By: Launchpad (canonical.com); Revision="13697"; Instance="initZopeless config overlay" X-Launchpad-Hash: 06d420f55ee2c09cace9479287d15dd48543f3eb Merge authors: Michael Hudson-Doyle (mwhudson) ------------------------------------------------------------ revno: 66 [merge] committer: Michael-Doyle Hudson branch nick: trunk timestamp: Thu 2011-08-18 16:41:46 +1200 message: The scheduler now runs a subprocess which runs the dispatcher and reports completion status to the db; this means that the scheduler can be killed and restarted while jobs are still running. added: lava-scheduler-monitor modified: lava-scheduler lava_scheduler_app/tests.py lava_scheduler_daemon/board.py lava_scheduler_daemon/dbjobsource.py lava_scheduler_daemon/main.py lava_scheduler_daemon/tests/test_board.py setup.py --- lp:lava-scheduler https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk You are subscribed to branch lp:lava-scheduler. To unsubscribe from this branch go to https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk/+edit-subscription === modified file 'lava-scheduler' --- lava-scheduler 2011-07-27 07:02:12 +0000 +++ lava-scheduler 2011-08-18 02:44:06 +0000 @@ -6,6 +6,6 @@ # at package build time. os.environ['DJANGO_SETTINGS_MODULE'] = 'lava_server.settings.development' -from lava_scheduler_daemon.main import main +from lava_scheduler_daemon.main import daemon_main -main() +daemon_main() === added file 'lava-scheduler-monitor' --- lava-scheduler-monitor 1970-01-01 00:00:00 +0000 +++ lava-scheduler-monitor 2011-08-18 02:44:06 +0000 @@ -0,0 +1,11 @@ +#!/usr/bin/python + +import os + +# The following line is mangled to point at the debian settings file +# at package build time. +os.environ['DJANGO_SETTINGS_MODULE'] = 'lava_server.settings.development' + +from lava_scheduler_daemon.main import monitor_main + +monitor_main() === modified file 'lava_scheduler_app/tests.py' --- lava_scheduler_app/tests.py 2011-07-27 10:10:10 +0000 +++ lava_scheduler_app/tests.py 2011-08-18 03:24:09 +0000 @@ -195,17 +195,7 @@ requested_device=device, definition=json.dumps(definition)) transaction.commit() self.assertEqual( - definition, DatabaseJobSource().getJobForBoard_impl('panda01')[0]) - - def test_getJobForBoard_returns_writable_file(self): - device = self.factory.make_device(hostname='panda01') - definition = {'foo': 'bar'} - self.factory.make_testjob( - requested_device=device, definition=json.dumps(definition)) - transaction.commit() - log_file = DatabaseJobSource().getJobForBoard_impl('panda01')[1] - log_file.write('a') - log_file.close() + definition, DatabaseJobSource().getJobForBoard_impl('panda01')) def test_getJobForBoard_returns_None_if_no_job(self): self.factory.make_device(hostname='panda01') @@ -223,7 +213,7 @@ transaction.commit() definition['target'] = 'panda01' self.assertEqual( - definition, DatabaseJobSource().getJobForBoard_impl('panda01')[0]) + definition, DatabaseJobSource().getJobForBoard_impl('panda01')) def test_getJobForBoard_prefers_older(self): panda_type = self.factory.ensure_device_type(name='panda') @@ -240,7 +230,7 @@ transaction.commit() self.assertEqual( first_definition, - DatabaseJobSource().getJobForBoard_impl('panda01')[0]) + DatabaseJobSource().getJobForBoard_impl('panda01')) def test_getJobForBoard_prefers_directly_targeted(self): panda_type = self.factory.ensure_device_type(name='panda') @@ -258,7 +248,7 @@ transaction.commit() self.assertEqual( device_definition, - DatabaseJobSource().getJobForBoard_impl('panda01')[0]) + DatabaseJobSource().getJobForBoard_impl('panda01')) def test_getJobForBoard_avoids_targeted_to_other_board_of_same_type(self): panda_type = self.factory.ensure_device_type(name='panda') @@ -352,3 +342,13 @@ DatabaseJobSource().jobCompleted_impl('panda01') device = Device.objects.get(pk=device.pk) self.assertEquals(None, device.current_job) + + def test_getLogFileForJobOnBoard_returns_writable_file(self): + device, job = self.get_device_and_running_job() + definition = {'foo': 'bar'} + self.factory.make_testjob( + requested_device=device, definition=json.dumps(definition)) + transaction.commit() + log_file = DatabaseJobSource().getLogFileForJobOnBoard_impl('panda01') + log_file.write('a') + log_file.close() === modified file 'lava_scheduler_daemon/board.py' --- lava_scheduler_daemon/board.py 2011-08-18 02:25:09 +0000 +++ lava_scheduler_daemon/board.py 2011-08-18 04:02:33 +0000 @@ -71,8 +71,13 @@ self._json_file = None def run(self): + d = self.source.getLogFileForJobOnBoard(self.board_name) + return d.addCallback(self._run).addErrback( + catchall_errback(self.logger)) + + def _run(self, log_file): d = defer.Deferred() - json_data, log_file = self.job_data + json_data = self.job_data fd, self._json_file = tempfile.mkstemp() with os.fdopen(fd, 'wb') as f: json.dump(json_data, f) @@ -86,7 +91,48 @@ return d def _exited(self, result): - self.logger.info("job finished on %s", self.job_data[0]['target']) + self.logger.info("job finished on %s", self.job_data['target']) + if self._json_file is not None: + os.unlink(self._json_file) + self.logger.info("reporting job completed") + return self.source.jobCompleted(self.board_name).addCallback( + lambda r:result) + + +class SimplePP(ProcessProtocol): + def __init__(self, d): + self.d = d + def processEnded(self, reason): + self.d.callback(None) + + +class MonitorJob(object): + + logger = logging.getLogger(__name__ + '.MonitorJob') + + def __init__(self, job_data, dispatcher, source, board_name, reactor): + self.job_data = job_data + self.dispatcher = dispatcher + self.source = source + self.board_name = board_name + self.reactor = reactor + self._json_file = None + + def run(self): + d = defer.Deferred() + json_data = self.job_data + fd, self._json_file = tempfile.mkstemp() + with os.fdopen(fd, 'wb') as f: + json.dump(json_data, f) + self.reactor.spawnProcess( + SimplePP(d), 'lava-scheduler-monitor', childFDs={0:0, 1:1, 2:2}, + env=None, args=[ + 'lava-scheduler-monitor', self.dispatcher, + self.board_name, self._json_file]) + d.addBoth(self._exited) + return d + + def _exited(self, result): if self._json_file is not None: os.unlink(self._json_file) return result @@ -140,7 +186,7 @@ not always do what you expect. So don't mess around in that way please. """ - job_cls = Job + job_cls = MonitorJob def __init__(self, source, board_name, dispatcher, reactor, job_cls=None): self.source = source @@ -231,16 +277,11 @@ d = self.running_job.run() d.addCallbacks(self._cbJobFinished, self._ebJobFinished) - def _cbJobFinished(self, result): - self.logger.info("reporting job completed") - self.source.jobCompleted( - self.board_name).addCallback(self._cbJobCompleted) - def _ebJobFinished(self, result): self.logger.exception(result.value) self._checkForJob() - def _cbJobCompleted(self, result): + def _cbJobFinished(self, result): self.running_job = None if self._stopping_deferreds: self._finish_stop() === modified file 'lava_scheduler_daemon/dbjobsource.py' --- lava_scheduler_daemon/dbjobsource.py 2011-08-18 02:19:55 +0000 +++ lava_scheduler_daemon/dbjobsource.py 2011-08-18 03:24:09 +0000 @@ -94,10 +94,7 @@ json_data = json.loads(job.definition) json_data['target'] = device.hostname transaction.commit() - log_file = job.log_file - log_file.file.close() - log_file.open('wb') - return json_data, log_file + return json_data else: # We don't really need to rollback here, as no modifying # operations have been made to the database. But Django is @@ -111,6 +108,19 @@ return self.deferForDB(self.getJobForBoard_impl, board_name) @transaction.commit_on_success() + def getLogFileForJobOnBoard_impl(self, board_name): + device = Device.objects.get(hostname=board_name) + device.status = Device.IDLE + job = device.current_job + log_file = job.log_file + log_file.file.close() + log_file.open('wb') + return log_file + + def getLogFileForJobOnBoard(self, board_name): + return self.deferForDB(self.getLogFileForJobOnBoard_impl, board_name) + + @transaction.commit_on_success() def jobCompleted_impl(self, board_name): self.logger.debug('marking job as complete on %s', board_name) device = Device.objects.get(hostname=board_name) === modified file 'lava_scheduler_daemon/main.py' --- lava_scheduler_daemon/main.py 2011-08-17 03:18:13 +0000 +++ lava_scheduler_daemon/main.py 2011-08-18 03:34:27 +0000 @@ -2,14 +2,29 @@ import os import sys -from twisted.internet import reactor +from twisted.internet import defer, reactor from lava_scheduler_daemon.service import BoardSet from lava_scheduler_daemon.config import get_config from lava_scheduler_daemon.dbjobsource import DatabaseJobSource -def main(): +def _configure_logging(): + logger = logging.getLogger('') + config = get_config('logging') + level = config.get("logging", "level") + destination = config.get("logging", 'destination', None) + if destination == '-': + handler = logging.StreamHandler(sys.stdout) + else: + handler = logging.FileHandler(destination) + handler.setFormatter( + logging.Formatter("[%(levelname)s] [%(name)s] %(message)s")) + logger.addHandler(handler) + logger.setLevel(getattr(logging, level)) + + +def daemon_main(): source = DatabaseJobSource() if sys.argv[1:] == ['--use-fake']: @@ -27,17 +42,20 @@ service = BoardSet(source, dispatcher, reactor) reactor.callWhenRunning(service.startService) - logger = logging.getLogger('') - config = get_config('logging') - level = config.get("logging", "level") - destination = config.get("logging", 'destination', None) - if destination == '-': - handler = logging.StreamHandler(sys.stdout) - else: - handler = logging.FileHandler(destination) - handler.setFormatter( - logging.Formatter("[%(levelname)s] [%(name)s] %(message)s")) - logger.addHandler(handler) - logger.setLevel(getattr(logging, level)) - + _configure_logging() + + reactor.run() + + +def monitor_main(): + import json + from lava_scheduler_daemon.board import Job + source = DatabaseJobSource() + dispatcher, board_name, json_file = sys.argv[1:] + job = Job( + json.load(open(json_file)), dispatcher, source, board_name, reactor) + def run(): + job.run().addCallback(lambda result: reactor.stop()) + reactor.callWhenRunning(run) + _configure_logging() reactor.run() === modified file 'lava_scheduler_daemon/tests/test_board.py' --- lava_scheduler_daemon/tests/test_board.py 2011-08-05 10:50:10 +0000 +++ lava_scheduler_daemon/tests/test_board.py 2011-08-18 04:29:11 +0000 @@ -101,27 +101,11 @@ self.source._completeCall('getJobForBoard', 'board', ({}, None)) self.assertEqual('R', b._state_name()) - def test_completion_calls_jobCompleted(self): - b = self.make_board('board') - b.start() - self.source._completeCall('getJobForBoard', 'board', ({}, None)) - b.running_job.deferred.callback('path') - self.assertEqual( - 1, len(self.source._calls['board']['jobCompleted'])) - - def test_still_running_during_jobCompleted(self): - b = self.make_board('board') - b.start() - self.source._completeCall('getJobForBoard', 'board', ({}, None)) - b.running_job.deferred.callback('path') - self.assertEqual('R', b._state_name()) - def test_check_again_on_completion(self): b = self.make_board('board') b.start() self.source._completeCall('getJobForBoard', 'board', ({}, None)) b.running_job.deferred.callback('path') - self.source._completeCall('jobCompleted', 'board', None) self.assertEqual('C', b._state_name()) def test_stop_while_checking_moves_to_check_plus_stop(self): @@ -161,7 +145,6 @@ self.assertEqual(0, len(stop_results)) self.source._completeCall('getJobForBoard', 'board', ({}, None)) b.running_job.deferred.callback(None) - self.source._completeCall('jobCompleted', 'board', None) self.assertEqual(1, len(stop_results)) self.assertEqual('S', b._state_name()) @@ -175,7 +158,6 @@ s.addCallback(stop_results.append) self.assertEqual(0, len(stop_results)) b.running_job.deferred.callback(None) - self.source._completeCall('jobCompleted', 'board', None) self.assertEqual(1, len(stop_results)) self.assertEqual('S', b._state_name()) === modified file 'setup.py' --- setup.py 2011-07-27 10:24:50 +0000 +++ setup.py 2011-08-18 02:44:06 +0000 @@ -32,7 +32,7 @@ [lava_server.extensions] scheduler = lava_scheduler_app.extension:SchedulerExtension """, - scripts=["lava-scheduler"], + scripts=["lava-scheduler", "lava-scheduler-monitor"], install_requires=[ "lava-server >= 0.4a1", "twisted",