main
MonHun 2024-07-14 12:39:52 +09:00
parent 8d592a310c
commit 716f22c7f5
13 changed files with 283 additions and 26 deletions

View File

@ -162,8 +162,9 @@ public class AiLocationParseRunnable implements Runnable {
sb.append(t.getDrctn()); sb.append(",");
sb.append(t.getArcrftStnd()); sb.append(",");
sb.append(t.getSpd());
sb.append(t.getSpd()); sb.append(",");
sb.append(t.getArcrft_brng());
i++;
histroyQueue.add(t);
}

View File

@ -167,7 +167,15 @@ public class AiVoiceMetaRunnable implements Runnable {
// 1. 포멧 변경
// 2. UTC -> KST
SimpleDateFormat recvDTFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
// Date 필드 포멧(LTE-A 때문에 임시 분기처리)
String strDateFormat = "";
if(swclassification.equals("6")){
strDateFormat = "yyyy-MM-dd HH:mm:ss.SSS";
}else{
strDateFormat = "yyyyMMddHHmmssSSS";
}
SimpleDateFormat recvDTFormat = new SimpleDateFormat(strDateFormat);
Date cnvDate = null;
cnvDate = recvDTFormat.parse(dateRecv);
@ -175,7 +183,14 @@ public class AiVoiceMetaRunnable implements Runnable {
Calendar cal = Calendar.getInstance();
cal.setTime(cnvDate);
cal.add(Calendar.HOUR_OF_DAY, 9);
// UTC -> KST (LTE-A 때문에 임시 분기처리)
if(swclassification.equals("6")){
cal.add(Calendar.HOUR_OF_DAY, 9);
}else{
//cal.add(Calendar.HOUR_OF_DAY, 0);
}
cnvDate = new Date(cal.getTimeInMillis());
dateRecv = dateFormat.format(cnvDate);

View File

@ -108,7 +108,8 @@ public class AsdeParseRunnable implements Runnable {
sb.append(swclassification); sb.append(","); //sw구분
sb.append(classification); sb.append("|"); //서버구분
Date dateRecv = new Date();
// Date asdeRecvDt = new Date();
String dateRecv = "";
int i = 0;
for (LinkedHashMap<String, Object> map : list) {
@ -139,9 +140,30 @@ public class AsdeParseRunnable implements Runnable {
// logger.info("logstash 포맷 변환 건수[" +swclassification + "] {}", histroyQueue.size());
String strRecvDt = dateFormat.format(dateRecv);
//String strRecvDt = dateFormat.format(dateRecv);
stsVO.setIntrfc_dt(strRecvDt);
if(dateRecv == null || dateRecv.equals("")){
dateRecv = dateFormat.format(new Date());
}else{
// 1. 포멧 변경
// 2. UTC -> KST
SimpleDateFormat recvDTFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Date cnvDate = null;
cnvDate = recvDTFormat.parse(dateRecv);
Calendar cal = Calendar.getInstance();
cal.setTime(cnvDate);
cal.add(Calendar.HOUR_OF_DAY, 9);
cnvDate = new Date(cal.getTimeInMillis());
dateRecv = dateFormat.format(cnvDate);
}
stsVO.setIntrfc_dt(dateRecv);
stsVO.setIntrfc_id(swclassification);
stsVO.setIntrfc_nm("ASDE");
stsVO.setRecv_co(1);

View File

@ -0,0 +1,152 @@
package kr.gmtc.tss.elkdata.runnable;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.gmtc.tss.elkdata.vo.CctvPtzManualVO;
public class CctvPtzManualParseRunnable implements Runnable {
private Logger logger = LoggerFactory.getLogger(this.getClass());
Queue<String> rcvQueue;
Queue<String> dataFileQueue;
private boolean isRunning = false;
private int listChunkSize = 100;
private long sleepMillis;
private String classification;
private String swclassification;
private String logclassification;
public CctvPtzManualParseRunnable(Queue<String> rcvQueue
, Queue<String> dataFileQueue
, int listChunkSize
, long sleepMillis
, String classification, String swclassification, String logclassification) {
this.rcvQueue = rcvQueue;
this.dataFileQueue = dataFileQueue;
this.listChunkSize = listChunkSize;
this.sleepMillis = sleepMillis;
this.classification = classification;
this.swclassification = swclassification;
this.logclassification = logclassification;
}
@Override
public void run() {
isRunning = true;
while (isRunning) {
try {
int loopCnt = 0;
if(rcvQueue.size()>listChunkSize) {
loopCnt = listChunkSize;
}else {
loopCnt = rcvQueue.size();
}
String [] chunkArr = new String[loopCnt];
for (int i = 0; i < loopCnt; i++) {
chunkArr[i] = rcvQueue.poll();
}
for (int i = 0; i < chunkArr.length; i++) {
parseAndEnqueue(chunkArr[i]);
}
} catch (Exception e) {
if (e.getMessage() != null) {
logger.debug("[ParseTarget] Parsing Error Message : " ,e);
} else {
logger.info("[ParserThread] Parsing Error Call Stack" ,e);
}
}
sleep(sleepMillis);
}
}
private void parseAndEnqueue(String result) {
try {
ArrayList<CctvPtzManualVO> histroyQueue = new ArrayList<CctvPtzManualVO>();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> convert = mapper.readValue(result, new TypeReference<Map<String, Object>>() {});
@SuppressWarnings("unchecked")
ArrayList<LinkedHashMap<String, Object>> list = (ArrayList<LinkedHashMap<String, Object>>) convert.get("data");
logger.info("수신 데이터 변환 건수[" +swclassification + "] {}", list.size());
DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss.SSS");
String dateTime = dateFormat.format(new Date());
StringBuilder sb = new StringBuilder();
sb.append(dateTime); sb.append(",");
sb.append(logclassification); sb.append(","); //log구분
sb.append(swclassification); sb.append(","); //sw구분
sb.append(classification); sb.append("|"); //서버구분
int i = 0;
for (LinkedHashMap<String, Object> map : list) {
CctvPtzManualVO cnvVO = mapper.convertValue(map, CctvPtzManualVO.class);
if (i > 0) sb.append("^");
sb.append(cnvVO.getCam_id()); sb.append(",");
sb.append(cnvVO.getMove()); sb.append(",");
sb.append(cnvVO.getUser_id()); sb.append(",");
sb.append(cnvVO.getSys_id()); sb.append(",");
sb.append(cnvVO.getCtrl_dt());
histroyQueue.add(cnvVO);
i++;
}
// logger.info("logstash 포맷 변환 건수[" +swclassification + "] {}", histroyQueue.size());
dataFileQueue.add(sb.toString());
histroyQueue.clear();
} catch (Exception e) {
logger.error("[ParserThread] Parsing Error Message : " + e.getMessage());
}
}
// public synchronized CountStatus getCountAndSetZero() {
// CountStatus countStatus = new CountStatus(rcvPos,rcvStatic);
// rcvPos = 0;
// rcvStatic = 0;
// return countStatus;
// }
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
logger.debug("[ParseTargetRunnable] sleep error : "+e.getLocalizedMessage());
}
}
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}

View File

@ -91,7 +91,7 @@ public class CctvPtzParseRunnable implements Runnable {
@SuppressWarnings("unchecked")
ArrayList<LinkedHashMap<String, Object>> list = (ArrayList<LinkedHashMap<String, Object>>) convert.get("data");
logger.info("수신 데이터 변환 건수[" +swclassification + "] {}", list.size());
// logger.info("수신 데이터 변환 건수[" +swclassification + "] {}", list.size());
DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss.SSS");
String dateTime = dateFormat.format(new Date());
@ -119,7 +119,7 @@ public class CctvPtzParseRunnable implements Runnable {
i++;
}
logger.info("logstash 포맷 변환 건수[" +swclassification + "] {}", histroyQueue.size());
// logger.info("logstash 포맷 변환 건수[" +swclassification + "] {}", histroyQueue.size());
dataFileQueue.add(sb.toString());

View File

@ -12,6 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.gmtc.tss.elkdata.vo.TrackVO;
@ -85,6 +86,8 @@ public class FusionParseRunnable implements Runnable {
try {
ArrayList<TrackVO> histroyQueue = new ArrayList<TrackVO>();
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Map<String, Object> convert = mapper.readValue(result, new TypeReference<Map<String, Object>>() {});
@ -132,14 +135,14 @@ public class FusionParseRunnable implements Runnable {
sb.append(",");
sb.append(t.getAlt()); sb.append(",");
sb.append(t.getSchdulId()); sb.append(",");
sb.append(t.getIsCntrlzone());
sb.append(t.getIsCntrlzone());
histroyQueue.add(t);
i++;
}
logger.info("logstash 포맷 변환 건수[" +swclassification + "] {}", histroyQueue.size());
// logger.info("logstash 포맷 변환 건수[" +swclassification + "] {}", histroyQueue.size());
dataFileQueue.add(sb.toString());

View File

@ -31,8 +31,8 @@ public class AsdeTrackVO implements Comparable<AsdeTrackVO> {
private String tailNo; //
@JsonProperty("track_no")
private int trackNo; // 추가 2023.04.07
@JsonProperty("recptn_dt") @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss", timezone = "Asia/Seoul")
private Date recptnDt; //
@JsonProperty("recptn_dt")
private String recptnDt; //
@JsonProperty("cat_ty")
private double catTy; //
private double lat; //
@ -118,11 +118,11 @@ public class AsdeTrackVO implements Comparable<AsdeTrackVO> {
this.modeScd = modeScd;
}
public Date getRecptnDt() {
public String getRecptnDt() {
return recptnDt;
}
public void setRecptnDt(Date recptnDt) {
public void setRecptnDt(String recptnDt) {
this.recptnDt = recptnDt;
}

View File

@ -0,0 +1,49 @@
package kr.gmtc.tss.elkdata.vo;
import com.fasterxml.jackson.annotation.JsonProperty;
public class CctvPtzManualVO {
@JsonProperty("cam_id")
private String cam_id;
@JsonProperty("move")
private String move;
@JsonProperty("user_id")
private String user_id;
@JsonProperty("sys_id")
private String sys_id;
@JsonProperty("ctrl_dt")
private String ctrl_dt;
public String getCam_id() {
return cam_id;
}
public void setCam_id(String cam_id) {
this.cam_id = cam_id;
}
public String getMove() {
return move;
}
public void setMove(String move) {
this.move = move;
}
public String getUser_id() {
return user_id;
}
public void setUser_id(String user_id) {
this.user_id = user_id;
}
public String getSys_id() {
return sys_id;
}
public void setSys_id(String sys_id) {
this.sys_id = sys_id;
}
public String getCtrl_dt() {
return ctrl_dt;
}
public void setCtrl_dt(String ctrl_dt) {
this.ctrl_dt = ctrl_dt;
}
}

View File

@ -72,6 +72,9 @@ public class CctvTrackVO implements Comparable<CctvTrackVO> {
@JsonProperty("prc_tm")
private String prcTm;
@JsonProperty("arcrft_brng")
private String arcrft_brng;
@Override

View File

@ -49,6 +49,8 @@ public class TrackVO implements Comparable<TrackVO> {
private String correctedLongitude;
@JsonProperty("is_cntrlzone")
private String isCntrlzone;
@JsonProperty("inout_ty")
private String inout_ty;
private List<BoundingBox> bbox;
@ -334,5 +336,13 @@ public class TrackVO implements Comparable<TrackVO> {
this.py = py;
}
}
public String getInout_ty() {
return inout_ty;
}
public void setInout_ty(String inout_ty) {
this.inout_ty = inout_ty;
}
}

View File

@ -43,7 +43,7 @@ public class KafkaTopicReader implements Runnable {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.count() > 0) logger.info("topic {} 수신 건수 {}", topic, records.count());
// if (records.count() > 0) logger.info("topic {} 수신 건수 {}", topic, records.count());
for (ConsumerRecord<String, String> record : records) {
rcvQueue.add(record.value());

View File

@ -23,6 +23,7 @@ import kr.gmtc.tss.config.TssConfig;
import kr.gmtc.tss.elkdata.runnable.AiLocationParseRunnable;
import kr.gmtc.tss.elkdata.runnable.AiVoiceMetaRunnable;
import kr.gmtc.tss.elkdata.runnable.AsdeParseRunnable;
import kr.gmtc.tss.elkdata.runnable.CctvPtzManualParseRunnable;
import kr.gmtc.tss.elkdata.runnable.CctvPtzParseRunnable;
import kr.gmtc.tss.elkdata.runnable.FusionParseRunnable;
import kr.gmtc.tss.elkdata.runnable.RadarParseRunnable;
@ -72,10 +73,7 @@ public class MainServer implements InitializingBean {
// logger.info("flagFile : "+cfg.flagFile);
// 최초 한번만 실행 - 로그파일이 없을 시 생성
String[] arrSW = cfg.swclassification;
for(String swIdx : arrSW){
DATA_FILE_NAME = fileUtil.createLogFile(cfg.dataZipPath +"/"+ swIdx , cfg.dataZipBakPath +"/"+ swIdx, cfg.dataMakeTime); // logstash data file log
}
DATA_FILE_NAME = fileUtil.createLogFile(cfg.dataZipPath, cfg.dataZipBakPath, cfg.dataMakeTime);
}
@PreDestroy
@ -163,10 +161,7 @@ public class MainServer implements InitializingBean {
@Scheduled(cron = "${process.data-make-cron}")
public void dataFileMake() throws Exception{
String[] arrSW = cfg.swclassification;
for(String swIdx : arrSW){
DATA_FILE_NAME = fileUtil.createLogFile(cfg.dataZipPath +"/"+ swIdx , cfg.dataZipBakPath +"/"+ swIdx, cfg.dataMakeTime);
}
DATA_FILE_NAME = fileUtil.createLogFile(cfg.dataZipPath, cfg.dataZipBakPath, cfg.dataMakeTime);
}
@Scheduled(cron = "${process.data-delete-cron}")
@ -288,6 +283,13 @@ public class MainServer implements InitializingBean {
cfg.classification, swIdx, cfg.logclassification );
thread = new Thread(aiVoiceParseRunnable);
}
else if(swIdx.equals("12")){
CctvPtzManualParseRunnable cctvPtzManualParseRunnable = new CctvPtzManualParseRunnable(rcvQueue, dataFileQue,
cfg.queueChunkSize, cfg.parseTargetSleepMs,
cfg.classification, swIdx, cfg.logclassification );
thread = new Thread(cctvPtzManualParseRunnable);
}
if(thread != null){
thread.setDaemon(true);

View File

@ -65,7 +65,7 @@ kafka:
12: ic.tracking.ptz.manual # IC0502 #추적용CCTV제어정보저장S/W
classification: 1 #이중화되어 있는 서버의 Index
swclassification: 1 #sw구분
swclassification: 2 #sw구분
logclassification: 1 #log구분
state:
@ -74,4 +74,4 @@ state:
# 1:Primary, 2:Secondary
type: Primary
# DisplayLog: true
sendlog: true
# sendlog: true