本文共 5744 字,大约阅读时间需要 19 分钟。
Kafka史上最详细原理总结 :
<bean id="connectionManager" class="com.suning.retailcloud.ywbs.kafka.KafkaConsumer" init-method="init" />
import java.util.HashMap;
import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector;import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.suning.rsf.consumer.ServiceAgent; import com.suning.rsf.consumer.ServiceLocator;public class KafkaConsumer implements Runnable{
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); public KafkaConsumer(){}//NOSONAR private KafkaStream<byte[], byte[]> stream; public KafkaConsumer(KafkaStream<byte[], byte[]> stream, SendYxMsg2WebChatWindQSend sendYxMsg2WebChatWindQSend, YWBridgeServiceDao ywBridgeServiceDao) { this.stream = stream; this.ywBridgeServiceDao = ywBridgeServiceDao; this.sendYxMsg2WebChatWindQSend = sendYxMsg2WebChatWindQSend; } /* private YwbsSendEsbService ywbsSendEsbService; @EsbEIWired public void setYwbsSendEsbService(YwbsSendEsbService ywbsSendEsbService) { this.ywbsSendEsbService = ywbsSendEsbService; }*/ @Autowired private SendYxMsg2WebChatWindQSend sendYxMsg2WebChatWindQSend; @Autowired private YWBridgeServiceDao ywBridgeServiceDao; /** * 从 kafka 中读取 message */ @SuppressWarnings("unchecked") public void next() {//NOSONAR ConsumerIterator<byte[], byte[]> iter = stream.iterator(); while (iter.hasNext()) { try { byte[] item = iter.next().message(); String msg = new String(item);//NOSONAR LOGGER.info("KafkaConsumer get msg"+ msg); if(!StringUtils.isEmpty(msg)){ //记录发送消息 MsgRecordDTO msgRecordDTO = new MsgRecordDTO(); msgRecordDTO.setMsgTYPE(1); msgRecordDTO.setSnCustNumU(""); msgRecordDTO.setContent(msg); ywBridgeServiceDao.insertMSG(msgRecordDTO); //解析报文 YxResponseMsg responseMsg = JSONObject.parseObject(msg, YxResponseMsg.class); LOGGER.info("KafkaConsumer parse msg"+ JSONObject.toJSONString(responseMsg)); if(null!=responseMsg && null!=responseMsg.getHead() && null!=responseMsg.getBody()){ DialogInfoDTO infoDTO = ywBridgeServiceDao.queryRecentCommunication(responseMsg.getHead().getTo()); LOGGER.info("KafkaConsumer queryRecentCommunication"+ JSONObject.toJSONString(infoDTO)); //判断会话ID是否一致 if(null!=infoDTO && responseMsg.getHead().getFrom().split("@")[0].equals(infoDTO.getSnCustNumS()) && "SNPOS".equals(responseMsg.getHead().getFappCode()) && "100".equals(responseMsg.getBody().getMsgType())){ //组装MQ报文 YxToWxDTO dto = new YxToWxDTO(); dto.setSnCustNumU(responseMsg.getHead().getTo()); dto.setOpenId(infoDTO.getOpenId()); dto.setStoreCode(infoDTO.getStoreCode()); dto.setStoreName(infoDTO.getStoreName()); dto.setSnCustNumS(infoDTO.getSnCustNumS()); dto.setSnCustNumSName(infoDTO.getStaffName()); dto.setMsgType(responseMsg.getBody().getMsgType()); dto.setMsgContent(responseMsg.getBody().getMsgCentent()); //发送消息到MQ sendYxMsg2WebChatWindQSend.sendMessage(JSON.toJSONString(dto)); LOGGER.info("KafkaConsumer ywbsSendEsbService.send"+ JSONObject.toJSONString(dto)); }else if(null!=infoDTO && responseMsg.getHead().getFrom().split("@")[0].equals(infoDTO.getSnCustNumS()) && "SNPOS".equals(responseMsg.getHead().getFappCode()) && !"100".equals(responseMsg.getBody().getMsgType())){ ServiceAgent msgService = ServiceLocator.getServiceAgent("com.suning.yunxin.rsf.MsgService", "msgServiceImpl"); //调用云信发送消息 MessageVO messageVO = new MessageVO(); //msgId messageVO.setMsgID(UUID.randomUUID().toString()); //消息类型 messageVO.setMsgType("100"); //会话类型 messageVO.setChatType("3"); //公司 messageVO.setCompanyId(""); //会话ID messageVO.setChatId(infoDTO.getChatId()); //消息接收者 messageVO.setTo(infoDTO.getSnCustNumS()); //消息发送者 messageVO.setFrom(infoDTO.getSnCustNumU()); //消息发送者APPCode messageVO.setFappCode("YWQ"); messageVO.setTappCode("SNPOS"); //消息内容 messageVO.setMsgCentent("暂不支持发送图片、语音和视频,程序员小哥努力开发中"); Map<String, Object> msgMap = new HashMap<>(); msgMap.put("message", JSON.toJSONString(messageVO)); LOGGER.info("next sengMsg param start"+ msgMap); //发送消息接口 Map<String, Object> msgResultMap = (Map<String, Object>)msgService.invoke("sengMsg", new Object[]{msgMap}, new Class[]{Map.class}); LOGGER.info("next sengMsg end"+ msgResultMap); } } } } catch (Exception e) { LOGGER.error("consumer error", e); } } } public void init(){ String topicName = ConfigUtils.getInstance().getString("kafka.fc.topic"); String zkRoot = ConfigUtils.getInstance().getString("kafka.zookeeper.nodes"); String groupId = ConfigUtils.getInstance().getString("kafka.groupId"); Properties properties = new Properties(); properties.put("zookeeper.connect", zkRoot); properties.put("group.id", groupId); properties.put("auto.offset.reset", "largest"); ConsumerConfig config = new ConsumerConfig(properties); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put(topicName, 1); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); //取出对应的 streams List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName); //创建一个线程池 ExecutorService executor = Executors.newCachedThreadPool(); //创建consumer threads for (int i = 0; i < streams.size(); i++) { executor.execute(new KafkaConsumer(streams.get(i), sendYxMsg2WebChatWindQSend , ywBridgeServiceDao)); } }@Override
public void run() { next(); }}
转载地址:http://oinci.baihongyu.com/