Version: 6.0

script

Description

Executes a script and adds results to the query.

Please note!

The script source requires a configured SA Engine RE to work.

SA Engine RE setup article

Syntax

...| script [intr=<script_interpreter>] <script_file_name> [<timeout>]

Required Arguments

Parameter | Syntax | Description
script_file_name | <string> | Script file name.

Optional Arguments

Parameter | Syntax | Default | Description
script_interpreter | <string> | Taken from sme.core.remote_script.base_interpreter_name. | Interpreter name.
timeout | <int> | Taken from sme.core.remote_script.timeout. | Query execution timeout in milliseconds.

Source Configuration

All source settings are stored in _cluster/settings.

Configuration example:

"sme" : {
  "core" : {
    "remote_script" : {
      "base_path" : "/home/test_user/test/",
      "port" : "18080",
      "base_interpreter_name" : "python3",
      "url" : "http://localhost",
      "timeout" : "60000",
      "interpreters" : [
        "python3::/usr/bin/python3",
        "bash::/usr/bin/bash"
      ]
    }
  }
}

Configuration Parameters Description

  • url - URL of the SAF Script Proxy connector
  • port - port of the SAF Script Proxy connector
  • interpreters - interpreter names and paths to their executable files, each written as name::path
  • base_interpreter_name - name of the default interpreter, used when intr is not specified
  • base_path - path to the directory where the scripts are located
  • timeout - request timeout, specified in milliseconds
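
To illustrate how the interpreters entries relate to base_interpreter_name, the following minimal sketch splits each name::path entry and looks up the executable for a given interpreter name. The function names parse_interpreters and resolve_interpreter are hypothetical, and the lookup logic is an assumption based on the configuration example above, not the connector's actual implementation.

```python
def parse_interpreters(entries):
    """Split "name::path" entries into a {name: path} dictionary."""
    mapping = {}
    for entry in entries:
        name, sep, path = entry.partition("::")
        if not sep or not name or not path:
            raise ValueError(f"Malformed interpreter entry: {entry!r}")
        mapping[name] = path
    return mapping


def resolve_interpreter(settings, intr=None):
    """Return the executable path for intr, or for the default interpreter."""
    # Assumption: when intr is omitted, base_interpreter_name is used instead
    name = intr or settings["base_interpreter_name"]
    interpreters = parse_interpreters(settings["interpreters"])
    if name not in interpreters:
        raise KeyError(f"Interpreter {name!r} is not configured")
    return interpreters[name]


# Values copied from the configuration example
settings = {
    "base_interpreter_name": "python3",
    "interpreters": [
        "python3::/usr/bin/python3",
        "bash::/usr/bin/bash",
    ],
}

print(resolve_interpreter(settings))
print(resolve_interpreter(settings, "bash"))
```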

Query Examples

These examples show running a script without an explicit interpreter, with a specified intr, and with a timeout setting.

Example 1

In this example, the default interpreter specified in the source configuration is used.

| script test-script.py

Example 2

In this example, the interpreter is explicitly passed via intr=python3.

source tweets
| script intr=python3 test-script.py

Example 3

In this example, a timeout is additionally set to limit the script execution time.

source tweets
| script intr=python3 test-script.py timeout=5000

Examples with Argument Passing

For a query to run correctly regardless of where the script command appears (at the beginning, middle, or end of the query), keep the following in mind:

  1. Preparing data for the script: The query result to be processed is formed before the script is called. You do not have to restrict the result to the fields the script needs, but reducing the data volume with fields, table, or aggregation functions improves performance. The query result is passed to the script in JSON format through a temporary file, whose path is appended automatically as the last, implicit argument.

  2. Script data retrieval: The script must accept arguments and work with them. Events can be the result of aggregation functions or simply the result of a search query. The main condition for a successful transfer is that the specified fields are present in the query result. To pass arguments, list the field names in the script command call. Input data is read from the resp.body.hits.hits block of the temporary file, which holds an array of events (JSON objects). Each array element corresponds to a separate event, which can then be processed with the facilities of the chosen interpreter.

  3. Sending data from the script: To return one event, print the JSON result to stdout. To return multiple events, build a payload and print it to stdout in the same way.
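
The three steps above can be sketched as a minimal script that returns a single event. It treats the last argument as the path to the temporary file, reads events from resp.body.hits.hits, and prints one JSON object to stdout. The helper names load_hits and build_event, and the fields of the output event, are hypothetical and chosen only for illustration.

```python
import json
import sys


def load_hits(payload_filepath):
    """Read the temporary file and return the array of events."""
    with open(payload_filepath) as f:
        payload = json.load(f)
    return payload.get("resp", {}).get("body", {}).get("hits", {}).get("hits", [])


def build_event(hits, field_names):
    """Build one event: how many input events contain each requested field."""
    event = {"total_events": len(hits)}
    for name in field_names:
        event[f"{name}_count"] = sum(
            1 for hit in hits if name in hit.get("_source", {})
        )
    return event


def main(argv):
    # Field names come first; the temporary file path is the last argument
    *field_names, payload_filepath = argv
    hits = load_hits(payload_filepath)
    # One event: print a single JSON object to stdout
    print(json.dumps(build_event(hits, field_names)))


if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv[1:])
```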

Example 1

Consider the following call to transform_data.py. The required fields from the query result are passed to the script as arguments, and the numeric values of the error_description and connect_status fields are converted to text. The data is then returned to SA Engine and displayed in the search results. This example also shows how to build the payload for returning multiple events.

Python script code
import sys
import json


def main():
    # Read the arguments and check their number
    args = sys.argv[1:]
    if len(args) != 6:
        raise Exception(f"Expected 5 fields + payload file (6 arguments), got {len(args)}")

    # The last argument is the path to the temporary file
    (timestamp_field, sensor_name_field, location_field,
     error_description_field, connect_status_field, payload_filepath) = args

    # Load the JSON payload from the temporary SA Engine file
    with open(payload_filepath) as f:
        payload = json.load(f)

    # Extract the hits list (the data block)
    hits = payload.get("resp", {}).get("body", payload).get("hits", {}).get("hits", [])
    result_hits = []

    # Process each document from hits
    for i, hit in enumerate(hits, 1):
        source = hit.get("_source", {})

        error_description = source.get(error_description_field)
        connect_status = source.get(connect_status_field)

        # A missing or non-numeric value counts as "No data";
        # this keeps the range checks below from raising TypeError
        if not isinstance(error_description, (int, float)):
            error_description = -1

        # Convert error_description to a human-readable format
        error_description = (
            "No data" if error_description == -1 else
            "No error" if 1 <= error_description <= 3 else
            "Medium error" if 4 <= error_description <= 7 else
            "Critical error" if 8 <= error_description <= 10 else
            "No data"
        )

        # Convert connect_status to a human-readable format
        connect_status = (
            "Connected" if connect_status == 1 else
            "Not connected" if connect_status == 0 else
            "No data"
        )

        # Form the final result document
        result_hits.append({
            "_index": hit.get("_index", "custom"),
            "_type": hit.get("_type", "custom"),
            "_id": hit.get("_id", str(i)),
            "_score": hit.get("_score", 1),
            "_source": {
                "@timestamp": source.get(timestamp_field, "No data"),
                "sensor_name": source.get(sensor_name_field, "No data"),
                "location": source.get(location_field, "No data"),
                "error_description": error_description,
                "connect_status": connect_status,
            }
        })

    # Form the payload for sending
    result = {
        "hits": {
            "total": {
                "value": len(result_hits),
                "relation": "eq"
            },
            "max_score": 1,
            "hits": result_hits
        }
    }

    # Output the payload to stdout
    print(json.dumps(result))


if __name__ == "__main__":
    main()
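
To try a script such as transform_data.py outside SA Engine, it can help to create a temporary file with the same resp.body.hits.hits layout by hand. The sketch below does that. The helper write_sample_payload, the sample field names (ts, name, loc, err, conn), and the sample values are hypothetical. The real temporary file may contain additional keys that are not reproduced here.

```python
import json
import tempfile


def write_sample_payload(events):
    """Wrap events into a resp.body.hits.hits structure and save it to a file."""
    hits = [
        {"_index": "sample", "_id": str(i), "_score": 1, "_source": event}
        for i, event in enumerate(events, 1)
    ]
    payload = {"resp": {"body": {"hits": {"hits": hits}}}}
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
        json.dump(payload, f)
        return f.name


# Hypothetical sample events; the field names are illustrative only
sample_events = [
    {"ts": "2024-01-01T00:00:00Z", "name": "sensor-1", "loc": "hall", "err": 5, "conn": 1},
    {"ts": "2024-01-01T00:01:00Z", "name": "sensor-2", "loc": "roof", "err": -1, "conn": 0},
]

sample_path = write_sample_payload(sample_events)
# Command line for a local run: five field names, then the payload path
print("python3 transform_data.py ts name loc err conn", sample_path)
```

The field names are given in the order the script unpacks them, followed by the file path, which mirrors the implicit last argument described above.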

Example 2

Data is passed to a bash script in the same way. The following example runs the aggregation_value.sh script, which returns one event. Before the script command is called, the data is collected with aggs. The script then finds the unique values of the passed fields, after which the unique_location field is transformed with the eval function and the result is displayed with table.

Bash script code
#!/bin/bash
# Getting the temporary query result file - the last implicit argument
payload_filepath="${!#}"

# Getting the field list
fields=("${@:1:$(($#-1))}")

# Reading full JSON from file
payload=$(cat "$payload_filepath")

# Extracting document array located in hits
hits=$(echo "$payload" | jq '.resp.body.hits.hits // .hits.hits // []')

# Processing each passed field
jq_filter='{'

first=1

for field in "${fields[@]}"; do
    if [ $first -eq 1 ]; then
        first=0
    else
        jq_filter+=","
    fi

    jq_filter+="
    \"unique_$field\": (
        [ .[]?._source.\"$field\" ]
        | map(select(. != null))
        | unique
    )"
done

jq_filter+='
}'

# Outputting the result
echo "$hits" | jq "$jq_filter"
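
For comparison, the same aggregation can be sketched in Python for environments where jq is not available. The function name unique_values is hypothetical. The sketch assumes that the values of each field are hashable and mutually comparable (for example, all strings or all numbers), because it uses sorted(set(...)) to mimic jq's unique, which both sorts and deduplicates.

```python
import json


def unique_values(hits, fields):
    """Return one event with the sorted unique non-null values of each field."""
    result = {}
    for field in fields:
        values = [hit.get("_source", {}).get(field) for hit in hits]
        non_null = [value for value in values if value is not None]
        result[f"unique_{field}"] = sorted(set(non_null))
    return result


# Hypothetical input events, shaped like the entries of hits.hits
example_hits = [
    {"_source": {"location": "roof"}},
    {"_source": {"location": "hall"}},
    {"_source": {"location": "roof"}},
    {"_source": {}},
]

# One event: print a single JSON object to stdout
print(json.dumps(unique_values(example_hits, ["location"])))
```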