From patchwork Tue Mar 6 11:17:10 2012 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: James Tunnicliffe X-Patchwork-Id: 7111 Return-Path: X-Original-To: patchwork@peony.canonical.com Delivered-To: patchwork@peony.canonical.com Received: from fiordland.canonical.com (fiordland.canonical.com [91.189.94.145]) by peony.canonical.com (Postfix) with ESMTP id ADDF523E77 for ; Tue, 6 Mar 2012 11:17:14 +0000 (UTC) Received: from mail-iy0-f180.google.com (mail-iy0-f180.google.com [209.85.210.180]) by fiordland.canonical.com (Postfix) with ESMTP id E7E4EA182DC for ; Tue, 6 Mar 2012 11:17:13 +0000 (UTC) Received: by iage36 with SMTP id e36so9269838iag.11 for ; Tue, 06 Mar 2012 03:17:13 -0800 (PST) Received: from mr.google.com ([10.42.147.199]) by 10.42.147.199 with SMTP id o7mr15955619icv.50.1331032633469 (num_hops = 1); Tue, 06 Mar 2012 03:17:13 -0800 (PST) Received: by 10.42.147.199 with SMTP id o7mr13132159icv.50.1331032633391; Tue, 06 Mar 2012 03:17:13 -0800 (PST) X-Forwarded-To: linaro-patchwork@canonical.com X-Forwarded-For: patch@linaro.org linaro-patchwork@canonical.com Delivered-To: patches@linaro.org Received: by 10.231.53.18 with SMTP id k18csp54214ibg; Tue, 6 Mar 2012 03:17:11 -0800 (PST) Received: by 10.180.85.35 with SMTP id e3mr17788613wiz.6.1331032631068; Tue, 06 Mar 2012 03:17:11 -0800 (PST) Received: from indium.canonical.com (indium.canonical.com. [91.189.90.7]) by mx.google.com with ESMTPS id p70si13084871weq.114.2012.03.06.03.17.10 (version=TLSv1/SSLv3 cipher=OTHER); Tue, 06 Mar 2012 03:17:11 -0800 (PST) Received-SPF: pass (google.com: best guess record for domain of bounces@canonical.com designates 91.189.90.7 as permitted sender) client-ip=91.189.90.7; Authentication-Results: mx.google.com; spf=pass (google.com: best guess record for domain of bounces@canonical.com designates 91.189.90.7 as permitted sender) smtp.mail=bounces@canonical.com Received: from ackee.canonical.com ([91.189.89.26]) by indium.canonical.com with esmtp (Exim 4.71 #1 (Debian)) id 1S4sOE-0004dY-Cn for ; Tue, 06 Mar 2012 11:17:10 +0000 Received: from ackee.canonical.com (localhost [127.0.0.1]) by ackee.canonical.com (Postfix) with ESMTP id 53934E032C for ; Tue, 6 Mar 2012 11:17:10 +0000 (UTC) MIME-Version: 1.0 X-Launchpad-Project: linaro-image-tools X-Launchpad-Branch: ~linaro-image-tools/linaro-image-tools/trunk X-Launchpad-Message-Rationale: Subscriber X-Launchpad-Branch-Revision-Number: 498 X-Launchpad-Notification-Type: branch-revision To: Linaro Patch Tracker From: noreply@launchpad.net Subject: [Branch ~linaro-image-tools/linaro-image-tools/trunk] Rev 498: Removing linaro-fetch-image now that it is a separate project Message-Id: <20120306111710.5441.71693.launchpad@ackee.canonical.com> Date: Tue, 06 Mar 2012 11:17:10 -0000 Reply-To: noreply@launchpad.net Sender: bounces@canonical.com Errors-To: bounces@canonical.com Precedence: bulk X-Generated-By: Launchpad (canonical.com); Revision="14900"; Instance="launchpad-lazr.conf" X-Launchpad-Hash: 9c27ff34aee15586a72a20cee05753f2e56375de X-Gm-Message-State: ALoCoQnMVBKDrb8ehDm1+0LBZU8TcJ/s9cgSPqN6QGYg1xiVEuEn1orJ2EojTN4CyDnaGh25dHSt Merge authors: James Tunnicliffe (dooferlad) Related merge proposals: https://code.launchpad.net/~dooferlad/linaro-image-tools/remove-fetch-image/+merge/95563 proposed by: James Tunnicliffe (dooferlad) review: Approve - Paul Sokolovsky (pfalcon) ------------------------------------------------------------ revno: 498 [merge] committer: James Tunnicliffe branch nick: linaro-image-tools timestamp: Tue 2012-03-06 11:12:55 +0000 message: Removing linaro-fetch-image now that it is a separate project removed: linaro-fetch-image linaro-fetch-image-ui linaro-image-indexer linaro_image_tools/fetch_image.py linaro_image_tools/fetch_image_settings.yaml linaro_image_tools/tests/test_fetch_image.py linaro_image_tools/tests/test_server_index.sqlite modified: linaro_image_tools/tests/__init__.py --- lp:linaro-image-tools https://code.launchpad.net/~linaro-image-tools/linaro-image-tools/trunk You are subscribed to branch lp:linaro-image-tools. To unsubscribe from this branch go to https://code.launchpad.net/~linaro-image-tools/linaro-image-tools/trunk/+edit-subscription === removed file 'linaro-fetch-image' --- linaro-fetch-image 2011-08-12 15:50:12 +0000 +++ linaro-fetch-image 1970-01-01 00:00:00 +0000 @@ -1,79 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) 2010, 2011 Linaro -# -# Author: James Tunnicliffe -# -# This file is part of Linaro Image Tools. -# -# Linaro Image Tools is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# Linaro Image Tools is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Linaro Image Tools; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. - -import sys -import os -import linaro_image_tools.fetch_image as fetch_image -import logging - - -def main(): - file_handler = fetch_image.FileHandler() - config = fetch_image.FetchImageConfig() - - # Unfortunately we need to do a bit of a hack here and look for some - # options before performing a full options parse. - clean_cache = ("--clean-cache" in sys.argv[1:] - or "-x" in sys.argv[1:]) - - force_download = ("--force-download" in sys.argv[1:] - or "-d" in sys.argv[1:]) - - if clean_cache: - file_handler.clean_cache() - - # If the settings file and server index need updating, grab them - file_handler.update_files_from_server(force_download) - - # Load settings YAML, which defines the parameters we ask for and - # acceptable responses from the user - config.read_config(file_handler.settings_file) - - # Using the settings that the YAML defines as what we need for a build, - # generate a command line parser and parse the command line - config.parse_args(sys.argv[1:]) - - if config.args['platform'] == "snapshot": - config.args['release_or_snapshot'] = "snapshot" - else: - config.args['release_or_snapshot'] = "release" - - # Using the config we have, look up URLs to download data from in the - # server index - db = fetch_image.DB(file_handler.index_file) - - image_url, hwpack_url = db.get_image_and_hwpack_urls(config.args) - - if(image_url and hwpack_url): - - tools_dir = os.path.dirname(__file__) - if tools_dir == '': - tools_dir = None - - file_handler.create_media(image_url, hwpack_url, - config.args, tools_dir) - else: - logging.error( - "Unable to find files that match the parameters specified") - -if __name__ == '__main__': - main() === removed file 'linaro-fetch-image-ui' --- linaro-fetch-image-ui 2011-12-13 14:13:26 +0000 +++ linaro-fetch-image-ui 1970-01-01 00:00:00 +0000 @@ -1,1760 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) 2010, 2011 Linaro -# -# Author: James Tunnicliffe -# -# This file is part of Linaro Image Tools. -# -# Linaro Image Tools is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# Linaro Image Tools is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Linaro Image Tools; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. - -import wx -import wx.html -import wx.wizard -import wx.wizard as wiz -import sys -import re -import os -import linaro_image_tools.fetch_image as fetch_image -import string -import operator -import Queue -import time -import datetime -from linaro_image_tools.fetch_image import (QEMU, HARDWARE) -from linaro_image_tools.media_create.check_device import ( - _get_system_bus_and_udisks_iface, - _get_dbus_property, - ) -import dbus - -def add_button(bind_to, - sizer, - label, - style, - select_event, - hover_event, - unhover_event): - - """Create a radio button with event bindings.""" - if(style != None): - radio_button = wx.RadioButton(bind_to, label=label, style=style) - else: - radio_button = wx.RadioButton(bind_to, label=label) - - sizer.Add(radio_button, 0, wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, 5) - bind_to.Bind(wx.EVT_RADIOBUTTON, select_event, radio_button) - wx.EVT_ENTER_WINDOW(radio_button, hover_event) - wx.EVT_LEAVE_WINDOW(radio_button, unhover_event) - - return radio_button - - -class ReleaseOrSnapshotPage(wiz.PyWizardPage): - """Ask the user if they want to use a release or a snapshot""" - - def __init__(self, parent, config, db, pages, width): - wiz.PyWizardPage.__init__(self, parent) - self.config = config - self.settings = self.config.settings - self.sizer = wx.BoxSizer(wx.VERTICAL) - self.next = None - self.prev = None - self.db = db - self.wizard = parent - self.pages = pages - self.width = width - self.settings['image'] = None - self.os_selected = True - self.width = width - - message = ("Would you like to use a Linaro release, or a more up to " - "date, but possibly unstable build?") - header = wx.StaticText(self, -1, message) - header.Wrap(width - 10) # -10 because boarder below is 5 pixels wide - - self.box1 = wx.BoxSizer(wx.VERTICAL) - - self.button_text = {'release': "I would like to run stable, " - "tested software.", - 'snapshot': "I would like to run untested, but " - "more up-to-date software."} - - self.rel_btn = add_button(self, self.box1, self.button_text['release'], - wx.RB_GROUP, self.event_radio_button_select, - None, None) - - # Save the setting for the default selected value - self.settings['release_or_snapshot'] = "release" - - self.snap_btn = add_button(self, self.box1, - self.button_text['snapshot'], None, - self.event_radio_button_select, None, None) - - self.cp = wx.CollapsiblePane(self, label="Advanced Options", - style=wx.CP_DEFAULT_STYLE | - wx.CP_NO_TLW_RESIZE) - self.Bind(wx.EVT_COLLAPSIBLEPANE_CHANGED, self.event_on_pane_changed, - self.cp) - self.make_pane_content(self.cp.GetPane()) - self.box2 = wx.BoxSizer(wx.VERTICAL) - self.box2.Add(self.cp, 0) - - self.help_text_main = wx.StaticText(self, -1, "") - - self.sizer.Add(header) - self.sizer.Add(self.box1, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.sizer.Add(self.box2, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.sizer.Add(self.help_text_main, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.cp.SetSizer(self.sizer) - self.SetSizerAndFit(self.sizer) - self.sizer.Fit(self) - self.Move((50, 50)) - - def make_pane_content(self, pane): - self.adv_box = wx.BoxSizer(wx.VERTICAL) - - message = ("You have the option of selecting from several OS images " - "and builds.") - header = wx.StaticText(pane, -1, message) - header.Wrap(self.width - 10) # -10 because boarder below is 5 px wide - self.adv_box.Add(header) - self.grid1 = wx.FlexGridSizer(0, 2, 0, 0) - self.grid2 = wx.FlexGridSizer(0, 2, 0, 0) - self.grid3 = wx.FlexGridSizer(0, 1, 0, 0) - - platforms = [] - for key, value in self.settings['choice']['platform'].items(): - platforms.append(key) - - style = wx.CB_DROPDOWN | wx.CB_READONLY - self.settings['platform'] = None - - self.cb_release = wx.ComboBox(pane, style=style) - self.Bind(wx.EVT_COMBOBOX, self.event_cb_release, self.cb_release) - - self.cb_build = wx.ComboBox(pane, style=style) - self.Bind(wx.EVT_COMBOBOX, self.event_combo_box_build, self.cb_build) - - self.cb_image = wx.ComboBox(pane, style=style) - self.Bind(wx.EVT_COMBOBOX, self.event_combo_box_os, self.cb_image) - - self.help_text = wx.StaticText(pane, -1, "") - - alignment = wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP - align_text = alignment | wx.ALIGN_CENTER_VERTICAL - self.grid1.Add(self.cb_image, 0, alignment, 5) - self.grid1.Add(wx.StaticText(pane, -1, "The OS to run"), - 0, align_text, 5) - - self.grid2.Add(self.cb_release, 0, alignment, 5) - self.grid2.Add(wx.StaticText(pane, -1, "Available Linaro releases"), - 0, align_text, 5) - - self.grid2.Add(self.cb_build, 0, alignment, 5) - self.grid2.Add(wx.StaticText(pane, -1, "Available milestones"), - 0, align_text, 5) - - self.adv_box.Add(self.grid1, 0, alignment, 0) - self.adv_box.Add(self.grid2, 0, alignment, 0) - self.adv_box.Add(self.grid3, 0, alignment, 0) - self.update_visibility() - self.adv_box.Add(self.help_text, 0, alignment, 5) - - pane.SetSizer(self.adv_box) - self.adv_box.Fit(pane) - - # The following line is here because when the pane is shown and grid2 - # is not visible and an option is then selected to make it visible, - # the pane does not (and seemingly can not) resize to accommodate - # the contents of grid2. This results in some content staying - # invisible until the pane is hidden and shown again. This blank bit - # of text is just there to bulk up the size of the pane so work around - # this. Padding is the size of grid2 / 2 (padding is applied all around - # the StaticText). - - self.grid3.Add(wx.StaticText(pane, -1, ""), 0, align_text, - self.grid2.GetSize().height/2) - self.adv_box.Fit(pane) - - def update_visibility(self): - if self.settings['release_or_snapshot'] == "snapshot": - self.adv_box.Hide(self.grid2, True) - else: - self.adv_box.Show(self.grid2, True) - - def update_next_active(self): - if self.build_available and self.os_selected: - self.wizard.FindWindowById(wx.ID_FORWARD).Enable() - else: - self.wizard.FindWindowById(wx.ID_FORWARD).Disable() - - def update_build_box(self): - """Depending on what hardware has been chosen, the OS list may be - restricted. Filter out anything that is unavailable.""" - self.cb_build.Clear() - - if self.settings['release_or_snapshot'] == "snapshot": - # This combo box is not used by snapshot builds - self.cb_build.SetValue("Not applicable") - self.build_available = True - return - - builds = self.db.get_builds(self.settings['platform']) - self.cb_build.SetValue("No build available") - - for build in builds: - if(self.db.hardware_is_available_for_platform_build( - self.settings['compatable_hwpacks'], - self.settings['platform'], - build) - and self.db.build_is_available_for_platform_image( - "release_binaries", - self.settings['platform'], - self.settings['image'], - build)): - - self.cb_build.Append(build) - self.cb_build.SetValue(build) - self.settings['release_build'] = build - - available_hwpacks = ( - self.db.get_available_hwpacks_for_hardware_build_plaform( - self.settings['compatable_hwpacks'], - self.settings['platform'], - self.settings['release_build'])) - - if len(available_hwpacks): - self.settings['hwpack'] = available_hwpacks[0] - self.build_available = True - else: - self.build_available = False - - self.update_next_active() - - def update_release_and_build_boxes(self): - """Depending on what hardware has been chosen, some builds may be - unavailable...""" - self.cb_release.Clear() - - if self.settings['release_or_snapshot'] == "snapshot": - # This combo box is not used by snapshot builds - self.cb_release.SetValue("Not applicable") - return - - if not self.os_selected: - self.cb_release.SetValue("-") - self.update_build_box() - self.update_next_active() - return - - default_release = None - for platform, value in self.settings['choice']['platform'].items(): - if(self.db.hardware_is_available_for_platform( - self.settings['compatable_hwpacks'], - platform) - and len(self.db.execute_return_list( - 'select * from release_binaries ' - 'where platform == ? and image == ?', - (platform, self.settings['image'])))): - - if platform in self.settings['UI']['translate']: - platform = self.settings['UI']['translate'][platform] - - self.cb_release.Append(platform, platform.upper()) - if not default_release or default_release < platform: - default_release = platform - - if default_release in self.settings['UI']['reverse-translate']: - default = self.settings['UI']['reverse-translate'][default_release] - else: - default = default_release - - if not self.settings['platform']: - # No platform has been chose, go with the default - self.settings['platform'] = default - self.cb_release.SetValue(default_release) - else: - # Don't change the value of platform if it is set. - pf = self.settings['UI']['translate'][self.settings['platform']] - self.cb_release.SetValue(pf) - - self.update_build_box() - self.update_next_active() - - def get_human_os_name(self, item): - """Given an OS name from the database, return a human name (either - translated from the YAML settings, or just prettified) and if it is a - LEB OS or not""" - - item = re.sub("linaro-", "", item) # Remove any linaro- decoration - - if item in self.settings['UI']['descriptions']: - human_name = self.settings['UI']['descriptions'][item] - else: - # Make human_name look nicer... - human_name = string.capwords(item) - - leb_search = re.search("^LEB:\s*(.*)$", human_name) - - if leb_search: - return leb_search.group(1), True - - return human_name, False - - def fill_os_list(self): - """Filter the list of OS's from the config file based on the users - preferences so all choices in the list are valid (i.e. their hardware - is supported for the build they have chosen).""" - - # select unique image from snapshot_binaries/release_binaries to - # generate list - os_list = None - if self.settings['release_or_snapshot'] == "release": - os_list = self.db.get_os_list_from('release_binaries') - else: - os_list = self.db.get_os_list_from('snapshot_binaries') - - self.cb_image.Clear() - - printed_tag = None - last_name = None - current_image_setting_valid = False - - for state in ["LEB", "other"]: - for item in os_list: - if item == "old": - # Old is a directory that sometimes hangs around, - # but isn't one we want to display - continue - - # Save the original, untouched image name for use later. - # We give it a more human name for display - original = item - item = re.sub("linaro-", "", item) - - os_hardware_combo_available = ( - self.db.image_hardware_combo_available( - self.settings['release_or_snapshot'], - original, - self.settings['compatable_hwpacks'])) - - if os_hardware_combo_available: - human_name, is_LEB = self.get_human_os_name(item) - - if item == self.settings['image']: - current_image_setting_valid = True - - if state == "LEB" and is_LEB: - - if printed_tag != state: - self.cb_image.Append( - "- Linaro Supported Releases -") - printed_tag = state - - self.cb_image.Append(human_name, original) - - if self.settings['image'] == None: - self.settings['image'] = original - self.os_selected = True - - elif state != "LEB" and not is_LEB: - if printed_tag != state: - self.cb_image.Append( - "- Community Supported Releases -") - printed_tag = state - - self.cb_image.Append(human_name, original) - - last_name = original - - if( self.settings['image'] != None - and current_image_setting_valid == False): - # If we have an image setting, but it doesn't match the OS list, we - # have switched OS list. It may be that adding/removing "linaro-" - # from the name will get a match. - - if re.search("linaro-", self.settings['image']): - test_name = re.sub("linaro-", "", self.settings['image']) - else: - test_name = "linaro-" + self.settings['image'] - - if test_name in os_list: - # Success! We have translated the name and can retain the - # "old setting" - self.settings['image'] = test_name - current_image_setting_valid = True - self.os_selected = True - - if( self.settings['image'] == None - or current_image_setting_valid == False): - # This should only get hit if there are no LEBs available - self.settings['image'] = last_name - self.os_selected = True - - assert self.settings['image'] - - # Make sure the visible selected value matches the saved setting - self.cb_image.SetValue( - self.get_human_os_name(self.settings['image'])[0]) - - def force_rel_snap_if_hw_requires(self): - """ - If a hardware pack is only available on a release or snapshot build - then force self.settings['release_or_snapshot'] and grey out the - other radio button. Also display a helpful message. - """ - - snapshot_ok = False - release_ok = False - - for hwpack in self.settings['compatable_hwpacks']: - if self.db.hardware_is_available_in_table("snapshot_hwpacks", - hwpack): - snapshot_ok = True - if self.db.hardware_is_available_in_table("release_hwpacks", - hwpack): - release_ok = True - - assert release_ok or snapshot_ok, ("release or snapshot should have" - "a hardware pack available") - - self.rel_btn.Enable() - self.snap_btn.Enable() - message = "" - - if not release_ok: - self.settings['release_or_snapshot'] = "snapshot" - self.rel_btn.Disable() - self.rel_btn.SetValue(False) - self.snap_btn.SetValue(True) - message = ("The hardware you have chosen is only available for " - "snapshot builds, so release builds have been disabled") - - if not snapshot_ok: - self.settings['release_or_snapshot'] = "release" - self.snap_btn.Disable() - self.snap_btn.SetValue(False) - self.rel_btn.SetValue(True) - message = ("The hardware you have chosen is only available for " - "release builds, so snapshot builds have been disabled") - - self.help_text_main.SetLabel(message) - self.help_text_main.Wrap(self.width - 10) - - def GetNext(self): - if self.settings['release_or_snapshot'] == "release": - return self.pages['lmc_settings'] - else: - return self.pages['select_snapshot'] - - def GetPrev(self): - return self.pages['hardware_details'] - - #--- Event(s) --- - def event_radio_button_select(self, event): - # The radio button can be release or snapshot - self.radio_selected = event.GetEventObject().GetLabel() - - if self.radio_selected == self.button_text['release']: - self.settings['release_or_snapshot'] = "release" - else: - self.settings['release_or_snapshot'] = "snapshot" - - self.event_on_pane_changed(event) - - def event_on_pane_changed(self, event): - self.fill_os_list() - self.update_release_and_build_boxes() - self.update_visibility() - self.Layout() - self.update_next_active() - - def event_cb_release(self, evt): - str = evt.GetString().encode('ascii').lower() - if str in self.settings['UI']['reverse-translate']: - str = self.settings['UI']['reverse-translate'][str] - self.settings['platform'] = str - - self.update_build_box() - - def event_combo_box_build(self, evt): - self.settings['release_build'] = evt.GetString().encode('ascii') - - def event_combo_box_os(self, evt): - self.settings['image'] = self.cb_image.GetClientData( - evt.GetSelection()) - - if self.settings['image']: # Is None for items that aren't an OS - self.os_selected = True - image = re.sub("linaro-", "", self.settings['image']) - - if image + "::long" in self.settings['UI']['descriptions']: - self.help_text.SetLabel(self.settings['UI'] - ['descriptions'] - [image + "::long"]) - else: - self.help_text.SetLabel("") - - else: # Have selected help text - self.os_selected = False - self.help_text.SetLabel("Please select an operating system to run " - "on your chosen hardware.") - - self.help_text.Wrap(self.width - 10) - self.update_release_and_build_boxes() - self.update_next_active() - #--- END event(s) --- - - -class AboutMyHardwarePage(wiz.PyWizardPage): - """Ask the user about their hardware. This only asks about the board, not - any specific hardware packs because there can be multiple names for the - same hardware pack or sometimes a hardware pack is only available in the - releases or snapshots repository. We whittle down the choice as we go - and the user can chose a hardare pack (if they don't like the default) - under advanced options in the Linaro Media Create options - page""" - - def __init__(self, parent, config, db): - wiz.PyWizardPage.__init__(self, parent) - self.settings = config.settings - self.db = db - self.sizer = wx.BoxSizer(wx.VERTICAL) - self.box1 = wx.BoxSizer(wx.VERTICAL) - self.box2 = wx.BoxSizer(wx.VERTICAL) - self.next = None - - message = """\ -This Wizard will write an operating system of your choosing to either a disk -image or to an MMC card. First we need to know what hardware you have.""" - header = wx.StaticText(self, label=message) - - #--- Hardware Combo Box --- - # Make sure that the displayed release is the one set in settings if - # no selection is made - if "panda" in self.settings['choice']['hardware'].keys(): - default_hardware = "panda" - else: - default_hardware = self.settings['choice']['hardware'].keys()[-1] - - self.settings['hardware'] = default_hardware - self.settings['compatable_hwpacks'] = ( - self.settings['choice']['hwpack'][self.settings['hardware']]) - - self.cb_hardware = wx.ComboBox(self, - value=self.settings['choice'] - ['hardware'] - [default_hardware], - style=wx.CB_DROPDOWN | wx.CB_READONLY) - - self.Bind(wx.EVT_COMBOBOX, - self.event_combo_box_hardware, - self.cb_hardware) - - file_dev_grid = wx.FlexGridSizer(0, 1, 0, 0) - line_1_grid = wx.FlexGridSizer(0, 2, 0, 0) - self.box2.Add(file_dev_grid, 0, wx.EXPAND) - - # self.settings['write_to_file_or_device'] should match the first - # button below... - self.button_text = {'hardware': "I have a", - 'sim': "I want to run on a hardware simulation."} - - self.settings['hw_or_qemu'] = HARDWARE - add_button(self, - line_1_grid, - self.button_text['hardware'], - wx.RB_GROUP, - self.event_radio_button_select, - None, None) - - line_1_grid.Add(self.cb_hardware) - file_dev_grid.Add(line_1_grid) - - add_button(self, - file_dev_grid, - self.button_text['sim'], - None, - self.event_radio_button_select, - None, None) - - self.sizer.Add(header) - self.sizer.Add(self.box1, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.sizer.Add(self.box2, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.SetSizerAndFit(self.sizer) - self.sizer.Fit(self) - self.Move((50, 50)) - - def on_page_changing(self): - self.update_hardware_box() - - def update_hardware_box(self): - self.cb_hardware.Clear() - - sorted_hardware_names = sorted(self.settings['choice']['hardware'] - .iteritems(), - key=operator.itemgetter(1)) - - for device_name, human_readable_name in sorted_hardware_names: - for hwpack in self.settings['choice']['hwpack'][device_name]: - if(self.db.hardware_is_available_in_table("snapshot_hwpacks", - hwpack) - or self.db.hardware_is_available_in_table("release_hwpacks", - hwpack)): - self.cb_hardware.Append(human_readable_name, device_name) - break - - #--- Event(s) --- - def event_combo_box_hardware(self, event): - self.settings['hardware'] = (event - .GetEventObject() - .GetClientData(event.GetSelection()) - .encode('ascii')) - - self.settings['compatable_hwpacks'] = ( - self.settings['choice']['hwpack'][self.settings['hardware']]) - def event_radio_button_select(self, event): - val = event.GetEventObject().GetLabel() - if val == self.button_text['sim']: - self.settings['hw_or_qemu'] = QEMU - assert "beagle" in self.settings['choice']['hardware'].keys() - self.settings['hardware'] = "beagle" - self.settings['compatable_hwpacks'] = ( - self.settings['choice']['hwpack'][self.settings['hardware']]) - - elif val == self.button_text['hardware']: - self.settings['hw_or_qemu'] = HARDWARE - #--- END event(s) --- - - def SetNext(self, next): - self.next = next - - def GetNext(self): - return self.next - -class SelectSnapshot(wiz.WizardPageSimple): - """Present the user with a calendar widget and a list of builds available - on the selected date so they can chose a snapshot. Filter out days when - their chosen hardware does not have an available build.""" - - def __init__(self, parent, config, db, width): - wiz.WizardPageSimple.__init__(self, parent) - self.settings = config.settings - self.db = db - self.wizard = parent - self.width = width - self.sizer = wx.BoxSizer(wx.VERTICAL) - - header = wx.StaticText(self, - label="Builds are created most days. First " - "please select the day on which the " - "build you would like to use was built," - " then, if there was more than one " - "build that day you will be able to " - "select the build number.") - header.Wrap(width - 10) # -10 because boarder below is 5 pixels wide - - box1 = wx.BoxSizer(wx.VERTICAL) - self.sizer.Add(header) - - # Set today as the default build date in settings - # (matches the date picker) - self.today = wx.DateTime() - self.today.SetToCurrent() - self.settings['build_date'] = self.today.FormatISODate().encode('ascii') - - dpc = wx.DatePickerCtrl(self, size=(120, -1), - style=wx.DP_DEFAULT) - self.Bind(wx.EVT_DATE_CHANGED, self.on_date_changed, dpc) - - #--- Build number Combo Box --- - # Make sure that the displayed build is the one set in settings if no - # selection is made - self.settings['build_number'] = 0 - self.update_build() - self.cb_build = wx.ComboBox(self, - style=wx.CB_DROPDOWN | wx.CB_READONLY) - self.Bind(wx.EVT_COMBOBOX, self.event_combo_box_build, self.cb_build) - - #--- Layout --- - # -- Combo boxes for hardware and image selection -- - - grid2 = wx.FlexGridSizer(0, 2, 0, 0) - grid2.Add(dpc, 0, wx.ALIGN_LEFT | wx.ALL, 5) - grid2.Add(self.cb_build, 0, wx.ALIGN_LEFT | wx.ALL, 5) - - box1.Add(grid2, 0, wx.ALIGN_LEFT | wx.ALL, 5) - - self.sizer.Add(box1, 0, wx.ALIGN_LEFT | wx.ALL, 5) - - self.help_text = wx.StaticText(self) - self.sizer.Add(self.help_text, 1, wx.EXPAND, 5) - - self.SetSizer(self.sizer) - self.sizer.Fit(self) - self.Move((50, 50)) - - def update_platform(self): - build_and_date = self.settings['snapshot_build'].split(":") - - if len(build_and_date) == 2: - self.settings['platform'] = ( - self.db.execute_return_list( - "select platform from snapshot_binaries " - "where date == ? and build == ?", - (build_and_date[0], build_and_date[1]))) - - if len(self.settings['platform']) > 0: - self.settings['platform'] = self.settings['platform'][0][0] - - def update_build(self): - small_date = re.sub('-', '', self.settings['build_date']) - self.settings['snapshot_build'] = (small_date - + ":" - + str(self.settings['build_number'])) - - def fill_build_combo_box_for_date(self, date): - """Every time a date is chosen, this function should be called. It will - check to see if a compatible build is available. If there isn't, it - will search for one and provide some help text to tell the user when - compatable builds were built.""" - # Re-populate the build combo box - - self.cb_build.Clear() - - builds = self.db.get_binary_builds_on_day_from_db( - self.settings['image'], - date, - self.settings['compatable_hwpacks']) - - if len(builds): - max = 0 - for item in builds: - #Always get a tuple, only interested in the first entry - item = item[0] - self.cb_build.Append(item, item.upper()) - - if item > max: - max = item - - self.cb_build.SetValue(max) - self.wizard.FindWindowById(wx.ID_FORWARD).Enable() - self.help_text.SetLabel("") - - else: - self.cb_build.SetValue("No builds available") - future_date, past_date = self.db.get_next_prev_day_with_builds( - self.settings['image'], - date, - self.settings['compatable_hwpacks']) - - help_text = None - - if future_date and past_date: - help_text = ("There are no builds that match your " - "specifications available on the selected date. " - "The previous build was on " + past_date + - " and the next build was on " + future_date + ".") - elif future_date: - help_text = ("There are no builds that match your " - "specifications available on the selected date. " - "The next build was on " + future_date + - " and I couldn't find a past build (looked one " - "year back from the selected date).") - elif past_date: - help_text = ("There are no builds that match your " - "specifications available on the selected date. " - "The previous build was on " + past_date) - if date != self.today.FormatISODate().encode('ascii'): - help_text += (" and I couldn't find a future build (I " - "looked up to one year forward from the " - "selected date).") - else: - help_text = ("I could not find any builds that match your " - "specifications close to the selected date (I " - "looked forward and back one year from the " - "selected date).") - - self.help_text.SetLabel(help_text) - self.help_text.Wrap(self.width - 10) - self.wizard.FindWindowById(wx.ID_FORWARD).Disable() - - #--- Event(s) --- - def on_date_changed(self, evt): - self.settings['build_date'] = evt.GetDate().FormatISODate().encode('ascii') - self.fill_build_combo_box_for_date(self.settings['build_date']) - self.update_build() - - def event_combo_box_build(self, evt): - self.settings['build_number'] = evt.GetString().encode('ascii').lower() - self.update_build() - #--- END event(s) --- - - -class LMC_settings(wiz.WizardPageSimple): - """Present the user with, intially, the choice of writing the file system - they are going to have created to a file, or directly to a device. Ask - which file/device to write to. - - If writing to a device, the user is asked to tick a box saying that they - understand that the device they have chosen will be erased. - - If the user ticks the advanced box, more options are shown.""" - - def __init__(self, parent, config, db, width): - wiz.WizardPageSimple.__init__(self, parent) - self.settings = config.settings - self.wizard = parent - self.sizer = wx.BoxSizer(wx.VERTICAL) - self.yes_use_mmc = False - self.db = db - - self.settings['path_selected'] = "" - - header = wx.StaticText(self, - label="Media Creation Settings\n\n" - "Please select if you would like to write the " - "file system I am about to create to a memory " - "card, or to a file on the local file system.") - header.Wrap(width - 10) # -10 because boarder below is 5 pixels wide - - #--- Layout --- - self.sizer.Add(header, 0, wx.ALIGN_LEFT | wx.ALL, 5) - box1 = wx.BoxSizer(wx.VERTICAL) - file_dev_grid = wx.FlexGridSizer(0, 2, 0, 0) - box1.Add(file_dev_grid, 0, wx.EXPAND) - - # self.settings['write_to_file_or_device'] should match the first - # button below... - self.settings['write_to_file_or_device'] = "file" - self.file_button = add_button( - self, - file_dev_grid, - "Write to file", - wx.RB_GROUP, - self.event_radio_button_select, - None, None) - - self.device_button = add_button( - self, - file_dev_grid, - "Write to device", - None, - self.event_radio_button_select, - None, None) - - self.help_text_values = {"device": "Please select a device to write " - "the file system to:", - - "file": "Please select a file to write the " - "file system to:"} - - self.help_text = wx.StaticText( - self, - label=self.help_text_values[ - self.settings['write_to_file_or_device']]) - self.help_text.Wrap(width - 10) - - #-- File/dev picker -- - file_browse_button = wx.Button(self, -1, "Browse") - file_browse_grid = wx.FlexGridSizer(0, 2, 0, 0) - self.file_path_and_name = wx.TextCtrl(self, -1, "", size=(300, -1)) - - file_browse_grid.Add(self.file_path_and_name, 0, wx.EXPAND) - file_browse_grid.Add(file_browse_button, 0, wx.EXPAND) - - self.Bind(wx.EVT_BUTTON, - self.event_open_file_control, - file_browse_button) - - self.Bind(wx.EVT_TEXT, - self.event_file_path_and_name, - self.file_path_and_name) - - box1.Add(self.help_text, 0, wx.ALIGN_LEFT | wx.ALL, 5) - - box1.Add(file_browse_grid, 0, wx.EXPAND) - - # Advanced settings collapsible pane - self.cp = wx.CollapsiblePane(self, label="Advanced Options", - style=wx.CP_DEFAULT_STYLE | - wx.CP_NO_TLW_RESIZE) - self.Bind(wx.EVT_COLLAPSIBLEPANE_CHANGED, self.event_on_pane_changed, - self.cp) - self.make_pane_content(self.cp.GetPane()) - self.box2 = wx.BoxSizer(wx.VERTICAL) - self.box2.Add(self.cp) - - self.sizer.Add(box1, 0, wx.ALIGN_LEFT | wx.ALL, 0) - self.sizer.Add(self.box2, 0, wx.ALIGN_LEFT | wx.ALL, 0) - self.SetSizer(self.sizer) - self.sizer.Fit(self) - self.Move((50, 50)) - - def make_pane_content(self, pane): - self.box = wx.BoxSizer(wx.VERTICAL) - grid1 = wx.FlexGridSizer(0, 2, 0, 0) - - #--- Build some widgets --- - #-- Target file system -- - file_systems = ["ext4", "btrfs", "ext3", "ext2"] - default_target = file_systems[0] - self.settings['rootfs'] = default_target - cb_rootfs = wx.ComboBox(pane, - value=default_target, - style=wx.CB_DROPDOWN | wx.CB_READONLY) - - for item in file_systems: - cb_rootfs.Append(item, item.upper()) - - self.Bind(wx.EVT_COMBOBOX, self.event_combo_box_rootfs, cb_rootfs) - - #-- Image size spinner - self.image_size_spinner = wx.SpinCtrl(pane, -1, "") - self.Bind(wx.EVT_SPINCTRL, - self.event_image_size, - self.image_size_spinner) - - #-- Swap size spinner - self.swap_size_spinner = wx.SpinCtrl(pane, -1, "") - self.Bind(wx.EVT_SPINCTRL, - self.event_swap_size, - self.swap_size_spinner) - - - alignment = wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP - grid1.Add(cb_rootfs, 0, alignment, 5) - - grid1.Add(wx.StaticText(pane, - label="The root file system of the image"), - 0, alignment, 5) - - # We want to sub-devide the cell, to add another grid sizer... - file_size_grid = wx.FlexGridSizer(0, 2, 0, 0) - - grid1.Add(file_size_grid, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP) - - # Add a spinner that allows us to type/click a numerical value (defined above) - file_size_grid.Add(self.image_size_spinner, - 0, - alignment, - 5) - - # Add a choice of MB or GB for size input - units = ["GB", "MB"] - self.size_unit = units[0] # Set the default unit - unit_choice = wx.Choice(pane, -1, (100, 50), choices=units) - self.Bind(wx.EVT_CHOICE, self.event_chose_unit, unit_choice) - file_size_grid.Add(unit_choice, 0, wx.ALIGN_RIGHT | wx.TOP, 5) - - # Back out of the extra grid, add some help text - grid1.Add(wx.StaticText( - pane, - label="Writing to file only: Image file size"), - 0, - alignment, - 5) - - # The swap size (MB only) - grid1.Add(self.swap_size_spinner, - 0, - alignment, - 5) - - grid1.Add(wx.StaticText(pane, label="Swap file size in MB"), - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.cb_hwpacks = wx.ComboBox( - pane, - value=self.settings['compatable_hwpacks'][0], - style=wx.CB_DROPDOWN | wx.CB_READONLY) - - self.Bind(wx.EVT_COMBOBOX, - self.event_combo_box_hwpack, - self.cb_hwpacks) - - grid1.Add(self.cb_hwpacks, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - grid1.Add(wx.StaticText(pane, label="Compatible hardware packs"), - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - desc_link = wx.HyperlinkCtrl( - pane, id=-1, url="", - label="Hardware pack descriptions") - self.Bind(wx.EVT_HYPERLINK, self.event_hwpack_link, desc_link) - grid1.Add(desc_link, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.box.Add(grid1, 0, alignment, 0) - pane.SetSizer(self.box) - self.box.Fit(pane) - - def on_activate(self): - self.update_forward_active_and_mmc_confirm_box_visible() - self.set_hwpacks_for_hardware() - self.update_dev_file_buttons() - - def update_dev_file_buttons(self): - if self.settings['hw_or_qemu'] == QEMU: - self.device_button.Disable() - self.device_button.SetValue(False) - self.file_button.SetValue(True) - else: - self.device_button.Enable() - - def set_hwpacks_for_hardware(self): - self.cb_hwpacks.Clear() - - if self.settings['release_or_snapshot'] == "snapshot": - self.settings['build'] = self.settings['snapshot_build'] - - date_and_build = self.settings['build'].split(":") - - compatable_hwpacks = ( - self.db.get_available_hwpacks_for_hardware_snapshot_build( - self.settings['compatable_hwpacks'], - self.settings['platform'], - date_and_build[0], - date_and_build[1])) - else: - self.settings['build'] = self.settings['release_build'] - compatable_hwpacks = ( - self.db.get_available_hwpacks_for_hardware_build_plaform( - self.settings['compatable_hwpacks'], - self.settings['platform'], - self.settings['build'])) - - for hwpack in compatable_hwpacks: - self.cb_hwpacks.Append(hwpack) - - self.cb_hwpacks.SetStringSelection(compatable_hwpacks[0]) - self.settings['hwpack'] = compatable_hwpacks[0] - - def update_forward_active_and_mmc_confirm_box_visible(self): - if( self.settings['path_selected'] - and self.settings['path_selected'] != ""): - self.wizard.FindWindowById(wx.ID_FORWARD).Enable() - else: - self.wizard.FindWindowById(wx.ID_FORWARD).Disable() - - # --- Event Handlers --- - def event_on_pane_changed(self, event): - self.Layout() - - def event_open_file_control(self, event): - if self.settings['write_to_file_or_device'] == "file": - - dlg = wx.FileDialog(self, - message="Save file as ...", - defaultDir=os.getcwd(), - defaultFile="", - style=wx.SAVE) - - elif self.settings['write_to_file_or_device'] == "device": - dlg = DevChoser(self, -1, "Please chose a device", self.settings) - - result = dlg.ShowModal() - file_or_dev = self.settings['write_to_file_or_device'] - - if result == wx.ID_OK or file_or_dev == "device": - # The dev chooser doesn't do ok/cancel, it just finishes. - if self.settings['write_to_file_or_device'] == "file": - self.settings['path_selected'] = dlg.GetPaths()[0] - self.file_path_and_name.SetValue(self.settings['path_selected']) - - dlg.Destroy() - self.update_forward_active_and_mmc_confirm_box_visible() - - def event_file_path_and_name(self, event): - self.settings['path_selected'] = event.GetString() - self.update_forward_active_and_mmc_confirm_box_visible() - - def event_combo_box_hwpack(self, event): - self.settings['hwpack'] = event.GetString().encode('ascii') - - def event_combo_box_rootfs(self, evt): - self.settings['rootfs'] = evt.GetString().encode('ascii').lower() - - def event_hwpack_link(self, event): - hw_desc = self.settings['UI']['hwpack-descriptions'] - body = "

%s

    " % self.settings['hardware'] - for hwpack in self.cb_hwpacks.GetItems(): - desc = '' - if hw_desc.has_key(hwpack): - desc = " - %s" % hw_desc[hwpack] - body += "
  • %s%s
  • " % (hwpack, desc) - body += "
" - HtmlDialog( event.GetEventObject().GetParent(), - "Hardware Pack Descriptions: %s", - body) - - def event_radio_button_select(self, event): - """Search the label of the button that has been selected to work out - what we are writing to.""" - setting_search = re.search( - "write to (\w+)", - event - .GetEventObject() - .GetLabel() - .encode('ascii') - .lower()) - - assert setting_search - - self.settings['write_to_file_or_device'] = setting_search.group(1) - - self.help_text.SetLabel( - self.help_text_values[self.settings['write_to_file_or_device']]) - - self.update_forward_active_and_mmc_confirm_box_visible() - - def event_pick_file_path(self, evt): - self.settings['path_selected'] = os.path.abspath(evt.GetPath()) - self.update_forward_active_and_mmc_confirm_box_visible() - - def update_image_size_setting(self): - if(self.image_size_spinner.GetValue() > 0): - self.settings['image_size'] = (str(self.image_size_spinner - .GetValue()) - + self.size_unit[0]) - else: - self.settings['image_size'] = None - - def event_image_size(self, event): - self.update_image_size_setting() - - def event_chose_unit(self, event): - self.size_unit = event.GetString() - self.update_image_size_setting() - - def event_swap_size(self, event): - self.settings['swap_file'] = str(self.image_size_spinner.GetValue()) - -class HtmlDialog(wx.Dialog): - def __init__(self, parent, title, content): - wx.Dialog.__init__(self, parent, -1, title) - html = wx.html.HtmlWindow(self) - - html.SetPage(content) - self.SetSize(wx.Size(500,450)) - self.ShowModal() - self.Destroy() - -class DevChoser(wx.Dialog): - def __init__(self, parent, id, title, settings): - self.settings = settings - wx.Dialog.__init__(self, parent, id, title) - - vbox = wx.BoxSizer(wx.VERTICAL) - - dev_info = self.get_device_info() - dev_number = 1 - - grid1 = wx.FlexGridSizer(0, len(dev_info[0].keys())+1, 0, 0) - group = wx.RB_GROUP - alignment = wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP - keys = dev_info[0].keys() - - grid1.Add(wx.StaticText(self, label="")) - for key in keys: - grid1.Add(wx.StaticText(self, label=key), 0, alignment, 5) - - self.id_dev_map = {} - first = True - for info in dev_info: - button = add_button(self, grid1, str(dev_number), group, - self.event_radio_button_select, None, None) - group = None - - for key in keys: - message = info[key] - grid1.Add(wx.StaticText(self, label=message), 0, alignment, 5) - - if key == "path": - self.id_dev_map[str(dev_number)] = info[key] - - if self.settings['path_selected'] == info[key]: - button.SetValue(True) - - if first and not self.settings['path_selected']: - self.settings['path_selected'] = info[key] - - dev_number += 1 - - vbox.Add(grid1) - hbox = wx.BoxSizer(wx.HORIZONTAL) - okButton = wx.Button(self, -1, 'Use Selected Device') - okButton.Bind(wx.EVT_BUTTON, self.OnCloseMe) - hbox.Add(okButton, 1) - - vbox.Add(hbox, 1, wx.ALIGN_CENTER | wx.TOP | wx.BOTTOM, 10) - vbox.Fit(self) - self.SetSizer(vbox) - - def event_radio_button_select(self, event): - val = event.GetEventObject().GetLabel() - self.settings['path_selected'] = self.id_dev_map[val] - - def get_device_info(self): - """Get information about devices found on the system. - """ - bus, udisks = _get_system_bus_and_udisks_iface() - devices = udisks.get_dbus_method('EnumerateDevices')() - devices.sort() - dev_info = [] - for path in devices: - device = bus.get_object("org.freedesktop.UDisks", path) - props = ['drive-model', 'drive-vendor', - 'drive-connection-interface', 'drive-media'] - info = {'path': _get_dbus_property('DeviceFile', device, path)} - if(not _get_dbus_property('device-is-partition', device, path) and - _get_dbus_property('device-is-media-available', device, path)): - for prop in props: - info[prop] = str(_get_dbus_property(prop, device, path)) - dev_info.append(info) - - return dev_info - - def OnCloseMe(self, event): - self.Close(True) - - def OnCloseWindow(self, event): - self.Destroy() - -class RunLMC(wiz.WizardPageSimple): - """Present the user with some information about their choices and a button - to start linaro-media-create. The linaro-media-create process is started in - a new thread and important events are communicated back to the UI through a - queue.""" - - def __init__(self, parent, config, db, width): - wiz.WizardPageSimple.__init__(self, parent) - self.settings = config.settings - self.sizer = wx.BoxSizer(wx.VERTICAL) - self.db = db - self.width = width - self.wizard = parent - - header = wx.StaticText(self, label="""Installing...""") - header.Wrap(width - 10) # -10 because boarder below is 5 pixels wide - - self.sizer.Add(header) - self.box1 = wx.BoxSizer(wx.VERTICAL) - - # We expect to print 4 lines of information, reserve space using blank - # lines. - self.settings_summary_text = wx.StaticText(self, label="\n\n\n\n") - self.settings_summary_text.Wrap(width - 10) - - self.box1.Add(self.settings_summary_text, 0, wx.ALIGN_LEFT | wx.ALL, 5) - - self.start_button = wx.Button(self, 10, "Start", (20, 20)) - self.Bind(wx.EVT_BUTTON, self.start_lmc, self.start_button) - - self.start_button.SetToolTipString("Start creating an image, using the" - "above settings.") - - self.start_button.SetSize(self.start_button.GetBestSize()) - self.box1.Add(self.start_button, 0, wx.ALIGN_LEFT | wx.ALL, 5) - - self.download_guage = wx.Gauge(self, - -1, - 1000, - size=(self.width * 2 / 3, 25)) - - self.status_grid = wx.FlexGridSizer(0, 2) - - self.status_grid.Add(wx.StaticText(self, label="Downloading files"), - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.downloading_files_status = wx.StaticText(self, label="") - - self.status_grid.Add(self.downloading_files_status, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.status_grid.Add(self.download_guage, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.downloading_files_info = wx.StaticText(self, label="") - - self.status_grid.Add(self.downloading_files_info, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.status_grid.Add(wx.StaticText(self, label="Unpacking downloads"), - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.unpacking_files_status = wx.StaticText(self, label="") - - self.status_grid.Add(self.unpacking_files_status, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.status_grid.Add(wx.StaticText(self, label="Installing packages"), - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.installing_packages_status = wx.StaticText(self, label="") - - self.status_grid.Add(self.installing_packages_status, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.status_grid.Add(wx.StaticText(self, label="Create file system"), - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.create_file_system_status = wx.StaticText(self, label="") - - self.status_grid.Add(self.create_file_system_status, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.status_grid.Add(wx.StaticText(self, label="Populate file system"), - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.populate_file_system_status = wx.StaticText(self, label="") - - self.status_grid.Add(self.populate_file_system_status, - 0, - wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, - 5) - - self.box2 = wx.BoxSizer(wx.VERTICAL) - self.blank = wx.StaticText(self, label="") # Just like a spacer GIF... - self.messages = wx.StaticText(self, label="") - self.box2.Add(self.blank, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.box2.Add(self.messages, 0, wx.ALIGN_LEFT | wx.ALL, 5) - - self.sizer.Add(self.box1, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.sizer.Add(self.status_grid, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.sizer.Add(self.box2, 0, wx.ALIGN_LEFT | wx.ALL, 5) - self.SetSizerAndFit(self.sizer) - self.sizer.Fit(self) - self.Move((50, 50)) - - def on_activate(self): - """Called just before the page is displayed to update the text based on - the users preferences.""" - - # The build is stored in different forms depending on if we are using a - # release or snapshot but from here on in it is a common value - if self.settings['release_or_snapshot'] == "snapshot": - self.settings['build'] = self.settings['snapshot_build'] - else: - self.settings['build'] = self.settings['release_build'] - - settings_summary = ("Press start to create an image with the " - "following settings:\n") - settings_summary += "Operating System: " + self.settings['image'] + "\n" - settings_summary += "Hardware: " + self.settings['hardware'] + "\n" - - # Assumption is that a file may be in a long path, we don't know how - # big the font is and we don't want to allow the path to run off the - # end of the line, so if a file is chosen, just show the file name. - # Devices are (probably) /dev/some_short_name and the user really needs - # to check them, so we show the whole thing. - path = self.settings['path_selected'] - if self.settings['write_to_file_or_device'] == "file": - path = self.settings['path_selected'].split(os.sep)[-1] - - settings_summary += ( "Writing image to " - + self.settings['write_to_file_or_device'] - + " " - + path) - - self.settings_summary_text.SetLabel(settings_summary) - self.settings_summary_text.Wrap(self.width - 10) - - def start_lmc(self, event): - """Start a thread that runs linaro-media-create and a timer, which - checks for UI updates every 100ms""" - - if self.settings['write_to_file_or_device'] == "file": - self.settings['image_file'] = self.settings['path_selected'] - elif self.settings['write_to_file_or_device'] == "device": - self.settings['mmc'] = self.settings['path_selected'] - - title = 'Are you sure?' - text = "Completely erase {0}?".format(self.settings['mmc']) - dlg = wx.MessageDialog(None, text, title, - wx.YES_NO | wx.NO_DEFAULT | wx.ICON_QUESTION) - if dlg.ShowModal() != wx.ID_YES: - return - else: - assert False, ("self.config.settings['write_to_file_or_device'] " - "was an unexpected value" - + self.settings['write_to_file_or_device']) - - image_url, hwpack_url = self.db.get_image_and_hwpack_urls(self.settings) - - # Currently the UI is blocked when LMC is running, so grey out the - # buttons to indicate to the user that they won't work! - self.wizard.FindWindowById(wx.ID_BACKWARD).Disable() - self.wizard.FindWindowById(wx.ID_CANCEL).Disable() - - if(image_url and hwpack_url): - - self.file_handler = fetch_image.FileHandler() - - self.timer = wx.Timer(self) - self.Bind(wx.EVT_TIMER, self.timer_ping, self.timer) - self.timer.Start(milliseconds=100, oneShot=True) - - tools_dir = os.path.dirname(__file__) - if tools_dir == '': - tools_dir = None - - self.start_button.Disable() - self.event_queue = Queue.Queue() - self.lmc_thread = self.file_handler.LinaroMediaCreate( - image_url, - hwpack_url, - self.file_handler, - self.event_queue, - self.settings, - tools_dir) - self.lmc_thread.start() - else: - print >> sys.stderr, ("Unable to find files that match the" - "parameters specified") - - def timer_ping(self, event): - """During start_lmc a timer is started to poll for events from - linaro-media-create every 100ms. This is the function which is called - to do that polling.""" - - while not self.event_queue.empty(): - event = self.event_queue.get() - - if event[0] == "start": - self.event_start(event[1]) - - elif event[0] == "end": - self.event_end(event[1]) - - elif event == "terminate" or event == "abort": - # Process complete. Enable next button. - self.wizard.FindWindowById(wx.ID_FORWARD).Enable() - if event == "terminate": - self.populate_file_system_status.SetLabel("Done") - else: - self.populate_file_system_status.SetLabel("Failed") - return # Even if queue isn't empty, stop processing it - - elif event[0] == "update": - self.event_update(event[1], event[2], event[3]) - - elif event[0] == "message": - self.messages.SetLabel(event[1]) - - else: - print >> sys.stderr, "timer_ping: Unhandled event", event - - self.timer.Start(milliseconds=50, oneShot=True) - - def unsigned_packages_query(self, package_list): - message = ('In order to continue, I need to install some unsigned' - 'packages into the image. Is this OK? The packages are:' - '\n\n' + package_list) - - dlg = wx.MessageDialog(self, - message, - 'Install Unsigned Packages Into Image?', - wx.YES_NO | wx.NO_DEFAULT) - - choice = dlg.ShowModal() - dlg.Destroy() - - return choice == wx.ID_YES - - #--- Event(s) --- - def event_start(self, event): - if event == "download": - pass - elif event == "unpack": - self.unpacking_files_status.SetLabel("Running") - elif event == "installing packages": - self.installing_packages_status.SetLabel("Running") - - elif re.search('^unverified_packages:', event): - # Get rid of event ID and whitespace invariance - packages = " ".join(event.split()[1:]) - install_unsigned_packages = self.unsigned_packages_query(packages) - - if install_unsigned_packages == False: - # TODO: Tidy up other threads - sys.exit(1) - else: - self.lmc_thread.send_to_create_process("y") - - elif event == "create file system": - self.create_file_system_status.SetLabel("Running") - elif event == "populate file system": - self.populate_file_system_status.SetLabel("Running") - else: - print "Unhandled start event:", event - - def event_end(self, event): - if event == "download": - self.downloading_files_status.SetLabel("Done") - elif event == "unpack": - self.unpacking_files_status.SetLabel("Done") - elif event == "installing packages": - self.installing_packages_status.SetLabel("Done") - elif event == "create file system": - self.create_file_system_status.SetLabel("Done") - elif event == "populate file system": - self.populate_file_system_status.SetLabel("Done") - else: - print "Unhhandled end event:", event - - def event_update(self, task, update_type, value): - if task == "download": - if update_type == "progress": - self.total_bytes_downloaded += value - - time_difference = time.time() - self.old_time - - if time_difference > 1.0: - self.old_time = time.time() - - # More than a second has passed since we calculated data - # rate - speed = ( float( self.total_bytes_downloaded - - self.old_bytes_downloaded) - / time_difference) - - self.old_bytes_downloaded = self.total_bytes_downloaded - - self.speeds.append(speed) - - average_speed = 0 - speeds_accumulated = 0 - for speed in reversed(self.speeds): - average_speed += speed - speeds_accumulated += 1 - - if speeds_accumulated == 6: - break # do rolling average of 6 seconds - - average_speed /= speeds_accumulated - - time_remaining = ( ( self.total_bytes_to_download - - self.total_bytes_downloaded) - / speed) - - pretty_time = str(datetime.timedelta(seconds=int( - time_remaining))) - - # Following table assumes we don't get past TBps internet - # connections soon :-) - units = ["Bps", "kBps", "MBps", "GBps", "TBps"] - units_index = 0 - while speed > 1024: - speed /= 1024 - units_index += 1 - - info = "Downloading at {0:.1f} {1}".format( - speed, - units[units_index]) - - self.downloading_files_status.SetLabel(info) - - info = "{0} remaining".format( - pretty_time) - - self.downloading_files_info.SetLabel(info) - - self.download_guage.SetValue( 1000 - * self.total_bytes_downloaded - / self.total_bytes_to_download) - - elif update_type == "total bytes": - self.old_time = time.time() - self.old_bytes_downloaded = 0 - self.total_bytes_to_download = value - self.total_bytes_downloaded = 0 - self.speeds = [] # keep an array of speeds used to calculate - # the estimated time remaining - by not just using the - # current speed we can stop the ETA bouncing around too much. - - elif update_type == "message": - self.downloading_files_status.SetLabel(value) - - def event_combo_box_release(self, evt): - pass - - def event_combo_box_build(self, evt): - pass - #--- END event(s) --- - - -class TestDriveWizard(wx.wizard.Wizard): - def __init__(self, title): - wx.wizard.Wizard.__init__(self, None, -1, title, wx.NullBitmap) - self.Bind(wx.wizard.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing) - - def on_page_changing(self, evt): - """Executed before the page changes.""" - - hw_details_pg = self.pages['hardware_details'] - rel_or_snap_pg = self.pages['release_or_snapshot'] - select_snap_pg = self.pages['select_snapshot'] - lmc_settings_pg = self.pages['lmc_settings'] - - page = evt.GetPage() - - if evt.GetDirection(): # If going forwards... - # Always enable back button if going forwards - self.wizard.FindWindowById(wx.ID_BACKWARD).Enable() - - # If going from a select snapshot or select release page, record - # which we were on so the back button of the next page works - if self.config.settings['release_or_snapshot'] == "release": - lmc_settings_pg.SetPrev(rel_or_snap_pg) - else: - select_snap_pg.SetNext(lmc_settings_pg) - - lmc_settings_pg.SetPrev(select_snap_pg) - select_snap_pg.SetPrev(rel_or_snap_pg) - - if page == rel_or_snap_pg: - select_snap_pg.fill_build_combo_box_for_date( - self.config.settings['build_date']) - - if page == hw_details_pg: - rel_or_snap_pg.force_rel_snap_if_hw_requires() - rel_or_snap_pg.fill_os_list() - rel_or_snap_pg.update_release_and_build_boxes() - - if page == select_snap_pg: - # Execute when exiting page - select_snap_pg.update_platform() - - if( page == select_snap_pg - or (page == rel_or_snap_pg and - self.config.settings['release_or_snapshot'] == "release")): - lmc_settings_pg.on_activate() - - if page == lmc_settings_pg: - # Forward stays disabled until LMC has finished running - self.wizard.FindWindowById(wx.ID_FORWARD).Disable() - self.pages['run_lmc'].on_activate() - - else: # Always enable the forward button if reversing into a page - self.wizard.FindWindowById(wx.ID_FORWARD).Enable() - - def go(self): - file_handler = fetch_image.FileHandler() - self.config = fetch_image.FetchImageConfig() - self.config.settings["force_download"] = False - self.config.settings['compatable_hwpacks'] = ['foo'] - - # If the settings file and server index need updating, grab them - file_handler.update_files_from_server() - - # Load settings YAML, which defines the parameters we ask for and - # acceptable responses from the user - self.config.read_config(file_handler.settings_file) - - # Using the config we have, look up URLs to download data from in - # the server index - db = fetch_image.DB(file_handler.index_file) - - # Create the wizard and the pages - self.wizard = wiz.Wizard(self, -1, "Linaro Media Builder") - - self.pages = {} - self.pages['hardware_details'] = AboutMyHardwarePage(self.wizard, - self.config, - db) - - self.wizard.FitToPage(self.pages['hardware_details']) - (width, height) = self.wizard.GetSize() - - self.pages['release_or_snapshot'] = ReleaseOrSnapshotPage(self.wizard, - self.config, - db, - self.pages, - width) - - self.pages['select_snapshot'] = SelectSnapshot(self.wizard, - self.config, - db, - width) - - self.pages['lmc_settings'] = LMC_settings(self.wizard, - self.config, - db, - width) - - self.pages['run_lmc'] = RunLMC(self.wizard, - self.config, - db, - width) - - self.pages['hardware_details'].SetNext( - self.pages['release_or_snapshot']) - - self.pages['lmc_settings'].SetNext(self.pages['run_lmc']) - self.pages['run_lmc'].SetPrev(self.pages['lmc_settings']) - - for (name, page) in self.pages.items(): - self.wizard.GetPageAreaSizer().Add(page) - - self.pages['hardware_details'].on_page_changing() - self.wizard.RunWizard(self.pages['hardware_details']) - - -def run(): - """Wrapper around the full wizard. Is encapsulated in its own function to - allow a restart to be performed, as described in __main___, easily""" - app = wx.PySimpleApp() # Start the application - if app: - pass # We don't use this directly. Stop pyflakes complaining! - - w = TestDriveWizard('Simple Wizard') - return w.go() - -if __name__ == '__main__': - run() === removed file 'linaro-image-indexer' --- linaro-image-indexer 2011-10-27 16:10:01 +0000 +++ linaro-image-indexer 1970-01-01 00:00:00 +0000 @@ -1,238 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) 2010, 2011 Linaro -# -# Author: James Tunnicliffe -# -# This file is part of Linaro Image Tools. -# -# Linaro Image Tools is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# Linaro Image Tools is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Linaro Image Tools; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. - -import os -import re -import urlparse -import logging -import bz2 -import linaro_image_tools.fetch_image - -RELEASES_WWW_DOCUMENT_ROOT = "/srv/releases.linaro.org/www" -RELEASE_URL = "http://releases.linaro.org/" -OLD_RELEASES_WWW_DOCUMENT_ROOT = "/srv/releases.linaro.org/www/platform" -OLD_RELEASE_URL = "http://releases.linaro.org/platform/" -SNAPSHOTS_WWW_DOCUMENT_ROOT = "/srv/snapshots.linaro.org/www/" -SNAPSHOTS_URL = "http://snapshots.linaro.org/" - -class ServerIndexer(): - """Create a database of files on the linaro image servers for use by image - creation tools.""" - def reset(self): - self.url_parse = [] - - def __init__(self): - self.reset() - self.db_file_name = "server_index" - self.db = linaro_image_tools.fetch_image.DB(self.db_file_name) - - def regexp_list_matches_some(self, to_search, list): - assert len(list), "empty list passed" - - for item in list: - if re.search(item, to_search): - return True - - return False - - def regexp_list_matches_all(self, to_search, list): - assert len(list), "empty list passed" - - for item in list: - if not re.search(item, to_search): - return False - - return True - - def crawl(self): - self.db.set_url_parse_info(self.url_parse) - logging.getLogger("linaro_image_tools").info(self.url_parse) - - for index in range(len(self.url_parse)): - info = self.url_parse[index] - table = info["table"] - - logging.getLogger("linaro_image_tools").info("%s %s %s %s %s" % \ - (info["base_dir"], info["base_url"], table, - info["url_validator"], info["url_chunks"])) - - self.go(info, table, index) - - logging.getLogger("linaro_image_tools").info("") - - - def go(self, info, table, index): - root_url = info["base_url"] - root_dir = info["base_dir"] - - for root, subFolders, files in os.walk( root_dir ): - for file in files: - relative_location = re.sub(root_dir, "", - os.path.join(root, file)) - relative_location = relative_location.lstrip("/") - - to_match = info["url_validator"][0] - not_match = info["url_validator"][1] - - url = urlparse.urljoin(root_url, relative_location) - url = urlparse.urljoin(url, file) - - to_match_ok = False - if len(to_match) == 0: - to_match_ok = True - if len(to_match) and self.regexp_list_matches_all( - relative_location, to_match): - to_match_ok = True - - not_match_ok = True - if len(not_match) and self.regexp_list_matches_some( - relative_location, not_match): - not_match_ok = False - - if( not (to_match_ok and not_match_ok) - or not re.search("\.gz$", file)): - continue # URL doesn't match the validator. Ignore. - - logging.getLogger("linaro_image_tools").info(url) - self.db.record_url(url, index) - - self.dump() - - def dump(self): - self.db.commit() - - def close_and_bzip2(self): - # After finishing creating the database, create a compressed version - # for more efficient downloads - self.db.close() - bz2_db_file = bz2.BZ2File(self.db_file_name + ".bz2", "w") - db_file = open(self.db_file_name) - bz2_db_file.write(db_file.read()) - bz2_db_file.close() - - def add_directory_parse_list(self, - base_dir, - base_url, - url_validator, - db_columns, - table, - url_chunks): - - if not id in self.url_parse: - self.url_parse.append({"base_dir": base_dir, - "base_url": base_url, - "url_validator": url_validator, - "db_columns": db_columns, - "url_chunks": url_chunks, - "table": table}) - logging.getLogger("linaro_image_tools").info(base_dir) - - # Construct data needed to create the table - items = [] - for item in url_chunks: - if(item != ""): - # If the entry is a tuple, it indicates it is of the - # form name, regexp - if(isinstance(item, tuple)): - items.append(item[0]) - else: - items.append(item) - - self.db.create_table_with_name_columns(table, db_columns) - - def clean_removed_urls_from_db(self): - self.db.clean_removed_urls_from_db() - -if __name__ == '__main__': - crawler = ServerIndexer() - - ch = logging.StreamHandler() - ch.setLevel(logging.CRITICAL) - formatter = logging.Formatter("%(message)s") - ch.setFormatter(formatter) - logger = logging.getLogger("linaro_image_tools") - logger.setLevel(logging.CRITICAL) - logger.addHandler(ch) - - #linaro-n/ubuntu-desktop/11.09 - crawler.add_directory_parse_list(OLD_RELEASES_WWW_DOCUMENT_ROOT, - OLD_RELEASE_URL, - ([], ["platform/", "old/", "hwpack", - "alpha", "beta", "final", "leb", - "leb", "release-candidate"]), - ["platform", "image", "build=final"], - "release_binaries", - ["", "image", "platform"]) - - #linaro-n/hwpacks/11.09 - crawler.add_directory_parse_list(OLD_RELEASES_WWW_DOCUMENT_ROOT, - OLD_RELEASE_URL, - (["/hwpacks/"], - ["alpha", "beta", "final", "leb", - "release-candidate"]), - ["platform", "hardware", "build=final"], - "release_hwpacks", - ["", "", "platform", - ("hardware", r"hwpack_linaro-(.*?)_")]) - - # 11.10/ubuntu/oneiric-images/ubuntu-desktop/ - # NOT images/... - crawler.add_directory_parse_list(RELEASES_WWW_DOCUMENT_ROOT, - RELEASE_URL, - (["\d+\.\d+", "ubuntu", "oneiric-images"], - ["latest/", "platform/", "old/", - "hwpack", "^images/"]), - ["platform", "image", "build=final"], - "release_binaries", - ["platform", "", "", "image"]) - - # 11.10/ubuntu/oneiric-hwpacks/ - crawler.add_directory_parse_list(RELEASES_WWW_DOCUMENT_ROOT, - RELEASE_URL, - (["\d+\.\d+", "ubuntu", "oneiric-hwpacks"], - ["latest/", "platform/", "old/", - "^images/"]), - ["platform", "hardware", "build=final"], - "release_hwpacks", - ["platform", "", "", - ("hardware", r"hwpack_linaro-(.*?)_")]) - - #oneiric/linaro-o-alip/20111026/0/images/tar/ - crawler.add_directory_parse_list(SNAPSHOTS_WWW_DOCUMENT_ROOT, - SNAPSHOTS_URL, - (["^oneiric/"], ["/hwpack"]), - ["platform", "image", "date", "build"], - "snapshot_binaries", - ["platform", "image", "date", "build"]) - - #oneiric/lt-panda-oneiric/20111026/0/images/hwpack/ - crawler.add_directory_parse_list(SNAPSHOTS_WWW_DOCUMENT_ROOT, - SNAPSHOTS_URL, - (["^oneiric/", "/hwpack"], []), - ["platform", "hardware", "date", "build"], - "snapshot_hwpacks", - ["platform", "hardware", "date", "build"]) - - crawler.crawl() - crawler.clean_removed_urls_from_db() - crawler.dump() - crawler.close_and_bzip2() === removed file 'linaro_image_tools/fetch_image.py' --- linaro_image_tools/fetch_image.py 2011-10-28 11:30:44 +0000 +++ linaro_image_tools/fetch_image.py 1970-01-01 00:00:00 +0000 @@ -1,1607 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) 2010, 2011 Linaro -# -# Author: James Tunnicliffe -# -# This file is part of Linaro Image Tools. -# -# Linaro Image Tools is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# Linaro Image Tools is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Linaro Image Tools; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. - -import os -import sys -import re -import urllib2 -import argparse -import sqlite3 -import yaml -import urlparse -import logging -import bz2 -import time -import shutil -import datetime -import threading -import subprocess -import utils -import xdg.BaseDirectory as xdgBaseDir - - -QEMU = "qemu" -HARDWARE = "hardware" - -class DownloadManager(): - def __init__(self, cachedir): - self.cachedir = cachedir - self.image_url = None - self.hwpack_url = None - self.settings = None - self.event_queue = None - self.to_download = None - self.sha1_files = None - self.downloaded_files = None - self.sig_files = None - self.verified_files = None - self.gpg_sig_ok = None - self.have_sha1sums = None - self.have_gpg_sigs = None - - def name_and_path_from_url(self, url): - """Return the file name and the path at which the file will be stored - on the local system based on the URL we are downloading from""" - # Use urlparse to get rid of everything but the host and path - scheme, host, path, params, query, fragment = urlparse.urlparse(url) - - url_search = re.search(r"^(.*)/(.*?)$", host + path) - assert url_search, "URL in unexpectd format" + host + path - - # Everything in url_search.group(1) should be directories and the - # server name and url_search.group(2) the file name - file_path = self.cachedir + os.sep + url_search.group(1) - file_name = url_search.group(2) - - return file_name, file_path - - def urllib2_open(self, url): - maxtries = 10 - for trycount in range(0, maxtries): - try: - response = urllib2.urlopen(url) - except: - if trycount < maxtries - 1: - print "Unable to connect to", url, "retrying in 5 seconds." - time.sleep(5) - continue - else: - print "Connect failed:", url - raise - return None - else: - return response - - return None - - def list_files_in_dir_of_url(self, url): - """ - return a directory listing of the directory that url sits in - """ - - import BeautifulSoup - - url = os.path.dirname(url) - response = self.urllib2_open(url) - page = response.read() - - dir = [] - # Use BeautifulSoup to parse the HTML and iterate over all 'a' tags - soup = BeautifulSoup.BeautifulSoup(page) - for link in soup.findAll('a'): - for attr, value in link.attrs: - if( attr == "href" - and not re.search(r'[\?=;/]', value)): - # Ignore links that don't look like plain files - dir.append('/'.join([url, value])) - - return dir - - def sha1sum_file_download_list(self): - """ - Need more than just the hwpack and OS image if we want signature - verification to work, we need sha1sums, signatures for sha1sums - and manifest files. - - Note that this code is a bit sloppy and may result in downloading - more sums than are strictly required, but the - files are small and the wrong files won't be used. - """ - - downloads_list = [] - - # Get list of files in OS image directory - image_dir = self.list_files_in_dir_of_url(self.image_url) - hwpack_dir = self.list_files_in_dir_of_url(self.hwpack_url) - - for link in image_dir: - if re.search("sha1sums\.txt$", link): - downloads_list.append(link) - - for link in hwpack_dir: - if( re.search(self.settings['hwpack'], link) - and re.search("sha1sums\.txt$", link)): - downloads_list.append(link) - - return downloads_list - - def generate_download_list(self): - """ - Generate a list of files based on what is in the sums files that - we have downloaded. We may have downloaded some sig files based on - a rather sloppy file name match that we don't want to use. For the - hwpack sig files, check to see if the hwpack listed matches the hwpack - URL. If it doesn't ignore it. - - 1. Download sig file(s) that match the hardware spec (done by this - point). - 2. Find which sig file really matches the hardware pack we have - downloaded. (this function calculates this list) - 3. Download all the files listed in the sig file (done by another func) - - We go through this process because sometimes a directory will have - more than one hardware pack that will match the hardware pack name, - for example panda and panda-x11 will both match "panda". These checks - make sure we only try and validate the signatures of the files that - we should be downloading and not try and validatate a signature of a - file that there is no reason for us to download, which would result in - an an invalid warning about installing unsigned packages when running - linaro-media-create. - """ - - downloads_list = [self.image_url, self.hwpack_url] - - for sha1sum_file_name in self.sha1_files.values(): - sha1sum_file = open(sha1sum_file_name) - - common_prefix = None - - files = [] - for line in sha1sum_file: - line = line.split() # line[1] is now the file name - - if line[1] == os.path.basename(self.image_url): - # Found a line that matches an image or hwpack URL - keep - # the contents of this sig file - common_prefix = os.path.dirname(self.image_url) - - if line[1] == os.path.basename(self.hwpack_url): - # Found a line that matches an image or hwpack URL - keep - # the contents of this sig file - common_prefix = os.path.dirname(self.hwpack_url) - - if common_prefix: - for file_name in files: - downloads_list.append('/'.join([common_prefix, - file_name])) - - dir = self.list_files_in_dir_of_url(common_prefix + '/') - - # We include the sha1sum files that pointed to the files that - # we are going to download so the full file list can be parsed - # later. The files won't be re-downloaded because they will be - # cached. - file_name = os.path.basename(sha1sum_file_name) - downloads_list.append('/'.join([common_prefix, file_name])) - signed_sums = os.path.basename(sha1sum_file_name) + ".asc" - - for link in dir: - if re.search(signed_sums, link): - downloads_list.append(link) - - sha1sum_file.close() - - return downloads_list - - def download_files(self, - downloads_list, - settings, - event_queue=None, - force_download=False): - """ - Download files specified in the downloads_list, which is a list of - url, name tuples. - """ - - force_download = settings["force_download"] or force_download - - downloaded_files = {} - - bytes_to_download = 0 - - for url in downloads_list: - file_name, file_path = self.name_and_path_from_url(url) - - file_name = file_path + os.sep + file_name - if os.path.exists(file_name) and not force_download: - continue # If file already exists, don't download it - - response = self.urllib2_open(url) - if response: - bytes_to_download += int(response.info() - .getheader('Content-Length').strip()) - response.close() - - if event_queue: - event_queue.put(("start", "download")) - event_queue.put(("update", - "download", - "total bytes", - bytes_to_download)) - - for url in downloads_list: - path = None - try: - path = self.download(url, - event_queue, - force_download) - except Exception: - # Download error. Hardly matters what, we can't continue. - print "Unexpected error:", sys.exc_info()[0] - logging.error("Unable to download " + url + " - aborting.") - - if path == None: # User hit cancel when downloading - sys.exit(0) - - downloaded_files[url] = path - logging.info("Have downloaded {0} to {1}".format(url, path)) - - if event_queue: - event_queue.put(("end", "download")) - - return downloaded_files - - def download(self, - url, - event_queue, - force_download=False): - """Downloads the file requested buy URL to the local cache and returns - the full path to the downloaded file""" - - file_name, file_path = self.name_and_path_from_url(url) - file_name = file_path + os.sep + file_name - - if not os.path.isdir(file_path): - os.makedirs(file_path) - - if force_download != True and os.path.exists(file_name): - logging.info(file_name + " already cached. Not downloading (use " - "--force-download to override).") - return file_name - - logging.info("Fetching " + url) - - response = self.urllib2_open(url) - - self.do_download = True - file_out = open(file_name, 'w') - download_size_in_bytes = int(response.info() - .getheader('Content-Length').strip()) - chunks_downloaded = 0 - - show_progress = download_size_in_bytes > 1024 * 200 - - if show_progress: - chunk_size = download_size_in_bytes / 1000 - if not event_queue: - print "Fetching", url - else: - chunk_size = download_size_in_bytes - - printed_progress = False - while self.do_download: - chunk = response.read(chunk_size) - if len(chunk): - # Print a % download complete so we don't get too bored - if show_progress: - if event_queue: - event_queue.put(("update", - "download", - "progress", - len(chunk))) - else: - # Have 1000 chunks so div by 10 to get %... - sys.stdout.write("\r%d%%" % (chunks_downloaded / 10)) - printed_progress = True - sys.stdout.flush() - - file_out.write(chunk) - chunks_downloaded += 1 - - else: - if printed_progress: - print "" - break - - file_out.close() - - if self.do_download == False: - os.remove(file_name) - return None - - return file_name - - def download_if_old(self, url, event_queue, force_download): - file_name, file_path = self.name_and_path_from_url(url) - - file_path_and_name = file_path + os.sep + file_name - - if(not os.path.isdir(file_path)): - os.makedirs(file_path) - try: - force_download = (force_download == True - or ( time.mktime(time.localtime()) - - os.path.getmtime(file_path_and_name) - > 60 * 60)) - except OSError: - force_download = True # File not found... - - return self.download(url, event_queue, force_download) - - def get_sig_files(self): - """ - Find sha1sum.txt files in downloaded_files, append ".asc" and return - list. - - The reason for not just searching for .asc files is because if they - don't exist, utils.verify_file_integrity(sig_files) won't check the - sha1sums that do exist, and they are useful to us. Trying to check - a GPG signature that doesn't exist just results in the signature check - failing, which is correct and we act accordingly. - """ - - self.sig_files = [] - - for filename in self.downloaded_files.values(): - if re.search(r"sha1sums\.txt$", filename): - self.sig_files.append(filename + ".asc") - - def _download_sigs_gen_download_list(self, force_download=False): - - to_download = self.sha1sum_file_download_list() - self.sha1_files = self.download_files( - to_download, self.settings, - force_download=force_download) - - self.to_download = self.generate_download_list() - - gpg_urls = [f for f in self.to_download if re.search('\.asc$', f)] - self.download_files( - gpg_urls, self.settings, force_download=force_download) - - def _check_downloads(self): - self.get_sig_files() - - (self.verified_files, self.gpg_sig_ok, - self.gpg_out) = utils.verify_file_integrity(self.sig_files) - - # Expect to have 2 sha1sum files (one for hwpack, one for OS bin) - self.have_sha1sums = len(self.sha1_files) ==2 - - # We expect 2 GPG signatures. Check that we have both of them. - - self.have_gpg_sigs = (len(self.sig_files) == 2 and - os.path.exists(self.sig_files[0]) and - os.path.exists(self.sig_files[1])) - - def _unverified_files(self): - """ - Return a list of URLs of files that didn't get verified - """ - verified_files = self.verified_files - - unverified_files = [] - for sig_file in self.sig_files: - name = os.path.basename(sig_file) - verified_files.append(name) - verified_files.append(name[0:-len('.asc')]) - - # Generate a list of files that didn't verify - for url in self.to_download: - url_file = os.path.basename(url) - if url_file not in verified_files: - unverified_files.append(url) - - return unverified_files - - def download_files_and_verify(self, image_url, hwpack_url, - settings, event_queue=None): - - self.image_url = image_url - self.hwpack_url = hwpack_url - self.settings = settings - self.event_queue = event_queue - - self._download_sigs_gen_download_list() - - self.downloaded_files = self.download_files(self.to_download, - self.settings, - self.event_queue) - - self._check_downloads() - - if(self.have_sha1sums and self.have_gpg_sigs - and not self.gpg_sig_ok): - # GPG signatures failed for sha1sum files. Re-download the - # sha1sums and GPG signatues. If the GPG signature then - # matches the sha1sums we will re-download any failing hwpack - # and OS binary files in the if below. - - no_pubkey_search = re.search("\[GNUPG:\] NO_PUBKEY (\S+)", - self.gpg_out) - if no_pubkey_search: - message = ("Package signature check failed.\n" - "To check package signatures, please import " - "key {0}") - # The GPG output we are using gives us the long key format, - # which doesn't match anything in the key management app - # that ships with Ubuntu Desktop. The last 8 digits though - # are the short key, which are what we normally deal with. - # That is, this seems to be the case. I haven't found any - # answers after searching around about the long keyID format, - # but this works for keys I have tested with... - message = message.format(no_pubkey_search.group(1)[-8:]) - if self.event_queue: - self.event_queue.put(("message", message)) - else: - print >> sys.stderr, message - - else: - self._download_sigs_gen_download_list(force_download=True) - self._check_downloads() - - if(self.have_sha1sums and self.have_gpg_sigs - and not self.gpg_sig_ok): - # If after re-trying the downloads we still can't get a GPG - # signature match on a sha1sum file (and both files exist) - # tell the user. - message = "Package signature check failed" - if self.event_queue: - self.event_queue.put(("message", message)) - self.event_queue.put("abort") - else: - print >> sys.stderr, message - - return [], False - - if(self.have_sha1sums and - self.gpg_sig_ok or not self.have_gpg_sigs): - # The GPG signature is OK, so the sha1sums file and GPG sig are - # both good. OR We have sha1sum files and no GPG sigs. We can - # Still validate downloads against the sha1sums, just not mark - # the packages as signed. - - to_retry = self._unverified_files() - - if len(to_retry): - if self.event_queue: - self.event_queue.put(("message", "Retrying bad files")) - else: - print "Re-downloading corrupt files" - # There are some files to re-download - self.download_files(to_retry, - self.settings, - self.event_queue, - force_download=True) - - (self.verified_files, self.gpg_sig_ok, - self.gpg_out) = utils.verify_file_integrity(self.sig_files) - - to_retry = self._unverified_files() - - if len(to_retry): - # We can't get valid package downloads. Either the sha1sums - # file was incorrectly generated or the download is - # corrupt. Display a message to the user and quit. - message = "Download retry failed. Aborting" - if self.event_queue: - self.event_queue.put(("message", message)) - self.event_queue.put("abort") - else: - print >> sys.stderr, message - - return [], False - - hwpack = os.path.basename(self.downloaded_files[hwpack_url]) - hwpack_verified = (hwpack in self.verified_files) and self.gpg_sig_ok - - return self.downloaded_files, hwpack_verified - - -class FileHandler(): - """Downloads files and creates images from them by calling - linaro-media-create""" - def __init__(self): - self.datadir = os.path.join(xdgBaseDir.xdg_data_home, - "linaro", - "image-tools", - "fetch_image") - - self.cachedir = os.path.join(xdgBaseDir.xdg_cache_home, - "linaro", - "image-tools", - "fetch_image") - - self.downloader = DownloadManager(self.cachedir) - - def has_key_and_evaluates_True(self, dictionary, key): - return bool(key in dictionary and dictionary[key]) - - def append_setting_to(self, list, dictionary, key, setting_name=None): - if not setting_name: - setting_name = "--" + key - - if self.has_key_and_evaluates_True(dictionary, key): - list.append(setting_name) - list.append(dictionary[key]) - - def build_lmc_command(self, - binary_file, - hwpack_file, - settings, - tools_dir, - hwpack_verified, - run_in_gui=False): - - import linaro_image_tools.utils - - args = ["pkexec"] - - # Prefer a local linaro-media-create (from a bzr checkout for instance) - # to an installed one - lmc_command = linaro_image_tools.utils.find_command( - 'linaro-media-create', - tools_dir) - - if lmc_command: - args.append(os.path.abspath(lmc_command)) - else: - args.append("linaro-media-create") - - if run_in_gui: - args.append("--nocheck-mmc") - - if hwpack_verified: - # We verify the hwpack before calling linaro-media-create because - # these tools run linaro-media-create as root and to get the GPG - # signature verification to work, root would need to have GPG set - # up with the Canonical Linaro Image Build Automatic Signing Key - # imported. It seems far more likely that users will import keys - # as themselves, not root! - args.append("--hwpack-force-yes") - - if 'rootfs' in settings and settings['rootfs']: - args.append("--rootfs") - args.append(settings['rootfs']) - - assert(self.has_key_and_evaluates_True(settings, 'image_file') ^ - self.has_key_and_evaluates_True(settings, 'mmc')), ("Please " - "specify either an image file, or an mmc target, not both.") - - self.append_setting_to(args, settings, 'mmc') - self.append_setting_to(args, settings, 'image_file', '--image-file') - self.append_setting_to(args, settings, 'image_size', '--image-size') - self.append_setting_to(args, settings, 'swap_size', '--swap-size') - self.append_setting_to(args, settings, 'swap_file', '--swap-file') - self.append_setting_to(args, settings, 'yes_to_mmc_selection', - '--nocheck-mmc') - - args.append("--dev") - args.append(settings['hardware']) - args.append("--binary") - args.append(binary_file) - args.append("--hwpack") - args.append(hwpack_file) - - if not os.path.isdir(self.datadir): - os.makedirs(self.datadir) - with open(os.path.join(self.datadir, "linaro-media-create.log"), - mode='w') as lmc_log: - lmc_log.write(" ".join(args) + "\n") - - logging.info(" ".join(args)) - - return args - - def create_media(self, image_url, hwpack_url, settings, tools_dir): - """Create a command line for linaro-media-create based on the settings - provided then run linaro-media-create, either in a separate thread - (GUI mode) or in the current one (CLI mode).""" - - downloaded_files, hwpack_verified = ( - self.downloader.download_files_and_verify(image_url, hwpack_url, - settings)) - - if len(downloaded_files) == 0: - return - - lmc_command = self.build_lmc_command(downloaded_files[image_url], - downloaded_files[hwpack_url], - settings, - tools_dir, - hwpack_verified) - - self.create_process = subprocess.Popen(lmc_command) - self.create_process.wait() - - class LinaroMediaCreate(threading.Thread): - """Thread class for running linaro-media-create""" - def __init__(self, - image_url, - hwpack_url, - file_handler, - event_queue, - settings, - tools_dir): - - threading.Thread.__init__(self) - - self.image_url = image_url - self.hwpack_url = hwpack_url - self.file_handler = file_handler - self.downloader = file_handler.downloader - self.event_queue = event_queue - self.settings = settings - self.tools_dir = tools_dir - - self.datadir = os.path.join(xdgBaseDir.xdg_data_home, - "linaro", - "image-tools", - "fetch_image") - - def run(self): - """ - 1. Download required files. - 2. Build linaro-media-create command - 3. Start linaro-media-create and look for lines in the output that: - 1. Tell us that an event has happened that we can use to update - the UI progress. - 2. Tell us that linaro-media-create is asking a question that - needs to be re-directed to the GUI - """ - - self.event_queue.put(("update", - "download", - "message", - "Downloading")) - - files, hwpack_ok = self.downloader.download_files_and_verify( - self.image_url, self.hwpack_url, - self.settings, self.event_queue) - - if len(files) == 0: - return - - lmc_command = self.file_handler.build_lmc_command( - files[self.image_url], - files[self.hwpack_url], - self.settings, - self.tools_dir, - hwpack_ok, - True) - - self.create_process = subprocess.Popen(lmc_command, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - - self.line = "" - self.saved_lines = "" - self.save_lines = False - self.state = None - self.waiting_for_event_response = False - self.event_queue.put(("start", "unpack")) - - lmc_log = open(os.path.join(self.datadir, - "linaro-media-create.log"), mode='a') - - while(1): - if self.create_process.poll() != None: - # linaro-media-create has finished. Tell the GUI the return - # code so it can report pass/fail. - if self.create_process.returncode: - self.event_queue.put("abort") - else: - self.event_queue.put("terminate") - return - - self.input = self.create_process.stdout.read(1) - - # We build up lines by extracting data from linaro-media-create - # a single character at a time. This is because if we fetch - # whole lines, using Popen.communicate a character is - # automatically sent back to the running process, which if the - # process is waiting for user input, can trigger a default - # action. By using stdout.read(1) we pick off a character at a - # time and avoid this problem. - if self.input == '\n': - if self.save_lines: - self.saved_lines += self.line - - lmc_log.write(self.line + "\n") - lmc_log.flush() - self.line = "" - else: - self.line += self.input - - if self.line == "Updating apt package lists ...": - self.event_queue.put(("end", "unpack")) - self.event_queue.put(("start", "installing packages")) - self.state = "apt" - - elif(self.line == - "WARNING: The following packages cannot be authenticated!"): - self.saved_lines = "" - self.save_lines = True - - elif(self.line == - "Install these packages without verification [y/N]? "): - self.saved_lines = re.sub("WARNING: The following packages" - " cannot be authenticated!", - "", self.saved_lines) - self.event_queue.put(("start", - "unverified_packages:" - + self.saved_lines)) - self.line = "" - self.waiting_for_event_response = True # Wait for restart - - elif self.line == "Do you want to continue [Y/n]? ": - self.send_to_create_process("y") - self.line = "" - - elif self.line == "Done" and self.state == "apt": - self.state = "create file system" - self.event_queue.put(("end", "installing packages")) - self.event_queue.put(("start", "create file system")) - - elif( self.line == "Created:" - and self.state == "create file system"): - self.event_queue.put(("end", "create file system")) - self.event_queue.put(("start", "populate file system")) - self.state = "populate file system" - - while self.waiting_for_event_response: - time.sleep(0.2) - - lmc_log.close() - - def send_to_create_process(self, text): - print >> self.create_process.stdin, text - self.waiting_for_event_response = False - - - def update_files_from_server(self, force_download=False, - event_queue=None): - - settings_url = "http://releases.linaro.org/fetch_image/fetch_image_settings.yaml" - server_index_url = "http://releases.linaro.org/fetch_image/server_index.bz2" - - self.settings_file = self.downloader.download_if_old(settings_url, - event_queue, - force_download) - - self.index_file = self.downloader.download_if_old(server_index_url, - event_queue, - force_download) - - zip_search = re.search(r"^(.*)\.bz2$", self.index_file) - - if zip_search: - # index file is compressed, unzip it - zipped = bz2.BZ2File(self.index_file) - unzipped = open(zip_search.group(1), "w") - - unzipped.write(zipped.read()) - - zipped.close() - unzipped.close() - - self.index_file = zip_search.group(1) - - def clean_cache(self): - shutil.rmtree(self.cachedir) - - -class FetchImageConfig(): - """Reads settings from the settings YAML file as well as those from the - command line providing a central settings repository.""" - - def __init__(self): - self.settings = {} - - def read_config(self, file_name): - try: - f = open(file_name, "r") - - if(f): - self.settings = dict(self.settings.items() + - yaml.load(f.read()).items()) - - except IOError: - print "Unable to read settings file %s", file_name - logging.error("Unable to read settings file %s", file_name) - sys.exit(0) - - self.settings['write_to_file_or_device'] = "file" - - # At this point, assume settings have been loaded. - # We need some reverse mappings set up - self.settings['UI']['reverse-descriptions'] = {} - for (key, value) in self.settings['UI']['descriptions'].items(): - if isinstance(value, basestring): - value = re.sub('LEB:\s*', '', value) - self.settings['UI']['reverse-descriptions'][value] = key - - # If an item doesn't have a translation, just add in a null one - if not self.settings['UI']['translate']: - self.settings['UI']['translate'] = {} - - for key, value in self.settings['choice']['platform'].items(): - if not key in self.settings['UI']['translate']: - self.settings['UI']['translate'][key] = key - - self.settings['UI']['reverse-translate'] = {} - for (key, value) in self.settings['UI']['translate'].items(): - self.settings['UI']['reverse-translate'][value] = key - - def parse_args(self, args): - parser = argparse.ArgumentParser(description="Create a board image, " - "first downloading any required " - "files.") - - for (key, value) in self.settings['choice'].items(): - parser.add_argument( - "-" + self.settings['UI']['cmdline short names'][key], - "--" + key, - help=self.settings['UI']['descriptions']['choice'][key], - required=True) - - parser.add_argument("-x", "--clean-cache", - help="Delete all cached downloads", - action='store_true') - parser.add_argument("-d", "--force-download", - help="Force re-downloading of cached files", - action='store_true') - parser.add_argument( - "-t", "--image-file", - help="Where to write image file to (use this XOR mmc)") - parser.add_argument( - "-m", "--mmc", - help="What disk to write image to (use this XOR image-file)") - parser.add_argument("--swap-file", - help="Swap file size for created image") - parser.add_argument("--image-size", - help="Image file size for created image") - parser.add_argument("--rootfs", - help="Root file system type for created image") - - self.args = vars(parser.parse_args(args)) - - -class DB(): - """Interacts with the database storing URLs of files that are available - for download to create images with. Provides functions for both creation - and reading.""" - - def __init__(self, index_name): - # http://www.sqlite.org/lang_transaction.html - defer acquiring a locK - # until it is required. - self.db_file_name = index_name - self.database = sqlite3.connect(index_name, isolation_level="DEFERRED") - self.c = self.database.cursor() - self.touched_urls = {} - - def close(self): - self.database.close() - self.database = None - - def set_url_parse_info(self, url_parse): - self.url_parse = url_parse - - def record_url(self, url, index): - """Check to see if the record exists in the index, if not, add it""" - - assert self.url_parse[index]["base_url"], ("Can not match the " - "URL received (%s) to an entry provided by add_url_parse_list", - url) - assert re.search('^' + self.url_parse[index]["base_url"], url), ( - "Base url is not part of the url to record.") - - logging.info("Recording URL %s %d", url, index) - - assert url not in self.touched_urls, ("URLs expected to only be added " - "to 1 place\n" + url) - - self.touched_urls[url] = True - table = self.url_parse[index]["table"] - - # Do not add the record if it already exists - self.c.execute("select url from " + table + " where url == ?", (url,)) - if self.c.fetchone(): - return - - url_match = re.search(self.url_parse[index]["base_url"] + r"(.*)$", - url) - url_chunks = url_match.group(1).lstrip('/').encode('ascii').split('/') - # url_chunks now contains all parts of the url, split on /, - # not including the base URL - - # We now construct an SQL command to insert the index data into the - # database using the information we have. - - sqlcmd = "INSERT INTO " + table + " (" - length = 0 - for name in (self.url_parse[index]["url_chunks"] + ["url"]): - if name != "": - if isinstance(name, tuple): - name = name[0] - sqlcmd += name + ", " - length += 1 - - # Handle fixed value columns - for name in self.url_parse[index]["db_columns"]: - name_search = re.search("(\w+)=(.*)", name) - if name_search: - sqlcmd += name_search.group(1) + ", " - length += 1 - - sqlcmd = sqlcmd.rstrip(", ") # get rid of unwanted space & comma - sqlcmd += ") VALUES (" - - # Add the appropriate number of ?s (+1 is so we have room for url) - sqlcmd += "".join(["?, " for x in range(length)]) - sqlcmd = sqlcmd.rstrip(", ") # get rid of unwanted space and comma - sqlcmd += ")" - - # Get the parameters from the URL to record in the SQL database - sqlparams = [] - chunk_index = 0 - for name in self.url_parse[index]["url_chunks"]: - # If this part of the URL isn't a parameter, don't insert it - if name != "": - # If the entry is a tuple, it indicates it is of the form - # name, regexp - if isinstance(name, tuple): - # use stored regexp to extract data for the database - match = re.search(name[1], url_chunks[chunk_index]) - assert match, ("Unable to match regexp to string ", - + url_chunks[chunk_index] + " " + name[1]) - sqlparams.append(match.group(1)) - - else: - sqlparams.append(url_chunks[chunk_index]) - - chunk_index += 1 - - sqlparams.append(url) - - # Handle fixed value columns - for name in self.url_parse[index]["db_columns"]: - name_search = re.search("(\w+)=(.*)", name) - if name_search: - sqlparams.append(name_search.group(2)) - - logging.info("{0}: {1}".format(sqlcmd, sqlparams)) - self.c.execute(sqlcmd, tuple(sqlparams)) - - def commit(self): - self.database.commit() - - def __del__(self): - if(self.database): - self.commit() - self.database.close() - - def create_table_with_name_columns(self, table, items): - cmd = "create table if not exists " - cmd += table + " (" - - # Handle fixed items (kept in because a field can no longer be derived) - for item in items: - item = re.sub("=.*", "", item) - cmd += item + " TEXT, " - - cmd += "url TEXT)" - - self.execute(cmd) - - def execute(self, cmd, params=None): - self.c = self.database.cursor() - logging.info(cmd) - if(params): - logging.info(params) - self.c.execute(cmd, params) - else: - self.c.execute(cmd) - - def execute_return_list(self, cmd, params=None): - self.execute(cmd, params) - list = [] - item = self.c.fetchone() - while item: - list.append(item) - item = self.c.fetchone() - - return list - - def delete_by_url(self, table, url): - self.execute("delete from " + table + " where url == ?", (url,)) - - def clean_removed_urls_from_db(self): - self.c = self.database.cursor() - - for info in self.url_parse: - table = info["table"] - - self.c.execute("select url from " + table) - to_delete = [] - - while(1): - url = self.c.fetchone() - if(url == None): - break - - if(url[0] not in self.touched_urls): - to_delete.append(url[0]) - - # We can't delete an item from the database while iterating on the - # results of a previous query, so we store the URLs of entries to - # delete, and batch them up - for url in to_delete: - self.delete_by_url(table, url) - - self.commit() - - def get_url(self, table, key_value_pairs, sql_extras=None): - """Return the first matching URL from the specified table building up a - SQL query based on the values inkey_value_pairs creating key == value - sections of the query. sql_extras is appended to the created SQL query, - so we can get the first entry in an ordered list for instance""" - self.c = self.database.cursor() - - cmd = "select url from " + table + " where " - params = [] - - first = True - for key, value in key_value_pairs: - if(first == False): - cmd += " AND " - else: - first = False - - cmd += key + " == ? " - params.append(value) - - if(sql_extras): - cmd += " " + sql_extras - - self.c.execute(cmd, tuple(params)) - url = self.c.fetchone() - - return url - - def get_platforms(self, table): - self.execute("select distinct platform from ?", table) - - platforms = [] - platform = self.c.fetchone() - - while(platform): - platforms.append(platform) - platform = self.c.fetchone() - - return platforms - - def get_builds(self, platform, image=None): - """return a complete list of builds available from the releases - repository""" - - build_tuples = self.execute_return_list( - 'select distinct build from release_binaries ' - 'where platform == "' + platform + '"') - hwpack_build_tuples = self.execute_return_list( - 'select distinct build from release_hwpacks' - ' where platform == "' + platform + '"') - - # Need to also check hwpacks and image -> some builds are only - # Available for a particular image (OS). Guess this makes sense as OS - # customisations may come and go. - - image_builds = [build[0] for build in build_tuples] - hwpack_builds = [build[0] for build in hwpack_build_tuples] - - builds = [] - for build in image_builds: - if build in hwpack_builds: - # Only use a build if it exists for the hardware pack as well - # as the image and platform chosen - builds.append(build) - - builds.sort() - - # Just so final is always the last item, if it exists... - if "final" in builds: - builds.remove("final") - builds.append("final") - - return builds - - def get_hwpacks(self, table, platform=None): - """Return a list of all the hardware packs available in the specified - table""" - - query = 'select distinct hardware from ' + table - if platform: - query += ' where platform == "' + platform + '"' - - results = self.execute_return_list(query) - - hwpacks = [item[0] for item in results] - return hwpacks - - def get_dates_and_builds_of_snapshots(self): - """Return all the dates that match an an OS and hardware pack build""" - # First get all dates from hwpack table - self.execute("select distinct date from snapshot_hwpacks " - "order by date") - - date_with_hwpack = {} - date = self.c.fetchone() - - while date: - date_with_hwpack[date] = 1 - date = self.c.fetchone() - - # Now make sure there is a file system image for that date as well - self.execute("select distinct date from snapshot_binaries " - "order by date") - - date = self.c.fetchone() - while date: - if date in date_with_hwpack: - date_with_hwpack[date] = 2 - date = self.c.fetchone() - - # Now anything with both a hwpack and a file system has a value of 2 - # recorded in date_with_hwpack[date] - date_with_build = {} - - for key, date in date_with_hwpack.items(): - if(key == 2): - date_with_build[date] = True - - return date_with_build - - def get_hardware_from_db(self, table): - """Get a list of hardware available from the given table""" - self.execute("select distinct hardware from " + table + - " order by hardware") - - hardware = [] - hw = self.c.fetchone() - while hw: - hardware.append(hw[0]) - hw = self.c.fetchone() - - return hardware - - def image_hardware_combo_available(self, - release_or_snapshot, - image, - hwpacks): - """Try and find a matching plaform, build pair for both the provided - image and one of the hwpacks in the list.""" - binary_table = release_or_snapshot + "_binaries" - hwpack_table = release_or_snapshot + "_hwpacks" - - binary_list = self.execute_return_list( - 'select distinct platform, build from ' - + binary_table + ' where image == ?', (image,)) - - for hwpack in hwpacks: - hwpack_list = self.execute_return_list( - 'select distinct platform, build from ' - + hwpack_table + - ' where hardware == ?', (hwpack,)) - - for item in binary_list: - if item in hwpack_list: - return True - - return False - - def hardware_is_available_in_table(self, table, hardware): - return len(self.execute_return_list( - 'select url from ' + table + - ' where hardware == ?', (hardware,))) > 0 - - def hardware_is_available_for_platform(self, - hardware_list, - platform, - table="release_hwpacks"): - - for hardware in self.execute_return_list( - 'select distinct hardware from ' + table + - ' where platform == ?', (platform,)): - if hardware[0] in hardware_list: - return True - - return False - - def get_available_hwpacks_for_hardware_build_plaform(self, - hardware_list, - platform, - build): - hwpacks = [] - for hardware in self.execute_return_list( - 'select distinct hardware from release_hwpacks ' - 'where platform == ? and build == ?', - (platform, build)): - - if hardware[0] in hardware_list: - hwpacks.append(hardware[0]) - - return hwpacks - - def image_is_available_for_platform(self, - table, - platform, - image): - - return len(self.execute_return_list( - 'select * from ' + table + - ' where platform == ? and image == ?', - (platform, image))) > 0 - - def hardware_is_available_for_platform_build(self, - hardware_list, - platform, - build): - - for hardware in self.execute_return_list( - 'select distinct hardware from release_hwpacks ' - 'where platform == ? and build == ?', - (platform, build)): - - if hardware[0] in hardware_list: - return True - - return False - - def build_is_available_for_platform_image(self, - table, - platform, - image, - build): - - return len(self.execute_return_list( - 'select * from ' + table + - ' where platform == ? and image == ? and build == ?', - (platform, image, build))) > 0 - - def get_available_hwpacks_for_hardware_snapshot_build(self, - hardware_list, - platform, - date, - build): - hwpacks = [] - for hardware in self.execute_return_list( - 'select distinct hardware from snapshot_hwpacks ' - 'where platform == ? ' - 'and date == ? ' - 'and build == ?', - (platform, date, build)): - if hardware[0] in hardware_list: - hwpacks.append(hardware[0]) - - return hwpacks - - def get_binary_builds_on_day_from_db(self, image, date, hwpacks): - """Return a list of build numbers that are available on the given date - for the given image ensuring that the reqiested hardware is also - available.""" - - # Remove the dashes from the date we get in (YYYY-MM-DD --> YYYYMMDD) - date = re.sub('-', '', date) - - binaries = self.execute_return_list( - "select build from snapshot_binaries " - "where date == ? and image == ?", - (date, image)) - - if len(binaries) == 0: - # Try adding "linaro-" to the beginning of the image name - binaries = self.execute_return_list( - "select build from snapshot_binaries " - "where date == ? and image == ?", - (date, "linaro-" + image)) - - for hwpack in hwpacks: - builds = self.execute_return_list( - "select build from snapshot_hwpacks " - "where date == ? and hardware == ?", - (date, hwpack)) - - if len(builds): - # A hardware pack exists for that date, return a list of builds - # that exist for both hwpack and binary - ret = [] - for build in builds: - if build in binaries: - ret.append(build) - return ret - - # No hardware pack exists for the date requested, return empty table - return [] - - def get_next_prev_day_with_builds(self, image, date, hwpacks): - """Searches forwards and backwards in the database for a date with a - build. Will look 1 year in each direction. Returns a tuple of dates - in YYYYMMDD format.""" - - # Split ISO date into year, month, day - date_chunks = date.split('-') - - current_date = datetime.date(int(date_chunks[0]), - int(date_chunks[1]), - int(date_chunks[2])) - - one_day = datetime.timedelta(days=1) - - # In case of time zone issues we add 1 day to max_search_date - max_search_date = datetime.date.today() + one_day - - day_count = 0 - # Look in the future & past from the given date until we find a day - # with a build on it - - test_date = {'future': current_date, - 'past': current_date} - - for in_the in ["future", "past"]: - test_date[in_the] = None - - if in_the == "future": - loop_date_increment = one_day - else: - loop_date_increment = -one_day - - test_date[in_the] = current_date - - while test_date[in_the] <= max_search_date: - test_date[in_the] += loop_date_increment - - builds = [] - for hwpack in hwpacks: - builds = self.get_binary_builds_on_day_from_db( - image, - test_date[in_the].isoformat(), - [hwpack]) - if len(builds): - break - - if len(builds): - break - - day_count += 1 - if day_count > 365: - test_date[in_the] = None - break - - if test_date[in_the] and test_date[in_the] > max_search_date: - test_date[in_the] = None - - if test_date[in_the]: - test_date[in_the] = test_date[in_the].isoformat() - - return(test_date['future'], test_date['past']) - - def get_os_list_from(self, table): - return [item[0] for item in self.execute_return_list( - "select distinct image from " - + table)] - - def get_image_and_hwpack_urls(self, args): - """ We need a platform image and a hardware pack. These are specified - either as - Release: - Image: platform, image, build - HW Pack: platform, build, hardware - Snapshot: - Image: platform, image, date, build - HW Pack: platform, hardware, date, build""" - image_url = None - hwpack_url = None - old_image = args['image'] - build_bits = None - - if(args['release_or_snapshot'] == "snapshot"): - count = 0 - while(1): # Just so we can try several times... - if(args['build'] == "latest"): - # Start from today, add 1 day because - # get_next_prev_day_with_builds aways searches from the - # day passed to it, not including that day. This will give - # us the latest build dated today or earlier with both - # a hwpack and OS binary - date = (datetime.date.today() + - datetime.timedelta(days=1)).isoformat() - - _, past_date = self.get_next_prev_day_with_builds( - args['image'], - date, - [args['hwpack']]) - - builds = self.get_binary_builds_on_day_from_db( - args['image'], past_date, [args['hwpack']]) - - # Work out latest build - build = None - for b in builds: - if build == None or b[0] > build: - build = b[0] - - # We store dates in the DB in ISO format with the dashes - # missing - date = re.sub('-', '', past_date) - - image_url = self.get_url("snapshot_binaries", - [ - ("image", args['image']), - ("build", build), - ("date", date)]) - - hwpack_url = self.get_url("snapshot_hwpacks", - [ - ("hardware", args['hwpack']), - ("build", build), - ("date", date)]) - - else: - build_bits = args['build'].split(":") - assert re.search("^\d+$", build_bits[0]), ( - "Unexpected date format in build parameter " - + build_bits[0]) - assert re.search("^\d+$", build_bits[1]), ( - "Build number in shapshot build parameter should " - "be an integer " + build_bits[1]) - - image_url = self.get_url("snapshot_binaries", - [ - ("image", args['image']), - ("build", build_bits[1]), - ("date", build_bits[0])]) - - hwpack_url = self.get_url("snapshot_hwpacks", - [ - ("hardware", args['hwpack']), - ("build", build_bits[1]), - ("date", build_bits[0])]) - - if(count == 1): - break - count += 1 - - if(image_url == None): - # If we didn't match anything, try prepending "linaro-" - # to the image name - args['image'] = "linaro-" + args['image'] - else: - break # Got a URL, go. - - elif args['release_or_snapshot'] == "release": - image_url = self.get_url("release_binaries", - [ - ("image", args['image']), - ("build", args['build']), - ("platform", args['platform'])]) - - hwpack_url = self.get_url("release_hwpacks", - [ - ("hardware", args['hwpack']), - ("build", args['build']), - ("platform", args['platform'])]) - - else: - message = "Unexpected args['release_or_snapshot']: {0}".format( - args['release_or_snapshot']) - raise AssertionError(message) - - if(not image_url): - # If didn't get an image URL set up something so the return line - # doesn't crash - image_url = [None] - - if args['release_or_snapshot'] == "snapshot": - table = "snapshot_binaries" - else: - table = "release_binaries" - - if not (self.get_url(table, [("image", old_image)]) or - self.get_url(table, [("image", args['image'])])): - msg = "{0} does not match any OS indexed".format(old_image) - logging.error(msg) - - if args['release_or_snapshot'] == "snapshot": - if not self.get_url("snapshot_binaries", - [("date", build_bits[0])]): - msg = "Can not find requested OS on date {0}".format( - build_bits[0]) - logging.error(msg) - - elif not self.get_url("snapshot_binaries", - [("build", build_bits[1])]): - msg = "Can not find build {0} of requested OS".format( - build_bits[1]) - logging.error(msg) - - else: # Release... - if not self.get_url("release_binaries", - [("build", args['build'])]): - msg = "Can not find build {0} of selected OS".format( - args['build']) - logging.error(msg) - - if not self.get_url("release_binaries", - [("platform", args['platform'])]): - msg = "Can not find OS for platform {0}".format( - args['platform']) - logging.error(msg) - - if(not hwpack_url): - # If didn't get a hardware pack URL set up something so the return - # line doesn't crash - hwpack_url = [None] - - table = None - if args['release_or_snapshot'] == "snapshot": - table = "snapshot_hwpacks" - build = build_bits[1] - - elif args['release_or_snapshot'] == "release": - table = "release_hwpacks" - build = args['build'] - - if not self.get_url(table, [("hardware", args['hwpack'])]): - msg = "{0} does not match any hardware pack indexed".format( - args['hwpack']) - logging.error(msg) - - if args['release_or_snapshot'] == "snapshot": - if not self.get_url("snapshot_hwpacks", - [("date", build_bits[0])]): - msg = "Hardware pack does not exist on date {0}".format( - build_bits[0]) - logging.error(msg) - - else: # Release... - if not self.get_url("release_hwpacks", - [("platform", args['platform'])]): - msg = "Hardware pack unavailable for platform {0}".format( - args['platform']) - - if not self.get_url(table, [("build", build)]): - msg = ("Build {0} doesn't exist for any hardware " - "pack indexed".format(build)) - logging.error(msg) - - return(image_url[0], hwpack_url[0]) === removed file 'linaro_image_tools/fetch_image_settings.yaml' --- linaro_image_tools/fetch_image_settings.yaml 2012-01-25 10:00:56 +0000 +++ linaro_image_tools/fetch_image_settings.yaml 1970-01-01 00:00:00 +0000 @@ -1,177 +0,0 @@ -UI: - descriptions: - alip: ARM Internet Platform - choice: - build: The build (alpha-1 / beta / final) or date of snapshot (YYYYMMDD:build_number or 'latest') - to fetch. - hardware: The board you are building the image to run on. - hwpack: The hardware pack to use when building the image. - image: A distribuion image, e.g. headless. - platform: Which Linaro platform to build an image from. Specify 'snapshot' to - use a snapshot rather than a release. - developer: Developer Tools - o-developer: Developer Tools - graphics: Graphics - o-graphics: Graphics - multimedia: Multimedia - o-multimedia: Multimedia - nano: Nano - o-nano: Nano - ubuntu-desktop: 'LEB: Linaro Ubuntu Desktop' - o-ubuntu-desktop: 'LEB: Linaro Ubuntu Desktop' - ubuntu-desktop::long: Linux for humans on low power machines - ubuntu-desktop::release_note: Shiny! - o-alip: ARM Linux Internet Platform - alip: ARM Linux Internet Platform - server: Server - o-server: Server - hwpack-descriptions: - omap3: Basic support for Beagle boards - omap3-x11-base: Includes support for 3D acceleration - overo: Same kernel as omap3. - panda: Basic support for Panda from linux-linaro-3.0 - panda-x11-base: Basic support for Panda from linux-linaro-3.0 plus support for 3D acceleration - bsp-omap4: (DEPRECATED) - Used in Maverick release - lt-panda: TI Landing Team kernel (kernel-tilt.git) - lt-panda-x11-base-natty: TI Landing Team kernel (kernel-tilt.git) plus support for 3D acceleration - s5pv310: Support for this has been dropped in order to focus development efforts on Origen - lt-s5pv310: Support for this has been dropped in order to focus development efforts on Origen - - help: - latest snapshot: Latest Snapshot help text - release: Release help text - snapshot: Snapshot help text - translate: - cmdline short names: - build: b - hardware: w - platform: a - image: i - hwpack: p -choice: - build: {} - hardware: - beagle: Beagle Board - igep: IGEPv2 - panda: Panda Board - vexpress: Versatile Express - ux500: UX500 - efikamx: EFIKA MX Smarttop - efikasb: EFIKA MX Smartbook - mx51evk: i.MX51 - mx53loco: i.MX53 Quick Start - # I don't have a --dev setting to linaro-media-create for a u8500 yet. - # u8500: U8500 - overo: Overo - smdkv310: S5PV310 - origen: Origen - # UI may need some work to support snowball. Currently under investigation. - #snowball_sd: Snowball - - hwpack: - beagle: - - omap3 - - omap3-x11-base - - omap3-oneiric - - omap3-x11-base-oneiric - - igep: - - igep - - igep-oneiric - - panda: - - panda - - panda-x11-base - - bsp-omap4 - - lt-panda-x11-base-natty - - lt-panda - - panda-oneiric - - panda-x11-base-oneiric - - lt-panda-x11-base-oneiric - - lt-panda-oneiric - - vexpress: - - vexpress - - vexpress-oneiric - - lt-vexpress-a9-oneiric - - ux500: - - bsp-ux500 - - efikamx: - - efikamx - - efikamx-oneiric - - efikasb: - - efikamx - - efikamx-oneiric - - mx51evk: - - imx51 - - imx51-oneiric - - mx53loco: - - lt-mx5 - - lt-mx5-oneiric - - u8500: - - lt-u8500 - - overo: - - overo - - overo-oneiric - - smdkv310: - - lt-s5pv310 - - s5pv310 - - origen: - - lt-origen - - lt-origen-oneiric - - snowball_sd: - - lt-snowball - - lt-snowball-v2 - - lt-snowball-v3-oneiric - - lt-snowball-v2-oneiric - - image: - - alip - - developer - - headless - - ubuntu-desktop - platform: - linaro-m: - - final - linaro-n: - - alpha-1 - - alpha-2 - - alpha-3 - - beta - - rc - - final - - platform: - '11.05': - - final - '11.06': - - final - '11.07': - - final - '11.08': - - final - '11.09': - - final - '11.10': - - final - '11.11': - - final - '11.12': - - final - '12.01': - - final - -platform: linaro-n -repository: release - - === modified file 'linaro_image_tools/tests/__init__.py' --- linaro_image_tools/tests/__init__.py 2011-07-19 16:02:08 +0000 +++ linaro_image_tools/tests/__init__.py 2012-03-02 13:56:37 +0000 @@ -11,7 +11,6 @@ module_names = [ 'linaro_image_tools.tests.test_cmd_runner', 'linaro_image_tools.tests.test_utils', - 'linaro_image_tools.tests.test_fetch_image', ] # if pyflakes is installed and we're running from a bzr checkout... if has_command('pyflakes') and not os.path.isabs(__file__): === removed file 'linaro_image_tools/tests/test_fetch_image.py' --- linaro_image_tools/tests/test_fetch_image.py 2011-08-17 13:18:37 +0000 +++ linaro_image_tools/tests/test_fetch_image.py 1970-01-01 00:00:00 +0000 @@ -1,196 +0,0 @@ -# Copyright (C) 2010, 2011 Linaro -# -# Author: James Tunnicliffe -# -# This file is part of Linaro Image Tools. -# -# Linaro Image Tools is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Linaro Image Tools is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -import os -from linaro_image_tools.testing import TestCaseWithFixtures -import re -import linaro_image_tools.fetch_image as fetch_image - - -class TestURLLookupFunctions(TestCaseWithFixtures): - - def setUp(self): - # We use local files for testing, so get paths sorted. - this_file = os.path.abspath(__file__) - this_dir = os.path.dirname(this_file) - yaml_file_location = os.path.join(this_dir, "../" - "fetch_image_settings.yaml") - sample_db_location = os.path.join(this_dir, "test_server_index.sqlite") - self.file_handler = fetch_image.FileHandler() - self.config = fetch_image.FetchImageConfig() - self.config.settings["force_download"] = False - - # Load settings YAML, which defines the parameters we ask for and - # acceptable responses from the user - self.config.read_config(yaml_file_location) - - # Using the config we have, look up URLs to download data from in the - # server index - self.db = fetch_image.DB(sample_db_location) - - super(TestURLLookupFunctions, self).setUp() - - def test_url_lookup(self): - self.settings = self.config.settings - self.settings['release_or_snapshot'] = "snapshot" - - #--- Test first with a snapshot build lookup --- - # -- Fix a build date -- - # We only need to look up a single snapshot date. We just use the - # latest in the database (we could use today and search from it, but - # the database is just one that is checked in, so it could be old - # and db.get_next_prev_day_with_builds may give up before finding it). - date = self.db.execute_return_list( - "SELECT MAX(date) FROM snapshot_binaries")[0][0] - d = re.search("(\d{4})(\d{2})(\d{2})", date) - date = (d.group(1) + "-" + d.group(2) + "-" + d.group(3)) - - # -- Don't iterate through platforms for snapshot -- - - # -- Select hardware -- - for self.settings['hardware'] in ( - self.settings['choice']['hardware'].keys()): - - compatable_hwpacks = self.settings['choice']['hwpack'][ - self.settings['hardware']] - - future_date, past_date = self.db.get_next_prev_day_with_builds( - "linaro-alip", - date, - compatable_hwpacks) - - if past_date == None: - # Some hardware packs are not available in the snapshot repo, - # so just skip if they aren't - continue - - builds = self.db.get_binary_builds_on_day_from_db( - "linaro-alip", - past_date, - compatable_hwpacks) - - self.assertTrue(len(builds)) - # If the above assert fails, either the DB is empty, or - # db.get_binary_builds_on_day_from_db failed - - small_date = re.sub('-', '', past_date) - self.settings['build'] = small_date + ":" + "0" - - # -- Iterate through hardware packs -- - for self.settings['hwpack'] in compatable_hwpacks: - - # If hardware pack is available... - if(self.settings['hwpack'] - in self.db.get_hwpacks('snapshot_hwpacks')): - - # -- Iterate through images - os_list = self.db.get_os_list_from('snapshot_binaries') - - for self.settings['image'] in os_list: - if re.search('old', self.settings['image']): - # Directories with old in the name are of no - # interest to us - continue - - # -- Check build which matches these parameters - # (builds that don't match are excluded in UI) -- - if( len(self.db.execute_return_list( - 'select * from snapshot_hwpacks ' - 'where hardware == ? ' - 'and date == ? ' - 'and build == ?', - (self.settings['hwpack'], - small_date, - "0"))) - and len(self.db.execute_return_list( - 'select * from snapshot_binaries ' - 'where image == ? ' - 'and date == ? ' - 'and build == ?', - (self.settings['image'], - small_date, - "0")))): - - # - Run the function under test! - - image_url, hwpack_url = ( - self.db.get_image_and_hwpack_urls(self.settings)) - - self.assertTrue(image_url) - self.assertTrue(hwpack_url) - - #--- Now test release build lookup --- - self.settings['release_or_snapshot'] = "release" - # -- Select hardware -- - for self.settings['hardware'] in ( - self.settings['choice']['hardware'].keys()): - compatable_hwpacks = ( - self.settings['choice']['hwpack'][self.settings['hardware']]) - - # -- Iterate through hardware packs -- - for self.settings['hwpack'] in compatable_hwpacks: - - # If hardware pack is available... - if(self.settings['hwpack'] - in self.db.get_hwpacks('release_hwpacks')): - - # -- Iterate through images - os_list = self.db.get_os_list_from('release_binaries') - - for self.settings['image'] in os_list: - if re.search('old', self.settings['image']): - # Directories with old in the name are of no - # interest to us - continue - - for platform, ignore in ( - self.settings['choice']['platform'].items()): - self.settings['platform'] = platform - - # -- Iterate through available builds -- - builds = self.db.get_builds( - self.settings['platform'], - self.settings['image']) - - for build in builds: - self.settings['build'] = build - - # -- Check build which matches these parameters - #(builds that don't match are excluded in UI)-- - if( len(self.db.execute_return_list( - 'select * from release_hwpacks ' - 'where platform == ? ' - 'and hardware == ? ' - 'and build == ?', - (self.settings['platform'], - self.settings['hwpack'], - self.settings['build']))) - and len(self.db.execute_return_list( - 'select * from release_binaries ' - 'where platform == ? ' - 'and image == ? ' - 'and build == ?', - (self.settings['platform'], - self.settings['image'], - self.settings['build'])))): - - # - Run the function under test! - - image_url, hwpack_url = ( - self.db.get_image_and_hwpack_urls( - self.settings)) - self.assertTrue(image_url) - self.assertTrue(hwpack_url) === removed file 'linaro_image_tools/tests/test_server_index.sqlite' Binary files linaro_image_tools/tests/test_server_index.sqlite 2011-08-11 16:18:34 +0000 and linaro_image_tools/tests/test_server_index.sqlite 1970-01-01 00:00:00 +0000 differ