KafkaAdminClient-主题分区管理

主题管理

创建主题

public void create(String topic, int partitions, int replication, Map<string> configs) throws Exception {
// 为了兼容性增加一层副本系数和节点数量的判断
if (replication > getBrokerNums())
throw new RuntimeException("副本系数不能大于broker节点数量");
short replication_short = (short) replication;
NewTopic newTopic = new NewTopic(topic, partitions, replication_short);
// 创建主题的相关配置
if (null != configs && configs.size() > 0)
newTopic.configs(configs);
CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));
result.all().get(timeout, TimeUnit.SECONDS);
}
/<string>

修改主题

public void update(String topic, List<alterconfigop> alterConfigOps) throws Exception {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
Map<configresource>> configs = new HashMap<>();
configs.put(resource, alterConfigOps);
adminClient.incrementalAlterConfigs(configs).all().get(timeout, TimeUnit.SECONDS);
}
// 新增、更新的参数
// ConfigEntry configEntry = new ConfigEntry(property.getKey(), property.getValue());
// AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
// 删除的参数
// ConfigEntry configEntry = new ConfigEntry(deleteProperty, null);
// AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.DELETE);
/<configresource>/<alterconfigop>

删除主题

public void delete(String topic) throws Exception {
// 服务端server.properties需要设置delete.topic.enable=true,才可以使用同步删除,否则只是将主题标记为删除
adminClient.deleteTopics(Arrays.asList(topic));
}

列出主题

public Set<string> list() throws Exception {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<string> topics = listTopicsResult.names().get(timeout, TimeUnit.SECONDS);
return topics;
}
/<string>/<string>

描述主题

public TopicDescription describe(String topic) throws Exception {
TopicDescription description = adminClient.describeTopics(Arrays.asList(topic)).all()
.get(timeout, TimeUnit.SECONDS).get(topic);
return description;
}

分区管理

列出分区

public List<integer> partitions(String topic) throws Exception {
List<topicpartitioninfo> partitionInfos = describe(topic).partitions();
List<integer> result = new ArrayList<>();
for (TopicPartitionInfo partitionInfo : partitionInfos) {
result.add(partitionInfo.partition());
}
return result;
}
public List<topicpartition> topicPartitions(String topic) throws Exception {
List<topicpartitioninfo> partitionInfos = describe(topic).partitions();
List<topicpartition> result = new ArrayList<>();
for (TopicPartitionInfo partitionInfo : partitionInfos) {
result.add(new TopicPartition(topic, partitionInfo.partition()));
}
return result;
}
/<topicpartition>/<topicpartitioninfo>/<topicpartition>/<integer>/<topicpartitioninfo>/<integer>

新增分区

public void addPartitions(String topic, Integer numPartitions) throws Exception {
NewPartitions newPartitions = NewPartitions.increaseTo(numPartitions);
Map<string> map = new HashMap<>(1, 1);
map.put(topic, newPartitions);
adminClient.createPartitions(map).all().get(timeout, TimeUnit.SECONDS);
}
/<string>

本文由博客一文多发平台 https://openwrite.cn?from=article_bottom 发布!


分享到:


相關文章: