使用kafka在eclipe上可以设计生产着 和消费者
例子一
将本地文件上传到kafka上然后通过设计kafka的消费者取回到本地
上传到kafka上需要
KafkaProducerproducer;
Properties;//kafka的链接需要初始化数据这里需要properties将所需的东西以字符串的形式写在properties文件中所需东西不多且不会修改的情况下可以直接写在类里面.
FileInputStream 以字节流的方式传入到kafka
package com.ocean.kafka;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class HomeWork {
private KafkaProducer<String, byte[]> producer;
private Properties properties;
// 初始化数据
public HomeWork() {
properties = new Properties();
properties.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
producer = new KafkaProducer<String, byte[]>(properties);
}
// 指定发送到的topic
public void assignPartitionSend(String key, byte[] value) {
ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>("home-work_pic",0, key, value);
producer.send(record);
}
// 准备数据
public void preparLocalData() throws IOException {
File file = new File("C:\\Users\\Administrator\\Desktop\\psb.jpg");
FileInputStream fis =new FileInputStream(file);
byte[] context = new byte[1024];
int a = 0;
int length=0;
while ((length=fis.read(context))!=-1) {
//这里要注意得判断读取内容的实际长度 如果不这样设置 回到多出来//很多空格如果是图片的话则取回时无法还原
byte[] newbyte =new byte[length];
System.arraycopy(context, 0, newbyte, 0, length);
assignPartitionSend("TIMES" + a, newbyte);
a++;
}
fis.close();
}
public void close() {
producer.flush();
producer.close();
}
public static void main(String[] args) throws IOException {
HomeWork homeWork = new HomeWork();
try {
homeWork.preparLocalData();
} catch (IOException e) {
e.printStackTrace();
}
homeWork.close();
}
}
AI 代码解读
将数据传到Kafka上之后要想查看 可以通过zookeeper的客户端但是什么都看不懂因为是字节流(需要注意的一点是文件传到kafka上分区为0不然的话就会出现文件对应不上 )
最后想看的话就是取回来 下面的累就是将kafka上的文件数据传回到本地
package com.ocean.kafka;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class HomeWork2 {
private Properties properties = new Properties();
private KafkaConsumer<String, byte[]> consumer;
public HomeWork2() {
properties = new Properties();
properties.put("bootstrap.servers", "master:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.setProperty("group.id", "home-work_pic");
consumer = new KafkaConsumer<String, byte[]>(properties);
}
public void getfile() throws IOException {
File file =new File("C:\\Users\\Administrator\\Desktop\\output.jpg");
FileOutputStream fileOutputStream =new FileOutputStream(file,true);
List<String>topics =new ArrayList<String>();
topics.add("home-work_pic");
consumer.subscribe(topics);
while(true){
ConsumerRecords<String, byte[]> records =consumer.poll(1000);
for (ConsumerRecord<String, byte[]> record : records) {
if(record.value()!=null){
System.out.println(record.value());
byte[] b =record.value();
fileOutputStream.write(b);
fileOutputStream.flush();
}
}
}
}
public static void main(String[] args) {
HomeWork2 homeWork2 =new HomeWork2();
try {
homeWork2.getfile();
} catch (IOException e) {
e.printStackTrace();
}
}
}
AI 代码解读
这就是一个简单的设置将文件以字节流的方式上传和下载从kafka上