Kafka Review (23): Transactions
1. Producer: enable transactions.
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaTest22 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        // Transactions require idempotence; transactional.id must be unique per producer instance
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "2");
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "4");
        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myTransaction");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("study2024", 0, "fff", "hello sister, now is: " + new Date());

        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        try {
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            long offset = 0;
            try {
                offset = future.get().offset();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(offset);
            // Sleep before committing so we can observe that a read_committed consumer
            // does not see the record until the transaction commits
            Thread.sleep(60000);
            kafkaProducer.commitTransaction();
        } catch (Exception ex) {
            kafkaProducer.abortTransaction();
        }
        kafkaProducer.close();
    }
}
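Note that the example above aborts the transaction on any exception. As a point of reference, the KafkaProducer documentation recommends distinguishing fatal errors from abortable ones; the sketch below follows that pattern. It is not part of the original post, and the class name, transactional.id, and broker address are placeholders.

// Sketch: separating fatal transaction errors (close the producer) from
// abortable ones (abort and optionally retry). Assumed names: KafkaTransactionalErrorHandling,
// transactional.id "myTransactionErrorDemo", broker "xx.xx.xx.xx:9092".
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaTransactionalErrorHandling {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myTransactionErrorDemo");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("study2024", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal: e.g. another producer with the same transactional.id has fenced this one.
            // Aborting cannot recover the session, so just close.
            producer.close();
            return;
        } catch (KafkaException e) {
            // Abortable: discard the in-flight transaction; the caller may retry.
            producer.abortTransaction();
        }
        producer.close();
    }
}

The fatal branch matters because a second producer instance started with the same transactional.id fences the first one; once fenced, the old instance can no longer commit or abort and must be closed.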
2. Consumer: set the isolation level to "read_committed".
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaTest23 {
    private static Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        // Only deliver records from committed transactions
        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        //properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
        return properties;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(getProperties());
        String topic = "study2024";
        myConsumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = myConsumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.value());
                System.out.println("record offset is: " + record.offset());
            }
        }
    }
}
3. Result: with the configuration above, the producer sends the message, receives its offset from the Kafka broker, and then sleeps. While the producer is sleeping, the consumer does not see the message; only after the sleep finishes and the transaction is committed does the consumer receive it.
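For contrast, here is a minimal sketch of the abort path. It is not part of the original post; the class name and transactional.id are assumptions, while the topic and broker address mirror the examples above. If the producer aborts instead of committing, the record reaches the log but is marked aborted, so the read_committed consumer above never delivers it, whereas a read_uncommitted consumer would see it.

// Sketch: aborting a transaction instead of committing it.
// With isolation.level=read_committed (KafkaTest23 above), this record is never delivered.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;

public class KafkaTransactionAbortDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myTransactionAbortDemo");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("study2024", 0, "fff", "aborted message, now is: " + new Date()));
        producer.flush(); // make sure the record actually reaches the broker before aborting
        // The record stays in the log but an abort marker is written after it,
        // so read_committed consumers skip it.
        producer.abortTransaction();
        producer.close();
    }
}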
Source: https://blog.csdn.net/amadeus_liu2/article/details/132567347