main
MonHun 2024-04-27 17:07:46 +09:00
parent b2856d00ee
commit faacc19a17
13 changed files with 1285 additions and 59 deletions

1
application.pid 100644

@@ -0,0 +1 @@
+ 22952

Binary file not shown.

1040
logs/trace.log 100644

File diff suppressed because it is too large

45
pom.xml

@@ -9,7 +9,6 @@
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>kr.gmtc.gw</groupId>
- <!-- <artifactId>socket</artifactId> -->
<artifactId>asderecv</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>EyeGW_AsdeRecv</name>
@@ -28,37 +27,31 @@
</properties>
<dependencies>
- <!-- <dependency>
+ <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
- </dependency> -->
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- <exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions> -->
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- inter-server status check -->
<dependency>
<groupId>kr.gmt.so</groupId>
<artifactId>state-spring-boot-starter</artifactId>
- <version>1.0.3</version>
+ <version>1.0.5</version>
<scope>system</scope>
- <systemPath>${basedir}/lib/state-spring-boot-starter-1.0.3.jar</systemPath>
+ <systemPath>${basedir}/lib/state-spring-boot-starter-1.0.5.jar</systemPath>
</dependency>
- <!-- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency> -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@@ -66,29 +59,15 @@
<scope>provided</scope>
</dependency>
<!-- for using @Resource -->
- <!-- <dependency>
- <groupId>javax.annotation</groupId>
- <artifactId>javax.annotation-api</artifactId>
- <version>1.3.2</version>
- </dependency> -->
<!-- IPWorks -->
<dependency>
<groupId>ipworks.local</groupId>
<artifactId>ipworks-local-1.0.0</artifactId>
<scope>system</scope>
<version>1.0.0</version>
- <systemPath>${basedir}/lib/ipworks/local/1.0.0/ipworks-local-1.0.0.jar</systemPath>
+ <systemPath>${basedir}/lib/ipworks-local-1.0.0.jar</systemPath>
</dependency>
<!-- guava - RateLimiter -->
- <!-- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>31.1-jre</version>
- </dependency> -->
<!-- data format -->
<dependency>
<groupId>org.yaml</groupId>
<!-- data format -->
<dependency>
<groupId>org.yaml</groupId>
@@ -107,7 +86,7 @@
</dependencies>
<build>
- <finalName>EyeGW_AsdeRecv-0.0.1</finalName>
+ <finalName>EyeGW_AsdeRecv</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>

AsterixParserThread.java

@@ -12,6 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+ import kr.gmtc.gw.asderecv.kafka.producer.KafkaProducer;
import kr.gmtc.gw.asderecv.rest.vo.ServiceAsdeData;
public class AsterixParserThread {
@@ -32,8 +33,10 @@ public class AsterixParserThread {
DateTimeFormatter dfPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+ KafkaProducer kafkaProducer;
+ @SuppressWarnings("unused")
- public AsterixParserThread(Queue<String> packetQ, HashMap<Integer, Queue<ServiceAsdeData>> serviceQueue) {
+ public AsterixParserThread(Queue<String> packetQ, HashMap<Integer, Queue<ServiceAsdeData>> serviceQueue, KafkaProducer kafkaProducer) {
thread = new Thread(new Runnable() {
@Override
public void run() {
@@ -75,13 +78,20 @@ public class AsterixParserThread {
if(!serviceMap.isEmpty()) {
- for(int idx=0; idx<serviceMap.size(); idx++) {
+ List<ServiceAsdeData> srcList = new ArrayList<ServiceAsdeData>();
+ srcList.addAll(serviceMap);
+ serviceMap.clear();
+ kafkaProducer.sendASDEMessage(srcList);
+ for(int idx=0; idx<srcList.size(); idx++) {
for(int qi=0; qi<scvQcount; qi++) {
- serviceQueue.get(qi).offer(serviceMap.get(idx));
+ serviceQueue.get(qi).offer(srcList.get(idx));
}
}
- serviceMap.clear();
}
}
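
The reworked block above snapshots the shared buffer, publishes the batch to Kafka once, then fans the same records out to every service queue. A minimal sketch of that pattern, with String standing in for ServiceAsdeData and a callback standing in for KafkaProducer.sendASDEMessage:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class FanOutSketch {
    // Drains the shared buffer and delivers one batch to Kafka plus a copy
    // to each of scvQcount service queues (types simplified for illustration).
    static void drainAndPublish(List<String> serviceMap,
                                HashMap<Integer, Queue<String>> serviceQueue,
                                int scvQcount,
                                Consumer<List<String>> kafkaSend) {
        if (serviceMap.isEmpty()) return;
        // Snapshot first, then clear, so new records can accumulate
        // while this batch is being published.
        List<String> srcList = new ArrayList<>(serviceMap);
        serviceMap.clear();
        kafkaSend.accept(srcList);                 // one Kafka publish per batch
        for (String rec : srcList) {               // then fan out to the service queues
            for (int qi = 0; qi < scvQcount; qi++) {
                serviceQueue.get(qi).offer(rec);
            }
        }
    }
}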

YamlFileLoader.java

@@ -6,12 +6,15 @@ import java.io.InputStreamReader;
import java.io.Reader;
import java.util.LinkedHashMap;
- import org.json.simple.JSONObject;
+ // import org.json.simple.JSONObject;
import org.yaml.snakeyaml.Yaml;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import ipworks.Json;
+ import kr.gmtc.gw.asderecv.asde.asterixSpec.vo.AsterixSpecVO;
public class YamlFileLoader {
@@ -42,7 +45,7 @@ public class YamlFileLoader {
LinkedHashMap<String, Object> propMap = new Yaml().load(reader);
// convert the file contents to JSON format
- JSONObject jsonObject = new JSONObject(propMap);
+ // JSONObject jsonObject = new JSONObject(propMap);
// convert the JSON data into an object
ObjectMapper objectMapper = new ObjectMapper();
@@ -51,7 +54,9 @@ public class YamlFileLoader {
objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT);
objectMapper.configure(DeserializationFeature.FAIL_ON_MISSING_EXTERNAL_TYPE_ID_PROPERTY, false);
- returnObj = objectMapper.readValue(jsonObject.toJSONString(), cVO);
+ //returnObj = objectMapper.readValue(jsonObject.toJSONString(), cVO);
+ returnObj = objectMapper.convertValue(propMap, AsterixSpecVO.class);
} catch (Exception e) {
e.printStackTrace();
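
The change above drops the Map → JSON string → object round-trip in favor of Jackson's convertValue, which binds the already-parsed YAML map directly. A minimal sketch of that approach, assuming a simple target VO (SpecVO is a stand-in for the project's AsterixSpecVO):

import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.List;
import org.yaml.snakeyaml.Yaml;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

class YamlLoadSketch {
    public static class SpecVO {            // stand-in for AsterixSpecVO
        public String category;
        public List<String> items;
    }

    static SpecVO load(InputStream in) {
        // SnakeYAML parses the file into a plain map...
        LinkedHashMap<String, Object> propMap = new Yaml().load(in);
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // ...and convertValue binds that map straight onto the VO,
        // with no intermediate JSON string.
        return mapper.convertValue(propMap, SpecVO.class);
    }
}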

MainController.java

@@ -1,8 +1,9 @@
package kr.gmtc.gw.asderecv.controller;
import kr.gmt.so.state.StateManager;
import kr.gmt.so.state.callback.IState;
+ import kr.gmt.so.state.model.SystemState;
import kr.gmtc.gw.asderecv.asde.parser.AsterixParserThread;
+ import kr.gmtc.gw.asderecv.kafka.producer.KafkaProducer;
import kr.gmtc.gw.asderecv.rest.ServiceQueManager;
import kr.gmtc.gw.asderecv.rest.vo.ServiceAsdeData;
import kr.gmtc.gw.asderecv.utils.CmmnUtil;
@@ -31,6 +32,10 @@ import javax.annotation.Resource;
@Component("controller")
public class MainController implements ApplicationListener<ContextClosedEvent>{
+ protected Logger logger;
+ protected UDPManager udpManager;
Queue<String> packetQ;
HashMap<Integer, Queue<ServiceAsdeData>> serviceQueue;
@@ -55,18 +60,19 @@ public class MainController implements ApplicationListener<ContextClosedEvent>{
@Resource(name = "serviceRunnig")
private boolean serviceRunnig = false;
- protected UDPManager udpManager;
private UDPEventListener udpListener;
private AsterixParserThread asterixParserThread;
private ServiceQueManager serviceQueManager;
- protected Logger logger;
@Autowired
private StateManager stateMgr;
+ @Autowired
+ private KafkaProducer kafkaProducer;
protected MainController(Queue<String> packetQ, HashMap<Integer, Queue<ServiceAsdeData>> serviceQueue){
this.packetQ = packetQ;
@@ -82,7 +88,7 @@ public class MainController implements ApplicationListener<ContextClosedEvent>{
udpListener = createUDPEventListener();
udpManager = new UDPManager(path, udpListener);
- asterixParserThread = new AsterixParserThread(packetQ, serviceQueue);
+ asterixParserThread = new AsterixParserThread(packetQ, serviceQueue, kafkaProducer);
asterixParserThread.setScvQcount(scvQcount);
asterixParserThread.setDebugLogMode(debugLogMode);
@@ -114,7 +120,7 @@ public class MainController implements ApplicationListener<ContextClosedEvent>{
@Override
public void udpDataIn(MsgObjVO vo) {
recvPacketAdd(vo);
- stateMgr.updateState();
+ stateMgr.updateState(SystemState.Normal);
}
@Override

KafkaConfig.java

@@ -0,0 +1,41 @@
package kr.gmtc.gw.asderecv.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration("KafkaConfig")
public class KafkaConfig {
@Value("${kafka.bootstrapAddress}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactoryString() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
return new KafkaTemplate<>(producerFactoryString());
}
}
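
For context, the kafkaTemplateString bean above is meant to be injected and used as in this minimal sketch (the topic name and payload are illustrative, not from the commit):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class DemoSender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplateString;

    public void send(String json) {
        // send(topic, payload) publishes the JSON string with a null key;
        // "demo.topic" is a placeholder topic name.
        kafkaTemplateString.send("demo.topic", json);
    }
}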

KafkaProducer.java

@@ -0,0 +1,143 @@
package kr.gmtc.gw.asderecv.kafka.producer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.gmt.so.state.StateManager;
import kr.gmtc.gw.asderecv.rest.vo.SacpServiceHeader;
import kr.gmtc.gw.asderecv.rest.vo.SacpServiceVO;
import kr.gmtc.gw.asderecv.rest.vo.ServiceAsdeData;
// import gmt.common.type.LogLevelType;
// import gmt.logger.GmtLogManager;
// import kr.gmtc.eyegw.rest.vo.ServiceAsdeData;
@Component("kafkaProducer")
public class KafkaProducer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${kafka.message.topic.recv.asde}")
private String topicName;
@Resource(name = "serviceQueue")
HashMap<Integer, Queue<ServiceAsdeData>> sendQueue;
@Autowired
private KafkaTemplate<String, String> kafkaTemplateString;
ObjectMapper mapper = new ObjectMapper();
String sSendMsg = "";
boolean loopYn = true;
@Autowired
private StateManager stateMgr;
//@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void sendASDEMessage(List<ServiceAsdeData> srcList) {
String sendMsg = "";
int qidx = 2;
stateMgr.updateState();
List<ServiceAsdeData> _srcList = new ArrayList<ServiceAsdeData>();
_srcList.addAll(srcList);
if(stateMgr.isActive()){
//logger.info("************************** serviceList Count : " + sendQueue.get(qidx).size());
long qSize = _srcList.size();
String logMsg = qSize + " records";
String sFirstDate = null, sEndDate = null;
if(qSize > 0){
loopYn = false;
//sendMsg = makeServiceData(sendQueue.get(qidx));
sendMsg = makeServiceData(_srcList);
loopYn = true;
sFirstDate = _srcList.get(0).getRecptn_dt();
sEndDate = _srcList.get((int) (qSize -1)).getRecptn_dt();
}
if(sendMsg != null && !sendMsg.equals("")){
kafkaTemplateString.send(topicName, sendMsg);
logger.info("kafka Produce complate(" + logMsg + " | " + sFirstDate + " | " + sEndDate + ")" );
}
}
}
//private String makeServiceData(Queue<ServiceAsdeData> procQue){
private String makeServiceData(List<ServiceAsdeData> srcList){
String sendCode, sendMsg ;
SacpServiceHeader jsonHeader = new SacpServiceHeader();
List<ServiceAsdeData> jsonData = new ArrayList<ServiceAsdeData>();
sendCode = "200";
sendMsg = "";
SacpServiceVO sacpServiceVO = new SacpServiceVO();
for(int idx=0; idx<srcList.size(); idx++) {
jsonData.add(srcList.get(idx));
}
jsonHeader.setResult_code(sendCode);
jsonHeader.setResult_msg(sendMsg);
sacpServiceVO.setHeader(jsonHeader);
if (sendCode.equals("200")) {
sacpServiceVO.setData(jsonData);
//logger.info("Que["+(queIdx + 1)+"] service count :" + jsonData.size() + "/" + qTotalSize);
}
String sRetJsonData = "";
try {
sRetJsonData = mapper.writeValueAsString(sacpServiceVO);
} catch (JsonProcessingException e) {
logger.error("makeServiceData-JsonProcessingException : " + e);
}
return sRetJsonData;
}
public String getTime() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
}
}
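
makeServiceData() above wraps the batch in a header-plus-data envelope before serializing it. A rough sketch of the same shape built with plain maps (the JSON field names result_code/result_msg are inferred from the VO setters; values are illustrative):

import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

class EnvelopeSketch {
    // Produces roughly:
    // {"header":{"result_code":"200","result_msg":""},"data":[ ... ]}
    static String envelope(List<Object> records) throws Exception {
        Map<String, Object> header = Map.of("result_code", "200", "result_msg", "");
        Map<String, Object> body = Map.of("header", header, "data", records);
        return new ObjectMapper().writeValueAsString(body);
    }
}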

application.yml

@@ -25,7 +25,7 @@ asde:
state:
# 6-digit code from common code CT001
- id: LK0401
+ id: TEST01
# 1:Primary, 2:Secondary
type: Primary
@@ -62,10 +62,11 @@ database:
kafka:
settings:
- bootstrapAddress: 118.220.143.175:9091,118.220.143.175:9091,118.220.143.176:9092
- consumer:
- group-id: testgroup
+ bootstrapAddress: 10.200.31.6:9091,10.200.31.8:9091,10.200.31.142:9091
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ message:
+ topic:
+ recv:
+ asde: ic.recv.asde_test
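
The new settings above are consumed through @Value bindings, as in this sketch; the two property expressions are the ones that appear in KafkaConfig and KafkaProducer, and the assumption is that the keys shown here resolve to those paths:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaPropsSketch {
    // Broker list used by the producer factory.
    @Value("${kafka.bootstrapAddress}")
    private String bootstrapServers;

    // Topic the ASDE batches are published to.
    @Value("${kafka.message.topic.recv.asde}")
    private String asdeTopic;
}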

logback-spring.xml

@@ -63,7 +63,7 @@
<!-- maximum log file retention; daily, monthly, or yearly according to fileNamePattern -->
<maxHistory>10</maxHistory>
<!-- maximum archive size, applied after maxHistory -->
- <totalSizeCap>100mb</totalSizeCap>
+ <totalSizeCap>100GB</totalSizeCap>
<!-- whether to apply the cleanup policy at startup -->
<CleanHistoryOnStart>true</CleanHistoryOnStart>
</rollingPolicy>
@@ -88,20 +88,20 @@
<!-- global logging settings -->
- <!-- <root level="${LOG_LEVEL}">
+ <root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
- </root> -->
+ </root>
- <logger name="kr.gmtc.gw" level="TRACE">
+ <!-- <logger name="kr.gmtc.gw" level="TRACE">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
- </logger>
+ </logger> -->
- <logger name="kr.gmt.so" level="INFO">
+ <!-- <logger name="kr.gmt.so" level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
- </logger>
+ </logger> -->
<!-- <logger name="kr.gmtc.comp.status" level="TRACE">
<appender-ref ref="CONSOLE"/>