-
-
Save guenp/080986d8077e711e5d2fe28aef72bfa4 to your computer and use it in GitHub Desktop.
api-telemetry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start a Jaeger all-in-one container named "jaeger" with its OTLP collector
# enabled. Ports 16686, 4317 and 4318 are published on the host; 4317 is the
# port the Python tracing setup in this gist exports to by default.
docker run --name jaeger \
  -e COLLECTOR_OTLP_ENABLED=true \
  -p 16686:16686 \
  -p 4317:4317 \
  -p 4318:4318 \
  jaegertracing/all-in-one:1.35
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import os.path | |
import inspect | |
from typing import Sequence | |
from opentelemetry import trace | |
from opentelemetry.sdk.resources import Resource, SERVICE_NAME | |
from opentelemetry.sdk.trace import TracerProvider | |
from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
from opentelemetry.sdk.trace.export import ConsoleSpanExporter | |
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | |
from contextlib import wraps | |
from importlib.abc import MetaPathFinder | |
from importlib.util import spec_from_loader | |
from importlib.machinery import SourceFileLoader | |
# Value types that _serialize hands to a span attribute unchanged.
ALLOWED_TYPES = [bool, str, bytes, int, float]

__all__ = ["span"]

# Install a tracer provider whose spans are labelled with this service name.
resource = Resource(attributes={"service.name": "scipy-service"})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)

# Collector address; JAEGER_HOST / JAEGER_PORT override the local defaults.
agent_host_name = os.environ.get("JAEGER_HOST", "localhost")
agent_port = os.environ.get("JAEGER_PORT", 4317)
endpoint = f"http://{agent_host_name}:{agent_port}"

# Send finished spans to the collector in batches over an insecure channel.
otlp_exporter = OTLPSpanExporter(endpoint=endpoint, insecure=True)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_exporter))
def _get_func_name(func): | |
return f"{func.__module__}.{func.__qualname__}" | |
def _serialize(arg):
    """Convert ``arg`` into a value suitable for a span attribute.

    Args:
        arg: The call argument to record.

    Returns:
        ``arg`` itself when it is an instance of one of ``ALLOWED_TYPES``, or
        a non-empty sequence whose elements all share one allowed type;
        otherwise ``str(arg)``.
    """
    allowed = tuple(ALLOWED_TYPES)
    if isinstance(arg, allowed):
        return arg
    if isinstance(arg, Sequence) and len(arg) > 0:
        element_type = type(arg[0])
        # Every element must match the first one exactly: looking at the
        # first element alone would let mixed sequences such as [1, "a"]
        # through, and isinstance() would treat True as an int.
        if issubclass(element_type, allowed) and all(
            type(item) is element_type for item in arg
        ):
            return arg
    # Anything else (empty or mixed sequences, arbitrary objects) is recorded
    # by its string form.
    return str(arg)
def span(func):
    """Decorate ``func`` so that each call runs inside a tracing span.

    The span is named after ``_get_func_name(func)``. Positional arguments are
    recorded as ``args.<index>`` attributes and keyword arguments as
    ``kwargs.<name>`` attributes, each passed through ``_serialize`` first.

    Args:
        func: The callable to instrument.

    Returns:
        A wrapper with the same call signature that returns whatever ``func``
        returns.
    """
    # Creates a tracer from the global tracer provider
    tracer = trace.get_tracer(__name__)
    func_name = _get_func_name(func)

    @wraps(func)
    def span_wrapper(*args, **kwargs):
        with tracer.start_as_current_span(func_name) as current_span:
            for position, arg in enumerate(args):
                current_span.set_attribute(f"args.{position}", _serialize(arg))
            for key, value in kwargs.items():
                # Keyword values get the same conversion as positional ones.
                current_span.set_attribute(f"kwargs.{key}", _serialize(value))
            return func(*args, **kwargs)

    return span_wrapper
class TelemetryMetaFinder(MetaPathFinder):
    """Meta path finder that routes selected modules through a telemetry loader.

    For a matching module name it asks the other entries of ``sys.meta_path``
    for a spec and, when that spec would be loaded by a ``SourceFileLoader``,
    substitutes a ``TelemetrySpanSourceFileLoader`` for the same file.
    """

    def __init__(self, module_names, *args, **kwargs):
        """MetaPathFinder implementation that overrides a spec loader
        of type SourceFileLoader with a TelemetrySpanSourceFileLoader.

        Args:
            module_names (List[str]): Module names to include. A module is
                included when any of these names occurs as a substring of its
                fully qualified name. A single string is treated as one name.
        """
        if isinstance(module_names, str):
            # Iterating a bare string would match on single characters.
            module_names = [module_names]
        self._module_names = tuple(module_names)
        super().__init__(*args, **kwargs)

    def find_spec(self, fullname, path, target=None):
        """Return a spec for ``fullname``, or ``None`` to defer to other finders.

        Args:
            fullname: Fully qualified name of the module being imported.
            path: Search path passed in by the import system.
            target: Optional existing module object passed in by the import
                system.

        Returns:
            ``None`` when ``fullname`` matches none of the configured names or
            no other finder knows the module; otherwise the first spec found,
            with its loader replaced when it was a ``SourceFileLoader``.
        """
        if not any(name in fullname for name in self._module_names):
            return None
        for finder in sys.meta_path:
            # Skip this finder and any sibling instance: two telemetry
            # finders delegating to each other would recurse without end.
            if isinstance(finder, TelemetryMetaFinder):
                continue
            find_spec = getattr(finder, "find_spec", None)
            if find_spec is None:
                # Entry does not implement the spec-based finder protocol.
                continue
            spec = find_spec(fullname, path, target)
            if spec is None:
                continue
            if isinstance(spec.loader, SourceFileLoader):
                # Swap the loader on the spec that was found so its file
                # location, cache path and package search locations are kept.
                spec.loader = TelemetrySpanSourceFileLoader(
                    spec.name,
                    spec.origin,
                )
            return spec
        return None
class TelemetrySpanSourceFileLoader(SourceFileLoader):
    """Source file loader that wraps a module's callables with ``span``."""

    def exec_module(self, module):
        """Execute ``module`` and then instrument what it defines.

        After normal execution, every function that belongs to ``module`` is
        replaced by ``span(function)``. For every class that belongs to
        ``module``, each function-valued attribute whose name does not start
        with an underscore is replaced the same way.

        Args:
            module: The module object being initialised.
        """
        super().exec_module(module)
        functions = inspect.getmembers(module, predicate=inspect.isfunction)
        classes = inspect.getmembers(module, predicate=inspect.isclass)

        # Add telemetry to functions
        for name, _function in functions:
            # Functions merely imported into this module are left alone.
            if inspect.getmodule(_function) == module:
                setattr(module, name, span(_function))

        # Add telemetry to methods
        for _, _class in classes:
            if inspect.getmodule(_class) != module:
                continue
            for name, method in inspect.getmembers(
                _class,
                predicate=inspect.isfunction,
            ):
                if name.startswith("_"):
                    continue
                wrapped = span(method)
                # A static method shows up here as a plain function; re-wrap
                # it so instance calls do not start receiving ``self``.
                if isinstance(inspect.getattr_static(_class, name), staticmethod):
                    wrapped = staticmethod(wrapped)
                setattr(_class, name, wrapped)
def install(module_names):
    """Put a ``TelemetryMetaFinder`` at the front of ``sys.meta_path``.

    Args:
        module_names: Module names handed to ``TelemetryMetaFinder``.
    """
    telemetry_finder = TelemetryMetaFinder(module_names)
    sys.meta_path.insert(0, telemetry_finder)
if __name__ == "__main__":
    # Register the finder first so the import below goes through it.
    traced_modules = ["scipy.stats._distn_infrastructure"]
    install(traced_modules)

    from scipy import stats

    # Exercise the distribution both directly and through a frozen instance.
    stats.norm.pdf(x=1, loc=1, scale=0.01)
    frozen_norm = stats.norm(loc=1, scale=0.01)
    frozen_norm.pdf(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment