作者 | 周禮
出品丨AI 科技大本營(ID:rgznai100)
![]()
本文整理自阿里云智能集團(tuán)高級(jí)技術(shù)專家周禮在 上的精彩演講《Apache RocketMQ x AI:面向異步化 Agent 的事件驅(qū)動(dòng)架構(gòu)》,介紹了如何基于 Apache RocketMQ 新特性構(gòu)建異步化 Multi-Agent 系統(tǒng),深入探討了 Agent 間的異步通信、上下文隔離、狀態(tài)恢復(fù)與任務(wù)編排機(jī)制,并通過實(shí)際案例展示如何利用 RocketMQ 實(shí)現(xiàn) Multi-Agent 的任務(wù)調(diào)度。
![]()
Multi-Agent 協(xié)同的核心:Agent 能力發(fā)現(xiàn)與任務(wù)閉環(huán)
隨著大模型能力提升與推理成本下降,MCP、A2A 等協(xié)作協(xié)議的成熟,AI 邁入了 Agentic AI 的時(shí)代,AI 應(yīng)用也從“被動(dòng)響應(yīng)”進(jìn)入了“主動(dòng)決策、自主執(zhí)行”階段。這一演進(jìn)催生了 Multi-Agent 架構(gòu):任務(wù)由多個(gè)專業(yè)化 Agent 協(xié)同完成,不再依賴單一模型或固定流程,開發(fā)者得以在模型自主性與業(yè)務(wù)可控性之間實(shí)現(xiàn)平衡。
與傳統(tǒng)應(yīng)用的固定編排不同,Agentic AI 具備自主規(guī)劃能力,可將目標(biāo)拆解為動(dòng)態(tài)步驟,但規(guī)劃又依賴每一步的結(jié)果反饋,依據(jù)完整的結(jié)果鏈路,Supervisor 才能掌握進(jìn)展、評(píng)估成效,決定下一步行動(dòng)。因此,要實(shí)現(xiàn)真正高效、可靠的協(xié)同,僅靠大模型的推理能力遠(yuǎn)遠(yuǎn)不夠,Agent 之間的協(xié)作有兩個(gè)關(guān)鍵點(diǎn):能力發(fā)現(xiàn)與任務(wù)閉環(huán)。
1、Agent 能力發(fā)現(xiàn)
Agent 的能力發(fā)現(xiàn)有兩個(gè)主要功能:
1. 動(dòng)態(tài)注冊 Agent 的能力(如“我能做數(shù)據(jù)分析”、“我擅長文案生成”);
2. 支持 Supervisor Agent 在運(yùn)行時(shí)查詢并選擇合適的 Sub Agent 執(zhí)行任務(wù)。
若沒有 Agent 能力發(fā)現(xiàn)的功能,Agent 協(xié)作就只能依賴硬編碼,喪失了自主性與擴(kuò)展性。這一機(jī)制可以類比傳統(tǒng)微服務(wù)中的服務(wù)發(fā)現(xiàn)與地址查找,調(diào)用方依賴注冊中心查詢服務(wù)地址,查找的條件是編碼的 ServiceName。Agent 能力的發(fā)現(xiàn)與之相似而又有不同,它是面向語義的能力和意圖驅(qū)動(dòng)匹配,而能力識(shí)別和匹配又是交給大模型處理的,這個(gè)過程是實(shí)現(xiàn)智能分工的前提。
2、任務(wù)協(xié)同
在大模型(LLM)驅(qū)動(dòng)的多 Agent 系統(tǒng)中,多個(gè)智能體(Agent)通過協(xié)作、競爭或分工完成復(fù)雜任務(wù)。尤其在 Supervisor Agent 架構(gòu)中,Supervisor 作為系統(tǒng)的“大腦”,通過高層次的協(xié)調(diào)與管理,將多個(gè)專業(yè)化 Agent 組織成一個(gè)有機(jī)整體,從而完成單個(gè) Agent 無法勝任的復(fù)雜任務(wù)。在 Agent 能力的聲明與發(fā)現(xiàn)的基礎(chǔ)上,為了實(shí)現(xiàn) Supervisor 與其他 Agent 間的高效協(xié)同,需要設(shè)計(jì)合理的通信機(jī)制。不同的通信模式適用于不同場景,在靈活性、可擴(kuò)展性、控制力和性能之間各有取舍。
輪詢式通信:定期主動(dòng)查詢其他 Agent 的狀態(tài),獲取最新信息(如存儲(chǔ)于數(shù)據(jù)庫、Redis 中)。這種方式實(shí)現(xiàn)簡單,但是延遲高,頻繁輪詢浪費(fèi)資源,難以處理動(dòng)態(tài)拓?fù)渥兓?/p>
點(diǎn)對點(diǎn)通信(Point-to-Point Invocation):主動(dòng)調(diào)用另一個(gè) Agent 的接口,等待響應(yīng),如 REST API、gRPC、函數(shù)調(diào)用等方式。這種方式控制流明確,支持強(qiáng)一致性交互,但耦合度高,難以動(dòng)態(tài)調(diào)整,不利于異步處理。
發(fā)布-訂閱模式(Pub/Sub):Agent 將消息發(fā)布到特定主題(topic),其他感興趣的 Agent 訂閱該主題接收消息,解耦調(diào)用和被調(diào)用者,易于擴(kuò)展和水平伸縮。
我們考察基于發(fā)布/訂閱模式實(shí)現(xiàn) Agent 間異步調(diào)用的場景,Sub Agent 接收任務(wù)并完成后,需要將結(jié)果反饋給 Supervisor,并附帶上下文唯一標(biāo)識(shí)(如 Task_ID),以便 Supervisor 異步接收反饋并驅(qū)動(dòng)下一步?jīng)Q策。但要保證 Supervisor Agent 節(jié)點(diǎn)異步獲取到上次任務(wù)的結(jié)果,需要在異步場景下實(shí)現(xiàn)反饋機(jī)制,常見方案如下:
1. 獨(dú)占隊(duì)列:每個(gè) Supervisor 實(shí)例綁定獨(dú)立 Queue 或者 Topic 來接收下游回寫的結(jié)果 —— 資源開銷大,管理復(fù)雜,存在性能瓶頸;
2. 廣播過濾:Supervisor 集群共享消費(fèi)分組,所有實(shí)例接收全部消息并自行過濾 —— 產(chǎn)生大量無效流量,浪費(fèi)資源且有穩(wěn)定性風(fēng)險(xiǎn);
3. 共享存儲(chǔ):結(jié)果存儲(chǔ)在數(shù)據(jù)庫或者緩存 —— 更靈活可靠,但每次 Supervisor Agent 作為發(fā)起方,需要不停進(jìn)行輪詢以確定自己發(fā)起的 Task 是否已經(jīng)有結(jié)果產(chǎn)生,增加了存儲(chǔ)成本與交互成本,實(shí)際類似于上述的輪詢式通信。
可以看到,基于發(fā)布/訂閱模式實(shí)現(xiàn)通信時(shí)比較復(fù)雜,其核心原因是主流的分布式消息中間件主要面向的是靜態(tài)編排的業(yè)務(wù)場景,采用“發(fā)完即忘”(Fire-and-Forget)模式,不關(guān)心下游的反饋,這使得通信鏈路難以完成閉環(huán)。
![]()
RocketMQ 面向 Agentic AI 的新特性
接下來,我們探討如何實(shí)現(xiàn) Agent 的異步通信機(jī)制和動(dòng)態(tài)決策,RocketMQ 在傳統(tǒng)模式的基礎(chǔ)上進(jìn)行了針對性設(shè)計(jì),推出帶語義的 Topic 和 Lite-Topic 的新特性:以 Topic 語義作為能力注冊與發(fā)現(xiàn)的基礎(chǔ),解決調(diào)用誰的問題;以 Lite Topic 動(dòng)態(tài)綁定任務(wù)并等待結(jié)果消息,解決調(diào)用后異步獲取結(jié)果的問題。兩者結(jié)合,以更簡潔的方式實(shí)現(xiàn)需要反饋的異步任務(wù)驅(qū)動(dòng)模式。
1、從數(shù)據(jù)通道到語義載體:Topic 的智能化演進(jìn)
在傳統(tǒng)的消息系統(tǒng)中,Topic 僅作為數(shù)據(jù)傳輸?shù)耐ǖ来嬖冢x了“消息發(fā)往哪里”,但無法表達(dá)“為什么發(fā)”或“誰需要它”。然而,在 Multi-Agent 協(xié)同場景下,通信不再只是簡單的數(shù)據(jù)搬運(yùn),而是意圖驅(qū)動(dòng)的智能協(xié)作過程。為此,我們重新定義了 Topic:它不僅是消息的主題命名和分類,更是業(yè)務(wù)意圖與能力語義的載體。
通過將自然語言描述與結(jié)構(gòu)化元數(shù)據(jù)引入 Topic 定義,每個(gè) Topic 不再只是一個(gè)主題名稱,而是一個(gè)具備“自我表達(dá)能力”的協(xié)作單元。例如,一個(gè)實(shí)現(xiàn)了 A2A 協(xié)議中 AppCard 標(biāo)準(zhǔn)的 Topic 格式如下:
![]()
這樣的設(shè)計(jì)使得 Topic 具備了可讀性、可發(fā)現(xiàn)性與可推理性。結(jié)合 Nameserver 的服務(wù)注冊與發(fā)現(xiàn)機(jī)制,這些帶有語義標(biāo)簽的 Topic 可被統(tǒng)一索引和查詢。
每一個(gè) Agent 可以通過訂閱某個(gè)能描述自身能力的 Topic 來實(shí)現(xiàn)綁定關(guān)系,具備注冊和被發(fā)現(xiàn)的能力,上層 Agent(如 Supervisor Agent)可通過能力關(guān)鍵詞(如“數(shù)據(jù)分析”“內(nèi)容生成”)動(dòng)態(tài)發(fā)現(xiàn)并使用合適的 Topic 來異步驅(qū)動(dòng)下游的 Agent,在任務(wù)編排過程中,Supervisor Agent 能夠像調(diào)用函數(shù)一樣選擇 Topic,實(shí)現(xiàn)基于語義理解的動(dòng)態(tài)路由決策。
2、輕量級(jí)消費(fèi)模式:Lite-Topic
Lite-Topic 是在 RocketMQ 百萬隊(duì)列基礎(chǔ)上設(shè)計(jì)的一種新類型的 Topic,它無需預(yù)創(chuàng)建 Topic 和訂閱關(guān)系,并且能自動(dòng)管理生命周期,主要面向短期、小量消息傳輸、客戶端訂閱關(guān)系動(dòng)態(tài)臨時(shí)變化、訂閱集合高度個(gè)性化的場景。這種輕量化消費(fèi)模型天然就能支持粒度更細(xì)的資源隔離,從而支持異步場景下的結(jié)果反饋機(jī)制,保證 Sub Agent 回寫的結(jié)果能讓發(fā)起任務(wù)的 Supervisor Agent 獲取到。
為了維護(hù)這種“千人千面”的訂閱關(guān)系,我們提出一種去中心化 + 最終一致性的訂閱關(guān)系管理方式:將訂閱關(guān)系提前注冊到 Broker,避免每次請求重復(fù)傳輸,以增量注冊方式應(yīng)對新的模型中 Lite Topic 訂閱的頻繁變化,以全量方式做到最終一致性。同時(shí),在組織方式上不再以 Group 為維度,而是以 Client_ID 為維度管理的訂閱關(guān)系,可以稱之為每個(gè)客戶端的興趣集(InterestSet)。
不同客戶端的訂閱集合維護(hù)在服務(wù)側(cè):
增量/全量同步:實(shí)時(shí)性和最終一致性;
存活檢測:Proxy 通過心跳判斷 Client 是否在線,下線時(shí)通知 Broker 清理 InterestSet;
分片機(jī)制:客戶端的完整的訂閱集合分片存儲(chǔ)。
在消息讀取方面:不再使用傳統(tǒng) Pull / Pop 模型中客戶端針對每個(gè) Topic 的每個(gè) Queue 發(fā)起讀請求的模式,因?yàn)檫@會(huì)帶來數(shù)千個(gè)并發(fā)請求,連接與線程開銷會(huì)線性增長。讀請求不需攜帶 Topic,而只帶上自己的身份即可,這樣保證即使訂閱集合龐大,讀請求依然是輕量的。因?yàn)槊總€(gè) Lite-Topic 的消息量并不大(幾條到幾百條),發(fā)送流量也不高甚至較為離散,但是 Topic 數(shù)量多,所以需要一種新的分發(fā)機(jī)制,在仍然保證低延遲的同時(shí),降低 1:1 讀模式帶來的開銷。
我們引入一個(gè)事件驅(qū)動(dòng)的消息分發(fā)方式,核心組件是 ReadySet(就緒集合),就緒事件集合是每個(gè) Client_ID 的待讀取 Topic 隊(duì)列集合,維護(hù)客戶端的 Ready 事件,即“哪些 Topic 有消息可讀”,存放當(dāng)前有消息可讀的 Lite-Topic。在 RocketMQ 中能觸發(fā)這個(gè)集合變更的事件如下:
事件源
描述
新消息寫入事件
Producer 發(fā)送消息 → 匹配 InterestSet → 加入對應(yīng) client 的 ReadySet
ACK 事件
消費(fèi)者確認(rèn)部分消息后,若隊(duì)列仍有未讀消息
OrderLock 釋放事件
順序消費(fèi)鎖釋放后,下一消息變?yōu)榭勺x → 觸發(fā)加入
訂閱上線事件
客戶端完成注冊,若有消息 → 立即喚醒
![]()
這種直接訪問 ReadySet → 僅處理活躍 topic 的讀取方式,避免了每次讀請求遍歷客戶端所有訂閱集合進(jìn)而輪詢每個(gè) Lite Topic 帶來的無效讀操作。換個(gè)角度看,這個(gè)模型是:Pull 模型 + Push 語義。
保留 Pull 模型的優(yōu)勢:客戶端主動(dòng)控制流控、避免過載;客戶端只需發(fā)起一個(gè)輕量 Poll 請求;
引入 Push 語義的效率:Broker 主動(dòng)告知“哪些 Topic 有消息可讀”。
![]()
從整體上看,通過引入 InterestSet + ReadySet 的事件驅(qū)動(dòng)模型,InterestSet 維護(hù)客戶端的訂閱關(guān)系,并在事件觸發(fā)后分發(fā)到對應(yīng)的 ReadySet,將傳統(tǒng)“盲目輪詢”轉(zhuǎn)化為“精準(zhǔn)喚醒”,最終實(shí)現(xiàn)在大規(guī)模個(gè)性化訂閱場景下的高效、低延遲消息分發(fā)。
![]()
基于 RocketMQ 構(gòu)建異步 Multi-Agent 系統(tǒng)
首先,通過上述 Lite-Topic 的能力,我們可以在 Multi-Agent 中更簡潔地異步獲取 Sub Agent 的結(jié)果,Supervisor Agent 集群中,任何一個(gè) Supervisor Agent 都可以通過動(dòng)態(tài)訂閱 Lite-Topic(以 Task_id 命名)來接收下游任務(wù)結(jié)果,實(shí)現(xiàn)整個(gè)任務(wù)的閉環(huán)。
![]()
其次,再結(jié)合語義化 Topic 的 Agent 能力注冊與發(fā)現(xiàn),我們構(gòu)建了一套面向 Agentic AI 的高效異步協(xié)同架構(gòu)。其核心業(yè)務(wù)流程如下:
1. 能力注冊與發(fā)現(xiàn):每個(gè) Sub Agent 在啟動(dòng)時(shí)創(chuàng)建與其業(yè)務(wù)職責(zé)對應(yīng)的 Topic,并將協(xié)議規(guī)范、輸入輸出 Schema、自然語言描述等元數(shù)據(jù)注冊至 NameServer。通過持續(xù)訂閱該 Topic 接收任務(wù),Sub Agent 不僅完成了通信接入,更實(shí)現(xiàn)了能力的主動(dòng)暴露。
2. 語義驅(qū)動(dòng)的任務(wù)編排:Supervisor Agent 基于用戶目標(biāo)構(gòu)建 Prompt 上下文,并動(dòng)態(tài)查詢 NameServer 獲取當(dāng)前可用的 Topic 列表,將其作為“可調(diào)用函數(shù)庫”注入大模型。LLM 由此可在真實(shí)、可觀測的能力空間中進(jìn)行任務(wù)拆解與路徑規(guī)劃,避免了“幻覺式?jīng)Q策”,提升了執(zhí)行的可行性與可控性。
3. 輕量級(jí)異步任務(wù)分發(fā)與反饋:在執(zhí)行階段,Supervisor 向目標(biāo) Topic 發(fā)送消息,同時(shí)為本次調(diào)用創(chuàng)建一個(gè)臨時(shí)的 Lite-Topic 作為專屬回調(diào)通道。該機(jī)制無需綁定具體實(shí)例,即可實(shí)現(xiàn)高并發(fā)下的結(jié)果路由,兼顧性能與靈活性。
4. 閉環(huán)驅(qū)動(dòng)的持續(xù)決策:Supervisor 訂閱相關(guān) Lite-Topic,異步聚合各子任務(wù)的執(zhí)行結(jié)果,重新注入上下文,驅(qū)動(dòng)下一輪推理與編排。整個(gè)過程形成一個(gè)以反饋為核心、動(dòng)態(tài)演進(jìn)的決策循環(huán),真正實(shí)現(xiàn)了從“靜態(tài)流程”到“自主協(xié)作”的躍遷。
![]()
這套依托 RocketMQ 在發(fā)布/訂閱模型上的創(chuàng)新擴(kuò)展而實(shí)現(xiàn)的架構(gòu),在保證系統(tǒng)松耦合和高擴(kuò)展性的同時(shí),有效支持了 Multi-Agent 場景下任務(wù)編排、結(jié)果反饋和多輪決策的需求。基于 RocketMQ 的這一實(shí)踐,為構(gòu)建可靠、可控的異步智能體協(xié)作系統(tǒng)提供了一種可行的技術(shù)路徑。
特別聲明:以上內(nèi)容(如有圖片或視頻亦包括在內(nèi))為自媒體平臺(tái)“網(wǎng)易號(hào)”用戶上傳并發(fā)布,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.