increase IO polling interval over time #891

Merged
21 changes: 11 additions & 10 deletions docs/test_format.rst
@@ -86,19 +86,19 @@ the above examples. Planemo adapts the CWL_ job document to Galaxy_ workflows an
names for Galaxy_ tools and input node labels for workflows.

Input files can be specified using either ``path`` attributes (which should generally be file
paths relative to the aritfact and test directory) or ``location`` (which should be a URI). The
paths relative to the artifact and test directory) or ``location`` (which should be a URI). The
examples above demonstrate using both paths relative to the tool file and test data published
to `Zenodo <https://zenodo.org/>`__.

.. note::

These job objects can be run directly with ``planemo run``.

::

$ planemo run --engine=<engine_type> [ENGINE_OPTIONS] [ARTIFACT_PATH] [JOB_PATH]
This should be familar to CWL developers - and indeed if ``--engine=cwltool`` this works as a formal CWL

This should be familar to CWL developers - and indeed if ``--engine=cwltool`` this works as a formal CWL
runner. Planemo provides a uniform interface to Galaxy for Galaxy workflows and tools though using the same
CLI invocation if ``--engine=galaxy`` (for a Planemo managed Galaxy instance), ``--engine=docker_galaxy``
(for a Docker instance of Galaxy launched by Planemo), or ``--engine=external_galaxy`` (for a running
@@ -107,8 +107,8 @@ to `Zenodo <https://zenodo.org/>`__.
``outputs``
--------------

Galaxy_ tools and CWL_ artifacts have obvious output names that much match the mapping in this block on test
file. Galaxy workflows require explicit output labels to be used with tests, but the important outputs in
Galaxy_ tools and CWL_ artifacts have obvious output names that much match the mapping in this block on test
file. Galaxy workflows require explicit output labels to be used with tests, but the important outputs in
your workflows should be labeled anyway to work with Galaxy subworkflows and more cleanly with API calls.

If an output is known, fixed, and small it makes a lot of sense to just include a copy of the output next
@@ -246,15 +246,16 @@ option to mount test data into the testing container.
``external_galaxy``
~~~~~~~~~~~~~~~~~~~~

$ planemo test --engine external_galaxy --galaxy_admin_key <admin_key> --galaxy_user_key <user_key> [--no_shed_install] <url>
$ planemo test --engine external_galaxy --galaxy_admin_key <admin_key> --galaxy_user_key <user_key> [--no_shed_install] [--polling_backoff <integer>] <url>

This is primarily useful for testing workflows against already running Galaxy instances. An admin or
master API key should be supplied to install missing tool repositories for the workflow and a user API
key should be supplied to run the workflow using. If you wish to skip tool shed repository installation
(this requires the tool all be present already), use the ``-no_shed_install`` option with the ``test``
command.
(this requires all the tools be present already), use the ``--no_shed_install`` option. If you want to
reduce the load on the target Galaxy while checking for the status changes use the ``--polling_backoff <integer>``
option where integer is the incremental increase in seconds for every request.

To run tool tests against a running Galaxy, ``galaxy-tool-test`` is a script that gets installed with
To run tool tests against a running Galaxy, ``galaxy-tool-test`` is a script that gets installed with
galaxy-lib and so may very well already be on your ``PATH``. Check out the options available with that
using ``galaxy-tool-test --help``.

8 changes: 8 additions & 0 deletions planemo/commands/cmd_test.py
@@ -28,6 +28,14 @@
"previously.",
default=False,
)
@click.option(
"--polling_backoff",
type=int,
help="Poll resources with an increasing interval between requests. "
"Useful when testing against remote and/or production "
"instances to limit generated traffic.",
default="0",
)
@click.option(
"--history_name",
help="Name for history (if a history is generated as part of testing.)"
22 changes: 12 additions & 10 deletions planemo/galaxy/activity.py
@@ -136,14 +136,15 @@ def _execute(ctx, config, runnable, job_path, **kwds):
invocation = Client._post(user_gi.workflows, payload, url=invocations_url)
invocation_id = invocation["id"]
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
polling_backoff = kwds.get("polling_backoff", 0)
try:
final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id)
final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
except Exception:
ctx.vlog("Problem waiting on invocation...")
summarize_history(ctx, user_gi, history_id)
raise
ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
final_state = _wait_for_history(ctx, user_gi, history_id)
final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
if final_state != "ok":
msg = "Failed to run workflow final history state is [%s]." % final_state
summarize_history(ctx, user_gi, history_id)
@@ -566,15 +567,15 @@ def _history_id(gi, **kwds):
return history_id


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id):
def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):

def state_func():
if _retry_on_timeouts(ctx, gi, lambda gi: has_jobs_in_states(gi, history_id, ["error", "deleted", "deleted_new"])):
raise Exception("Problem running workflow, one or more jobs failed.")

return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

return _wait_on_state(state_func)
return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
@@ -606,20 +607,21 @@ def has_jobs_in_states(gi, history_id, states):
return len(target_jobs) > 0


def _wait_for_history(ctx, gi, history_id):
def _wait_for_history(ctx, gi, history_id, polling_backoff=0):

def has_active_jobs(gi):
if has_jobs_in_states(gi, history_id, ["new", "upload", "waiting", "queued", "running"]):
return True
else:
return None

wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout=60 * 60 * 24)
timeout = 60 * 60 * 24
wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout, polling_backoff)

def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

return _wait_on_state(state_func)
return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
@@ -629,7 +631,7 @@ def state_func():
return _wait_on_state(state_func)


def _wait_on_state(state_func):
def _wait_on_state(state_func, polling_backoff=0):

def get_state():
response = state_func()
@@ -638,8 +640,8 @@ def get_state():
return state
else:
return None

final_state = wait_on(get_state, "state", timeout=60 * 60 * 24)
timeout = 60 * 60 * 24
final_state = wait_on(get_state, "state", timeout, polling_backoff)
return final_state
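
As a rough illustration of why a backoff reduces traffic, the sketch below counts how many status requests would be issued while a job of a given length runs. It assumes the sleep between requests starts at 0.25 seconds and grows by ``polling_backoff`` before every sleep, as in the ``wait_on`` change in ``planemo/io.py`` in this PR, and it ignores request latency. The function name ``polls_needed`` and its parameters are hypothetical and are not part of Planemo.

```python
def polls_needed(duration, polling_backoff, initial_delta=0.25):
    """Count status requests issued while a job runs for ``duration`` seconds.

    Hypothetical helper for illustration only: it models sleep time and
    ignores how long each request itself takes.
    """
    delta = initial_delta
    elapsed = 0.0
    polls = 0
    while True:
        polls += 1  # one status request
        if elapsed >= duration:
            return polls  # this request would observe the finished job
        delta += polling_backoff  # interval grows before each sleep
        elapsed += delta  # time spent sleeping until the next request
```

Under these assumptions a one-hour job costs 14401 requests with ``polling_backoff=0`` and 86 requests with ``polling_backoff=1``. The trade-off is that a larger interval also delays noticing that the job has finished.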


14 changes: 7 additions & 7 deletions planemo/io.py
@@ -284,20 +284,20 @@ def tee_captured_output(output):
sys.stderr.write(message['data'] + '\n')


# Taken from Galaxy's twilltestcase.
def wait_on(function, desc, timeout=5):
def wait_on(function, desc, timeout=5, polling_backoff=0):
"""Wait on given function's readiness. Grow the polling
interval incrementally by the polling_backoff."""
delta = .25
iteration = 0
timing = 0
while True:
if (delta * iteration) > timeout:
if timing > timeout:
message = "Timed out waiting on %s." % desc
raise Exception(message)

iteration += 1
timing += delta
delta += polling_backoff
value = function()
if value is not None:
return value

time.sleep(delta)
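
The new side of this hunk can be exercised on its own to see the resulting sleep schedule. The sketch below copies the new ``wait_on`` logic and adds an injectable ``sleep`` parameter, which is an assumption made for this example so the schedule can be recorded without actually waiting. ``sleep_schedule`` is a hypothetical helper and does not exist in Planemo.

```python
import time


def wait_on(function, desc, timeout=5, polling_backoff=0, sleep=time.sleep):
    """Poll ``function`` until it returns a non-None value.

    Follows the new-side logic of the hunk; ``sleep`` is added for this
    sketch only.
    """
    delta = .25
    timing = 0
    while True:
        if timing > timeout:
            raise Exception("Timed out waiting on %s." % desc)
        timing += delta  # accumulates the interval before it is increased
        delta += polling_backoff
        value = function()
        if value is not None:
            return value
        sleep(delta)  # sleeps for the already increased interval


def sleep_schedule(polls_until_ready, polling_backoff):
    """Return the sleep durations used before the polled function is ready."""
    sleeps = []
    remaining = [polls_until_ready]

    def ready():
        remaining[0] -= 1
        return "ok" if remaining[0] <= 0 else None

    wait_on(ready, "demo", timeout=60,
            polling_backoff=polling_backoff, sleep=sleeps.append)
    return sleeps
```

Calling ``sleep_schedule(4, 0)`` records three sleeps of 0.25 seconds, while ``sleep_schedule(4, 1)`` records 1.25, 2.25 and 3.25 seconds. Note that in this logic ``timing`` adds the interval before it is increased while the sleep uses the increased value, so with a non-zero backoff the accumulated ``timing`` lags the time actually slept by ``polling_backoff`` per iteration.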

