Agentless data collection
General recommendations
The agentless method of data collection involves the use of Logstash and its plugins.
All the configurations listed below work provided that the required data collection plugins are installed.
When parsing data into fields, we recommend adhering to a common standard to keep configurations portable and to simplify migration between versions. In the Elastic ecosystem this standard is the Elastic Common Schema (ECS). Starting with Logstash 8.x, the ECS v8 standard is used by default; in Logstash 7.x, the use of ECS is optional.
We recommend that you adhere to the ECS v8 standard.
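If you need to pin the ECS behavior explicitly rather than rely on the version default, many input, filter, and output plugins accept an ecs_compatibility option, and the pipeline.ecs_compatibility setting in logstash.yml applies it globally. A minimal sketch (the plugin and port are illustrative only):
```
input {
  tcp {
    port => 5400
    # Force ECS v8 field naming for this plugin regardless of the pipeline default
    ecs_compatibility => "v8"
  }
}
```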
Pay attention to the version of Logstash you are using when viewing the parameters for a specific plugin on the official website.
A single pipeline can contain multiple sources. We recommend doing this only when the data collected from the different sources has the same format, for example, when nginx web server logs are collected in one place and several files need to be processed.
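As a hedged illustration of this case (the log paths are hypothetical), the file plugin described later in this section accepts an array of paths, so several nginx log files of the same format can be read by one input:
```
input {
  # One input reads several nginx log files that share the same format
  file {
    path => ["/var/log/nginx/access.log", "/var/log/nginx/static_access.log"]
    start_position => "beginning"
  }
}
```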
Some plugins have a number of required parameters. A list of such parameters is provided on the official documentation page of each plugin.
Plugins often receive data in a variety of formats. To process it properly and split it into fields for convenient further analysis, filters must be used. If the received data is already in a suitable form and requires no additional processing, the filter block can be omitted.
All Logstash plugins are included in the SAF installer distribution and are installed automatically. We are using Logstash version 8.4, so all the examples are given for it.
Saving credentials to connect to sources
Credentials for connecting to sources are stored in the encrypted Logstash keystore.
To add credentials to the keystore on the Logstash server, run the following commands:
```bash
# Go to the directory with the executables, the default path is
cd /app/logstash/bin/
# Adding a user record named source.user to the keystore. The value is entered from the keyboard
sudo -u logstash ./logstash-keystore add source.user
# Adding a password record named source.password to the keystore. The value is entered from the keyboard
sudo -u logstash ./logstash-keystore add source.password
# Adding an SNMP community record named source.snmp_comm to the keystore. The value is entered from the keyboard
sudo -u logstash ./logstash-keystore add source.snmp_comm
# Checking the list of stored records
sudo -u logstash ./logstash-keystore list
```
The saved credentials are then referenced in the data collection configuration files in the form of keystore variables:
```bash
user => "${source.user}"
password => "${source.password}"
community => "${source.snmp_comm}"
```
Storing any identification data (username, password, database name, SNMP community name, and so on) in configuration files in plain text is unsafe.
Frequently used plugins
file
Allows you to stream events from files line by line; it is also possible to read several lines at once. You can read more on the official documentation page.
For the plugin to work properly, make sure that Logstash (usually run as the user and group named "logstash") has access to the directory containing the specified file and has permission to read the file itself.
In the example below, we read data from the HR2m.csv file and transform it before sending it further. Frequently used plugin parameters are listed below:
- path - the path to the file, _a required parameter_
- start_position - where to start reading the file: "beginning" reads the file from the beginning, "end" reads only new entries; the default value is "end"
- sincedb_path - path to the sincedb database file that stores the last read position in the file
```
input {
file {
path => "/path/to/HR2m.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv{
columns => [ "id", "name_prefix", "first_name", "middle_initial", "last_name", "gender", "e_mail", "fathers_name", "mothers_name", "mothers_maiden_name", "date_of_birth", "time_of_birth", "age_in", "weight", "date_of_joining", "quarter_of_joining", "half_of_joining", "year_of_joining", "month_of_joining", "month_name_of_joining","short_month", "day_of_joining", "dow_of_joining", "short_dow", "age", "salary", "last_hike", "ssn", "phone", "place_name", "county", "city", "state", "zip", "region", "username", "password"]
separator => ","
skip_header => true
source => "message"
}
}
```
Note that in this example additional data manipulation is performed in the filter block: the csv filter plugin splits each message into fields.
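If the parsed fields need further normalization, additional filter plugins can be chained after csv in the same filter block. A minimal sketch, assuming you want to convert the numeric columns and drop the raw line (the chosen fields and conversions are illustrative, not part of the original example):
```
filter {
  mutate {
    # Convert columns parsed by the csv filter to numeric types
    convert => { "age" => "integer" "salary" => "float" }
    # The original unparsed line is no longer needed after parsing
    remove_field => ["message"]
  }
}
```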
tcp
Reads events from a TCP socket in server or client mode. You can read more on the official documentation page.
In the configuration example below, we read events from a TCP socket on port 5411 in server mode (a third-party application sends data to port 5411 of this server). The data arrives in JSON format and, after being read without preprocessing, is sent on by the plugins in the output block.
Note that this example performs no additional processing of the received data, so the filter block is omitted. The optional host parameter is also omitted, and for this example the mode parameter could be omitted as well.
- port - the listening port, _a required parameter_
- mode - the plugin's mode of operation, either "server" or "client"; "server" is used by default
- host - in server mode, the address on which to receive data; in client mode, the address to connect to. The default value is "0.0.0.0"
- codec - defines the format of the incoming data
```
input {
tcp {
port => 5411
mode => "server"
codec => "json"
}
}
```
snmp
The snmp plugin polls sources to collect information about their current operational state as well as system information. You can read more on the official documentation page.
In the example below, metrics are read from several workstations. Note that you can specify several plugins in one pipeline, including multiple instances of the same plugin. The plugin has no required parameters. Frequently used parameters are listed below:
- get - request the values of the specified end OIDs, specified as a list (array)
- walk - request the values of all nested (lower-level) OIDs, specified as a list (array)
- tables - request values from several walk subqueries (columns), specified as a table (two-dimensional array)
- hosts - a list of hosts to query for the configured options, specified as a list (array)
- type - the "type" field is added to all data from this source, which makes it easier to process the data by "labeling" it, for example, in the filter block (see the sketch after the example below). The parameter is not unique to this plugin; it can be used with all plugins in the input block
```
input {
snmp {
get => ["1.3.6.1.2.1.2.2.1.8.1", "1.3.6.1.2.1.2.2.1.10.1", "1.3.6.1.2.1.2.2.1.16.1", "1.3.6.1.2.1.2.2.1.2.1", "1.3.6.1.2.1.2.2.1.6.1"]
walk => ["1.3.6.1.2.1.2.2.1.2.1", "1.3.6.1.2.1.4.20.1.1", "1.3.6.1.2.1.4.20.1.2", "1.3.6.1.2.1.4.20.1.3"]
tables => [{"name" => "table_name" "columns" => ["1.3.6.1.2.1.2.2.1.1", "1.3.6.1.2.1.2.2.1.2", "1.3.6.1.2.1.2.2.1.3"]}]
hosts => [{host => "udp:172.16.0.110/161" community => "${community.name}" version => "2c" retries => 2 timeout => 1000},
{host => "udp:172.16.0.146/161" community => "${community.name}" version => "2c" retries => 2 timeout => 1000},
{host => "udp:172.16.0.148/161" community => "${community.name}" version => "2c" retries => 2 timeout => 1000}]
type => "1"
}
snmp {
get => ["1.3.6.1.2.1.2.2.1.8.2", "1.3.6.1.2.1.2.2.1.10.2", "1.3.6.1.2.1.2.2.1.16.2", "1.3.6.1.2.1.2.2.1.2.2", "1.3.6.1.2.1.2.2.1.6.2"]
walk => ["1.3.6.1.2.1.2.2.1.2.1", "1.3.6.1.2.1.4.20.1.1", "1.3.6.1.2.1.4.20.1.2", "1.3.6.1.2.1.4.20.1.3"]
hosts => [{host => "udp:172.16.0.110/161" community => "${community.name}" version => "2c" retries => 2 timeout => 1000},
{host => "udp:172.16.0.146/161" community => "${community.name}" version => "2c" retries => 2 timeout => 1000},
{host => "udp:172.16.0.148/161" community => "${community.name}" version => "2c" retries => 2 timeout => 1000}]
type => "2"
}
}
```
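The type values assigned above can then be used to separate the two polling sets during processing. A minimal sketch of such a conditional in the filter block (the added tags are illustrative):
```
filter {
  # Branch processing by the type field set in the input block
  if [type] == "1" {
    mutate { add_tag => ["snmp_set_1"] }
  } else if [type] == "2" {
    mutate { add_tag => ["snmp_set_2"] }
  }
}
```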
jdbc
The JDBC plugin allows you to extract data from any database with the JDBC interface. You can read more on the official documentation page. The frequently used parameters are shown below:
- jdbc_driver_class - the JDBC driver class for the corresponding database. _A mandatory parameter_
- jdbc_connection_string - the JDBC connection string; the database login and password can be specified here, but we do not recommend doing so. _A mandatory parameter_
- jdbc_user - the database user; it must have the rights to execute the corresponding query. _A mandatory parameter_
- jdbc_password - the password of the database user. We recommend that you do not specify the password in clear text, but use the keystore
- statement - the query to execute against the database; it may use parameters set in the parameters option. The parameter is not strictly required: statement_filepath can be used instead
- schedule - schedules the plugin launch using rufus-scheduler; the syntax is similar to cron but has many more features
- use_column_value - defines the value of the sql_last_value variable: if set to true, the value of the tracking_column column is used, otherwise the time of the last query execution (see the sketch after the example below)
- tracking_column - the column whose value should be tracked when use_column_value is set to true
- tracking_column_type - the type of the column specified in tracking_column. It can take one of two values, "numeric" or "timestamp"; "numeric" is used by default
- last_run_metadata_path - the path to the file storing the last query execution time
```
input {
jdbc {
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://<ip>\\sqlexpress:<port>"
jdbc_user => "${db_username}"
jdbc_password => "${db_password}"
statement => "SELECT MNAME, LASTACTIVITY, CLIENTID, VERSION FROM [SN7_SERVER_SCHEMA].[dbo].[CLIENT] WHERE CLIENTCLASS = 'Service' GROUP BY MNAME, LASTACTIVITY, CLIENTID, VERSION"
schedule => "1-59/5 * * * *"
use_column_value => true
tracking_column => clientid
tracking_column_type => "numeric"
last_run_metadata_path => "/path/to/metadata/file"
}
}
```
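The statement above re-reads all matching rows on every scheduled run. When use_column_value and tracking_column are set, the saved value can be referenced in the query through the built-in :sql_last_value parameter so that only new rows are fetched. A minimal sketch based on the example above (the table and column names remain assumptions about the source database):
```
input {
  jdbc {
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://<ip>\\sqlexpress:<port>"
    jdbc_user => "${db_username}"
    jdbc_password => "${db_password}"
    # Fetch only rows whose CLIENTID is greater than the value stored after the previous run
    statement => "SELECT MNAME, LASTACTIVITY, CLIENTID, VERSION FROM [SN7_SERVER_SCHEMA].[dbo].[CLIENT] WHERE CLIENTID > :sql_last_value"
    use_column_value => true
    tracking_column => "clientid"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/path/to/metadata/file"
    schedule => "1-59/5 * * * *"
  }
}
```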
kafka
Allows you to read events from Apache Kafka. You can read more in our documentation or on the official plugin documentation page. The plugin has no required parameters; the main parameters are shown below:
- topics - a list (array) of topics to read; the default value is ["logstash"]
- bootstrap_servers - the address of the Apache Kafka broker. The default value is "localhost:9092"
- consumer_threads - the number of threads reading data from Apache Kafka; the default is 1
- decorate_events - allows you to add extra information from Apache Kafka to the read data. A kafka field is added that includes the topic, consumer_group, partition, offset, and key parameters. It can take one of the values "none", "basic", "extended"; the default value is "none"
- auto_offset_reset - the behavior when there is no offset value or the offset is invalid. It can take the values "earliest", "latest", "none". If the group the consumer belongs to has not previously read this topic, then with "earliest" all existing records will be read from Kafka
- client_id and group_id - the consumer name and consumer group. Note that if several kafka input plugins are used in one pipeline, it is recommended to set a separate client and group for each of them
- type - the "type" field is added to all data from this source, which makes it easier to process the data by "labeling" it, for example, in the filter block. The parameter is not unique to this plugin; it can be used with all plugins in the input block. We recommend using this parameter when data is read from Kafka by several Logstash servers, for additional analysis capabilities later
```
input {
kafka {
topics => ["volgablob"]
bootstrap_servers => '172.16.0.26:9092'
consumer_threads => 3
decorate_events => "basic"
auto_offset_reset => "earliest"
client_id => "manylogstash_2"
group_id => "manylogstash"
type => "logstash2"
}
}
```
http_poller
Allows you to call an HTTP API and process the received data; it reads from a list of URLs. You can read more on the official documentation page.
- urls - a hash of URLs in the format "name" => "url". Both values are passed along with the data. The "url" can be either a string or a hash. _A mandatory parameter._ When "url" is a hash, it may contain parameters supported by Manticore; the following are used most often:
  - url - the URL string, _a mandatory parameter_
  - method - the HTTP method to use; GET is used by default
  - user - the user for authorization with the HTTP Basic Auth scheme
  - password - the user's password for authorization with the HTTP Basic Auth scheme
  - headers - a hash of header fields and their values
- schedule - schedules the plugin launch using rufus-scheduler; the syntax is similar to cron but has many more features. _A mandatory parameter_
- request_timeout - the timeout (in seconds) for the entire request; the default value is 60 seconds
- codec - decodes the received data according to the specified format; it is a universal parameter for all plugins in the input block. The default value for this plugin is "json". The list of available codecs can be found in the official documentation
```
input {
http_poller {
urls => {
serverclasses => {
method => get
user => "${username}"
password => "${password}"
url => "https://<opensearch_ip>:<opensearch_port>/serverclasses"
}
}
request_timeout => 60
schedule => { cron => "* * * * * UTC"}
codec => "json"
}
}
```
http
Allows you to receive single-line or multi-line events via HTTP(S); for example, it can accept webhook requests. You can read more on the official documentation page. The plugin has no required parameters. Frequently used parameters are shown below:
- host - the host or IP address on which to receive data; the default value is "0.0.0.0"
- port - the TCP port to listen on. The default port is 8080
```
input {
http {
host => "localhost"
port => 8080
}
}
```
LDAPSearch
Allows you to select multiple records from an LDAP server and generate an event for each extracted record. Each event contains the attributes defined in the attrs parameter. The plugin was developed by a third party, so its documentation is available on the developers' official page. The plugin does not have many parameters, but almost all of them are required. All parameters are shown below:
- host - the server IP address, _a mandatory parameter_
- password - the connection password
- dn - the distinguished name that identifies the user or group, _a required parameter_
- base - the domain components (search base), _a required parameter_
- filter - the filter for selecting records, _a mandatory parameter_
- attrs - a list (array) of attributes of each record; the default is ['uid']
- port - the connection port; the default is "389"
- usessl - whether to use SSL; the default is "false"
- schedule - schedules the plugin launch using rufus-scheduler; the syntax is similar to cron but has many more features
```
input {
LDAPSearch {
host => "<host_ip>"
password => "${ldap_password}"
dn => "dn"
base => 'dc'
attrs => []
filter => "(objectClass=computer)"
schedule => "14 * * * *"
}
}
```
opensearch
Allows you to read previously indexed data for additional processing and then write it to another index (reindexing). OpenSearch is a fork of Elasticsearch, so you can refer to the information for the elasticsearch plugin on the official documentation page. The plugin does not have required parameters; frequently used parameters are presented below:
- hosts - a list (array) of OpenSearch server URLs. Each URL can be specified as IP, HOST, IP:port, or HOST:port
- index - the index to read from. It is important to specify a specific index without using an index pattern
- user - the username for connecting to OpenSearch
- password - the user's password to connect to OpenSearch
- query - the query being executed for reading. For more information, see the official documentation
- ca_file - path to the Certificate Authority certificate file, must include all necessary certificate chains
- schedule - schedule the plugin launch using rufus-scheduler, the syntax is similar to cron, but has much more features
```
input {
opensearch {
hosts => ["localhost:9200"]
index => "linux_deamon-*"
user => "${opensearch_username}"
password => "${opensearch_password}"
query => '{ "size": 10, "query": { "match_all": {}} }'
ca_file => "/path/to/cert"
schedule => "* * * * *"
}
}
```
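With a schedule of every minute, the match_all query above re-reads the whole index on each run. A hedged sketch of a narrower query (the @timestamp field and the time window are assumptions about the indexed data):
```
input {
  opensearch {
    hosts => ["localhost:9200"]
    index => "linux_deamon-*"
    user => "${opensearch_username}"
    password => "${opensearch_password}"
    # Read only documents with a timestamp in the last minute instead of the whole index
    query => '{ "query": { "range": { "@timestamp": { "gte": "now-1m" } } } }'
    ca_file => "/path/to/cert"
    schedule => "* * * * *"
  }
}
```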