Creating a Standard Logstash Pipeline
Notation and Variables
LS_HOME — Logstash home directory, typically /app/logstash/
PIPELINE_FILE — the name of the pipeline configuration file
PIPELINE_ID — pipeline identifier
HOST_OS_DATA — nodes with the data role
INDEX_NAME — name of the target index
Pipeline File Location
Create the pipeline file <PIPELINE_FILE> and place it in the ${LS_HOME}/config/conf.d directory using the following command:
sudo -u logstash touch ${LS_HOME}/config/conf.d/<PIPELINE_FILE>
Pipeline File Structure
A pipeline configuration file consists of three primary blocks: input, filter, and output.
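A minimal skeleton of a pipeline file, with the contents of each block omitted, looks like this:
input {
  # where events are read from
}
filter {
  # optional parsing and transformation of events
}
output {
  # where processed events are sent
}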
Input Block
The input block defines the source(s) from which Logstash ingests data. This could be a file (e.g., line-based logs, JSON, CSV), input from a forwarder agent, directory service queries (e.g., LDAP), REST API calls, and more.
If multiple input plugins are defined within a single configuration file, each operates independently, and data is collected in parallel from all specified sources. When using multiple configurations of the same input plugin (e.g., several file blocks), each is handled according to its own parameters (an illustrative sketch is shown at the end of this subsection).
Example input block:
input {
  file {
    path => "/tmp/log_file.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "json"
  }
}
This example uses the file input plugin with the following parameters:
path — specifies the path to the file to be processed by Logstash
start_position — defines the starting position for reading data from the file
sincedb_path — controls the path to the file storing the current read position. If not specified, Logstash uses a file in the ${LS_HOME}/data/plugins/inputs directory. To force reading from the beginning, you can specify /dev/null; this resets the position on restart and may cause data duplication
codec — a common parameter for all input plugins that specifies the incoming data decoding method. In this case, the file input plugin expects incoming file lines to be in JSON format and will deserialize them into objects
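To illustrate the parallel-input behaviour described above, here is a minimal sketch with two independent file inputs; the paths are placeholders invented for this example, not values from this guide:
input {
  # both file blocks are read in parallel, each with its own parameters
  file {
    path => "/tmp/app_events.log"
    codec => "json"
  }
  file {
    path => "/tmp/audit.log"
    start_position => "beginning"
  }
}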
Filter Block
The filter block handles incoming data processing: parsing, filtering, and modification. For log parsing, the grok filter plugin is typically used, which works with regular expressions. Additional information about the grok filter plugin can be found in the official Elastic documentation. The filter block may also include the mutate plugin, which supports options such as add_field, remove_field, and other event field transformations. More information about these and other filter plugins is available in the official Elastic documentation. All plugins within the filter block execute sequentially.
Example of a simple filter block:
filter {
  grok {
    match => {
      "message" => ['%{IP:client_ip}\s+"%{IP:remote_ip}"\s+\[%{HTTPDATE:timestamp}\]\s+%{WORD:http_method}']
    }
  }
  mutate {
    rename => { "http_method" => "method" }
    add_field => { "service" => "web_server" }
  }
}
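For illustration, a hypothetical log line in the shape the grok pattern above expects could look like this (the IP addresses and timestamp are invented for the example):
203.0.113.10 "198.51.100.7" [02/Jun/2025:16:30:00 +0300] GET
After the grok and mutate stages, the resulting event would contain the fields client_ip, remote_ip, timestamp, method (renamed from http_method), and the added service field with the value web_server.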
Output Block
The output block defines where processed data is sent. This can include destinations such as Elasticsearch, OpenSearch, files, or databases.
If multiple output plugins are specified in the configuration, each processed event is sequentially passed to all output blocks. Each output block receives its own copy of the event, allowing, for example, simultaneous sending of data to OpenSearch and saving to a file for debugging purposes (an illustrative sketch is shown at the end of this subsection). If one output block fails, the others continue to function.
Example of a standard output block for OpenSearch:
output {
  opensearch {
    hosts => ["https://data1:9200", "https://data2:9200"]
    index => "<INDEX_NAME>"
    user => "logstash"
    password => "${ES_PWD}"
    ssl => true
    cacert => "${LS_HOME}/config/ca-cert.pem"
    ecs_compatibility => "disabled"
  }
}
This example uses standard parameters:
hosts — the data role nodes; multiple values can be specified, separated by commas
index — name of the index where events will be stored. To automatically generate weekly indexes, use a dynamic naming pattern like <INDEX_NAME>-%{+xxxx.ww}, where %{+xxxx.ww} inserts the ISO-formatted year and week number (using ISM policies is recommended)
user — username for OpenSearch authentication
password — password for OpenSearch authentication. In this example, the password is retrieved from the Logstash keystore (see the relevant section on Adding a Password to the Keystore via Request)
ssl — parameter determining whether to use SSL
ssl_certificate_verification — certificate verification level
cacert — path to the CA certificate
ecs_compatibility — configures compatibility with ECS (Elastic Common Schema)
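As a sketch of the multi-output behaviour mentioned earlier, the OpenSearch output can be combined with a file output used for debugging; the /tmp path and the json_lines codec here are illustrative choices, not values from this guide:
output {
  opensearch {
    # ... same parameters as in the example above ...
  }
  # this block receives its own copy of every event
  file {
    path => "/tmp/pipeline_debug.log"
    codec => "json_lines"
  }
}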
Adding the Pipeline to pipelines.yml
After creating the pipeline, it must be added to the pipelines.yml file so that it is launched the next time Logstash starts. Navigate to the ${LS_HOME}/config directory and open the pipelines.yml file for editing:
sudo nano ${LS_HOME}/config/pipelines.yml
Add the following lines to the file:
- pipeline.id: <PIPELINE_ID>
  path.config: "${LS_HOME}/config/conf.d/<PIPELINE_FILE>"
It’s recommended to use a <PIPELINE_ID> that matches the pipeline filename. This helps identify pipeline entries in the logs and simplifies debugging.
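Before restarting, the new configuration can optionally be checked for syntax errors with Logstash's built-in configuration test; this sketch assumes a standard installation where the logstash binary lives under ${LS_HOME}/bin:
sudo -u logstash ${LS_HOME}/bin/logstash --path.settings ${LS_HOME}/config --config.test_and_exit --path.config ${LS_HOME}/config/conf.d/<PIPELINE_FILE>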
Now restart Logstash and check its status with these commands:
sudo systemctl restart logstash
sudo systemctl status logstash
Viewing Logs
To check the status of running pipelines, execute:
tail -n 100 /app/logs/logstash/logstash-plain.log
Your pipeline ID (<PIPELINE_ID>) should appear in the list of running pipelines (running_pipelines). Example of the running pipelines output:
[2025-06-02T16:30:20,227][INFO ][logstash.agent ] Pipelines running {:count=>3, :running_pipelines=>[:linux_io_monitoring, :win_activity, :ksc_applications], :non_running_pipelines=>[]}
In addition to the main Logstash log (logstash-plain.log), each pipeline has its own log saved as pipeline_<PIPELINE_ID>.log, containing detailed information about the pipeline's operation. Example log:
[2025-06-02T16:30:06,222][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"linux_io_monitoring", "pipeline.workers"=>8, "pipeline.batch.size"=>225, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1800, "pipeline.sources"=>["/app/logstash/config/conf.d/linux_disk_io.conf"], :thread=>"#<Thread:0x2ebc0ca3 run>"}
[2025-06-02T16:30:07,953][INFO ][logstash.javapipeline ] Pipeline Java execution initialization time {"seconds"=>1.73}
[2025-06-02T16:30:08,678][INFO ][logstash.inputs.beats ] Starting input listener {:address=>"0.0.0.0:51123"}
[2025-06-02T16:30:08,779][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"linux_io_monitoring"}
[2025-06-02T16:30:09,058][INFO ][org.logstash.beats.Server] Starting server on port: 51123
The last line confirms that the pipeline has started successfully.
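Beyond the log files, per-pipeline runtime statistics (event counts, plugin timings, queue state) can also be queried from the Logstash monitoring API; the sketch below assumes the API is enabled on its default port 9600:
curl -s http://localhost:9600/_node/stats/pipelines/<PIPELINE_ID>?pretty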