вторник, 26 мая 2026 г.

Сбор syslog и snmptrap с сетевых устройств в базу Clikhouse через Vector. Rocky Linux 9.7.

Реализуем следующую схему:
1. Настройка NTP сервера для корректного учета времени логов.
Проверяем установку NTP клиента
# systemctl status chronyd
Устанавливаем сервера времени:
# nano /etc/chrony.conf
Сервера для работы:
 server ntp.vtt.net iburst
 pool ru.pool.ntp.org iburst
 server ntp1.yandex.ru
 server ntp2.yandex.ru
 server ntp1.stratum2.ru iburst
 server ntp2.stratum2.ru iburst

# systemctl restart chronyd
Проверка:
# timedatectl status
# date
# chronyc tracking
# chronyc sources


2.Для быстрой работы базы Clickhouse необходимо немного подтюнить операционную систему:
2.1 Отключить использование SWAP
# swapoff -a
Закомментировать строку, подключающую SWAP в fstab
# nano /etc/fstab
У меня это строка:
 #/dev/mapper/rl-swap     none                    swap    defaults        0 0
2.2 Увеличить лимит открытых файлов (для high-load)
# echo "clickhouse soft nofile 262144" | sudo tee -a /etc/security/limits.conf
# echo "clickhouse hard nofile 262144" | sudo tee -a /etc/security/limits.conf

2.3 Включить опцию madvise механизма Transparent Huge Pages (THP) и разрешить Clickhouse опрос задержек ядра.
Цель: что бы ОС Linux не мешала Clikhouse использовать большие блоки памяти (На виртуальных машинах работает не всегда).
И чтобы Clickhouse не ругался на невозможность мониторить параметры задержки ядра процессора.
# nano /etc/sysctl.conf
Добавляем строки:
 vm.transparent_hugepage = madvise
 kernel.task_delayacct = 1

Добавляем в загрузчик GRUB опцию загрузки ОС с параметром madvise:
# grubby --update-kernel=ALL --args="transparent_hugepage=madvise"
Теперь необходима перезагрузка сервера.
# reboot
После перезагрузки проверяем, что опция madvise применилась. Пример правильного вывода:
# cat /proc/cmdline
BOOT_IMAGE=(hd0,msdos1)/vmlinuz-5.14.0-611.36.1.el9_7.x86_64 root=/dev/mapper/rl-root ro resume=/dev/mapper/rl-swap rd.lvm.lv=rl/root rd.lvm.lv=rl/swap crashkernel=1G-2G:192M,2G-64G:256M,64G-:512M transparent_hugepage=madvise
# cat /sys/kernel/mm/transparent_hugepage/enabled
always [madvise] never

# sysctl kernel.task_delayacct
kernel.task_delayacct = 1
# cat /proc/sys/kernel/task_delayacct
1


3. Устанавливаем Clikhouse:
# dnf install yum-utils curl ca-certificates gnupg2
# yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
# dnf install -y clickhouse-server clickhouse-client
# systemctl enable --now clickhouse-server
# systemctl status clickhouse-server

После установки и запуска сервера, выполняем проверку:
# clickhouse-client --query "SELECT version();"

4. Правим конфигурационный файл Clikhouse
# nano /etc/clickhouse-server/config.xml
Раскомментируем строку (тем самым отключаем IPv6)
 <listen_host>0.0.0.0</listen_host> 
Изменяем уровень логирования – оставляем только ошибки
 <logger>
   <level>warning</level> 
 <logger>

Ограничиваем сервер полкой в использовании памяти - в 5Г, иначе сервер съест другие приложения
 <max_server_memory_usage>5000000000</max_server_memory_usage>
Отключаем процентное указание в использование памяти
 <max_server_memory_usage_to_ram_ratio>0</max_server_memory_usage_to_ram_ratio>
Рестартуем сервис для изменений:
# systemctl restart clickhouse-server

5. Создаем базу данных и таблицу для записи метрик с партициями по месяцу:
# clickhouse-client -h localhost
 :) CREATE DATABASE IF NOT EXISTS logsdb;
 :) CREATE TABLE logsdb.logsdb (
    ts DateTime64(3),
    d Date MATERIALIZED toDate(ts),
    source_type Enum8('syslog' = 1, 'snmptrap' = 2),
    host String,
    severity LowCardinality(String),
    facility LowCardinality(String),
    message String CODEC(ZSTD(3)),
    INDEX idx_host_token host TYPE tokenbf_v1(1024, 3, 0) GRANULARITY 4,
    INDEX idx_message_token message TYPE tokenbf_v1(4096, 3, 0) GRANULARITY 8,
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(d)
ORDER BY (ts, source_type, host)
TTL d + INTERVAL 1 YEAR
SETTINGS
    index_granularity = 8192,
    compress_primary_key = 1,
    enable_mixed_granularity_parts = 1,
    min_bytes_for_wide_part = 0;


6. Устанавливаем демон SNMPTRAPD, который будет ловить трапы от оборудования. Демон snmptrapd входит в пакет net-snmp.
# dnf install net-snmp net-snmp-utils
Конфиг лежит тут: /etc/snmp/snmptrapd.conf
# nano /etc/snmp/snmptrapd.conf
В конфигурационый файл вносим директивы:
 disableAuthorization yes
 # Формат для SNMPv1
 format1 TRAPIP=%b | SNMPVER=1 | TRAPTIME=%y-%m-%L %H:%J:%k | TRAPTYPE=%W | TRAPVAL: %v\n
 # Формат для SNMPv2c
 format2 TRAPIP=%b | SNMPVER=2 | TRAPTIME=%y-%m-%L %H:%J:%k | TRAPTYPE=%W | TRAPVAL: %v\n

Создаем файл логов snmptrapd:
# touch /var/log/snmptrapd.log
# chmod 644 /var/log/snmptrapd.log

Запускаем демон для теста
# snmptrapd -f -Lo -c /etc/snmp/snmptrapd.conf
Из другой консоли генерируем отправку трапа:
# snmptrap -v 2c -c my_secret_string 127.0.0.1 "" .1.3.6.1.4.1.9.9.43.2.0.1
При этом в консоле демона snmptrapd должны увидеть захваченный лог.

7. Настраиваем unit systemd для запуска SNMPTRAPD.
При настройке важно, что режим перенаправления логов в файл задается при запуске демона:
# systemctl edit snmptrapd
Вставляем команды редактирующие основной unit:
 ### Anything between here and the comment below will become the new contents of the file
 [Service]
 ExecStart=
 ExecStart=/usr/sbin/snmptrapd -f -A -c /etc/snmp/snmptrapd.conf -Lf /var/log/snmptrapd.log
 ### Lines below this comment will be discarded

Запускаем службу:
# systemctl daemon-reload
# systemctl start snmptrapd
# systemctl enable snmptrapd
# systemctl status snmptrapd


8. Организовываем ротацию файлов логов трапов snmp
# nano /etc/logrotate.d/snmptrapd
Содержимое файла:
 /var/log/snmptrapd.log {
    daily
    rotate 7
    missingok
    notifempty
    copytruncate
 }

Проверка синтаксиса:
# logrotate -d /etc/logrotate.d/snmptrapd
Принудительная ротация логов прямо сейчас:
# logrotate -f /etc/logrotate.d/snmptrapd

9. Устанавливаем Vector.
Скачиваем пакет установки Vector для Rocky Linux 9.6:
# cd /usr/src
# wget https://packages.timber.io/vector/0.48.0/vector-0.48.0-1.x86_64.rpm
# sudo dnf localinstall -y vector-0.48.0-1.x86_64.rpm

Проверка
# vector --version
# which vector


10. Конфигурация Vector для приема syslog и snmptrap
# nano /etc/vector/vector.toml
 [sources.syslog]
 type = "socket"
 address = "0.0.0.0:514"
 mode = "udp"
 decoding.codec = "bytes"
 host_key = "source_ip"
 [sources.snmptrap]
 type = "file"
 include = ["/var/log/snmptrapd.log"]
 read_from = "beginning"
 ignore_not_found = true
 fingerprinting.strategy = "device_and_inode"
 [transforms.parse_syslog]
 type = "remap"
 inputs = ["syslog"]
 source = '''
 messageone = to_string!(.message)
 # Ищем PRI в любом месте строки, извлекаем только цифры внутри <>
 matched = parse_regex(messageone, r'\<(?P<pri_value>[0-9]+)>(?P<mess>.*)$') ?? {}
 .message = to_string(matched.mess)
 if matched.pri_value != null {
    .pri = matched.pri_value
    
    # Выводим в лог
    #log("MATCHED PRI VALUE: " + .pri)
    .fac_id = (to_int(to_int!(.pri) / 8))
    .facility_name = "unknown"
    if .fac_id == 0 { .facility_name = "kern" }
    if .fac_id == 1 { .facility_name = "user" }
    if .fac_id == 2 { .facility_name = "mail" }
    if .fac_id == 3 { .facility_name = "daemon" }
    if .fac_id == 4 { .facility_name = "auth" }
    if .fac_id == 5 { .facility_name = "syslog" }
    if .fac_id == 6 { .facility_name = "lpr" }
    if .fac_id == 7 { .facility_name = "news" }
    if .fac_id == 8 { .facility_name = "uucp" }
    if .fac_id == 9 { .facility_name = "cron" }
    if .fac_id == 10 { .facility_name = "authpriv" }
    if .fac_id == 11 { .facility_name = "ftp" }
    if .fac_id == 12 { .facility_name = "ntp" }
    if .fac_id == 13 { .facility_name = "audit" }
    if .fac_id == 14 { .facility_name = "alert" }
    if .fac_id == 15 { .facility_name = "clock" }
    if .fac_id == 16 { .facility_name = "local0" }
    if .fac_id == 17 { .facility_name = "local1" }
    if .fac_id == 18 { .facility_name = "local2" }
    if .fac_id == 19 { .facility_name = "local3" }
    if .fac_id == 20 { .facility_name = "local4" }
    if .fac_id == 21 { .facility_name = "local5" }
    if .fac_id == 22 { .facility_name = "local6" }
    if .fac_id == 23 { .facility_name = "local7" }
    .facility = .facility_name
    sev_id = to_int!(.pri) - (.fac_id * 8)
    .severity_name = "unknown" 
    if sev_id == 0 { .severity_name = "emerg" }
    if sev_id == 1 { .severity_name = "alert" }
    if sev_id == 2 { .severity_name = "crit" }
    if sev_id == 3 { .severity_name = "err" }
    if sev_id == 4 { .severity_name = "warning" }
    if sev_id == 5 { .severity_name = "notice" }
    if sev_id == 6 { .severity_name = "info" }
    if sev_id == 7 { .severity_name = "debug" }
    .severity = .severity_name
 } else {
    #log("PRI not found in message", level: "warn")
    .facility = "unknown"
    .severity = "unknown"
    .message = messageone
 }
 .ts = to_int(to_unix_timestamp(now()) * 1000.0)
 .source_type = 1
 .host = .source_ip
 '''
 [transforms.parse_snmptrap]
 type = "remap"
 inputs = ["snmptrap"]
 source = '''
 ts_value = to_int(to_unix_timestamp(now()) * 1000.0)
 # Разбиваем сообщение на массив
 trap_array = split(string!(.message), "|")
 arr_len = length(trap_array)
 # Инициализируем переменные дефолтными значениями
 snmpver = ""
 trapip = ""
 msg_2 = ""
 msg_3 = ""
 msg_4 = ""
 facility_value = "snmp_unknown"
 message_value = ""
 # Заполняем переменные только если индексы физически существуют в массиве
 if arr_len > 0 { trapip = to_string(trap_array[0]) }
 if arr_len > 1 { snmpver = to_string(trap_array[1]) }
 if arr_len > 2 { msg_2 = to_string(trap_array[2]) }
 if arr_len > 3 { msg_3 = to_string(trap_array[3]) }
 if arr_len > 4 { msg_4 = to_string(trap_array[4]) }
 matched = parse_regex(trapip, r'UDP:\s*\[(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\]') ?? {}
 host_value = to_string(matched.ip)
 if snmpver == " SNMPVER=1 " {
  facility_value = "snmp1"
 } else {
  facility_value = "snmp2"
 }
 # Если строка вообще не содержала разделителей "|", сохраняем оригинальное сообщение целиком
 if msg_2 == "" && msg_3 == "" {
  message_value = strip_whitespace(string!(.message))
 } else {
  message_value = strip_whitespace(msg_2 + "|" + msg_3 + "|" + msg_4)
 }
 # Полностью очищаем объект от системных метаданных Vector
 . = {}
 # Наполняем строго по схеме таблицы net_logs
 .ts = ts_value
 .host = host_value
 .facility = facility_value
 .message = message_value
 .severity = "info"
 .source_type = "snmptrap"
 '''
 [sinks.clickhouse_syslog]
 type = "clickhouse"
 inputs = ["parse_syslog"]
 database = "logsdb"
 table = "net_logs"
 endpoint = "http://localhost:8123"
 compression = "none"
 [sinks.clickhouse_syslog.auth]
 strategy = "basic"
 user = "default"
 password = ""
 [sinks.clickhouse_syslog.batch]
 max_events = 1000
 timeout_secs = 1
 [sinks.clickhouse_snmptrap]
 type = "clickhouse"
 inputs = ["parse_snmptrap"]
 database = "logsdb"
 table = "net_logs"
 endpoint = "http://localhost:8123"
 compression = "none"
 [sinks.clickhouse_snmptrap.auth]
 strategy = "basic"
 user = "default"
 password = ""
 [sinks.clickhouse_snmptrap.batch]
 max_events = 1000
 timeout_secs = 1
 [sinks.clickhouse_snmptrap.request]
 retry_attempts = 0

Проверяем конфиг
# vector validate --config-dir /etc/vector/
Пытаемся запуститься под root
# vector --config /etc/vector/vector.toml
Видим, что все запускается, порты 162 и 514 начинают прослушиваться.

11. Настройка службы запуска vector.
11.1 По умолчанию Linux не позволяет обычным программам, типа vector, слушать порты ниже 1024. Vector же должен слушать порты SNMPtrap (162) и syslog (514):
# setcap 'cap_net_bind_service=+ep' $(which vector)
11.2 Вносим изменения в unit systemd службы vector
# systemctl edit vector
Вставляем директивы запуска, отличные от типовых:
 ### Anything between here and the comment below will become the new contents of the file
 [Unit]
 After=network-online.target snmptrapd.service clickhouse-server.service
 Requires=network-online.target snmptrapd.service clickhouse-server.service
 [Service]
 User=vector
 Group=vector
 ExecStart=
 ExecStart=/usr/bin/vector --config /etc/vector/vector.toml
 AmbientCapabilities=CAP_NET_BIND_SERVICE
 CapabilityBoundingSet=CAP_NET_BIND_SERVICE
 NoNewPrivileges=yes
 Restart=on-failure
 RestartSec=5
 PrivateNetwork=no
 RestrictAddressFamilies=AF_INET AF_INET6 AF_UNIX
 Environment=VECTOR_LOG_FORMAT=text
 ### Lines below this comment will be discarded

Запускаем службу:
# systemctl daemon-reload
# systemctl start vector
# systemctl enable vector

Проверка работы:
# journalctl -u vector -f

12. Настраиваем firewalld:
Открываем порты для внешнего доступа к сервисам Clikhouse (8123 и 9000), для SNMPtrap (162) и syslog (514):
# firewall-cmd --permanent --add-port=8123/tcp
# firewall-cmd --permanent --add-port=9000/tcp
# firewall-cmd --permanent --add-port=514/udp
# firewall-cmd --permanent --add-port=162/udp
# firewall-cmd --reload
# firewall-cmd --list-ports


13. Проверка вставки данных:
13.1 Тестовая отправка syslog:
# logger -n 127.0.0.1 -p local0.info -t vector_test "Pipeline check $(date +%s)"
Тестовая отправка snmptrap:
# snmptrap -v 2c -c my_secret_string 127.0.0.1 "" .1.3.6.1.4.1.9.9.43.2.0.1
# snmptrap -v 1  -c my_secret_string 127.0.0.1 1.3.6.1.4.1.9.9.43 192.168.100.100 6 1 0 .1.3.6.1.4.1.9.9.43.2.0.1 s "test"
13.2 Проверка записи данных в базу Clickhouse
# clickhouse-client -h localhost --query "SELECT * FROM logsdb.net_logs ORDER BY ts DESC LIMIT 5"

ПРИМЕЧАНИЕ:
Если Vector перестал записывать данные логов из файла /var/log/snmptrapd.log, то может быть он запомнил старые данные inode файла, который поменялся при ротации.
Можно очистить checkpoints сервиса vector
# systemctl stop vector
# rm -rf /var/lib/vector/*
# systemctl start vector

Комментариев нет:

Отправить комментарий