Sign in
agent:
Auto Exec

Trace-based log analysis across microservices or services in Elasticsearch for distributed request tracking using problematic trace_ids from jaeger

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

This workflow queries Elasticsearch to retrieve logs related to a specific trace ID, for example (15fc0a6356862952ddfcdabf0983d53d), from OpenTelemetry data over a configurable lookback window (e.g. the last 60 minutes). It matches the trace ID against both the traceId and attributes.otelTraceID fields to capture all related log entries. The results are sorted chronologically and limited to the number set by max_logs, displaying key fields like timestamp, service name, pod name, severity, and message body.

Additionally, it provides per-service log counts to visualize the end-to-end request flow across different microservices in the distributed system. Use trace_id to track the request flow across multiple services.

"""Trace-based log retrieval from Elasticsearch for OpenTelemetry data.

Fetches all log entries matching a given trace_id (against the ``traceId``
and ``attributes.otelTraceID`` keyword fields) within a lookback window,
then summarizes per-service log counts to show the end-to-end request flow
across microservices.

Supplied by the surrounding workflow environment (not defined here):
    getEnvVar        -- reads a configured environment variable
    lookback_minutes -- int, size of the search window in minutes
    index_pattern    -- str, e.g. "otel-logs-*"
    max_logs         -- int, maximum number of hits to return
    trace_id         -- str, trace to search for ("" returns nothing)
"""
import json
from collections import defaultdict
from datetime import datetime, timedelta, timezone

import requests

elastic_url = getEnvVar('ELASTIC_URL_OTEL')
if not elastic_url.endswith('/'):
    elastic_url += '/'

logs_data = {}

# Build the search window. datetime.utcnow() is deprecated since Python
# 3.12; an aware UTC datetime yields the identical strftime output here.
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=lookback_minutes)
start_ts = start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
end_ts = end_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

# Match ONLY by trace_id. If not provided, return nothing.
should_clauses = []
if trace_id:
    should_clauses += [
        {"term": {"traceId.keyword": trace_id}},
        {"term": {"attributes.otelTraceID.keyword": trace_id}},
    ]
else:
    should_clauses = [{"match_none": {}}]

query = {
    "query": {
        "bool": {
            "must": [
                {"range": {"@timestamp": {"gte": start_ts, "lte": end_ts}}},
                {"bool": {"should": should_clauses, "minimum_should_match": 1}},
            ]
        }
    },
    "sort": [{"@timestamp": {"order": "asc"}}],
    "size": max_logs,
    "_source": [
        "@timestamp",
        "resource.service.name",
        "resource.k8s.pod.name",
        "severity.text",
        "severity.number",
        "body",
        "traceId",
        "spanId",
        "attributes.otelTraceID",
        "attributes.otelSpanID",
    ],
}

try:
    resp = requests.post(
        f"{elastic_url}{index_pattern}/_search",
        json=query,
        headers={"Content-Type": "application/json"},
        timeout=6,
    )
    resp.raise_for_status()
    result = resp.json()
except (requests.exceptions.RequestException, ValueError) as e:
    # ValueError covers malformed JSON bodies on older requests versions,
    # where .json() does not raise a RequestException subclass.
    print(f"Error querying Elasticsearch: {e}")
    logs_data = {"error": f"Failed to query Elasticsearch: {e}"}
else:
    hits = (result.get("hits") or {}).get("hits", [])
    processed_logs = []
    service_counts = defaultdict(int)

    for h in hits:
        src = h.get("_source", {}) if isinstance(h, dict) else {}
        res = (src.get("resource") or {})
        svc = res.get("service.name", "unknown")
        pod = res.get("k8s.pod.name", "unknown")
        sev_obj = (src.get("severity") or {})
        sev_text = sev_obj.get("text", "")
        sev_num = sev_obj.get("number", "")

        # OTel log bodies may be structured (dict/list); stringify non-str
        # values so the truncation below cannot fail on slicing/concat.
        body = (src.get("body") or "")
        if not isinstance(body, str):
            body = str(body)
        if len(body) > 300:
            body = body[:297] + "..."

        processed_logs.append({
            "timestamp": src.get("@timestamp", ""),
            "service_name": svc,
            "pod_name": pod,
            "severity": sev_text or (str(sev_num) if sev_num != "" else ""),
            "body": body,
            "trace_id": src.get("traceId", (src.get("attributes") or {}).get("otelTraceID", "")),
            "span_id": src.get("spanId", (src.get("attributes") or {}).get("otelSpanID", "")),  # shown for context only
        })
        service_counts[svc] += 1

    logs_data = {
        "total_logs_found": len(processed_logs),
        "time_range": {"start": start_ts, "end": end_ts, "lookback_minutes": lookback_minutes},
        "search_criteria": {"trace_id": trace_id},
        "logs": processed_logs,
        "service_counts": dict(service_counts),
        "service_flow_summary": {
            "total_services": len(service_counts),
            "services_involved": list(service_counts.keys()),
        },
    }

print(json.dumps(logs_data, indent=4))
copied