Version: 5.1

Creating a Standard Logstash Pipeline

Notation and Variables

  • LS_HOME — Logstash home directory, typically /app/logstash/
  • PIPELINE_FILE — the name of the pipeline configuration file
  • PIPELINE_ID — pipeline identifier
  • HOST_OS_DATA — nodes with the data role
  • INDEX_NAME — name of the target index

Pipeline File Location

Create the pipeline file <PIPELINE_FILE> and place it in the ${LS_HOME}/config/conf.d directory using the following command:

sudo -u logstash touch ${LS_HOME}/config/conf.d/<PIPELINE_FILE>

Pipeline File Structure

A pipeline configuration file consists of three primary blocks: input, filter and output.
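
Putting these together, a pipeline file has the following overall skeleton (the comments are placeholders; the filter block may be omitted if no processing is needed):

input {
  # where events come from
}

filter {
  # how events are parsed and transformed
}

output {
  # where processed events are sent
}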

Input Block

The input block defines the source(s) from which Logstash ingests data. This could be a file (e.g., line-based logs, JSON, CSV), input from a forwarder agent, directory service queries (e.g., LDAP), REST API calls, and more.

If multiple input plugins are defined within a single configuration file, each operates independently — data will be collected in parallel from all specified sources. When using multiple configurations of the same input plugin (e.g., several file blocks), each is handled according to its specific parameters.

Example input block:

input {
  file {
    path => "/tmp/log_file.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "json"
  }
}

This example uses the file input plugin with the following parameters:

  • path — specifies the path to the file to be processed by Logstash
  • start_position — defines the starting position for reading data from the file
  • sincedb_path — controls the path to the file storing the current read position. If not specified, Logstash uses a file in the ${LS_HOME}/data/plugins/inputs directory. To force reading from the beginning, you can specify /dev/null; note that this resets the stored position on every restart and may cause data duplication
  • codec — a common parameter for all input plugins that specifies the incoming data decoding method. In this case, the file input plugin expects incoming file lines to be in JSON format and will deserialize them into objects
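
As noted earlier, several input plugins can be combined in one pipeline and will collect data in parallel. A minimal sketch with two sources (the beats listener on port 5044 is an illustrative addition, not part of the example above):

input {
  file {
    path => "/tmp/log_file.log"
    codec => "json"
  }
  beats {
    port => 5044
  }
}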

Filter Block

The filter block handles incoming data processing: parsing, filtering, and modification. For log parsing, the grok filter plugin is typically used; it works with regular expressions. Additional information about the grok filter plugin can be found in the official Elastic documentation.

The filter block may also include the mutate plugin, which applies options such as add_field, remove_field, and other event field transformations. More information about these and other filter plugins is available in the official Elastic documentation. All plugins within the filter block execute sequentially, in the order in which they are listed.

Example of a simple filter block:

filter {
  grok {
    match => {
      "message" => ['%{IP:client_ip}\s+"%{IP:remote_ip}"\s+\[%{HTTPDATE:timestamp}\]\s+%{WORD:http_method}']
    }
  }

  mutate {
    rename => { "http_method" => "method" }
    add_field => { "service" => "web_server" }
  }
}
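
For illustration, assume an event whose message field contains a line such as the following (all values are hypothetical):

192.168.0.1 "10.0.0.2" [02/Jun/2025:16:30:00 +0300] GET

The grok pattern above would extract the fields client_ip, remote_ip, timestamp, and http_method; the mutate plugin then renames http_method to method and adds the service field with the value web_server.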

Output Block

The output block defines where processed data is sent. This can include destinations such as Elasticsearch, OpenSearch, files, or databases.

If multiple output plugins are specified in the configuration, each processed event will be sequentially passed to all output blocks. Each output block receives its own copy of the event, allowing, for example, simultaneous data sending to OpenSearch and saving to a file for debugging purposes.

Please Note!

If one output block fails, the others will continue to function.

Example of a standard output block for OpenSearch:

output {
  opensearch {
    hosts => ["https://data1:9200", "https://data2:9200"]
    index => "<INDEX_NAME>"
    user => "logstash"
    password => "${ES_PWD}"
    ssl => true
    ssl_certificate_verification => true
    cacert => "${LS_HOME}/config/ca-cert.pem"
    ecs_compatibility => "disabled"
  }
}

This example uses standard parameters:

  • hosts — data role nodes, multiple values can be specified comma-separated
  • index — name of the index where events will be stored. To automatically generate weekly indexes, use a dynamic naming pattern such as <INDEX_NAME>-%{+xxxx.ww}, where %{+xxxx.ww} inserts the ISO week-based year and week number (using ISM policies to manage the resulting indexes is recommended)
  • user — username for OpenSearch authentication
  • password — password for OpenSearch authentication. In this example, the password is retrieved from the Logstash keystore (see the relevant section on Adding a Password to the Keystore via Request)
  • ssl — parameter determining whether to use SSL
  • ssl_certificate_verification — whether to verify the server's SSL certificate
  • cacert — path to the CA certificate
  • ecs_compatibility — configures compatibility with the Elastic Common Schema (ECS)
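
To illustrate the earlier point about multiple outputs, the sketch below sends each event both to OpenSearch and to a local file for debugging. The debug file path and the json_lines codec are illustrative choices, and the OpenSearch parameters are shortened:

output {
  opensearch {
    hosts => ["https://data1:9200", "https://data2:9200"]
    index => "<INDEX_NAME>"
    user => "logstash"
    password => "${ES_PWD}"
    ssl => true
    cacert => "${LS_HOME}/config/ca-cert.pem"
  }
  file {
    path => "/tmp/pipeline_debug.json"
    codec => "json_lines"
  }
}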

Adding the Pipeline to pipelines.yml

After creating the pipeline, it must be added to the pipelines.yml file to launch on the next Logstash start. Navigate to the ${LS_HOME}/config directory and open the pipelines.yml file for editing:

sudo nano ${LS_HOME}/config/pipelines.yml

Add the following lines to the file:

- pipeline.id: <PIPELINE_ID>
  path.config: "${LS_HOME}/config/conf.d/<PIPELINE_FILE>"

Please Note!

It’s recommended to use a <PIPELINE_ID> that matches the pipeline filename. This helps identify pipeline entries in logs and simplifies debugging.
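
If pipelines.yml already contains other pipelines, the new entry is simply appended as another list item. A minimal sketch, using the win_activity pipeline from the log example below as a purely illustrative pre-existing entry:

- pipeline.id: win_activity
  path.config: "/app/logstash/config/conf.d/win_activity.conf"
- pipeline.id: <PIPELINE_ID>
  path.config: "${LS_HOME}/config/conf.d/<PIPELINE_FILE>"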

Now restart Logstash and check its status with these commands:

sudo systemctl restart logstash 
sudo systemctl status logstash
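
If the service fails to start, the pipeline file can be syntax-checked with Logstash's built-in configuration test; the flags below are standard Logstash CLI options, and the paths assume the layout used in this guide:

sudo -u logstash ${LS_HOME}/bin/logstash --config.test_and_exit \
  -f ${LS_HOME}/config/conf.d/<PIPELINE_FILE> --path.settings ${LS_HOME}/config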

Viewing Logs

To check the list of running pipelines, view the last lines of the main Logstash log:

tail -n 100 /app/logs/logstash/logstash-plain.log

Your pipeline ID (<PIPELINE_ID>) should appear in the list of running pipelines (running_pipelines). Example output showing the running pipelines:

[2025-06-02T16:30:20,227][INFO ][logstash.agent           ] Pipelines running {:count=>3, :running_pipelines=>[:linux_io_monitoring, :win_activity, :ksc_applications], :non_running_pipelines=>[]}

In addition to the main Logstash log (logstash-plain.log), each pipeline has its own log saved as pipeline_<PIPELINE_ID>.log, containing detailed information about the pipeline's operation. Example log:

[2025-06-02T16:30:06,222][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"linux_io_monitoring", "pipeline.workers"=>8, "pipeline.batch.size"=>225, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1800, "pipeline.sources"=>["/app/logstash/config/conf.d/linux_disk_io.conf"], :thread=>"#<Thread:0x2ebc0ca3 run>"}
[2025-06-02T16:30:07,953][INFO ][logstash.javapipeline ] Pipeline Java execution initialization time {"seconds"=>1.73}
[2025-06-02T16:30:08,678][INFO ][logstash.inputs.beats ] Starting input listener {:address=>"0.0.0.0:51123"}
[2025-06-02T16:30:08,779][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"linux_io_monitoring"}
[2025-06-02T16:30:09,058][INFO ][org.logstash.beats.Server] Starting server on port: 51123

The last line confirms that the pipeline has started successfully.