一文详解Elasticsearch和MySQL之间的数据同步问题
Elasticsearch中的数据是来自于Mysql数据库的,因此当数据库中的数据进行增删改后,Elasticsearch中的数据,索引也必须跟着做出改变。而对于管理服务(MySQL)和搜索服务(Elasticsearch)往往会在不同的微服务上。
可以通过微服务之间的同步调用来解决数据同步问题,虽然实现起来比较简单,但是在搜索服务中引入管理服务时,业务的耦合度相对来说是比较高的。因此可以通过消息队列的形式来异步通知管理服务的改变。这样做的有优点是耦合度较低,但是依赖于消息队列的耦合度。
下面根据一个例子来介绍使用RabbitMQ来解决Elasticsearch和MySQL之间的数据同步问题。当酒店数据库中发送增删改时,Elasticsearch中的数据也同时发生对应的改变。
首先在两块微服务中引入RabbitMQ的依赖:
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
创建常量类,将交换机和队列名称设置为常量,使用时方便取名。
public class MqConstants {
public final static String HOTEL_EXCHANGE = "hotel.topic";
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
public final static String HOTEL_INSERT_KEY = "hotel.insert";
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
创建配置类,声明交换机,并将路由键,队列与交换机之间互相绑定:
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
编写controller层,将酒店数据保存到数据库中,并将消息发送到消息队列里:
@Autowired
private IHotelService hotelService;
@Autowired
private RabbitTemplate rabbitTemplate;
@PutMapping
public void save(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id){
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
在service中加入两个方法,根据id增加和根据id删除:
void insertById(Long id);
void deleteById(Long id);
利用Ctrl+Alt+B的快捷键,在service的实现类里面重写方法进行实现:
@Override
public void insertById(Long id) {
try {
Hotel hotel = getById(id);
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.准备Request
IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());
// 2.准备请求参数DSL,其实就是文档的JSON字符串
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void deleteById(Long id) {
try {
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
这样即可完成Elasticsearch和MySQL之间的数据同步。
到此这篇关于一文详解Elasticsearch和MySQL之间的数据同步问题的文章就介绍到这了,更多相关Elasticsearch MySQL数据同步内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341