Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析
短信预约 -IT技能 免费直播动态提醒
简介
通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink
Maven
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
<version>2.7.3</version>
</dependency>
<!-- JAR repositories -->
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
CODE
使用PulsarMetadataReader获取元数据
package com.levi.demo;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Test {
public static void main(String[] args) {
final ClientConfigurationData configurationData = new ClientConfigurationData();
configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
//Your Pulsar Token
final AuthenticationToken token =
new AuthenticationToken(
"eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
configurationData.setAuthentication(token);
try (final PulsarMetadataReader reader =
new PulsarMetadataReader("http://127.0.0.1:8443",
configurationData,
"",
new HashMap(),
-1,
-1)) {
//获取namespaces
final List<String> namespaces = reader.listNamespaces();
System.out.println("namespaces: " + namespaces.toString());
for (final String namespace : namespaces) {
//获取Topics
final List<String> topics = reader.getTopics(namespace);
System.out.println("topic: " + topics.toString());
for (String topic : topics) {
//获取字段SchemaInfo
final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
final String name = schemaInfo.getName();
System.out.println("SchemaName:" + name); //topicName
final SchemaType type = schemaInfo.getType();
System.out.println("SchemaType:" + type.toString());// "JSON"...
final Map<String, String> properties = schemaInfo.getProperties();
System.out.println(properties);
final String schemaDefinition = schemaInfo.getSchemaDefinition();
System.out.println(schemaDefinition); // Field info.
}
}
} catch (IOException | PulsarAdminException e) {
e.printStackTrace();
}
}
}
到此这篇关于Java使用pulsar-flink-connector读取pulsar catalog元数据的文章就介绍到这了,更多相关Java读取pulsar catalog元数据内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341