본문 바로가기
ELK/logstash

ELK - Logstash 1 : 로그스태시 filter

by 킹차니 2023. 6. 22.

필터입력 플러그인이 받은 데이터를 의미있는 데이터로 구조화하는 역할을 한다.

로그스태시 필터는 비정형 데이터를 정형화하고 데이터 분석을 위한 구조를 잡아준다.

입력으로 받은 데이터를 로그스태시 필터를 사용하여 필요한 정보만 추출하거나 형태를 변환하고 부족한 정보는 추가하는 등 전반적인 데이터 정제/가공 작업을 할 수 있다.

필터 역시 플러그인 형태이며, 입력과 비슷하게 다양한 필터 플러그인들이 존재한다. 자주 사용되는 필터 플러그인을 몇가지 사용해보자.

 

필터 플러그인 설명
grok grok 패턴을 사용하여 메시지를 구조화된 형태로 분석한다. grok 패턴은 일반적인 정규식과 유사하지만, 추가적으로 미리 정의된 패턴이나 필드 이름 설정, 데이터 타입 정의 등을 도와준다.
dissect 간단한 패턴을 사용해 메시지를 구조회된 형태로 분석한다. 정규식을 사용하지 않아 grok에 비해 자유도는 떨어지지만 더 빠른 처리가 가능하다.
mutate 필드명을 변경하거나 문자열 처리 등 일반적인 가공 함수를 제공한다.
date 문자열을 지정한 패턴의 날짜형으로 분석한다.

 

 

 

먼저 테스트 로그는 다음과 같다.

[2023-06-21 17:21] [ID1] 192.10.2.6 9500 [INFO] - connected.
[2023-06-21 17:31] [ID2] 211.25.3.1 1010 [warn] - busy server.

- elasticsearch.log 파일

 

문자열 자르기

mutate -split

mutate 플러그인의 split 옵션을 사용하여 문자열을 잘라보자.

(그 전에 mutate에는 다양한 옵션이 있는데  coerce - rename - update - replace - convert - gsub - uppercase -capitalize - lowercase - strip - remove - split - join - merge - copy  순으로 적용됨을 기억하자.)

 

split 옵션은 구분자를 기준으로 데이터를 자를 수 있다. 아래 로그스태시 설정 파일에서는 공백(" ")을 기준으로 문자열을 분리한다.

input {
  file {
    path => "C:/logstash-7.10.1/config/filter-example.log"
    start_position => "beginning"
    sincedb_path => "nul"
  }
}

filter {
  mutate {
    split => { "message" => " " }
  }
}

output {
  stdout { }
}

위의 설정을 적용하여 실행하면 결과는 아래와 같다.

{
    "@timestamp" => 2023-06-22T06:02:36.376470Z,
         "event" => {
        "original" => "[2023-06-21 17:21] [ID1] 192.10.2.6 9500 [INFO] - connected."
    },
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
      "@version" => "1",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
       "message" => [
        [0] "[2023-06-21",
        [1] "17:21]",
        [2] "[ID1]",
        [3] "192.10.2.6",
        [4] "9500",
        [5] "[INFO]",
        [6] "-",
        [7] "connected."
    ]
}
{
    "@timestamp" => 2023-06-22T06:02:36.386836Z,
         "event" => {
        "original" => "[2023-06-21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server."
    },
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
      "@version" => "1",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
       "message" => [
        [0] "[2023-06-21",
        [1] "17:31]",
        [2] "[ID2]",
        [3] "211.25.3.1",
        [4] "1010",
        [5] "[warn]",
        [6] "-",
        [7] "busy",
        [8] "server."
    ]
}

message 필드 문자열이 공백을 기준으로 구분되어 배열 형태의 데이터가 되었음을 볼 수 있다. 

이때 구분된 문자열은 '필드명[숫자]'로 접근할 수 있다. 예로 ID는 message[2]와 같다.

 

mutate 플러그인에 다른 옵션도 추가해보자.

 

 

mutate - split + add_field + remove_field

....

filter {
  mutate {
    split => { "message" => " " }
    add_field => { "id" => "%{[message][2]}" }
    remove_field => "message"
  }
}

....

add_field 와 remove_field는 필터 플러그인의 공통 옵션이다.

 

아래는 필터 플러그인 공통 옵션)

공통 옵션 설명
add_field 새로운 필드를 추가
add_tag 성공한 이벤트에 태그 추가
enable_metric 메트릭 로깅을 활성화하거나 비활성화. 디폴트는 활성화로 수집된 데이터는 로그스태시 모니터링에서 해당 필터의 성능을 분석할 때 사용된다.
id 플러그인의 아이디 설정. 모니터링 시 아이디를 이용해 특정 플러그인을쉽게 찾을 수 있음
remove_field 필드를 삭제
remove_tag 성공한 이벤트에 붙은 태그를 제거

 

결과는 아래와 같다.

{
         "event" => {
        "original" => "[2023-06-21 17:21] [ID1] 192.10.2.6 9500 [INFO] - connected."
    },
    "@timestamp" => 2023-06-22T06:33:03.889235Z,
            "id" => "[ID1]",
          "host" => { "name" => "ichan-yeong-ui-MacBookPro.local" },
           "log" => {
             "file" => { "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log" }
           },
      "@version" => "1"
}
{
         "event" => {
        "original" => "[2023-06-21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server."
    },
    "@timestamp" => 2023-06-22T06:33:03.888236Z,
            "id" => "[ID2]",
          "host" => { "name" => "ichan-yeong-ui-MacBookPro.local" },
           "log" => {
             "file" => { "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
           }
    },
      "@version" => "1"
}

결과를 보면 id필드가 생기고, message 필드는 사라진 것을 볼 수 있다.

 

 

 

dissect를 이용한 문자열 파싱

 

mutate 플러그인의 split 옵션을 이용해 문자열을 분리하면 하나의 구분자만 이용해서 데이터를 나눠야 한다는 단점이 있다. 하지만 dissect 플러그인은 패턴을 이용해 문자열을 분석하고 주요 정보를 필드로 추출하는 기능을 수행한다. filter를 아래와 같이 수정해보자.

....

filter {
  dissect {
    mapping => {"message" => "[%{timestamp}] [%{id}] %{ip} %{port} [%{level}] - %{message}."}
  }
}

....

결과는 다음과 같다.

{
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
         "event" => {
        "original" => "[2023-06-21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server."
    },
          "tags" => [
        [0] "_dissectfailure"
    ],
      "@version" => "1",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
    "@timestamp" => 2023-06-22T06:40:08.773074Z,
       "message" => "[2023-06-21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server."
}
{
         "level" => "INFO",
     "timestamp" => "2023-06-21 17:21",
            "ip" => "192.10.2.6",
         "event" => {
        "original" => "[2023-06-21 17:21] [ID1] 192.10.2.6 9500 [INFO] - connected."
    },
            "id" => "ID1",
      "@version" => "1",
    "@timestamp" => 2023-06-22T06:40:08.763896Z,
          "port" => "9500",
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
       "message" => "connected"
}

dissect 플러그인의 mapping 옵션에 구분자 형태를 정의하고 필드를 구분한다.

%{필드명} 으로 작성하면 중괄호{} 안의 필드명으로 새로운 필드가 생성된다. %{} 외의 문자들은 모두 구분자 역할을 한다.

message 필드의 문자열이 [timestamp[ [id] ip port [level] - msg 형식으로 구성되어 있다면 구분자에 맞추어 문자열을 자르고 생성된 형태에 맞춰 필드를 구분한다.

 

하지만 결과를 보면 [ID2]의 로그는 _dissectfailure가 발생했다. 해당 에러는 dissect 필터 플러그인이 동작하지 않은 경우에 발생한다.

[ID2]의 로그는 다음과 같다.

 

[2023-06-21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server.

보면 timestamp와 id 사이의 공백이 한칸이 아닌것을 볼 수 있다. 즉 dissect 플러그인은 공백 한 칸과 세 칸을 다르게 인식한다.

 

이를 해결하기 위해 매핑에 적용할 수 있는 기호를 사용할 수 있다.

....

filter {
  dissect {
    mapping => {"message" => "[%{timestamp}]%{?->}[%{id}] %{ip} %{+ip} [%{?level}] - %{}."}
  }
}

....

-> 기호는 공백을 무시한다.

%{필드명->}을 입력하면 공백이 몇칸이든 하나의 공백으로 인식한다.

%{?필드명} or %{}를 입력하면 그 필드명은 결과에 포함되지 않는다.

{%->} 는 공백들을 하나의 필드로 만들고 무시하게 된다.

%{+필드명}을 작성하면 여러 개의 필드를 하나의 필드로 합쳐서 표현한다. 이제 기존 port필드가 ip 필드에 합쳐진다.

 

결과

{
    "@timestamp" => 2023-06-22T07:05:25.162274Z,
      "@version" => "1",
     "timestamp" => "2023-06-21 17:31",
         "event" => {
        "original" => "[2023-06-21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server."
    },
            "ip" => "211.25.3.1 1010",
            "id" => "ID2",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
       "message" => "[2023-06-21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server."
}
{
    "@timestamp" => 2023-06-22T07:05:25.160014Z,
      "@version" => "1",
     "timestamp" => "2023-06-21 17:21",
         "event" => {
        "original" => "[2023-06-21 17:21] [ID1] 192.10.2.6 9500 [INFO] - connected."
    },
            "ip" => "192.10.2.6 9500",
            "id" => "ID1",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
       "message" => "[2023-06-21 17:21] [ID1] 192.10.2.6 9500 [INFO] - connected."
}

 

 

 

grok를 이용한 문자열 파싱

grok는 정규 표현식을 이용해 문자열을 파싱할 수 있다. grok는 자주 사용하는 정규 표현식들을 패턴화해놨으며 패턴을 이용해 %{패턴:필드명} 형태로 데이터에서 특정 필드를 파싱할 수 있다. 아래는 grok에서 지원하는 패턴들이다.

패턴명 설명
NUMBER 십진수를 인식. 부호화 소수점 포함가능
SPACE 스페이스, 탭 등 하나 이상의 공백 인식
URI URI를 인식
IP IP주소를 인식
SYSLOGBASE 시스로그의 일반적인 포맷에서 타임스탬프, 중요도, 호스트, 프로세스 정보까지 헤더 부분을 인식
TIMESTAMP_ISO8601 ISO8601 타입의 타임스탬프를 인식 ex) 2023-06-22T12:00+09:00
DATA 이 패턴의 직전 패턴부터 다음 패턴 사이를 모두 인식. 특별히 인식하고자 하는 값의 유형을 신경 쓸 필요가 없으므로 특별히 값이 검증될 필요가 없을 때 주로 사용된다.
GREEDYDATA DATA타입괴 동일하나, 표현식의 가장 뒤에 위치시킬 경우 해당 위치부터 이벤트의 끝까지를 값으로 인식한다.

 

필터 설정을 아래와 같이 바꿔보자.

filter {
  grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] [ ]*\[%{DATA:id}\] %{IP:ip} %{NUMBER:port:int} \[%{LOGLEVEL:level}\] \- %{DATA:msg}\."}
  }
}

 

그리고 elasticsearch.log 파일은 아래와 같다.

 

 

 

결과는 아래와 같다.

{
      "@version" => "1",
          "tags" => [
        [0] "_grokparsefailure"
    ],
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
    "@timestamp" => 2023-06-23T06:04:29.487083Z,
         "event" => {
        "original" => "[023/06/21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server."
    },
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
       "message" => "[023/06/21 17:31]    [ID2] 211.25.3.1 1010 [warn] - busy server."
}
{
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
           "msg" => "connected",
     "timestamp" => "2023-06-21 17:21",
            "id" => "ID1",
      "@version" => "1",
          "port" => 9500,
    "@timestamp" => 2023-06-23T06:04:29.486142Z,
         "event" => {
        "original" => "[2023-06-21 17:21] [ID1] 192.10.2.6 9500 [INFO] - connected."
    },
            "ip" => "192.10.2.6",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
         "level" => "INFO",
       "message" => "[2023-06-21 17:21] [ID1] 192.10.2.6 9500 [INFO] - connected."
}

grok는 기본적으로 %{패턴명:변수명} 형태로 작성하면 된다.

TIMESTAMP_ISO8601 : ISO8601 표준 시간 표기법에 대한 패턴. 

DATA : 모든 데이터 인식

IP : IPv4 형태의 데이터를 인식

NUMBER : 숫자를 인식하는데, 변수명 뒤에 :int를 추가하면 변경 시 정수 타입으로 지정한다. 특별한 값을 넣지 않으면 모든 데이터가 문자타입으로 인식된다.

LOGLEVEL : 시스로그 레벨을 인식한다.

[ , ] , - , . 과 같은 기호는 역슬래시를 붙여 이스케이프할 수 있다.

[  ]* : 모든 공백을 허용한다. [timestamp]와 [id] 사이에는 공백이 한칸인 경우도 있고, 세칸인 경우도 있는데 이를 모두 허용하는 것이다.

 

그런데 결과를 보면 ID2의 로그는 날짜의 포맷이 달라 오류가 발생한 것을 볼 수 있다. 

날짜/시간 포맷은 로그를 만드는 사용자가 너무 다양한 형태로 포맷을 만들어 사용하기 때문에 데이터를 수집하는 쪽에서 반드시 포맷을 통일시켜줘야 한다. 현재 grok의 패턴으로는 이를 만족시킬 수 없기 때문에 새로운 패턴을 만들어 볼 것이다.

 

사용자가 패턴을 지정하기 위해 pattern_definitions 옵션을 추가하고 원하는 형식의 정규 표현식을 작성하면 된다.

...

filter {
  grok {
    pattern_definitions => { "MY_TIMESTAMP" => "%{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?" }
    match => { "message" => "%{MY_TIMESTAMP:timestamp} * \[%{DATA:id}\] %{IP:ip} %{NUMBER:port:int} \[%{LOGLEVEL:level}\] \- %{DATA:msg}\."}
  }
}

...

이제 연-월-일 형태만 지원하는 것이 아니라, 연/월/일 형태 역시 지원하도록 수정되었다.

 

 

대소문자 변경하기

mutate 플러그인의 uppercase 옵션을 추가해서 소문자를 대문자로 변경해보자.

...

filter {
  dissect {
    mapping => {"message" => "[%{?timestamp}]%{?->}[%{?id}] %{?ip} %{?port} [%{level}] - %{?msg}."}
  }
  mutate {
    uppercase => ["level"]
  }
}

...

 

결과를 보면 INFO는 원래 대문자였지만, warn은 소문자였는데 대문자로 변경된 것을 볼 수 있다.

{
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
         "level" => "WARN",
    "@timestamp" => 2023-06-27T06:40:17.716575Z,
         "event" => {
        "original" => "[2023/06/21 14:18:23]   [ID2] 218.25.32.70 1010 [warn] - busy server."
    },
      "@version" => "1",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
       "message" => "[2023/06/21 14:18:23]   [ID2] 218.25.32.70 1010 [warn] - busy server."
}
{
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
         "level" => "INFO",
    "@timestamp" => 2023-06-27T06:40:17.710439Z,
         "event" => {
        "original" => "[2023-06-21 14:17] [ID1] 192.10.2.6 9500 [INFO] - connected."
    },
      "@version" => "1",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
       "message" => "[2023-06-21 14:17] [ID1] 192.10.2.6 9500 [INFO] - connected."
}

 

 

날자/시간 문자열 분석

이벤트 발생 날짜/시간 정보는 모니터링에 매우 중요한데, 날짜/시간은 매우 다양하게 표현이 가능하다. 이렇게 다양한 포맷을 date 플러그인을 사용하면 날짜/시간 포맷으로 인덱싱할 수 있다. 엘라스틱 서치의 경우 ISO8601 표준 포맷을 기본으로 사용한다.

...

filter {
  dissect {
    mapping => {"message" => "[%{timestamp}]%{?->}[%{?id}] %{?ip} %{?port} [%{?level}] - %{?msg}."}
  }
  mutate {
    strip => "timestamp"
  }
  date {
    match => [ "timestamp", "YYYY-MM-dd HH:mm", "yyyy/MM/dd HH:mm:ss" ]
    target => "new_timestamp"
    timezone => "UTC"
  }
}

...

위 filter를 보면 먼저 dissect - mapping 플러그인을 사용하여 문자열을 하나씩 자른다. 그런데 timestamp 필드는 제외하고 다른 필드들은 모두 무시한다. 

다음으로 mutate - strip 은 선택한 필드 양옆의 공백을 제거한다.

마지막으로 date 플러그인은 match로 "YYYY-MM-dd HH:mm", "yyyy/MM/dd HH:mm:ss" 둘에 매칭된다면 target에 설정해둔 new_timestamp로 필드를 만들고, 설정한 타임존에 따른 날짜/포맷으로 데이터를 수정한다.

 

결과

{
         "@version" => "1",
              "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
        "timestamp" => "2023/06/21 14:18:23",
       "@timestamp" => 2023-06-27T06:47:55.191670Z,
            "event" => {
        "original" => "[2023/06/21 14:18:23]   [ID2] 218.25.32.70 1010 [warn] - busy server."
    },
             "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
    "new_timestamp" => 2023-06-21T14:18:23.000Z,
          "message" => "[2023/06/21 14:18:23]   [ID2] 218.25.32.70 1010 [warn] - busy server."
}
{
         "@version" => "1",
              "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
        "timestamp" => "2023-06-21 14:17",
       "@timestamp" => 2023-06-27T06:47:55.193104Z,
            "event" => {
        "original" => "[2023-06-21 14:17] [ID1] 192.10.2.6 9500 [INFO] - connected."
    },
             "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
    "new_timestamp" => 2023-06-21T14:17:00.000Z,
          "message" => "[2023-06-21 14:17] [ID1] 192.10.2.6 9500 [INFO] - connected."
}

 

 

조건문

로그 스태시는 if문을 통해서 이벤트 마다 적절한 필터를 적용할 수 있다. 아래와 같이 필터를 수정해보자.

...

filter {
  dissect {
    mapping => {"message" => "[%{timestamp}]%{?->}[%{id}] %{ip} %{port} [%{level}] - %{msg}."}
  }
  if [level] == "INFO" {
    drop { }
  }
  else if [level] == "warn" {
    mutate {
      remove_field => [ "ip", "port", "timestamp", "level" ]
    }
  }
}

...

위 필터를 보면 먼저 dissect - mapping을 사용하여 문자열을 하나씩 자른다. 그 뒤부터 조건문 if가 나오는데, level 필드가 "INFO"라면 drop을 통해 데이터를 삭제한다.

또한 else if에서 level 필드가 "warn"이라면 특정 필드들을 삭제한다. 결과는 아래와 같다.

{
           "log" => {
        "file" => {
            "path" => "/Users/leechanyoung/elasticsearch-8.8.1/logs/elasticsearch.log"
        }
    },
      "@version" => "1",
            "id" => "ID2",
    "@timestamp" => 2023-06-27T06:58:32.706167Z,
           "msg" => "busy server",
          "host" => {
        "name" => "ichan-yeong-ui-MacBookPro.local"
    },
         "event" => {
        "original" => "[2023/06/21 14:18:23]   [ID2] 218.25.32.70 1010 [warn] - busy server."
    },
       "message" => "[2023/06/21 14:18:23]   [ID2] 218.25.32.70 1010 [warn] - busy server."
}

결과를 보면 INFO인 로그는 안나온 것을 볼 수 있다.

또한 warn 로그의 ip, port, timestamp, level 필드는 사라진 것을 볼 수 있다.

 

 

 

 

참고: 엘라스틱 스택 개발부터 운영까지 - 김준영, 정상운 저

'ELK > logstash' 카테고리의 다른 글

모니터링  (0) 2023.07.03
ELK - Logstash 1 : 로그스태시 output  (0) 2023.06.28
ELK - Logstash 1 : 로그스태시란  (0) 2023.06.21