Skip to content

Commit

Permalink
Do updates based on a single dependency list
Browse files Browse the repository at this point in the history
This ensures that updates proceed in the correct order with maximum
parallelism, for arbitrarily complex dependency graphs.

Change-Id: Ia11f4cfef58a3045199e1e5e49050cb1646f5057
  • Loading branch information
zaneb committed Aug 30, 2013
1 parent 752da8e commit 8b73224
Showing 1 changed file with 70 additions and 45 deletions.
115 changes: 70 additions & 45 deletions heat/engine/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from heat.db import api as db_api

from heat.engine import dependencies
from heat.engine import resource
from heat.engine import scheduler

Expand Down Expand Up @@ -50,32 +51,29 @@ def __repr__(self):
def __call__(self):
"""Return a co-routine that updates the stack."""

existing_deps = self.existing_stack.dependencies
new_deps = self.new_stack.dependencies

cleanup_prev = scheduler.DependencyTaskGroup(
self.previous_stack.dependencies,
self._remove_backup_resource,
reverse=True)
cleanup = scheduler.DependencyTaskGroup(existing_deps,
self._remove_old_resource,
reverse=True)
create_new = scheduler.DependencyTaskGroup(new_deps,
self._create_new_resource)
update = scheduler.DependencyTaskGroup(new_deps,
self._update_resource)

update = scheduler.DependencyTaskGroup(self.dependencies(),
self._resource_update)

if not self.rollback:
yield cleanup_prev()

yield create_new()
try:
yield update()
finally:
prev_deps = self.previous_stack._get_dependencies(
self.previous_stack.resources.itervalues())
self.previous_stack.dependencies = prev_deps
yield cleanup()

def _resource_update(self, res):
if res.name in self.new_stack and self.new_stack[res.name] is res:
return self._process_new_resource_update(res)
else:
return self._process_existing_resource_update(res)

@scheduler.wrappertask
def _remove_backup_resource(self, prev_res):
Expand All @@ -84,27 +82,6 @@ def _remove_backup_resource(self, prev_res):
logger.debug("Deleting backup resource %s" % prev_res.name)
yield prev_res.destroy()

@scheduler.wrappertask
def _remove_old_resource(self, existing_res):
res_name = existing_res.name

if res_name in self.previous_stack:
yield self._remove_backup_resource(self.previous_stack[res_name])

if res_name not in self.new_stack:
logger.debug("resource %s not found in updated stack"
% res_name + " definition, deleting")
yield existing_res.destroy()
del self.existing_stack.resources[res_name]

@scheduler.wrappertask
def _create_new_resource(self, new_res):
res_name = new_res.name
if res_name not in self.existing_stack:
logger.debug("resource %s not found in current stack"
% res_name + " definition, adding")
yield self._create_resource(new_res)

@staticmethod
def _exchange_stacks(existing_res, prev_res):
db_api.resource_exchange_stacks(existing_res.stack.context,
Expand Down Expand Up @@ -147,24 +124,72 @@ def _create_resource(self, new_res):
yield new_res.create()

@scheduler.wrappertask
def _update_resource(self, new_res):
def _process_new_resource_update(self, new_res):
res_name = new_res.name

if res_name not in self.existing_snippets:
return
if res_name in self.existing_stack:
existing_res = self.existing_stack[res_name]
try:
yield self._update_in_place(existing_res,
new_res)
except resource.UpdateReplace:
pass
else:
logger.info("Resource %s for stack %s updated" %
(res_name, self.existing_stack.name))
return

yield self._create_resource(new_res)

@scheduler.wrappertask
def _update_in_place(self, existing_res, new_res):
# Compare resolved pre/post update resource snippets,
# note the new resource snippet is resolved in the context
# of the existing stack (which is the stack being updated)
existing_snippet = self.existing_snippets[res_name]
existing_snippet = self.existing_snippets[existing_res.name]
new_snippet = self.existing_stack.resolve_runtime_data(new_res.t)

if new_snippet != existing_snippet:
try:
yield self.existing_stack[res_name].update(new_snippet,
existing_snippet)
except resource.UpdateReplace:
yield self._create_resource(new_res)
else:
logger.info("Resource %s for stack %s updated" %
(res_name, self.existing_stack.name))
yield existing_res.update(new_snippet, existing_snippet)

@scheduler.wrappertask
def _process_existing_resource_update(self, existing_res):
res_name = existing_res.name

if res_name in self.previous_stack:
yield self._remove_backup_resource(self.previous_stack[res_name])

if res_name in self.new_stack:
new_res = self.new_stack[res_name]
if new_res.state == (new_res.INIT, new_res.COMPLETE):
# Already updated in-place
return

if existing_res.stack is not self.previous_stack:
yield existing_res.destroy()

if res_name not in self.new_stack:
del self.existing_stack.resources[res_name]

def dependencies(self):
'''
Return a Dependencies object representing the dependencies between
update operations to move from an existing stack definition to a new
one.
'''
existing_deps = self.existing_stack.dependencies
new_deps = self.new_stack.dependencies

def edges():
# Create/update the new stack's resources in create order
for e in new_deps.graph().edges():
yield e
# Destroy/cleanup the old stack's resources in delete order
for e in existing_deps.graph(reverse=True).edges():
yield e
# Don't cleanup old resources until after they have been replaced
for res in self.existing_stack:
if res.name in self.new_stack:
yield (res, self.new_stack[res.name])

return dependencies.Dependencies(edges())

0 comments on commit 8b73224

Please sign in to comment.