Files

851 lines
29 KiB
Python
Raw Permalink Normal View History

2025-09-07 22:09:54 +02:00
import collections
import hashlib
from functools import wraps
from typing import Callable, Optional, Any, List, Tuple, Union
import asyncio
import flask
from .dependencies import (
handle_callback_args,
handle_grouped_callback_args,
Output,
ClientsideFunction,
Input,
)
from .development.base_component import ComponentRegistry
from .exceptions import (
InvalidCallbackReturnValue,
PreventUpdate,
WildcardInLongCallback,
MissingLongCallbackManagerError,
BackgroundCallbackError,
ImportedInsideCallbackError,
)
from ._grouping import (
flatten_grouping,
make_grouping_by_index,
grouping_len,
)
from ._utils import (
create_callback_id,
stringify_id,
to_json,
coerce_to_list,
AttributeDict,
clean_property_name,
)
from . import _validate
from .background_callback.managers import BaseBackgroundCallbackManager
from ._callback_context import context_value
from ._no_update import NoUpdate
async def _async_invoke_callback(func, *args, **kwargs):
    """Call ``func`` with the given arguments, awaiting it when it is a coroutine function.

    This wrapper exists so the debugger can recognise the frame in which the
    user callback is invoked; the trailing marker comments on the call lines
    must stay on those lines.
    """
    if not asyncio.iscoroutinefunction(func):
        # Plain callable: run it synchronously inside this coroutine.
        return func(*args, **kwargs)  # %% callback invoked %%
    # Coroutine function: await its result.
    return await func(*args, **kwargs)  # %% callback invoked %%
def _invoke_callback(func, *args, **kwargs):  # used to mark the frame for the debugger
    """Call ``func(*args, **kwargs)`` and return its result.

    The function adds no logic of its own: it only provides a dedicated frame
    (and the marker comment on the call line) for the debugger to recognise.
    """
    return func(*args, **kwargs)  # %% callback invoked %%
# Module-level registries used by `callback` and `clientside_callback` below.
# Callback spec dicts are appended here by `insert_callback`.
GLOBAL_CALLBACK_LIST = []
# Maps each callback id to its server-side metadata, filled by `insert_callback`.
GLOBAL_CALLBACK_MAP = {}
# Inline JavaScript snippets appended by `register_clientside_callback` when a
# clientside function is given as a source string.
GLOBAL_INLINE_SCRIPTS = []
# pylint: disable=too-many-locals
def callback(
    *_args,
    background: bool = False,
    interval: int = 1000,
    progress: Optional[Union[List[Output], Output]] = None,
    progress_default: Any = None,
    running: Optional[List[Tuple[Output, Any, Any]]] = None,
    cancel: Optional[Union[List[Input], Input]] = None,
    manager: Optional[BaseBackgroundCallbackManager] = None,
    cache_args_to_ignore: Optional[list] = None,
    cache_ignore_triggered=True,
    on_error: Optional[Callable[[Exception], Any]] = None,
    **_kwargs,
) -> Callable[..., Any]:
    """
    Register a server-side callback; normally applied as the decorator
    `@dash.callback`.

    The callback ties the values of one or more `Output` items to one or more
    `Input` items, whose changes trigger it, plus optional `State` items that
    supply extra values without triggering it.

    `@dash.callback` was introduced in Dash 2.0 as an alternative to
    `@app.callback` (where `app = dash.Dash()`). It has the same call
    signature and can replace `app.callback` everywhere, without the `app`
    object having to be defined or imported.

    The last, optional argument `prevent_initial_call` stops the callback
    from firing when its outputs are first added to the page. It defaults to
    `False` and, unlike with `app.callback`, cannot be configured at the app
    level.

    :Keyword Arguments:
        :param background:
            Run the callback as a background callback inside a manager, so
            long-running work neither locks up the Dash app nor times out.
        :param manager:
            Background callback manager instance, currently one of
            `DiskcacheManager` or `CeleryManager`. Defaults to the
            `background_callback_manager` given to the `dash.Dash`
            constructor.
            - `DiskcacheManager` runs the callback logic in a separate
              process and stores results on disk through the diskcache
              library; it is the easiest backend for local development.
            - `CeleryManager` runs the callback logic in a celery worker and
              hands results back to the Dash app through a Celery broker such
              as RabbitMQ or Redis.
        :param running:
            List of 3-element tuples: an `Output` dependency referencing a
            property of a layout component, the value that property takes
            while the callback runs, and the value it takes once the
            callback completes.
        :param cancel:
            List of `Input` dependencies referencing properties of layout
            components. Any change of such a property while the callback is
            running cancels the running job (if any); the value itself is
            not significant. Background callbacks only (`background=True`).
        :param progress:
            `Output` dependency grouping referencing properties of layout
            components. When given, the decorated function receives an extra
            first argument: a function handle it should call to report its
            current progress. That handle takes a single argument matching
            the grouping of properties in the `Output` grouping. Background
            callbacks only (`background=True`).
        :param progress_default:
            Grouping of values assigned to the `progress` components while
            the callback is not in progress. When omitted, every property
            listed in `progress` is set to `None` while the callback is not
            running. Background callbacks only (`background=True`).
        :param cache_args_to_ignore:
            Arguments left out when caching is enabled: a list of argument
            names (strings) when the callback uses keyword arguments
            (Input/State given in a dict), otherwise a list of argument
            indices (integers). Background callbacks only
            (`background=True`).
        :param cache_ignore_triggered:
            Whether the cache ignores which inputs triggered the callback.
            Background callbacks only (`background=True`).
        :param interval:
            Time to wait between the background callback update requests.
        :param on_error:
            Function called when the callback raises an exception; it gets
            the exception object as first argument. The callback_context
            gives access to the original callback inputs, states and output.
    """
    # Internal overrides carried in the keyword arguments; they fall back to
    # the module-level registries and are removed before forwarding.
    prevent_initial_default = _kwargs.pop("config_prevent_initial_callbacks", False)
    target_map = _kwargs.pop("callback_map", GLOBAL_CALLBACK_MAP)
    target_list = _kwargs.pop("callback_list", GLOBAL_CALLBACK_LIST)

    spec: Any = None
    if background:
        spec = {"interval": interval}

        if manager:
            spec["manager"] = manager

        if progress:
            progress_outputs = coerce_to_list(progress)
            spec["progress"] = progress_outputs
            validate_background_inputs(progress_outputs)

        if progress_default:
            default_values = coerce_to_list(progress_default)
            spec["progressDefault"] = default_values
            if len(spec["progress"]) != len(default_values):
                raise Exception(
                    "Progress and progress default needs to be of same length"
                )

        if cancel:
            cancel_deps = coerce_to_list(cancel)
            validate_background_inputs(cancel_deps)
            spec["cancel"] = [dep.to_dict() for dep in cancel_deps]
            spec["cancel_inputs"] = cancel_deps

        if cache_args_to_ignore:
            spec["cache_args_to_ignore"] = cache_args_to_ignore

        spec["cache_ignore_triggered"] = cache_ignore_triggered

    return register_callback(
        target_list,
        target_map,
        prevent_initial_default,
        *_args,
        **_kwargs,
        background=spec,
        manager=manager,
        running=running,
        on_error=on_error,
    )
def validate_background_inputs(deps):
    """Reject dependencies that use pattern-matching ids.

    :param deps: iterable of dependency objects exposing ``has_wildcard()``.
    :raises WildcardInLongCallback: on the first dependency whose
        ``has_wildcard()`` is truthy.
    """
    for dependency in deps:
        if not dependency.has_wildcard():
            continue
        raise WildcardInLongCallback(
            f"""
background callbacks does not support dependencies with
pattern-matching ids
Received: {dependency!r}\n"""
        )
# A clientside function is either a string (treated as inline JavaScript source
# by `register_clientside_callback`) or a `ClientsideFunction` reference.
ClientsideFuncType = Union[str, ClientsideFunction]
def clientside_callback(clientside_function: ClientsideFuncType, *args, **kwargs):
    """Register a clientside callback in the module-level registries.

    Forwards to `register_clientside_callback` using `GLOBAL_CALLBACK_LIST`,
    `GLOBAL_CALLBACK_MAP` and `GLOBAL_INLINE_SCRIPTS`, with the app-level
    "prevent initial callbacks" setting fixed to ``False``.

    :param clientside_function: JavaScript source string or a
        `ClientsideFunction`.
    :return: whatever `register_clientside_callback` returns.
    """
    registries = (GLOBAL_CALLBACK_LIST, GLOBAL_CALLBACK_MAP)
    prevent_initial_default = False
    return register_clientside_callback(
        *registries,
        prevent_initial_default,
        GLOBAL_INLINE_SCRIPTS,
        clientside_function,
        *args,
        **kwargs,
    )
# pylint: disable=too-many-arguments
def insert_callback(
    callback_list,
    callback_map,
    config_prevent_initial_callbacks,
    output,
    outputs_indices,
    inputs,
    state,
    inputs_state_indices,
    prevent_initial_call,
    background=None,
    manager=None,
    running=None,
    dynamic_creator: Optional[bool] = False,
    no_output=False,
):
    """Record a callback in ``callback_list`` and ``callback_map``.

    A spec dict is appended to ``callback_list`` and a metadata entry is
    stored in ``callback_map`` under the generated callback id.

    :param prevent_initial_call: when ``None``, the value of
        ``config_prevent_initial_callbacks`` is used instead.
    :param background: background spec dict or ``None``; only its
        ``"interval"`` entry is copied into the spec dict, while the full
        dict is kept in the ``callback_map`` entry.
    :param running: when truthy, stored under ``"running"`` in the spec dict.
    :return: the id produced by ``create_callback_id``.
    """
    if prevent_initial_call is None:
        prevent_initial_call = config_prevent_initial_callbacks

    _validate.validate_duplicate_output(
        output, prevent_initial_call, config_prevent_initial_callbacks
    )

    callback_id = create_callback_id(output, inputs, no_output)
    serialized_inputs = [dep.to_dict() for dep in inputs]
    serialized_state = [dep.to_dict() for dep in state]

    # prevent_initial_call can be a string "initial_duplicates"
    # which should not prevent the initial call.
    prevents_initial = prevent_initial_call is True
    # Keeps a falsy `background` as-is, otherwise exposes only the interval.
    background_summary = background and {"interval": background["interval"]}

    callback_spec = dict(
        output=callback_id,
        inputs=serialized_inputs,
        state=serialized_state,
        clientside_function=None,
        prevent_initial_call=prevents_initial,
        background=background_summary,
        dynamic_creator=dynamic_creator,
        no_output=no_output,
    )
    if running:
        callback_spec["running"] = running

    # The map entry shares the serialized input/state lists with the spec.
    callback_map[callback_id] = dict(
        inputs=serialized_inputs,
        state=serialized_state,
        outputs_indices=outputs_indices,
        inputs_state_indices=inputs_state_indices,
        background=background,
        output=output,
        raw_inputs=inputs,
        manager=manager,
        allow_dynamic_callbacks=dynamic_creator,
        no_output=no_output,
    )
    callback_list.append(callback_spec)
    return callback_id
def _set_side_update(ctx, response) -> bool:
    """Copy ``ctx.updated_props`` into ``response["sideUpdate"]`` when non-empty.

    :param ctx: object exposing an ``updated_props`` mapping.
    :param response: response dict, mutated in place.
    :return: ``True`` when a side update was written, ``False`` otherwise.
    """
    pending_props = dict(ctx.updated_props)
    if not pending_props:
        return False
    response["sideUpdate"] = pending_props
    return True
def _initialize_context(args, kwargs, inputs_state_indices, has_output, insert_output):
    """Extract per-request values from ``kwargs`` and prepare the callback call.

    Pops ``"app"``, ``"outputs_list"`` (required) and ``"callback_context"``
    from ``kwargs``, publishes the context through ``context_value``,
    snapshots the registered component packages, validates the output spec
    when the callback has outputs, and groups the positional inputs.

    :return: tuple ``(output_spec, callback_ctx, func_args, func_kwargs, app,
        original_packages, has_update)`` where ``has_update`` starts as
        ``False``.
    """
    app = kwargs.pop("app", None)
    output_spec = kwargs.pop("outputs_list")
    fallback_ctx = AttributeDict({"updated_props": {}})
    callback_ctx = kwargs.pop("callback_context", fallback_ctx)
    context_value.set(callback_ctx)

    # Snapshot taken before the callback runs, for later comparison.
    original_packages = set(ComponentRegistry.registry)

    if has_output:
        _validate.validate_output_spec(insert_output, output_spec, Output)

    grouped = _validate.validate_and_group_input_args(args, inputs_state_indices)
    func_args, func_kwargs = grouped

    has_update = False
    return (
        output_spec,
        callback_ctx,
        func_args,
        func_kwargs,
        app,
        original_packages,
        has_update,
    )
def _get_callback_manager(
    kwargs: dict, background: dict
) -> Optional["BaseBackgroundCallbackManager"]:
    """Resolve the background callback manager and terminate superseded jobs.

    The manager is taken from ``background["manager"]`` and falls back to
    ``kwargs["background_callback_manager"]``. Every job id listed in the
    ``oldJob`` query arguments of the current request is terminated.

    :param kwargs: keyword arguments of the callback dispatch.
    :param background: background spec dict, or ``None`` for a regular
        callback.
    :return: the manager, or ``None`` when ``background`` is ``None``.
    :raises MissingLongCallbackManagerError: ``background`` is set but no
        manager is available.
    """
    # Guard first: a regular callback has no background spec to read from.
    if background is None:
        return None

    callback_manager = background.get(
        "manager", kwargs.get("background_callback_manager", None)
    )
    if not callback_manager:
        raise MissingLongCallbackManagerError(
            "Running `background` callbacks requires a manager to be installed.\n"
            "Available managers:\n"
            "- Diskcache (`pip install dash[diskcache]`) to run callbacks in a separate Process"
            " and store results on the local filesystem.\n"
            "- Celery (`pip install dash[celery]`) to run callbacks in a celery worker"
            " and store results on redis.\n"
        )

    for job in flask.request.args.getlist("oldJob"):
        callback_manager.terminate_job(job)

    return callback_manager
def _setup_background_callback(
    kwargs, background, background_key, func, func_args, func_kwargs, callback_ctx
):
    """Start a background job for the callback and return its JSON descriptor.

    Builds the cache key, submits the registered job function through the
    callback manager and returns ``to_json`` of a dict holding ``cacheKey``
    and ``job``, plus ``cancel`` and ``progressDefault`` when the background
    spec defines them.
    """
    callback_manager = _get_callback_manager(kwargs, background)

    progress_outputs = background.get("progress")
    ignore_triggered = background.get("cache_ignore_triggered", True)

    # Inputs provided as dict is kwargs.
    call_args = func_args if func_args else func_kwargs
    ignored_args = background.get("cache_args_to_ignore", [])
    if ignore_triggered:
        triggered = None
    else:
        triggered = callback_ctx.get("triggered_inputs", [])
    cache_key = callback_manager.build_cache_key(
        func, call_args, ignored_args, triggered
    )

    job_fn = callback_manager.func_registry.get(background_key)

    # Copy of the current context handed to the job, minus two entries.
    ctx_value = AttributeDict(**context_value.get())
    ctx_value.ignore_register_page = True
    ctx_value.pop("background_callback_manager")
    ctx_value.pop("dash_response")

    job = callback_manager.call_job_fn(cache_key, job_fn, call_args, ctx_value)

    data = {"cacheKey": cache_key, "job": job}

    cancel = background.get("cancel")
    if cancel:
        data["cancel"] = cancel

    progress_default = background.get("progressDefault")
    if progress_default:
        data["progressDefault"] = {
            str(out): default
            for out, default in zip(progress_outputs, progress_default)
        }

    return to_json(data)
def _progress_background_callback(response, callback_manager, background):
    """Add the job's current progress to ``response`` when there is any.

    Writes ``response["progress"]``, keyed by ``str()`` of each progress
    output, only when the background spec lists progress outputs and the
    manager reports a truthy progress value for the request's ``cacheKey``.
    """
    progress_outputs = background.get("progress")
    cache_key = flask.request.args.get("cacheKey")

    if not progress_outputs:
        return

    # Get the progress before the result as it would be erased after the results.
    progress = callback_manager.get_progress(cache_key)
    if not progress:
        return

    response["progress"] = {
        str(out): progress[position] for position, out in enumerate(progress_outputs)
    }
def _update_background_callback(
    error_handler, callback_ctx, response, kwargs, background, multi
):
    """Poll a running background job identified by the request arguments.

    Reads ``cacheKey`` and ``job`` from the request, records progress in
    ``response``, fetches the job result from the manager and hands over to
    ``_handle_rest_background_callback``.

    :return: the 3-tuple returned by ``_handle_rest_background_callback``.
    """
    callback_manager = _get_callback_manager(kwargs, background)

    request_args = flask.request.args
    cache_key = request_args.get("cacheKey")
    job_id = request_args.get("job")

    # Progress is read before the result is fetched.
    _progress_background_callback(response, callback_manager, background)
    output_value = callback_manager.get_result(cache_key, job_id)

    return _handle_rest_background_callback(
        output_value=output_value,
        callback_manager=callback_manager,
        response=response,
        error_handler=error_handler,
        callback_ctx=callback_ctx,
        multi=multi,
    )
def _handle_rest_background_callback(
    output_value,
    callback_manager,
    response,
    error_handler,
    callback_ctx,
    multi,
    has_update=False,
):
    """Interpret the result fetched for a background job.

    :return: ``(value, has_update, skip)``. When the result is still
        ``callback_manager.UNDEFINED`` the value is the JSON-encoded
        ``response`` and ``skip`` is ``True``; otherwise the value is the
        callback output and ``skip`` is ``False``.
    :raises BackgroundCallbackError: the job reported an error and no
        ``error_handler`` is available.
    """
    request_args = flask.request.args
    cache_key = request_args.get("cacheKey")
    job_id = request_args.get("job")

    # Must get job_running after get_result since get_results terminates it.
    job_running = callback_manager.job_running(job_id)
    undefined = callback_manager.UNDEFINED

    if output_value is undefined and not job_running:
        # Job canceled -> no output to close the loop.
        output_value = NoUpdate()
    elif isinstance(output_value, dict) and "background_callback_error" in output_value:
        error = output_value.get("background_callback_error", {})
        exc = BackgroundCallbackError(
            f"An error occurred inside a background callback: {error['msg']}\n{error['tb']}"
        )
        if not error_handler:
            raise exc

        output_value = error_handler(exc)
        if output_value is None:
            output_value = NoUpdate()
        # set_props from the error handler uses the original ctx
        # instead of manager.get_updated_props since it runs in the
        # request process.
        side_updated = _set_side_update(callback_ctx, response)
        has_update = side_updated or output_value is not None

    if job_running and output_value is not undefined:
        # cached results.
        callback_manager.terminate_job(job_id)

    if multi and isinstance(output_value, (list, tuple)):
        output_value = [
            NoUpdate() if NoUpdate.is_no_update(item) else item
            for item in output_value
        ]

    updated_props = callback_manager.get_updated_props(cache_key)
    if len(updated_props) > 0:
        response["sideUpdate"] = updated_props
        has_update = True

    if output_value is undefined:
        return to_json(response), has_update, True
    return output_value, has_update, False
# pylint: disable=too-many-branches
def _prepare_response(
    output_value,
    output_spec,
    multi,
    response,
    callback_ctx,
    app,
    original_packages,
    background,
    has_update,
    has_output,
    output,
    callback_id,
    allow_dynamic_callbacks,
):
    """Prepare the response object based on the callback output.

    ``response`` is mutated in place: ``response["response"]`` receives a
    mapping of stringified component id to ``{property: value}`` for every
    output that was actually updated, ``"sideUpdate"`` may be added for
    non-background callbacks, and ``"dist"`` is added when new component
    packages were registered during the callback.

    :param output_value: value returned by the callback or its error handler.
    :param output_spec: output specification matching ``output_value``.
    :param multi: whether the callback was declared with multiple outputs.
    :param has_update: whether an update is already known to exist.
    :param has_output: whether the callback declares any output.
    :return: ``None``; the result is delivered through ``response``.
    :raises InvalidCallbackReturnValue: a no-output callback returned a value
        other than ``None``.
    :raises PreventUpdate: neither an output nor a side update changed.
    :raises ImportedInsideCallbackError: component packages were registered
        during the callback and ``allow_dynamic_callbacks`` is falsy.
    """
    component_ids = collections.defaultdict(dict)

    if has_output:
        if not multi:
            # Treat a single output as a one-element multi output.
            output_value, output_spec = [output_value], [output_spec]
            flat_output_values = output_value
        else:
            if isinstance(output_value, (list, tuple)):
                # For multi-output, allow top-level collection to be
                # list or tuple
                output_value = list(output_value)
            if NoUpdate.is_no_update(output_value):
                flat_output_values = [output_value]
            else:
                # Flatten grouping and validate grouping structure
                flat_output_values = flatten_grouping(output_value, output)

        if not NoUpdate.is_no_update(output_value):
            _validate.validate_multi_return(
                output_spec, flat_output_values, callback_id
            )

        for val, spec in zip(flat_output_values, output_spec):
            if NoUpdate.is_no_update(val):
                continue
            for vali, speci in (
                zip(val, spec) if isinstance(spec, list) else [[val, spec]]  # type: ignore[reportArgumentType]
            ):
                if not NoUpdate.is_no_update(vali):
                    has_update = True
                    id_str = stringify_id(speci["id"])
                    prop = clean_property_name(speci["property"])
                    component_ids[id_str][prop] = vali
    else:
        if output_value is not None:
            raise InvalidCallbackReturnValue(
                f"No-output callback received return value: {output_value}"
            )

    if not background:
        # Combine with the flag computed above. Using `has_output` here would
        # discard it and report an update even when every output is
        # `no_update`, so PreventUpdate would never be raised.
        has_update = _set_side_update(callback_ctx, response) or has_update

    if not has_update:
        raise PreventUpdate

    if len(ComponentRegistry.registry) != len(original_packages):
        diff_packages = list(
            set(ComponentRegistry.registry).difference(original_packages)
        )
        if not allow_dynamic_callbacks:
            raise ImportedInsideCallbackError(
                f"Component librar{'y' if len(diff_packages) == 1 else 'ies'} was imported during callback.\n"
                "You can set `_allow_dynamic_callbacks` to allow for development purpose only."
            )
        dist = app.get_dist(diff_packages)
        response["dist"] = dist

    # Explicit in-place assignment; the function keeps returning None.
    response["response"] = component_ids
# pylint: disable=too-many-branches,too-many-statements
def register_callback(
    callback_list, callback_map, config_prevent_initial_callbacks, *_args, **_kwargs
):
    """Register a server-side callback and return the decorator that binds it.

    The dependencies in ``_args``/``_kwargs`` are parsed, the callback is
    recorded through ``insert_callback`` and a decorator is returned. Applying
    the decorator stores a context-managing wrapper (sync or async, depending
    on the decorated function) under ``callback_map[callback_id]["callback"]``
    and returns the decorated function unchanged.

    Recognised keyword arguments read from ``_kwargs``: ``background``,
    ``manager``, ``running``, ``on_error`` and ``_allow_dynamic_callbacks``.

    :return: a decorator taking the callback function.
    """
    (
        output,
        flat_inputs,
        flat_state,
        inputs_state_indices,
        prevent_initial_call,
    ) = handle_grouped_callback_args(_args, _kwargs)
    if isinstance(output, Output):
        # Insert callback with scalar (non-multi) Output
        insert_output = output
        multi = False
        has_output = True
    else:
        # Insert callback as multi Output
        insert_output = flatten_grouping(output)
        multi = True
        has_output = len(output) > 0

    background = _kwargs.get("background")
    manager = _kwargs.get("manager")
    running = _kwargs.get("running")
    on_error = _kwargs.get("on_error")
    # Truthiness check: an empty `running` has no first element to inspect
    # and is passed through unchanged (insert_callback ignores falsy values).
    if running:
        if not isinstance(running[0], (list, tuple)):
            # A single (output, on, off) triple was given; wrap it.
            running = [running]
        running = {
            "running": {str(r[0]): r[1] for r in running},
            "runningOff": {str(r[0]): r[2] for r in running},
        }
    allow_dynamic_callbacks = _kwargs.get("_allow_dynamic_callbacks")

    output_indices = make_grouping_by_index(output, list(range(grouping_len(output))))
    callback_id = insert_callback(
        callback_list,
        callback_map,
        config_prevent_initial_callbacks,
        insert_output,
        output_indices,
        flat_inputs,
        flat_state,
        inputs_state_indices,
        prevent_initial_call,
        background=background,
        manager=manager,
        dynamic_creator=allow_dynamic_callbacks,
        running=running,
        no_output=not has_output,
    )

    # pylint: disable=too-many-locals
    def wrap_func(func):
        """Bind ``func`` to the registered callback and return it unchanged."""
        if background is None:
            background_key = None
        else:
            background_key = BaseBackgroundCallbackManager.register_func(
                func,
                background.get("progress") is not None,
                callback_id,
            )

        @wraps(func)
        def add_context(*args, **kwargs):
            """Handles synchronous callbacks with context management."""
            error_handler = on_error or kwargs.pop("app_on_error", None)

            (
                output_spec,
                callback_ctx,
                func_args,
                func_kwargs,
                app,
                original_packages,
                has_update,
            ) = _initialize_context(
                args, kwargs, inputs_state_indices, has_output, insert_output
            )

            response: dict = {"multi": True}
            jsonResponse = None
            try:
                if background is not None:
                    # No cacheKey in the request: start the background job.
                    if not flask.request.args.get("cacheKey"):
                        return _setup_background_callback(
                            kwargs,
                            background,
                            background_key,
                            func,
                            func_args,
                            func_kwargs,
                            callback_ctx,
                        )

                    output_value, has_update, skip = _update_background_callback(
                        error_handler, callback_ctx, response, kwargs, background, multi
                    )
                    if skip:
                        return output_value
                else:
                    output_value = _invoke_callback(func, *func_args, **func_kwargs)  # type: ignore[reportArgumentType]
            except PreventUpdate:
                raise
            except Exception as err:  # pylint: disable=broad-exception-caught
                if error_handler:
                    output_value = error_handler(err)

                    # A handler returning None means "no update" when the
                    # callback has outputs.
                    if output_value is None and output_spec:
                        output_value = NoUpdate()
                else:
                    raise err

            _prepare_response(
                output_value,
                output_spec,
                multi,
                response,
                callback_ctx,
                app,
                original_packages,
                background,
                has_update,
                has_output,
                output,
                callback_id,
                allow_dynamic_callbacks,
            )
            try:
                jsonResponse = to_json(response)
            except TypeError:
                _validate.fail_callback_output(output_value, output)

            return jsonResponse

        @wraps(func)
        async def async_add_context(*args, **kwargs):
            """Handles async callbacks with context management."""
            error_handler = on_error or kwargs.pop("app_on_error", None)

            (
                output_spec,
                callback_ctx,
                func_args,
                func_kwargs,
                app,
                original_packages,
                has_update,
            ) = _initialize_context(
                args, kwargs, inputs_state_indices, has_output, insert_output
            )

            response: dict = {"multi": True}
            # Initialised like in the synchronous wrapper so the final
            # `return` is always bound.
            jsonResponse = None
            try:
                if background is not None:
                    # No cacheKey in the request: start the background job.
                    if not flask.request.args.get("cacheKey"):
                        return _setup_background_callback(
                            kwargs,
                            background,
                            background_key,
                            func,
                            func_args,
                            func_kwargs,
                            callback_ctx,
                        )

                    output_value, has_update, skip = _update_background_callback(
                        error_handler, callback_ctx, response, kwargs, background, multi
                    )
                    if skip:
                        return output_value
                else:
                    output_value = await _async_invoke_callback(
                        func, *func_args, **func_kwargs
                    )
            except PreventUpdate:
                raise
            except Exception as err:  # pylint: disable=broad-exception-caught
                if error_handler:
                    output_value = error_handler(err)

                    # A handler returning None means "no update" when the
                    # callback has outputs.
                    if output_value is None and output_spec:
                        output_value = NoUpdate()
                else:
                    raise err

            _prepare_response(
                output_value,
                output_spec,
                multi,
                response,
                callback_ctx,
                app,
                original_packages,
                background,
                has_update,
                has_output,
                output,
                callback_id,
                allow_dynamic_callbacks,
            )
            try:
                jsonResponse = to_json(response)
            except TypeError:
                _validate.fail_callback_output(output_value, output)

            return jsonResponse

        # Pick the wrapper matching the decorated function's kind.
        if asyncio.iscoroutinefunction(func):
            callback_map[callback_id]["callback"] = async_add_context
        else:
            callback_map[callback_id]["callback"] = add_context

        return func

    return wrap_func
# JavaScript wrapper filled in with `str.format` by
# `register_clientside_callback`: it stores `{clientside_function}` under
# `window.dash_clientside[{namespace}][{function_name}]`. Doubled braces are
# literal braces in the formatted output.
_inline_clientside_template = """
(function() {{
var clientside = window.dash_clientside = window.dash_clientside || {{}};
var ns = clientside["{namespace}"] = clientside["{namespace}"] || {{}};
ns["{function_name}"] = {clientside_function};
}})();
"""
def register_clientside_callback(
    callback_list,
    callback_map,
    config_prevent_initial_callbacks,
    inline_scripts,
    clientside_function: ClientsideFuncType,
    *args,
    **kwargs,
):
    """Register a callback whose logic runs in the browser.

    The callback is recorded through ``insert_callback`` and the spec that
    was just appended to ``callback_list`` is then pointed at the clientside
    function's namespace and name.

    :param inline_scripts: list receiving the generated script when
        ``clientside_function`` is a JavaScript source string.
    :param clientside_function: JavaScript source string, or an object
        exposing ``namespace`` and ``function_name``.
    """
    output, inputs, state, prevent_initial_call = handle_callback_args(args, kwargs)
    no_output = isinstance(output, list) and len(output) == 0

    insert_callback(
        callback_list,
        callback_map,
        config_prevent_initial_callbacks,
        output,
        outputs_indices=None,
        inputs=inputs,
        state=state,
        inputs_state_indices=None,
        prevent_initial_call=prevent_initial_call,
        no_output=no_output,
    )

    if not isinstance(clientside_function, str):
        # Callback is stored in an external asset.
        namespace = clientside_function.namespace
        function_name = clientside_function.function_name
    else:
        # If JS source is explicitly given, create a namespace and function
        # name, then inject the code.
        namespace = "_dashprivate_clientside_funcs"
        # Create a hash from the function, it will be the same always
        source_bytes = clientside_function.encode("utf-8")
        function_name = hashlib.sha256(source_bytes).hexdigest()
        script = _inline_clientside_template.format(
            namespace=namespace,
            function_name=function_name,
            clientside_function=clientside_function,
        )
        inline_scripts.append(script)

    # insert_callback appended the spec last, so it is the final list entry.
    callback_list[-1]["clientside_function"] = {
        "namespace": namespace,
        "function_name": function_name,
    }