我的编程空间,编程开发者的网络收藏夹
学习永远不晚

kafka复习:(23)事务

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

kafka复习:(23)事务

一、生产者,开启事务。

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Date;import java.util.Properties;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class KafkaTest22 {    public static void main(String[] args) {        Properties properties= new Properties();        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "2");        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "4");        properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myTransaction");        KafkaProducer kafkaProducer=new KafkaProducer(properties);        ProducerRecord producerRecord=new ProducerRecord<>("study2024",0,"fff","hello sister,now is: "+ new Date());        kafkaProducer.initTransactions();        kafkaProducer.beginTransaction();        try{            Future future = kafkaProducer.send(producerRecord);            long offset = 0;            try {                offset = future.get().offset();            } catch (InterruptedException e) {                e.printStackTrace();            } catch (ExecutionException e) {                e.printStackTrace();            }            System.out.println(offset);            Thread.sleep(60000);            kafkaProducer.commitTransaction();        } catch (Exception ex){            kafkaProducer.abortTransaction();        }        kafkaProducer.close();    }}

二、消费者,设置隔离级别为"read_committed"

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;import java.time.temporal.TemporalUnit;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.TimeUnit;public class KafkaTest23 {    private static Properties getProperties(){        Properties properties=new Properties();        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");        //properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");        return properties;    }    public static void main(String[] args) {        KafkaConsumer myConsumer=new KafkaConsumer(getProperties());        String topic="study2024";        myConsumer.subscribe(Arrays.asList(topic));        while(true){            ConsumerRecords consumerRecords=myConsumer.poll(Duration.ofMillis(5000));            for(ConsumerRecord record: consumerRecords){                System.out.println(record.value());                System.out.println("record offset is: "+record.offset());            }        }    }}

三、运行结果,按照上述配置,当生产者发送消息并从kafka broker获取到offset后就会sleep,在生产者sleep的时候,消费者是获取不到消息的,只有sleep完成并提交事务之后,消费者才会获取到消息

来源地址:https://blog.csdn.net/amadeus_liu2/article/details/132567347

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

kafka复习:(23)事务

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

kafka复习:(23)事务

一、生产者,开启事务。 package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.producer.KafkaProd
2023-08-30

kafka复习:(24)consume-transform-produce模式

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.clients.consumer.*;import org.apache.k
2023-08-30

kafka事务是如何实现的

Kafka提供了基于消息的分布式事务机制,可以确保消息的原子性、一致性和持久性。Kafka事务的实现基于以下两个核心概念:生产者事务和消费者事务。1. 生产者事务:生产者事务允许将多个消息写入Kafka的一个或多个主题,并且可以以原子方式将
2023-09-14

MySQL事务学习

MySQL事务:1、事务特性:原子性,一致性,隔离性,持久性原子性:对一些操作,要么同时成功,要么同时失败。一致性:对一些操作,处理结果必须一致的,比如转账:A转给B,那么A账户减少100元,则B账户必须增加100元。隔离性:多个事务操作数据的表或者行,如果没
MySQL事务学习
2019-11-20

RabbitMQ,RocketMQ,Kafka事务性的处理策略是什么

这篇文章主要讲解了“RabbitMQ,RocketMQ,Kafka事务性的处理策略是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“RabbitMQ,RocketMQ,Kafka事务性的处
2023-06-29

MySQL学习之事务详解

目录一. 事务的业务场景二. 事务的使用三. 事务的特性(ACID)1. 原子性(Atomicity)2. 一致性(Consistency)3. 持久性(Durability)4. 隔离性(Isolation)四. 事务并发异常1. 脏读2
2022-12-08

MySQL学习笔记(13):锁和事务

本文更新于2019-09-22,使用MySQL 5.7,操作系统为Deepin 15.4。目录锁锁概述MyISAM表级锁InnoDB行级锁InnoDB表级锁死锁事务事务概述InnoDB事务分布式事务锁锁概述MyISAM和MEMORY存储引擎使用表级锁。BDB存
MySQL学习笔记(13):锁和事务
2015-05-30

Redis学习笔记(二十一) 事务

文章开始啰嗦两句,写到这里共21篇关于redis的琐碎知识,没有过多的写编程过程中redis的应用,着重写的是redis命令、客户端、服务器以及生产环境搭建用到的主从、哨兵、集群实现原理,如果你真的能看的进去,相信对你在以后用到redis时会有一定的帮助。写到
Redis学习笔记(二十一) 事务
2017-07-23

Java事务管理学习之JDBC详解

什么是Java事务通常的观念认为,事务仅与数据库相关。事务必须服从ISO/IEC所制定的ACID原则。ACID是原子性(atomicity)、一致性(consistency)、隔离性(isolation)和持久性(durability)的缩
2023-05-31

编程热搜

目录