Sign in
agent:
Auto Exec

Queries Elasticsearch to fetch the latest logs from a list of specified services with required fields

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

Queries Elasticsearch to fetch the latest logs from the services listed in target_services, returning only the specified fields, e.g. ["@timestamp", "resource.service.name", "body"].

For Example:

Query Elasticsearch index 'otel-log-*' to get the 10 latest documents where resource.service.name is in ['frontend-proxy','frontend','product-catalog'], sorted by @timestamp descending, returning @timestamp, resource.service.name, and body fields

Expected inputs: index_pattern (str), target_services (array), log_count (int). If log_count is not specified, assume 10.

# Fetch the latest logs for a list of services from Elasticsearch.
#
# Inputs (referenced but not defined in this script; the caller must supply
# them before it runs):
#   index_pattern   (str)  index or index pattern to search, e.g. 'otel-log-*'
#   target_services (list) values matched against resource.service.name.keyword
#   log_count       (int)  maximum number of documents to return
#
# Outputs (left in scope when the script finishes):
#   filtered_logs    list of {'timestamp', 'service_name', 'body'} dicts,
#                    sorted by @timestamp descending
#   total_matches    total hit count reported by Elasticsearch
#   fetch_successful True only when the search returned HTTP 200 and the
#                    response was parsed without error
import json
from urllib.parse import urlparse

import requests


def _first_value(value):
    """Return the first element of a non-empty list; anything else unchanged."""
    if isinstance(value, list) and value:
        return value[0]
    return value


def _get_field(source, dotted_key):
    """Read ``dotted_key`` from a hit's ``_source``.

    The flat key (``source['a.b.c']``) is tried first. If it is absent, the
    key is walked as a nested path (``source['a']['b']['c']``), because
    ``_source`` keeps whatever structure the document was indexed with.
    Returns ``''`` when the field is missing either way.
    """
    if dotted_key in source:
        return _first_value(source[dotted_key])

    node = source
    for part in dotted_key.split('.'):
        if not isinstance(node, dict) or part not in node:
            return ''
        node = node[part]
    return _first_value(node)


# getEnvVar is not defined in this script; it is expected to be provided by
# the environment that executes it.
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Certificate verification is requested only when the URL scheme is https.
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

# Failure defaults; overwritten only after a fully successful fetch.
filtered_logs = []
total_matches = 0
fetch_successful = False

try:
    # Match any of the requested services, newest documents first.
    search_query = {
        "query": {
            "terms": {
                "resource.service.name.keyword": target_services
            }
        },
        "sort": [
            {"@timestamp": {"order": "desc"}}
        ],
        "size": log_count,
        "_source": ["@timestamp", "resource.service.name", "body"],
    }

    # Strip a trailing slash so the path never contains '//'.
    search_url = f"{elastic_url.rstrip('/')}/{index_pattern}/_search"

    response = requests.post(
        search_url,
        headers={'Content-Type': 'application/json'},
        data=json.dumps(search_query),
        verify=use_ssl,
        timeout=30,
    )

    if response.status_code == 200:
        hits = response.json().get('hits', {})

        # hits.total is an object ({'value': N, ...}) on current clusters but
        # can be a bare integer (older versions / rest_total_hits_as_int).
        total = hits.get('total', 0)
        if isinstance(total, dict):
            total_matches = total.get('value', 0)
        else:
            total_matches = total or 0

        filtered_logs = [
            {
                'timestamp': _get_field(entry.get('_source', {}), '@timestamp'),
                'service_name': _get_field(
                    entry.get('_source', {}), 'resource.service.name'
                ),
                'body': _get_field(entry.get('_source', {}), 'body'),
            }
            for entry in hits.get('hits', [])
        ]
        fetch_successful = True
    else:
        print(f"Error fetching logs: HTTP {response.status_code}")
        print(f"Response: {response.text}")
except Exception as e:
    # Top-level boundary of the script: report the failure and fall back to
    # the failure defaults instead of crashing.
    print(f"Exception occurred while fetching logs: {str(e)}")
    filtered_logs = []
    total_matches = 0
    fetch_successful = False

# The summary is printed outside the try block so that a display problem can
# never discard logs that were fetched successfully.
if fetch_successful:
    print(f"Successfully fetched {len(filtered_logs)} logs from services {target_services}")
    print(f"Total logs available for these services: {total_matches}")

    for i, log in enumerate(filtered_logs, 1):
        # The body may be a non-string (e.g. a structured object); stringify
        # it before truncating for the preview.
        body = log['body']
        body_text = body if isinstance(body, str) else str(body)

        print(f"\nLog {i}:")
        print(f" Timestamp: {log['timestamp']}")
        print(f" Service: {log['service_name']}")
        print(f" Body: {body_text[:100]}{'...' if len(body_text) > 100 else ''}")

print("\nOutput parameters:")
print(f"filtered_logs: {json.dumps(filtered_logs, indent=2)}")
print(f"total_matches: {total_matches}")
print(f"fetch_successful: {fetch_successful}")
copied