Script to update Qubes templates in parallel
If you think Security(TM) and have QubesOS on your system as I do, you might be easily managing 15+ templates, some of which are almost never used (such as the package-maintained ones, of which you probably have only customized the clones).
Updating them can be a pain, though: it's a manual process of starting N templates (N depending on your machine's capabilities and the already-running VMs), normally pressing y and <Return> to confirm the updates, and later on turning off the machines.
There must be a better way!
The idea of a script to automatically update templates in dom0 isn't new and isn't original either, but most of the code out there does it sequentially, so the manual process above might still be faster.
The code below, instead, uses multithreading to run N jobs in parallel. In the code below, N=int(math.pi) or "3" as most people call it.
The updater will sort the available templates based on the number of images using it and whether any of these images is running at the moment, and will update the most used templates first. Every template will at least check if any package can be updated, not only those already marked as upgradeable!
Please note: there are two versions of the script below, one for Qubes 3.2 and one for 4.0+ (which hopefully won't need any rewriting for future versions).
Warnings
- every template will install the updates accepting the defaults (as an example, not overwriting an older package's configuration files) and then clean the package manager's cache.
- Supported templates are fedora and debian-based ones.
- I strongly recommend against taking somebody's code and installing it in dom0, without being able to understand whether it could be dangerous.
- The standard warranty exclusion clause applies: I shall not be held liable for any damages that might happen by using my code.
Usage
First of all, you need to copy the appropriate script below into your dom0, as specified here.
Secondly, install it in a folder of your choice and make it executable.
Give it a spin running it with the test parameter to execute a few unit tests. If they succeed, run it with no parameter to start the updater.
dom0-update-all (QubesOS 4.0+)
#!/usr/bin/python2
#
# Copyright (c) Andrea Micheloni 2018
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys, os, time, subprocess, unittest
from Queue import Queue
from threading import Thread, current_thread
class QubesVm:
    """Handle on a single qube, driven through the ``qvm-run`` command line tool.

    Stores the qube name, the name of its template (``None`` when it has none)
    and the running state that was passed in at construction time.
    """

    def __init__(self, name, template, is_running):
        self._is_updated = False
        self._is_running = is_running
        self.template = template
        self.name = name

    def is_running(self):
        """Return the running flag supplied to the constructor."""
        return self._is_running

    def is_template(self):
        """Return True when this qube has no template of its own."""
        return self.template is None

    def is_updated(self):
        """Return the updated flag (False after construction)."""
        return self._is_updated

    def run(self, command, autostart = False, verbose = True, user = None, notify_function = None,
            passio = False, localcmd = None, gui = True, filter_esc = True ):
        """Execute ``command`` inside this qube by invoking ``qvm-run``.

        ``autostart``, ``verbose``, ``user`` and ``gui`` are translated into the
        matching ``qvm-run`` switches.  ``notify_function``, ``passio``,
        ``localcmd`` and ``filter_esc`` are accepted but not used here.  The exit
        status of ``qvm-run`` is not inspected.
        """
        switches = ['--autostart' if autostart else '--no-autostart']
        if not verbose:
            switches.append('--quiet')
        if user:
            switches.append('--user=%s' % user)
        switches.append('--gui' if gui else '--no-gui')
        subprocess.call(['qvm-run'] + switches + [self.name, command])
class QubesHandler:
    """Inventory of qubes read from ``qvm-ls``; ranks templates and updates them.

    The qubes are stored in ``self._qubes``, a dictionary keyed by qube name.
    """

    def __init__(self):
        """Parse the ``CLASS|NAME|TEMPLATE|STATE`` listing into qube objects.

        Empty lines and qubes of class ``AdminVM`` are ignored.  A template
        field of ``-`` is stored as ``None``; every state other than ``Halted``
        counts as running.
        """
        self._qubes = {}

        for qube_string in self._get_raw_qubes_string().split("\n"):
            if not qube_string:
                continue

            qube_properties = qube_string.split("|")
            qube_class = qube_properties[0]
            qube_name = qube_properties[1]
            qube_template = qube_properties[2]
            qube_running = qube_properties[3] != 'Halted'

            if qube_template == '-':
                qube_template = None

            if qube_class != "AdminVM":
                self._qubes[qube_name] = self._create_qube(qube_name, qube_template, qube_running)

    def _create_qube(self, qube_name, qube_template, qube_running):
        """Build the object stored for one qube (overridable factory)."""
        return QubesVm(qube_name, qube_template, qube_running)

    def _get_raw_qubes_string(self):
        """Return the raw ``qvm-ls`` output, one ``|``-separated qube per line."""
        return subprocess.check_output(['qvm-ls', '--fields', 'CLASS,NAME,TEMPLATE,STATE', '--raw-data'])

    def _get_template_dictionary(self, template_vm, rank):
        """Return the ``{'name', 'qvm', 'rank'}`` record used for a template."""
        return {'name': template_vm.name,
                'qvm': template_vm,
                'rank': rank}

    def _find_root_template(self, vm):
        """Follow ``vm``'s template chain up to the first qube that is a template.

        A qube may name a non-template qube as its template (for example a
        disposable based on an AppVM); in that case the chain is followed
        further.  Returns ``None`` when a name in the chain is unknown or the
        chain loops.
        """
        visited = set()
        current = vm

        while current is not None and not current.is_template():
            if current.name in visited:
                return None
            visited.add(current.name)
            current = self._qubes.get(current.template)

        return current

    def _get_ranked_templates(self):
        """Return ``{template name: record}`` with a usage rank per template.

        Every template starts at rank 0.  Each qube that (directly or through
        intermediate qubes) depends on a template adds 3 to that template's
        rank when it is running and 1 otherwise.  Only qubes for which
        ``is_template()`` is true ever get a record, so an intermediate
        non-template qube is never queued for an update.

        NOTE(review): the objects built by ``_create_qube`` report
        ``is_template()`` for any qube without a template, so a non-AdminVM
        qube listed with ``-`` as template is treated as a template too --
        confirm this is intended for standalone qubes.
        """
        templates_ranked = {}

        for vm in self._qubes.values():
            if vm.is_template():
                if vm.name not in templates_ranked:
                    templates_ranked[vm.name] = self._get_template_dictionary(vm, 0)
                continue

            root_template = self._find_root_template(vm)
            if root_template is None:
                continue

            rank = 3 if vm.is_running() else 1

            if root_template.name not in templates_ranked:
                templates_ranked[root_template.name] = \
                    self._get_template_dictionary(root_template, rank)
            else:
                templates_ranked[root_template.name]['rank'] += rank

        return templates_ranked

    def get_templates(self):
        """Return the template records sorted by rank, highest rank first."""
        templates_ranked = self._get_ranked_templates()

        return [templates_ranked[template] for template in sorted(templates_ranked,
                key=lambda name: templates_ranked[name]['rank'],
                reverse = True)]

    def update_template(self, template, print_function):
        """Run the update command in ``template['qvm']`` as root.

        ``print_function(text, error=False)`` receives the progress messages.
        The shell command logs to ``/tmp/update-all`` inside the qube, refreshes
        snaps when ``/usr/bin/snap`` exists, then upgrades through ``apt-get``
        when it exists or through ``dnf`` otherwise, and powers the qube off
        when the upgrade chain succeeds.  Any exception raised while running
        the command is reported through ``print_function`` and re-raised.
        """
        print_function('Updating template...')

        try:
            template['qvm'].run('touch /tmp/update-all; chmod go+r /tmp/update-all; { if [ -f /usr/bin/snap ' +
                                ']; then /usr/bin/snap refresh || exit; fi; if [ -f /usr/bin/apt-get ' +
                                ']; then /usr/bin/apt-get update && /usr/bin/apt-get -y upgrade </dev/null ' +
                                '&& /usr/bin/apt-get -y autoremove && /usr/bin/apt-get autoclean && ' +
                                '/sbin/poweroff; else /usr/bin/dnf -y upgrade </dev/null && /usr/bin/dnf ' +
                                'clean all && /usr/sbin/poweroff; fi; } 2>&1 | tee -a /tmp/update-all',
                                autostart = True, verbose = False, user = 'root', notify_function = None,
                                passio = False, localcmd = None, gui = False, filter_esc = True )
            print_function('Update complete.')
        except Exception:
            print_function('Update failed.', error=True)
            raise
class UpdateWorker(Thread):
    """Daemon thread that takes template records off a queue and updates them.

    ``failed_templates`` collects the names of the templates whose update
    raised an exception, in the order this worker processed them.
    """

    def __init__(self, name, queue, qubes_handler):
        """Store the worker name, the shared queue and the handler doing the update."""
        Thread.__init__(self)

        self.daemon = True
        self.name = name
        self._queue = queue
        self._qubes_handler = qubes_handler
        self._template = None
        self.failed_templates = []

    def _print_function(self, text, error=False):
        """Print ``text`` prefixed with the severity, worker name and current template."""
        prefix = 'ERR' if error else 'INF'
        # No template is assigned before the first queue item arrives.
        template_name = self._template['name'] if self._template else 'None'

        print('%s [%s %s] %s' % (prefix, self.name, template_name, text))

    def run(self):
        """Process queue items forever; one failing template does not stop the worker.

        An exception escaping this loop would end the thread, and with every
        worker gone the remaining queue items would never be marked done, so
        failures are recorded and reported instead of propagated.
        """
        while True:
            self._template = {'name':'None'}
            self._template = self._queue.get()

            try:
                self._qubes_handler.update_template(self._template, self._print_function)
            except Exception as error:
                self.failed_templates.append(self._template['name'])
                self._print_function('Worker continuing after error: %r' % (error,), error=True)
            finally:
                self._template = {'name':'None'}
                self._queue.task_done()
class UpdateHandler:
    """Feeds the ranked templates to a pool of worker threads through a queue."""

    def __init__(self):
        self._queue = Queue()

    def update_templates(self, thread_number):
        """Update every template, using at most ``thread_number`` workers.

        The template list is computed once, printed, and queued in rank order;
        the call returns when every queued template has been marked done.

        Raises ValueError when ``thread_number`` is lower than 1: without any
        worker the queue would never drain and the call would block forever.
        """
        if thread_number < 1:
            raise ValueError('thread_number must be at least 1, got %r' % (thread_number,))

        qubes_handler = self._get_qubes_handler()
        templates = qubes_handler.get_templates()

        print('INF Updating templates:  %s' % ([template['name'] for template in templates],))

        # Workers never exit on their own, so do not start more than can be kept busy.
        worker_count = min(thread_number, len(templates))
        workers = [self._get_worker('w'+str(number), qubes_handler) for number in range(worker_count)]

        for worker in workers:
            worker.start()

        for template in templates:
            self._queue.put(template, block = True)

        self._queue.join()

    def _get_worker(self, name, qubes_handler):
        """Build one worker bound to the shared queue (overridable factory)."""
        return UpdateWorker(name, self._queue, qubes_handler)

    def _get_qubes_handler(self):
        """Build the handler that lists and updates templates (overridable factory)."""
        return QubesHandler()
class MockQubesVm(QubesVm):
    """In-memory qube used by the tests: ``run`` only flips flags, nothing is executed.

    The attributes and the ``is_running`` / ``is_template`` / ``is_updated``
    accessors are inherited unchanged from ``QubesVm``.
    """

    def __init__(self, name, template, is_running):
        QubesVm.__init__(self, name, template, is_running)

    def __repr__(self):
        return "MockQubesVm(%s,%s,%s)" % (self.name, self.template, self._is_running)

    def __eq__(self, other):
        """Two mocks are equal when their ``repr`` (name, template, running state) matches."""
        return repr(self) == repr(other)

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; keep both operators consistent.
        return not self.__eq__(other)

    def run(self, command, autostart = False, verbose = True, user = None, notify_function = None,
            passio = False, localcmd = None, gui = True, filter_esc = True ):
        """Simulate running the update command.

        ``autostart`` marks the qube as running; a running qube addressed as
        ``root`` is then marked as updated and no longer running.  Every other
        parameter is ignored.
        """
        if autostart:
            self._is_running = True

        if user == 'root' and self._is_running:
            self._is_updated = True
            self._is_running = False
class MockQubesHandler(QubesHandler, unittest.TestCase):
    """QubesHandler fed from a canned qube listing, doubling as its own test case."""

    def __init__(self, *args, **kwargs):
        """Build the expected values, then parse the canned listing.

        The TestCase initialisation is skipped when no positional argument is
        given, which is how ``MockUpdateHandler`` instantiates this class.
        """
        if args:
            unittest.TestCase.__init__(self, *args, **kwargs)

        self.maxDiff = None

        self._template_f = MockQubesVm('fedora-24', None, False)
        self._template_fm = MockQubesVm('fedora-24-minimal', None, True)
        self._template_d = MockQubesVm('debian-8', None, False)
        self._template_ws = MockQubesVm('whonix-ws', None, True)
        self._template_wg = MockQubesVm('whonix-wg', None, True)

        # Expected ranks: 3 per non-halted dependent qube, 1 per halted one.
        self._ranked_f = {'name': 'fedora-24', 'qvm': self._template_f, 'rank': 9}
        self._ranked_fm = {'name': 'fedora-24-minimal', 'qvm': self._template_fm, 'rank': 4}
        self._ranked_d = {'name': 'debian-8', 'qvm': self._template_d, 'rank': 5}
        self._ranked_ws = {'name': 'whonix-ws', 'qvm': self._template_ws, 'rank': 1}
        self._ranked_wg = {'name': 'whonix-wg', 'qvm': self._template_wg, 'rank': 3}

        self._expected_ranked = {
            'fedora-24': self._ranked_f,
            'fedora-24-minimal': self._ranked_fm,
            'debian-8': self._ranked_d,
            'whonix-ws': self._ranked_ws,
            'whonix-wg': self._ranked_wg }

        self._expected = [ self._ranked_f, self._ranked_d, self._ranked_fm,
                           self._ranked_wg, self._ranked_ws]

        QubesHandler.__init__(self)

    def _create_qube(self, qube_name, qube_template, qube_running):
        """Create mock qubes so that no command is ever executed."""
        return MockQubesVm(qube_name, qube_template, qube_running)

    def _get_raw_qubes_string(self):
        """Return the canned ``CLASS|NAME|TEMPLATE|STATE`` listing."""
        return 'AppVM|sys-net|fedora-24|Running\n' + \
               'AppVM|sys-whonix|whonix-wg|Running\n' + \
               'AppVM|personal|debian-8|Halted\n' + \
               'AppVM|sys-firewall|fedora-24-minimal|Running\n' + \
               'TemplateVM|debian-8|-|Halted\n' + \
               'AdminVM|dom0|-|Running\n' + \
               'AppVM|gnupg|fedora-24-minimal|Halted\n' + \
               'AppVM|disp5|fedora-24|Suspended\n' + \
               'AppVM|anonymous|whonix-ws|Halted\n' + \
               'TemplateVM|fedora-24-minimal|-|True\n' + \
               'TemplateVM|whonix-ws|-|Running\n' + \
               'TemplateVM|whonix-wg|-|Running\n' + \
               'AppVM|work|debian-8|Suspended\n' + \
               'AppVM|development|debian-8|Halted\n' + \
               'AppVM|disp2|fedora-24|Running\n' + \
               'TemplateVM|fedora-24|-|Halted\n'

    def get_updated_status(self):
        """Return the updated flags of the five templates (f, fm, d, ws, wg order)."""
        return [self._qubes[self._template_f.name].is_updated(),
                self._qubes[self._template_fm.name].is_updated(),
                self._qubes[self._template_d.name].is_updated(),
                self._qubes[self._template_ws.name].is_updated(),
                self._qubes[self._template_wg.name].is_updated()]

    def test_get_template_dictionary(self):
        self.assert_get_template_dictionary(self._template_f)
        self.assert_get_template_dictionary(self._template_fm)
        self.assert_get_template_dictionary(self._template_d)
        self.assert_get_template_dictionary(self._template_ws)
        self.assert_get_template_dictionary(self._template_wg)

    def assert_get_template_dictionary(self, template):
        """Check the record built for ``template``, using its name length as rank."""
        found = self._get_template_dictionary(template, len(template.name))

        self.assertEqual(3, len(found.keys()))
        self.assertEqual(template, found['qvm'])
        self.assertEqual(len(template.name), found['rank'])
        self.assertEqual(template.name, found['name'])

    def test_get_template_dictionary_custom(self):
        template = MockQubesVm('test-template', None, False)

        found = self._get_template_dictionary(template, 0)

        self.assertEqual(3, len(found.keys()))
        self.assertEqual(template, found['qvm'])
        self.assertEqual(0, found['rank'])
        self.assertEqual('test-template', found['name'])

    def test_get_templates(self):
        found = self.get_templates()

        self.assertEqual(self._expected, found)

    def test_get_ranked_templates(self):
        found = self._get_ranked_templates()

        self.assertEqual(self._expected_ranked, found)
class MockUpdateWorker(UpdateWorker):
    """Worker used by the tests: identical to UpdateWorker but completely silent."""

    def _print_function(self, text, error=False):
        """Discard the message instead of printing it."""
        return None
class MockUpdateHandler(UpdateHandler, unittest.TestCase):
    """UpdateHandler wired to mock workers and a mock qubes handler, with its own test."""

    def __init__(self, *args, **kwargs):
        UpdateHandler.__init__(self)
        unittest.TestCase.__init__(self, *args, **kwargs)

        self.generate_qubes_handler()

    def _get_worker(self, name, qubes_handler):
        """Use silent workers so the test output stays clean."""
        return MockUpdateWorker(name, self._queue, qubes_handler)

    def _get_qubes_handler(self):
        """Return the current mock handler instead of querying the system."""
        return self._qubes_handler

    def generate_qubes_handler(self):
        """Replace the mock handler with a fresh one in which nothing is updated yet."""
        self._qubes_handler = MockQubesHandler()

    def test_update_all(self):
        """Run a complete update with 1 to 9 workers, on fresh mock data each time."""
        for number in range(1,10):
            self.assert_update_all(number)
            self.generate_qubes_handler()

    def assert_update_all(self, thread_number):
        """Check that no template is updated before the run and all five are after it."""
        self.assertEqual([False]*5 , self._qubes_handler.get_updated_status())

        self.update_templates(thread_number)

        self.assertEqual([True]*5 , self._qubes_handler.get_updated_status())
if __name__ == '__main__':
    # No argument: update with 3 workers.  The single argument "test": run the
    # unit tests.  Anything else: print the usage line.
    arguments = sys.argv[1:]

    if not arguments:
        UpdateHandler().update_templates(3)
    elif arguments == ['test']:
        unittest.main(argv=sys.argv[:1])
    else:
        print("ERR Usage: dom0-update-all [test]")
dom0-update-all (QubesOS 3.2)
#!/usr/bin/python2
#
# Copyright (c) Andrea Micheloni 2017
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys, os, time, unittest
from Queue import Queue
from threading import Thread, current_thread
class QubesHandler:
    """Loads the qube collection through ``qubes.qubes``; ranks and updates templates."""

    def __init__(self):
        """Load the qube collection while holding the database read lock."""
        from qubes.qubes import QubesVmCollection

        self._qvm_collection = QubesVmCollection()

        # Take the lock before entering the try block: if locking itself fails
        # there is nothing to unlock.
        self._qvm_collection.lock_db_for_reading()
        try:
            self._qvm_collection.load()
        finally:
            self._qvm_collection.unlock_db()

    def _get_template_dictionary(self, template_vm, rank):
        """Return the ``{'name', 'qvm', 'rank'}`` record used for a template."""
        return {'name': template_vm.name,
                'qvm': template_vm,
                'rank': rank}

    def _get_ranked_templates(self):
        """Return ``{template name: record}`` with a usage rank per template.

        Every template starts at rank 0.  Each non-template qube that has a
        template adds 3 to that template's rank when it is running and 1
        otherwise.
        """
        templates_ranked = {}

        for vm in self._qvm_collection.values():
            if vm.is_template():
                if vm.name not in templates_ranked:
                    templates_ranked[vm.name] = self._get_template_dictionary(vm, 0)
                continue

            if not vm.template:
                continue

            rank = 3 if vm.is_running() else 1

            if vm.template.name not in templates_ranked:
                templates_ranked[vm.template.name] = \
                    self._get_template_dictionary(vm.template, rank)
            else:
                templates_ranked[vm.template.name]['rank'] += rank

        return templates_ranked

    def get_templates(self):
        """Return the template records sorted by rank, highest rank first."""
        templates_ranked = self._get_ranked_templates()

        return [templates_ranked[template] for template in sorted(templates_ranked,
                key=lambda name: templates_ranked[name]['rank'],
                reverse = True)]

    def _wait_ten_seconds(self):
        """Pause between two polls of the qube state (overridable)."""
        time.sleep(10)

    def update_template(self, template, print_function):
        """Run the update command in ``template['qvm']`` and wait until it stops running.

        ``print_function(text, error=False)`` receives the progress messages.
        The shell command logs to ``/tmp/update-all`` inside the qube, upgrades
        through ``apt-get`` when it exists or through ``dnf`` otherwise, and
        powers the qube off when the upgrade chain succeeds.  The qube state is
        then polled every ten seconds, with no upper limit; from two minutes on
        a progress message is printed every minute, and from ten minutes on the
        message also suggests how to follow the log.  Any exception is reported
        through ``print_function`` and re-raised.
        """
        print_function('Updating template...')

        try:
            template['qvm'].run('touch /tmp/update-all; chmod go+r /tmp/update-all; if [ -f /usr/bin/apt-get ' +
                                ']; then /usr/bin/apt-get update && /usr/bin/apt-get -y upgrade </dev/null ' +
                                '&& /usr/bin/apt-get -y autoremove && /usr/bin/apt-get autoclean && ' +
                                '/sbin/poweroff; else /usr/bin/dnf -y upgrade </dev/null && /usr/bin/dnf ' +
                                'clean all && /usr/sbin/poweroff; fi 2>&1 | tee -a /tmp/update-all',
                                autostart = True, verbose = False, user = 'root', notify_function = None,
                                passio = False, localcmd = None, gui = False, filter_esc = True )

            print_function('Command running.')

            self._wait_ten_seconds()
            seconds_waited = 10

            while template['qvm'].is_running():
                self._wait_ten_seconds()
                seconds_waited += 10

                if seconds_waited < 120 or seconds_waited % 60 != 0:
                    continue

                minutes = str(seconds_waited // 60)

                if seconds_waited < 600:
                    print_function(minutes + ' minutes elapsed...')
                else:
                    print_function(minutes + ' minutes elapsed, try: qvm-run --pass-io '
                                   + template['name'] + ' "tail -f /tmp/update-all"')

            print_function('Update complete.')
        except Exception:
            print_function('Update failed.', error=True)
            raise
class UpdateWorker(Thread):
    """Daemon thread that takes template records off a queue and updates them.

    ``failed_templates`` collects the names of the templates whose update
    raised an exception, in the order this worker processed them.
    """

    def __init__(self, name, queue, qubes_handler):
        """Store the worker name, the shared queue and the handler doing the update."""
        Thread.__init__(self)

        self.daemon = True
        self.name = name
        self._queue = queue
        self._qubes_handler = qubes_handler
        self._template = None
        self.failed_templates = []

    def _print_function(self, text, error=False):
        """Print ``text`` prefixed with the severity, worker name and current template."""
        prefix = 'ERR' if error else 'INF'
        # No template is assigned before the first queue item arrives.
        template_name = self._template['name'] if self._template else 'None'

        print('%s [%s %s] %s' % (prefix, self.name, template_name, text))

    def run(self):
        """Process queue items forever; one failing template does not stop the worker.

        An exception escaping this loop would end the thread, and with every
        worker gone the remaining queue items would never be marked done, so
        failures are recorded and reported instead of propagated.
        """
        while True:
            self._template = {'name':'None'}
            self._template = self._queue.get()

            try:
                self._qubes_handler.update_template(self._template, self._print_function)
            except Exception as error:
                self.failed_templates.append(self._template['name'])
                self._print_function('Worker continuing after error: %r' % (error,), error=True)
            finally:
                self._template = {'name':'None'}
                self._queue.task_done()
class UpdateHandler:
    """Feeds the ranked templates to a pool of worker threads through a queue."""

    def __init__(self):
        self._queue = Queue()

    def update_templates(self, thread_number):
        """Update every template, using at most ``thread_number`` workers.

        The template list is computed once, printed, and queued in rank order;
        the call returns when every queued template has been marked done.

        Raises ValueError when ``thread_number`` is lower than 1: without any
        worker the queue would never drain and the call would block forever.
        """
        if thread_number < 1:
            raise ValueError('thread_number must be at least 1, got %r' % (thread_number,))

        qubes_handler = self._get_qubes_handler()
        templates = qubes_handler.get_templates()

        print('INF Updating templates:  %s' % ([template['name'] for template in templates],))

        # Workers never exit on their own, so do not start more than can be kept busy.
        worker_count = min(thread_number, len(templates))
        workers = [self._get_worker('w'+str(number), qubes_handler) for number in range(worker_count)]

        for worker in workers:
            worker.start()

        for template in templates:
            self._queue.put(template, block = True)

        self._queue.join()

    def _get_worker(self, name, qubes_handler):
        """Build one worker bound to the shared queue (overridable factory)."""
        return UpdateWorker(name, self._queue, qubes_handler)

    def _get_qubes_handler(self):
        """Build the handler that lists and updates templates (overridable factory)."""
        return QubesHandler()
def has_qubes_qubes_module():
    """Return True when a ``qubes`` module can be found inside the ``qubes`` package.

    The ``qubes`` package is located and loaded, then its search path is
    probed for a ``qubes`` submodule.  Any ImportError along the way,
    including the ``imp`` module itself being unavailable, yields False.
    """
    opened_files = []

    try:
        import imp

        qubes_info = imp.find_module('qubes')
        opened_files.append(qubes_info[0])
        qubes = imp.load_module('qubes', *qubes_info)

        submodule_info = imp.find_module('qubes', qubes.__path__)
        opened_files.append(submodule_info[0])

        return True
    except ImportError:
        return False
    finally:
        # find_module hands back an open file for plain modules (None for
        # packages); closing it is the caller's job.
        for opened_file in opened_files:
            if opened_file is not None:
                opened_file.close()
class MockQubesVm:
    """In-memory qube used by the tests: ``run`` only flips flags, nothing is executed."""

    def __init__(self, name, template, is_running):
        self._is_updated = False
        self._is_running = is_running
        self.template = template
        self.name = name

    def is_running(self):
        """Return the current simulated running state."""
        return self._is_running

    def is_template(self):
        """Return True when this mock has no template of its own."""
        return self.template is None

    def is_updated(self):
        """Return True once a simulated update has been applied."""
        return self._is_updated

    def run(self, command, autostart = False, verbose = True, user = None, notify_function = None,
            passio = False, localcmd = None, gui = True, filter_esc = True ):
        """Simulate running the update command.

        ``autostart`` marks the qube as running; a running qube addressed as
        ``root`` is then marked as updated and no longer running.  Every other
        parameter is ignored.
        """
        self._is_running = True if autostart else self._is_running

        if self._is_running and user == 'root':
            self._is_running = False
            self._is_updated = True
class MockQubesCollection:
    """Stand-in for the qube collection used by the tests; it only offers ``values()``."""

    def __init__(self, values):
        # Kept by reference: the sequence handed in is returned as-is later on.
        self._values = values

    def values(self):
        """Return the sequence of qubes given to the constructor."""
        stored = self._values
        return stored
class MockQubesHandler(QubesHandler, unittest.TestCase):
    """QubesHandler working on a canned collection of mock qubes, doubling as test case."""

    def __init__(self, *args, **kwargs):
        """Build the mock collection and the expected values.

        ``QubesHandler.__init__`` is deliberately not called, so the real qube
        database is never touched.  The TestCase initialisation is skipped when
        no positional argument is given, which is how ``MockUpdateHandler``
        instantiates this class.
        """
        if args:
            unittest.TestCase.__init__(self, *args, **kwargs)

        self.maxDiff = None

        self._template_f = MockQubesVm('fedora-24', None, False)
        self._template_fm = MockQubesVm('fedora-24-minimal', None, True)
        self._template_d = MockQubesVm('debian-8', None, False)
        self._template_ws = MockQubesVm('whonix-ws', None, True)
        self._template_wg = MockQubesVm('whonix-wg', None, True)

        images = [ MockQubesVm('sys-net', self._template_f, True),
                   self._template_wg,
                   MockQubesVm('personal', self._template_d, False),
                   MockQubesVm('sys-firewall', self._template_fm, True),
                   self._template_d,
                   MockQubesVm('sys-whonix', self._template_wg, True),
                   MockQubesVm('gnupg', self._template_fm, False),
                   MockQubesVm('disp5', self._template_f, True),
                   MockQubesVm('anonymous', self._template_ws, False),
                   self._template_fm,
                   self._template_ws,
                   MockQubesVm('work', self._template_d, True),
                   MockQubesVm('development', self._template_d, False),
                   MockQubesVm('disp2', self._template_f, True),
                   self._template_f ]

        # Expected ranks: 3 per running dependent qube, 1 per halted one.
        self._ranked_f = {'name': 'fedora-24', 'qvm': self._template_f, 'rank': 9}
        self._ranked_fm = {'name': 'fedora-24-minimal', 'qvm': self._template_fm, 'rank': 4}
        self._ranked_d = {'name': 'debian-8', 'qvm': self._template_d, 'rank': 5}
        self._ranked_ws = {'name': 'whonix-ws', 'qvm': self._template_ws, 'rank': 1}
        self._ranked_wg = {'name': 'whonix-wg', 'qvm': self._template_wg, 'rank': 3}

        self._expected_ranked = {
            'fedora-24': self._ranked_f,
            'fedora-24-minimal': self._ranked_fm,
            'debian-8': self._ranked_d,
            'whonix-ws': self._ranked_ws,
            'whonix-wg': self._ranked_wg }

        self._expected = [ self._ranked_f, self._ranked_d, self._ranked_fm,
                           self._ranked_wg, self._ranked_ws]

        self._qvm_collection = MockQubesCollection(images)

    def _wait_ten_seconds(self):
        """Do not actually wait while testing."""
        pass

    def get_updated_status(self):
        """Return the updated flags of the five templates (f, fm, d, ws, wg order)."""
        return [self._template_f.is_updated(),
                self._template_fm.is_updated(),
                self._template_d.is_updated(),
                self._template_ws.is_updated(),
                self._template_wg.is_updated()]

    def test_get_template_dictionary(self):
        self.assert_get_template_dictionary(self._template_f)
        self.assert_get_template_dictionary(self._template_fm)
        self.assert_get_template_dictionary(self._template_d)
        self.assert_get_template_dictionary(self._template_ws)
        self.assert_get_template_dictionary(self._template_wg)

    def assert_get_template_dictionary(self, template):
        """Check the record built for ``template``, using its name length as rank."""
        found = self._get_template_dictionary(template, len(template.name))

        self.assertEqual(3, len(found.keys()))
        self.assertEqual(template, found['qvm'])
        self.assertEqual(len(template.name), found['rank'])
        self.assertEqual(template.name, found['name'])

    def test_get_template_dictionary_custom(self):
        template = MockQubesVm('test-template', None, False)

        found = self._get_template_dictionary(template, 0)

        self.assertEqual(3, len(found.keys()))
        self.assertEqual(template, found['qvm'])
        self.assertEqual(0, found['rank'])
        self.assertEqual('test-template', found['name'])

    def test_get_templates(self):
        found = self.get_templates()

        self.assertEqual(self._expected, found)

    def test_get_ranked_templates(self):
        found = self._get_ranked_templates()

        self.assertEqual(self._expected_ranked, found)

    @unittest.skipUnless(has_qubes_qubes_module(), "Requires importing qubes.qubes")
    def test_dom0_load_actual_vms(self):
        """Load the real qube collection; skipped where ``qubes.qubes`` is unavailable."""
        handler = QubesHandler()

        self.assertTrue(len(handler._qvm_collection)>0)
class MockUpdateWorker(UpdateWorker):
    """Worker used by the tests: identical to UpdateWorker but completely silent."""

    def _print_function(self, text, error=False):
        """Discard the message instead of printing it."""
        return None
class MockUpdateHandler(UpdateHandler, unittest.TestCase):
    """UpdateHandler wired to mock workers and a mock qubes handler, with its own test."""

    def __init__(self, *args, **kwargs):
        UpdateHandler.__init__(self)
        unittest.TestCase.__init__(self, *args, **kwargs)

        self.generate_qubes_handler()

    def _get_worker(self, name, qubes_handler):
        """Use silent workers so the test output stays clean."""
        return MockUpdateWorker(name, self._queue, qubes_handler)

    def _get_qubes_handler(self):
        """Return the current mock handler instead of loading the real collection."""
        return self._qubes_handler

    def generate_qubes_handler(self):
        """Replace the mock handler with a fresh one in which nothing is updated yet."""
        self._qubes_handler = MockQubesHandler()

    def test_update_all(self):
        """Run a complete update with 1 to 9 workers, on fresh mock data each time."""
        for number in range(1,10):
            self.assert_update_all(number)
            self.generate_qubes_handler()

    def assert_update_all(self, thread_number):
        """Check that no template is updated before the run and all five are after it."""
        self.assertEqual([False]*5 , self._qubes_handler.get_updated_status())

        self.update_templates(thread_number)

        self.assertEqual([True]*5 , self._qubes_handler.get_updated_status())
if __name__ == '__main__':
    # No argument: update with 3 workers.  The single argument "test": run the
    # unit tests.  Anything else: print the usage line.
    arguments = sys.argv[1:]

    if not arguments:
        UpdateHandler().update_templates(3)
    elif arguments == ['test']:
        unittest.main(argv=sys.argv[:1])
    else:
        print("ERR Usage: dom0-update-all [test]")