Sign in
agent:
Auto Exec

Search and retrieve recent logs from Elasticsearch for specific services containing target keywords.

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

This workflow queries Elasticsearch for logs from the designated target_services (for example frontend-proxy, frontend, and product-catalog) within a specified time window. It searches for logs that contain a given keyword, such as '/api/checkout', in either the URL path or the message body. The query returns up to `limit` (for example, 30) of the most recent matching logs, sorted by timestamp in descending order. Key fields extracted include the timestamp, service name, pod name, URL details, message body, and tracing identifiers for debugging purposes.

import json
from datetime import datetime, timedelta, timezone
from urllib.parse import urljoin

import requests

# Search Elasticsearch for the most recent logs emitted by `target_services`
# during the last `n_minutes` whose URL path or body contains `keyword`, then
# publish a summary in `checkout_related_logs` and print it as JSON.
#
# Names read here but not defined in this script (they must be provided by the
# surrounding execution environment): getEnvVar, n_minutes, limit,
# target_services, keyword, index_pattern.
#
# On success `checkout_related_logs` holds:
#   total_hits, time_range {start, end, duration_minutes}, keyword_searched,
#   logs [one flattened entry per hit]
# On failure it holds a single "error" key describing what went wrong.

checkout_related_logs = {}

# Get Elasticsearch URL
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Time range. The datetimes are kept naive (but in UTC) so that
# isoformat() + "Z" yields a valid UTC timestamp; an aware datetime would
# already carry a "+00:00" suffix.
end_time = datetime.now(timezone.utc).replace(tzinfo=None)
start_time = end_time - timedelta(minutes=n_minutes)
start_iso = start_time.isoformat() + "Z"
end_iso = end_time.isoformat() + "Z"

# Build query: time window AND service filter AND (keyword in URL path OR
# keyword in body).
query = {
    "size": limit,
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "@timestamp": {
                            "gte": start_iso,
                            "lte": end_iso,
                        }
                    }
                },
                {
                    "terms": {
                        "resource.service.name.keyword": target_services
                    }
                },
                {
                    "bool": {
                        "should": [
                            {
                                "wildcard": {
                                    "attributes.url.path.keyword": {
                                        "value": f"*{keyword}*",
                                        "case_insensitive": True,
                                    }
                                }
                            },
                            {
                                "wildcard": {
                                    "body": {
                                        "value": f"*{keyword}*",
                                        "case_insensitive": True,
                                    }
                                }
                            },
                        ],
                        "minimum_should_match": 1,
                    }
                },
            ]
        }
    },
    "sort": [
        {"@timestamp": {"order": "desc"}}
    ],
    "_source": [
        "@timestamp",
        "resource.service.name",
        "resource.k8s.pod.name",
        "attributes.url.path",
        "attributes.url.full",
        "body",
        "traceId",
        "spanId",
        "severity.text",
    ],
}

# Execute. `error` stays None on success; otherwise it carries the reason and
# `data` is not used.
data = None
error = None

if not elastic_url:
    # Without this guard a missing variable would crash on .endswith() instead
    # of producing the error payload.
    error = "ELASTIC_URL_OTEL is not set"
else:
    # urljoin() replaces the last path segment of a base URL that lacks a
    # trailing slash, so normalise the base before joining.
    if not elastic_url.endswith('/'):
        elastic_url += '/'
    search_url = urljoin(elastic_url, f"{index_pattern}/_search")
    try:
        resp = requests.post(
            search_url,
            json=query,
            headers={'Content-Type': 'application/json'},
            timeout=6,
        )
        resp.raise_for_status()
        data = resp.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON response body on requests versions
        # whose JSON decode error is not a RequestException subclass.
        error = str(e)
    else:
        if not isinstance(data, dict):
            error = "unexpected response format"

if error is not None:
    print(f"Error querying Elasticsearch: {error}")
    checkout_related_logs = {"error": f"Failed to query Elasticsearch: {error}"}
else:
    # Safe parsing of response
    hits_root = data.get("hits") or {}

    # hits.total is normally an object with a "value" key, but it can be a
    # plain integer (older clusters or rest_total_hits_as_int=true).
    total = hits_root.get("total")
    if isinstance(total, dict):
        total_hits = total.get("value", 0)
    else:
        total_hits = total or 0

    result = {
        "total_hits": total_hits,
        "time_range": {
            "start": start_iso,
            "end": end_iso,
            "duration_minutes": n_minutes,
        },
        "keyword_searched": keyword,
        "logs": [],
    }

    for hit in hits_root.get("hits") or []:
        src = (hit.get("_source") or {}) if isinstance(hit, dict) else {}

        body_text = src.get("body", "") or ""
        if not isinstance(body_text, str):
            # A structured body cannot be sliced; serialise it first.
            body_text = json.dumps(body_text, default=str)

        # NOTE(review): the lookups below assume dotted keys such as
        # "service.name" sit directly under "resource" / "attributes" in
        # _source - confirm against the index mapping.
        res = src.get("resource", {}) or {}
        attrs = src.get("attributes", {}) or {}

        result["logs"].append({
            "timestamp": src.get("@timestamp", ""),
            "service_name": res.get("service.name", ""),
            "pod_name": res.get("k8s.pod.name", ""),
            "url_path": attrs.get("url.path", ""),
            "url_full": attrs.get("url.full", ""),
            # Cap the body at 500 characters and mark truncation with "...".
            "body": body_text[:500] + ("..." if len(body_text) > 500 else ""),
            "trace_id": src.get("traceId", ""),
            "span_id": src.get("spanId", ""),
            "severity": (src.get("severity", {}) or {}).get("text", ""),
        })

    checkout_related_logs = result

print(json.dumps(checkout_related_logs, indent=4, default=str))
copied