2025. 2. 14. 23:31ㆍDevops/ELK
해당 기능은 pipeline을 통하여 logstash 설정을 좀더 간소화 하기 위해 쓰는 기능이라고(?) 볼수있다.
내가 프로젝트에 들어갔을때 많은 Index를 사용했고.. Index에 로그를 분기처리 하기위해 logstash.conf 내용이 엄청 길었떤적이있다.. 그부분을 분기 시키기위해서는 필수로 해야한다고 본다.
0. 참고
해당 부분을 할때 conf를 다양하게 만들어 적용했지만 잘되지 않았다..
할때 input 입력쪽은 config.string을 사용하여 진행하고 나머지 filter나 output은 conf를 작성해도 일단은 작동하는것으로 보인다.
또한 cycles에 피해야할 부분을 문서에 정의해놨는데 항상 한방향으로 흐르게끔 해야한다고 한다.. (말은 쉽지..)
설정은 간단하지만 아키텍쳐 패턴을 사용하는경우 조금 복잡해진다.. 간단설정과 아키텍쳐 패턴을 한번 살펴보자
1. 설정
해당 설정은 config/pipelines.yml을 통해 진행한다.
# config/pipelines.yml
- pipeline.id: upstream
config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } }
- pipeline.id: downstream
config.string: input { pipeline { address => myVirtualAddress } }
id는 한마디로 logstash에서 id별로 구분한다. upstream에서 받은 stdin 즉 입력값을 pipeline 명 myVitualAddress로 보내고 downstream에서는 upstream에서 받은 값을 downstream 즉 myVirtualAddress로 처리한다고 보면 된다.
사실 이게 끝이다.. 근데 패턴도 있으니 한번 보고 참고하면 좋을듯합니다.
2. 패턴
2-1. distributor pattern
# config/pipelines.yml
- pipeline.id: beats-server
config.string: |
input { beats { port => 5044 } }
output {
if [type] == apache {
pipeline { send_to => weblogs }
} else if [type] == system {
pipeline { send_to => syslog }
} else {
pipeline { send_to => fallback }
}
}
- pipeline.id: weblog-processing
config.string: |
input { pipeline { address => weblogs } }
filter {
# Weblog filter statements here...
}
output {
elasticsearch { hosts => [es_cluster_a_host] }
}
- pipeline.id: syslog-processing
config.string: |
input { pipeline { address => syslog } }
filter {
# Syslog filter statements here...
}
output {
elasticsearch { hosts => [es_cluster_b_host] }
}
- pipeline.id: fallback-processing
config.string: |
input { pipeline { address => fallback } }
output { elasticsearch { hosts => [es_cluster_b_host] } }
pipeline.id: beat-server에서 input을 받아 [type]별로 아래 pipeline으로 보내는 패턴이다. 간단한 패턴이기에 데이터흐름을 쉽게 파악할 수 있다.
2-2.The output isolator pattern
해당 패턴은 일시적 오류가 발생할 경우 Logstash가 문제가 생기는 현상을 방지할수 있는 패턴이라고 한다..
queue.type: persisted 사용하는것으로 보아 디스크에 저장하는거 같다. (해당 옵션은 memory, persisted,persisted with memory 3가지가 있습니다.)
# config/pipelines.yml
- pipeline.id: intake
config.string: |
input { beats { port => 5044 } }
output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
queue.type: persisted
config.string: |
input { pipeline { address => es } }
output { elasticsearch { } }
- pipeline.id: buffered-http
queue.type: persisted
config.string: |
input { pipeline { address => http } }
output { http { } }
2-3. The forked path Pattern
해당 페턴은 위의 패턴의 추가적인 부분인데 두가지 시스템에 쓰기를 해야할 경우 사용하는 패턴이다. 한마디로 elastic에도 저장해야하고 s3에도 저장해야하는 경우 사용하는 패턴이다.
# config/pipelines.yml
- pipeline.id: intake
queue.type: persisted
config.string: |
input { beats { port => 5044 } }
output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
queue.type: persisted
config.string: |
input { pipeline { address => "internal-es" } }
# Index the full event
output { elasticsearch { } }
- pipeline.id: partner
queue.type: persisted
config.string: |
input { pipeline { address => "partner-s3" } }
filter {
# Remove the sensitive data
mutate { remove_field => 'sensitive-data' }
}
output { s3 { } } # Output to partner's bucket
2-4. The collector Pattern
이 패턴은 1번패턴인 distributor pattern과 반대적인 패턴입니다. 들어오는곳은 여러곳이지만 처리는 한곳에서 처리하는 패턴입니다.
# config/pipelines.yml
- pipeline.id: beats
config.string: |
input { beats { port => 5044 } }
output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
config.string: |
input { kafka { ... } }
output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
# This common pipeline enforces the same logic whether data comes from Kafka or Beats
config.string: |
input { pipeline { address => commonOut } }
filter {
# Always remove sensitive data from all input sources
mutate { remove_field => 'sensitive-data' }
}
output { elasticsearch { } }
'Devops > ELK' 카테고리의 다른 글
| [Filebeat] 전처리(processor)를 통한 ansi 코드 삭제 (1) | 2025.02.14 |
|---|---|
| [ElasticSearch] Index에 Date math Name Alias Rollover 사용해보기 (0) | 2025.02.13 |