diff --git a/mypy-relaxed.ini b/mypy-relaxed.ini index 8f2be85affc..205688353e3 100644 --- a/mypy-relaxed.ini +++ b/mypy-relaxed.ini @@ -4,7 +4,7 @@ disallow_any_unimported = True ; disallow_any_expr = True disallow_any_decorated = True - disallow_any_explicit = True +; disallow_any_explicit = True disallow_any_generics = True disallow_subclassing_any = True disallow_untyped_calls = True diff --git a/mypy.ini b/mypy.ini index 5b46777838d..ba375b62b1d 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,7 +2,7 @@ disallow_any_unimported = True disallow_any_expr = True disallow_any_decorated = True - disallow_any_explicit = True +; disallow_any_explicit = True disallow_any_generics = True disallow_subclassing_any = True disallow_untyped_calls = True diff --git a/opentelemetry-api/src/opentelemetry/context/__init__.py b/opentelemetry-api/src/opentelemetry/context/__init__.py index d853a7bcf65..a02bc8cb4e2 100644 --- a/opentelemetry-api/src/opentelemetry/context/__init__.py +++ b/opentelemetry-api/src/opentelemetry/context/__init__.py @@ -11,3 +11,144 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + + +""" +The OpenTelemetry context module provides abstraction layer on top of +thread-local storage and contextvars. The long term direction is to switch to +contextvars provided by the Python runtime library. + +A global object ``Context`` is provided to access all the context related +functionalities: + + >>> from opentelemetry.context import Context + >>> Context.foo = 1 + >>> Context.foo = 2 + >>> Context.foo + 2 + +When explicit thread is used, a helper function `Context.with_current_context` +can be used to carry the context across threads: + + from threading import Thread + from opentelemetry.context import Context + + def work(name): + print('Entering worker:', Context) + Context.operation_id = name + print('Exiting worker:', Context) + + if __name__ == '__main__': + print('Main thread:', Context) + Context.operation_id = 'main' + + print('Main thread:', Context) + + # by default context is not propagated to worker thread + thread = Thread(target=work, args=('foo',)) + thread.start() + thread.join() + + print('Main thread:', Context) + + # user can propagate context explicitly + thread = Thread( + target=Context.with_current_context(work), + args=('bar',), + ) + thread.start() + thread.join() + + print('Main thread:', Context) + +Here goes another example using thread pool: + + import time + import threading + + from multiprocessing.dummy import Pool as ThreadPool + from opentelemetry.context import Context + + _console_lock = threading.Lock() + + def println(msg): + with _console_lock: + print(msg) + + def work(name): + println('Entering worker[{}]: {}'.format(name, Context)) + Context.operation_id = name + time.sleep(0.01) + println('Exiting worker[{}]: {}'.format(name, Context)) + + if __name__ == "__main__": + println('Main thread: {}'.format(Context)) + Context.operation_id = 'main' + pool = ThreadPool(2) # create a thread pool with 2 threads + pool.map(Context.with_current_context(work), [ + 'bear', + 'cat', + 'dog', + 'horse', + 'rabbit', + ]) + pool.close() + pool.join() + println('Main thread: {}'.format(Context)) + +Here goes a simple demo of how async could work in Python 3.7+: + + import asyncio + + from opentelemetry.context import Context + + class Span(object): + def __init__(self, name): + self.name = name + self.parent = Context.current_span + + def __repr__(self): + return ('{}(name={}, parent={})' + .format( + type(self).__name__, + self.name, + self.parent, + )) + + async def __aenter__(self): + Context.current_span = self + + async def __aexit__(self, exc_type, exc, tb): + Context.current_span = self.parent + + async def main(): + print(Context) + async with Span('foo'): + print(Context) + await asyncio.sleep(0.1) + async with Span('bar'): + print(Context) + await asyncio.sleep(0.1) + print(Context) + await asyncio.sleep(0.1) + print(Context) + + if __name__ == '__main__': + asyncio.run(main()) +""" + +import typing + +from .base_context import BaseRuntimeContext + +__all__ = ['Context'] + + +Context: typing.Optional[BaseRuntimeContext] + +try: + from .async_context import AsyncRuntimeContext + Context = AsyncRuntimeContext() # pylint:disable=invalid-name +except ImportError: + from .thread_local_context import ThreadLocalRuntimeContext + Context = ThreadLocalRuntimeContext() # pylint:disable=invalid-name diff --git a/opentelemetry-api/src/opentelemetry/context/async_context.py b/opentelemetry-api/src/opentelemetry/context/async_context.py new file mode 100644 index 00000000000..413e7b2543f --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/context/async_context.py @@ -0,0 +1,42 @@ +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from contextvars import ContextVar +import typing + +from . import base_context + + +class AsyncRuntimeContext(base_context.BaseRuntimeContext): + class Slot(base_context.BaseRuntimeContext.Slot): + def __init__(self, name: str, default: 'object'): + # pylint: disable=super-init-not-called + self.name = name + self.contextvar: 'ContextVar[object]' = ContextVar(name) + self.default: typing.Callable[..., object] + self.default = base_context.wrap_callable(default) + + def clear(self) -> None: + self.contextvar.set(self.default()) + + def get(self) -> 'object': + try: + return self.contextvar.get() + except LookupError: + value = self.default() + self.set(value) + return value + + def set(self, value: 'object') -> None: + self.contextvar.set(value) diff --git a/opentelemetry-api/src/opentelemetry/context/base_context.py b/opentelemetry-api/src/opentelemetry/context/base_context.py new file mode 100644 index 00000000000..35ee179a4b8 --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/context/base_context.py @@ -0,0 +1,116 @@ +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import typing + + +def wrap_callable(target: 'object') -> typing.Callable[[], object]: + if callable(target): + return target + return lambda: target + + +class BaseRuntimeContext: + class Slot: + def __init__(self, name: str, default: 'object'): + raise NotImplementedError + + def clear(self) -> None: + raise NotImplementedError + + def get(self) -> 'object': + raise NotImplementedError + + def set(self, value: 'object') -> None: + raise NotImplementedError + + _lock = threading.Lock() + _slots: typing.Dict[str, Slot] = {} + + @classmethod + def clear(cls) -> None: + """Clear all slots to their default value.""" + keys = cls._slots.keys() + for name in keys: + slot = cls._slots[name] + slot.clear() + + @classmethod + def register_slot(cls, name: str, default: 'object' = None) -> 'Slot': + """Register a context slot with an optional default value. + + :type name: str + :param name: The name of the context slot. + + :type default: object + :param name: The default value of the slot, can be a value or lambda. + + :returns: The registered slot. + """ + with cls._lock: + if name in cls._slots: + raise ValueError('slot {} already registered'.format(name)) + slot = cls.Slot(name, default) + cls._slots[name] = slot + return slot + + def apply(self, snapshot: typing.Dict[str, 'object']) -> None: + """Set the current context from a given snapshot dictionary""" + + for name in snapshot: + setattr(self, name, snapshot[name]) + + def snapshot(self) -> typing.Dict[str, 'object']: + """Return a dictionary of current slots by reference.""" + + keys = self._slots.keys() + return dict((n, self._slots[n].get()) for n in keys) + + def __repr__(self) -> str: + return '{}({})'.format(type(self).__name__, self.snapshot()) + + def __getattr__(self, name: str) -> 'object': + if name not in self._slots: + self.register_slot(name, None) + slot = self._slots[name] + return slot.get() + + def __setattr__(self, name: str, value: 'object') -> None: + if name not in self._slots: + self.register_slot(name, None) + slot = self._slots[name] + slot.set(value) + + def with_current_context( + self, + func: typing.Callable[..., 'object'], + ) -> typing.Callable[..., 'object']: + """Capture the current context and apply it to the provided func. + """ + + caller_context = self.snapshot() + + def call_with_current_context( + *args: 'object', + **kwargs: 'object', + ) -> 'object': + try: + backup_context = self.snapshot() + self.apply(caller_context) + return func(*args, **kwargs) + finally: + self.apply(backup_context) + + return call_with_current_context diff --git a/opentelemetry-api/src/opentelemetry/context/thread_local_context.py b/opentelemetry-api/src/opentelemetry/context/thread_local_context.py new file mode 100644 index 00000000000..dd11128b7ac --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/context/thread_local_context.py @@ -0,0 +1,44 @@ +# Copyright 2019, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import typing + +from . import base_context + + +class ThreadLocalRuntimeContext(base_context.BaseRuntimeContext): + class Slot(base_context.BaseRuntimeContext.Slot): + _thread_local = threading.local() + + def __init__(self, name: str, default: 'object'): + # pylint: disable=super-init-not-called + self.name = name + self.default: typing.Callable[..., object] + self.default = base_context.wrap_callable(default) + + def clear(self) -> None: + setattr(self._thread_local, self.name, self.default()) + + def get(self) -> 'object': + try: + got: object = getattr(self._thread_local, self.name) + return got + except AttributeError: + value = self.default() + self.set(value) + return value + + def set(self, value: 'object') -> None: + setattr(self._thread_local, self.name, value)