Script to update Qubes templates in parallel
If you think Security(TM) and have QubesOS on your system as I do, you might be easily managing 15+ templates, some of which are almost never used (such as the package-maintained ones, of which you probably have only customized the clones).
Updating them can be a pain, though: it's a manual process of starting N templates (depending on your machine's capabilities and the already-running VMs), normally pressing `y` and `<Return>` to confirm the updates, and later on turning off the machines.
There must be a better way!
The idea of a script to automatically update templates in dom0 isn't new and isn't original either, but most of the code out there does it sequentially, so the manual process above might still be faster.
The code below, instead, uses multithreading to run N jobs in parallel. In the code below, N=int(math.pi), or "3" as most people call it.
The updater will sort the available templates based on the number of images using each of them and whether any of these images is running at the moment, and will update the most used templates first. Every template will at least check if any package can be updated, not only those already marked as upgradeable!
Please note: there are two versions of the script below, one for Qubes 3.2 and one for 4.0+ (which hopefully won't need any rewriting for future versions).
Warnings
- every template will install the updates accepting the defaults (as an example, not overwriting an older package's configuration files) and then clean the package manager's cache.
- Supported templates are fedora and debian-based ones.
- I strongly recommend against taking somebody's code and installing it in dom0, without being able to understand whether it could be dangerous.
- The standard warranty exclusion clause applies: I shall not be held liable for any damages that might happen by using my code.
Usage
First of all, you need to copy the appropriate script below into your dom0, as specified here.
Secondly, install it in a folder of your choice and make it executable.
Give it a spin by running it with the `test` parameter to execute a few unit tests. If they succeed, run it with no parameter to start the updater.
dom0-update-all (QubesOS 4.0+)
#!/usr/bin/python2 # # Copyright (c) Andrea Micheloni 2018 # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import sys, os, time, subprocess, unittest from Queue import Queue from threading import Thread, current_thread class QubesVm: def __init__(self, name, template, is_running): self.name = name self.template = template self._is_running = is_running self._is_updated = False def is_running(self): return self._is_running def is_template(self): return (self.template is None) def is_updated(self): return self._is_updated def run(self, command, autostart = False, verbose = True, user = None, notify_function = None, passio = False, localcmd = None, gui = True, filter_esc = True ): qvm_command = ['qvm-run'] if autostart: qvm_command += ['--autostart'] else: qvm_command += ['--no-autostart'] if not verbose: qvm_command += ['--quiet'] if user: qvm_command += ['--user=%s' % user] if gui: qvm_command += ['--gui'] else: qvm_command += ['--no-gui'] qvm_command += [self.name, command] subprocess.call(qvm_command) class QubesHandler: def __init__(self): raw_result = self._get_raw_qubes_string() self._qubes = {} for qube_string in raw_result.split("\n"): if len(qube_string)>0: qube_properties = qube_string.split("|") qube_class = qube_properties[0] qube_name = qube_properties[1] qube_template = None; if qube_properties[2] != '-': qube_template = qube_properties[2] qube_running = True if 
qube_properties[3] == 'Halted': qube_running = False if qube_class != "AdminVM": new_qube = self._create_qube(qube_name, qube_template, qube_running) self._qubes[qube_name] = new_qube def _create_qube(self, qube_name, qube_template, qube_running): return QubesVm(qube_name, qube_template, qube_running) def _get_raw_qubes_string(self): return subprocess.check_output(['qvm-ls', '--fields', 'CLASS,NAME,TEMPLATE,STATE', '--raw-data']) def _get_template_dictionary(self, template_vm, rank): return {'name': template_vm.name, 'qvm': template_vm, 'rank': rank} def _get_ranked_templates(self): templates_ranked = {} for vm in self._qubes.values(): if vm.is_template(): if not vm.name in templates_ranked: templates_ranked[vm.name] = self._get_template_dictionary(vm, 0) else: if vm.template: if vm.is_running(): rank = 3 else: rank = 1 if not vm.template in templates_ranked: templates_ranked[vm.template] = \ self._get_template_dictionary(self._qubes[vm.template], rank) else: templates_ranked[vm.template]['rank'] += rank return templates_ranked def get_templates(self): templates_ranked = self._get_ranked_templates() return [templates_ranked[template] for template in sorted(templates_ranked, key=lambda name: templates_ranked[name]['rank'], reverse = True)] def update_template(self, template, print_function): print_function('Updating template...') try: template['qvm'].run('touch /tmp/update-all; chmod go+r /tmp/update-all; { if [ -f /usr/bin/snap ' + ']; then /usr/bin/snap refresh || exit; fi; if [ -f /usr/bin/apt-get ' + ']; then /usr/bin/apt-get update && /usr/bin/apt-get -y upgrade </dev/null ' + '&& /usr/bin/apt-get -y autoremove && /usr/bin/apt-get autoclean && ' + '/sbin/poweroff; else /usr/bin/dnf -y upgrade </dev/null && /usr/bin/dnf ' + 'clean all && /usr/sbin/poweroff; fi; } 2>&1 | tee -a /tmp/update-all', autostart = True, verbose = False, user = 'root', notify_function = None, passio = False, localcmd = None, gui = False, filter_esc = True ) print_function('Update 
complete.') except: print_function('Update failed.', error=True) raise class UpdateWorker(Thread): def __init__(self, name, queue, qubes_handler): Thread.__init__(self) self.daemon = True self.name = name self._queue = queue self._qubes_handler = qubes_handler self._template = None def _print_function(self, text, error=False): if error: prefix = 'ERR' else: prefix = 'INF' prefix += ' ['+self.name+' '+self._template['name']+'] ' print prefix + text def run(self): while True: self._template = {'name':'None'} self._template = self._queue.get() try: self._qubes_handler.update_template(self._template, self._print_function) self._template = {'name':'None'} finally: self._template = {'name':'None'} self._queue.task_done() self._template = {'name':'None'} class UpdateHandler: def __init__(self): self._queue = Queue() def update_templates(self, thread_number): qubes_handler = self._get_qubes_handler() print 'INF Updating templates: ', [template['name'] for template in qubes_handler.get_templates()] workers = [self._get_worker('w'+str(number), qubes_handler) for number in range(thread_number)] for worker in workers: worker.start() for template in qubes_handler.get_templates(): self._queue.put(template, block = True) self._queue.join() def _get_worker(self, name, qubes_handler): return UpdateWorker(name, self._queue, qubes_handler) def _get_qubes_handler(self): return QubesHandler() class MockQubesVm(QubesVm): def __init__(self, name, template, is_running): QubesVm.__init__(self, name, template, is_running) self.name = name self.template = template self._is_running = is_running self._is_updated = False def is_running(self): return self._is_running def is_template(self): return (self.template is None) def is_updated(self): return self._is_updated def __repr__(self): return "MockQubesVm(%s,%s,%s)" % (self.name, self.template, self._is_running) def __eq__(self, other): return repr(self) == repr(other) def run(self, command, autostart = False, verbose = True, user = None, 
notify_function = None, passio = False, localcmd = None, gui = True, filter_esc = True ): if (autostart): self._is_running = True if (user == 'root' and self._is_running): self._is_updated = True self._is_running = False class MockQubesHandler(QubesHandler, unittest.TestCase): def __init__(self, *args, **kwargs): if (args): unittest.TestCase.__init__(self, *args, **kwargs) self.maxDiff=None self._template_f = MockQubesVm('fedora-24', None, False) self._template_fm = MockQubesVm('fedora-24-minimal', None, True) self._template_d = MockQubesVm('debian-8', None, False) self._template_ws = MockQubesVm('whonix-ws', None, True) self._template_wg = MockQubesVm('whonix-wg', None, True) self._ranked_f = {'name': 'fedora-24', 'qvm': self._template_f, 'rank': 9} self._ranked_fm = {'name': 'fedora-24-minimal', 'qvm': self._template_fm, 'rank': 4} self._ranked_d = {'name': 'debian-8', 'qvm': self._template_d, 'rank': 5} self._ranked_ws = {'name': 'whonix-ws', 'qvm': self._template_ws, 'rank': 1} self._ranked_wg = {'name': 'whonix-wg', 'qvm': self._template_wg, 'rank': 3} self._expected_ranked = { 'fedora-24': self._ranked_f, 'fedora-24-minimal': self._ranked_fm, 'debian-8': self._ranked_d, 'whonix-ws': self._ranked_ws, 'whonix-wg': self._ranked_wg } self._expected = [ self._ranked_f, self._ranked_d, self._ranked_fm, self._ranked_wg, self._ranked_ws] QubesHandler.__init__(self) def _create_qube(self, qube_name, qube_template, qube_running): return MockQubesVm(qube_name, qube_template, qube_running) def _get_raw_qubes_string(self): return 'AppVM|sys-net|fedora-24|Running\n' + \ 'AppVM|sys-whonix|whonix-wg|Running\n' + \ 'AppVM|personal|debian-8|Halted\n' + \ 'AppVM|sys-firewall|fedora-24-minimal|Running\n' + \ 'TemplateVM|debian-8|-|Halted\n' + \ 'AdminVM|dom0|-|Running\n' + \ 'AppVM|gnupg|fedora-24-minimal|Halted\n' + \ 'AppVM|disp5|fedora-24|Suspended\n' + \ 'AppVM|anonymous|whonix-ws|Halted\n' + \ 'TemplateVM|fedora-24-minimal|-|True\n' + \ 'TemplateVM|whonix-ws|-|Running\n' + 
\ 'TemplateVM|whonix-wg|-|Running\n' + \ 'AppVM|work|debian-8|Suspended\n' + \ 'AppVM|development|debian-8|Halted\n' + \ 'AppVM|disp2|fedora-24|Running\n' + \ 'TemplateVM|fedora-24|-|Halted\n' def get_updated_status(self): return [self._qubes[self._template_f.name].is_updated(), self._qubes[self._template_fm.name].is_updated(), self._qubes[self._template_d.name].is_updated(), self._qubes[self._template_ws.name].is_updated(), self._qubes[self._template_wg.name].is_updated()] def test_get_template_dictionary(self): self.assert_get_template_dictionary(self._template_f) self.assert_get_template_dictionary(self._template_fm) self.assert_get_template_dictionary(self._template_d) self.assert_get_template_dictionary(self._template_ws) self.assert_get_template_dictionary(self._template_wg) def assert_get_template_dictionary(self, template): found = self._get_template_dictionary(template, len(template.name)) self.assertEquals(3, len(found.keys())) self.assertEquals(template, found['qvm']) self.assertEquals(len(template.name), found['rank']) self.assertEquals(template.name, found['name']) def test_get_template_dictionary_custom(self): template = MockQubesVm('test-template', None, False) found = self._get_template_dictionary(template, 0) self.assertEquals(3, len(found.keys())) self.assertEquals(template, found['qvm']) self.assertEquals(0, found['rank']) self.assertEquals('test-template', found['name']) def test_get_templates(self): found = self.get_templates() self.assertEquals(self._expected, found) def test_get_ranked_templates(self): found = self._get_ranked_templates() self.assertEquals(self._expected_ranked, found) class MockUpdateWorker(UpdateWorker): def _print_function(self, text, error=False): pass class MockUpdateHandler(UpdateHandler, unittest.TestCase): def __init__(self, *args, **kwargs): UpdateHandler.__init__(self) unittest.TestCase.__init__(self, *args, **kwargs) self.generate_qubes_handler() def _get_worker(self, name, qubes_handler): return 
MockUpdateWorker(name, self._queue, qubes_handler) def _get_qubes_handler(self): return self._qubes_handler def generate_qubes_handler(self): self._qubes_handler = MockQubesHandler() def test_update_all(self): for number in range(1,10): self.assert_update_all(number) self.generate_qubes_handler() def assert_update_all(self, thread_number): self.assertEquals([False]*5 , self._qubes_handler.get_updated_status()) self.update_templates(thread_number) self.assertEquals([True]*5 , self._qubes_handler.get_updated_status()) if __name__ == '__main__': if len(sys.argv) == 1: UpdateHandler().update_templates(3) elif len(sys.argv) >2 or sys.argv[1] != 'test': print "ERR Usage: dom0-update-all [test]" else: unittest.main(argv=sys.argv[:1])
dom0-update-all (QubesOS 3.2)
#!/usr/bin/python2 # # Copyright (c) Andrea Micheloni 2017 # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import sys, os, time, unittest from Queue import Queue from threading import Thread, current_thread class QubesHandler: def __init__(self): from qubes.qubes import QubesVmCollection,QubesException self._qvm_collection = QubesVmCollection() try: self._qvm_collection.lock_db_for_reading() self._qvm_collection.load() finally: self._qvm_collection.unlock_db() def _get_template_dictionary(self, template_vm, rank): return {'name': template_vm.name, 'qvm': template_vm, 'rank': rank} def _get_ranked_templates(self): templates_ranked = {} for vm in self._qvm_collection.values(): if vm.is_template(): if not vm.name in templates_ranked: templates_ranked[vm.name] = self._get_template_dictionary(vm, 0) else: if vm.template: if vm.is_running(): rank = 3 else: rank = 1 if not vm.template.name in templates_ranked: templates_ranked[vm.template.name] = \ self._get_template_dictionary(vm.template, rank) else: templates_ranked[vm.template.name]['rank'] += rank return templates_ranked def get_templates(self): templates_ranked = self._get_ranked_templates() return [templates_ranked[template] for template in sorted(templates_ranked, key=lambda name: templates_ranked[name]['rank'], reverse = True)] def _wait_ten_seconds(self): time.sleep(10) def update_template(self, template, print_function): 
print_function('Updating template...') try: template['qvm'].run('touch /tmp/update-all; chmod go+r /tmp/update-all; if [ -f /usr/bin/apt-get ' + ']; then /usr/bin/apt-get update && /usr/bin/apt-get -y upgrade </dev/null ' + '&& /usr/bin/apt-get -y autoremove && /usr/bin/apt-get autoclean && ' + '/sbin/poweroff; else /usr/bin/dnf -y upgrade </dev/null && /usr/bin/dnf ' + 'clean all && /usr/sbin/poweroff; fi 2>&1 | tee -a /tmp/update-all', autostart = True, verbose = False, user = 'root', notify_function = None, passio = False, localcmd = None, gui = False, filter_esc = True ) print_function('Command running.') self._wait_ten_seconds() seconds_waited = 10 while (template['qvm'].is_running()): self._wait_ten_seconds() seconds_waited += 10 if (seconds_waited>=120 and seconds_waited % 60 == 0): if (seconds_waited < 600): print_function(str(int(seconds_waited/60)) + ' minutes elapsed...') else: print_function(str(int(seconds_waited/60)) + ' minutes elapsed, try: qvm-run --pass-io ' + template['name'] + ' "tail -f /tmp/update-all"') print_function('Update complete.') except: print_function('Update failed.', error=True) raise class UpdateWorker(Thread): def __init__(self, name, queue, qubes_handler): Thread.__init__(self) self.daemon = True self.name = name self._queue = queue self._qubes_handler = qubes_handler self._template = None def _print_function(self, text, error=False): if error: prefix = 'ERR' else: prefix = 'INF' prefix += ' ['+self.name+' '+self._template['name']+'] ' print prefix + text def run(self): while True: self._template = {'name':'None'} self._template = self._queue.get() try: self._qubes_handler.update_template(self._template, self._print_function) self._template = {'name':'None'} finally: self._template = {'name':'None'} self._queue.task_done() self._template = {'name':'None'} class UpdateHandler: def __init__(self): self._queue = Queue() def update_templates(self, thread_number): qubes_handler = self._get_qubes_handler() print 'INF Updating 
templates: ', [template['name'] for template in qubes_handler.get_templates()] workers = [self._get_worker('w'+str(number), qubes_handler) for number in range(thread_number)] for worker in workers: worker.start() for template in qubes_handler.get_templates(): self._queue.put(template, block = True) self._queue.join() def _get_worker(self, name, qubes_handler): return UpdateWorker(name, self._queue, qubes_handler) def _get_qubes_handler(self): return QubesHandler() def has_qubes_qubes_module(): imp = __import__('imp') try: qubes_info = imp.find_module('qubes') qubes = imp.load_module('qubes', *qubes_info) imp.find_module('qubes', qubes.__path__) return True except ImportError: return False class MockQubesVm: def __init__(self, name, template, is_running): self.name = name self.template = template self._is_running = is_running self._is_updated = False def is_running(self): return self._is_running def is_template(self): return (self.template is None) def is_updated(self): return self._is_updated def run(self, command, autostart = False, verbose = True, user = None, notify_function = None, passio = False, localcmd = None, gui = True, filter_esc = True ): if (autostart): self._is_running = True if (user == 'root' and self._is_running): self._is_updated = True self._is_running = False class MockQubesCollection: def __init__(self, values): self._values = values def values(self): return self._values class MockQubesHandler(QubesHandler, unittest.TestCase): def __init__(self, *args, **kwargs): if (args): unittest.TestCase.__init__(self, *args, **kwargs) self.maxDiff=None self._template_f = MockQubesVm('fedora-24', None, False) self._template_fm = MockQubesVm('fedora-24-minimal', None, True) self._template_d = MockQubesVm('debian-8', None, False) self._template_ws = MockQubesVm('whonix-ws', None, True) self._template_wg = MockQubesVm('whonix-wg', None, True) images = [ MockQubesVm('sys-net', self._template_f, True), self._template_wg, MockQubesVm('personal', self._template_d, 
False), MockQubesVm('sys-firewall', self._template_fm, True), self._template_d, MockQubesVm('sys-whonix', self._template_wg, True), MockQubesVm('gnupg', self._template_fm, False), MockQubesVm('disp5', self._template_f, True), MockQubesVm('anonymous', self._template_ws, False), self._template_fm, self._template_ws, MockQubesVm('work', self._template_d, True), MockQubesVm('development', self._template_d, False), MockQubesVm('disp2', self._template_f, True), self._template_f ] self._ranked_f = {'name': 'fedora-24', 'qvm': self._template_f, 'rank': 9} self._ranked_fm = {'name': 'fedora-24-minimal', 'qvm': self._template_fm, 'rank': 4} self._ranked_d = {'name': 'debian-8', 'qvm': self._template_d, 'rank': 5} self._ranked_ws = {'name': 'whonix-ws', 'qvm': self._template_ws, 'rank': 1} self._ranked_wg = {'name': 'whonix-wg', 'qvm': self._template_wg, 'rank': 3} self._expected_ranked = { 'fedora-24': self._ranked_f, 'fedora-24-minimal': self._ranked_fm, 'debian-8': self._ranked_d, 'whonix-ws': self._ranked_ws, 'whonix-wg': self._ranked_wg } self._expected = [ self._ranked_f, self._ranked_d, self._ranked_fm, self._ranked_wg, self._ranked_ws] self._qvm_collection = MockQubesCollection(images) def _wait_ten_seconds(self): pass def get_updated_status(self): return [self._template_f.is_updated(), self._template_fm.is_updated(), self._template_d.is_updated(), self._template_ws.is_updated(), self._template_wg.is_updated()] def test_get_template_dictionary(self): self.assert_get_template_dictionary(self._template_f) self.assert_get_template_dictionary(self._template_fm) self.assert_get_template_dictionary(self._template_d) self.assert_get_template_dictionary(self._template_ws) self.assert_get_template_dictionary(self._template_wg) def assert_get_template_dictionary(self, template): found = self._get_template_dictionary(template, len(template.name)) self.assertEquals(3, len(found.keys())) self.assertEquals(template, found['qvm']) self.assertEquals(len(template.name), found['rank']) 
self.assertEquals(template.name, found['name']) def test_get_template_dictionary_custom(self): template = MockQubesVm('test-template', None, False) found = self._get_template_dictionary(template, 0) self.assertEquals(3, len(found.keys())) self.assertEquals(template, found['qvm']) self.assertEquals(0, found['rank']) self.assertEquals('test-template', found['name']) def test_get_templates(self): found = self.get_templates() self.assertEquals(self._expected, found) def test_get_ranked_templates(self): found = self._get_ranked_templates() self.assertEquals(self._expected_ranked, found) @unittest.skipUnless(has_qubes_qubes_module(), "Requires importing qubes.qubes") def test_dom0_load_actual_vms(self): handler = QubesHandler() self.assertTrue(len(handler._qvm_collection)>0) class MockUpdateWorker(UpdateWorker): def _print_function(self, text, error=False): pass class MockUpdateHandler(UpdateHandler, unittest.TestCase): def __init__(self, *args, **kwargs): UpdateHandler.__init__(self) unittest.TestCase.__init__(self, *args, **kwargs) self.generate_qubes_handler() def _get_worker(self, name, qubes_handler): return MockUpdateWorker(name, self._queue, qubes_handler) def _get_qubes_handler(self): return self._qubes_handler def generate_qubes_handler(self): self._qubes_handler = MockQubesHandler() def test_update_all(self): for number in range(1,10): self.assert_update_all(number) self.generate_qubes_handler() def assert_update_all(self, thread_number): self.assertEquals([False]*5 , self._qubes_handler.get_updated_status()) self.update_templates(thread_number) self.assertEquals([True]*5 , self._qubes_handler.get_updated_status()) if __name__ == '__main__': if len(sys.argv) == 1: UpdateHandler().update_templates(3) elif len(sys.argv) >2 or sys.argv[1] != 'test': print "ERR Usage: dom0-update-all [test]" else: unittest.main(argv=sys.argv[:1])