|
| 1 | +# Copyright © 2011-2026 Splunk, Inc. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"): you may |
| 4 | +# not use this file except in compliance with the License. You may obtain |
| 5 | +# a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 11 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 12 | +# License for the specific language governing permissions and limitations |
| 13 | +# under the License. |
| 14 | +import asyncio |
| 15 | +import json |
| 16 | +import logging |
| 17 | +import logging.handlers |
| 18 | +import os |
| 19 | +import sys |
| 20 | +from collections.abc import Generator, Sequence |
| 21 | +from typing import Any, final, override |
| 22 | + |
| 23 | +# ! NOTE: This insert is only needed for splunk-sdk-python CI/CD to work. |
| 24 | +# ! Remove this if you're modifying this example locally. |
| 25 | +sys.path.insert(0, "/splunklib-deps") |
| 26 | + |
| 27 | +# Include all 3rd party dependencies from <app_name>/bin/lib/ |
| 28 | +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "lib")) |
| 29 | + |
| 30 | +import httpx |
| 31 | +from pydantic import BaseModel, Field |
| 32 | + |
| 33 | +from splunklib.ai import OpenAIModel |
| 34 | +from splunklib.ai.agent import Agent |
| 35 | +from splunklib.ai.messages import HumanMessage |
| 36 | +from splunklib.data import Record |
| 37 | +from splunklib.searchcommands import ( |
| 38 | + Configuration, |
| 39 | + Option, |
| 40 | + dispatch, # pyright: ignore[reportPrivateLocalImportUsage] |
| 41 | + validators, |
| 42 | +) |
| 43 | +from splunklib.searchcommands.eventing_command import EventingCommand |
| 44 | + |
| 45 | +# BUG: By default, a CRE process has its trust store path overridden by Splunk. |
| 46 | +# Unsetting that env makes said process use the default CAs instead. |
| 47 | +CA_TRUST_STORE = "/opt/splunk/openssl/cert.pem" |
| 48 | +if os.environ.get("SSL_CERT_FILE") == CA_TRUST_STORE and not os.path.exists( |
| 49 | + CA_TRUST_STORE |
| 50 | +): |
| 51 | + del os.environ["SSL_CERT_FILE"] |
| 52 | + |
| 53 | +APP_NAME = "ai_custom_search_app" |
| 54 | + |
| 55 | + |
| 56 | +def setup_logging() -> logging.Logger: |
| 57 | + """To see logs from this logger, run this SPL in Splunk: |
| 58 | + `index=_internal sourcetype=ai_custom_search_app:log` |
| 59 | + """ |
| 60 | + SPLUNK_HOME: str = os.environ.get("SPLUNK_HOME", os.path.join("/opt", "splunk")) |
| 61 | + LOG_FILE: str = os.path.join(SPLUNK_HOME, "var", "log", "splunk", f"{APP_NAME}.log") |
| 62 | + |
| 63 | + logger = logging.getLogger(APP_NAME) |
| 64 | + logger.setLevel(logging.DEBUG) |
| 65 | + |
| 66 | + handler = logging.handlers.RotatingFileHandler( |
| 67 | + LOG_FILE, maxBytes=1024 * 1024, backupCount=5 |
| 68 | + ) |
| 69 | + handler.setFormatter( |
| 70 | + logging.Formatter(f"%(asctime)s %(levelname)s [{APP_NAME}] %(message)s") |
| 71 | + ) |
| 72 | + logger.addHandler(handler) |
| 73 | + |
| 74 | + return logger |
| 75 | + |
| 76 | + |
| 77 | +logger = setup_logging() |
| 78 | + |
| 79 | +# endregion |
| 80 | + |
| 81 | +LLM_MODEL = OpenAIModel( |
| 82 | + model="gpt-4o-mini", |
| 83 | + base_url="https://api.openai.com/v1", |
| 84 | + # To store API keys, consider secret storage: |
| 85 | + # https://dev.splunk.com/enterprise/docs/developapps/manageknowledge/secretstorage/secretstoragepython |
| 86 | + api_key="<super_secret_key>", |
| 87 | +) |
| 88 | +LLM_SYSTEM_PROMPT = "You are an Expert Splunk Data Analyst." |
| 89 | + |
| 90 | + |
| 91 | +class AgentOutput(BaseModel): |
| 92 | + """Output schema model for the LLM-based Agent.""" |
| 93 | + |
| 94 | + should_keep: bool = Field( |
| 95 | + description="If False, filter a record out of the pipeline.", default=True |
| 96 | + ) |
| 97 | + is_relevant: bool = Field( |
| 98 | + description="Should event be highlighted in a table view.", default=False |
| 99 | + ) |
| 100 | + |
| 101 | + |
| 102 | +@final |
| 103 | +@Configuration() |
| 104 | +class AgenticReportingCSC(EventingCommand): |
| 105 | + """agenticreport provides an assortment of example integrations with an LLM Agent. |
| 106 | +
|
| 107 | + Example: |
| 108 | + ``` |
| 109 | + | makeresults count=10 | streamstats count as _row |
| 110 | + | agenticreport should_filter="true" highlight_topic="Is this record's _row odd?" |
| 111 | + ``` |
| 112 | + """ |
| 113 | + |
| 114 | + should_filter = Option( |
| 115 | + doc="Should irrelevant records be filtered out", |
| 116 | + require=False, |
| 117 | + default=False, |
| 118 | + validate=validators.Boolean(), |
| 119 | + ) |
| 120 | + highlight_topic = Option( |
| 121 | + doc="What to consider when deciding to highlight a record", |
| 122 | + require=False, |
| 123 | + default=False, |
| 124 | + ) |
| 125 | + |
| 126 | + @override |
| 127 | + def transform(self, records: Sequence[Record]) -> Generator[Record, Any]: |
| 128 | + logger.info( |
| 129 | + "Begin transform() in `agenticreport` with " |
| 130 | + + f"options: {self.should_filter=}, {self.highlight_topic=}" |
| 131 | + ) |
| 132 | + |
| 133 | + for record in records: |
| 134 | + if not record: |
| 135 | + continue |
| 136 | + |
| 137 | + record_json = json.dumps(record) |
| 138 | + logger.debug(f"{record_json=}") |
| 139 | + |
| 140 | + user_prompt = f""" |
| 141 | +Analyze this log: "{record_json}" and perform these tasks: |
| 142 | +
|
| 143 | +1. Decide if record matches the intent: "{self.should_filter}"? |
| 144 | + (Return boolean `should_keep`) |
| 145 | +2. Is this log relevant to "{self.highlight_topic}"? |
| 146 | + (Return boolean `is_relevant`) |
| 147 | +""" |
| 148 | + try: |
| 149 | + llm_analysis = asyncio.run(self.invoke_agent(user_prompt)) |
| 150 | + logger.debug(f"{llm_analysis.model_dump_json()=}") |
| 151 | + if self.should_filter and not llm_analysis.should_keep: |
| 152 | + # Filter the record out of the results |
| 153 | + continue |
| 154 | + |
| 155 | + if self.highlight_topic: |
| 156 | + self.add_field(record, "should_keep", llm_analysis.is_relevant) |
| 157 | + except Exception as e: |
| 158 | + logger.exception(e) |
| 159 | + self.add_field(record, "agent_error", e) |
| 160 | + finally: |
| 161 | + yield record |
| 162 | + |
| 163 | + logger.debug("Finish transform() in `agenticreport`") |
| 164 | + |
| 165 | + async def invoke_agent(self, prompt: str) -> AgentOutput: |
| 166 | + assert self.service, "No Splunk connection available" |
| 167 | + |
| 168 | + async with Agent( |
| 169 | + model=LLM_MODEL, |
| 170 | + system_prompt=LLM_SYSTEM_PROMPT, |
| 171 | + service=self.service, |
| 172 | + output_schema=AgentOutput, |
| 173 | + ) as agent: |
| 174 | + logger.info(f"Invoking {LLM_MODEL.model} at {LLM_MODEL.base_url}") |
| 175 | + result = await agent.invoke([HumanMessage(role="user", content=prompt)]) |
| 176 | + return result.structured_output |
| 177 | + |
| 178 | + |
| 179 | +dispatch(AgenticReportingCSC, sys.argv, sys.stdin, sys.stdout, __name__) |
0 commit comments