Source code for tango.green

# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later


# Imports
import os
import inspect
import textwrap
import re

from functools import wraps

from threading import get_ident

# Tango imports
from tango._tango import GreenMode
from tango.utils import _forcefully_traced_method

__all__ = (
    "get_green_mode",
    "set_green_mode",
    "green",
    "green_callback",
    "get_executor",
    "get_object_executor",
    "switch_existing_global_executors_to_thread",
)

try:
    import gevent

    del gevent
    _gevent_available = True
except ImportError:
    _gevent_available = False

# Handle current green mode

try:
    _CURRENT_GREEN_MODE = getattr(
        GreenMode, os.environ["PYTANGO_GREEN_MODE"].capitalize()
    )
except Exception:
    _CURRENT_GREEN_MODE = GreenMode.Synchronous


[docs] def set_green_mode(green_mode=None): """Sets the global default PyTango green mode. Advice: Use only in your final application. Don't use this in a python library in order not to interfere with the beavior of other libraries and/or application where your library is being. :param green_mode: the new global default PyTango green mode :type green_mode: GreenMode """ global _CURRENT_GREEN_MODE # Make sure the green mode is available get_executor(green_mode) # Set the green mode _CURRENT_GREEN_MODE = green_mode
[docs] def get_green_mode(): """Returns the current global default PyTango green mode. :returns: the current global default PyTango green mode :rtype: GreenMode """ return _CURRENT_GREEN_MODE
# Abstract executor class class AbstractExecutor: asynchronous = NotImplemented default_wait = NotImplemented def __init__(self): self.ident = get_ident(), os.getpid() def get_ident(self): return self.ident def in_executor_context(self): return self.ident == (get_ident(), os.getpid()) def delegate(self, fn, *args, **kwargs): """Delegate an operation and return an accessor.""" if not self.asynchronous: raise ValueError("Not supported in synchronous mode") raise NotImplementedError def access(self, accessor, timeout=None): """Return a result from an accessor.""" if not self.asynchronous: raise ValueError("Not supported in synchronous mode") raise NotImplementedError def submit(self, fn, *args, **kwargs): """Submit an operation""" if not self.asynchronous: return fn(*args, **kwargs) raise NotImplementedError def execute(self, fn, *args, **kwargs): """Execute an operation and return the result.""" if not self.asynchronous: return fn(*args, **kwargs) raise NotImplementedError def run(self, fn, args=(), kwargs={}, wait=None, timeout=None): if wait is None: wait = self.default_wait # Wait and timeout are not supported in synchronous mode if not self.asynchronous and (not wait or timeout): raise ValueError("Not supported in synchronous mode") # Synchronous (no delegation) if not self.asynchronous or not self.in_executor_context(): return fn(*args, **kwargs) # Asynchronous delegation accessor = self.delegate(fn, *args, **kwargs) if not wait: return accessor return self.access(accessor, timeout=timeout) class SynchronousExecutor(AbstractExecutor): asynchronous = False default_wait = True # Default synchronous executor def get_synchronous_executor(): return _SYNCHRONOUS_EXECUTOR _SYNCHRONOUS_EXECUTOR = SynchronousExecutor() # Getters def get_object_green_mode(obj): if hasattr(obj, "get_green_mode"): return obj.get_green_mode() return get_green_mode() def get_executor(green_mode=None): if green_mode is None: green_mode = get_green_mode() # Valid green modes if green_mode == GreenMode.Synchronous: return get_synchronous_executor() if green_mode == GreenMode.Gevent: from tango import gevent_executor return gevent_executor.get_global_executor() if green_mode == GreenMode.Futures: from tango import futures_executor return futures_executor.get_global_executor() if green_mode == GreenMode.Asyncio: from tango import asyncio_executor return asyncio_executor.get_global_executor() # Invalid green mode raise TypeError("Not a valid green mode") def switch_existing_global_executors_to_thread(): """ checks which global executor existing, and if they are belong to the caller thread if not - creates a new executor, linked to thread, and set it as global """ from tango import asyncio_executor from tango import futures_executor if _gevent_available: from tango import gevent_executor else: gevent_executor = None for executor in [asyncio_executor, futures_executor, gevent_executor]: if executor: executor._switch_global_executor_to_thread() def get_object_executor(obj, green_mode=None): """Returns the proper executor for the given object. If the object has *_executors* and *_green_mode* members it returns the submit callable for the executor corresponding to the green_mode. Otherwise it returns the global executor for the given green_mode. Note: *None* is a valid object. :returns: submit callable""" # Get green mode if green_mode is None: green_mode = get_object_green_mode(obj) # Get executor executor = None if hasattr(obj, "_executors"): executor = obj._executors.get(green_mode, None) if executor is None: executor = get_executor(green_mode) # Get submitter return executor # Green modifiers def green(fn=None, consume_green_mode=True, update_signature_and_docstring=False): """Make a function green. Can be used as a decorator.""" def decorator(fn): @wraps(fn) def greener(obj, *args, **kwargs): args = (obj,) + args wait = kwargs.pop("wait", None) timeout = kwargs.pop("timeout", None) access = kwargs.pop if consume_green_mode else kwargs.get green_mode = access("green_mode", None) executor = get_object_executor(obj, green_mode) return executor.run(fn, args, kwargs, wait=wait, timeout=timeout) sig = inspect.signature(fn) if update_signature_and_docstring: # Build green parameters green_mode_param = inspect.Parameter( "green_mode", kind=inspect.Parameter.KEYWORD_ONLY, default=None, annotation=None, ) wait_param = inspect.Parameter( "wait", kind=inspect.Parameter.KEYWORD_ONLY, default=None, annotation=None, ) timeout_param = inspect.Parameter( "timeout", kind=inspect.Parameter.KEYWORD_ONLY, default=None, annotation=None, ) # Append it to the existing parameters old_params = list(sig.parameters.values()) add_kwargs = False if old_params[-1].kind == inspect.Parameter.VAR_KEYWORD: old_params = old_params[:-1] add_kwargs = True if "green_mode" in [param.name for param in old_params]: new_params = old_params + [wait_param, timeout_param] else: new_params = old_params + [green_mode_param, wait_param, timeout_param] if add_kwargs: new_params += [ inspect.Parameter("kwargs", kind=inspect.Parameter.VAR_KEYWORD) ] new_sig = sig.replace(parameters=new_params) if greener.__doc__ is not None: fill_green_doc(greener) greener.__signature__ = new_sig else: greener.__signature__ = sig return greener if fn is None: return decorator return decorator(fn) def green_callback(fn, obj=None, green_mode=None): """Return a green verion of the given callback.""" executor = get_object_executor(obj, green_mode) @wraps(fn) def greener(*args, **kwargs): return executor.submit(_forcefully_traced_method(fn), *args, **kwargs) return greener __GREEN_KWARGS__ = "green_mode=None, wait=True, timeout=None" __GREEN_KWARGS_DESCRIPTION__ = """ :param green_mode: Defaults to the current tango GreenMode. Refer to :meth:`~tango.DeviceProxy.get_green_mode` and :meth:`~tango.DeviceProxy.set_green_mode` for more details. :type green_mode: :obj:`tango.GreenMode`, optional :param wait: Specifies whether to wait for the result. If `green_mode` is *Synchronous*, this parameter is ignored as the operation always waits for the result. This parameter is also ignored when `green_mode` is Synchronous. :type wait: bool, optional :param timeout: The number of seconds to wait for the result. If set to `None`, there is no limit on the wait time. This parameter is ignored when `green_mode` is Synchronous or when `wait` is False. :type timeout: float, optional """ __GREEN_RAISES__ = """ :throws: :obj:`TimeoutError`: (green_mode == Futures) If the future didn't finish executing before the given timeout. :throws: :obj:`Timeout`: (green_mode == Gevent) If the async result didn't finish executing before the given timeout. """ def fill_green_doc(method): """ Replace the __GREEN_KWARGS__ __GREEN_KWARGS_DESCRIPTION__ placeholders in `doc` preserving the placeholder’s indentation. """ dedented = textwrap.dedent(method.__doc__) dedented = dedented.replace("__GREEN_KWARGS__", __GREEN_KWARGS__) m = re.search( r"^(?P<indent>[ \t]*)__GREEN_KWARGS_DESCRIPTION__", dedented, flags=re.MULTILINE ) if not m: return indent = m.group("indent") indented_desc = textwrap.indent(__GREEN_KWARGS_DESCRIPTION__.strip("\n"), indent) indented_raises = textwrap.indent(__GREEN_RAISES__.strip("\n"), indent) dedented = re.sub( r"^[ \t]*__GREEN_KWARGS_DESCRIPTION__", indented_desc, dedented, flags=re.MULTILINE, ) method.__doc__ = re.sub( r"^[ \t]*__GREEN_RAISES__", indented_raises, dedented, flags=re.MULTILINE )