博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafaka用例&市上最全总结
阅读量:4049 次
发布时间:2019-05-25

本文共 5744 字,大约阅读时间需要 19 分钟。

Kafka史上最详细原理总结 :

<bean id="connectionManager" class="com.suning.retailcloud.ywbs.kafka.KafkaConsumer" init-method="init" />

import java.util.HashMap;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.commons.lang.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;
import com.suning.rsf.consumer.ServiceAgent;
import com.suning.rsf.consumer.ServiceLocator;

public class KafkaConsumer implements Runnable{

    
    private final static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
    
    public KafkaConsumer(){}//NOSONAR
    
    private KafkaStream<byte[], byte[]> stream;
    public KafkaConsumer(KafkaStream<byte[], byte[]> stream, SendYxMsg2WebChatWindQSend sendYxMsg2WebChatWindQSend, YWBridgeServiceDao ywBridgeServiceDao) {
        this.stream = stream;
        this.ywBridgeServiceDao = ywBridgeServiceDao;
        this.sendYxMsg2WebChatWindQSend = sendYxMsg2WebChatWindQSend;
    }
    
/*    private YwbsSendEsbService ywbsSendEsbService;
    
    @EsbEIWired
    public void setYwbsSendEsbService(YwbsSendEsbService ywbsSendEsbService) {
        this.ywbsSendEsbService = ywbsSendEsbService;
    }*/
    
    @Autowired
    private SendYxMsg2WebChatWindQSend sendYxMsg2WebChatWindQSend;
    
    @Autowired
    private YWBridgeServiceDao ywBridgeServiceDao;

    /**
     * 从 kafka 中读取 message
     */
    @SuppressWarnings("unchecked")
    public void next() {//NOSONAR
        ConsumerIterator<byte[], byte[]> iter = stream.iterator();    
        
        while (iter.hasNext()) {
            try {
                byte[] item = iter.next().message();
                String msg = new String(item);//NOSONAR
                LOGGER.info("KafkaConsumer get msg"+ msg);
                
                if(!StringUtils.isEmpty(msg)){
                                    
                    //记录发送消息
                    MsgRecordDTO msgRecordDTO = new MsgRecordDTO();
                    msgRecordDTO.setMsgTYPE(1);
                    msgRecordDTO.setSnCustNumU("");
                    msgRecordDTO.setContent(msg);
                    ywBridgeServiceDao.insertMSG(msgRecordDTO);
                    
                    //解析报文
                    YxResponseMsg responseMsg = JSONObject.parseObject(msg, YxResponseMsg.class);
                    LOGGER.info("KafkaConsumer parse msg"+ JSONObject.toJSONString(responseMsg));
                    if(null!=responseMsg && null!=responseMsg.getHead() && null!=responseMsg.getBody()){
                        DialogInfoDTO infoDTO =  ywBridgeServiceDao.queryRecentCommunication(responseMsg.getHead().getTo());
                        LOGGER.info("KafkaConsumer queryRecentCommunication"+ JSONObject.toJSONString(infoDTO));
                        //判断会话ID是否一致
                        if(null!=infoDTO &&
                                responseMsg.getHead().getFrom().split("@")[0].equals(infoDTO.getSnCustNumS()) &&
                                "SNPOS".equals(responseMsg.getHead().getFappCode()) &&
                                "100".equals(responseMsg.getBody().getMsgType())){
                            //组装MQ报文
                            YxToWxDTO dto = new YxToWxDTO();
                            dto.setSnCustNumU(responseMsg.getHead().getTo());
                            dto.setOpenId(infoDTO.getOpenId());
                            dto.setStoreCode(infoDTO.getStoreCode());
                            dto.setStoreName(infoDTO.getStoreName());
                            dto.setSnCustNumS(infoDTO.getSnCustNumS());
                            dto.setSnCustNumSName(infoDTO.getStaffName());
                            dto.setMsgType(responseMsg.getBody().getMsgType());
                            dto.setMsgContent(responseMsg.getBody().getMsgCentent());
                            //发送消息到MQ
                            sendYxMsg2WebChatWindQSend.sendMessage(JSON.toJSONString(dto));    
                            LOGGER.info("KafkaConsumer ywbsSendEsbService.send"+ JSONObject.toJSONString(dto));
                        }else if(null!=infoDTO &&
                                responseMsg.getHead().getFrom().split("@")[0].equals(infoDTO.getSnCustNumS()) &&
                                "SNPOS".equals(responseMsg.getHead().getFappCode()) &&
                                !"100".equals(responseMsg.getBody().getMsgType())){
                            ServiceAgent msgService = ServiceLocator.getServiceAgent("com.suning.yunxin.rsf.MsgService",
                                    "msgServiceImpl");             
                            //调用云信发送消息                        
                            MessageVO messageVO = new MessageVO();
                            //msgId
                            messageVO.setMsgID(UUID.randomUUID().toString());
                            //消息类型
                            messageVO.setMsgType("100");
                            //会话类型
                            messageVO.setChatType("3");
                            //公司
                            messageVO.setCompanyId("");
                            //会话ID
                            messageVO.setChatId(infoDTO.getChatId());
                            //消息接收者
                            messageVO.setTo(infoDTO.getSnCustNumS());
                            //消息发送者
                            messageVO.setFrom(infoDTO.getSnCustNumU());
                            //消息发送者APPCode
                            messageVO.setFappCode("YWQ");
                            messageVO.setTappCode("SNPOS");
                            //消息内容
                            messageVO.setMsgCentent("暂不支持发送图片、语音和视频,程序员小哥努力开发中");
                            Map<String, Object> msgMap = new HashMap<>();
                            msgMap.put("message", JSON.toJSONString(messageVO));    
                            
                            LOGGER.info("next sengMsg param start"+ msgMap);
                            //发送消息接口
                            Map<String, Object> msgResultMap = (Map<String, Object>)msgService.invoke("sengMsg",
                                    new Object[]{msgMap}, new Class[]{Map.class});
                            LOGGER.info("next sengMsg end"+ msgResultMap);
                        }
                    }            
                }
            }  catch (Exception e) {
                LOGGER.error("consumer error", e);
            }
        }
    }
    
    public void init(){
        String topicName = ConfigUtils.getInstance().getString("kafka.fc.topic");
        String zkRoot = ConfigUtils.getInstance().getString("kafka.zookeeper.nodes");
        String groupId = ConfigUtils.getInstance().getString("kafka.groupId");
        
        Properties properties = new Properties();
        properties.put("zookeeper.connect", zkRoot);
        properties.put("group.id", groupId);
        properties.put("auto.offset.reset", "largest");
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topicName, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        //取出对应的 streams
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName);
        //创建一个线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //创建consumer threads
        for (int i = 0; i < streams.size(); i++) {
            executor.execute(new KafkaConsumer(streams.get(i), sendYxMsg2WebChatWindQSend , ywBridgeServiceDao));
        }
    }

    @Override

    public void run() {
        next();
    }

}

 

转载地址:http://oinci.baihongyu.com/

你可能感兴趣的文章
No.174 - LeetCode1305 - 合并两个搜索树
查看>>
No.175 - LeetCode1306
查看>>
No.176 - LeetCode1309
查看>>
No.182 - LeetCode1325 - C指针的魅力
查看>>
mysql:sql alter database修改数据库字符集
查看>>
mysql:sql truncate (清除表数据)
查看>>
yuv to rgb 转换失败呀。天呀。谁来帮帮我呀。
查看>>
yuv420 format
查看>>
yuv420 还原为RGB图像
查看>>
LED恒流驱动芯片
查看>>
驱动TFT要SDRAM做为显示缓存
查看>>
使用file查看可执行文件的平台性,x86 or arm ?
查看>>
qt 创建异形窗体
查看>>
简单Linux C线程池
查看>>
内存池
查看>>
GNU hello代码分析
查看>>
Qt继电器控制板代码
查看>>
wpa_supplicant控制脚本
查看>>
gstreamer相关工具集合
查看>>
RS232 四入四出模块控制代码
查看>>