我的编程空间,编程开发者的网络收藏夹
学习永远不晚

spring-kafka多线程顺序消费

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

spring-kafka多线程顺序消费

  业务场景

  我们公司是做共享充电宝的业务的。有一些比较大的代理商或者ka商户,他们需要了解到他们自己下面的商户的订单数据,这些订单数据需要由我们推送给他们。

  大致架构为数据部门通过canal订阅订单表的数据,然后推送到kafka ,我们订阅数据部门kafka获取到代理商下商户的实时订单数据再推送给代理商。比如,代理商下商户产生了一笔订单,整个过程会产生,订单生成,订单已支付,充电宝已被取走,充电宝已归还等多种状态的订单消息,我们需要实时把这些订单消息推送给代理商。我们的业务场景需要消息的顺序推送和多线程并发消费以提高性能

  kafka多线程消费方案

  消费者程序启动多个线程,每个线程维护专属的KafkaConsumer实例,负责完整的消息获取、消息处理

  流程。如下图所示:

  消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以 是一个,也可以是多个,每个线程维护专属的KafkaConsumer实例,处理消息则交由特定的线程池来 做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

  这两种方案孰优孰劣呢?应该说是各有千秋。这两种方案的优缺点,我们先来看看下面这张表格。

  kafka怎么保证顺序消费

  保证顺序消费,需要满足如下条件

  保证相同订单编号的消息需要发送到同一个分区。

  @Configuration

  public class SenderConfig {

  @Value("${kafka.bootstrap-servers}")

  private String bootstrapServers;

  @Bean

  public Map producerConfigs() {

  Map props = new HashMap<>();

  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

  return props;

  }

  @Bean

  public ProducerFactory producerFactory() {

  return new DefaultKafkaProducerFactory<>(producerConfigs());

  }

  @Bean

  public KafkaTemplate kafkaTemplate() {

  return new KafkaTemplate<>(producerFactory());

  }

  @Bean

  public Sender sender() {

  return new Sender();

  }

  }

  public class Sender {

  @Autowired

  private KafkaTemplate kafkaTemplate;

  public void send(String topic, String data) {

  kafkaTemplate.send(topic, data);

  }

  public void send(String topic, int partition, String data) {

  kafkaTemplate.send(topic, partition, data);

  }

  }

  @RunWith(SpringRunner.class)

  @SpringBootTest

  public class SpringKafkaApplicationTest {

  private static String BATCH_TOPIC = "batch.t";

  private static Integer PARTITIONS = 6;

  

  private static Integer PAYED_STATUS = 2;

  

  private static Integer SEND_BACK_STATUS = 3;

  @Autowired

  private Sender sender;

  private static DelayQueue delayQueue = new DelayQueue();

  @Test

  public void testReceive() throws Exception {

  for (int i = 1; i < 50; i++) {

  Integer orderNum = 800010 + i;

  Integer orderPrice = RandomUtil.randomInt(1, 20);

  // 用户支付成功,订单状态为支付成功

  OrderDTO order = new OrderDTO(orderNum, orderPrice, PAYED_STATUS);

  // 发送支付成功订单消息到对应的kafka分区

  Integer destinationPartition = orderNum % PARTITIONS;

  sender.send(BATCH_TOPIC, destinationPartition, JSONUtil.toJsonStr(order));

  // 创建任务放入延迟队列(模拟用户支付成功到取走充电宝花费的时间)

  long delayTime = 200;

  OrderTask orderTask = new OrderTask(delayTime, order);

  delayQueue.offer(orderTask);

  }

  while (true) {

  // 用户取走充电宝,订单状态更改为 已取走

  OrderTask orderTask = (OrderTask) delayQueue.take();

  OrderDTO orderDTO = orderTask.getOrderDTO();

  Integer destinationPartition = orderDTO.getOrderNum() % PARTITIONS;

  orderDTO.setOrderStatus(SEND_BACK_STATUS);

  // 发送已取走订单消息到对应的kafka 分区

  sender.send(BATCH_TOPIC, destinationPartition, JSONUtil.toJsonStr(orderDTO));

  }

  }

  }

  可以看出我们通过订单号对分区数进行取余,来确定该消息发送到哪一个分区,保证相同订单号的消息被发送到相同的分区。当然也可以对字符串这些进行hash ,获得hash值来对分区数取余

  Integer destinationPartition=orderDTO.getOrderNum()%PARTITIONS;

  保证同一个分区的消息由同一个线程来消费。

  我们的业务场景需要采用多线程方案一来处理我们的业务

  普通方式实现方案一

  public class KafkaConsumerRunner implements Runnable {

  private final AtomicBoolean closed = new AtomicBoolean(false);

  private final KafkaConsumer consumer;

  public KafkaConsumerRunner(KafkaConsumer consumer) {

  this.consumer = consumer;

  }

  @Override

  public void run() {

  try {

  consumer.subscribe(Arrays.asList("topic"));

  while (!closed.get()) {

  // 执行消息处理逻辑

  ConsumerRecords records = consumer.poll(10000);

  }

  } catch (Exception e) {

  // Ignore exception if closing

  if (!closed.get()) {

  throw e;

  }

  } finally {

  consumer.close();

  }

  }

  

  public void shutdown() {

  closed.set(true);

  consumer.wakeup();

  }

  }

  spring-kafka为我们做的封装

  消费者相关配置:

  这里我们需要注意的是factory.setConcurrency(4)。

  这个是配置主要是设置KafkaConsumer的数量,最大为topic 的分区数。当然你如果设置的值超过topic 分区数,spring-kafka 还是只会为我们创建最大分区数的KafkaConsumer数量,也就是创建KafkaConsumer数量能少于分区数,但不会超过分区数。少于分区数的话,一个KafkaConsumer会消费多个分区的数据,保证所有的分区数据都有对应的KafkaConsumer来进行消费;但不会出现多个KafkaConsumer消费同一个分区的情况,因为如果是这样也就无法保证消息的顺序消费机制。

  一般情况下如果数据量较大,我们需要把此值设置为topic分区数,这样一个KafkaConsumer消费一个分区的数据,提高数据的并发消费能力。

  @Configuration

  @EnableKafka

  public class ReceiverConfig {

  @Value("${kafka.bootstrap-servers}")

  private String bootstrapServers;

  @Bean

  public Map consumerConfigs() {

  Map props = new HashMap<>();

  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");

  // maximum records per poll

  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

  return props;

  }

  @Bean

  public ConsumerFactory consumerFactory() {

  return new DefaultKafkaConsumerFactory<>(consumerConfigs());

  }

  @Bean(name = "kafkaListenerContainerFactory")

  public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory factory =

  new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());

  // enable batch listening

  factory.setBatchListener(true);

  factory.setConcurrency(4);

  return factory;

  }

  @Bean

  public Receiver receiver() {

  return new Receiver();

  }

  }

  Receiver 代码

  public class Receiver {

  @Autowired

  private PushOrderService pushOrderService;

  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

  private static final String BATCH_TOPIC = "batch.t";

  @KafkaListener(topics = BATCH_TOPIC, containerFactory = "kafkaListenerContainerFactory")

  public void receivePartitions(List data,

  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,

  @Header(KafkaHeaders.OFFSET) List offsets) {

  for (int i = 0; i < data.size(); i++) {

  Long threadId = Thread.currentThread().getId();

  // 向第三方推送订单消息

  String orderStr = data.get(i);

  pushOrderService.pushOrderToPlatform(orderStr);

  OrderDTO orderDTO = JSONUtil.toBean(orderStr, OrderDTO.class);

  LOGGER.info("推送订单消息成功,订单号为:{},状态为:{},分区为{},处理线程为:{}", orderDTO.getOrderNum(), orderDTO.getOrderStatus(), partitions.get(i), threadId);

  }

  }

  }

  

  @Service

  public class PushOrderService {

  

  private static Integer PAYED_STATUS = 2;

  public void pushOrderToPlatform(String orderString) {

  // 模拟网络推送订单信息给第三方平台(同步推送)

  OrderDTO orderDTO = JSONUtil.toBean(orderString, OrderDTO.class);

  // 已支付 订单消息

  if (orderDTO.getOrderStatus().equals(PAYED_STATUS)) {

  ThreadUtil.sleep(500);

  } else {

  // 已取走 订单消息

  ThreadUtil.sleep(200);

  }

  }

  }

  测试结果:无锡做人流手术多少钱 http://www.ytsg029.com/

  16:17:47.026 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800014,状态为:2,分区为4,处理线程为:67

  16:17:47.026 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800012,状态为:2,分区为2,处理线程为:66

  16:17:47.534 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800020,状态为:2,分区为4,处理线程为:67

  16:17:47.534 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800018,状态为:2,分区为2,处理线程为:66

  16:17:48.035 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800026,状态为:2,分区为4,处理线程为:67

  16:17:48.036 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800024,状态为:2,分区为2,处理线程为:66

  16:17:48.537 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800015,状态为:2,分区为5,处理线程为:67

  16:17:48.539 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800016,状态为:2,分区为0,处理线程为:66

  16:17:49.044 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800022,状态为:2,分区为0,处理线程为:66

  16:17:49.045 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800021,状态为:2,分区为5,处理线程为:67

  16:17:49.546 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800013,状态为:2,分区为3,处理线程为:67

  16:17:49.547 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800028,状态为:2,分区为0,处理线程为:66

  16:17:50.051 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800019,状态为:2,分区为3,处理线程为:67

  16:17:50.051 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800011,状态为:2,分区为1,处理线程为:66

  16:17:50.554 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800025,状态为:2,分区为3,处理线程为:67

  16:17:50.554 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800017,状态为:2,分区为1,处理线程为:66

  16:17:51.060 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800023,状态为:2,分区为1,处理线程为:66

  16:17:51.576 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800034,状态为:2,分区为0,处理线程为:66

  16:17:51.579 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800031,状态为:2,分区为3,处理线程为:67

  16:17:51.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800032,状态为:2,分区为4,处理线程为:70

  16:17:51.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800027,状态为:2,分区为5,处理线程为:72

  16:17:52.079 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800040,状态为:2,分区为0,处理线程为:66

  16:17:52.083 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800037,状态为:2,分区为3,处理线程为:67

  16:17:52.088 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800038,状态为:2,分区为4,处理线程为:70

  16:17:52.088 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800033,状态为:2,分区为5,处理线程为:72

  16:17:52.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800046,状态为:2,分区为0,处理线程为:66

  16:17:52.588 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800043,状态为:2,分区为3,处理线程为:67

  16:17:52.589 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800044,状态为:2,分区为4,处理线程为:70

  16:17:52.590 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800039,状态为:2,分区为5,处理线程为:72

  16:17:53.089 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800052,状态为:2,分区为0,处理线程为:66

  16:17:53.091 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800050,状态为:2,分区为4,处理线程为:70

  16:17:53.091 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800049,状态为:2,分区为3,处理线程为:67

  16:17:53.095 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800045,状态为:2,分区为5,处理线程为:72

  16:17:53.591 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800058,状态为:2,分区为0,处理线程为:66

  16:17:53.592 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800056,状态为:2,分区为4,处理线程为:70

  16:17:53.593 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800055,状态为:2,分区为3,处理线程为:67

  16:17:53.600 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800051,状态为:2,分区为5,处理线程为:72

  16:17:53.795 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800016,状态为:3,分区为0,处理线程为:66

  16:17:53.796 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800013,状态为:3,分区为3,处理线程为:67

  16:17:53.796 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800014,状态为:3,分区为4,处理线程为:70

  16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800020,状态为:3,分区为4,处理线程为:70

  16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800019,状态为:3,分区为3,处理线程为:67

  16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800022,状态为:3,分区为0,处理线程为:66

  16:17:54.101 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800057,状态为:2,分区为5,处理线程为:72

  16:17:54.205 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800026,状态为:3,分区为4,处理线程为:70

  16:17:54.206 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800025,状态为:3,分区为3,处理线程为:67

  16:17:54.206 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800028,状态为:3,分区为0,处理线程为:66

  16:17:54.306 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800015,状态为:3,分区为5,处理线程为:72

  16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800037,状态为:3,分区为3,处理线程为:67

  16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800034,状态为:3,分区为0,处理线程为:66

  16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800032,状态为:3,分区为4,处理线程为:70

  16:17:54.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800021,状态为:3,分区为5,处理线程为:72

  16:17:54.614 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800031,状态为:3,分区为3,处理线程为:67

  16:17:54.615 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800046,状态为:3,分区为0,处理线程为:66

  16:17:54.615 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800038,状态为:3,分区为4,处理线程为:70

  16:17:54.711 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800027,状态为:3,分区为5,处理线程为:72

  16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800043,状态为:3,分区为3,处理线程为:67

  16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800040,状态为:3,分区为0,处理线程为:66

  16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800056,状态为:3,分区为4,处理线程为:70

  16:17:54.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800033,状态为:3,分区为5,处理线程为:72

  16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800044,状态为:3,分区为4,处理线程为:70

  16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800052,状态为:3,分区为0,处理线程为:66

  16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800055,状态为:3,分区为3,处理线程为:67

  16:17:55.118 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800039,状态为:3,分区为5,处理线程为:72

  16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800050,状态为:3,分区为4,处理线程为:70

  16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800049,状态为:3,分区为3,处理线程为:67

  16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800058,状态为:3,分区为0,处理线程为:66

  16:17:55.321 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800057,状态为:3,分区为5,处理线程为:72

  16:17:55.525 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800051,状态为:3,分区为5,处理线程为:72

  16:17:55.728 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800045,状态为:3,分区为5,处理线程为:72

  16:17:55.735 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800029,状态为:2,分区为1,处理线程为:66

  16:17:55.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800030,状态为:2,分区为2,处理线程为:67

  16:17:56.239 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800036,状态为:2,分区为2,处理线程为:67

  16:17:56.239 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800035,状态为:2,分区为1,处理线程为:66

  16:17:56.743 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800042,状态为:2,分区为2,处理线程为:67

  16:17:56.743 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800041,状态为:2,分区为1,处理线程为:66

  16:17:57.247 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800047,状态为:2,分区为1,处理线程为:66

  16:17:57.247 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800048,状态为:2,分区为2,处理线程为:67

  16:17:57.751 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800053,状态为:2,分区为1,处理线程为:66

  16:17:57.751 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800054,状态为:2,分区为2,处理线程为:67

  16:17:57.953 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800012,状态为:3,分区为2,处理线程为:67

  16:17:58.159 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800018,状态为:3,分区为2,处理线程为:67

  16:17:58.256 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800059,状态为:2,分区为1,处理线程为:66

  16:17:58.361 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800024,状态为:3,分区为2,处理线程为:67

  16:17:58.457 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800011,状态为:3,分区为1,处理线程为:66

  16:17:58.566 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800048,状态为:3,分区为2,处理线程为:67

  16:17:58.662 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800017,状态为:3,分区为1,处理线程为:66

  16:17:58.771 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800042,状态为:3,分区为2,处理线程为:67

  16:17:58.868 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800023,状态为:3,分区为1,处理线程为:66

  16:17:58.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800030,状态为:3,分区为2,处理线程为:67

  16:17:59.073 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800029,状态为:3,分区为1,处理线程为:66

  16:17:59.177 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800036,状态为:3,分区为2,处理线程为:67

  16:17:59.279 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800041,状态为:3,分区为1,处理线程为:66

  16:17:59.383 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800054,状态为:3,分区为2,处理线程为:67

  16:17:59.481 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800035,状态为:3,分区为1,处理线程为:66

  16:17:59.685 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800053,状态为:3,分区为1,处理线程为:66

  16:17:59.891 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800047,状态为:3,分区为1,处理线程为:66

  16:18:00.092 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送订单消息成功,订单号为:800059,状态为:3,分区为1,处理线程为:66

  完整代码

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

spring-kafka多线程顺序消费

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

spring-kafka多线程顺序消费

  业务场景  我们公司是做共享充电宝的业务的。有一些比较大的代理商或者ka商户,他们需要了解到他们自己下面的商户的订单数据,这些订单数据需要由我们推送给他们。  大致架构为数据部门通过canal订阅订单表的数据,然后推送到kafka ,我
2023-06-02

kafka java 生产消费程序demo示例

kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考。kafka的安装请参考官方文档。首先我们需要新建一个maven项目,然后在pom中引用kafka jar包,引用依赖
2023-06-03

Java怎么让多线程按顺序执行

本文小编为大家详细介绍“Java怎么让多线程按顺序执行”,内容详细,步骤清晰,细节处理妥当,希望这篇“Java怎么让多线程按顺序执行”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。在子线程中通过join()方法指定
2023-06-30

java怎么实现多线程的顺序执行

这篇文章主要介绍java怎么实现多线程的顺序执行,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!场景编写一个程序,启动三个线程,三个线程的name分别是A,B,C;,每个线程将自己的ID值在屏幕上打印5遍,打印顺序是A
2023-06-15

ReentrantLock条件变量使多个线程顺序执行

这篇文章主要为大家介绍了ReentrantLock条件变量使多个线程顺序执行,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-12-19

Java Swing 多线程加载图片(保证顺序一致)

大二的时候做的课程设计,图片管理器,当时遇到图片很多的文件夹,加载顺序非常慢。虽然尝试用多个Thread加载图片,却无法保证图片按顺序加载。直到今天学会了使用Callable接口和Future接口,于是心血来潮实现了这个功能。废话不多说,看
2023-05-30

C++中多线程的执行顺序如你预期吗

这篇文章主要为大家详细介绍一下C++中多线程的执行顺序的相关资料,文中的示例代码讲解详细,对我们学习C++多线程有一定帮助,需要的可以参考一下
2022-11-13

Java多线程中消费者生产者模式怎么实现

这篇文章主要讲解了“Java多线程中消费者生产者模式怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java多线程中消费者生产者模式怎么实现”吧!/*@author shijin *
2023-06-17

Java多线程中的生产者与消费者案例讲解

这篇文章主要讲解了“Java多线程中的生产者与消费者案例讲解”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java多线程中的生产者与消费者案例讲解”吧!目录前言工具知识点设计思路具体步骤总结
2023-06-20

java 中多线程生产者消费者问题详细介绍

java 中多线程生产者消费者问题前言:一般面试喜欢问些线程的问题,较基础的问题无非就是死锁,生产者消费者问题,线程同步等等,在前面的文章有写过死锁,这里就说下多生产多消费的问题了import java.util.concurrent.lo
2023-05-31

Java多线程中消费者与生产者的关系是什么

这篇文章将为大家详细讲解有关Java多线程中消费者与生产者的关系是什么,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。  多线程:CPU中各种任务在交替执行过程中,被称为多线程处理。其中,每个任务的一次动态
2023-06-02

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录