-
Notifications
You must be signed in to change notification settings - Fork 699
Initial implementation of context #57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
989c3e7
dd979b2
e6fbe91
44e5897
beaa000
9f71780
90fde07
b8ca3e4
00a256c
199054a
023eac9
f83cf89
62517f0
937240e
2b2f553
533fd2a
123887e
1f93a36
0b114e4
e557afd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,3 +11,144 @@ | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
""" | ||
The OpenTelemetry context module provides abstraction layer on top of | ||
thread-local storage and contextvars. The long term direction is to switch to | ||
contextvars provided by the Python runtime library. | ||
|
||
A global object ``Context`` is provided to access all the context related | ||
functionalities: | ||
|
||
>>> from opentelemetry.context import Context | ||
>>> Context.foo = 1 | ||
>>> Context.foo = 2 | ||
>>> Context.foo | ||
2 | ||
|
||
When explicit thread is used, a helper function `Context.with_current_context` | ||
can be used to carry the context across threads: | ||
|
||
from threading import Thread | ||
from opentelemetry.context import Context | ||
|
||
def work(name): | ||
print('Entering worker:', Context) | ||
Context.operation_id = name | ||
print('Exiting worker:', Context) | ||
|
||
if __name__ == '__main__': | ||
print('Main thread:', Context) | ||
Context.operation_id = 'main' | ||
|
||
print('Main thread:', Context) | ||
|
||
# by default context is not propagated to worker thread | ||
thread = Thread(target=work, args=('foo',)) | ||
thread.start() | ||
thread.join() | ||
|
||
print('Main thread:', Context) | ||
|
||
# user can propagate context explicitly | ||
thread = Thread( | ||
target=Context.with_current_context(work), | ||
args=('bar',), | ||
) | ||
thread.start() | ||
thread.join() | ||
|
||
print('Main thread:', Context) | ||
|
||
Here goes another example using thread pool: | ||
|
||
import time | ||
import threading | ||
|
||
from multiprocessing.dummy import Pool as ThreadPool | ||
from opentelemetry.context import Context | ||
|
||
_console_lock = threading.Lock() | ||
|
||
def println(msg): | ||
with _console_lock: | ||
print(msg) | ||
|
||
def work(name): | ||
println('Entering worker[{}]: {}'.format(name, Context)) | ||
Context.operation_id = name | ||
time.sleep(0.01) | ||
println('Exiting worker[{}]: {}'.format(name, Context)) | ||
|
||
if __name__ == "__main__": | ||
println('Main thread: {}'.format(Context)) | ||
Context.operation_id = 'main' | ||
pool = ThreadPool(2) # create a thread pool with 2 threads | ||
pool.map(Context.with_current_context(work), [ | ||
'bear', | ||
'cat', | ||
'dog', | ||
'horse', | ||
'rabbit', | ||
]) | ||
pool.close() | ||
pool.join() | ||
println('Main thread: {}'.format(Context)) | ||
|
||
Here goes a simple demo of how async could work in Python 3.7+: | ||
|
||
import asyncio | ||
|
||
from opentelemetry.context import Context | ||
|
||
class Span(object): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @c24t this should unblock your work on the actual span creation and tracer context update. |
||
def __init__(self, name): | ||
self.name = name | ||
self.parent = Context.current_span | ||
|
||
def __repr__(self): | ||
return ('{}(name={}, parent={})' | ||
.format( | ||
type(self).__name__, | ||
self.name, | ||
self.parent, | ||
)) | ||
|
||
async def __aenter__(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This example only works on Python3.7+ due to the new |
||
Context.current_span = self | ||
|
||
async def __aexit__(self, exc_type, exc, tb): | ||
Context.current_span = self.parent | ||
|
||
async def main(): | ||
print(Context) | ||
async with Span('foo'): | ||
print(Context) | ||
await asyncio.sleep(0.1) | ||
async with Span('bar'): | ||
print(Context) | ||
await asyncio.sleep(0.1) | ||
print(Context) | ||
await asyncio.sleep(0.1) | ||
print(Context) | ||
|
||
if __name__ == '__main__': | ||
asyncio.run(main()) | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about adding this module to the sphinx docs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I'll send a follow up PR to add test cases and docs. |
||
|
||
import typing | ||
|
||
from .base_context import BaseRuntimeContext | ||
|
||
__all__ = ['Context'] | ||
|
||
|
||
Context: typing.Optional[BaseRuntimeContext] | ||
|
||
try: | ||
from .async_context import AsyncRuntimeContext | ||
Context = AsyncRuntimeContext() # pylint:disable=invalid-name | ||
except ImportError: | ||
from .thread_local_context import ThreadLocalRuntimeContext | ||
Context = ThreadLocalRuntimeContext() # pylint:disable=invalid-name |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# Copyright 2019, OpenTelemetry Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from contextvars import ContextVar | ||
import typing | ||
|
||
from . import base_context | ||
|
||
|
||
class AsyncRuntimeContext(base_context.BaseRuntimeContext): | ||
class Slot(base_context.BaseRuntimeContext.Slot): | ||
def __init__(self, name: str, default: 'object'): | ||
# pylint: disable=super-init-not-called | ||
self.name = name | ||
self.contextvar: 'ContextVar[object]' = ContextVar(name) | ||
self.default: typing.Callable[..., object] | ||
self.default = base_context.wrap_callable(default) | ||
|
||
def clear(self) -> None: | ||
self.contextvar.set(self.default()) | ||
|
||
def get(self) -> 'object': | ||
try: | ||
return self.contextvar.get() | ||
except LookupError: | ||
value = self.default() | ||
self.set(value) | ||
return value | ||
|
||
def set(self, value: 'object') -> None: | ||
self.contextvar.set(value) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
# Copyright 2019, OpenTelemetry Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import threading | ||
import typing | ||
|
||
|
||
def wrap_callable(target: 'object') -> typing.Callable[[], object]: | ||
if callable(target): | ||
return target | ||
return lambda: target | ||
|
||
|
||
class BaseRuntimeContext: | ||
class Slot: | ||
def __init__(self, name: str, default: 'object'): | ||
raise NotImplementedError | ||
|
||
def clear(self) -> None: | ||
raise NotImplementedError | ||
|
||
def get(self) -> 'object': | ||
raise NotImplementedError | ||
|
||
def set(self, value: 'object') -> None: | ||
raise NotImplementedError | ||
|
||
_lock = threading.Lock() | ||
_slots: typing.Dict[str, Slot] = {} | ||
|
||
@classmethod | ||
def clear(cls) -> None: | ||
"""Clear all slots to their default value.""" | ||
keys = cls._slots.keys() | ||
for name in keys: | ||
slot = cls._slots[name] | ||
slot.clear() | ||
|
||
@classmethod | ||
def register_slot(cls, name: str, default: 'object' = None) -> 'Slot': | ||
"""Register a context slot with an optional default value. | ||
|
||
:type name: str | ||
:param name: The name of the context slot. | ||
|
||
:type default: object | ||
:param name: The default value of the slot, can be a value or lambda. | ||
|
||
:returns: The registered slot. | ||
""" | ||
with cls._lock: | ||
if name in cls._slots: | ||
raise ValueError('slot {} already registered'.format(name)) | ||
slot = cls.Slot(name, default) | ||
cls._slots[name] = slot | ||
return slot | ||
|
||
def apply(self, snapshot: typing.Dict[str, 'object']) -> None: | ||
"""Set the current context from a given snapshot dictionary""" | ||
|
||
for name in snapshot: | ||
setattr(self, name, snapshot[name]) | ||
|
||
def snapshot(self) -> typing.Dict[str, 'object']: | ||
"""Return a dictionary of current slots by reference.""" | ||
|
||
keys = self._slots.keys() | ||
return dict((n, self._slots[n].get()) for n in keys) | ||
|
||
def __repr__(self) -> str: | ||
return '{}({})'.format(type(self).__name__, self.snapshot()) | ||
|
||
def __getattr__(self, name: str) -> 'object': | ||
if name not in self._slots: | ||
self.register_slot(name, None) | ||
slot = self._slots[name] | ||
return slot.get() | ||
|
||
def __setattr__(self, name: str, value: 'object') -> None: | ||
if name not in self._slots: | ||
self.register_slot(name, None) | ||
slot = self._slots[name] | ||
slot.set(value) | ||
|
||
def with_current_context( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would there be value in exposing a contextmanager like this as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know at this moment. I'm open to add it if we find it helpful. |
||
self, | ||
func: typing.Callable[..., 'object'], | ||
) -> typing.Callable[..., 'object']: | ||
"""Capture the current context and apply it to the provided func. | ||
""" | ||
|
||
caller_context = self.snapshot() | ||
|
||
def call_with_current_context( | ||
*args: 'object', | ||
**kwargs: 'object', | ||
) -> 'object': | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we don't expect users to bring their own context impl (as we do with trace and metrics) I think it's fine to lose the type annotations in this module, especially when they're this useless. |
||
try: | ||
backup_context = self.snapshot() | ||
self.apply(caller_context) | ||
return func(*args, **kwargs) | ||
finally: | ||
self.apply(backup_context) | ||
|
||
return call_with_current_context |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# Copyright 2019, OpenTelemetry Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import threading | ||
import typing | ||
|
||
from . import base_context | ||
|
||
|
||
class ThreadLocalRuntimeContext(base_context.BaseRuntimeContext): | ||
class Slot(base_context.BaseRuntimeContext.Slot): | ||
_thread_local = threading.local() | ||
|
||
def __init__(self, name: str, default: 'object'): | ||
# pylint: disable=super-init-not-called | ||
self.name = name | ||
self.default: typing.Callable[..., object] | ||
self.default = base_context.wrap_callable(default) | ||
|
||
def clear(self) -> None: | ||
setattr(self._thread_local, self.name, self.default()) | ||
|
||
def get(self) -> 'object': | ||
try: | ||
got: object = getattr(self._thread_local, self.name) | ||
return got | ||
except AttributeError: | ||
value = self.default() | ||
self.set(value) | ||
return value | ||
|
||
def set(self, value: 'object') -> None: | ||
setattr(self._thread_local, self.name, value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this helps clear up the value of with_current_context. Nice!
I feel like the need to add with_current_context will probably be a gotcha in many cases. It's too bad there isn't a way to make this something that is implicitly shared.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the day comes that contextvars or some built-in Python lib support such scenario, we can say goodbye to this
Context
class and never need to reinvent wheels again :)