久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播

共計(jì) 5964 個(gè)字符,預(yù)計(jì)需要花費(fèi) 15 分鐘才能閱讀完成。

如何實(shí)現(xiàn) TDMQ 中的 Pulsar 廣播,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面丸趣 TV 小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

Pulsar 作為 Apache 社區(qū)的相對(duì)新的成員,在業(yè)界受到非常大量的關(guān)注。新產(chǎn)品的文檔相對(duì)不齊全也是非常能夠理解的。今天客戶問過來廣播怎么實(shí)現(xiàn)的,我解釋了半天,又找了很多介紹產(chǎn)品的 PPT,最終也沒有找到“官方”的文檔說明這個(gè)事情。于是我就寫了這篇文章,方便大家  copy/paste。

Pulsar 訂閱模型分類

Pulsar 支持的幾種模式如下,依次是   獨(dú)占模式  /  高可用模式  /  分享模式  /  基于鍵值   的分享模式。

如何實(shí)現(xiàn) TDMQ 中的 Pulsar 廣播
 

 

Pulsar 廣播模式

Pulsar 的訂閱模式和很多 MQ 不太一樣。比如 RabbitMQ/Kafka 等,一般消費(fèi)端(Consumer)是直接去對(duì)接 Topic 的,然后 Consumer 自己又有個(gè)組的概念在配置中心去設(shè)置 offset,以此來決定是一起分享 Topic 的數(shù)據(jù),還是每個(gè)人都接收同樣的數(shù)據(jù)。在 Pulsar 的消費(fèi)訂閱模型里,添加了一個(gè) Subscription 的邏輯,Subscription 的 Type 決定了消費(fèi)是獨(dú)享還是分享。

于是廣播模式可以用不同 Subscription 獨(dú)享的模式來實(shí)現(xiàn),具體架構(gòu)可以參照下圖:

如何實(shí)現(xiàn) TDMQ 中的 Pulsar 廣播
 

 

代碼實(shí)現(xiàn)

1. Full-mesh 的形創(chuàng)建 Java 項(xiàng)目(比如:Springboot – 這個(gè)應(yīng)該是相對(duì)簡(jiǎn)單的 IDE 集成開發(fā)組件)

畫重點(diǎn)

pulsar-client-api 和  tdmq-client 需要 2.6.0tdmq-client 需要在騰訊的 repo 里才能拿到,需要使用介紹鏈接介紹的方式進(jìn)行 maven 的配置(gradle 方法類似)

介紹鏈接:https://cloud.tencent.com/document/product/1179/44914

?xml version= 1.0 encoding= UTF-8 ? project xmlns= http://maven.apache.org/POM/4.0.0  xmlns:xsi= http://www.w3.org/2001/XMLSchema-instance  xsi:schemaLocation= http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd   modelVersion 4.0.0 /modelVersion   parent     groupId org.springframework.boot /groupId     artifactId spring-boot-starter-parent /artifactId     version 2.4.3 /version     relativePath / !-- lookup parent from repository --   /parent   groupId com.examble.demo /groupId   artifactId tdmq-demo /artifactId   version 0.0.1-SNAPSHOT /version   name tdmq-demo /name   description demo project to test tdmq /description   properties     java.version 1.8 /java.version   /properties   dependencies     dependency       groupId org.springframework.boot /groupId       artifactId spring-boot-starter-web /artifactId     /dependency     dependency       groupId com.tencent.tdmq /groupId       artifactId tdmq-client /artifactId       version 2.6.0 /version     /dependency     !-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client-api --     dependency       groupId org.apache.pulsar /groupId       artifactId pulsar-client-api /artifactId       version 2.6.0 /version     /dependency     dependency       groupId org.springframework.boot /groupId       artifactId spring-boot-starter-test /artifactId       scope test /scope     /dependency   /dependencies 
  build     plugins       plugin         groupId org.springframework.boot /groupId         artifactId spring-boot-maven-plugin /artifactId       /plugin     /plugins   /build
/project

 

2. 創(chuàng)建一個(gè) Component 用來全局使用 Producer 和 Consumers

這里創(chuàng)建了 1 個(gè) Producer 和 3 個(gè)擁有 exclusive subscription 的 consumers(廣播模式 – 我們期待他們 3 個(gè)每次都收到一樣的信息)

package com.example.demo.tdmq.instance;
import javax.annotation.PostConstruct;
import org.apache.pulsar.client.api.AuthenticationFactory;import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageListener;import org.apache.pulsar.client.api.Producer;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import org.apache.pulsar.client.api.SubscriptionType;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Scope;import org.springframework.stereotype.Component;
@Component@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)public class Global { PulsarClient client;  public Producer byte[] producer;  public Consumer byte[] consumer01;  public Consumer byte[] consumer02;  public Consumer byte[] consumer03;
 public Global() {
 }
 @PostConstruct  public void init() {    try {      client = PulsarClient.builder().serviceUrl(pulsar:// Your TDMQ Pulsar Service URL :6000/)          .listenerName(custom: TDMQ Pulsar Instance ID / TDMQ VPC ID / TDMQ Subnet ID)          .authentication(AuthenticationFactory.token(               Your Credential Token from TDMQ))          .build();      producer = client.newProducer().topic(persistent:// TDMQ Pulsar Instance ID / your name space / your topic).create();      consumer01 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic(persistent:// TDMQ Pulsar Instance ID / your name space / your topic)          .messageListener(new MessageListener byte[] () {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer byte[] consumer, Message byte[] msg) {             System.out.println( Consumer01 + - + System.currentTimeMillis() + -                  + new String(msg.getData()));              try {               consumer.acknowledge(msg);              } catch (PulsarClientException e) {               // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName(my-subscription01).subscribe();      consumer02 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic(persistent:// TDMQ Pulsar Instance ID / your name space / your topic)          .messageListener(new MessageListener byte[] () {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer byte[] consumer, Message byte[] msg) {             System.out.println( Consumer02 + - + System.currentTimeMillis() + -                  + new String(msg.getData()));              try {               consumer.acknowledge(msg);              } catch (PulsarClientException e) {               // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName(my-subscription02).subscribe();      consumer03 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic(persistent:// TDMQ Pulsar Instance ID / your name space / your topic)          .messageListener(new MessageListener byte[] () {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer byte[] consumer, Message byte[] msg) {             System.out.println( Consumer03 + - + System.currentTimeMillis() + -                  + new String(msg.getData()));              try {               consumer.acknowledge(msg);              } catch (PulsarClientException e) {               // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName(my-subscription03).subscribe();
   } catch (PulsarClientException e) {     // TODO Auto-generated catch block      e.printStackTrace();    }  }
}

 

3. 最外層的測(cè)試代碼和簡(jiǎn)單的 Message 模型

public class MessageModel {
 private String messageText = null;
 public String getMessageText() {    return messageText;  }
 public void setMessageText(String messageText) {   this.messageText = messageText;  }}

 

跑起來測(cè)試一下,果然 3 個(gè)一起接收一樣的消息

如何實(shí)現(xiàn) TDMQ 中的 Pulsar 廣播   

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注丸趣 TV 行業(yè)資訊頻道,感謝您對(duì)丸趣 TV 的支持。

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-25發(fā)表,共計(jì)5964字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒有評(píng)論)
主站蜘蛛池模板: 宁化县| 温州市| 鹤山市| 格尔木市| 南岸区| 阜新| 大石桥市| 磐安县| 鱼台县| 澄江县| 中山市| 平阴县| 广灵县| 西贡区| 柞水县| 望城县| 韶山市| 任丘市| 鞍山市| 浪卡子县| 玛多县| 阜南县| 东丽区| 柳江县| 运城市| 浪卡子县| 江阴市| 长岛县| 星子县| 咸宁市| 涡阳县| 盘山县| 邹平县| 崇阳县| 星子县| 大悟县| 图们市| 锡林郭勒盟| 遂昌县| 承德市| 抚顺县|