Springboot集成Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka服务器和消费机集群来分区消息。支持Hadoop并行数据加载。

Springboot的基本搭建和配置我在之前的文章已经给出代码示例了,如果还不了解的话可以先按照 SpringMVC配置太多?试试SpringBoot 进行学习哦。 那么如今很火的Springboot与kafka怎么完美的结合呢?多说无宜,放码过来 (talk is cheap,show me your code)!

安装Kafka

因为安装kafka需要zookeeper的支持,所以Windows安装时需要将zookeeper先安装上,然后将kafka安装好就可以了。 下面我给出Mac安装的步骤以及需要注意的点吧,windows的配置除了所在位置不太一样其他几乎没什么不同。

brew install kafka

对,就是这么简单,mac上一个命令就可以搞定了,这个安装过程可能需要等一会儿,应该是和网络状况有关系。安装提示信息可能有错误消息,如”Error: Could not link: /usr/local/share/doc/homebrew” 这个没关系,自动忽略掉了。 最终我们看到下面的样子就成功咯。

==> Summary 🍺/usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB

安装的配置文件位置如下,根据自己的需要修改端口号什么的就可以了。

安装的zoopeeper和kafka的位置 /usr/local/Cellar/

配置文件 /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties

启动zookeeper

./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

启动kafka

./bin/kafka-server-start /usr/local/etc/kafka/server.properties &

为kafka创建Topic,topic 名为test,可以配置成自己想要的名字,回头再代码中配置正确就可以了。

./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

代码示例pom.xml

  <parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>RELEASE</version>
</dependency>

</dependencies>

application.yml

server:
servlet:
context-path: /
port: 8080
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
batch-size: 16
retries: 0
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
auto-offset-reset: latest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: test-consumer-group

Producer 消息生产者


@Component
public class Producer {

@Autowired
private KafkaTemplate kafkaTemplate;

private static Gson gson = new GsonBuilder().create();

//发送消息方法
public void send() {
    Message message = new Message();
    message.setId("KFK_"+System.currentTimeMillis());
    message.setMsg(UUID.randomUUID().toString());
    message.setSendTime(new Date());
    kafkaTemplate.send("test", gson.toJson(message));
}

}


public class Message {

private String id;

private String msg;

private Date sendTime;

public String getId() {
    return id;
}

public void setId(String id) {
    this.id = id;
}

public String getMsg() {
    return msg;
}

public void setMsg(String msg) {
    this.msg = msg;
}

public Date getSendTime() {
    return sendTime;
}

public void setSendTime(Date sendTime) {
    this.sendTime = sendTime;
}

}

Consumer 消息消费者


public class Consumer {

@KafkaListener(topics = {"test"})
public void listen(ConsumerRecord<?, ?> record){

    Optional<?> kafkaMessage = Optional.ofNullable(record.value());

    if (kafkaMessage.isPresent()) {

        Object message = kafkaMessage.get();
        System.out.println("---->"+record);
        System.out.println("---->"+message);

    }

}

}

测试接口用例

这里我们用一个接口来测试我们的消息发送会不会被消费者接收。

@RestController
@RequestMapping(“/kafka”)
public class SendController {

@Autowired
private Producer producer;

@RequestMapping(value = "/send")
public String send() {
    producer.send();
    return "{\"code\":0}";
}

}

在Springboot启动类启动后在浏览器访问http://127.0.0.1:8080/kafka/send,我们可以再IDE控制台中看到输出的结果,这时候我们的整合基本上就完成啦。 具体代码可以在SpringBootKafkaDemo@github获取哦。pringBootKafkaDemo@github](https://github.com/xiaour/SpringBootDemo/tree/master/SpringBootKafkaDemo)获取哦。

© 2020 张涛的博客·XIAOUR All Rights Reserved. 本站访客数人次 本站总访问量
Theme by hiero