久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

如何理解分布式事務(wù)框架seata

194次閱讀
沒有評論

共計 14590 個字符,預(yù)計需要花費 37 分鐘才能閱讀完成。

這篇文章將為大家詳細講解有關(guān)如何理解分布式事務(wù)框架 seata-golang 通信模型,文章內(nèi)容質(zhì)量較高,因此丸趣 TV 小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

一、簡介

Java 的世界里,大家廣泛使用的一個高性能網(wǎng)絡(luò)通信框架 netty,很多 RPC 框架都是基于 netty 來實現(xiàn)的。在 golang 的世界里,getty 也是一個類似 netty 的高性能網(wǎng)絡(luò)通信庫。getty 最初由 dubbogo 項目負責(zé)人于雨開發(fā),作為底層通信庫在 dubbo-go 中使用。隨著 dubbo-go 捐獻給 apache 基金會,在社區(qū)小伙伴的共同努力下,getty 也最終進入到 apache 這個大家庭,并改名 dubbo-getty。

二、如何基于 getty 實現(xiàn) RPC 通信

getty 框架的整體模型圖如下:

如何理解分布式事務(wù)框架 seata-golang 通信模型

下面結(jié)合相關(guān)代碼,詳述 seata-golang 的 RPC 通信過程。

1. 建立連接

實現(xiàn) RPC 通信,首先要建立網(wǎng)絡(luò)連接吧,我們從 client.go 開始看起。

func (c *client) connect() {
 var (
 err error
 ss Session
 for {
 //  建立一個  session  連接
 ss = c.dial()
 if ss == nil {
 // client has been closed
 break
 err = c.newSession(ss)
 if err == nil {
 //  收發(fā)報文
 ss.(*session).run()
 //  此處省略部分代碼
 
 break
 // don t distinguish between tcp connection and websocket connection. Because
 // gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
 ss.Conn().Close()
}

connect() 方法通過 dial() 方法得到了一個 session 連接,進入 dial() 方法:

func (c *client) dial() Session {
 switch c.endPointType {
 case TCP_CLIENT:
 return c.dialTCP()
 case UDP_CLIENT:
 return c.dialUDP()
 case WS_CLIENT:
 return c.dialWS()
 case WSS_CLIENT:
 return c.dialWSS()
 return nil
}

我們關(guān)注的是 TCP 連接,所以繼續(xù)進入 c.dialTCP() 方法:

func (c *client) dialTCP() Session {
 var (
 err error
 conn net.Conn
 for {if c.IsClosed() {
 return nil
 if c.sslEnabled {if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil   sslConfig != nil {d :=  net.Dialer{Timeout: connectTimeout}
 //  建立加密連接
 conn, err = tls.DialWithDialer(d,  tcp , c.addr, sslConfig)
 } else {
 //  建立  tcp  連接
 conn, err = net.DialTimeout(tcp , c.addr, connectTimeout)
 if err == nil   gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {conn.Close()
 err = errSelfConnect
 if err == nil {
 //  返回一個  TCPSession
 return newTCPSession(conn, c)
 log.Infof(net.DialTimeout(addr:%s, timeout:%v) = error:%+v , c.addr, connectTimeout, perrors.WithStack(err))
 -wheel.After(connectInterval)
}

至此,我們知道了 getty 如何建立 TCP 連接,并返回 TCPSession。

2. 收發(fā)報文

那它是怎么收發(fā)報文的呢,我們回到 connection 方法接著往下看,有這樣一行 ss.(*session).run(),在這行代碼之后代碼都是很簡單的操作,我們猜測這行代碼運行的邏輯里面一定包含收發(fā)報文的邏輯,接著進入 run() 方法:

func (s *session) run() {
 //  省略部分代碼
 
 go s.handleLoop()
 go s.handlePackage()}

br / 這里起了兩個 goroutine,handleLoop 和 handlePackage,看字面意思符合我們的猜想,進入 handleLoop() 方法:br /

func (s *session) handleLoop() {
 //  省略部分代碼
 
 for {
 // A select blocks until one of its cases is ready to run.
 // It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
 select {
 //  省略部分代碼
 
 case outPkg, ok =  -s.wQ:
 //  省略部分代碼
 iovec = iovec[:0]
 for idx := 0; idx   maxIovecNum; idx++ { //  通過  s.writer  將  interface{}  類型的  outPkg  編碼成二進制的比特
 pkgBytes, err = s.writer.Write(s, outPkg)
 //  省略部分代碼
 
 iovec = append(iovec, pkgBytes)
 // 省略部分代碼
 //  將這些二進制比特發(fā)送出去
 err = s.WriteBytesArray(iovec[:]...)
 if err != nil {log.Errorf( %s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v ,
 s.sessionToken(), len(iovec), perrors.WithStack(err))
 s.stop()
 // break LOOP
 flag = false
 case  -wheel.After(s.period):
 if flag {
 if wsFlag {err := wsConn.writePing()
 if err != nil {log.Warnf( wsConn.writePing() = error:%+v , perrors.WithStack(err))
 //  定時執(zhí)行的邏輯,心跳等
 s.listener.OnCron(s)
}

通過上面的代碼,我們不難發(fā)現(xiàn),handleLoop() 方法處理的是發(fā)送報文的邏輯,RPC 需要發(fā)送的消息首先由 s.writer 編碼成二進制比特,然后通過建立的 TCP 連接發(fā)送出去。這個 s.writer 對應(yīng)的 Writer 接口是 RPC 框架必須要實現(xiàn)的一個接口。

繼續(xù)看 handlePackage() 方法:

func (s *session) handlePackage() {
 //  省略部分代碼
 if _, ok := s.Connection.(*gettyTCPConn); ok {
 if s.reader == nil {errStr := fmt.Sprintf( session{name:%s, conn:%#v, reader:%#v} , s.name, s.Connection, s.reader)
 log.Error(errStr)
 panic(errStr)
 err = s.handleTCPPackage()} else if _, ok := s.Connection.(*gettyWSConn); ok {err = s.handleWSPackage()
 } else if _, ok := s.Connection.(*gettyUDPConn); ok {err = s.handleUDPPackage()
 } else {panic(fmt.Sprintf( unknown type session{%#v} , s))
}

進入 handleTCPPackage() 方法:

func (s *session) handleTCPPackage() error {
 //  省略部分代碼
 conn = s.Connection.(*gettyTCPConn)
 for {
 //  省略部分代碼
 bufLen = 0
 for {
 // for clause for the network timeout condition check
 // s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
 //  從  TCP  連接中收到報文
 bufLen, err = conn.recv(buf)
 //  省略部分代碼
 
 break
 //  省略部分代碼
 
 //  將收到的報文二進制比特寫入  pkgBuf
 pktBuf.Write(buf[:bufLen])
 for {if pktBuf.Len()  = 0 {
 break
 //  通過  s.reader  將收到的報文解碼成  RPC  消息
 pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
 //  省略部分代碼
 s.UpdateActive()
 //  將收到的消息放入  TaskQueue  供  RPC  消費端消費
 s.addTask(pkg)
 pktBuf.Next(pkgLen)
 // continue to handle case 5
 if exit {
 break
 return perrors.WithStack(err)
}

從上面的代碼邏輯我們分析出,RPC 消費端需要將從 TCP 連接收到的二進制比特報文解碼成 RPC 能消費的消息,這個工作由 s.reader 實現(xiàn),所以,我們要構(gòu)建 RPC 通信層也需要實現(xiàn) s.reader 對應(yīng)的 Reader 接口。

3. 底層處理網(wǎng)絡(luò)報文的邏輯如何與業(yè)務(wù)邏輯解耦

我們都知道,netty 通過 boss 線程和 worker 線程實現(xiàn)了底層網(wǎng)絡(luò)邏輯和業(yè)務(wù)邏輯的解耦。那么,getty 是如何實現(xiàn)的呢?

在 handlePackage() 方法最后,我們看到,收到的消息被放入了 s.addTask(pkg) 這個方法,接著往下分析:

func (s *session) addTask(pkg interface{}) {f := func() {s.listener.OnMessage(s, pkg)
 s.incReadPkgNum()
 if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {taskPool.AddTaskAlways(f)
 return
}

pkg 參數(shù)傳遞到了一個匿名方法,這個方法最終放入了 taskPool。這個方法很關(guān)鍵,在我后來寫 seata-golang 代碼的時候,就遇到了一個坑,這個坑后面分析。

接著我們看一下 taskPool 的定義:

// NewTaskPoolSimple build a simple task pool
func NewTaskPoolSimple(size int) GenericTaskPool {
 if size   1 {size = runtime.NumCPU() * 100
 return  taskPoolSimple{work: make(chan task),
 sem: make(chan struct{}, size),
 done: make(chan struct{}),
}

構(gòu)建了一個緩沖大小為 size(默認為  runtime.NumCPU() * 100)的 channel sem。再看方法 AddTaskAlways(t task):

func (p *taskPoolSimple) AddTaskAlways(t task) {
 select {
 case  -p.done:
 return
 default:
 select {
 case p.work  - t:
 return
 default:
 select {
 case p.work  - t:
 case p.sem  - struct{}{}:
 p.wg.Add(1)
 go p.worker(t)
 default:
 goSafely(t)
}

加入的任務(wù),會先由 len(p.sem) 個 goroutine 去消費,如果沒有 goroutine 空閑,則會啟動一個臨時的 goroutine 去運行 t()。相當(dāng)于有  len(p.sem) 個 goroutine 組成了 goroutine pool,pool 中的 goroutine 去處理業(yè)務(wù)邏輯,而不是由處理網(wǎng)絡(luò)報文的 goroutine 去運行業(yè)務(wù)邏輯,從而實現(xiàn)了解耦。寫 seata-golang 時遇到的一個坑,就是忘記設(shè)置 taskPool 造成了處理業(yè)務(wù)邏輯和處理底層網(wǎng)絡(luò)報文邏輯的 goroutine 是同一個,我在業(yè)務(wù)邏輯中阻塞等待一個任務(wù)完成時,阻塞了整個 goroutine,使得阻塞期間收不到任何報文。

4. 具體實現(xiàn)

下面的代碼見 getty.go:

// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {Read(Session, []byte) (interface{}, int, error)
// Writer is used to marshal pkg and write to session
type Writer interface {
 // if @Session is udpGettySession, the second parameter is UDPContext.
 Write(Session, interface{}) ([]byte, error)
// ReadWriter interface use for handle application packages
type ReadWriter interface {
 Reader
 Writer
}
// EventListener is used to process pkg that received from remote session
type EventListener interface {
 // invoked when session opened
 // If the return error is not nil, @Session will be closed.
 OnOpen(Session) error
 // invoked when session closed.
 OnClose(Session)
 // invoked when got error.
 OnError(Session, error)
 // invoked periodically, its period can be set by (Session)SetCronPeriod
 OnCron(Session)
 // invoked when getty received a package. Pls attention that do not handle long time
 // logic processing in this func. You d better set the package s maximum length.
 // If the message s length is greater than it, u should should return err in
 // Reader{Read} and getty will close this connection soon.
 // If ur logic processing in this func will take a long time, u should start a goroutine
 // pool(like working thread pool in cpp) to handle the processing asynchronously. Or u
 // can do the logic processing in other asynchronous way.
 // !!!In short, ur OnMessage callback func should return asap.
 // If this is a udp event listener, the second parameter type is UDPContext.
 OnMessage(Session, interface{})
}

通過對整個 getty 代碼的分析,我們只要實現(xiàn)  ReadWriter 來對 RPC   消息編解碼,再實現(xiàn) EventListener 來處理 RPC 消息的對應(yīng)的具體邏輯,將 ReadWriter 實現(xiàn)和 EventLister 實現(xiàn)注入到 RPC 的 Client 和 Server 端,則可實現(xiàn) RPC 通信。

4.1 編解碼協(xié)議實現(xiàn)

下面是 seata 協(xié)議的定義:

如何理解分布式事務(wù)框架 seata-golang 通信模型

在 ReadWriter 接口的實現(xiàn) RpcPackageHandler 中,調(diào)用 Codec 方法對消息體按照上面的格式編解碼:

//  消息編碼為二進制比特
func MessageEncoder(codecType byte, in interface{}) []byte {
 switch codecType {
 case SEATA:
 return SeataEncoder(in)
 default:
 log.Errorf(not support codecType, %s , codecType)
 return nil
//  二進制比特解碼為消息體
func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
 switch codecType {
 case SEATA:
 return SeataDecoder(in)
 default:
 log.Errorf(not support codecType, %s , codecType)
 return nil, 0
}

4.2 Client 端實現(xiàn)

再來看 client 端 EventListener 的實現(xiàn) RpcRemotingClient:

func (client *RpcRemoteClient) OnOpen(session getty.Session) error {go func() 
 request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{
 ApplicationId: client.conf.ApplicationId,
 TransactionServiceGroup: client.conf.TransactionServiceGroup,
 //  建立連接后向  Transaction Coordinator  發(fā)起注冊  TransactionManager  的請求
 _, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
 if err == nil {
 //  將與  Transaction Coordinator  建立的連接保存在連接池供后續(xù)使用
 clientSessionManager.RegisterGettySession(session)
 client.GettySessionOnOpenChannel  - session.RemoteAddr()
 return nil
// OnError ...
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {clientSessionManager.ReleaseGettySession(session)
// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {clientSessionManager.ReleaseGettySession(session)
// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {log.Info( received message:{%v} , pkg)
 rpcMessage, ok := pkg.(protocal.RpcMessage)
 if ok {heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
 if isHeartBeat   heartBeat == protocal.HeartBeatMessagePong {log.Debugf( received PONG from %s , session.RemoteAddr())
 if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
 rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {log.Debugf( msgId:%s, body:%v , rpcMessage.Id, rpcMessage.Body)
 
 //  處理事務(wù)消息,提交  or  回滾
 client.onMessage(rpcMessage, session.RemoteAddr())
 } else {resp, loaded := client.futures.Load(rpcMessage.Id)
 if loaded {response := resp.(*getty2.MessageFuture)
 response.Response = rpcMessage.Body
 response.Done  - true
 client.futures.Delete(rpcMessage.Id)
// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
 //  發(fā)送心跳
 client.defaultSendRequest(session, protocal.HeartBeatMessagePing)
}

clientSessionManager.RegisterGettySession(session) 的邏輯將在下文中分析。

4.3 Server 端 Transaction Coordinator 實現(xiàn)

代碼見 DefaultCoordinator:

func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {log.Infof( got getty_session:%s , session.Stat())
 return nil
func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {
 //  釋放  TCP  連接
 SessionManager.ReleaseGettySession(session)
 session.Close()
 log.Errorf(getty_session{%s} got error{%v}, will be closed. , session.Stat(), err)
func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {log.Info( getty_session{%s} is closing...... , session.Stat())
func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {log.Debugf( received message:{%v} , pkg)
 rpcMessage, ok := pkg.(protocal.RpcMessage)
 if ok {_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)
 if isRegTM {
 //  將  TransactionManager  信息和  TCP  連接建立映射關(guān)系
 coordinator.OnRegTmMessage(rpcMessage, session)
 return
 heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)
 if isHeartBeat   heartBeat == protocal.HeartBeatMessagePing {coordinator.OnCheckMessage(rpcMessage, session)
 return
 if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||
 rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {log.Debugf( msgId:%s, body:%v , rpcMessage.Id, rpcMessage.Body)
 _, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)
 if isRegRM {
 //  將  ResourceManager  信息和  TCP  連接建立映射關(guān)系
 coordinator.OnRegRmMessage(rpcMessage, session)
 } else {if SessionManager.IsRegistered(session) {defer func() {if err := recover(); err != nil {log.Errorf( Catch Exception while do RPC, request: %v,err: %w , rpcMessage, err)
 //  處理事務(wù)消息,全局事務(wù)注冊、分支事務(wù)注冊、分支事務(wù)提交、全局事務(wù)回滾等
 coordinator.OnTrxMessage(rpcMessage, session)
 } else {session.Close()
 log.Infof(close a unhandled connection! [%v] , session)
 } else {resp, loaded := coordinator.futures.Load(rpcMessage.Id)
 if loaded {response := resp.(*getty2.MessageFuture)
 response.Response = rpcMessage.Body
 response.Done  - true
 coordinator.futures.Delete(rpcMessage.Id)
func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {}

coordinator.OnRegTmMessage(rpcMessage, session) 注冊 Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session) 注冊 Resource Manager。具體邏輯分析見下文。

消息進入 coordinator.OnTrxMessage(rpcMessage, session) 方法,將按照消息的類型碼路由到具體的邏輯當(dāng)中:

switch msg.GetTypeCode() {
 case protocal.TypeGlobalBegin:
 req := msg.(protocal.GlobalBeginRequest)
 resp := coordinator.doGlobalBegin(req, ctx)
 return resp
 case protocal.TypeGlobalStatus:
 req := msg.(protocal.GlobalStatusRequest)
 resp := coordinator.doGlobalStatus(req, ctx)
 return resp
 case protocal.TypeGlobalReport:
 req := msg.(protocal.GlobalReportRequest)
 resp := coordinator.doGlobalReport(req, ctx)
 return resp
 case protocal.TypeGlobalCommit:
 req := msg.(protocal.GlobalCommitRequest)
 resp := coordinator.doGlobalCommit(req, ctx)
 return resp
 case protocal.TypeGlobalRollback:
 req := msg.(protocal.GlobalRollbackRequest)
 resp := coordinator.doGlobalRollback(req, ctx)
 return resp
 case protocal.TypeBranchRegister:
 req := msg.(protocal.BranchRegisterRequest)
 resp := coordinator.doBranchRegister(req, ctx)
 return resp
 case protocal.TypeBranchStatusReport:
 req := msg.(protocal.BranchReportRequest)
 resp := coordinator.doBranchReport(req, ctx)
 return resp
 default:
 return nil
 }

4.4 session manager 分析

Client 端同 Transaction Coordinator 建立連接起連接后,通過 clientSessionManager.RegisterGettySession(session) 將連接保存在 serverSessions = sync.Map{} 這個 map 中。map 的 key 為從 session 中獲取的 RemoteAddress 即 Transaction Coordinator 的地址,value 為 session。這樣,Client 端就可以通過 map 中的一個 session 來向 Transaction Coordinator 注冊 Transaction Manager 和 Resource Manager 了。具體代碼見 getty_client_session_manager.go。

Transaction Manager 和 Resource Manager 注冊到 Transaction Coordinator 后,一個連接既有可能用來發(fā)送 TM 消息也有可能用來發(fā)送 RM 消息。我們通過 RpcContext 來標(biāo)識一個連接信息:

type RpcContext struct {
 Version string
 TransactionServiceGroup string
 ClientRole meta.TransactionRole
 ApplicationId string
 ClientId string
 ResourceSets *model.Set
 Session getty.Session
}

當(dāng)收到事務(wù)消息時,我們需要構(gòu)造這樣一個 RpcContext 供后續(xù)事務(wù)處理邏輯使用。所以,我們會構(gòu)造下列 map 來緩存映射關(guān)系:

var (
 // session -  transactionRole
 // TM will register before RM, if a session is not the TM registered,
 // it will be the RM registered
 session_transactionroles = sync.Map{}
 // session -  applicationId
 identified_sessions = sync.Map{}
 // applicationId -  ip -  port -  session
 client_sessions = sync.Map{}
 // applicationId -  resourceIds
 client_resources = sync.Map{})

這樣,Transaction Manager 和 Resource Manager 分別通過 coordinator.OnRegTmMessage(rpcMessage, session) 和 coordinator.OnRegRmMessage(rpcMessage, session) 注冊到 Transaction Coordinator 時,會在上述 client_sessions map 中緩存 applicationId、ip、port 與 session 的關(guān)系,在 client_resources map 中緩存 applicationId 與 resourceIds(一個應(yīng)用可能存在多個 Resource Manager)的關(guān)系。在需要時,我們就可以通過上述映射關(guān)系構(gòu)造一個 RpcContext。這部分的實現(xiàn)和 java 版 seata 有很大的不同,感興趣的可以深入了解一下。具體代碼見 getty_session_manager.go。

至此,我們就分析完了 seata-golang 整個 RPC 通信模型的機制。

三、seata-golang 的未來

seata-golang   從今年 4 月份開始開發(fā),到 8 月份基本實現(xiàn)和 java 版 seata 1.2 協(xié)議的互通,對 mysql 數(shù)據(jù)庫實現(xiàn)了 AT 模式(自動協(xié)調(diào)分布式事務(wù)的提交回滾),實現(xiàn)了 TCC 模式,TC 端使用 mysql 存儲數(shù)據(jù),使 TC 變成一個無狀態(tài)應(yīng)用支持高可用部署。下圖展示了 AT 模式的原理:

如何理解分布式事務(wù)框架 seata-golang 通信模型

關(guān)于如何理解分布式事務(wù)框架 seata-golang 通信模型就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-25發(fā)表,共計14590字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 丰台区| 平武县| 巴彦县| 崇明县| 城固县| 呼玛县| 布拖县| 治多县| 仁怀市| 泾川县| 岳西县| 洛川县| 安义县| 桃园市| 渝北区| 蓬溪县| 漳浦县| 潼关县| 商水县| 武安市| 芦山县| 凉城县| 游戏| 盈江县| 卢龙县| 兴海县| 连南| 班戈县| 荔波县| 滦南县| 红原县| 青冈县| 望都县| 卢湾区| 仪征市| 闸北区| 灵璧县| 兴义市| 元朗区| 仙居县| 保靖县|