失效链接处理 |
基于JAVA API方式使用Kafka——编写生产者客户端 PDF 下载
本站整理下载:
提取码:isss
相关截图:
主要内容:
1.导入依赖:
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>nexus-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
2.idea创建java类,类名为KafkaProducerTest
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) {
Properties props=new Properties();
//1.指定Kafaka集群的ip地址和端口号
props.put("bootstrap.servers","master:9092");
//2.等待所有副本节点的应答
props.put("acks","all");
//3.消息发送最大尝试次数
props.put("retries",0);
//4.指定一批消息处理次数
props.put("batch.size",16384);
//5.指定请求延时
props.put("linger.ms",1);
//6.指定缓存区内存大小
props.put("buffer.memory",33554432);
//7.设置key序列化
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
//8.设置value序列化
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// 9、生产数据
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "ycf vs yy" + i));
}
producer.close();
}
}
3.开启zookeeper集群
4.开启kafka集群
第三步第四步骤不会看我博客:kafka命令操作(详细)
https://blog.csdn.net/fy_1852003327/article/details/106695687
5.进入kafka目录下开启消费者
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic first
这里需要注意,idea充当生产者生产数据,开启消费者接受数据
idea中指定topic为first,所以kafka上运行命令topic也指定为first
idea中指定Kafaka集群的ip地址和端口号为:master:9092,所以kafka上运行也得指定为master:9092,不过代码可以在master,slave1,slave2三台节点的任意一台输入消费者命令接收数据。
|