4. logsatsh 설정 정보 1) logstash.yml http.host: 0.0.0.0 node.name: logstash 2) logsatsh.conf logstash 설정 소스를 작성 input { kafka { bootstrap_servers => "118.220.143.172:9091,118.220.143.173:9091,118.220.143.173:9092" topics => ["sacp1","sacp2","sacp3","sacp4","sacp5","sacp6","sacp7", "sacp8", "asde"] consumer_threads => 3 # consumer_threads 토픽에서 데이터를 읽을 때 사용하는 컨슈머 스레드 수를 설정하는 옵션 메모리 양이 충분하지 않으면 부하 문제가 발생 일반적으로 1-4개의 컨슈머 스레드를 사용 decorate_events => true #filter 부분에 필요한 정보가 있는 경우 } #파일 부분은 더 확인이 필요 file { path => "/home/gmt/ASDE/logs/sendlog/*.log" type => "logs" start_position => "beginning" } #beats { #port => 5044 #} tcp { port => 50000 } } filter { if [type] == "logs" { grok { match => { "path" => "/home/gmt/ASDE/logs/sendlog/%{GREEDYDATA:filename}\.log" } } mutate { add_field => { "file_name" => "%{filename}" } } } else { #데이터 직렬화 부분 수정 필요 (json형식으로 변환 후 ruby 활용해 데이터 변환 후 삭제 처리 형식) json { source => "message" } ruby { code => " event.set('newfield', event.get('trgt_id')+','+event.get('cat_ty')+',') " } mutate { remove_field => [ "trgt_id", "cat_ty", "@version", "@timestamp", "host", "path", "message" ] } } } output { elasticsearch { if "file_name" in [fields] { index => "spring-%{[file_name]}-2023-03-06" } else { index => "spring-%{[@metadata][kafka][topic]}-2023-03-06" } user => "elastic" password => "changeme" hosts => ["118.220.143.172:9200","118.220.143.173:9201","118.220.143.173:9202"] } }