=== modified file 'abrek/builtins.py'
@@ -70,6 +70,7 @@
arglist = ['*testname']
def run(self):
+ self.checkroot()
if len(self.args) != 1:
print "please specify the name of the test to install"
sys.exit(1)
@@ -93,6 +94,7 @@
help="Store processed test output to FILE")]
def run(self):
+ self.checkroot()
if len(self.args) != 1:
print "please specify the name of the test to run"
sys.exit(1)
=== modified file 'abrek/command.py'
@@ -14,6 +14,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from optparse import OptionParser
+import os
+import sys
class _AbrekOptionParser(OptionParser):
@@ -97,6 +99,11 @@
def get_subcommand(self, name):
return None
+ def checkroot(self):
+ if os.getuid() != 0:
+ print >> sys.stderr, ("**** WARNING: ROOT PERMISSIONS ARE OFTEN"
+ "REQUIRED FOR THIS OPERATION ****")
+
class AbrekCmdWithSubcommands(AbrekCmd):
=== modified file 'abrek/testdef.py'
@@ -27,7 +27,7 @@
from abrek.api import ITest
from abrek.bundle import DocumentIO
from abrek.config import get_config
-from abrek.utils import Tee, geturl, run_and_log, write_file
+from abrek.utils import geturl, run_and_log, write_file
class AbrekTest(ITest):
@@ -222,9 +222,9 @@
def _runsteps(self, resultsdir, quiet=False):
outputlog = os.path.join(resultsdir, 'testoutput.log')
- with Tee(outputlog, 'a', quiet=quiet) as fd:
+ with open(outputlog, 'a') as fd:
for cmd in self.steps:
- run_and_log(cmd, fd)
+ run_and_log(cmd, fd, quiet)
def run(self, resultsdir, quiet=False):
self.starttime = datetime.utcnow()
@@ -245,10 +245,10 @@
For example: If your testoutput had lines that look like:
"test01: PASS", then you could use a pattern like this:
"^(?P<testid>\w+):\W+(?P<result>\w+)"
- This would result in identifying "test01" as testid and "PASS"
- as result. Once parse() has been called, self.results.test_results[]
- contains a list of dicts of all the key,value pairs found for
- each test result
+ This would result in identifying "test01" as testid and
+ "PASS" as result. Once parse() has been called,
+ self.results.test_results[] contains a list of dicts of all the
+ key,value pairs found for each test result
fixupdict - dict of strings to convert test results to standard strings
For example: if you want to standardize on having pass/fail results
in lower case, but your test outputs them in upper case, you could
@@ -280,7 +280,9 @@
try:
pat = re.compile(self.pattern)
except Exception as strerror:
- raise RuntimeError("AbrekTestParser - Invalid regular expression '%s' - %s" %(self.pattern,strerror))
+ raise RuntimeError(
+ "AbrekTestParser - Invalid regular expression '%s' - %s" % (
+ self.pattern,strerror))
with open(filename, 'r') as stream:
for lineno, line in enumerate(stream, 1):
@@ -310,7 +312,8 @@
def appendtoall(self, entry):
"""Append entry to each item in the test_results.
- entry - dict of key,value pairs to add to each item in the test_results
+ entry - dict of key,value pairs to add to each item in the
+ test_results
"""
for t in self.results['test_results']:
t.update(entry)
=== modified file 'abrek/utils.py'
@@ -15,15 +15,16 @@
import os
import shutil
+import subprocess
import sys
import urllib2
import urlparse
-from subprocess import Popen, PIPE
_fake_files = None
_fake_paths = None
_fake_machine = None
+
class Tee(file):
""" A file-like object that optionally mimics tee functionality.
@@ -42,26 +43,29 @@
if self.quiet is False:
sys.stdout.write(data)
+
def geturl(url, path=""):
urlpath = urlparse.urlsplit(url).path
filename = os.path.basename(urlpath)
if path:
- filename = os.path.join(path,filename)
+ filename = os.path.join(path, filename)
fd = open(filename, "w")
try:
response = urllib2.urlopen(urllib2.quote(url, safe=":/"))
fd = open(filename, 'wb')
- shutil.copyfileobj(response,fd,0x10000)
+ shutil.copyfileobj(response, fd, 0x10000)
fd.close()
response.close()
except:
raise RuntimeError("Could not retrieve %s" % url)
return filename
+
def write_file(data, path):
with open(path, "w") as fd:
fd.write(data)
+
def read_file(path):
global _fake_files
global _fake_paths
@@ -75,6 +79,7 @@
data = fd.read()
return data
+
def fake_file(path, data=None, newpath=None):
"""
Set up a fake file to be read with read_file() in testing
@@ -93,6 +98,7 @@
_fake_paths = {}
_fake_paths[path] = newpath
+
def fake_machine(type):
"""
Set up a fake machine type for testing
@@ -100,27 +106,34 @@
global _fake_machine
_fake_machine = type
+
def clear_fakes():
global _fake_files
global _fake_paths
_fake_files = {}
_fake_paths = {}
+
def clear_fake_machine():
global _fake_machine
_fake_machine = None
-def run_and_log(cmd, fd):
+
+def run_and_log(cmd, fd, quiet=False):
"""
Run a command and log the output to fd
"""
- proc = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
- output, err = proc.communicate()
- if output is not None:
- fd.write(output)
-
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT, shell=True)
+ while proc.returncode == None:
+ proc.poll()
+ data = proc.stdout.readline()
+ fd.write(data)
+ if quiet is False:
+ sys.stdout.write(data)
return proc.returncode
+
def get_machine_type():
"""
Return the machine type
@@ -129,4 +142,3 @@
if _fake_machine is None:
return os.uname()[-1]
return _fake_machine
-