Kafka——使用java api进行pub & sub

简介:        之前用过老的api,但是最近在写消费的时候,发现之前老的api很多方法都out了,又去官网看了下最新的0.10.x的api.1,producer org.apache.


       之前用过老的api,但是最近在写消费的时候,发现之前老的api很多方法都out了,又去官网看了下最新的0.10.x的api.

1,producer


<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.0</version>
		</dependency>


  public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.31:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("defaultTopic", Integer.toString(i), String.valueOf(i)));

        producer.close();
    }


2,Comsumer


<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.1.0</version>
		</dependency>
 
  
 
  
 public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.31:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("defaultTopic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }

    }



写完java api,可以try一下spring-intergeted,方便,一些池操作可以屏蔽掉,专注业务。






目录
相关文章
|
4天前
|
自然语言处理 Java API
Java 8的Stream API和Optional类:概念与实战应用
【5月更文挑战第17天】Java 8引入了许多重要的新特性,其中Stream API和Optional类是最引人注目的两个。这些特性不仅简化了集合操作,还提供了更好的方式来处理可能为空的情况,从而提高了代码的健壮性和可读性。
26 7
|
4天前
|
Java API
Java 8新特性之Lambda表达式与Stream API
【5月更文挑战第17天】本文将介绍Java 8中的两个重要特性:Lambda表达式和Stream API。Lambda表达式是一种新的编程语法,它允许我们将函数作为参数传递给其他方法,从而使代码更加简洁。Stream API是一种用于处理集合的新工具,它提供了一种高效且易于使用的方式来处理数据。通过结合使用这两个特性,我们可以编写出更加简洁、高效的Java代码。
13 0
|
1天前
|
Java 程序员 API
Java 8 Lambda 表达式和Stream API:概念、优势和实战应用
【5月更文挑战第20天】在Java 8中,Lambda 表达式和Stream API是两个非常强大的特性,它们显著改变了Java程序员处理数据和编写代码的方式。本篇技术文章将深入探讨这些特性的概念、优点,并提供实战示例,帮助理解如何有效地利用这些工具来编写更简洁、更高效的代码。
19 6
|
3天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
16 1
|
3天前
|
安全 Java API
Java进阶-Java Stream API详解与使用
效、更易于维护的代码,同时享受到函数式编程带来的好处。
12 2
|
4天前
|
Java 大数据 API
利用Java Stream API实现高效数据处理
在大数据和云计算时代,数据处理效率成为了软件开发者必须面对的重要挑战。Java 8及以后版本引入的Stream API为开发者提供了一种声明式、函数式的数据处理方式,极大提升了数据处理的效率和可读性。本文将详细介绍Java Stream API的基本概念、使用方法和性能优势,并通过实际案例展示如何在实际项目中应用Stream API实现高效数据处理。
|
4天前
|
消息中间件 关系型数据库 网络安全
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1
|
6天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
46 0
|
4天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
17 2
|
6天前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
76 2