main
MonHun 2024-06-14 17:31:59 +09:00
parent 75c63b147b
commit dfeb99b2de
22 changed files with 443 additions and 1412 deletions

View File

@ -14,7 +14,7 @@
<name>TSS</name> <name>TSS</name>
<description>TSS Integrate</description> <description>TSS Integrate</description>
<properties> <properties>
<java.version>18</java.version> <java.version>1.8</java.version>
<!-- versions --> <!-- versions -->
<gmtc.version>0.0.1-SNAPSHOT</gmtc.version> <gmtc.version>0.0.1-SNAPSHOT</gmtc.version>
<maven.test.skip>true</maven.test.skip> <maven.test.skip>true</maven.test.skip>

View File

@ -0,0 +1,20 @@
package kr.gmtc.tss.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import lombok.Getter;
import lombok.Setter;
@Configuration
@ConfigurationProperties("kafka")
@Getter
@Setter
public class TopicsList {
private Map<String, String> topics = new HashMap<String, String>();
}

View File

@ -1,12 +1,16 @@
package kr.gmtc.tss.config; package kr.gmtc.tss.config;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import kr.gmtc.tss.data.vo.CctvPtzVO; import kr.gmtc.tss.elkdata.vo.CctvPtzVO;
import kr.gmtc.tss.util.ArrayBlockingLoggingQueue; import kr.gmtc.tss.util.ArrayBlockingLoggingQueue;
@Configuration("TssConfig") @Configuration("TssConfig")
@ -15,9 +19,6 @@ public class TssConfig {
@Value("${kafka.bootstrapServers}") @Value("${kafka.bootstrapServers}")
public String bootstrapServers; public String bootstrapServers;
@Value("${kafka.topic.9}")
public String topic;
@Value("${kafka.groupId}") @Value("${kafka.groupId}")
public String consumeGroupId; public String consumeGroupId;
@ -39,14 +40,14 @@ public class TssConfig {
@Value("${data-make-time}") @Value("${data-make-time}")
public int dataMakeTime; public int dataMakeTime;
@Value("${flag.database}") // @Value("${flag.database}")
public boolean flagDataBase; // public boolean flagDataBase;
@Value("${flag.datafile}") @Value("${flag.datafile}")
public boolean flagDataFile; public boolean flagDataFile;
@Value("${flag.file}") // @Value("${flag.file}")
public boolean flagFile; // public boolean flagFile;
@Value("${client.queue-count:10000}") @Value("${client.queue-count:10000}")
public int queueCount=100000; public int queueCount=100000;
@ -73,7 +74,7 @@ public class TssConfig {
public String classification; public String classification;
@Value("${swclassification}") @Value("${swclassification}")
public String swclassification; public String[] swclassification;
@Value("${logclassification}") @Value("${logclassification}")
public String logclassification; public String logclassification;
@ -93,9 +94,17 @@ public class TssConfig {
@Bean(name = "rcvQueue") @Bean(name = "rcvQueue")
public Queue<String> getRcvQueue() { public Map<String, Queue<String>> getRcvQueueMap() {
this.rcvQueue = new ArrayBlockingLoggingQueue<>(queueCount);
return rcvQueue; Map<String, Queue<String>> queList = new HashMap<String, Queue<String>>();
for(String swIdx : swclassification){
Queue<String> rcvQueue = new ArrayBlockingLoggingQueue<>(queueCount);
queList.put(swIdx, rcvQueue);
}
return queList;
} }
@Bean(name = "messageBodyQueue") @Bean(name = "messageBodyQueue")

View File

@ -1,86 +0,0 @@
package kr.gmtc.tss.data;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kr.gmtc.tss.data.vo.TrackVO;
import kr.gmtc.tss.db.FusionBatchManager;
public class DataInsertRunnable implements Runnable {
private Logger logger = LoggerFactory.getLogger(this.getClass());
Queue<TrackVO> trackQueue;
Queue<TrackVO> trackLastQueue;
private boolean isRunning = false;
private int batchInsertSize = 1000;
FusionBatchManager batchInsertManager;
private long sleepMillis;
private String topic;
public DataInsertRunnable(Queue<TrackVO> trackQueue, Queue<TrackVO> trackLastQueue, int batchInsertSize, FusionBatchManager batchInsertManager, long sleepMillis, String topic) {
this.trackQueue=trackQueue;
this.trackLastQueue=trackLastQueue;
this.batchInsertSize=batchInsertSize;
this.batchInsertManager=batchInsertManager;
this.sleepMillis=sleepMillis;
this.topic=topic;
}
@Override
public void run() {
isRunning = true;
while (isRunning) {
try {
TrackVO [] chunkTrackArr = getChunkTrackArr(trackQueue);
// TrackVO [] chunkTrackLastArr = getChunkTrackArr(trackLastQueue);
if(isRunning) {
if(chunkTrackArr.length > 0) {
batchInsertManager.insertTrackData(chunkTrackArr,topic);
}
/*
* if(chunkTrackLastArr.length > 0) {
* batchInsertManager.insertTrackLastData(chunkTrackLastArr); }
*/
}
} catch (Exception e) {
e.printStackTrace();
logger.info("[DataInsertRunnable] Data Insert error : " ,e);
}
sleep(sleepMillis);
}
}
private TrackVO [] getChunkTrackArr(Queue<TrackVO> sqlQueue) {
int loopCnt=0;
if(sqlQueue.size()>batchInsertSize) {
loopCnt=batchInsertSize;
}else {
loopCnt=sqlQueue.size();
}
TrackVO [] chunkArr = new TrackVO[loopCnt];
for (int i = 0; i < loopCnt; i++) {
chunkArr[i]=sqlQueue.poll();
}
return chunkArr;
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
logger.debug("[TargetManager] sleep error : " +e.getLocalizedMessage());
}
}
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}

View File

@ -1,36 +0,0 @@
package kr.gmtc.tss.db;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@Configuration
@EnableTransactionManagement
public class DbConfig {
@Bean(name = "dbDataSource")
@ConfigurationProperties(prefix = "spring.db.datasource")
public DataSource db5DataSource() {
return DataSourceBuilder.create().build();
}
@Bean(name="batchSqlSessionFactory")
public SqlSessionFactory batchSqlSessionFactoryBean(@Qualifier("dbDataSource") DataSource batchDataSource, ApplicationContext applicationContext)
throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(batchDataSource);
factoryBean.setConfigLocation(applicationContext.getResource("classpath:/mybatis-config.xml"));
return factoryBean.getObject();
}
}

View File

@ -1,553 +0,0 @@
package kr.gmtc.tss.db;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Queue;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kr.gmtc.tss.data.vo.TrackVO;
import kr.gmtc.tss.main.MainServer;
import kr.gmtc.tss.status.CountStatus;
public class FusionBatchManager {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private SqlSessionFactory sqlSessionFactory;
private SqlSession session;
private Queue<TrackVO> sqlLogTrackQueue;
private Queue<TrackVO> sqlLogTrackLastQueue;
private int savePos=0;
private int saveLast=0;
private int saveStatic=0;
private int failPos=0;
private int failLast=0;
private int failStatic=0;
private String topic;
/**
* mybatis batch insert jdbc .
* @param sqlSessionFactory
* @param dbType
*/
public FusionBatchManager(SqlSessionFactory sqlSessionFactory, Queue<TrackVO> sqlLogTrackQueue,Queue<TrackVO> sqlLogTrackLastQueue) {
this.sqlSessionFactory = sqlSessionFactory;
this.sqlLogTrackQueue = sqlLogTrackQueue;
this.sqlLogTrackLastQueue = sqlLogTrackLastQueue;
// this.dbType=dbType;
openSession(false);
}
public void openSession(boolean forceOpen) {
if(session==null||forceOpen)
session = sqlSessionFactory.openSession();
}
public void closeSession() {
if(session!=null)
session.close();
}
public void insertTrackData(TrackVO[] setTrackVOArr, String topic) {
this.topic = topic;
insertTrackData(setTrackVOArr,true);
}
private void insertTrackData(TrackVO[] setTrackVOArr, boolean isBatch) {
int fusionFail = 0;
int fusionSave = 0;
int bboxFail = 0;
int bboxSave = 0;
if(setTrackVOArr==null || setTrackVOArr.length<1) {
// logger.error("insertDataAll - No tableList data");
logger.debug("insertTrackData(Batch:"+isBatch+") - no data in setTrackVOArr");
return;
}
// Table destTable=tableList.get(0);
// String sql = "INSERT INTO " + destTable.getName() + " (" + getInsertCols(destTable.getColumnList()) + ") "
// + "VALUES (" + getInsertValueParams(destTable.getColumnList()) + ")";
// String tableName = "SACP_CCTV_TRACK_" + setTrackVOArr[0].getRecptnDt().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
// SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
// String now_dt = format.format(setTrackVOArr[0].getRecptnDt());
// String tableName = "SACP_TRACK_" + now_dt;
String tableName = "SACP_TRACK_FUSION";
String sql = "INSERT INTO "+tableName+" (" +
" TRGT_ID, RECPTN_DT, LAT, LON, SPD, COS, TRGT_TY, REVISN_LAT, REVISN_LON, MODE_S_CD, SSR_CD, CLSGN, AC_REG_NO, REGISTER_ID) " +
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
String subTableName = "SACP_FUSION_TRACK_BBOX";
String subSql = "INSERT INTO "+subTableName+" (" +
" TRGT_ID, RECPTN_DT, CCTV_ID, BBOX_TOP_LEFT_X, BBOX_TOP_LEFT_Y, BBOX_BOTTOM_RIGHT_X, BBOX_BOTTOM_RIGHT_Y, CRDNT_X, CRDNT_Y, REGISTER_ID) " +
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
PreparedStatement ps=null;
PreparedStatement stmt=null;
Connection con=null;
try {
con=session.getConnection();
if(con.isClosed() || !con.isValid(1)) {
closeSession();
openSession(true);
con=session.getConnection();
}
con.setAutoCommit(false);
ps = con.prepareStatement(sql);
stmt = con.prepareStatement(subSql);
for (int i = 0; i < setTrackVOArr.length; i++) {
TrackVO vo = setTrackVOArr[i];
//입력으로 처리
//중복 입력 방지를 위함
vo.setInserted(true);
try {
ps.setString(1, vo.getTargetId());
ps.setString(2, vo.getReceptionDate());
ps.setString(3, vo.getLatitude());
ps.setString(4, vo.getLongitude());
ps.setString(5, vo.getSpeed());
ps.setString(6, vo.getCourse());
ps.setString(7, vo.getTargetType());
ps.setString(8, vo.getCorrectedLatitude());
ps.setString(9, vo.getCorrectedLongitude());
ps.setString(10, vo.getModeSCode());
ps.setString(11, vo.getSsrCode());
ps.setString(12, vo.getCallsign());
ps.setString(13, vo.getTailNumber());
ps.setString(14, "fusion");
///건별 입력
if(!isBatch) {
ps.execute();
savePos++;
fusionSave++;
}
try {
for (int j = 0; j < vo.getBbox().size(); j++) {
stmt.setString(1, vo.getTargetId());
stmt.setString(2, vo.getReceptionDate());
stmt.setString(3, vo.getBbox().get(j).getCctvId());
stmt.setString(4, vo.getBbox().get(j).getLeftTopX());
stmt.setString(5, vo.getBbox().get(j).getLeftTopY());
stmt.setString(6, vo.getBbox().get(j).getRightBottomX());
stmt.setString(7, vo.getBbox().get(j).getRightBottomY());
stmt.setString(8, vo.getBbox().get(j).getPx());
stmt.setString(9, vo.getBbox().get(j).getPy());
stmt.setString(10, "fusion");
///건별 입력
if(!isBatch) {
stmt.execute();
bboxSave++;
} else {
stmt.addBatch();
}
}
} catch(Exception ex) {
bboxFail++;
throw ex;
}
//배치 입력
if(isBatch) {
ps.addBatch();
}
} catch (Exception e) {
logger.debug("Data Insert error UseSqlLog");
sqlLogTrackQueue.add(vo);
failPos++;
fusionFail++;
}
}
if(isBatch) {
ps.executeBatch();
stmt.executeBatch();
//배치 insert는 arr 건수만큼
//savePos+=setTrackVOArr.length;
//logger.info("savePos 건수"+savePos);
fusionSave = setTrackVOArr.length - fusionFail;
logger.info("topic {} 배치 저장 건수 {} : 실패 건수 {}", topic, fusionSave, fusionFail);
} else {
logger.info("topic {} 단건 저장 건수 {} : 실패 건수 {}", topic, fusionSave, fusionFail);
}
con.commit();
} catch (Exception e) {
logger.error("insertData(Batch:"+isBatch+") error", e);
if(isBatch) {
logger.error("Try insert row by row");
insertTrackData(setTrackVOArr, false);
}
} finally {
if(ps!=null) {
try {
ps.close();
} catch (SQLException e) {
logger.error("insertData(Batch:"+isBatch+") PreparedStatement close error",e);
}
}
if(stmt!=null) {
try {
stmt.close();
} catch (SQLException e) {
logger.error("insertData(Batch:"+isBatch+") PreparedStatement close error",e);
}
}
if(con!=null) {
try {
con.commit();
con.setAutoCommit(true);
} catch (SQLException e) {
logger.error("insertData(Batch:"+isBatch+") commit error",e);
}
}
}
}
public void insertTrackLastData(TrackVO[] setTrackVOArr) {
insertTrackLastData(setTrackVOArr,true);
}
private void insertTrackLastData(TrackVO[] setTrackLastVOArr, boolean isBatch) {
if(setTrackLastVOArr == null || setTrackLastVOArr.length<1) {
// logger.error("mergeDynamicLastAll - No tableList data");
logger.debug("insertTrackLastData(Batch:"+isBatch+") - no data in setTrackVOArr");
return;
}
String tableName = "SACP_TRACK_LAST";
String sql = "";
if(MainServer.LAST_DATA_CNT > 0) {
sql = " UPDATE " + tableName
+ " SET AC_REG_NO=?, CLSGN=?, SSR_CD=?, MODE_S_CD=?, RECPTN_DT=?, SENSOR_TY=?, CAT_TY=?, LAT=?, LON=?, "
+ " SPD=?, COS=?, ALT=?, VIRTL_TRGT_AT=?, TRACK_CRRCT_AT=?, TRACK_CNFDNC=?, CCTV_ID=?, X_CNTS=?, Y_CNTS=?, "
+ " UPDUSR_ID=?, UPDT_DT=TO_TIMESTAMP(SYSDATE,'yyyyMMdd')"
+ " WHERE TRGT_ID = ?";
}else {
sql = " INSERT INTO " + tableName
+ " ( TRGT_ID, AC_REG_NO, CLSGN, SSR_CD, MODE_S_CD, RECPTN_DT, SENSOR_TY, CAT_TY, "
+ " LAT, LON, SPD, COS, ALT, VIRTL_TRGT_AT, TRACK_CRRCT_AT, TRACK_CNFDNC, CCTV_ID, "
+ " X_CNTS, Y_CNTS, REGISTER_ID)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
}
System.out.println("last sql = "+sql);
PreparedStatement ps=null;
Connection con=null;
try {
con=session.getConnection();
if(con.isClosed() || !con.isValid(1)) {
closeSession();
openSession(true);
con=session.getConnection();
}
con.setAutoCommit(false);
// Statement statement = session.getConnection().createStatement();
ps = con.prepareStatement(sql);
for (int i = 0; i < setTrackLastVOArr.length; i++) {
TrackVO vo = setTrackLastVOArr[i];
//입력으로 처리
//중복 입력 방지를 위함
vo.setInserted(true);
try {
if(MainServer.LAST_DATA_CNT > 0) {
}else {
}
///건별 입력
if(!isBatch) {
ps.execute();
saveLast++;
if(MainServer.LAST_DATA_CNT == 0 ) {
MainServer.LAST_DATA_CNT = 1;
}
}
} catch (Exception e) {
logger.debug("Data Insert error UseSqlLog");
sqlLogTrackLastQueue.add(vo);
failLast++;
}
//배치 입력
if(isBatch) {
ps.addBatch();
}
}
if(isBatch) {
ps.executeBatch();
if(MainServer.LAST_DATA_CNT == 0 ) {
MainServer.LAST_DATA_CNT = 1;
}
//배치 insert는 arr 건수만큼
saveLast += setTrackLastVOArr.length;
}
con.commit();
} catch (Exception e) {
logger.error("trackLast(Batch:"+isBatch+") error", e);
if(isBatch) {
logger.error("Try insert row by row");
// insertTrackLastData(setTrackLastVOArr, false);
}
} finally {
if(ps!=null) {
try {
ps.close();
} catch (SQLException e) {
logger.error("trackLast(Batch:"+isBatch+") PreparedStatement close error",e);
}
}
if(con!=null) {
try {
con.commit();
con.setAutoCommit(true);
} catch (SQLException e) {
logger.error("trackLast(Batch:"+isBatch+") commit error",e);
}
}
}
}
public synchronized CountStatus getCountAndSetZero() {
CountStatus countStatus = new CountStatus(savePos, saveLast, saveStatic, failPos, failLast, failStatic);
savePos = 0;
saveLast = 0;
saveStatic = 0;
failPos = 0;
failLast = 0;
failStatic = 0;
return countStatus;
}
/**
*
* /
* @param setDynamicVoArr
*/
/*public void insertData(SetDynamicVO[] setDynamicVoArr) {
if(setDynamicVoArr==null || setDynamicVoArr.length<1) {
// logger.error("insertDataAll - No tableList data");
logger.debug("insertData - no data in setDynamicVoArr");
return;
}
// Table destTable=tableList.get(0);
// String sql = "INSERT INTO " + destTable.getName() + " (" + getInsertCols(destTable.getColumnList()) + ") "
// + "VALUES (" + getInsertValueParams(destTable.getColumnList()) + ")";
String tableName = "th_ais_" + setDynamicVoArr[0].getDateTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String sql = "INSERT INTO "+tableName +" (" +
" ship_id, recv_dt, lon_val, lat_val, sog_val, cog_val, hdg_val, rot_val, zone_id) " +
" VALUES (?, TO_TIMESTAMP(?,'yyyyMMddhh24miss'), ?, ?, ?, ?, ?, ?, ?)";
PreparedStatement ps=null;
Connection con=null;
try {
con=session.getConnection();
if(con.isClosed() || !con.isValid(1)) {
closeSession();
openSession(true);
con=session.getConnection();
}
con.setAutoCommit(false);
// Statement statement = session.getConnection().createStatement();
ps = con.prepareStatement(sql);
for (int i = 0; i < setDynamicVoArr.length; i++) {
SetDynamicVO vo = setDynamicVoArr[i];
//입력으로 처리
//중복 입력 방지를 위함
vo.setInserted(true);
try {
ps.setString(1, vo.getShipID());
// ps.setDate(2, vo.getDateTime());
ps.setString(2, vo.getDateTime().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
ps.setDouble(3, vo.getLongitude());
ps.setDouble(4, vo.getLatitude());
ps.setDouble(5,vo.getSog());
ps.setDouble(6,vo.getCog());
ps.setInt(7, vo.getHeading());
ps.setInt(8, vo.getRot());
ps.setString(9, vo.getZoneId());
// ps.addBatch();
ps.execute();
} catch (Exception e) {
// logger.error("insertData error", e);
}
}
// ps.executeBatch();
con.commit();
} catch (Exception e) {
logger.error("insertData error", e);
} finally {
if(ps!=null) {
try {
ps.close();
} catch (SQLException e) {
logger.error("insertData PreparedStatement close error",e);
}
}
if(con!=null) {
try {
con.commit();
con.setAutoCommit(true);
} catch (SQLException e) {
logger.error("insertData commit error",e);
}
}
}
}*/
// /**
// * Insert colums 성성
// * @param columnList
// * @return
// */
// private String getInsertCols(List<Column> columnList) {
// StringBuilder result=new StringBuilder();
// for (int i = 0; i < columnList.size(); i++) {
// result.append(columnList.get(i).getName());
// //마지막에는 ,를 안붙이기 위해
// if(i!=columnList.size()-1)
// result.append(",");
// }
// return result.toString();
// }
//
//
// private String getInsertValueParams(List<Column> columnList) {
// StringBuilder result=new StringBuilder();
// for (int i = 0; i < columnList.size(); i++) {
// result.append(setInsertParam(columnList.get(i)));
// //마지막에는 ,를 안붙이기 위해
// if(i!=columnList.size()-1)
// result.append(",");
// }
// return result.toString();
// }
//
// /**
// * 파라미터는 ? 로 표현하여 ps.set~~~으로...
// * @param column
// * @return
// */
// private String setInsertParam(Column column) {
// switch (dbType) {
// case ORACLE:
// return setInsertParamOracle(column);
// default:
// //mariadb default
// return setInsertParamMariadb(column);
// }
//
//
// }
//
//
// private String setInsertParamMariadb(Column column) {
//
// if(column.isCustom()) {
// return column.getCustomStr();
// }else {
// switch (column.getColumnTypeName()) {
// case "DATE":
// case "DATETIME":
// return "STR_TO_DATE(?,'%Y%m%d%H%i%S')";
// case "TIMESTAMP":
// case "TIMESTAMP_WITH_TIMEZONE":
// return "STR_TO_DATE(?,'%Y%m%d%H%i%S%f')";
// case "TIME":
// case "TIME_WITH_TIMEZONE":
// return "STR_TO_DATE(?,'%H%i%S')";
// default:
// return "?";
// }
// }
// }
//
//
//
//
// private String setInsertParamOracle(Column column) {
//
// if(column.isCustom()) {
// return column.getCustomStr();
// }else {
// switch (column.getColumnTypeName()) {
// case "DATE":
// case "DATETIME":
// return "TO_DATE(?,'yyyymmddhh24miss')";
// case "TIMESTAMP":
// case "TIMESTAMP_WITH_TIMEZONE":
// return "TO_TIMESTAMP(?,'yyyymmddhh24missff')";
// case "TIME":
// case "TIME_WITH_TIMEZONE":
// return "TO_DATE(?,'hh24miss')";
// default:
// return "?";
// }
// }
//
// }
}

View File

@ -0,0 +1,153 @@
package kr.gmtc.tss.elkdata.runnable;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.gmtc.tss.elkdata.vo.RuteVO;
import kr.gmtc.tss.elkdata.vo.RuteVO.MvPoints;
public class AiVoiceMetaRunnable implements Runnable {
private Logger logger = LoggerFactory.getLogger(this.getClass());
Queue<String> rcvQueue;
Queue<String> dataFileQueue;
private boolean isRunning = false;
private int listChunkSize = 100;
private long sleepMillis;
private String classification;
private String swclassification;
private String logclassification;
public AiVoiceMetaRunnable(Queue<String> rcvQueue
, Queue<String> dataFileQueue
, int listChunkSize
, long sleepMillis
, String classification, String swclassification, String logclassification) {
this.rcvQueue = rcvQueue;
this.dataFileQueue = dataFileQueue;
this.listChunkSize = listChunkSize;
this.sleepMillis = sleepMillis;
this.classification = classification;
this.swclassification = swclassification;
this.logclassification = logclassification;
}
@Override
public void run() {
isRunning = true;
while (isRunning) {
try {
int loopCnt = 0;
if(rcvQueue.size()>listChunkSize) {
loopCnt = listChunkSize;
}else {
loopCnt = rcvQueue.size();
}
String [] chunkArr = new String[loopCnt];
for (int i = 0; i < loopCnt; i++) {
chunkArr[i] = rcvQueue.poll();
}
for (int i = 0; i < chunkArr.length; i++) {
parseAndEnqueue(chunkArr[i]);
}
} catch (Exception e) {
if (e.getMessage() != null) {
logger.debug("[ParseTarget] Parsing Error Message : " ,e);
} else {
logger.info("[ParserThread] Parsing Error Call Stack" ,e);
}
}
sleep(sleepMillis);
}
}
private void parseAndEnqueue(String result) {
try {
ArrayList<RuteVO> histroyQueue = new ArrayList<RuteVO>();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> convert = mapper.readValue(result, new TypeReference<Map<String, Object>>() {});
ArrayList<LinkedHashMap<String, Object>> list = (ArrayList<LinkedHashMap<String, Object>>) convert.get("data");
logger.info("수신 데이터 변환 건수 {}", list.size());
DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss.SSS");
String dateTime = dateFormat.format(new Date());
StringBuilder sb = new StringBuilder();
sb.append(dateTime); sb.append(",");
sb.append(logclassification); sb.append(","); //log구분
sb.append(swclassification); sb.append(","); //sw구분
sb.append(classification); sb.append("|"); //서버구분
int i = 0;
int z = 0;
for (LinkedHashMap<String, Object> map : list) {
RuteVO t = mapper.convertValue(map, RuteVO.class);
if (i > 0) sb.append("^");
sb.append(t.getMv_id()); sb.append(",");
sb.append(t.getTrgt_id()); sb.append(",");
sb.append(t.getMv_type()); sb.append(",");
sb.append(t.getDep_arr_ty()); sb.append(",");
int j = 0;
for (MvPoints b : t.getMv_points()) {
if (j > 0) sb.append("$");
sb.append(b.toDataFormat());
j++;
}
logger.info("logstash 포맷 변환 건수 {}", z++);
histroyQueue.add(t);
i++;
}
dataFileQueue.add(sb.toString());
} catch (Exception e) {
logger.error("[ParserThread] Parsing Error Message : " + e.getMessage());
}
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
logger.debug("[ParseTargetRunnable] sleep error : "+e.getLocalizedMessage());
}
}
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}

View File

@ -1,4 +1,4 @@
package kr.gmtc.tss.data; package kr.gmtc.tss.elkdata.runnable;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
@ -14,8 +14,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import kr.gmtc.tss.data.vo.CctvPtzVO; import kr.gmtc.tss.elkdata.vo.CctvPtzVO;
import kr.gmtc.tss.status.CountStatus;
public class CctvPtzParseRunnable implements Runnable { public class CctvPtzParseRunnable implements Runnable {
@ -43,9 +43,10 @@ public class CctvPtzParseRunnable implements Runnable {
private String logclassification; private String logclassification;
public CctvPtzParseRunnable(Queue<String> rcvQueue, Queue<CctvPtzVO> trackQueue, Queue<CctvPtzVO> trackLastQueue public CctvPtzParseRunnable(Queue<String> rcvQueue, Queue<CctvPtzVO> trackQueue, Queue<CctvPtzVO> trackLastQueue
, Queue<String> dataFileQueue,Queue<CctvPtzVO> messageBodyQueue, int listChunkSize , Queue<String> dataFileQueue,Queue<CctvPtzVO> messageBodyQueue
, long sleepMillis, boolean flagFile, boolean flagDataBase, boolean flagDataFile , int listChunkSize, long sleepMillis
, String classification, String swclassification, String logclassification) { , boolean flagFile, boolean flagDataBase, boolean flagDataFile
, String classification, String logclassification) {
this.rcvQueue = rcvQueue; this.rcvQueue = rcvQueue;
this.trackQueue = trackQueue; this.trackQueue = trackQueue;
this.trackLastQueue = trackLastQueue; this.trackLastQueue = trackLastQueue;
@ -157,12 +158,12 @@ public class CctvPtzParseRunnable implements Runnable {
} }
public synchronized CountStatus getCountAndSetZero() { // public synchronized CountStatus getCountAndSetZero() {
CountStatus countStatus = new CountStatus(rcvPos,rcvStatic); // CountStatus countStatus = new CountStatus(rcvPos,rcvStatic);
rcvPos = 0; // rcvPos = 0;
rcvStatic = 0; // rcvStatic = 0;
return countStatus; // return countStatus;
} // }
private void sleep(long millis) { private void sleep(long millis) {
try { try {

View File

@ -1,4 +1,4 @@
package kr.gmtc.tss.data; package kr.gmtc.tss.elkdata.runnable;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
@ -14,9 +14,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import kr.gmtc.tss.data.vo.RuteVO; import kr.gmtc.tss.elkdata.vo.RuteVO;
import kr.gmtc.tss.data.vo.RuteVO.MvPoints; import kr.gmtc.tss.elkdata.vo.RuteVO.MvPoints;
import kr.gmtc.tss.status.CountStatus;
public class RuteParseRunnable implements Runnable { public class RuteParseRunnable implements Runnable {
@ -32,10 +31,6 @@ public class RuteParseRunnable implements Runnable {
Queue<String> dataFileQueue; Queue<String> dataFileQueue;
Queue<RuteVO> messageBodyQueue; Queue<RuteVO> messageBodyQueue;
private int rcvPos = 0;
private int rcvStatic = 0;
private boolean isRunning = false;
private int listChunkSize = 100; private int listChunkSize = 100;
private long sleepMillis; private long sleepMillis;
@ -64,9 +59,8 @@ public class RuteParseRunnable implements Runnable {
@Override @Override
public void run() { public void run() {
isRunning = true;
while (isRunning) { while (!Thread.interrupted()) {
try { try {
int loopCnt = 0; int loopCnt = 0;
@ -145,18 +139,18 @@ public class RuteParseRunnable implements Runnable {
} }
if(flagFile) { // if(flagFile) {
messageBodyQueue.addAll(histroyQueue); // messageBodyQueue.addAll(histroyQueue);
} // }
if(flagDataFile) { if(flagDataFile) {
dataFileQueue.add(sb.toString()); dataFileQueue.add(sb.toString());
} }
if(flagDataBase) { // if(flagDataBase) {
if (histroyQueue.size() > 0) { // if (histroyQueue.size() > 0) {
trackQueue.addAll(histroyQueue); // trackQueue.addAll(histroyQueue);
} // }
// if (flagDataBase) { // if (flagDataBase) {
// histroyQueue // histroyQueue
@ -167,7 +161,7 @@ public class RuteParseRunnable implements Runnable {
// trackLastQueue.add(value.get()); // trackLastQueue.add(value.get());
// });; // });;
// } // }
} // }
} catch (Exception e) { } catch (Exception e) {
logger.error("[ParserThread] Parsing Error Message : " + e.getMessage()); logger.error("[ParserThread] Parsing Error Message : " + e.getMessage());
@ -183,12 +177,12 @@ public class RuteParseRunnable implements Runnable {
} }
public synchronized CountStatus getCountAndSetZero() { // public synchronized CountStatus getCountAndSetZero() {
CountStatus countStatus = new CountStatus(rcvPos,rcvStatic); // CountStatus countStatus = new CountStatus(rcvPos,rcvStatic);
rcvPos = 0; // rcvPos = 0;
rcvStatic = 0; // rcvStatic = 0;
return countStatus; // return countStatus;
} // }
private void sleep(long millis) { private void sleep(long millis) {
try { try {
@ -198,7 +192,4 @@ public class RuteParseRunnable implements Runnable {
} }
} }
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
} }

View File

@ -1,4 +1,4 @@
package kr.gmtc.tss.data.vo; package kr.gmtc.tss.elkdata.vo;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;

View File

@ -1,4 +1,4 @@
package kr.gmtc.tss.data.vo; package kr.gmtc.tss.elkdata.vo;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;

View File

@ -1,4 +1,4 @@
package kr.gmtc.tss.data.vo; package kr.gmtc.tss.elkdata.vo;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;

View File

@ -0,0 +1,83 @@
package kr.gmtc.tss.elkdata.vo;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class VoiceVo {
@Getter
@Setter
public static class VoiceKeyword{
@JsonProperty("keyword")
private String keyword;
@JsonProperty("tag")
private String tag;
}
@Getter
@Setter
public static class VoiceReadback{
@JsonProperty("result")
private String result;
@JsonProperty("read_starttime")
private String read_starttime;
@JsonProperty("read_endtime")
private String read_endtime;
private List<VoiceKeyword> keywords;
}
@JsonProperty("starttime")
private String starttime;
@JsonProperty("endtime")
private String endtime;
@JsonProperty("com_type")
private String com_type;
@JsonProperty("channel")
private String channel;
@JsonProperty("speaker")
private String speaker;
@JsonProperty("facility")
private String facility;
@JsonProperty("callsign")
private String callsign;
@JsonProperty("doc_type")
private String doc_type;
@JsonProperty("control_type")
private String control_type;
private List<VoiceKeyword> keywords;
private List<VoiceReadback> readback;
@JsonProperty("frequency")
private String frequency;
@JsonProperty("media_name")
private String media_name;
@JsonProperty("text")
private String text;
}

View File

@ -9,7 +9,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import kr.gmtc.tss.data.vo.TrackVO; import kr.gmtc.tss.elkdata.vo.TrackVO;
import kr.gmtc.tss.main.MainServer; import kr.gmtc.tss.main.MainServer;
import kr.gmtc.tss.util.FileUtil; import kr.gmtc.tss.util.FileUtil;

View File

@ -9,7 +9,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import kr.gmtc.tss.data.vo.TrackVO; import kr.gmtc.tss.elkdata.vo.TrackVO;
import kr.gmtc.tss.main.MainServer; import kr.gmtc.tss.main.MainServer;
import kr.gmtc.tss.util.FileUtil; import kr.gmtc.tss.util.FileUtil;

View File

@ -1,37 +1,32 @@
package kr.gmtc.tss.main; package kr.gmtc.tss.main;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import kr.gmt.so.state.StateManager; import kr.gmt.so.state.StateManager;
import kr.gmt.so.state.model.SystemState; import kr.gmt.so.state.model.SystemState;
import kr.gmtc.tss.data.DataInsertRunnable; import kr.gmtc.tss.config.TopicsList;
import kr.gmtc.tss.config.TssConfig; import kr.gmtc.tss.config.TssConfig;
import kr.gmtc.tss.data.CctvPtzParseRunnable; import kr.gmtc.tss.elkdata.runnable.AiVoiceMetaRunnable;
import kr.gmtc.tss.data.vo.CctvPtzVO; import kr.gmtc.tss.elkdata.runnable.CctvPtzParseRunnable;
import kr.gmtc.tss.db.FusionBatchManager; import kr.gmtc.tss.elkdata.vo.RuteVO;
import kr.gmtc.tss.filelog.DataLogger; import kr.gmtc.tss.filelog.DataLogger;
import kr.gmtc.tss.filelog.MessageBodyLogger; import kr.gmtc.tss.filelog.MessageBodyLogger;
import kr.gmtc.tss.kafka.KafkaTopicReader; import kr.gmtc.tss.kafka.KafkaTopicReader;
import kr.gmtc.tss.sqllog.SqlLogger;
import kr.gmtc.tss.util.ArrayBlockingLoggingQueue;
import kr.gmtc.tss.util.FileUtil; import kr.gmtc.tss.util.FileUtil;
@EnableScheduling @EnableScheduling
@ -43,70 +38,38 @@ public class MainServer implements InitializingBean {
private Logger logger = LoggerFactory.getLogger(this.getClass()); private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
@Qualifier("batchSqlSessionFactory")
public SqlSessionFactory sqlSessionFactory;
private SqlSession session;
KafkaTopicReader kafkaTopicReader; KafkaTopicReader kafkaTopicReader;
CctvPtzParseRunnable parserRunnable; // Runnable parserRunnable;
DataLogger dataLoggerRunnable; DataLogger dataLoggerRunnable;
SqlLogger sqlLoggerRunnable;
FusionBatchManager batchInsertManager;
DataInsertRunnable dataInsertRunnable;
MessageBodyLogger messageBodyLogger; MessageBodyLogger messageBodyLogger;
List<Thread> threadList = new ArrayList<Thread>(); List<Thread> threadList = new ArrayList<Thread>();
// Queue<String> rcvQueue;
// Queue<CctvPtzVO> sqlLogTrackQueue;
// Queue<CctvPtzVO> sqlLogTrackLastQueue;
// // track 위치정보 저장
// Queue<CctvPtzVO> trackQueue;
// Queue<String> dataFileQueue;
// Queue<CctvPtzVO> trackLastQueue;
// Queue<CctvPtzVO> messageBodyQueue;
@Autowired @Autowired
private TssConfig cfg; private TssConfig cfg;
@Autowired
private TopicsList topicList;
@Autowired @Autowired
private FileUtil fileUtil; private FileUtil fileUtil;
@Autowired @Autowired
private StateManager stateMgr; private StateManager stateMgr;
private boolean swBeforeStatus = true;
public static int LAST_DATA_CNT = 0; public static int LAST_DATA_CNT = 0;
@PostConstruct @PostConstruct
public void init() { public void init() {
//stateMgr.updateState();
logger.info("DataQueue Count : "+cfg.queueCount); logger.info("DataQueue Count : "+cfg.queueCount);
logger.info("flagDataBase : "+cfg.flagDataBase); // logger.info("flagDataBase : "+cfg.flagDataBase);
logger.info("flagDataFile : "+cfg.flagDataFile); logger.info("flagDataFile : "+cfg.flagDataFile);
logger.info("flagFile : "+cfg.flagFile); // logger.info("flagFile : "+cfg.flagFile);
// rcvQueue=new ArrayBlockingLoggingQueue<>(queueCount);
// sqlLogAsdeTrackQueue=new ArrayBlockingLoggingQueue<>(queueCount);
// sqlLogAsdeTrackLastQueue=new ArrayBlockingLoggingQueue<>(queueCount);
// asdeTrackQueue=new ArrayBlockingLoggingQueue<>(queueCount);
// dataFileQueue=new ArrayBlockingLoggingQueue<>(queueCount);
// asdeTrackLastQueue=new ArrayBlockingLoggingQueue<>(queueCount);
// messageBodyQueue=new ArrayBlockingLoggingQueue<>(queueCount);
// LAST TABLE 데이터 확인
// DBLastDataStatus();
// 최초 한번만 실행 - 로그파일이 없을 시 생성 // 최초 한번만 실행 - 로그파일이 없을 시 생성
if (cfg.flagFile) FILE_LOG_NAME = fileUtil.createLogFile(cfg.fileZipPath, cfg.fileZipBakPath, cfg.fileMakeTime); // rest log file DATA_FILE_NAME = fileUtil.createLogFile(cfg.dataZipPath, cfg.dataZipBakPath, cfg.dataMakeTime); // logstash data file log
if (cfg.flagDataFile) DATA_FILE_NAME = fileUtil.createLogFile(cfg.dataZipPath, cfg.dataZipBakPath, cfg.dataMakeTime); // logstash data file log
} }
@PreDestroy @PreDestroy
@ -114,18 +77,20 @@ public class MainServer implements InitializingBean {
logger.info("Pre destroy called!!"); logger.info("Pre destroy called!!");
dataInsertRunnable.setRunning(false); // dataInsertRunnable.setRunning(false);
sqlLoggerRunnable.setRunning(false); // sqlLoggerRunnable.setRunning(false);
parserRunnable.setRunning(false); // parserRunnable.setRunning(false);
// for(Thread ct : threadList ){
// ct.stop
// }
} }
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
// if(flagDataBase) startDataInsertThread(); startDataLoggerThread();
if(cfg.flagDataFile) startDataLoggerThread();
// if(cfg.flagFile) startMessageBodyThread();
startKafkaThread(); startKafkaThread();
startParserThread(); startParserThread();
@ -140,17 +105,6 @@ public class MainServer implements InitializingBean {
boolean running = true; boolean running = true;
boolean swNewStatus = stateMgr.isActive(); boolean swNewStatus = stateMgr.isActive();
// // 이중화 Active / Standby 상태 변경 체크
// if(swBeforeStatus != swNewStatus){
// parserRunnable.setRunning(swNewStatus);
// dataInsertRunnable2.setRunning(swNewStatus);
// dataLoggerRunnable.setRunning(swNewStatus);
// sqlLoggerRunnable.setRunning(swNewStatus);
// messageBodyLogger.setRunning(swNewStatus);
// }
// 해당 프로그램이 Active일때 Thread Interrupt 유무 체크 // 해당 프로그램이 Active일때 Thread Interrupt 유무 체크
if(swNewStatus){ if(swNewStatus){
@ -168,16 +122,13 @@ public class MainServer implements InitializingBean {
} }
} }
swBeforeStatus = stateMgr.isActive();
// logger.info("swBeforeStatus : " + swBeforeStatus);
} }
@Scheduled(cron = "${process.file-backup-cron}") @Scheduled(cron = "${process.file-backup-cron}")
public void fileRecv() throws Exception{ public void fileRecv() throws Exception{
if (cfg.flagFile) { // if (cfg.flagFile) {
/* 파일 압축 */ /* 파일 압축 */
fileUtil.createZipFile(cfg.fileZipPath, cfg.fileZipBakPath, cfg.fileMakeTime); // fileUtil.createZipFile(cfg.fileZipPath, cfg.fileZipBakPath, cfg.fileMakeTime);
} // }
} }
@Scheduled(cron = "${process.data-backup-cron}") @Scheduled(cron = "${process.data-backup-cron}")
@ -191,9 +142,9 @@ public class MainServer implements InitializingBean {
// 매 시간 10분 간격 으로 실행 // 매 시간 10분 간격 으로 실행
@Scheduled(cron = "${process.file-make-cron}") @Scheduled(cron = "${process.file-make-cron}")
public void fileMake() throws Exception{ public void fileMake() throws Exception{
if (cfg.flagFile) { // if (cfg.flagFile) {
FILE_LOG_NAME = fileUtil.createLogFile(cfg.fileZipPath, cfg.fileZipBakPath, cfg.fileMakeTime); // FILE_LOG_NAME = fileUtil.createLogFile(cfg.fileZipPath, cfg.fileZipBakPath, cfg.fileMakeTime);
} // }
} }
@Scheduled(cron = "${process.data-make-cron}") @Scheduled(cron = "${process.data-make-cron}")
@ -205,74 +156,87 @@ public class MainServer implements InitializingBean {
public void startKafkaThread() { public void startKafkaThread() {
KafkaTopicReader topicReader = new KafkaTopicReader(cfg.bootstrapServers, cfg.topic, cfg.consumeGroupId, cfg.rcvQueue);
this.kafkaTopicReader=topicReader;
String[] arrSW = cfg.swclassification;
Map<String, String> topics = topicList.getTopics();
for(String swIdx : arrSW){
String topic = topics.get(swIdx);
Queue<String> rcvQueue = cfg.getRcvQueueMap().get(swIdx);
KafkaTopicReader topicReader = new KafkaTopicReader(cfg.bootstrapServers, topic, cfg.consumeGroupId, rcvQueue);
this.kafkaTopicReader=topicReader;
Thread thread = new Thread(topicReader); Thread thread = new Thread(topicReader);
thread.setDaemon(true);
thread.start(); thread.start();
threadList.add(thread); threadList.add(thread);
logger.info("KafkaTopicReader 생성 [" +swIdx+ "]");
}
} }
public void startParserThread() { public void startParserThread() {
CctvPtzParseRunnable cctvPtzParseRunnable = new CctvPtzParseRunnable(cfg.rcvQueue, cfg.trackQueue, cfg.trackLastQueue, cfg.dataFileQueue, cfg.messageBodyQueue, cfg.queueChunkSize, cfg.parseTargetSleepMs ,cfg.flagFile, cfg.flagDataBase, cfg.flagDataFile, cfg.classification, cfg.swclassification, cfg.logclassification );
this.parserRunnable=cctvPtzParseRunnable; Map<String, Object> parseRunnableMap = new HashMap<>();
Thread thread = new Thread(cctvPtzParseRunnable); parseRunnableMap.put("6", "kr.gmtc.tss.elkdata.runnable.AiVoiceMetaRunnable");
parseRunnableMap.put("11", "kr.gmtc.tss.elkdata.runnable.CctvPtzParseRunnable");
String[] arrSW = cfg.swclassification;
Map<String, String> topics = topicList.getTopics();
for(String swIdx : arrSW){
String topic = topics.get(swIdx);
Queue<String> rcvQueue = cfg.getRcvQueueMap().get(swIdx);
if(swIdx.equals("6")){
AiVoiceMetaRunnable aiVoiceParseRunnable = new AiVoiceMetaRunnable(rcvQueue, cfg.dataFileQueue,
cfg.queueChunkSize, cfg.parseTargetSleepMs,
cfg.classification, swIdx, cfg.logclassification );
Thread thread = new Thread(aiVoiceParseRunnable);
thread.setDaemon(true);
thread.start(); thread.start();
threadList.add(thread); threadList.add(thread);
cctvPtzParseRunnable.getCountAndSetZero();
logger.info("parserThread 생성 [" +swIdx+ "]");
} }
// public void startDataInsertThread() { if(swIdx.equals("11")){
// BatchInsertManager batchInsertManager = new BatchInsertManager(sqlSessionFactory, sqlLogTrackQueue,sqlLogTrackLastQueue);
// this.batchInsertManager=batchInsertManager;
// DataInsertRunnable dataInsertRunnable = new DataInsertRunnable(trackQueue, trackLastQueue, batchInsertSize, batchInsertManager, dataInsertSleepMs, topic); AiVoiceMetaRunnable aiVoiceParseRunnable = new AiVoiceMetaRunnable(rcvQueue, cfg.dataFileQueue,
// this.dataInsertRunnable = dataInsertRunnable; cfg.queueChunkSize, cfg.parseTargetSleepMs,
cfg.classification, swIdx, cfg.logclassification );
Thread thread = new Thread(aiVoiceParseRunnable);
thread.setDaemon(true);
thread.start();
threadList.add(thread);
// Thread thread = new Thread(dataInsertRunnable); logger.info("parserThread 생성 [" +swIdx+ "]");
// thread.start(); }
// threadList.add(thread);
// } }
// cctvPtzParseRunnable.getCountAndSetZero();
}
public void startDataLoggerThread() { public void startDataLoggerThread() {
DataLogger dataLoggerRunnable = new DataLogger(cfg.dataFileQueue, cfg.queueChunkSize, cfg.sqlLoggerSleepMs); DataLogger dataLoggerRunnable = new DataLogger(cfg.dataFileQueue, cfg.queueChunkSize, cfg.sqlLoggerSleepMs);
this.dataLoggerRunnable = dataLoggerRunnable; this.dataLoggerRunnable = dataLoggerRunnable;
Thread thread = new Thread(dataLoggerRunnable); Thread thread = new Thread(dataLoggerRunnable);
thread.setDaemon(true);
thread.start(); thread.start();
threadList.add(thread); threadList.add(thread);
} }
// public void startSqlLoggerThread() {
// SqlLogger sqlLoggerRunnable = new SqlLogger(sqlLogAsdeTrackQueue, sqlLogAsdeTrackLastQueue, queueChunkSize, sqlLoggerSleepMs);
// this.sqlLoggerRunnable = sqlLoggerRunnable;
// Thread thread = new Thread(sqlLoggerRunnable);
// thread.start();
// threadList.add(thread);
// }
// public void startMessageBodyThread() {
// MessageBodyLogger messageBodyLogger = new MessageBodyLogger(messageBodyQueue, queueChunkSize, sqlLoggerSleepMs);
// this.messageBodyLogger = messageBodyLogger;
// Thread thread = new Thread(messageBodyLogger);
// thread.start();
// threadList.add(thread);
// }
public void openSession(boolean forceOpen) {
if(session==null||forceOpen)
session = sqlSessionFactory.openSession();
}
public void closeSession() {
if(session!=null)
session.close();
}
} }

View File

@ -1,132 +0,0 @@
package kr.gmtc.tss.sqllog;
import java.text.SimpleDateFormat;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kr.gmtc.tss.data.vo.TrackVO;
import kr.gmtc.tss.main.MainServer;
public class SqlLogger implements Runnable {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private Queue<TrackVO> sqlLogTrackQueue;
private Queue<TrackVO> sqlLogTrackLastQueue;
private boolean isRunning=true;
private int listChunkSize= 100;
private long sleepMillis;
public SqlLogger(Queue<TrackVO> sqlLogTrackQueue,Queue<TrackVO> sqlLogTrackLastQueue, int listChunkSize, long sleepMillis) {
this.sqlLogTrackQueue = sqlLogTrackQueue;
this.sqlLogTrackLastQueue = sqlLogTrackLastQueue;
this.listChunkSize = listChunkSize;
this.sleepMillis = sleepMillis;
}
@Override
public void run() {
isRunning = true;
while (isRunning) {
try {
TrackVO [] chunkArr = getChunkTrackArr(sqlLogTrackQueue);
for (int i = 0; i < chunkArr.length; i++) {
logger.error(getInsertTrackSql(chunkArr[i]));
}
/*
* chunkArr = getChunkTrackArr(sqlLogTrackLastQueue); for (int i = 0; i <
* chunkArr.length; i++) { logger.error(getInsertLastTrackSql(chunkArr[i])); }
*/
} catch (Exception e) {
logger.info("-- [SqlLogger] LogSave error : " +e.getLocalizedMessage());
}
sleep(sleepMillis);
}
}
private TrackVO [] getChunkTrackArr(Queue<TrackVO> sqlQueue) {
int loopCnt=0;
if(sqlQueue.size()>listChunkSize) {
loopCnt=listChunkSize;
}else {
loopCnt=sqlQueue.size();
}
TrackVO [] chunkArr = new TrackVO[loopCnt];
for (int i = 0; i < loopCnt; i++) {
chunkArr[i]=sqlQueue.poll();
}
return chunkArr;
}
private String getInsertTrackSql(TrackVO vo) {
StringBuilder sb = new StringBuilder();
// sb.append("\n");
sb.append("INSERT INTO ");
sb.append("SACP_TRACK_FUSION");
sb.append(" (TRGT_ID, RECPTN_DT, LAT, LON, SPD, COS, TRGT_TY, REVISN_LAT, REVISN_LON, MODE_S_CD, SSR_CD, CLSGN, AC_REG_NO, REGISTER_ID)");
sb.append("VALUES (");
sb.append("'"); sb.append(vo.getTargetId()); sb.append("',");
sb.append("'"); sb.append(vo.getReceptionDate()); sb.append("',");
sb.append("'"); sb.append(vo.getLatitude()); sb.append("',");
sb.append("'"); sb.append(vo.getLongitude()); sb.append("',");
sb.append("'"); sb.append(vo.getSpeed()); sb.append("',");
sb.append("'"); sb.append(vo.getCourse()); sb.append("',");
sb.append("'"); sb.append(vo.getTargetType()); sb.append("',");
sb.append("'"); sb.append(vo.getCorrectedLatitude()); sb.append("',");
sb.append("'"); sb.append(vo.getCorrectedLongitude()); sb.append("',");
sb.append("'"); sb.append(vo.getModeSCode()); sb.append("',");
sb.append("'"); sb.append(vo.getSsrCode()); sb.append("',");
sb.append("'"); sb.append(vo.getCallsign()); sb.append("',");
sb.append("'"); sb.append(vo.getTailNumber()); sb.append("',");
sb.append("'"); sb.append("fusion"); sb.append("');");
for (int j = 0; j < vo.getBbox().size(); j++) {
sb.append("\n");
sb.append("INSERT INTO ");
sb.append("SACP_FUSION_TRACK_BBOX");
sb.append(" (TRGT_ID, RECPTN_DT, CCTV_ID, BBOX_TOP_LEFT_X, BBOX_TOP_LEFT_Y, BBOX_BOTTOM_RIGHT_X, BBOX_BOTTOM_RIGHT_Y, CRDNT_X, CRDNT_Y, REGISTER_ID)");
sb.append("VALUES (");
sb.append("'"); sb.append(vo.getTargetId()); sb.append("',");
sb.append("'"); sb.append(vo.getReceptionDate()); sb.append("',");
sb.append("'"); sb.append(vo.getBbox().get(j).getCctvId()); sb.append("',");
sb.append("'"); sb.append(vo.getBbox().get(j).getLeftTopX()); sb.append("',");
sb.append("'"); sb.append(vo.getBbox().get(j).getLeftTopY()); sb.append("',");
sb.append("'"); sb.append(vo.getBbox().get(j).getRightBottomX()); sb.append("',");
sb.append("'"); sb.append(vo.getBbox().get(j).getRightBottomY()); sb.append("',");
sb.append("'"); sb.append(vo.getBbox().get(j).getPx()); sb.append("',");
sb.append("'"); sb.append(vo.getBbox().get(j).getPy()); sb.append("',");
sb.append("'"); sb.append("fusion"); sb.append("');");
}
return sb.toString();
}
private String getInsertLastTrackSql(TrackVO vo) {
StringBuilder sb = new StringBuilder();
if(MainServer.LAST_DATA_CNT > 0) {
sb.append("UPDATE SACP_TRACK_LAST");
}
return sb.toString();
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
logger.debug("[SqlLogger] sleep error : "+e.getLocalizedMessage());
}
}
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}

View File

@ -1,8 +0,0 @@
package kr.gmtc.tss.status;
public interface ContextDestroyEventListener {
/**
* Context destroy event handler.
*/
void onDestroy();
}

View File

@ -1,64 +0,0 @@
package kr.gmtc.tss.status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
@Component
@Scope("singleton")
public class ContextDestroyStatus {
/**
* Logger.
*/
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* linstener list.
*/
private final List<ContextDestroyEventListener> contextDestroyEventListeners = new ArrayList<ContextDestroyEventListener>();
/**
* context destroy .
*/
private boolean shutdown = false;
/**
* Shutdown flag . listener onDestroy event fire .
*/
@PreDestroy
public void destroy() {
logger.info("Called destroy()!");
shutdown = true;
for (ContextDestroyEventListener listener : contextDestroyEventListeners) {
listener.onDestroy();
}
}
/**
* @return destroy true. false.
*/
public boolean getShutdownStat() {
return shutdown;
}
public void setShutdownStat(boolean shutdown ) {
this.shutdown=shutdown;
// return shutdown;
}
/**
* @param listener
* event fire class.
*/
public void addListener(final ContextDestroyEventListener listener) {
contextDestroyEventListeners.add(listener);
logger.info("addListener ({}) complete. Listeners count is {}.", listener.getClass().getName(),
contextDestroyEventListeners.size());
}
}

View File

@ -1,240 +0,0 @@
package kr.gmtc.tss.status;
public class CountStatus {
private int rcvPos=0;
private int rcvStat=0;
private int savePos=0;
private int saveLast=0;
private int saveStatic=0;
private int failPos=0;
private int failLast=0;
private int failStatic=0;
private int exceptPos=0;
private int rcvQueueSize=0;
private int dynamicVoQueueSize=0;
private int staticVoQueueSize=0;
private int dynamicInsertQueueSize=0;
private int sqlLogDynamicQueueSize=0;
private int sqlLogDynamicLastQueueSize=0;
private int sqlLogStaticQueueSize=0;
public CountStatus(int rcvPos, int rcvStat) {
this.rcvPos=rcvPos;
this.rcvStat=rcvStat;
}
public CountStatus(int savePos, int saveLast, int saveStatic, int failPos, int failLast, int failStatic) {
this.savePos=savePos;
this.saveLast=saveLast;
this.saveStatic=saveStatic;
this.failPos=failPos;
this.failLast=failLast;
this.failStatic=failStatic;
}
public CountStatus() {
}
public int getRcvPos() {
return rcvPos;
}
public void setRcvPos(int rcvPos) {
this.rcvPos = rcvPos;
}
public int getRcvStat() {
return rcvStat;
}
public void setRcvStat(int rcvStat) {
this.rcvStat = rcvStat;
}
public int getSavePos() {
return savePos;
}
public void setSavePos(int savePos) {
this.savePos = savePos;
}
public int getSaveLast() {
return saveLast;
}
public void setSaveLast(int saveLast) {
this.saveLast = saveLast;
}
public int getSaveStatic() {
return saveStatic;
}
public void setSaveStatic(int saveStatic) {
this.saveStatic = saveStatic;
}
public int getFailPos() {
return failPos;
}
public void setFailPos(int failPos) {
this.failPos = failPos;
}
public int getFailLast() {
return failLast;
}
public void setFailLast(int failLast) {
this.failLast = failLast;
}
public int getFailStatic() {
return failStatic;
}
public void setFailStatic(int failStatic) {
this.failStatic = failStatic;
}
public int getExceptPos() {
return exceptPos;
}
public void setExceptPos(int exceptPos) {
this.exceptPos = exceptPos;
}
public int getRcvQueueSize() {
return rcvQueueSize;
}
public void setRcvQueueSize(int rcvQueueSize) {
this.rcvQueueSize = rcvQueueSize;
}
public int getDynamicVoQueueSize() {
return dynamicVoQueueSize;
}
public void setDynamicVoQueueSize(int dynamicVoQueueSize) {
this.dynamicVoQueueSize = dynamicVoQueueSize;
}
public int getStaticVoQueueSize() {
return staticVoQueueSize;
}
public void setStaticVoQueueSize(int staticVoQueueSize) {
this.staticVoQueueSize = staticVoQueueSize;
}
public int getDynamicInsertQueueSize() {
return dynamicInsertQueueSize;
}
public void setDynamicInsertQueueSize(int dynamicInsertQueueSize) {
this.dynamicInsertQueueSize = dynamicInsertQueueSize;
}
public int getSqlLogDynamicQueueSize() {
return sqlLogDynamicQueueSize;
}
public void setSqlLogDynamicQueueSize(int sqlLogDynamicQueueSize) {
this.sqlLogDynamicQueueSize = sqlLogDynamicQueueSize;
}
public int getSqlLogDynamicLastQueueSize() {
return sqlLogDynamicLastQueueSize;
}
public void setSqlLogDynamicLastQueueSize(int sqlLogDynamicLastQueueSize) {
this.sqlLogDynamicLastQueueSize = sqlLogDynamicLastQueueSize;
}
public int getSqlLogStaticQueueSize() {
return sqlLogStaticQueueSize;
}
public void setSqlLogStaticQueueSize(int sqlLogStaticQueueSize) {
this.sqlLogStaticQueueSize = sqlLogStaticQueueSize;
}
public void initZero() {
rcvPos = 0;
rcvStat = 0;
savePos = 0;
saveLast = 0;
saveStatic = 0;
failPos = 0;
failLast = 0;
failStatic = 0;
}
public String getCountLog() {
StringBuilder sb = new StringBuilder();
sb.append( "\r\n");
sb.append( "===============================================\r\n" );
sb.append( " Count Info \r\n" );
sb.append( "===============================================\r\n" );
sb.append( " 위치정보 \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " 수신 / 저장성공 / 저장실패 / 제외 / \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " "+String.format(" %5s", rcvPos)+" / "+String.format(" %5s", savePos)+" / "+String.format(" %5s", failPos)+" / "+String.format(" %5s", exceptPos)+"\r\n" );
sb.append( "===============================================\r\n" );
sb.append( " 정적정보 \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " 수신 / 저장성공 / 저장실패 / \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " "+String.format(" %5s", rcvStat)+" / "+String.format(" %5s", saveStatic)+" / "+String.format(" %5s", failStatic)+" / \r\n");
sb.append( "===============================================\r\n");;
sb.append( " 최종위치정보 \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " 저장성공 / 저장실패 / \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " "+String.format(" %5s", saveLast)+" / "+String.format(" %5s", failLast)+" / \r\n");
sb.append( "===============================================\r\n");
sb.append( " 큐 상태 \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " 수신 / 동적 / 동적(입력) / 정적 / \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " "+String.format(" %5s", rcvQueueSize)+" / "+String.format(" %5s", dynamicVoQueueSize)+" / "+String.format(" %5s", dynamicInsertQueueSize)+" / "+String.format(" %5s", staticVoQueueSize)+" / \r\n");
sb.append( "-----------------------------------------------\r\n" );
sb.append( " SQL \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " 동적 / 최종 / 정적 / \r\n" );
sb.append( "-----------------------------------------------\r\n" );
sb.append( " "+String.format(" %5s", sqlLogDynamicQueueSize)+" / "+String.format(" %5s", sqlLogDynamicLastQueueSize)+" / "+String.format(" %5s", sqlLogStaticQueueSize)+" / \r\n");
sb.append( "===============================================\r\n");
//
// private int rcvQueueSize=0;
// private int dynamicVoQueueSize=0;
// private int staticVoQueueSize=0;
// private int dynamicInsertQueueSize=0;
// private int sqlLogDynamicQueueSize=0;
// private int sqlLogDynamicLastQueueSize=0;
// private int sqlLogStaticQueueSize=0;
initZero();
return sb.toString();
}
}

View File

@ -1,86 +0,0 @@
package kr.gmtc.tss.status;
import org.springframework.stereotype.Component;
@Component
public class CountingComponent {
private int rcvPos=0;
private int rcvStatic=0;
private int savePos=0;
private int failPos=0;
private int saveStatic=0;
private int failStatic=0;
public int getRcvPos() {
return rcvPos;
}
public void setRcvPos(int rcvPos) {
this.rcvPos = rcvPos;
}
public int getRcvStatic() {
return rcvStatic;
}
public void setRcvStatic(int rcvStatic) {
this.rcvStatic = rcvStatic;
}
public int getSavePos() {
return savePos;
}
public void setSavePos(int savePos) {
this.savePos = savePos;
}
public int getFailPos() {
return failPos;
}
public void setFailPos(int failPos) {
this.failPos = failPos;
}
public int getSaveStatic() {
return saveStatic;
}
public void setSaveStatic(int saveStatic) {
this.saveStatic = saveStatic;
}
public int getFailStatic() {
return failStatic;
}
public void setFailStatic(int failStatic) {
this.failStatic = failStatic;
}
public void initZero() {
rcvPos=0;
rcvStatic=0;
savePos=0;
failPos=0;
saveStatic=0;
failStatic=0;
}
public String getCountLog() {
String log = "\r\n"
+ "===============================================\r\n"
+ " Count Info \r\n"
+ "===============================================\r\n"
+ " 위치정보 \r\n"
+ "-----------------------------------------------\r\n"
+ " 수신 / 저장성공 / 저장실패 \r\n"
+ "-----------------------------------------------\r\n"
+ " "+String.format(" %5s", rcvPos)+" / "+String.format(" %5s", savePos)+" / "+String.format(" %5s", failPos)+"\r\n"
+ "===============================================\r\n"
+ " 정적정보 \r\n"
+ "-----------------------------------------------\r\n"
+ " 수신 / 저장성공 / 저장실패 \r\n"
+ "-----------------------------------------------\r\n"
+ " "+String.format(" %5s", rcvStatic)+" / "+String.format(" %5s", saveStatic)+" / "+String.format(" %5s", failStatic)+"\r\n"
+ "===============================================\r\n";
initZero();
return log;
}
}

View File

@ -1,21 +1,25 @@
server: server:
port: 8090 port: 8090
spring: # spring:
# datasource:
# hikari:
# maximum-pool-size: 1
# minimum-idle: 1
#main: #main:
#web-application-type: none #web-application-type: none
db: # db:
#db-type:oracle,mariadb(other - coding more!) # #db-type:oracle,mariadb(other - coding more!)
#db-type: oracle # #db-type: oracle
batch-insert-size: 1000 # batch-insert-size: 1000
datasource: # datasource:
#jdbc-url: jdbc:tibero:thin:@118.220.143.174:18629:SACP_T_DB # #jdbc-url: jdbc:tibero:thin:@118.220.143.174:18629:SACP_T_DB
jdbc-url: jdbc:tibero:thin:@10.200.31.4:8629:sacp #공항 # jdbc-url: jdbc:tibero:thin:@10.200.31.4:8629:sacp #공항
driver-class-name: com.tmax.tibero.jdbc.TbDriver # driver-class-name: com.tmax.tibero.jdbc.TbDriver
username: UTRACK # username: UTRACK
password: UTRACK # password: UTRACK
connectionTimeout: 5000 # connectionTimeout: 5000
maxLifetime: 30000 # maxLifetime: 30000
client: client:
@ -56,9 +60,9 @@ process:
status-check-cron: 0/10 * * * * * status-check-cron: 0/10 * * * * *
flag: flag:
database: false # database: false
datafile: true #logstash data log file, Log 포멧 datafile: true #logstash data log file, Log 포멧
file: true #rest api log file, Json # file: true #rest api log file, Json
file-zip-path: ./logs/file file-zip-path: ./logs/file
file-zip-bak-path: ./logs/backup file-zip-bak-path: ./logs/backup
@ -72,11 +76,22 @@ data-make-time: 10
kafka: kafka:
bootstrapServers: http://10.200.31.6:9091,http://10.200.31.8:9091,http://10.200.31.142:9091 #공항 bootstrapServers: http://10.200.31.6:9091,http://10.200.31.8:9091,http://10.200.31.142:9091 #공항
groupId: TssTopicReader_TEST groupId: TssTopicReader_TEST
topic: # swclassification 동일하게 번호 설정 topics: # swclassification 동일하게 번호 설정
- 9: ic.tracking.ptz 1: ic.recv.asde
2: ai.analyze.location
3: ic.tracking.fusion
4: ic.recv.radar
5: ai.analyze.video
6: ai.analyze.voice
7: ic.analyze.situation
8: ic.analyze.stand.status
9: ic.tracking.ptz
10: ic.service.route
11: ai.analyze.ltea
12: ic.tracking.ptz.manual
classification: 1 #이중화되어 있는 서버의 Index classification: 1 #이중화되어 있는 서버의 Index
swclassification: 9 #sw구분 swclassification: 6, 11 #sw구분
logclassification: 1 #log구분 logclassification: 1 #log구분
state: state: