Kafka Java API示例

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:         1、Producer端 import java.util.Properties;import kafka.javaapi.producer.Producer;import kafka.

        1、Producer端

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer{
	
	private final Producer<String, String> producer;
    public final static String TOPIC = "testtopic";

    private KafkaProducer(){
    	
        Properties props = new Properties();
        
        // 此处配置的是kafka的broker地址:端口列表
        props.put("metadata.broker.list", "192.168.1.225:9092,192.168.1.226:9092");

        //配置value的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        
        //配置key的序列化类
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        //request.required.acks
        //0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
        //1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
        //-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
        props.put("request.required.acks","-1");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1;
        final int COUNT = 101;

        int messageCount = 0;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "Hello kafka message :" + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
            System.out.println(data);
            messageNo ++;
            messageCount++;
        }
        
        System.out.println("Producer端一共产生了" + messageCount + "条消息!");
    }

    public static void main( String[] args )
    {
        new KafkaProducer().produce();
    }
}
        运行结果:

Hello kafka message :1
Hello kafka message :2
Hello kafka message :3
Hello kafka message :4
Hello kafka message :5
Hello kafka message :6
Hello kafka message :7
Hello kafka message :8
Hello kafka message :9
Hello kafka message :10
Hello kafka message :11
Hello kafka message :12
Hello kafka message :13
Hello kafka message :14
Hello kafka message :15
Hello kafka message :16
Hello kafka message :17
Hello kafka message :18
Hello kafka message :19
Hello kafka message :20
Hello kafka message :21
Hello kafka message :22
Hello kafka message :23
Hello kafka message :24
Hello kafka message :25
Hello kafka message :26
Hello kafka message :27
Hello kafka message :28
Hello kafka message :29
Hello kafka message :30
Hello kafka message :31
Hello kafka message :32
Hello kafka message :33
Hello kafka message :34
Hello kafka message :35
Hello kafka message :36
Hello kafka message :37
Hello kafka message :38
Hello kafka message :39
Hello kafka message :40
Hello kafka message :41
Hello kafka message :42
Hello kafka message :43
Hello kafka message :44
Hello kafka message :45
Hello kafka message :46
Hello kafka message :47
Hello kafka message :48
Hello kafka message :49
Hello kafka message :50
Hello kafka message :51
Hello kafka message :52
Hello kafka message :53
Hello kafka message :54
Hello kafka message :55
Hello kafka message :56
Hello kafka message :57
Hello kafka message :58
Hello kafka message :59
Hello kafka message :60
Hello kafka message :61
Hello kafka message :62
Hello kafka message :63
Hello kafka message :64
Hello kafka message :65
Hello kafka message :66
Hello kafka message :67
Hello kafka message :68
Hello kafka message :69
Hello kafka message :70
Hello kafka message :71
Hello kafka message :72
Hello kafka message :73
Hello kafka message :74
Hello kafka message :75
Hello kafka message :76
Hello kafka message :77
Hello kafka message :78
Hello kafka message :79
Hello kafka message :80
Hello kafka message :81
Hello kafka message :82
Hello kafka message :83
Hello kafka message :84
Hello kafka message :85
Hello kafka message :86
Hello kafka message :87
Hello kafka message :88
Hello kafka message :89
Hello kafka message :90
Hello kafka message :91
Hello kafka message :92
Hello kafka message :93
Hello kafka message :94
Hello kafka message :95
Hello kafka message :96
Hello kafka message :97
Hello kafka message :98
Hello kafka message :99
Hello kafka message :100
Producer端一共产生了100条消息!
        2、Consumer端

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        
        // zookeeper 配置
        props.put("zookeeper.connect", "server3:2181");

        // 消费者所在组
        props.put("group.id", "testgroup");

        // zk连接超时
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        
        // 序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap = 
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        
        int messageCount = 0;
        while (it.hasNext()){
        	System.out.println(it.next().message());
        	messageCount++;
        	if(messageCount == 100){
        		System.out.println("Consumer端一共消费了" + messageCount + "条消息!");
        	}
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
        运行结果:

Hello kafka message :3
Hello kafka message :8
Hello kafka message :14
Hello kafka message :19
Hello kafka message :23
Hello kafka message :28
Hello kafka message :32
Hello kafka message :37
Hello kafka message :41
Hello kafka message :46
Hello kafka message :50
Hello kafka message :55
Hello kafka message :64
Hello kafka message :69
Hello kafka message :73
Hello kafka message :78
Hello kafka message :82
Hello kafka message :87
Hello kafka message :91
Hello kafka message :96
Hello kafka message :2
Hello kafka message :7
Hello kafka message :13
Hello kafka message :18
Hello kafka message :22
Hello kafka message :27
Hello kafka message :31
Hello kafka message :36
Hello kafka message :40
Hello kafka message :45
Hello kafka message :54
Hello kafka message :59
Hello kafka message :63
Hello kafka message :68
Hello kafka message :72
Hello kafka message :77
Hello kafka message :81
Hello kafka message :86
Hello kafka message :90
Hello kafka message :95
Hello kafka message :100
Hello kafka message :5
Hello kafka message :11
Hello kafka message :16
Hello kafka message :20
Hello kafka message :25
Hello kafka message :34
Hello kafka message :39
Hello kafka message :43
Hello kafka message :48
Hello kafka message :52
Hello kafka message :57
Hello kafka message :61
Hello kafka message :66
Hello kafka message :70
Hello kafka message :75
Hello kafka message :84
Hello kafka message :89
Hello kafka message :93
Hello kafka message :98
Hello kafka message :4
Hello kafka message :9
Hello kafka message :10
Hello kafka message :15
Hello kafka message :24
Hello kafka message :29
Hello kafka message :33
Hello kafka message :38
Hello kafka message :42
Hello kafka message :47
Hello kafka message :51
Hello kafka message :56
Hello kafka message :60
Hello kafka message :65
Hello kafka message :74
Hello kafka message :79
Hello kafka message :83
Hello kafka message :88
Hello kafka message :92
Hello kafka message :97
Hello kafka message :1
Hello kafka message :6
Hello kafka message :12
Hello kafka message :17
Hello kafka message :21
Hello kafka message :26
Hello kafka message :30
Hello kafka message :35
Hello kafka message :44
Hello kafka message :49
Hello kafka message :53
Hello kafka message :58
Hello kafka message :62
Hello kafka message :67
Hello kafka message :71
Hello kafka message :76
Hello kafka message :80
Hello kafka message :85
Hello kafka message :94
Hello kafka message :99
Consumer端一共消费了100条消息!




相关文章
|
4天前
|
自然语言处理 Java API
Java 8的Stream API和Optional类:概念与实战应用
【5月更文挑战第17天】Java 8引入了许多重要的新特性,其中Stream API和Optional类是最引人注目的两个。这些特性不仅简化了集合操作,还提供了更好的方式来处理可能为空的情况,从而提高了代码的健壮性和可读性。
26 7
|
4天前
|
Java API
Java 8新特性之Lambda表达式与Stream API
【5月更文挑战第17天】本文将介绍Java 8中的两个重要特性:Lambda表达式和Stream API。Lambda表达式是一种新的编程语法,它允许我们将函数作为参数传递给其他方法,从而使代码更加简洁。Stream API是一种用于处理集合的新工具,它提供了一种高效且易于使用的方式来处理数据。通过结合使用这两个特性,我们可以编写出更加简洁、高效的Java代码。
13 0
|
1天前
|
Java 程序员 API
Java 8 Lambda 表达式和Stream API:概念、优势和实战应用
【5月更文挑战第20天】在Java 8中,Lambda 表达式和Stream API是两个非常强大的特性,它们显著改变了Java程序员处理数据和编写代码的方式。本篇技术文章将深入探讨这些特性的概念、优点,并提供实战示例,帮助理解如何有效地利用这些工具来编写更简洁、更高效的代码。
19 6
|
3天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
16 1
|
3天前
|
安全 Java API
Java进阶-Java Stream API详解与使用
效、更易于维护的代码,同时享受到函数式编程带来的好处。
12 2
|
4天前
|
Java 大数据 API
利用Java Stream API实现高效数据处理
在大数据和云计算时代,数据处理效率成为了软件开发者必须面对的重要挑战。Java 8及以后版本引入的Stream API为开发者提供了一种声明式、函数式的数据处理方式,极大提升了数据处理的效率和可读性。本文将详细介绍Java Stream API的基本概念、使用方法和性能优势,并通过实际案例展示如何在实际项目中应用Stream API实现高效数据处理。
|
4天前
|
监控 安全 数据挖掘
Email 接口API有哪些?具体分析一下阿里云和AOK的优点
本文介绍了常见的Email接口API,如阿里云邮件推送、AOKSend、SendGrid、Mailgun和Amazon SES。阿里云API以其高稳定性和数据分析功能脱颖而出,支持批量发送和多语言;而AOKSend API以易于集成、高安全性和优秀客户支持为亮点。企业在选择时应考虑自身需求和预算,以优化邮件营销效果。
|
4天前
|
定位技术 API
Angular 调用导入百度地图API接口,2024春招BAT面试真题详解
Angular 调用导入百度地图API接口,2024春招BAT面试真题详解
|
5天前
|
JSON 安全 API
解锁淘宝商品评论API接口:电商数据分析的新视角
淘宝商品评论API接口是淘宝开放平台提供的一组API接口,允许开发者通过编程方式获取淘宝商品评论数据。这些接口可以获取到商品的详细信息、用户评论、评分等数据,为电商数据分析提供了丰富的素材。
|
5天前
|
缓存 负载均衡 安全
探索API接口开发(定制与开发接口)
在当今数字化、互联互通的时代,API(应用程序编程接口)已经成为连接不同软件、服务和应用的关键桥梁。API接口开发,作为软件架构和系统设计的重要组成部分,不仅影响着数据交换的效率,更决定了整个系统的灵活性和可扩展性。本文将深入探讨API接口开发的各个方面,包括其重要性、开发流程、最佳实践以及面临的挑战。

热门文章

最新文章