Basic Elasticsearch Operations
1. List my elasticsearch indices

Retrieves and lists all Elasticsearch indices using the _cat/indices API.

```python
import requests
import json
from urllib.parse import urlparse

# Get Elasticsearch URL from environment
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Parse URL to determine if SSL should be used
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

try:
    # Make request to get all indices
    response = requests.get(f"{elastic_url}/_cat/indices?format=json", verify=use_ssl, timeout=30)

    if response.status_code == 200:
        indices_data = response.json()

        # Extract index names
        indices = [index_info['index'] for index_info in indices_data]
        index_count = len(indices)

        print(f"Found {index_count} indices:")
        for i, index_name in enumerate(indices, 1):
            print(f"{i}. {index_name}")
    else:
        print(f"Error retrieving indices: HTTP {response.status_code}")
        print(f"Response: {response.text}")
        indices = []
        index_count = 0

except Exception as e:
    print(f"Exception occurred while retrieving indices: {str(e)}")
    indices = []
    index_count = 0

print(f"\nOutput parameters:")
print(f"indices: {json.dumps(indices, indent=2)}")
print(f"index_count: {index_count}")
```
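For reference, _cat/indices?format=json returns one JSON object per index, and the task above reads only its "index" field. A minimal sketch of the expected shape, with made-up values (illustrative only, not from a real cluster):

```python
# Illustrative element of the array returned by GET /_cat/indices?format=json.
# Field values here are invented; the task above only uses the "index" key.
example_index_info = {
    "health": "green",
    "status": "open",
    "index": "otel-logs-2024.09.12",
    "pri": "1",
    "rep": "1",
    "docs.count": "104523",
    "store.size": "87.4mb",
}

print(example_index_info["index"])  # -> otel-logs-2024.09.12
```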
2. Fetch the most recent 5 logs from the elasticsearch index <index_name> in last n minutes <lookback_minutes>

This task fetches the most recent logs from the specified Elasticsearch index, sorted by timestamp in descending order. It requires index_name (str), log_count (int), and lookback_minutes (int) as inputs; if not specified, log_count defaults to 5 and lookback_minutes to 15.

```python
import requests
import json
from urllib.parse import urlparse
from datetime import datetime

# === Config / Inputs ===
# These are expected to be provided by your runtime/environment:
#   - ELASTIC_URL_OTEL (env var, via getEnvVar)
#   - index_name (string)
#   - log_count (int)
# Optionally provide lookback_minutes; otherwise default to 15.
try:
    if not lookback_minutes or not isinstance(lookback_minutes, (int, float)) or lookback_minutes <= 0:
        print("[INFO] lookback_minutes not set or invalid — defaulting to 15")
        lookback_minutes = 15
    else:
        print(f"[INFO] Using lookback_minutes={lookback_minutes}")
except NameError:
    print("[INFO] lookback_minutes not defined — defaulting to 15")
    lookback_minutes = 15

elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Parse URL to determine if SSL should be used
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

# ---- Build time filter and query ----
print(f"Fetching last {lookback_minutes} minute(s) of logs from index '{index_name}'")

time_filter = {
    "range": {
        "@timestamp": {
            "gte": f"now-{lookback_minutes}m",
            "lte": "now"
        }
    }
}

search_query = {
    "query": {
        "bool": {
            "filter": [
                time_filter  # restrict to recent window
            ]
        }
    },
    "sort": [{"@timestamp": {"order": "desc"}}],
    "size": log_count,
    "_source": [
        "@timestamp",
        "body",
        "severity.text",
        "resource.service.name",
        "resource.k8s.pod.name",
        "resource.k8s.namespace.name"
    ]
}

try:
    # Make request to search for logs
    response = requests.post(
        f"{elastic_url}/{index_name}/_search",
        headers={'Content-Type': 'application/json'},
        data=json.dumps(search_query),
        verify=use_ssl,
        timeout=30
    )

    if response.status_code == 200:
        search_results = response.json()

        # Extract hits and total count
        hits = search_results.get('hits', {})
        total_hits = hits.get('total', {}).get('value', 0)
        log_entries = hits.get('hits', [])

        # Process the logs to extract relevant information
        recent_logs = []
        for log_entry in log_entries:
            source = log_entry.get('_source', {})

            # Extract timestamp (handle array format)
            timestamp = source.get('@timestamp', [''])
            if isinstance(timestamp, list) and len(timestamp) > 0:
                timestamp = timestamp[0]

            # Extract body (handle array format)
            body = source.get('body', [''])
            if isinstance(body, list) and len(body) > 0:
                body = body[0]

            # Extract severity (handle array format)
            severity = source.get('severity.text', [''])
            if isinstance(severity, list) and len(severity) > 0:
                severity = severity[0]

            # Extract service name (handle array format)
            service_name = source.get('resource.service.name', [''])
            if isinstance(service_name, list) and len(service_name) > 0:
                service_name = service_name[0]

            # Extract pod name (handle array format)
            pod_name = source.get('resource.k8s.pod.name', [''])
            if isinstance(pod_name, list) and len(pod_name) > 0:
                pod_name = pod_name[0]

            # Extract namespace (handle array format)
            namespace = source.get('resource.k8s.namespace.name', [''])
            if isinstance(namespace, list) and len(namespace) > 0:
                namespace = namespace[0]

            log_info = {
                'timestamp': timestamp,
                'body': body,
                'severity': severity,
                'service_name': service_name,
                'pod_name': pod_name,
                'namespace': namespace,
                'index': log_entry.get('_index', ''),
                'id': log_entry.get('_id', '')
            }
            recent_logs.append(log_info)

        fetch_successful = True
        print(f"Successfully fetched {len(recent_logs)} logs from index '{index_name}'")
        print(f"Total matching logs in the last {lookback_minutes} minute(s): {total_hits}")

        # Print a summary of the logs
        for i, log in enumerate(recent_logs, 1):
            print(f"\nLog {i}:")
            print(f"  Timestamp: {log['timestamp']}")
            print(f"  Service: {log['service_name']}")
            print(f"  Severity: {log['severity']}")
            print(f"  Body: {log['body'][:100]}{'...' if len(log['body']) > 100 else ''}")
    else:
        print(f"Error fetching logs: HTTP {response.status_code}")
        print(f"Response: {response.text}")
        recent_logs = []
        total_hits = 0
        fetch_successful = False

except Exception as e:
    print(f"Exception occurred while fetching logs: {str(e)}")
    recent_logs = []
    total_hits = 0
    fetch_successful = False

print(f"\nOutput parameters:")
print(f"recent_logs: {json.dumps(recent_logs, indent=2)}")
print(f"total_hits: {total_hits}")
print(f"fetch_successful: {fetch_successful}")
```
3. Fetches the latest 10 logs from Elasticsearch for a specific service, sorted by timestamp in descending order

This task fetches the latest 10 logs from Elasticsearch for a specific service, sorted by timestamp in descending order. For example: from index <INDEX_PATTERN> (e.g. otel-logs-*), get the 10 latest documents sorted by @timestamp descending where resource.service.name=<SERVICE> (e.g. "product-catalog"), returning @timestamp, severity.text, and body. It expects index_pattern (str), service_name (str), and log_count (int) as inputs.

```python
import requests
import json
from urllib.parse import urlparse

# Get Elasticsearch URL from environment
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Parse URL to determine if SSL should be used
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

try:
    # Construct the search query to get logs for specific service
    search_query = {
        "query": {
            "term": {
                "resource.service.name.keyword": service_name
            }
        },
        "sort": [{"@timestamp": {"order": "desc"}}],
        "size": log_count,
        "_source": ["@timestamp", "severity.text", "body"]
    }

    # Make request to search for logs
    response = requests.post(
        f"{elastic_url}/{index_pattern}/_search",
        headers={'Content-Type': 'application/json'},
        data=json.dumps(search_query),
        verify=use_ssl,
        timeout=30
    )

    if response.status_code == 200:
        search_results = response.json()

        # Extract hits and total count
        hits = search_results.get('hits', {})
        total_matches = hits.get('total', {}).get('value', 0)
        log_entries = hits.get('hits', [])

        # Process the logs to extract relevant information
        service_logs = []
        for log_entry in log_entries:
            source = log_entry.get('_source', {})

            # Extract timestamp (handle array format)
            timestamp = source.get('@timestamp', [''])
            if isinstance(timestamp, list) and len(timestamp) > 0:
                timestamp = timestamp[0]

            # Extract severity (handle array format)
            severity = source.get('severity.text', [''])
            if isinstance(severity, list) and len(severity) > 0:
                severity = severity[0]

            # Extract body (handle array format)
            body = source.get('body', [''])
            if isinstance(body, list) and len(body) > 0:
                body = body[0]

            log_info = {
                'timestamp': timestamp,
                'severity': severity,
                'body': body
            }
            service_logs.append(log_info)

        fetch_successful = True
        print(f"Successfully fetched {len(service_logs)} logs for service '{service_name}' from index pattern '{index_pattern}'")
        print(f"Total logs available for this service: {total_matches}")

        # Print a summary of the logs
        for i, log in enumerate(service_logs[:5], 1):  # Show first 5 for summary
            print(f"\nLog {i}:")
            print(f"  Timestamp: {log['timestamp']}")
            print(f"  Severity: {log['severity']}")
            print(f"  Body: {log['body'][:100]}{'...' if len(log['body']) > 100 else ''}")

        if len(service_logs) > 5:
            print(f"\n... and {len(service_logs) - 5} more logs")
    else:
        print(f"Error fetching logs: HTTP {response.status_code}")
        print(f"Response: {response.text}")
        service_logs = []
        total_matches = 0
        fetch_successful = False

except Exception as e:
    print(f"Exception occurred while fetching logs: {str(e)}")
    service_logs = []
    total_matches = 0
    fetch_successful = False

print(f"\nOutput parameters:")
print(f"service_logs: {json.dumps(service_logs, indent=2)}")
print(f"total_matches: {total_matches}")
print(f"fetch_successful: {fetch_successful}")
```
4. Queries Elasticsearch to fetch the latest logs from a list of specified services with required fields

Queries Elasticsearch to fetch the latest logs from the specified services (target_services), returning the requested fields such as ["@timestamp", "resource.service.name", "body"]. For example: query Elasticsearch index 'otel-log-*' to get the 10 latest documents where resource.service.name is in ['frontend-proxy','frontend','product-catalog'], sorted by @timestamp descending, returning the @timestamp, resource.service.name, and body fields. It expects index_pattern (str), target_services (array), and log_count (int) as inputs; if log_count is not specified, assume 10.

```python
import requests
import json
from urllib.parse import urlparse

# Get Elasticsearch URL from environment
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Parse URL to determine if SSL should be used
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

try:
    # Construct the search query to get logs for multiple services
    search_query = {
        "query": {
            "terms": {
                "resource.service.name.keyword": target_services
            }
        },
        "sort": [{"@timestamp": {"order": "desc"}}],
        "size": log_count,
        "_source": ["@timestamp", "resource.service.name", "body"]
    }

    # Make request to search for logs
    response = requests.post(
        f"{elastic_url}/{index_pattern}/_search",
        headers={'Content-Type': 'application/json'},
        data=json.dumps(search_query),
        verify=use_ssl,
        timeout=30
    )

    if response.status_code == 200:
        search_results = response.json()

        # Extract hits and total count
        hits = search_results.get('hits', {})
        total_matches = hits.get('total', {}).get('value', 0)
        log_entries = hits.get('hits', [])

        # Process the logs to extract relevant information
        filtered_logs = []
        for log_entry in log_entries:
            source = log_entry.get('_source', {})

            # Extract timestamp (handle array format)
            timestamp = source.get('@timestamp', [''])
            if isinstance(timestamp, list) and len(timestamp) > 0:
                timestamp = timestamp[0]

            # Extract service name (handle array format)
            service_name = source.get('resource.service.name', [''])
            if isinstance(service_name, list) and len(service_name) > 0:
                service_name = service_name[0]

            # Extract body (handle array format)
            body = source.get('body', [''])
            if isinstance(body, list) and len(body) > 0:
                body = body[0]

            log_info = {
                'timestamp': timestamp,
                'service_name': service_name,
                'body': body
            }
            filtered_logs.append(log_info)

        fetch_successful = True
        print(f"Successfully fetched {len(filtered_logs)} logs from services {target_services}")
        print(f"Total logs available for these services: {total_matches}")

        # Print a summary of the logs
        for i, log in enumerate(filtered_logs, 1):
            print(f"\nLog {i}:")
            print(f"  Timestamp: {log['timestamp']}")
            print(f"  Service: {log['service_name']}")
            print(f"  Body: {log['body'][:100]}{'...' if len(log['body']) > 100 else ''}")
    else:
        print(f"Error fetching logs: HTTP {response.status_code}")
        print(f"Response: {response.text}")
        filtered_logs = []
        total_matches = 0
        fetch_successful = False

except Exception as e:
    print(f"Exception occurred while fetching logs: {str(e)}")
    filtered_logs = []
    total_matches = 0
    fetch_successful = False

print(f"\nOutput parameters:")
print(f"filtered_logs: {json.dumps(filtered_logs, indent=2)}")
print(f"total_matches: {total_matches}")
print(f"fetch_successful: {fetch_successful}")
```
5. List my elasticsearch indices to give me an index pattern name I can search the logs for

Lists all Elasticsearch indices and analyzes them to recommend the best index pattern for log searching. The task outputs a recommended index pattern (INDEX_PATTERN) which can be used when searching logs in Elasticsearch.

```python
import requests
import json
from urllib.parse import urlparse

# Get Elasticsearch URL from environment
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Parse URL to determine if SSL should be used
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

try:
    # Make request to get all indices
    response = requests.get(f"{elastic_url}/_cat/indices?format=json", verify=use_ssl, timeout=30)

    if response.status_code == 200:
        indices_data = response.json()

        # Extract index names
        indices = [index_info['index'] for index_info in indices_data]
        index_count = len(indices)

        # Analyze indices to recommend a pattern
        log_indices = [idx for idx in indices if 'log' in idx.lower()]
        otel_indices = [idx for idx in indices if 'otel' in idx.lower()]

        # Determine the best index pattern
        if otel_indices:
            # Check if there are date-based otel-logs indices
            otel_log_indices = [idx for idx in otel_indices if 'log' in idx.lower()]
            if otel_log_indices:
                INDEX_PATTERN = "otel-logs-*"
                pattern_explanation = f"Found {len(otel_log_indices)} OpenTelemetry log indices. The pattern 'otel-logs-*' will match all date-based log indices like {', '.join(otel_log_indices[:3])}{'...' if len(otel_log_indices) > 3 else ''}."
            else:
                INDEX_PATTERN = "otel-*"
                pattern_explanation = f"Found {len(otel_indices)} OpenTelemetry indices. The pattern 'otel-*' will match all OpenTelemetry indices."
        elif log_indices:
            INDEX_PATTERN = "*log*"
            pattern_explanation = f"Found {len(log_indices)} log-related indices. The pattern '*log*' will match all indices containing 'log' in their name."
        else:
            INDEX_PATTERN = "*"
            pattern_explanation = "No specific log indices detected. Using '*' to match all indices, but this may include non-log data."

        print(f"Found {index_count} indices:")
        for i, index_name in enumerate(indices, 1):
            print(f"{i}. {index_name}")

        print(f"\nRecommended index pattern: {INDEX_PATTERN}")
        print(f"Explanation: {pattern_explanation}")
    else:
        print(f"Error retrieving indices: HTTP {response.status_code}")
        print(f"Response: {response.text}")
        indices = []
        index_count = 0
        INDEX_PATTERN = "*"
        pattern_explanation = "Failed to retrieve indices, defaulting to '*' pattern"

except Exception as e:
    print(f"Exception occurred while retrieving indices: {str(e)}")
    indices = []
    index_count = 0
    INDEX_PATTERN = "*"
    pattern_explanation = "Exception occurred, defaulting to '*' pattern"

print(f"\nOutput parameters:")
print(f"indices: {json.dumps(indices, indent=2)}")
print(f"index_count: {index_count}")
print(f"INDEX_PATTERN: {INDEX_PATTERN}")
print(f"pattern_explanation: {pattern_explanation}")
```
6. In 'otel-log-*' over last <N_MINUTES> minutes, where resource.service.name in target_services ["frontend-proxy","frontend","product-catalog"] and severity.text in ['WARN','ERROR','FATAL'], group by resource.service.name and attributes.code.function.name. Return counts and top sample body per group.

Queries Elasticsearch for logs from the target services with WARN/ERROR/FATAL severity levels, groups them by service and function name, and returns per-group counts with a sample body. It expects N_MINUTES (int), target_services (array), and severity_levels (array) as inputs.

```python
import requests
import json
from urllib.parse import urlparse
from datetime import datetime, timedelta

# Get Elasticsearch URL from environment
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Parse URL to determine if SSL should be used
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

# Calculate time range
current_time = datetime.utcnow()
start_time = current_time - timedelta(minutes=N_MINUTES)
start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

try:
    # Construct the search query with aggregations
    search_query = {
        "query": {
            "bool": {
                "must": [
                    {"terms": {"resource.service.name.keyword": target_services}},
                    {"terms": {"severity.text.keyword": severity_levels}},
                    {"range": {"@timestamp": {"gte": start_time_str}}}
                ]
            }
        },
        "size": 0,
        "aggs": {
            "by_service": {
                "terms": {"field": "resource.service.name.keyword", "size": 100},
                "aggs": {
                    "by_function": {
                        "terms": {
                            "field": "attributes.code.function.name.keyword",
                            "size": 100,
                            "missing": "unknown_function"
                        },
                        "aggs": {
                            "sample_body": {
                                "top_hits": {
                                    "size": 1,
                                    "_source": ["body", "@timestamp", "severity.text"]
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    # Make request to search for logs with aggregations
    response = requests.post(
        f"{elastic_url}/otel-logs-*/_search",
        headers={'Content-Type': 'application/json'},
        data=json.dumps(search_query),
        verify=use_ssl,
        timeout=30
    )

    if response.status_code == 200:
        search_results = response.json()

        # Extract total hits
        total_matching_logs = search_results.get('hits', {}).get('total', {}).get('value', 0)

        # Process aggregation results
        grouped_logs = {}
        service_buckets = search_results.get('aggregations', {}).get('by_service', {}).get('buckets', [])

        for service_bucket in service_buckets:
            service_name = service_bucket['key']
            grouped_logs[service_name] = {}

            function_buckets = service_bucket.get('by_function', {}).get('buckets', [])
            for function_bucket in function_buckets:
                function_name = function_bucket['key']
                count = function_bucket['doc_count']

                # Extract sample body
                sample_hits = function_bucket.get('sample_body', {}).get('hits', {}).get('hits', [])
                sample_body = ""
                sample_timestamp = ""
                sample_severity = ""

                if sample_hits:
                    sample_source = sample_hits[0].get('_source', {})

                    # Extract body (handle array format)
                    body = sample_source.get('body', [''])
                    if isinstance(body, list) and len(body) > 0:
                        sample_body = body[0]
                    else:
                        sample_body = str(body)

                    # Extract timestamp (handle array format)
                    timestamp = sample_source.get('@timestamp', [''])
                    if isinstance(timestamp, list) and len(timestamp) > 0:
                        sample_timestamp = timestamp[0]
                    else:
                        sample_timestamp = str(timestamp)

                    # Extract severity (handle array format)
                    severity = sample_source.get('severity.text', [''])
                    if isinstance(severity, list) and len(severity) > 0:
                        sample_severity = severity[0]
                    else:
                        sample_severity = str(severity)

                grouped_logs[service_name][function_name] = {
                    'count': count,
                    'sample_body': sample_body[:200] + '...' if len(sample_body) > 200 else sample_body,
                    'sample_timestamp': sample_timestamp,
                    'sample_severity': sample_severity
                }

        query_successful = True
        print(f"Successfully queried logs from last {N_MINUTES} minutes")
        print(f"Total matching logs: {total_matching_logs}")
        print(f"Services found: {list(grouped_logs.keys())}")

        # Print summary of grouped results
        for service, functions in grouped_logs.items():
            print(f"\nService: {service}")
            for function, data in functions.items():
                print(f"  Function: {function} - Count: {data['count']}")
                print(f"    Sample: {data['sample_body'][:100]}{'...' if len(data['sample_body']) > 100 else ''}")
    else:
        print(f"Error querying logs: HTTP {response.status_code}")
        print(f"Response: {response.text}")
        grouped_logs = {}
        total_matching_logs = 0
        query_successful = False

except Exception as e:
    print(f"Exception occurred while querying logs: {str(e)}")
    grouped_logs = {}
    total_matching_logs = 0
    query_successful = False

print(f"\nOutput parameters:")
print(f"grouped_logs: {json.dumps(grouped_logs, indent=2)}")
print(f"total_matching_logs: {total_matching_logs}")
print(f"query_successful: {query_successful}")
```
7. Summarize WARN/ERROR/FATAL logs per service in target services ["frontend-proxy","frontend","product-catalog"] for the last 60 minutes in ELK.

Analyzes Elasticsearch logs to summarize WARN/ERROR/FATAL logs per service, with counts, a severity breakdown, and sample messages. It expects time_minutes (int), target_services (array), and severity_levels (array) as inputs.

```python
import requests
import json
from urllib.parse import urlparse
from datetime import datetime, timedelta

# Get Elasticsearch URL from environment
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# Parse URL to determine if SSL should be used
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

# Calculate time range
current_time = datetime.utcnow()
start_time = current_time - timedelta(minutes=time_minutes)
start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

try:
    # Construct the search query with aggregations
    search_query = {
        "query": {
            "bool": {
                "must": [
                    {"terms": {"resource.service.name.keyword": target_services}},
                    {"terms": {"severity.text.keyword": severity_levels}},
                    {"range": {"@timestamp": {"gte": start_time_str}}}
                ]
            }
        },
        "size": 0,
        "aggs": {
            "by_service": {
                "terms": {"field": "resource.service.name.keyword", "size": 100},
                "aggs": {
                    "by_severity": {
                        "terms": {"field": "severity.text.keyword", "size": 10},
                        "aggs": {
                            "sample_logs": {
                                "top_hits": {
                                    "size": 3,
                                    "_source": ["body", "@timestamp", "severity.text"],
                                    "sort": [{"@timestamp": {"order": "desc"}}]
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    # Make request to search for logs with aggregations
    response = requests.post(
        f"{elastic_url}/otel-logs-*/_search",
        headers={'Content-Type': 'application/json'},
        data=json.dumps(search_query),
        verify=use_ssl,
        timeout=30
    )

    if response.status_code == 200:
        search_results = response.json()

        # Extract total hits
        total_error_logs = search_results.get('hits', {}).get('total', {}).get('value', 0)

        # Process aggregation results
        service_summary = {}
        service_buckets = search_results.get('aggregations', {}).get('by_service', {}).get('buckets', [])

        for service_bucket in service_buckets:
            service_name = service_bucket['key']
            service_total_count = service_bucket['doc_count']

            service_summary[service_name] = {
                'total_count': service_total_count,
                'severity_breakdown': {},
                'sample_messages': []
            }

            severity_buckets = service_bucket.get('by_severity', {}).get('buckets', [])
            for severity_bucket in severity_buckets:
                severity_level = severity_bucket['key']
                severity_count = severity_bucket['doc_count']
                service_summary[service_name]['severity_breakdown'][severity_level] = severity_count

                # Extract sample logs
                sample_hits = severity_bucket.get('sample_logs', {}).get('hits', {}).get('hits', [])
                for hit in sample_hits:
                    sample_source = hit.get('_source', {})

                    # Extract body (handle array format)
                    body = sample_source.get('body', [''])
                    if isinstance(body, list) and len(body) > 0:
                        body = body[0]

                    # Extract timestamp (handle array format)
                    timestamp = sample_source.get('@timestamp', [''])
                    if isinstance(timestamp, list) and len(timestamp) > 0:
                        timestamp = timestamp[0]

                    # Extract severity (handle array format)
                    severity = sample_source.get('severity.text', [''])
                    if isinstance(severity, list) and len(severity) > 0:
                        severity = severity[0]

                    sample_message = {
                        'severity': severity,
                        'timestamp': timestamp,
                        'body': body[:150] + '...' if len(body) > 150 else body
                    }
                    service_summary[service_name]['sample_messages'].append(sample_message)

        # Add services with zero error logs
        for service in target_services:
            if service not in service_summary:
                service_summary[service] = {
                    'total_count': 0,
                    'severity_breakdown': {},
                    'sample_messages': []
                }

        analysis_successful = True
        print(f"Successfully analyzed logs from last {time_minutes} minutes")
        print(f"Total WARN/ERROR/FATAL logs found: {total_error_logs}")

        # Print summary for each service
        for service, data in service_summary.items():
            print(f"\nService: {service}")
            print(f"  Total error logs: {data['total_count']}")
            if data['severity_breakdown']:
                print(f"  Severity breakdown:")
                for severity, count in data['severity_breakdown'].items():
                    print(f"    {severity}: {count}")
            if data['sample_messages']:
                print(f"  Sample messages ({len(data['sample_messages'])}):")
                for i, msg in enumerate(data['sample_messages'][:2], 1):
                    print(f"    {i}. [{msg['severity']}] {msg['body'][:80]}{'...' if len(msg['body']) > 80 else ''}")
            else:
                print(f"  No WARN/ERROR/FATAL logs found")
    else:
        print(f"Error querying logs: HTTP {response.status_code}")
        print(f"Response: {response.text}")
        service_summary = {}
        total_error_logs = 0
        analysis_successful = False

except Exception as e:
    print(f"Exception occurred while analyzing logs: {str(e)}")
    service_summary = {}
    total_error_logs = 0
    analysis_successful = False

print(f"\nOutput parameters:")
print(f"service_summary: {json.dumps(service_summary, indent=2)}")
print(f"total_error_logs: {total_error_logs}")
print(f"analysis_successful: {analysis_successful}")
```
8. Get Errors or Bad HTTP Status Codes from Elasticsearch Logs for a specific time window and index pattern

This task scans the log body in Elasticsearch logs for any HTTP status code of 400 or above. It expects the following inputs:

* target_services: list[str] of service names to include (e.g., ["frontend", "frontend-proxy"])
* time_minutes: int lookback window in minutes (defaults to 15 if missing/invalid)
* index_name_pattern: str index pattern (defaults to "otel-logs-*")
* sample_size_per_bucket: int number of sample logs to fetch per service/severity (default 3)
* max_error_logs: int cap on exported raw error logs (default 10)

A short illustration of the status-code extraction this task relies on is shown after this list, followed by the task code.
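The status code is pulled out of the raw log body by a Painless runtime field in the task code below, using the pattern \s(\d{3})\s, i.e. a three-digit number bounded by whitespace, which is typical of Envoy/Nginx-style access log lines. A minimal Python sketch of the same matching logic, on a made-up log line (the sample line and the 503 result are purely illustrative):

```python
import re

# Equivalent of the Painless regex used by the runtime field below: \s(\d{3})\s
pattern = re.compile(r"\s(\d{3})\s")

# Hypothetical access-log style body; a 3-digit code bounded by whitespace is extracted
sample_body = '[2024-09-12T16:36:14Z] "GET /api/products HTTP/1.1" 503 UF upstream_reset_before_response_started'

match = pattern.search(sample_body)
print(match.group(1) if match else "no status code found")  # -> 503
```

The task itself performs this extraction server-side as an Elasticsearch runtime field, so only documents whose extracted status code is 400 or above are returned and aggregated.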
```python
import requests
import json
from urllib.parse import urlparse
from datetime import datetime, timedelta

# ===== Inputs expected from your environment/runtime =====
# - ELASTIC_URL_OTEL via getEnvVar('ELASTIC_URL_OTEL')
# - Optional:
#   * target_services: list[str] (e.g., ["frontend", "frontend-proxy"])
#   * time_minutes: int (lookback window; defaults to 15 if missing/invalid)
#   * index_name_pattern: str (defaults to "otel-logs-*")
#   * sample_size_per_bucket: int (default 3)
#   * max_error_logs: int (cap for exported raw error logs; default 10)
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# -------- Defaults / Guards --------
try:
    if not isinstance(time_minutes, (int, float)) or time_minutes <= 0:
        print("[INFO] time_minutes not set or invalid — defaulting to 15")
        time_minutes = 15
    else:
        print(f"[INFO] Using time_minutes={time_minutes}")
except NameError:
    print("[INFO] time_minutes not defined — defaulting to 15")
    time_minutes = 15

try:
    target_services  # noqa: F821
    if not isinstance(target_services, list) or not all(isinstance(s, str) for s in target_services):
        print("[INFO] target_services invalid — ignoring service filter")
        target_services = []
except NameError:
    target_services = []
    print("[INFO] target_services not provided — searching across all services")

try:
    index_name_pattern  # noqa: F821
    if not isinstance(index_name_pattern, str) or not index_name_pattern.strip():
        index_name_pattern = "otel-logs-*"
except NameError:
    index_name_pattern = "otel-logs-*"

try:
    sample_size_per_bucket  # noqa: F821
    if not isinstance(sample_size_per_bucket, int) or sample_size_per_bucket <= 0:
        sample_size_per_bucket = 3
except NameError:
    sample_size_per_bucket = 3

try:
    max_error_logs  # noqa: F821
    if not isinstance(max_error_logs, int) or max_error_logs <= 0:
        max_error_logs = 10
except NameError:
    max_error_logs = 10

# -------- Transport / SSL --------
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

# -------- Time Window --------
current_time = datetime.utcnow()
start_time = current_time - timedelta(minutes=time_minutes)
start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

print(f"[INFO] Searching index pattern '{index_name_pattern}' for HTTP status >= 400 in body, "
      f"last {time_minutes} minute(s)")
if target_services:
    print(f"[INFO] Restricting to services: {target_services}")

# -------- Runtime field to extract status code from `body` --------
# Tries `body.keyword` (doc values) then falls back to `_source`. Regex finds a 3-digit status code.
runtime_mappings = {
    "status_code": {
        "type": "long",
        "script": {
            "source": """
                String text = null;
                if (doc.containsKey('body.keyword') && doc['body.keyword'].size() > 0) {
                    text = doc['body.keyword'].value;
                } else if (params._source != null && params._source.containsKey('body')) {
                    def b = params._source.body;
                    if (b != null) {
                        text = b instanceof String ? b : (b instanceof List && b.size() > 0 ? b[0] : null);
                    }
                }
                if (text == null) return;
                def m = /\\s(\\d{3})\\s/.matcher(text);
                if (m.find()) {
                    emit(Integer.parseInt(m.group(1)));
                }
            """
        }
    }
}

# -------- Query building helpers --------
must_filters = [
    {"range": {"@timestamp": {"gte": start_time_str}}}
]
if target_services:
    must_filters.append({"terms": {"resource.service.name.keyword": target_services}})

base_bool_query = {
    "bool": {
        "must": must_filters,
        "filter": [
            {"range": {"status_code": {"gte": 400}}}
        ]
    }
}

# -------- 1) Aggregation query (size=0) --------
agg_query = {
    "runtime_mappings": runtime_mappings,
    "query": base_bool_query,
    "size": 0,
    "aggs": {
        "by_service": {
            "terms": {"field": "resource.service.name.keyword", "size": 100},
            "aggs": {
                "by_status_class": {
                    "terms": {
                        "script": {
                            "source": """
                                def sc = doc['status_code'].size() > 0 ? doc['status_code'].value : null;
                                if (sc == null) return 'unknown';
                                if (sc >= 400 && sc < 500) return '4xx';
                                if (sc >= 500 && sc < 600) return '5xx';
                                return 'other';
                            """,
                            "lang": "painless"
                        },
                        "size": 5
                    }
                },
                "sample_logs": {
                    "top_hits": {
                        "size": sample_size_per_bucket,
                        "_source": [
                            "body", "@timestamp",
                            "resource.service.name",
                            "resource.k8s.pod.name",
                            "resource.k8s.namespace.name",
                            "severity.text",
                            "traceId", "spanId"
                        ],
                        "sort": [{"@timestamp": {"order": "desc"}}]
                    }
                }
            }
        }
    }
}

# -------- 2) Hits query for exporting raw error logs (capped) --------
hits_query = {
    "runtime_mappings": runtime_mappings,
    "query": base_bool_query,
    "sort": [{"@timestamp": {"order": "desc"}}],
    "track_total_hits": True,
    "size": max_error_logs,
    "_source": [
        "body", "@timestamp",
        "resource.service.name",
        "resource.k8s.pod.name",
        "resource.k8s.namespace.name",
        "severity.text",
        "traceId", "spanId"
    ]
}

def _coerce_scalar(val):
    """Return first element if list-like; otherwise return val."""
    if isinstance(val, list) and val:
        return val[0]
    return val

def _extract_source_fields(src):
    body = _coerce_scalar(src.get("body", ""))
    ts = _coerce_scalar(src.get("@timestamp", ""))
    sev = _coerce_scalar(src.get("severity", {}).get("text", "")) or _coerce_scalar(src.get("severity.text", ""))
    service = (
        src.get("resource", {}).get("service", {}).get("name", "")
        or _coerce_scalar(src.get("resource.service.name", ""))
    )
    pod = (
        src.get("resource", {}).get("k8s", {}).get("pod", {}).get("name", "")
        or _coerce_scalar(src.get("resource.k8s.pod.name", ""))
    )
    namespace = (
        src.get("resource", {}).get("k8s", {}).get("namespace", {}).get("name", "")
        or _coerce_scalar(src.get("resource.k8s.namespace.name", ""))
    )
    trace_id = _coerce_scalar(src.get("traceId", ""))
    span_id = _coerce_scalar(src.get("spanId", ""))
    return {
        "timestamp": ts,
        "service": service,
        "pod": pod,
        "namespace": namespace,
        "severity": sev,
        "traceId": trace_id,
        "spanId": span_id,
        "body": body
    }

try:
    # ---- Run aggregation query ----
    agg_resp = requests.post(
        f"{elastic_url}/{index_name_pattern}/_search",
        headers={"Content-Type": "application/json"},
        data=json.dumps(agg_query),
        verify=use_ssl,
        timeout=30
    )

    # ---- Run hits query for raw error logs ----
    hits_resp = requests.post(
        f"{elastic_url}/{index_name_pattern}/_search",
        headers={"Content-Type": "application/json"},
        data=json.dumps(hits_query),
        verify=use_ssl,
        timeout=30
    )

    if agg_resp.status_code != 200 or hits_resp.status_code != 200:
        if agg_resp.status_code != 200:
            print(f"[ERROR] Aggregation search HTTP {agg_resp.status_code}")
            print(agg_resp.text)
        if hits_resp.status_code != 200:
            print(f"[ERROR] Hits search HTTP {hits_resp.status_code}")
            print(hits_resp.text)
        service_summary = {}
        error_logs = []
        total_error_logs = 0
        analysis_successful = False
    else:
        agg_result = agg_resp.json()
        hits_result = hits_resp.json()

        total_error_logs = hits_result.get("hits", {}).get("total", {}).get("value", 0)

        # ---- Build service_summary from aggregations ----
        service_summary = {}
        buckets = agg_result.get("aggregations", {}).get("by_service", {}).get("buckets", [])
        for b in buckets:
            svc = b.get("key", "")
            doc_count = b.get("doc_count", 0)

            class_buckets = b.get("by_status_class", {}).get("buckets", [])
            class_breakdown = {cb["key"]: cb["doc_count"] for cb in class_buckets}

            samples = []
            sample_hits = b.get("sample_logs", {}).get("hits", {}).get("hits", [])
            for h in sample_hits:
                src = h.get("_source", {})
                samples.append(_extract_source_fields(src))

            service_summary[svc] = {
                "total_count": doc_count,
                "status_class_breakdown": class_breakdown,
                "sample_messages": samples
            }

        # Ensure all requested services appear (even if zero)
        if target_services:
            for svc in target_services:
                service_summary.setdefault(svc, {
                    "total_count": 0,
                    "status_class_breakdown": {},
                    "sample_messages": []
                })

        # ---- Collect capped raw error logs from hits query ----
        error_logs = []
        for h in hits_result.get("hits", {}).get("hits", []):
            src = h.get("_source", {})
            error_logs.append(_extract_source_fields(src))

        analysis_successful = True
        print(f"Successfully analyzed logs for HTTP status >= 400 over last {time_minutes} minute(s).")
        print(f"Total matching logs (all shards): {total_error_logs}")
        print(f"Exported raw error logs (capped): {len(error_logs)}")
        for svc, data in service_summary.items():
            print(f"\nService: {svc}")
            print(f"  Total error logs: {data['total_count']}")
            if data["status_class_breakdown"]:
                print("  Status class breakdown:")
                for k, v in data["status_class_breakdown"].items():
                    print(f"    {k}: {v}")
            if data["sample_messages"]:
                print(f"  Sample messages ({len(data['sample_messages'])}):")
                for i, msg in enumerate(data["sample_messages"][:2], 1):
                    print(f"    {i}. [{msg['timestamp']}] {msg['body']}")
            else:
                print("  No sample messages")

except Exception as e:
    print(f"[EXCEPTION] {str(e)}")
    service_summary = {}
    error_logs = []
    total_error_logs = 0
    analysis_successful = False

# -------- Outputs --------
print("\nOutput parameters:")
print(f"service_summary: {json.dumps(service_summary, indent=2)}")
print(f"total_error_logs: {total_error_logs}")
print(f"exported_error_logs_count: {len(error_logs)}")
# If your runner expects a variable to capture raw logs: error_logs now holds the capped list
print(f"error_logs: {json.dumps(error_logs, indent=2)}")
print(f"analysis_successful: {analysis_successful}")
```
9. Test error task

Queries Elasticsearch for log entries whose body contains an HTTP status code of 400 or above, grouped by service with a 4xx/5xx breakdown and sample messages (a simpler variant of task 8 without the separate raw-log export).

```python
import requests
import json
from urllib.parse import urlparse
from datetime import datetime, timedelta

# ===== Inputs expected from your environment/runtime =====
# - ELASTIC_URL_OTEL via getEnvVar
# - Optional:
#   * target_services: list[str] of service names to include (e.g., ["frontend", "frontend-proxy"])
#   * time_minutes: int lookback window in minutes (defaults to 15 if missing/invalid)
#   * index_name_pattern: str index pattern (defaults to "otel-logs-*")
#   * sample_size_per_bucket: int number of sample logs to fetch per service/severity (default 3)
elastic_url = getEnvVar('ELASTIC_URL_OTEL')

# -------- Defaults / Guards --------
try:
    if not isinstance(time_minutes, (int, float)) or time_minutes <= 0:
        print("[INFO] time_minutes not set or invalid — defaulting to 15")
        time_minutes = 15
    else:
        print(f"[INFO] Using time_minutes={time_minutes}")
except NameError:
    print("[INFO] time_minutes not defined — defaulting to 15")
    time_minutes = 15

try:
    target_services  # noqa: F821
    if not isinstance(target_services, list) or not all(isinstance(s, str) for s in target_services):
        print("[INFO] target_services invalid — ignoring service filter")
        target_services = []
except NameError:
    target_services = []
    print("[INFO] target_services not provided — searching across all services")

try:
    index_name_pattern  # noqa: F821
    if not isinstance(index_name_pattern, str) or not index_name_pattern.strip():
        index_name_pattern = "otel-logs-*"
except NameError:
    index_name_pattern = "otel-logs-*"

try:
    sample_size_per_bucket  # noqa: F821
    if not isinstance(sample_size_per_bucket, int) or sample_size_per_bucket <= 0:
        sample_size_per_bucket = 3
except NameError:
    sample_size_per_bucket = 3

# -------- Transport / SSL --------
parsed_url = urlparse(elastic_url)
use_ssl = parsed_url.scheme == 'https'

# -------- Time Window --------
current_time = datetime.utcnow()
start_time = current_time - timedelta(minutes=time_minutes)
start_time_str = start_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

print(f"[INFO] Searching index pattern '{index_name_pattern}' for HTTP status >= 400 in body, "
      f"last {time_minutes} minute(s)")
if target_services:
    print(f"[INFO] Restricting to services: {target_services}")

# -------- Runtime field to extract status code from `body` --------
# We try to use body.keyword (fast via doc values). If absent/ignored, we fall back to _source.
# The regex looks for a 3-digit code bounded by whitespace (common in Envoy/Nginx access logs).
runtime_mappings = {
    "status_code": {
        "type": "long",
        "script": {
            "source": """
                String text = null;
                if (doc.containsKey('body.keyword') && doc['body.keyword'].size() > 0) {
                    text = doc['body.keyword'].value;
                } else if (params._source != null && params._source.containsKey('body')) {
                    def b = params._source.body;
                    if (b != null) {
                        text = b instanceof String ? b : (b instanceof List && b.size() > 0 ? b[0] : null);
                    }
                }
                if (text == null) return;
                def m = /\\s(\\d{3})\\s/.matcher(text);
                if (m.find()) {
                    def code = Integer.parseInt(m.group(1));
                    emit(code);
                }
            """
        }
    }
}

# -------- Query --------
# Filters:
#   - time range
#   - optional terms filter for service.name
#   - range on the runtime status_code >= 400
must_filters = [
    {"range": {"@timestamp": {"gte": start_time_str}}}
]
if target_services:
    must_filters.append({"terms": {"resource.service.name.keyword": target_services}})

search_query = {
    "runtime_mappings": runtime_mappings,
    "query": {
        "bool": {
            "must": must_filters,
            "filter": [
                {"range": {"status_code": {"gte": 400}}}
            ]
        }
    },
    "size": 0,
    "aggs": {
        "by_service": {
            "terms": {"field": "resource.service.name.keyword", "size": 100},
            "aggs": {
                "by_status_class": {
                    "terms": {
                        # Collapse into 4xx vs 5xx using a runtime script that buckets by first digit
                        "script": {
                            "source": """
                                def sc = doc['status_code'].size() > 0 ? doc['status_code'].value : null;
                                if (sc == null) return 'unknown';
                                if (sc >= 400 && sc < 500) return '4xx';
                                if (sc >= 500 && sc < 600) return '5xx';
                                return 'other';
                            """,
                            "lang": "painless"
                        },
                        "size": 3
                    }
                },
                "sample_logs": {
                    "top_hits": {
                        "size": sample_size_per_bucket,
                        "_source": [
                            "body", "@timestamp",
                            "resource.service.name",
                            "resource.k8s.pod.name",
                            "resource.k8s.namespace.name",
                            "severity.text",
                            "traceId", "spanId"
                        ],
                        "sort": [{"@timestamp": {"order": "desc"}}]
                    }
                }
            }
        }
    }
}

try:
    resp = requests.post(
        f"{elastic_url}/{index_name_pattern}/_search",
        headers={"Content-Type": "application/json"},
        data=json.dumps(search_query),
        verify=use_ssl,
        timeout=30
    )

    if resp.status_code != 200:
        print(f"[ERROR] Elasticsearch returned HTTP {resp.status_code}")
        print(resp.text)
        service_summary = {}
        total_error_logs = 0
        analysis_successful = False
    else:
        result = resp.json()
        total_error_logs = result.get("hits", {}).get("total", {}).get("value", 0)

        service_summary = {}
        buckets = result.get("aggregations", {}).get("by_service", {}).get("buckets", [])
        for b in buckets:
            svc = b.get("key", "")
            doc_count = b.get("doc_count", 0)

            # Severity might not be present in access logs; keep key but it may be empty
            sev_breakdown = {}

            # 4xx vs 5xx class breakdown
            class_buckets = b.get("by_status_class", {}).get("buckets", [])
            class_breakdown = {cb["key"]: cb["doc_count"] for cb in class_buckets}

            # Sample logs
            samples = []
            hits = b.get("sample_logs", {}).get("hits", {}).get("hits", [])
            for h in hits:
                src = h.get("_source", {})

                # body may be string or list
                body = src.get("body", "")
                if isinstance(body, list) and body:
                    body = body[0]

                ts = src.get("@timestamp", "")
                if isinstance(ts, list) and ts:
                    ts = ts[0]

                sev = src.get("severity", {}).get("text", "") or src.get("severity.text", "")
                if isinstance(sev, list) and sev:
                    sev = sev[0]

                # Keep body as-is; uncomment the next line to truncate long bodies instead.
                # shown_body = body if len(body) <= 500 else (body[:500] + " ...")
                shown_body = body  # no truncation

                samples.append({
                    "timestamp": ts,
                    "service": src.get("resource", {}).get("service", {}).get("name", "") or src.get("resource.service.name", ""),
                    "pod": src.get("resource", {}).get("k8s", {}).get("pod", {}).get("name", "") or src.get("resource.k8s.pod.name", ""),
                    "namespace": src.get("resource", {}).get("k8s", {}).get("namespace", {}).get("name", "") or src.get("resource.k8s.namespace.name", ""),
                    "severity": sev,
                    "traceId": src.get("traceId", ""),
                    "spanId": src.get("spanId", ""),
                    "body": shown_body
                })

            service_summary[svc] = {
                "total_count": doc_count,
                "status_class_breakdown": class_breakdown,
                "severity_breakdown": sev_breakdown,
                "sample_messages": samples
            }

        # Ensure all requested services appear (even if zero)
        if target_services:
            for svc in target_services:
                service_summary.setdefault(svc, {
                    "total_count": 0,
                    "status_class_breakdown": {},
                    "severity_breakdown": {},
                    "sample_messages": []
                })

        analysis_successful = True
        print(f"Successfully analyzed logs for HTTP status >= 400 over last {time_minutes} minute(s).")
        print(f"Total matching logs: {total_error_logs}")
        for svc, data in service_summary.items():
            print(f"\nService: {svc}")
            print(f"  Total error logs: {data['total_count']}")
            if data["status_class_breakdown"]:
                print("  Status class breakdown:")
                for k, v in data["status_class_breakdown"].items():
                    print(f"    {k}: {v}")
            if data["sample_messages"]:
                print(f"  Sample messages ({len(data['sample_messages'])}):")
                for i, msg in enumerate(data["sample_messages"][:2], 1):
                    body_preview = msg["body"]
                    print(f"    {i}. [{msg['timestamp']}] {body_preview}")
            else:
                print("  No sample messages")

except Exception as e:
    print(f"[EXCEPTION] {str(e)}")
    service_summary = {}
    total_error_logs = 0
    analysis_successful = False

print("\nOutput parameters:")
print(f"service_summary: {json.dumps(service_summary, indent=2)}")
print(f"total_error_logs: {total_error_logs}")
print(f"analysis_successful: {analysis_successful}")
```