首页 > SpringBoot实战(十四)之整合KafKa

SpringBoot实战(十四)之整合KafKa

 本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题。

于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章。希望能够给大家帮助,少走一些弯路。

 

一、KafKa的介绍

1.主要功能

根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:

  a.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因。

  b.以容错的方式记录消息流,kafka以文件的方式来存储消息流。

  c.可以再消息发布的时候进行处理。

 

2.使用场景

a.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能。

b.构建实时的流数据处理程序来变换或处理数据流,数据处理功能。

 

3.详细介绍

 Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

消息传输过程:

 

Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

 

Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

 

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

 

二、安装

安装包下载地址:http://kafka.apache.org/downloads

找到0.11.0.1版本,如图:

1.下载

wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

 

2.解压

tar -xzvf kafka_2.11-0.11.0.1.tgz

配置说明:

    consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,此处我们使用默认的即可。

    producer.properties 生产者配置,这个配置文件用于配置开启的生产者,此处我们使用默认的即可。

  server.properties kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置。

       a.broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可。

       b.listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,

例如:listeners=PLAINTEXT:// 192.168.126.143:9092。并确保服务器的9092端口能够访问。

  c.zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,

使用默认配置即可,zookeeper.connect=localhost:2181。

 

3.运行

首先运行zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

运行成功,显示如图:

 

然后运行kafka

bin/kafka-server-start.sh config/server.properties

 运行成功,显示如图:

 

三、整合KafKa

1.新建Maven项目导入Maven依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0modelVersion><groupId>cn.testgroupId><artifactId>kafka_demoartifactId><version>0.0.1-SNAPSHOTversion><parent><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-parentartifactId><version>1.5.9.RELEASEversion><relativePath/> parent><properties><project.build.sourceEncoding>UTF-8project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8project.reporting.outputEncoding><java.version>1.8java.version>properties><dependencies><dependency><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-webartifactId>dependency><dependency><groupId>org.projectlombokgroupId><artifactId>lombokartifactId><optional>trueoptional>dependency><dependency><groupId>org.springframework.bootgroupId><artifactId>spring-boot-starter-testartifactId><scope>testscope>dependency><dependency><groupId>org.springframework.kafkagroupId><artifactId>spring-kafkaartifactId><version>1.1.1.RELEASEversion>dependency><dependency><groupId>com.google.code.gsongroupId><artifactId>gsonartifactId><version>2.8.2version>dependency>dependencies><build><plugins><plugin><groupId>org.springframework.bootgroupId><artifactId>spring-boot-maven-pluginartifactId>plugin><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-compiler-pluginartifactId><configuration><source>1.8source><target>1.8target>configuration>plugin>plugins><finalName>${project.artifactId}finalName>build>project>

 

2.编写消息实体

package com.springboot.kafka.bean;import java.util.Date;import lombok.Data;@Data
public class Message {private Long id;    //idprivate String msg; //消息private Date sendTime;  //时间戳}

 有了lombok,每次编写实体不必要使用快捷键生成seter或geter方法了,代码看起来更加简洁了。

 

3.编写消息发送者(可以理解为生产者,最好联系详细介绍中的图)

package com.springboot.kafka.producer;import java.util.Date;
import java.util.UUID;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
public class KafkaSender {@Autowiredprivate KafkaTemplate kafkaTemplate;private Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));kafkaTemplate.send("zhisheng", gson.toJson(message));}
}

 

4.编写消息接收者(可以理解为消费者)

package com.springboot.kafka.producer;import java.util.Date;
import java.util.UUID;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.springboot.kafka.bean.Message;import lombok.extern.slf4j.Slf4j;@Component
@Slf4j
public class KafkaSender {@Autowiredprivate KafkaTemplate kafkaTemplate;private Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));kafkaTemplate.send("zhisheng", gson.toJson(message));}
}

 

5.编写启动类

package com.springboot.kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;import com.springboot.kafka.producer.KafkaSender;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);KafkaSender sender = context.getBean(KafkaSender.class);for (int i = 0; i < 3; i++) {//调用消息发送类中的消息发送方法
            sender.send();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}
}

 

 

6.编写application.properties配置文件

#============== kafka ===================
# u6307u5B9Akafka u4EE3u7406u5730u5740uFF0Cu53EFu4EE5u591Au4E2A
spring.kafka.bootstrap-servers=192.168.126.143:9092#=============== provider  =======================spring.kafka.producer.retries=0
# u6BCFu6B21u6279u91CFu53D1u9001u6D88u606Fu7684u6570u91CF
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432# u6307u5B9Au6D88u606Fkeyu548Cu6D88u606Fu4F53u7684u7F16u89E3u7801u65B9u5F0F
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer  =======================
# u6307u5B9Au9ED8u8BA4u6D88u8D39u8005group id
spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100# u6307u5B9Au6D88u606Fkeyu548Cu6D88u606Fu4F53u7684u7F16u89E3u7801u65B9u5F0F
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

7.运行结果

 

示例代码地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo

如果按照上述流程没有达到预计的效果可以git clone到本地。

 

转载于:https://www.cnblogs.com/youcong/p/10216573.html

更多相关:

  • 首先对微擎的工作原理做简单描述, 微擎使用规则和模块的机制来处理公众平台的请求数据并返回响应的结果.执行流程描述为: 粉丝用户与公众号码进行对话或交互, 而后公众平台将粉丝用户的请求消息(当前包括: 文本, 图片, 位置, 链接, 事件. 请参阅消息类型)传递给微擎系统, 微擎系统按照消息类型和对应的公众号所设定的规则列表匹配到合适的...

  • 消息队列的使用场景以下介绍消息队列在实际应用常用的使用场景。异步处理、应用解耦、流量削锋和消息通讯四个场景。1】异步处理:场景说明:用户注册后,需要发注册邮件和注册短信。引入消息队列后架构如下:用户的响应时间=注册信息写入数据库的时间,例如50毫秒。发注册邮箱、发注册短信写入消息队列后,直接返回客户端,因写入消息队列的速度很快,基...

  • 下面是我凭记忆想到的几个题目,有需要的同学就拿去吧,我也算做了点善事. 中体骏彩C++笔试题 2013-11-18 1.指针的含义是:B A.名字 B.地址 C.名称 D.符号 2.给出下面的程序输出: #include #include #include ...

  • 双端通信描述 利用消息队列针对发送接受消息的类型唯一性 进行多个客户端之间消息传递,而不需要server端进行消息转发。 同时消息队列的读阻塞和写阻塞特性(消息队列中已经写入数据,如果再不读出来,则无法再次写入)让消息队列的实现过程只能如下: 客户端1的父进程用来处理类型1的消息写,子进程处理类型2的消息读客户端2的父进程处理类型...

  • 文章目录基本介绍编程接口代码实例消息队列的发送和接收消息队列中的消息对象的属性控制 基本介绍 支持不同进程之间以消息(messages)的形式进行数据交换,消息能够拥有自己的标识,且内核使用链表方式进行消息管理。进程之间的通信角色为:发送者和接受者 发送者: a. 获取消息队列的ID(key或者msgid) b. 将数据放入...

  • 有一天,我写了一个自信满满的自定义组件myComponent,在多个页面import使用了,结果控制台给我来这个 我特么裤子都脱了,你给我来这个提示是几个意思 仔细一看 The Component 'MyComponentComponent' is declared by more than one NgModule...

  • 创建一个带路由的项目,依次执行下面每行代码 ng n RouingApp --routingcd RouingAppng g c components/firstng g c components/secondng g m components/second --routing    代码拷贝: import {NgModul...

  •       cnpm install vue-quill-editor cnpm install quill-image-drop-module cnpm install quill-image-resize-module 执行上面的命令安装,然后在main.js下面加入 //引入quill-editor编辑器import...

  • 首先要理解Vue项目加载顺序: index.html → main.js → App.vue → nav.json→ routes.js → page1.vue index.html建议加入样式

  • 简单记录平时画图用到的python 便捷小脚本 1. 从单个文件输入 绘制坐标系图 #!/usr/bin/python # coding: utf-8 import matplotlib.pyplot as plt import numpy as np import matplotlib as mpl import sysf...